This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e1f1386395 [fix](cooldown) Rewrite update cooldown conf (#16488)
e1f1386395 is described below
commit e1f13863950237d96e8488dd42fa288cc6df94b7
Author: plat1ko <[email protected]>
AuthorDate: Thu Feb 9 09:12:55 2023 +0800
[fix](cooldown) Rewrite update cooldown conf (#16488)
Remove error-prone CooldownJob, and use CooldownConfHandler to update
Tablet's cooldown conf.
Also fix several cooldown-related bugs.
---
be/src/agent/task_worker_pool.cpp | 11 +-
be/src/olap/olap_server.cpp | 2 +-
be/src/olap/tablet.cpp | 16 +-
be/src/olap/tablet.h | 7 +-
.../main/java/org/apache/doris/common/Config.java | 14 +-
.../doris/alter/MaterializedViewHandler.java | 2 +-
.../java/org/apache/doris/alter/RollupJobV2.java | 2 +-
.../apache/doris/alter/SchemaChangeHandler.java | 2 +-
.../org/apache/doris/alter/SchemaChangeJobV2.java | 2 +-
.../java/org/apache/doris/backup/RestoreJob.java | 9 +-
.../apache/doris/catalog/CatalogRecycleBin.java | 6 +-
.../main/java/org/apache/doris/catalog/Env.java | 30 +-
.../main/java/org/apache/doris/catalog/Tablet.java | 30 +-
.../apache/doris/catalog/TabletInvertedIndex.java | 103 ++++--
.../java/org/apache/doris/catalog/TabletMeta.java | 60 +--
.../org/apache/doris/cooldown/CooldownConf.java | 76 +---
.../apache/doris/cooldown/CooldownConfHandler.java | 138 +++++++
.../apache/doris/cooldown/CooldownConfList.java | 53 +++
.../apache/doris/cooldown/CooldownException.java | 32 --
.../org/apache/doris/cooldown/CooldownHandler.java | 192 ----------
.../org/apache/doris/cooldown/CooldownJob.java | 401 ---------------------
.../apache/doris/datasource/InternalCatalog.java | 12 +-
.../org/apache/doris/journal/JournalEntity.java | 6 +-
.../org/apache/doris/master/ReportHandler.java | 31 +-
.../java/org/apache/doris/persist/EditLog.java | 15 +-
.../org/apache/doris/persist/OperationType.java | 4 +-
.../doris/persist/meta/MetaPersistMethod.java | 6 -
.../doris/persist/meta/PersistMetaModules.java | 2 +-
.../org/apache/doris/backup/CatalogMocker.java | 10 +-
.../org/apache/doris/catalog/CatalogTestUtil.java | 4 +-
.../java/org/apache/doris/catalog/TabletTest.java | 2 +-
.../doris/clone/ClusterLoadStatisticsTest.java | 6 +-
.../org/apache/doris/clone/RebalancerTestUtil.java | 2 +-
.../org/apache/doris/common/util/UnitTestUtil.java | 2 +-
.../org/apache/doris/cooldown/CooldownJobTest.java | 130 -------
.../org/apache/doris/http/DorisHttpTestCase.java | 2 +-
.../org/apache/doris/load/DeleteHandlerTest.java | 2 +-
gensrc/thrift/MasterService.thrift | 4 +-
.../cold_heat_separation/policy/alter.groovy | 2 -
.../cold_heat_separation/policy/create.groovy | 2 -
.../suites/cold_heat_separation/policy/drop.groovy | 2 -
.../suites/cold_heat_separation/policy/show.groovy | 2 -
.../use_policy/alter_table_add_policy.groovy | 2 -
.../create_table_use_partition_policy.groovy | 2 -
.../use_policy/create_table_use_policy.groovy | 2 -
.../use_policy/modify_partition_add_policy.groovy | 2 -
.../use_policy/use_default_storage_policy.groovy | 2 -
47 files changed, 401 insertions(+), 1045 deletions(-)
diff --git a/be/src/agent/task_worker_pool.cpp
b/be/src/agent/task_worker_pool.cpp
index a61ea17ba9..2c6b757c0f 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -268,7 +268,8 @@ void TaskWorkerPool::notify_thread() {
}
bool TaskWorkerPool::_register_task_info(const TTaskType::type task_type,
int64_t signature) {
- if (task_type == TTaskType::type::PUSH_STORAGE_POLICY) {
+ if (task_type == TTaskType::type::PUSH_STORAGE_POLICY ||
+ task_type == TTaskType::type::PUSH_COOLDOWN_CONF) {
// no need to report task of these types
return true;
}
@@ -1716,14 +1717,6 @@ void
TaskWorkerPool::_push_cooldown_conf_worker_thread_callback() {
agent_task_req = _tasks.front();
_tasks.pop_front();
}
- // FIXME(plat1ko): no need to save cooldown conf job state in FE
- TFinishTaskRequest finish_task_request;
- finish_task_request.__set_backend(_backend);
- finish_task_request.__set_task_type(agent_task_req.task_type);
- finish_task_request.__set_signature(agent_task_req.signature);
- finish_task_request.__set_task_status(Status::OK().to_thrift());
- _finish_task(finish_task_request);
- _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
TPushCooldownConfReq& push_cooldown_conf_req =
agent_task_req.push_cooldown_conf;
for (auto& cooldown_conf : push_cooldown_conf_req.cooldown_confs) {
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 98dca4ea1c..9764680ff9 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -729,7 +729,7 @@ void StorageEngine::_cooldown_tasks_producer_callback() {
Status st = tablet->cooldown();
if (!st.ok()) {
LOG(WARNING) << "failed to cooldown, tablet: " <<
tablet->tablet_id()
- << " err: " << st.to_string();
+ << " err: " << st;
} else {
LOG(INFO) << "succeed to cooldown, tablet: " <<
tablet->tablet_id()
<< " cooldown progress ("
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 009513748f..92b203b339 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -1392,9 +1392,9 @@ void Tablet::build_tablet_report_info(TTabletInfo*
tablet_info,
tablet_info->__set_is_in_memory(_tablet_meta->tablet_schema()->is_in_memory());
tablet_info->__set_replica_id(replica_id());
tablet_info->__set_remote_data_size(_tablet_meta->tablet_remote_size());
- tablet_info->__set_is_cooldown(_tablet_meta->storage_policy_id() > 0);
- if (tablet_info->is_cooldown) {
+ if (tablet_state() == TABLET_RUNNING && _tablet_meta->storage_policy_id()
> 0) {
tablet_info->__set_cooldown_replica_id(_cooldown_replica_id);
+ tablet_info->__set_cooldown_term(_cooldown_term);
}
}
@@ -1645,7 +1645,7 @@ Status Tablet::cooldown() {
}
int64_t cooldown_replica_id = _cooldown_replica_id;
if (cooldown_replica_id <= 0) { // wait for FE to push cooldown conf
- return Status::OK();
+ return Status::InternalError("invalid cooldown_replica_id");
}
auto storage_policy = get_storage_policy(storage_policy_id());
if (storage_policy == nullptr) {
@@ -1768,6 +1768,8 @@ Status Tablet::_write_cooldown_meta(io::RemoteFileSystem*
fs, RowsetMeta* new_rs
}
Status Tablet::_follow_cooldowned_data(io::RemoteFileSystem* fs, int64_t
cooldown_replica_id) {
+ LOG(INFO) << "try to follow cooldowned data. tablet_id=" << tablet_id()
+ << " cooldown_replica_id=" << cooldown_replica_id;
TabletMetaPB cooldown_meta_pb;
RETURN_IF_ERROR(_read_cooldown_meta(fs, cooldown_replica_id,
&cooldown_meta_pb));
DCHECK(cooldown_meta_pb.rs_metas_size() > 0);
@@ -1788,12 +1790,13 @@ Status
Tablet::_follow_cooldowned_data(io::RemoteFileSystem* fs, int64_t cooldow
}
}
if (!version_aligned) {
- LOG(INFO) << "cooldowned version is not aligned";
- return Status::OK();
+ return Status::InternalError("cooldowned version is not aligned");
}
for (auto& [v, rs] : _rs_version_map) {
if (v.second <= cooldowned_version) {
overlap_rowsets.push_back(rs);
+ } else if (!rs->is_local()) {
+ return Status::InternalError("cooldowned version larger than that
to follow");
}
}
std::sort(overlap_rowsets.begin(), overlap_rowsets.end(),
Rowset::comparator);
@@ -1854,6 +1857,9 @@ RowsetSharedPtr Tablet::pick_cooldown_rowset() {
}
}
}
+ if (!rowset) {
+ return nullptr;
+ }
if (min_local_version != cooldowned_version + 1) { // ensure version
continuity
if (UNLIKELY(cooldowned_version != -1)) {
LOG(WARNING) << "version not continuous. tablet_id=" << tablet_id()
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 934dd04298..c56808e8b5 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -305,9 +305,10 @@ public:
void update_cooldown_conf(int64_t cooldown_term, int64_t
cooldown_replica_id) {
if (cooldown_term > _cooldown_term) {
- LOG(INFO) << "update cooldown conf. cooldown_replica_id: " <<
_cooldown_replica_id
- << " -> " << cooldown_replica_id << ", cooldown_term: "
<< _cooldown_term
- << " -> " << cooldown_term;
+ LOG(INFO) << "update cooldown conf. tablet_id=" << tablet_id()
+ << " cooldown_replica_id: " << _cooldown_replica_id << "
-> "
+ << cooldown_replica_id << ", cooldown_term: " <<
_cooldown_term << " -> "
+ << cooldown_term;
_cooldown_replica_id = cooldown_replica_id;
_cooldown_term = cooldown_term;
}
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 6edbc5a160..ef325d357c 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -802,13 +802,6 @@ public class Config extends ConfigBase {
*/
@ConfField(mutable = true, masterOnly = true)
public static int alter_table_timeout_second = 86400 * 30; // 1month
- /**
- * Maximal timeout of push cooldown conf request.
- */
- @ConfField(mutable = false, masterOnly = true)
- public static boolean cooldown_single_remote_file = false;
- @ConfField(mutable = false, masterOnly = true)
- public static int push_cooldown_conf_timeout_second = 600; // 10 min
/**
* If a backend is down for *max_backend_down_time_second*, a BACKEND_DOWN
event will be triggered.
* Do not set this if you know what you are doing.
@@ -1932,11 +1925,10 @@ public class Config extends ConfigBase {
public static int max_same_name_catalog_trash_num = 3;
/**
- * The storage policy is still under developement.
- * Disable it by default.
+ * NOTE: The storage policy is still under developement.
*/
- @ConfField(mutable = true, masterOnly = true)
- public static boolean enable_storage_policy = false;
+ @ConfField(mutable = false, masterOnly = true)
+ public static boolean enable_storage_policy = true;
/**
* This config is mainly used in the k8s cluster environment.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
index ef760bd0a6..851e4c7016 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
@@ -379,7 +379,7 @@ public class MaterializedViewHandler extends AlterHandler {
short replicationNum =
olapTable.getPartitionInfo().getReplicaAllocation(partitionId).getTotalReplicaNum();
for (Tablet baseTablet : baseIndex.getTablets()) {
TabletMeta mvTabletMeta = new TabletMeta(
- dbId, tableId, partitionId, mvIndexId, mvSchemaHash,
medium, -1, 0);
+ dbId, tableId, partitionId, mvIndexId, mvSchemaHash,
medium);
long baseTabletId = baseTablet.getId();
long mvTabletId = idGeneratorBuffer.getNextId();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
index d83c9a6f70..b47052c601 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -658,7 +658,7 @@ public class RollupJobV2 extends AlterJobV2 implements
GsonPostProcessable {
for (Tablet rollupTablet : rollupIndex.getTablets()) {
TabletMeta rollupTabletMeta = new TabletMeta(dbId, tableId,
partitionId, rollupIndexId,
- rollupSchemaHash, medium, -1, 0);
+ rollupSchemaHash, medium);
invertedIndex.addTablet(rollupTablet.getId(),
rollupTabletMeta);
for (Replica rollupReplica : rollupTablet.getReplicas()) {
invertedIndex.addReplica(rollupTablet.getId(),
rollupReplica);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index a330923582..29ff80ec79 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -1493,7 +1493,7 @@ public class SchemaChangeHandler extends AlterHandler {
Short totalReplicaNum = replicaAlloc.getTotalReplicaNum();
for (Tablet originTablet : originIndex.getTablets()) {
TabletMeta shadowTabletMeta = new TabletMeta(dbId,
tableId, partitionId, shadowIndexId,
- newSchemaHash, medium, -1, 0);
+ newSchemaHash, medium);
long originTabletId = originTablet.getId();
long shadowTabletId = idGeneratorBuffer.getNextId();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 3731f7cb88..757898d0ca 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -746,7 +746,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
for (Tablet shadownTablet : shadowIndex.getTablets()) {
TabletMeta shadowTabletMeta = new TabletMeta(dbId,
tableId, partitionId, shadowIndexId,
-
indexSchemaVersionAndHashMap.get(shadowIndexId).schemaHash, medium, -1, 0);
+
indexSchemaVersionAndHashMap.get(shadowIndexId).schemaHash, medium);
invertedIndex.addTablet(shadownTablet.getId(),
shadowTabletMeta);
for (Replica shadowReplica : shadownTablet.getReplicas()) {
invertedIndex.addReplica(shadownTablet.getId(),
shadowReplica);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index c01aa2b685..8b894fc920 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -988,8 +988,7 @@ public class RestoreJob extends AbstractJob {
MaterializedIndexMeta indexMeta =
localTbl.getIndexMetaByIndexId(restoredIdx.getId());
for (Tablet restoreTablet : restoredIdx.getTablets()) {
TabletMeta tabletMeta = new TabletMeta(db.getId(),
localTbl.getId(), restorePart.getId(),
- restoredIdx.getId(), indexMeta.getSchemaHash(),
TStorageMedium.HDD,
- restoreTablet.getCooldownReplicaId(),
restoreTablet.getCooldownTerm());
+ restoredIdx.getId(), indexMeta.getSchemaHash(),
TStorageMedium.HDD);
Env.getCurrentInvertedIndex().addTablet(restoreTablet.getId(),
tabletMeta);
for (Replica restoreReplica : restoreTablet.getReplicas()) {
Env.getCurrentInvertedIndex().addReplica(restoreTablet.getId(), restoreReplica);
@@ -1177,8 +1176,7 @@ public class RestoreJob extends AbstractJob {
int schemaHash =
localTbl.getSchemaHashByIndexId(restoreIdx.getId());
for (Tablet restoreTablet : restoreIdx.getTablets()) {
TabletMeta tabletMeta = new TabletMeta(db.getId(),
localTbl.getId(), restorePart.getId(),
- restoreIdx.getId(), schemaHash,
TStorageMedium.HDD, restoreTablet.getCooldownReplicaId(),
- restoreTablet.getCooldownTerm());
+ restoreIdx.getId(), schemaHash,
TStorageMedium.HDD);
Env.getCurrentInvertedIndex().addTablet(restoreTablet.getId(), tabletMeta);
for (Replica restoreReplica : restoreTablet.getReplicas())
{
Env.getCurrentInvertedIndex().addReplica(restoreTablet.getId(), restoreReplica);
@@ -1210,8 +1208,7 @@ public class RestoreJob extends AbstractJob {
int schemaHash =
olapRestoreTbl.getSchemaHashByIndexId(restoreIdx.getId());
for (Tablet restoreTablet : restoreIdx.getTablets()) {
TabletMeta tabletMeta = new TabletMeta(db.getId(),
restoreTbl.getId(), restorePart.getId(),
- restoreIdx.getId(), schemaHash,
TStorageMedium.HDD,
- restoreTablet.getCooldownReplicaId(),
restoreTablet.getCooldownTerm());
+ restoreIdx.getId(), schemaHash,
TStorageMedium.HDD);
Env.getCurrentInvertedIndex().addTablet(restoreTablet.getId(), tabletMeta);
for (Replica restoreReplica :
restoreTablet.getReplicas()) {
Env.getCurrentInvertedIndex().addReplica(restoreTablet.getId(), restoreReplica);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java
index ec2588ae2e..dadb0f722e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java
@@ -840,8 +840,7 @@ public class CatalogRecycleBin extends MasterDaemon
implements Writable {
long indexId = index.getId();
int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
for (Tablet tablet : index.getTablets()) {
- TabletMeta tabletMeta = new TabletMeta(dbId, tableId,
partitionId, indexId, schemaHash, medium,
- tablet.getCooldownReplicaId(),
tablet.getCooldownTerm());
+ TabletMeta tabletMeta = new TabletMeta(dbId, tableId,
partitionId, indexId, schemaHash, medium);
long tabletId = tablet.getId();
invertedIndex.addTablet(tabletId, tabletMeta);
for (Replica replica : tablet.getReplicas()) {
@@ -893,8 +892,7 @@ public class CatalogRecycleBin extends MasterDaemon
implements Writable {
long indexId = index.getId();
int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
for (Tablet tablet : index.getTablets()) {
- TabletMeta tabletMeta = new TabletMeta(dbId, tableId,
partitionId, indexId, schemaHash, medium,
- tablet.getCooldownReplicaId(),
tablet.getCooldownTerm());
+ TabletMeta tabletMeta = new TabletMeta(dbId, tableId,
partitionId, indexId, schemaHash, medium);
long tabletId = tablet.getId();
invertedIndex.addTablet(tabletId, tabletMeta);
for (Replica replica : tablet.getReplicas()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 34f0dafdd2..15d38f61c6 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -122,7 +122,7 @@ import org.apache.doris.common.util.SmallFileMgr;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.consistency.ConsistencyChecker;
-import org.apache.doris.cooldown.CooldownHandler;
+import org.apache.doris.cooldown.CooldownConfHandler;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.CatalogMgr;
import org.apache.doris.datasource.EsExternalCatalog;
@@ -314,7 +314,7 @@ public class Env {
private DeleteHandler deleteHandler;
private DbUsedDataQuotaInfoCollector dbUsedDataQuotaInfoCollector;
private PartitionInMemoryInfoCollector partitionInMemoryInfoCollector;
- private CooldownHandler cooldownHandler;
+ private CooldownConfHandler cooldownConfHandler;
private MetastoreEventsProcessor metastoreEventsProcessor;
private MasterDaemon labelCleaner; // To clean old LabelInfo,
ExportJobInfos
@@ -551,7 +551,9 @@ public class Env {
this.deleteHandler = new DeleteHandler();
this.dbUsedDataQuotaInfoCollector = new DbUsedDataQuotaInfoCollector();
this.partitionInMemoryInfoCollector = new
PartitionInMemoryInfoCollector();
- this.cooldownHandler = new CooldownHandler();
+ if (Config.enable_storage_policy) {
+ this.cooldownConfHandler = new CooldownConfHandler();
+ }
this.metastoreEventsProcessor = new MetastoreEventsProcessor();
this.replayedJournalId = new AtomicLong(0L);
@@ -1398,8 +1400,8 @@ public class Env {
dbUsedDataQuotaInfoCollector.start();
// start daemon thread to update global partition in memory
information periodically
partitionInMemoryInfoCollector.start();
- if (Config.cooldown_single_remote_file) {
- cooldownHandler.start();
+ if (Config.enable_storage_policy) {
+ cooldownConfHandler.start();
}
streamLoadRecordMgr.start();
getInternalCatalog().getIcebergTableCreationRecordMgr().start();
@@ -1738,12 +1740,6 @@ public class Env {
return checksum;
}
- public long loadCooldownJob(DataInputStream dis, long checksum) throws
IOException {
- cooldownHandler.readField(dis);
- LOG.info("finished replay loadCooldownJob from image");
- return checksum;
- }
-
public long loadAlterJob(DataInputStream dis, long checksum) throws
IOException {
long newChecksum = checksum;
for (JobType type : JobType.values()) {
@@ -2155,14 +2151,6 @@ public class Env {
return checksum;
}
- /**
- * Save CooldownJob.
- */
- public long saveCooldownJob(CountingDataOutputStream out, long checksum)
throws IOException {
- Env.getCurrentEnv().getCooldownHandler().write(out);
- return checksum;
- }
-
public long saveMTMVJobManager(CountingDataOutputStream out, long
checksum) throws IOException {
if (Config.enable_mtmv_scheduler_framework) {
Env.getCurrentEnv().getMTMVJobManager().write(out, checksum);
@@ -3387,8 +3375,8 @@ public class Env {
return (MaterializedViewHandler)
this.alter.getMaterializedViewHandler();
}
- public CooldownHandler getCooldownHandler() {
- return cooldownHandler;
+ public CooldownConfHandler getCooldownConfHandler() {
+ return cooldownConfHandler;
}
public SystemHandler getClusterHandler() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
index 7f594afa7d..69964693c3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
@@ -49,6 +49,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
@@ -90,10 +91,13 @@ public class Tablet extends MetaObject implements Writable {
private long checkedVersionHash;
@SerializedName(value = "isConsistent")
private boolean isConsistent;
+
+ // cooldown conf
@SerializedName(value = "cooldownReplicaId")
- private long cooldownReplicaId;
+ private long cooldownReplicaId = -1;
@SerializedName(value = "cooldownTerm")
- private long cooldownTerm;
+ private long cooldownTerm = -1;
+ private ReentrantReadWriteLock cooldownConfLock = new
ReentrantReadWriteLock();
// last time that the tablet checker checks this tablet.
// no need to persist
@@ -143,20 +147,20 @@ public class Tablet extends MetaObject implements
Writable {
return isConsistent;
}
- public long getCooldownReplicaId() {
- return cooldownReplicaId;
- }
-
- public void setCooldownReplicaId(long cooldownReplicaId) {
+ public void setCooldownConf(long cooldownReplicaId, long cooldownTerm) {
+ cooldownConfLock.writeLock().lock();
this.cooldownReplicaId = cooldownReplicaId;
+ this.cooldownTerm = cooldownTerm;
+ cooldownConfLock.writeLock().unlock();
}
- public long getCooldownTerm() {
- return cooldownTerm;
- }
-
- public void setCooldownTerm(long cooldownTerm) {
- this.cooldownTerm = cooldownTerm;
+ public Pair<Long, Long> getCooldownConf() {
+ cooldownConfLock.readLock().lock();
+ try {
+ return Pair.of(cooldownReplicaId, cooldownTerm);
+ } finally {
+ cooldownConfLock.readLock().unlock();
+ }
}
private boolean deleteRedundantReplica(long backendId, long version) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
index 98fcf380f6..0ed99185ef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
@@ -19,6 +19,8 @@ package org.apache.doris.catalog;
import org.apache.doris.catalog.Replica.ReplicaState;
import org.apache.doris.common.Config;
+import org.apache.doris.common.Pair;
+import org.apache.doris.cooldown.CooldownConf;
import org.apache.doris.thrift.TPartitionVersionInfo;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TTablet;
@@ -46,7 +48,6 @@ import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -66,7 +67,7 @@ public class TabletInvertedIndex {
public static final int NOT_EXIST_VALUE = -1;
public static final TabletMeta NOT_EXIST_TABLET_META = new
TabletMeta(NOT_EXIST_VALUE, NOT_EXIST_VALUE,
- NOT_EXIST_VALUE, NOT_EXIST_VALUE, NOT_EXIST_VALUE,
TStorageMedium.HDD, -1, -1);
+ NOT_EXIST_VALUE, NOT_EXIST_VALUE, NOT_EXIST_VALUE,
TStorageMedium.HDD);
private StampedLock lock = new StampedLock();
@@ -126,7 +127,8 @@ public class TabletInvertedIndex {
ListMultimap<Long, Long> transactionsToClear,
ListMultimap<Long, Long> tabletRecoveryMap,
List<Triple<Long, Integer, Boolean>>
tabletToInMemory,
- Map<Long, TabletMeta> syncCooldownTabletMap) {
+ List<CooldownConf> cooldownConfToPush,
+ List<CooldownConf> cooldownConfToUpdate) {
long stamp = readLock();
long start = System.currentTimeMillis();
try {
@@ -188,9 +190,9 @@ public class TabletInvertedIndex {
}
}
- if (Config.cooldown_single_remote_file
- && needChangeCooldownConf(tabletMeta,
backendTabletInfo)) {
-
syncCooldownTabletMap.put(backendTabletInfo.getTabletId(), tabletMeta);
+ if (Config.enable_storage_policy) {
+ handleCooldownConf(tabletMeta,
backendTabletInfo, cooldownConfToPush,
+ cooldownConfToUpdate);
}
long partitionId = tabletMeta.getPartitionId();
@@ -335,48 +337,81 @@ public class TabletInvertedIndex {
return false;
}
- private boolean needChangeCooldownConf(TabletMeta tabletMeta, TTabletInfo
beTabletInfo) {
- if (!beTabletInfo.isIsCooldown()) {
- return false;
- }
- // check cooldown type in fe and be, they need to be the same.
- if (tabletMeta.getCooldownReplicaId() !=
beTabletInfo.getCooldownReplicaId()) {
- LOG.warn("cooldownReplicaId is wrong for tablet: {}, Fe: {}, Be:
{}", beTabletInfo.getTabletId(),
- tabletMeta.getCooldownReplicaId(),
beTabletInfo.getCooldownReplicaId());
- return true;
+ private void handleCooldownConf(TabletMeta tabletMeta, TTabletInfo
beTabletInfo,
+ List<CooldownConf> cooldownConfToPush, List<CooldownConf>
cooldownConfToUpdate) {
+ if (!beTabletInfo.isSetCooldownReplicaId()) {
+ return;
}
- // check cooldown type in one tablet, One UPLOAD_DATA is needed in the
replicas.
- long stamp = readLock();
+ Tablet tablet;
try {
- boolean replicaInvalid = true;
- Map<Long, Replica> replicaMap =
replicaMetaTable.row(beTabletInfo.getTabletId());
- for (Map.Entry<Long, Replica> entry : replicaMap.entrySet()) {
- if (entry.getValue().getId() ==
beTabletInfo.getCooldownReplicaId()) {
- replicaInvalid = false;
- break;
- }
+ OlapTable table = (OlapTable)
Env.getCurrentInternalCatalog().getDbNullable(tabletMeta.getDbId())
+ .getTable(tabletMeta.getTableId())
+ .get();
+ table.readLock();
+ try {
+ tablet =
table.getPartition(tabletMeta.getPartitionId()).getIndex(tabletMeta.getIndexId())
+ .getTablet(beTabletInfo.tablet_id);
+ } finally {
+ table.readUnlock();
}
- if (replicaInvalid) {
- return true;
+ } catch (RuntimeException e) {
+ LOG.warn("failed to get tablet. tabletId={}",
beTabletInfo.tablet_id);
+ return;
+ }
+ Pair<Long, Long> cooldownConf = tablet.getCooldownConf();
+ if (beTabletInfo.getCooldownTerm() > cooldownConf.second) { // should
not be here
+ LOG.warn("report cooldownTerm({}) > cooldownTerm in
TabletMeta({}), tabletId={}",
+ beTabletInfo.getCooldownTerm(), cooldownConf.second,
beTabletInfo.tablet_id);
+ return;
+ }
+
+ if (cooldownConf.first <= 0) { // invalid cooldownReplicaId
+ CooldownConf conf = new CooldownConf(tabletMeta.getDbId(),
tabletMeta.getTableId(),
+ tabletMeta.getPartitionId(), tabletMeta.getIndexId(),
beTabletInfo.tablet_id, cooldownConf.second);
+ synchronized (cooldownConfToUpdate) {
+ cooldownConfToUpdate.add(conf);
}
- } finally {
- readUnlock(stamp);
+ return;
+ }
+
+ // validate replica is active
+ Map<Long, Replica> replicaMap =
replicaMetaTable.row(beTabletInfo.getTabletId());
+ if (replicaMap.isEmpty()) {
+ return;
+ }
+ boolean replicaInvalid = true;
+ for (Replica replica : replicaMap.values()) {
+ if (replica.getId() == cooldownConf.first) {
+ replicaInvalid = false;
+ break;
+ }
+ }
+ if (replicaInvalid) {
+ CooldownConf conf = new CooldownConf(tabletMeta.getDbId(),
tabletMeta.getTableId(),
+ tabletMeta.getPartitionId(), tabletMeta.getIndexId(),
beTabletInfo.tablet_id, cooldownConf.second);
+ synchronized (cooldownConfToUpdate) {
+ cooldownConfToUpdate.add(conf);
+ }
+ return;
+ }
+
+ if (cooldownConf.first != beTabletInfo.getCooldownReplicaId()) {
+ CooldownConf conf = new CooldownConf(beTabletInfo.tablet_id,
cooldownConf.first, cooldownConf.second);
+ synchronized (cooldownConfToPush) {
+ cooldownConfToPush.add(conf);
+ }
+ return;
}
- return false;
}
public List<Replica> getReplicas(Long tabletId) {
- List<Replica> replicas = new LinkedList<>();
long stamp = readLock();
try {
Map<Long, Replica> replicaMap = replicaMetaTable.row(tabletId);
- for (Map.Entry<Long, Replica> entry : replicaMap.entrySet()) {
- replicas.add(entry.getValue());
- }
+ return replicaMap.values().stream().collect(Collectors.toList());
} finally {
readUnlock(stamp);
}
- return replicas;
}
/**
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletMeta.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletMeta.java
index bfaaba3eb2..be291d93ec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletMeta.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletMeta.java
@@ -22,8 +22,6 @@ import org.apache.doris.thrift.TStorageMedium;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
public class TabletMeta {
private static final Logger LOG = LogManager.getLogger(TabletMeta.class);
@@ -37,14 +35,8 @@ public class TabletMeta {
private TStorageMedium storageMedium;
- private long cooldownReplicaId;
-
- private long cooldownTerm;
-
- private ReentrantReadWriteLock lock;
-
public TabletMeta(long dbId, long tableId, long partitionId, long indexId,
int schemaHash,
- TStorageMedium storageMedium, long cooldownReplicaId, long
cooldownTerm) {
+ TStorageMedium storageMedium) {
this.dbId = dbId;
this.tableId = tableId;
this.partitionId = partitionId;
@@ -54,10 +46,6 @@ public class TabletMeta {
this.newSchemaHash = -1;
this.storageMedium = storageMedium;
- this.cooldownReplicaId = cooldownReplicaId;
- this.cooldownTerm = cooldownTerm;
-
- lock = new ReentrantReadWriteLock();
}
public long getDbId() {
@@ -84,46 +72,20 @@ public class TabletMeta {
this.storageMedium = storageMedium;
}
- public long getCooldownReplicaId() {
- return cooldownReplicaId;
- }
-
- public void setCooldownReplicaId(long cooldownReplicaId) {
- this.cooldownReplicaId = cooldownReplicaId;
- }
-
- public long getCooldownTerm() {
- return cooldownTerm;
- }
-
- public void setCooldownTerm(long cooldownTerm) {
- this.cooldownTerm = cooldownTerm;
- }
-
public int getOldSchemaHash() {
- lock.readLock().lock();
- try {
- return this.oldSchemaHash;
- } finally {
- lock.readLock().unlock();
- }
+ return this.oldSchemaHash;
}
@Override
public String toString() {
- lock.readLock().lock();
- try {
- StringBuilder sb = new StringBuilder();
- sb.append("dbId=").append(dbId);
- sb.append(" tableId=").append(tableId);
- sb.append(" partitionId=").append(partitionId);
- sb.append(" indexId=").append(indexId);
- sb.append(" oldSchemaHash=").append(oldSchemaHash);
- sb.append(" newSchemaHash=").append(newSchemaHash);
-
- return sb.toString();
- } finally {
- lock.readLock().unlock();
- }
+ StringBuilder sb = new StringBuilder();
+ sb.append("dbId=").append(dbId);
+ sb.append(" tableId=").append(tableId);
+ sb.append(" partitionId=").append(partitionId);
+ sb.append(" indexId=").append(indexId);
+ sb.append(" oldSchemaHash=").append(oldSchemaHash);
+ sb.append(" newSchemaHash=").append(newSchemaHash);
+
+ return sb.toString();
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConf.java
b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConf.java
index 9275a2633c..b0dcec8732 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConf.java
@@ -22,8 +22,7 @@ import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.gson.annotations.SerializedName;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+import lombok.Data;
import java.io.DataInput;
import java.io.DataOutput;
@@ -32,9 +31,8 @@ import java.io.IOException;
/**
* This class represents the olap replica related metadata.
*/
+@Data
public class CooldownConf implements Writable {
- private static final Logger LOG = LogManager.getLogger(CooldownConf.class);
-
@SerializedName(value = "dbId")
protected long dbId;
@SerializedName(value = "tableId")
@@ -46,74 +44,27 @@ public class CooldownConf implements Writable {
@SerializedName(value = "tabletId")
protected long tabletId;
@SerializedName(value = "cooldownReplicaId")
- protected long cooldownReplicaId;
+ protected long cooldownReplicaId = -1;
@SerializedName(value = "cooldownTerm")
- protected long cooldownTerm;
-
- public CooldownConf(long dbId, long tableId, long partitionId, long
indexId, long tabletId, long cooldownReplicaId,
- long cooldownTerm) {
- this.dbId = dbId;
- this.tableId = tableId;
- this.partitionId = partitionId;
- this.indexId = indexId;
- this.tabletId = tabletId;
- this.cooldownReplicaId = cooldownReplicaId;
- this.cooldownTerm = cooldownTerm;
- }
+ protected long cooldownTerm = -1;
- public long getDbId() {
- return dbId;
+ public CooldownConf() {
}
- public void setDbId(long dbId) {
+ // for update
+ public CooldownConf(long dbId, long tableId, long partitionId, long
indexId, long tabletId, long cooldownTerm) {
this.dbId = dbId;
- }
-
- public long getTableId() {
- return tableId;
- }
-
- public void setTableId(long tableId) {
this.tableId = tableId;
- }
-
- public long getPartitionId() {
- return partitionId;
- }
-
- public void setPartitionId(long partitionId) {
this.partitionId = partitionId;
- }
-
- public long getIndexId() {
- return indexId;
- }
-
- public void setIndexId(long indexId) {
this.indexId = indexId;
- }
-
- public long getTabletId() {
- return tabletId;
- }
-
- public void setTabletId(long tabletId) {
this.tabletId = tabletId;
+ this.cooldownTerm = cooldownTerm;
}
- public long getCooldownReplicaId() {
- return cooldownReplicaId;
- }
-
- public void setCooldownReplicaId(long cooldownReplicaId) {
+ // for push
+ public CooldownConf(long tabletId, long cooldownReplicaId, long
cooldownTerm) {
+ this.tabletId = tabletId;
this.cooldownReplicaId = cooldownReplicaId;
- }
-
- public long getCooldownTerm() {
- return cooldownTerm;
- }
-
- public void setCooldownTerm(long cooldownTerm) {
this.cooldownTerm = cooldownTerm;
}
@@ -123,8 +74,9 @@ public class CooldownConf implements Writable {
Text.writeString(out, json);
}
- public static CooldownJob read(DataInput in) throws IOException {
+ public static CooldownConf read(DataInput in) throws IOException {
String json = Text.readString(in);
- return GsonUtils.GSON.fromJson(json, CooldownJob.class);
+ return GsonUtils.GSON.fromJson(json, CooldownConf.class);
}
}
+
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConfHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConfHandler.java
new file mode 100644
index 0000000000..f840a304fc
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConfHandler.java
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cooldown;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.TabletInvertedIndex;
+import org.apache.doris.common.util.MasterDaemon;
+
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+public class CooldownConfHandler extends MasterDaemon {
+ private static final Logger LOG =
LogManager.getLogger(CooldownConfHandler.class);
+
+ // TODO(plat1ko): better to use `Condition`?
+ private static final long INTERVAL_MS = 5000; // 5s
+ private static final int UPDATE_BATCH_SIZE = 512;
+
+ private final Map<Long, CooldownConf> cooldownConfToUpdate =
Maps.newConcurrentMap();
+
+ public CooldownConfHandler() {
+ super("CooldownConfHandler", INTERVAL_MS);
+ }
+
+ public void addCooldownConfToUpdate(List<CooldownConf> cooldownConfs) {
+ cooldownConfs.forEach(conf ->
cooldownConfToUpdate.put(conf.getTabletId(), conf));
+ }
+
+ @Override
+ protected void runAfterCatalogReady() {
+ if (cooldownConfToUpdate.isEmpty()) {
+ return;
+ }
+ List<CooldownConf> cooldownConfList =
cooldownConfToUpdate.values().stream().collect(Collectors.toList());
+ for (int start = 0; start < cooldownConfList.size(); start +=
UPDATE_BATCH_SIZE) {
+ updateCooldownConf(
+ cooldownConfList.subList(start, Math.min(start +
UPDATE_BATCH_SIZE, cooldownConfList.size())));
+ }
+ }
+
+ private void updateCooldownConf(List<CooldownConf> confToUpdate) {
+ ArrayList<CooldownConf> updatedConf = new ArrayList<>();
+ updatedConf.ensureCapacity(confToUpdate.size());
+
+ Map<Long, Tablet> tabletMap = new HashMap<>(); // cache tablet
+
+ TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
+ for (CooldownConf conf : confToUpdate) {
+ // choose cooldown replica
+ List<Replica> replicas =
invertedIndex.getReplicas(conf.getTabletId());
+ if (replicas.isEmpty()) {
+ continue;
+ }
+ Random rand = new Random(System.currentTimeMillis());
+ int index = rand.nextInt(replicas.size());
+ conf.setCooldownReplicaId(replicas.get(index).getId());
+ // find TabletMeta to get cooldown term
+ Tablet tablet = getTablet(conf);
+ if (tablet == null || tablet.getCooldownConf().second !=
conf.cooldownTerm) {
+ // If tablet.cooldownTerm != conf.cooldownTerm, means cooldown
conf of this tablet has been updated,
+ // should skip this update.
+ continue;
+ }
+ ++conf.cooldownTerm;
+ updatedConf.add(conf);
+ tabletMap.put(conf.tabletId, tablet);
+ }
+
+ // write editlog
+ CooldownConfList list = new CooldownConfList(updatedConf);
+ Env.getCurrentEnv().getEditLog().logUpdateCooldownConf(list);
+
+ // update Tablet
+ for (CooldownConf conf : updatedConf) {
+ Tablet tablet = tabletMap.get(conf.tabletId);
+ tablet.setCooldownConf(conf.cooldownReplicaId, conf.cooldownTerm);
+ LOG.info("update cooldown conf. tabletId={} cooldownReplicaId={}
cooldownTerm={}", conf.tabletId,
+ conf.cooldownReplicaId, conf.cooldownTerm);
+ }
+
+ // update finish, remove from map
+ confToUpdate.forEach(conf ->
cooldownConfToUpdate.remove(conf.getTabletId()));
+
+ // TODO(plat1ko): push CooldownConf to BE?
+ }
+
+ private static Tablet getTablet(CooldownConf conf) {
+ try {
+ OlapTable table = (OlapTable)
Env.getCurrentInternalCatalog().getDbNullable(conf.dbId)
+ .getTable(conf.tableId)
+ .get();
+ table.readLock();
+ try {
+ return
table.getPartition(conf.partitionId).getIndex(conf.indexId).getTablet(conf.tabletId);
+ } finally {
+ table.readUnlock();
+ }
+ } catch (RuntimeException e) {
+ LOG.warn("failed to get tablet. tabletId={}", conf.tabletId);
+ return null;
+ }
+ }
+
+ public static void replayUpdateCooldownConf(CooldownConfList
cooldownConfList) {
+ cooldownConfList.getCooldownConf().forEach(conf -> {
+ Tablet tablet = getTablet(conf);
+ if (tablet != null) {
+ tablet.setCooldownConf(conf.cooldownReplicaId,
conf.cooldownTerm);
+ }
+ });
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConfList.java
b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConfList.java
new file mode 100644
index 0000000000..06bca7a42b
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConfList.java
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cooldown;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+public class CooldownConfList implements Writable {
+ @SerializedName(value = "cooldownConf")
+ private List<CooldownConf> cooldownConf;
+
+ CooldownConfList(List<CooldownConf> cooldownConf) {
+ this.cooldownConf = cooldownConf;
+ }
+
+ List<CooldownConf> getCooldownConf() {
+ return cooldownConf;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ String json = GsonUtils.GSON.toJson(this);
+ Text.writeString(out, json);
+ }
+
+ public static CooldownConfList read(DataInput in) throws IOException {
+ String json = Text.readString(in);
+ return GsonUtils.GSON.fromJson(json, CooldownConfList.class);
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownException.java
b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownException.java
deleted file mode 100644
index c0ec6bd5e7..0000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.cooldown;
-
-import org.apache.doris.common.DdlException;
-
-/*
- * This exception will be thrown when the cooldown job(v2) running failed.
- */
-public class CooldownException extends DdlException {
-
- private static final long serialVersionUID = 4844951783432954268L;
-
- public CooldownException(String msg) {
- super(msg);
- }
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownHandler.java
deleted file mode 100644
index 4ad601e5f4..0000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownHandler.java
+++ /dev/null
@@ -1,192 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.cooldown;
-
-import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.Replica;
-import org.apache.doris.catalog.TabletMeta;
-import org.apache.doris.common.Config;
-import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.FeMetaVersion;
-import org.apache.doris.common.ThreadPoolManager;
-import org.apache.doris.common.util.MasterDaemon;
-
-import com.google.common.collect.Maps;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ThreadPoolExecutor;
-
-public class CooldownHandler extends MasterDaemon {
- private static final Logger LOG =
LogManager.getLogger(CooldownHandler.class);
- private static final int MAX_ACTIVE_COOLDOWN_JOB_SIZE = 10;
-
- private static final int MAX_RUNABLE_COOLDOWN_JOB_SIZE = 100;
-
- private static final int MAX_TABLET_PER_JOB = 100;
-
- private static final long timeoutMs = 1000L *
Config.push_cooldown_conf_timeout_second;
-
- // jobId -> CooldownJob, it is used to hold CooldownJob which is used to
sent conf to be.
- private final Map<Long, CooldownJob> runableCooldownJobs =
Maps.newConcurrentMap();
- private final Map<Long, Boolean> resetingTablet = Maps.newConcurrentMap();
- // jobId -> CooldownJob,
- public final Map<Long, CooldownJob> activeCooldownJobs =
Maps.newConcurrentMap();
-
- public final ThreadPoolExecutor cooldownThreadPool =
ThreadPoolManager.newDaemonCacheThreadPool(
- MAX_ACTIVE_COOLDOWN_JOB_SIZE, "cooldown-pool", true);
-
- // syncCooldownTabletMap: tabletId -> TabletMeta
- public void handleCooldownConf(Map<Long, TabletMeta>
syncCooldownTabletMap) {
- List<CooldownConf> cooldownConfList = new LinkedList<>();
- for (Map.Entry<Long, TabletMeta> entry :
syncCooldownTabletMap.entrySet()) {
- if (runableCooldownJobs.size() >= MAX_RUNABLE_COOLDOWN_JOB_SIZE) {
- return;
- }
- Long tabletId = entry.getKey();
- TabletMeta tabletMeta = entry.getValue();
- if (resetingTablet.containsKey(tabletId)) {
- continue;
- }
- long cooldownReplicaId = -1;
- List<Replica> replicas =
Env.getCurrentInvertedIndex().getReplicas(tabletId);
- if (replicas.size() == 0) {
- continue;
- }
- for (Replica replica : replicas) {
- if (tabletMeta.getCooldownReplicaId() == replica.getId()) {
- cooldownReplicaId = tabletMeta.getCooldownReplicaId();
- break;
- }
- }
- long cooldownTerm = tabletMeta.getCooldownTerm();
- if (cooldownReplicaId == -1) {
- Random rand = new Random(System.currentTimeMillis());
- int index = rand.nextInt(replicas.size());
- cooldownReplicaId = replicas.get(index).getId();
- ++cooldownTerm;
- }
- CooldownConf cooldownConf = new CooldownConf(tabletMeta.getDbId(),
tabletMeta.getTableId(),
- tabletMeta.getPartitionId(), tabletMeta.getIndexId(),
tabletId, cooldownReplicaId, cooldownTerm);
- cooldownConfList.add(cooldownConf);
- if (cooldownConfList.size() >= MAX_TABLET_PER_JOB) {
- long jobId = Env.getCurrentEnv().getNextId();
- CooldownJob cooldownJob = new CooldownJob(jobId,
cooldownConfList, timeoutMs);
- runableCooldownJobs.put(jobId, cooldownJob);
- for (CooldownConf conf : cooldownConfList) {
- resetingTablet.put(conf.getTabletId(), true);
- }
- cooldownConfList = new LinkedList<>();
- }
- }
- if (cooldownConfList.size() > 0) {
- long jobId = Env.getCurrentEnv().getNextId();
- CooldownJob cooldownJob = new CooldownJob(jobId, cooldownConfList,
timeoutMs);
- runableCooldownJobs.put(jobId, cooldownJob);
- for (CooldownConf conf : cooldownConfList) {
- resetingTablet.put(conf.getTabletId(), true);
- }
- }
- }
-
- @Override
- protected void runAfterCatalogReady() {
- clearFinishedOrCancelledCooldownJob();
- runableCooldownJobs.values().forEach(cooldownJob -> {
- if (!cooldownJob.isDone() &&
!activeCooldownJobs.containsKey(cooldownJob.getJobId())
- && activeCooldownJobs.size() <
MAX_ACTIVE_COOLDOWN_JOB_SIZE) {
- if (FeConstants.runningUnitTest) {
- cooldownJob.run();
- } else {
- cooldownThreadPool.submit(() -> {
- if
(activeCooldownJobs.putIfAbsent(cooldownJob.getJobId(), cooldownJob) == null) {
- try {
- cooldownJob.run();
- } finally {
-
activeCooldownJobs.remove(cooldownJob.getJobId());
- }
- }
- });
- }
- }
- });
- }
-
- public void write(DataOutput out) throws IOException {
- if (Config.cooldown_single_remote_file) {
- out.writeInt(runableCooldownJobs.size());
- for (CooldownJob cooldownJob : runableCooldownJobs.values()) {
- cooldownJob.write(out);
- }
- }
- }
-
- public void readField(DataInput in) throws IOException {
- if (Config.cooldown_single_remote_file) {
- if (Env.getCurrentEnvJournalVersion() >=
FeMetaVersion.VERSION_115) {
- int size = in.readInt();
- for (int i = 0; i < size; i++) {
- CooldownJob cooldownJob = CooldownJob.read(in);
- replayCooldownJob(cooldownJob);
- }
- }
- }
- }
-
- public void replayCooldownJob(CooldownJob cooldownJob) {
- CooldownJob replayCooldownJob;
- if (!runableCooldownJobs.containsKey(cooldownJob.getJobId())) {
- replayCooldownJob = new CooldownJob(cooldownJob.jobId,
cooldownJob.getCooldownConfList(),
- cooldownJob.timeoutMs);
- runableCooldownJobs.put(cooldownJob.getJobId(), replayCooldownJob);
- for (CooldownConf cooldownConf :
cooldownJob.getCooldownConfList()) {
- resetingTablet.put(cooldownConf.getTabletId(), true);
- }
- } else {
- replayCooldownJob =
runableCooldownJobs.get(cooldownJob.getJobId());
- }
- replayCooldownJob.replay(cooldownJob);
- if (replayCooldownJob.isDone()) {
- runableCooldownJobs.remove(cooldownJob.getJobId());
- for (CooldownConf cooldownConf :
cooldownJob.getCooldownConfList()) {
- resetingTablet.remove(cooldownConf.getTabletId());
- }
- }
- }
-
- private void clearFinishedOrCancelledCooldownJob() {
- Iterator<Map.Entry<Long, CooldownJob>> iterator =
runableCooldownJobs.entrySet().iterator();
- while (iterator.hasNext()) {
- CooldownJob cooldownJob = iterator.next().getValue();
- if (cooldownJob.isDone()) {
- iterator.remove();
- for (CooldownConf cooldownConf :
cooldownJob.getCooldownConfList()) {
- resetingTablet.remove(cooldownConf.getTabletId());
- }
- }
- }
- }
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownJob.java
b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownJob.java
deleted file mode 100644
index 99643b874a..0000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownJob.java
+++ /dev/null
@@ -1,401 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.cooldown;
-
-import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.MaterializedIndex;
-import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.Partition;
-import org.apache.doris.catalog.Replica;
-import org.apache.doris.catalog.TableIf;
-import org.apache.doris.catalog.Tablet;
-import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.FeMetaVersion;
-import org.apache.doris.common.MetaNotFoundException;
-import org.apache.doris.common.io.Text;
-import org.apache.doris.common.io.Writable;
-import org.apache.doris.persist.gson.GsonUtils;
-import org.apache.doris.task.AgentBatchTask;
-import org.apache.doris.task.AgentTask;
-import org.apache.doris.task.AgentTaskExecutor;
-import org.apache.doris.task.AgentTaskQueue;
-import org.apache.doris.task.PushCooldownConfTask;
-import org.apache.doris.thrift.TTaskType;
-
-import com.google.common.base.Preconditions;
-import com.google.gson.annotations.SerializedName;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-public class CooldownJob implements Writable {
- private static final Logger LOG = LogManager.getLogger(CooldownJob.class);
-
- public enum JobState {
- PENDING, // Job is created
- SEND_CONF, // send cooldown task to BE.
- RUNNING, // cooldown tasks are sent to BE, and waiting for them
finished.
- FINISHED, // job is done
- CANCELLED; // job is cancelled(failed or be cancelled by user)
-
- public boolean isFinalState() {
- return this == CooldownJob.JobState.FINISHED || this ==
CooldownJob.JobState.CANCELLED;
- }
- }
-
- @SerializedName(value = "jobId")
- protected long jobId;
- @SerializedName(value = "jobState")
- protected CooldownJob.JobState jobState;
- @SerializedName(value = "cooldownConfList")
- protected List<CooldownConf> cooldownConfList;
-
- @SerializedName(value = "errMsg")
- protected String errMsg = "";
- @SerializedName(value = "createTimeMs")
- protected long createTimeMs = -1;
- @SerializedName(value = "finishedTimeMs")
- protected long finishedTimeMs = -1;
- @SerializedName(value = "timeoutMs")
- protected long timeoutMs = -1;
-
- public long getJobId() {
- return jobId;
- }
-
- public JobState getJobState() {
- return jobState;
- }
-
- public List<CooldownConf> getCooldownConfList() {
- return cooldownConfList;
- }
-
- public long getTimeoutMs() {
- return timeoutMs;
- }
-
- private AgentBatchTask cooldownBatchTask = new AgentBatchTask();
-
- public CooldownJob(long jobId, List<CooldownConf> cooldownConfList, long
timeoutMs) {
- this.jobId = jobId;
- this.jobState = JobState.PENDING;
- this.cooldownConfList = cooldownConfList;
- this.createTimeMs = System.currentTimeMillis();
- this.timeoutMs = timeoutMs;
- }
-
- protected void runPendingJob() throws CooldownException {
- Preconditions.checkState(jobState == CooldownJob.JobState.PENDING,
jobState);
- this.jobState = JobState.SEND_CONF;
- // write edit log
- for (CooldownConf cooldownConf : cooldownConfList) {
- setCooldownReplica(cooldownConf.getDbId(),
cooldownConf.getTableId(), cooldownConf.getPartitionId(),
- cooldownConf.getIndexId(), cooldownConf.getTabletId(),
cooldownConf.getCooldownReplicaId(),
- cooldownConf.getCooldownTerm());
-
Env.getCurrentInvertedIndex().getTabletMeta(cooldownConf.getTabletId()).setCooldownReplicaId(
- cooldownConf.getCooldownReplicaId());
-
Env.getCurrentInvertedIndex().getTabletMeta(cooldownConf.getTabletId()).setCooldownTerm(
- cooldownConf.getCooldownTerm());
- }
- Env.getCurrentEnv().getEditLog().logCooldownJob(this);
- LOG.info("send cooldown job {} state to {}", jobId, this.jobState);
- }
-
- protected void runSendJob() throws CooldownException {
- Preconditions.checkState(jobState == JobState.SEND_CONF, jobState);
- LOG.info("begin to send cooldown conf tasks. job: {}", jobId);
- if (!FeConstants.runningUnitTest) {
- Map<Long, List<CooldownConf>> cooldownMap = new HashMap<>();
- for (CooldownConf cooldownConf : cooldownConfList) {
- Database db = Env.getCurrentInternalCatalog()
- .getDbOrException(cooldownConf.getDbId(), s -> new
CooldownException(
- "Database " + s + " does not exist"));
- OlapTable tbl;
- try {
- tbl = (OlapTable)
db.getTableOrMetaException(cooldownConf.getTableId(), TableIf.TableType.OLAP);
- } catch (MetaNotFoundException e) {
- throw new CooldownException(e.getMessage());
- }
- if (tbl == null) {
- throw new CooldownException(String.format("No table: %d",
cooldownConf.getTableId()));
- }
- tbl.readLock();
- try {
- Partition partition =
tbl.getPartition(cooldownConf.getPartitionId());
- if (partition == null) {
- throw new CooldownException(String.format("No
partition: %d", cooldownConf.getPartitionId()));
- }
- MaterializedIndex index =
partition.getIndex(cooldownConf.getIndexId());
- if (index == null) {
- throw new CooldownException(String.format("No index:
%d", cooldownConf.getIndexId()));
- }
- Tablet tablet =
index.getTablet(cooldownConf.getTabletId());
- if (tablet == null) {
- throw new CooldownException(String.format("No tablet:
%d", cooldownConf.getTabletId()));
- }
- for (Replica replica : tablet.getReplicas()) {
- if (!cooldownMap.containsKey(replica.getBackendId())) {
- cooldownMap.put(replica.getBackendId(), new
LinkedList<>());
- }
-
cooldownMap.get(replica.getBackendId()).add(cooldownConf);
- }
- } finally {
- tbl.readUnlock();
- }
- }
- for (Map.Entry<Long, List<CooldownConf>> entry :
cooldownMap.entrySet()) {
- PushCooldownConfTask pushCooldownConfTask = new
PushCooldownConfTask(entry.getKey(), entry.getValue());
- cooldownBatchTask.addTask(pushCooldownConfTask);
- }
- AgentTaskQueue.addBatchTask(cooldownBatchTask);
- AgentTaskExecutor.submit(cooldownBatchTask);
- }
-
- this.jobState = JobState.RUNNING;
- // write edit log
- Env.getCurrentEnv().getEditLog().logCooldownJob(this);
- LOG.info("send cooldown job {} state to {}", jobId, this.jobState);
- }
-
- protected void runRunningJob() throws CooldownException {
- if (!cooldownBatchTask.isFinished()) {
- LOG.info("cooldown tasks not finished. job: {}", jobId);
- List<AgentTask> tasks = cooldownBatchTask.getUnfinishedTasks(2000);
- for (AgentTask task : tasks) {
- if (task.getFailedTimes() >= 3) {
- task.setFinished(true);
- AgentTaskQueue.removeTask(task.getBackendId(),
TTaskType.PUSH_COOLDOWN_CONF, task.getSignature());
- LOG.warn("push cooldown conf task failed after try three
times: " + task.getErrorMsg());
- throw new CooldownException("cooldown tasks failed on
backend: " + task.getBackendId());
- }
- }
- return;
- }
- this.jobState = CooldownJob.JobState.FINISHED;
- this.finishedTimeMs = System.currentTimeMillis();
-
- Env.getCurrentEnv().getEditLog().logCooldownJob(this);
- LOG.info("push cooldown conf job finished: {}", jobId);
- }
-
- public boolean isTimeout() {
- return System.currentTimeMillis() - createTimeMs > timeoutMs;
- }
-
- public boolean isDone() {
- return jobState.isFinalState();
- }
-
- /*
- * cancelImpl() can be called any time any place.
- * We need to clean any possible residual of this job.
- */
- protected synchronized void cancelImpl(String errMsg) {
- if (jobState.isFinalState()) {
- return;
- }
-
- cancelInternal();
-
- this.errMsg = errMsg;
- this.finishedTimeMs = System.currentTimeMillis();
- LOG.info("cancel cooldown job {}, err: {}", jobId, errMsg);
- Env.getCurrentEnv().getEditLog().logCooldownJob(this);
- }
-
- /**
- * The keyword 'synchronized' only protects 2 methods:
- * run() and cancel()
- * Only these 2 methods can be visited by different thread(internal
working thread and user connection thread)
- * So using 'synchronized' to make sure only one thread can run the job at
one time.
- *
- * lock order:
- * synchronized
- * db lock
- */
- public synchronized void run() {
- if (isTimeout()) {
- cancelImpl("Timeout");
- return;
- }
-
- try {
- switch (jobState) {
- case PENDING:
- runPendingJob();
- break;
- case SEND_CONF:
- runSendJob();
- break;
- case RUNNING:
- runRunningJob();
- break;
- default:
- break;
- }
- } catch (CooldownException e) {
- cancelImpl(e.getMessage());
- }
- }
-
- public void replay(CooldownJob replayedJob) {
- try {
- switch (replayedJob.jobState) {
- case PENDING:
- replayCreateJob(replayedJob);
- break;
- case SEND_CONF:
- replayPendingJob();
- break;
- case FINISHED:
- replayRunningJob(replayedJob);
- break;
- case CANCELLED:
- replayCancelled(replayedJob);
- break;
- default:
- break;
- }
- } catch (CooldownException e) {
- LOG.warn("[INCONSISTENT META] replay cooldown job failed {}",
replayedJob.jobId, e);
- }
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- String json = GsonUtils.GSON.toJson(this);
- Text.writeString(out, json);
- }
-
- public static CooldownJob read(DataInput in) throws IOException {
- if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_115) {
- String json = Text.readString(in);
- return GsonUtils.GSON.fromJson(json, CooldownJob.class);
- }
- return null;
- }
-
- /**
- * Replay job in PENDING state.
- * Should replay all changes before this job's state transfer to PENDING.
- * These changes should be same as changes in CooldownHandler.createJob()
- */
- private void replayCreateJob(CooldownJob replayedJob) {
- jobId = replayedJob.jobId;
- for (CooldownConf replayedConf : replayedJob.cooldownConfList) {
- CooldownConf cooldownConf = new
CooldownConf(replayedConf.getDbId(), replayedConf.getTableId(),
- replayedConf.getPartitionId(), replayedConf.getIndexId(),
replayedConf.getTabletId(),
- replayedConf.getCooldownReplicaId(),
replayedConf.getCooldownTerm());
- cooldownConfList.add(cooldownConf);
- }
- createTimeMs = replayedJob.createTimeMs;
- timeoutMs = replayedJob.timeoutMs;
- jobState = JobState.PENDING;
- LOG.info("replay create cooldown job: {}, conf size: {}", jobId,
cooldownConfList.size());
- }
-
- /**
- * Replay job in PENDING state. set cooldown type in Replica
- */
- private void replayPendingJob() throws CooldownException {
- for (CooldownConf cooldownConf : cooldownConfList) {
- setCooldownReplica(cooldownConf.getDbId(),
cooldownConf.getTableId(), cooldownConf.getPartitionId(),
- cooldownConf.getIndexId(), cooldownConf.getTabletId(),
cooldownConf.getCooldownReplicaId(),
- cooldownConf.getCooldownTerm());
- if
(Env.getCurrentInvertedIndex().getTabletMeta(cooldownConf.getTabletId()) !=
null) {
-
Env.getCurrentInvertedIndex().getTabletMeta(cooldownConf.getTabletId()).setCooldownReplicaId(
- cooldownConf.getCooldownReplicaId());
-
Env.getCurrentInvertedIndex().getTabletMeta(cooldownConf.getTabletId()).setCooldownTerm(
- cooldownConf.getCooldownTerm());
- }
- }
- jobState = JobState.SEND_CONF;
- LOG.info("replay send cooldown conf, job: {}", jobId);
- }
-
- /**
- * Replay job in FINISHED state.
- * Should replay all changes in runRunningJob()
- */
- private void replayRunningJob(CooldownJob replayedJob) throws
CooldownException {
- jobState = CooldownJob.JobState.FINISHED;
- this.finishedTimeMs = replayedJob.finishedTimeMs;
- LOG.info("replay finished cooldown job: {}", jobId);
- }
-
- private void setCooldownReplica(long dbId, long tableId, long partitionId,
long indexId, long tabletId,
- long cooldownReplicaId, long cooldownTerm)
throws CooldownException {
- Database db = Env.getCurrentInternalCatalog()
- .getDbOrException(dbId, s -> new CooldownException("Database "
+ s + " does not exist"));
- OlapTable tbl;
- try {
- tbl = (OlapTable) db.getTableOrMetaException(tableId,
TableIf.TableType.OLAP);
- } catch (MetaNotFoundException e) {
- throw new CooldownException(e.getMessage());
- }
- if (tbl != null) {
- tbl.writeLock();
- try {
- Partition partition = tbl.getPartition(partitionId);
- if (partition != null) {
- MaterializedIndex index = partition.getIndex(indexId);
- if (index != null) {
- Tablet tablet = index.getTablet(tabletId);
- if (tablet != null) {
- tablet.setCooldownReplicaId(cooldownReplicaId);
- tablet.setCooldownTerm(cooldownTerm);
- LOG.info("setCooldownReplicaId to {} when cancel
job: {}:{}", cooldownReplicaId,
- tablet.getId(), jobId);
- return;
- }
- }
- }
- throw new CooldownException("set cooldown type failed.");
- } finally {
- tbl.writeUnlock();
- }
- }
- }
-
- private void cancelInternal() {
- // clear tasks if has
- AgentTaskQueue.removeBatchTask(cooldownBatchTask,
TTaskType.PUSH_COOLDOWN_CONF);
- jobState = CooldownJob.JobState.CANCELLED;
- }
-
- /**
- * Replay job in CANCELLED state.
- */
- private void replayCancelled(CooldownJob replayedJob) {
- cancelInternal();
- this.jobState = CooldownJob.JobState.CANCELLED;
- this.finishedTimeMs = replayedJob.finishedTimeMs;
- this.errMsg = replayedJob.errMsg;
- LOG.info("replay cancelled cooldown job: {}", jobId);
- }
-
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 6a9392ffee..10562b9a37 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -361,7 +361,7 @@ public class InternalCatalog implements CatalogIf<Database>
{
int schemaHash =
olapTable.getSchemaHashByIndexId(indexId);
for (Tablet tablet : index.getTablets()) {
TabletMeta tabletMeta = new TabletMeta(dbId,
tableId, partitionId, indexId, schemaHash,
- medium, tablet.getCooldownReplicaId(),
tablet.getCooldownTerm());
+ medium);
long tabletId = tablet.getId();
invertedIndex.addTablet(tabletId, tabletMeta);
for (Replica replica : tablet.getReplicas()) {
@@ -1290,7 +1290,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
int schemaHash =
olapTable.getSchemaHashByIndexId(indexId);
for (Tablet tablet : mIndex.getTablets()) {
TabletMeta tabletMeta = new TabletMeta(dbId,
tableId, partitionId, indexId, schemaHash,
- medium, tablet.getCooldownReplicaId(),
tablet.getCooldownTerm());
+ medium);
long tabletId = tablet.getId();
invertedIndex.addTablet(tabletId, tabletMeta);
for (Replica replica : tablet.getReplicas()) {
@@ -1557,8 +1557,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
for (Tablet tablet : index.getTablets()) {
TabletMeta tabletMeta = new TabletMeta(info.getDbId(),
info.getTableId(), partition.getId(),
- index.getId(), schemaHash,
info.getDataProperty().getStorageMedium(),
- tablet.getCooldownReplicaId(),
tablet.getCooldownTerm());
+ index.getId(), schemaHash,
info.getDataProperty().getStorageMedium());
long tabletId = tablet.getId();
invertedIndex.addTablet(tabletId, tabletMeta);
for (Replica replica : tablet.getReplicas()) {
@@ -1709,8 +1708,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
// create tablets
int schemaHash = indexMeta.getSchemaHash();
- TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId,
indexId, schemaHash, storageMedium, -1,
- -1);
+ TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId,
indexId, schemaHash, storageMedium);
createTablets(clusterName, index, ReplicaState.NORMAL,
distributionInfo, version, replicaAlloc, tabletMeta,
tabletIdSet, idGeneratorBuffer);
@@ -2710,7 +2708,7 @@ public class InternalCatalog implements
CatalogIf<Database> {
int schemaHash =
olapTable.getSchemaHashByIndexId(indexId);
for (Tablet tablet : mIndex.getTablets()) {
TabletMeta tabletMeta = new TabletMeta(db.getId(),
olapTable.getId(), partitionId, indexId,
- schemaHash, medium,
tablet.getCooldownReplicaId(), tablet.getCooldownTerm());
+ schemaHash, medium);
long tabletId = tablet.getId();
invertedIndex.addTablet(tabletId, tabletMeta);
for (Replica replica : tablet.getReplicas()) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index 0c0e49f617..2eb0b8dbe3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -36,7 +36,7 @@ import org.apache.doris.cluster.Cluster;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.SmallFileMgr.SmallFile;
-import org.apache.doris.cooldown.CooldownJob;
+import org.apache.doris.cooldown.CooldownConfList;
import org.apache.doris.datasource.CatalogLog;
import org.apache.doris.datasource.ExternalObjectLog;
import org.apache.doris.datasource.InitCatalogLog;
@@ -580,8 +580,8 @@ public class JournalEntity implements Writable {
isRead = true;
break;
}
- case OperationType.OP_PUSH_COOLDOWN_CONF: {
- data = CooldownJob.read(in);
+ case OperationType.OP_UPDATE_COOLDOWN_CONF: {
+ data = CooldownConfList.read(in);
isRead = true;
break;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index e14f87d375..d45d787ed0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -41,6 +41,7 @@ import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.Daemon;
import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.cooldown.CooldownConf;
import org.apache.doris.metric.GaugeMetric;
import org.apache.doris.metric.Metric.MetricUnit;
import org.apache.doris.metric.MetricRepo;
@@ -62,6 +63,7 @@ import org.apache.doris.task.CreateReplicaTask;
import org.apache.doris.task.DropReplicaTask;
import org.apache.doris.task.MasterTask;
import org.apache.doris.task.PublishVersionTask;
+import org.apache.doris.task.PushCooldownConfTask;
import org.apache.doris.task.PushStoragePolicyTask;
import org.apache.doris.task.StorageMediaMigrationTask;
import org.apache.doris.task.UpdateTabletMetaInfoTask;
@@ -94,6 +96,7 @@ import org.apache.thrift.TException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -263,6 +266,17 @@ public class ReportHandler extends Daemon {
}
}
+ private static void handlePushCooldownConf(long backendId,
List<CooldownConf> cooldownConfToPush) {
+ final int PUSH_BATCH_SIZE = 1024;
+ AgentBatchTask batchTask = new AgentBatchTask();
+ for (int start = 0; start < cooldownConfToPush.size(); start +=
PUSH_BATCH_SIZE) {
+ PushCooldownConfTask task = new PushCooldownConfTask(backendId,
+ cooldownConfToPush.subList(start, Math.min(start +
PUSH_BATCH_SIZE, cooldownConfToPush.size())));
+ batchTask.addTask(task);
+ }
+ AgentTaskExecutor.submit(batchTask);
+ }
+
private static void handlePushStoragePolicy(long backendId, List<Policy>
policyToPush,
List<Resource> resourceToPush,
List<Long> policyToDrop) {
AgentBatchTask batchTask = new AgentBatchTask();
@@ -396,8 +410,6 @@ public class ReportHandler extends Daemon {
Set<Long> tabletFoundInMeta = Sets.newConcurrentHashSet();
// storage medium -> tablet id
ListMultimap<TStorageMedium, Long> tabletMigrationMap =
LinkedListMultimap.create();
- // the cooldown type of replicas which need to be sync. tabletId ->
TabletMeta
- Map<Long, TabletMeta> syncCooldownTabletMap = new HashMap<>();
// dbid -> txn id -> [partition info]
Map<Long, ListMultimap<Long, TPartitionVersionInfo>>
transactionsToPublish = Maps.newHashMap();
@@ -408,6 +420,9 @@ public class ReportHandler extends Daemon {
List<Triple<Long, Integer, Boolean>> tabletToInMemory =
Lists.newArrayList();
+ List<CooldownConf> cooldownConfToPush = new LinkedList<>();
+ List<CooldownConf> cooldownConfToUpdate = new LinkedList<>();
+
// 1. do the diff. find out (intersection) / (be - meta) / (meta - be)
Env.getCurrentInvertedIndex().tabletReport(backendId, backendTablets,
storageMediumMap,
tabletSyncMap,
@@ -418,7 +433,8 @@ public class ReportHandler extends Daemon {
transactionsToClear,
tabletRecoveryMap,
tabletToInMemory,
- syncCooldownTabletMap);
+ cooldownConfToPush,
+ cooldownConfToUpdate);
// 2. sync
if (!tabletSyncMap.isEmpty()) {
@@ -461,9 +477,12 @@ public class ReportHandler extends Daemon {
handleSetTabletInMemory(backendId, tabletToInMemory);
}
- // 10. send cooldownType which need sync to CooldownHandler
- if (!syncCooldownTabletMap.isEmpty()) {
-
Env.getCurrentEnv().getCooldownHandler().handleCooldownConf(syncCooldownTabletMap);
+ // handle cooldown conf
+ if (!cooldownConfToPush.isEmpty()) {
+ handlePushCooldownConf(backendId, cooldownConfToPush);
+ }
+ if (!cooldownConfToUpdate.isEmpty()) {
+
Env.getCurrentEnv().getCooldownConfHandler().addCooldownConfToUpdate(cooldownConfToUpdate);
}
final SystemInfoService currentSystemInfo = Env.getCurrentSystemInfo();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index f6a71b5377..7a1f9a967e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -41,7 +41,8 @@ import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.SmallFileMgr.SmallFile;
-import org.apache.doris.cooldown.CooldownJob;
+import org.apache.doris.cooldown.CooldownConfHandler;
+import org.apache.doris.cooldown.CooldownConfList;
import org.apache.doris.datasource.CatalogLog;
import org.apache.doris.datasource.ExternalObjectLog;
import org.apache.doris.datasource.InitCatalogLog;
@@ -720,11 +721,9 @@ public class EditLog {
}
break;
}
- case OperationType.OP_PUSH_COOLDOWN_CONF:
- if (Config.cooldown_single_remote_file) {
- CooldownJob cooldownJob = (CooldownJob)
journal.getData();
-
env.getCooldownHandler().replayCooldownJob(cooldownJob);
- }
+ case OperationType.OP_UPDATE_COOLDOWN_CONF:
+ CooldownConfList cooldownConfList = (CooldownConfList)
journal.getData();
+
CooldownConfHandler.replayUpdateCooldownConf(cooldownConfList);
break;
case OperationType.OP_BATCH_ADD_ROLLUP: {
BatchAlterJobPersistInfo batchAlterJobV2 =
(BatchAlterJobPersistInfo) journal.getData();
@@ -1517,8 +1516,8 @@ public class EditLog {
logEdit(OperationType.OP_ALTER_JOB_V2, alterJob);
}
- public void logCooldownJob(CooldownJob cooldownJob) {
- logEdit(OperationType.OP_PUSH_COOLDOWN_CONF, cooldownJob);
+ public void logUpdateCooldownConf(CooldownConfList cooldownConf) {
+ logEdit(OperationType.OP_UPDATE_COOLDOWN_CONF, cooldownConf);
}
public void logBatchAlterJob(BatchAlterJobPersistInfo batchAlterJobV2) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index 73ead272e4..256331444f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -76,8 +76,6 @@ public class OperationType {
//schema change for add and drop columns
public static final short OP_MODIFY_TABLE_ADD_OR_DROP_COLUMNS = 128;
- // set cooldown conf in replica
- public static final short OP_PUSH_COOLDOWN_CONF = 129;
// 30~39 130~139 230~239 ...
// load job for only hadoop load
@@ -266,6 +264,8 @@ public class OperationType {
public static final short OP_REFRESH_EXTERNAL_PARTITIONS = 356;
public static final short OP_ALTER_USER = 400;
+ // cooldown conf
+ public static final short OP_UPDATE_COOLDOWN_CONF = 401;
/**
* Get opcode name by op code.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java
index 0198a6b20f..c01b868f02 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java
@@ -203,12 +203,6 @@ public class MetaPersistMethod {
metaPersistMethod.writeMethod =
Env.class.getDeclaredMethod("saveMTMVJobManager",
CountingDataOutputStream.class, long.class);
break;
- case "cooldownJob":
- metaPersistMethod.readMethod =
Env.class.getDeclaredMethod("loadCooldownJob", DataInputStream.class,
- long.class);
- metaPersistMethod.writeMethod =
Env.class.getDeclaredMethod("saveCooldownJob",
- CountingDataOutputStream.class, long.class);
- break;
default:
break;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java
index 2074ffe539..2737c052f3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java
@@ -38,7 +38,7 @@ public class PersistMetaModules {
"masterInfo", "frontends", "backends", "datasource", "db",
"alterJob", "recycleBin",
"globalVariable", "cluster", "broker", "resources", "exportJob",
"syncJob", "backupHandler",
"paloAuth", "transactionState", "colocateTableIndex",
"routineLoadJobs", "loadJobV2", "smallFiles",
- "plugins", "deleteHandler", "sqlBlockRule", "policy",
"mtmvJobManager", "cooldownJob");
+ "plugins", "deleteHandler", "sqlBlockRule", "policy",
"mtmvJobManager");
static {
MODULES_MAP = Maps.newHashMap();
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/backup/CatalogMocker.java
b/fe/fe-core/src/test/java/org/apache/doris/backup/CatalogMocker.java
index 42e788b537..1c8a0130cc 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/backup/CatalogMocker.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/backup/CatalogMocker.java
@@ -249,7 +249,7 @@ public class CatalogMocker {
Tablet tablet0 = new Tablet(TEST_TABLET0_ID);
TabletMeta tabletMeta = new TabletMeta(TEST_DB_ID, TEST_TBL_ID,
TEST_SINGLE_PARTITION_ID,
- TEST_TBL_ID, SCHEMA_HASH,
TStorageMedium.HDD, -1, -1);
+ TEST_TBL_ID, SCHEMA_HASH,
TStorageMedium.HDD);
baseIndex.addTablet(tablet0, tabletMeta);
Replica replica0 = new Replica(TEST_REPLICA0_ID, BACKEND1_ID, 0,
ReplicaState.NORMAL);
Replica replica1 = new Replica(TEST_REPLICA1_ID, BACKEND2_ID, 0,
ReplicaState.NORMAL);
@@ -323,7 +323,7 @@ public class CatalogMocker {
Tablet baseTabletP1 = new Tablet(TEST_BASE_TABLET_P1_ID);
TabletMeta tabletMetaBaseTabletP1 = new TabletMeta(TEST_DB_ID,
TEST_TBL2_ID, TEST_PARTITION1_ID,
- TEST_TBL2_ID,
SCHEMA_HASH, TStorageMedium.HDD, -1, -1);
+ TEST_TBL2_ID,
SCHEMA_HASH, TStorageMedium.HDD);
baseIndexP1.addTablet(baseTabletP1, tabletMetaBaseTabletP1);
Replica replica3 = new Replica(TEST_REPLICA3_ID, BACKEND1_ID, 0,
ReplicaState.NORMAL);
Replica replica4 = new Replica(TEST_REPLICA4_ID, BACKEND2_ID, 0,
ReplicaState.NORMAL);
@@ -335,7 +335,7 @@ public class CatalogMocker {
Tablet baseTabletP2 = new Tablet(TEST_BASE_TABLET_P2_ID);
TabletMeta tabletMetaBaseTabletP2 = new TabletMeta(TEST_DB_ID,
TEST_TBL2_ID, TEST_PARTITION2_ID,
- TEST_TBL2_ID,
SCHEMA_HASH, TStorageMedium.HDD, -1, -1);
+ TEST_TBL2_ID,
SCHEMA_HASH, TStorageMedium.HDD);
baseIndexP2.addTablet(baseTabletP2, tabletMetaBaseTabletP2);
Replica replica6 = new Replica(TEST_REPLICA6_ID, BACKEND1_ID, 0,
ReplicaState.NORMAL);
Replica replica7 = new Replica(TEST_REPLICA7_ID, BACKEND2_ID, 0,
ReplicaState.NORMAL);
@@ -356,7 +356,7 @@ public class CatalogMocker {
Tablet rollupTabletP1 = new Tablet(TEST_ROLLUP_TABLET_P1_ID);
TabletMeta tabletMetaRollupTabletP1 = new TabletMeta(TEST_DB_ID,
TEST_TBL2_ID, TEST_PARTITION1_ID,
TEST_ROLLUP_TABLET_P1_ID, ROLLUP_SCHEMA_HASH,
-
TStorageMedium.HDD, -1, -1);
+
TStorageMedium.HDD);
rollupIndexP1.addTablet(rollupTabletP1, tabletMetaRollupTabletP1);
Replica replica9 = new Replica(TEST_REPLICA9_ID, BACKEND1_ID, 0,
ReplicaState.NORMAL);
Replica replica10 = new Replica(TEST_REPLICA10_ID, BACKEND2_ID, 0,
ReplicaState.NORMAL);
@@ -373,7 +373,7 @@ public class CatalogMocker {
Tablet rollupTabletP2 = new Tablet(TEST_ROLLUP_TABLET_P2_ID);
TabletMeta tabletMetaRollupTabletP2 = new TabletMeta(TEST_DB_ID,
TEST_TBL2_ID, TEST_PARTITION1_ID,
TEST_ROLLUP_TABLET_P2_ID, ROLLUP_SCHEMA_HASH,
-
TStorageMedium.HDD, -1, -1);
+
TStorageMedium.HDD);
rollupIndexP2.addTablet(rollupTabletP2, tabletMetaRollupTabletP2);
Replica replica12 = new Replica(TEST_REPLICA12_ID, BACKEND1_ID, 0,
ReplicaState.NORMAL);
Replica replica13 = new Replica(TEST_REPLICA13_ID, BACKEND2_ID, 0,
ReplicaState.NORMAL);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
index d79855ce67..5e022a56e2 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
@@ -193,7 +193,7 @@ public class CatalogTestUtil {
// index
MaterializedIndex index = new MaterializedIndex(indexId,
IndexState.NORMAL);
- TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId,
indexId, 0, TStorageMedium.HDD, -1, -1);
+ TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId,
indexId, 0, TStorageMedium.HDD);
index.addTablet(tablet, tabletMeta);
tablet.addReplica(replica1);
@@ -261,7 +261,7 @@ public class CatalogTestUtil {
// index
MaterializedIndex index = new MaterializedIndex(testIndexId2,
IndexState.NORMAL);
TabletMeta tabletMeta = new TabletMeta(testDbId1, testTableId2,
testPartitionId2, testIndexId2, 0,
- TStorageMedium.HDD, -1, -1);
+ TStorageMedium.HDD);
index.addTablet(tablet, tabletMeta);
tablet.addReplica(replica);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java
b/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java
index 14e20a1bd7..d7fdb2694a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java
@@ -66,7 +66,7 @@ public class TabletTest {
};
tablet = new Tablet(1);
- TabletMeta tabletMeta = new TabletMeta(10, 20, 30, 40, 1,
TStorageMedium.HDD, -1, -1);
+ TabletMeta tabletMeta = new TabletMeta(10, 20, 30, 40, 1,
TStorageMedium.HDD);
invertedIndex.addTablet(1, tabletMeta);
replica1 = new Replica(1L, 1L, 100L, 0, 200000L, 0, 3000L,
ReplicaState.NORMAL, 0, 0);
replica2 = new Replica(2L, 2L, 100L, 0, 200000L, 0, 3000L,
ReplicaState.NORMAL, 0, 0);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java
b/fe/fe-core/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java
index 1ac66444ef..c81c3839c4 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java
@@ -145,16 +145,16 @@ public class ClusterLoadStatisticsTest {
// tablet
invertedIndex = new TabletInvertedIndex();
- invertedIndex.addTablet(50000, new TabletMeta(1, 2, 3, 4, 5,
TStorageMedium.HDD, -1, -1));
+ invertedIndex.addTablet(50000, new TabletMeta(1, 2, 3, 4, 5,
TStorageMedium.HDD));
invertedIndex.addReplica(50000, new Replica(50001, be1.getId(), 0,
ReplicaState.NORMAL));
invertedIndex.addReplica(50000, new Replica(50002, be2.getId(), 0,
ReplicaState.NORMAL));
invertedIndex.addReplica(50000, new Replica(50003, be3.getId(), 0,
ReplicaState.NORMAL));
- invertedIndex.addTablet(60000, new TabletMeta(1, 2, 3, 4, 5,
TStorageMedium.HDD, -1, -1));
+ invertedIndex.addTablet(60000, new TabletMeta(1, 2, 3, 4, 5,
TStorageMedium.HDD));
invertedIndex.addReplica(60000, new Replica(60002, be2.getId(), 0,
ReplicaState.NORMAL));
invertedIndex.addReplica(60000, new Replica(60003, be3.getId(), 0,
ReplicaState.NORMAL));
- invertedIndex.addTablet(70000, new TabletMeta(1, 2, 3, 4, 5,
TStorageMedium.HDD, -1, -1));
+ invertedIndex.addTablet(70000, new TabletMeta(1, 2, 3, 4, 5,
TStorageMedium.HDD));
invertedIndex.addReplica(70000, new Replica(70002, be2.getId(), 0,
ReplicaState.NORMAL));
invertedIndex.addReplica(70000, new Replica(70003, be3.getId(), 0,
ReplicaState.NORMAL));
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
index 756510de77..88f02df4cb 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
@@ -81,7 +81,7 @@ public class RebalancerTestUtil {
int schemaHash = olapTable.getSchemaHashByIndexId(baseIndex.getId());
TabletMeta tabletMeta = new TabletMeta(db.getId(), olapTable.getId(),
- partition.getId(), baseIndex.getId(), schemaHash, medium, -1,
-1);
+ partition.getId(), baseIndex.getId(), schemaHash, medium);
Tablet tablet = new Tablet(tabletId);
// add tablet to olapTable
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/common/util/UnitTestUtil.java
b/fe/fe-core/src/test/java/org/apache/doris/common/util/UnitTestUtil.java
index d933b80982..4460fcb31f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/common/util/UnitTestUtil.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/UnitTestUtil.java
@@ -78,7 +78,7 @@ public class UnitTestUtil {
// index
MaterializedIndex index = new MaterializedIndex(indexId,
IndexState.NORMAL);
- TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId,
indexId, 0, TStorageMedium.HDD, -1, -1);
+ TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId,
indexId, 0, TStorageMedium.HDD);
index.addTablet(tablet, tabletMeta);
tablet.addReplica(replica1);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/cooldown/CooldownJobTest.java
b/fe/fe-core/src/test/java/org/apache/doris/cooldown/CooldownJobTest.java
deleted file mode 100644
index 6e16e9e945..0000000000
--- a/fe/fe-core/src/test/java/org/apache/doris/cooldown/CooldownJobTest.java
+++ /dev/null
@@ -1,130 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.cooldown;
-
-import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.KeysType;
-import org.apache.doris.catalog.MaterializedIndex;
-import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.Partition;
-import org.apache.doris.catalog.PartitionInfo;
-import org.apache.doris.catalog.Replica;
-import org.apache.doris.catalog.Tablet;
-import org.apache.doris.catalog.TabletMeta;
-import org.apache.doris.cluster.Cluster;
-import org.apache.doris.persist.EditLog;
-import org.apache.doris.thrift.TStorageMedium;
-
-import mockit.Mocked;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-public class CooldownJobTest {
-
- private static long jobId = 100L;
- private static long dbId = 101L;
- private static long tableId = 102L;
- private static long partitionId = 103L;
- private static long indexId = 104L;
- private static long tabletId = 105L;
- private static long replicaId = 106L;
- private static long backendId = 107L;
- private static long cooldownReplicaId = 106L;
- private static long cooldownTerm = 109L;
- private static long timeoutMs = 10000L;
- private static Tablet tablet = new Tablet(tabletId);
- private static Replica replica = new Replica(replicaId, backendId, 1,
Replica.ReplicaState.NORMAL);
-
- private static CooldownConf cooldownConf = new CooldownConf(dbId, tableId,
partitionId, indexId, tabletId,
- cooldownReplicaId, cooldownTerm);
-
- private static List<CooldownConf> cooldownConfList = new LinkedList<>();
-
- @Mocked
- private EditLog editLog;
-
- public static CooldownJob createCooldownJob() {
- tablet.setCooldownReplicaId(cooldownReplicaId);
- tablet.setCooldownTerm(cooldownTerm);
- cooldownConfList.add(cooldownConf);
- return new CooldownJob(jobId, cooldownConfList, timeoutMs);
- }
-
- @Before
- public void setUp() {
- Cluster testCluster = new Cluster("test_cluster", 0);
- Database db = new Database(dbId, "db1");
- db.setClusterName("test_cluster");
- Env.getCurrentEnv().addCluster(testCluster);
- Env.getCurrentEnv().unprotectCreateDb(db);
- OlapTable table = new OlapTable(tableId, "testTable", new
ArrayList<>(), KeysType.DUP_KEYS,
- new PartitionInfo(), null);
- table.setId(tableId);
- db.createTable(table);
- MaterializedIndex baseIndex = new MaterializedIndex();
- baseIndex.setIdForRestore(indexId);
- Partition partition = new Partition(partitionId, "part1", baseIndex,
null);
- table.addPartition(partition);
- TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId,
indexId, 1, TStorageMedium.HDD,
- cooldownReplicaId, cooldownTerm);
- baseIndex.addTablet(tablet, tabletMeta);
- tablet.addReplica(replica);
- Env.getCurrentEnv().setEditLog(editLog);
- }
-
- @Test
- public void testPending() throws Exception {
- CooldownJob cooldownJob = createCooldownJob();
- cooldownJob.runPendingJob();
- Assert.assertEquals(CooldownJob.JobState.SEND_CONF,
cooldownJob.jobState);
- for (CooldownConf conf : cooldownJob.getCooldownConfList()) {
- Assert.assertEquals(conf.getCooldownReplicaId(), replica.getId());
- }
- CooldownJob job1 = createCooldownJob();
- job1.replay(cooldownJob);
- Assert.assertEquals(CooldownJob.JobState.SEND_CONF, job1.jobState);
- // run send job
- cooldownJob.runSendJob();
- Assert.assertEquals(CooldownJob.JobState.RUNNING,
cooldownJob.jobState);
- // run replay finish job
- cooldownJob.jobState = CooldownJob.JobState.FINISHED;
- job1.replay(cooldownJob);
- Assert.assertEquals(CooldownJob.JobState.FINISHED, job1.jobState);
- }
-
- @Test
- public void testCancelJob() throws Exception {
- CooldownJob cooldownJob = createCooldownJob();
- cooldownJob.runPendingJob();
- Assert.assertEquals(CooldownJob.JobState.SEND_CONF,
cooldownJob.jobState);
- Assert.assertEquals(cooldownReplicaId, replica.getId());
- // run send job
- cooldownJob.runSendJob();
- Assert.assertEquals(CooldownJob.JobState.RUNNING,
cooldownJob.jobState);
- // run cancel job
- cooldownJob.cancelImpl("test cancel");
- Assert.assertEquals(CooldownJob.JobState.CANCELLED,
cooldownJob.jobState);
- }
-
-}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java
b/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java
index df8ed5a316..2871ff3596 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java
@@ -157,7 +157,7 @@ public abstract class DorisHttpTestCase {
// index
MaterializedIndex baseIndex = new MaterializedIndex(testIndexId,
MaterializedIndex.IndexState.NORMAL);
TabletMeta tabletMeta = new TabletMeta(testDbId, testTableId,
testPartitionId, testIndexId, testSchemaHash,
- TStorageMedium.HDD, -1, -1);
+ TStorageMedium.HDD);
baseIndex.addTablet(tablet, tabletMeta);
tablet.addReplica(replica1);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
index e464517d7f..5c2c005972 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
@@ -116,7 +116,7 @@ public class DeleteHandlerTest {
auth = AccessTestUtil.fetchAdminAccess();
analyzer = AccessTestUtil.fetchAdminAnalyzer(false);
db = CatalogMocker.mockDb();
- TabletMeta tabletMeta = new TabletMeta(DB_ID, TBL_ID, PARTITION_ID,
TBL_ID, 0, null, -1, -1);
+ TabletMeta tabletMeta = new TabletMeta(DB_ID, TBL_ID, PARTITION_ID,
TBL_ID, 0, null);
invertedIndex.addTablet(TABLET_ID, tabletMeta);
invertedIndex.addReplica(TABLET_ID, new Replica(REPLICA_ID_1,
BACKEND_ID_1, 0, Replica.ReplicaState.NORMAL));
invertedIndex.addReplica(TABLET_ID, new Replica(REPLICA_ID_2,
BACKEND_ID_2, 0, Replica.ReplicaState.NORMAL));
diff --git a/gensrc/thrift/MasterService.thrift
b/gensrc/thrift/MasterService.thrift
index 70112f4a5c..fe98c7ef4e 100644
--- a/gensrc/thrift/MasterService.thrift
+++ b/gensrc/thrift/MasterService.thrift
@@ -43,8 +43,8 @@ struct TTabletInfo {
// data size on remote storage
16: optional Types.TSize remote_data_size
17: optional Types.TReplicaId cooldown_replica_id
- 18: optional bool is_cooldown = false
- 19: optional i64 cooldown_term = -1
+ // 18: optional bool is_cooldown
+ 19: optional i64 cooldown_term
}
struct TFinishTaskRequest {
diff --git a/regression-test/suites/cold_heat_separation/policy/alter.groovy
b/regression-test/suites/cold_heat_separation/policy/alter.groovy
index e56c235bb6..781a619ad9 100644
--- a/regression-test/suites/cold_heat_separation/policy/alter.groovy
+++ b/regression-test/suites/cold_heat_separation/policy/alter.groovy
@@ -16,8 +16,6 @@
// under the License.
suite("alter_policy") {
- sql """ADMIN SET FRONTEND CONFIG ("enable_storage_policy" = "true");"""
-
def has_resouce_policy_alter = sql """
SHOW RESOURCES WHERE NAME = "has_resouce_policy_alter";
"""
diff --git a/regression-test/suites/cold_heat_separation/policy/create.groovy
b/regression-test/suites/cold_heat_separation/policy/create.groovy
index 19f259065e..111dc00f27 100644
--- a/regression-test/suites/cold_heat_separation/policy/create.groovy
+++ b/regression-test/suites/cold_heat_separation/policy/create.groovy
@@ -16,8 +16,6 @@
// under the License.
suite("create_policy") {
- sql """ADMIN SET FRONTEND CONFIG ("enable_storage_policy" = "true");"""
-
def has_created_1 = sql """
SHOW RESOURCES WHERE NAME = "crete_policy_1";
"""
diff --git a/regression-test/suites/cold_heat_separation/policy/drop.groovy
b/regression-test/suites/cold_heat_separation/policy/drop.groovy
index c04b1afdcb..f29d27c530 100644
--- a/regression-test/suites/cold_heat_separation/policy/drop.groovy
+++ b/regression-test/suites/cold_heat_separation/policy/drop.groovy
@@ -16,8 +16,6 @@
// under the License.
suite("drop_policy") {
- sql """ADMIN SET FRONTEND CONFIG ("enable_storage_policy" = "true");"""
-
def storage_exist = { name ->
def show_storage_policy = sql """
SHOW STORAGE POLICY;
diff --git a/regression-test/suites/cold_heat_separation/policy/show.groovy
b/regression-test/suites/cold_heat_separation/policy/show.groovy
index c18f3cab8b..5d750e0f3b 100644
--- a/regression-test/suites/cold_heat_separation/policy/show.groovy
+++ b/regression-test/suites/cold_heat_separation/policy/show.groovy
@@ -16,8 +16,6 @@
// under the License.
suite("show_policy") {
- sql """ADMIN SET FRONTEND CONFIG ("enable_storage_policy" = "true");"""
-
def get_storage_policy = { name ->
def show_storage_policy = sql """
SHOW STORAGE POLICY;
diff --git
a/regression-test/suites/cold_heat_separation/use_policy/alter_table_add_policy.groovy
b/regression-test/suites/cold_heat_separation/use_policy/alter_table_add_policy.groovy
index 0c02dc2888..b793465f46 100644
---
a/regression-test/suites/cold_heat_separation/use_policy/alter_table_add_policy.groovy
+++
b/regression-test/suites/cold_heat_separation/use_policy/alter_table_add_policy.groovy
@@ -16,8 +16,6 @@
// under the License.
suite("add_table_policy_by_alter_table") {
- sql """ADMIN SET FRONTEND CONFIG ("enable_storage_policy" = "true");"""
-
def create_table_not_have_policy_result = try_sql """
CREATE TABLE IF NOT EXISTS create_table_not_have_policy
(
diff --git
a/regression-test/suites/cold_heat_separation/use_policy/create_table_use_partition_policy.groovy
b/regression-test/suites/cold_heat_separation/use_policy/create_table_use_partition_policy.groovy
index 8f1fcba080..37adbbd2ef 100644
---
a/regression-test/suites/cold_heat_separation/use_policy/create_table_use_partition_policy.groovy
+++
b/regression-test/suites/cold_heat_separation/use_policy/create_table_use_partition_policy.groovy
@@ -16,8 +16,6 @@
// under the License.
suite("create_table_use_partition_policy") {
- sql """ADMIN SET FRONTEND CONFIG ("enable_storage_policy" = "true");"""
-
def cooldown_ttl = "10"
def create_table_partition_use_not_create_policy = try_sql """
diff --git
a/regression-test/suites/cold_heat_separation/use_policy/create_table_use_policy.groovy
b/regression-test/suites/cold_heat_separation/use_policy/create_table_use_policy.groovy
index 021ffd33e1..c94065f5ae 100644
---
a/regression-test/suites/cold_heat_separation/use_policy/create_table_use_policy.groovy
+++
b/regression-test/suites/cold_heat_separation/use_policy/create_table_use_policy.groovy
@@ -16,8 +16,6 @@
// under the License.
suite("create_table_use_policy") {
- sql """ADMIN SET FRONTEND CONFIG ("enable_storage_policy" = "true");"""
-
def cooldown_ttl = "10"
def create_table_use_not_create_policy = try_sql """
diff --git
a/regression-test/suites/cold_heat_separation/use_policy/modify_partition_add_policy.groovy
b/regression-test/suites/cold_heat_separation/use_policy/modify_partition_add_policy.groovy
index 0998cc4b1d..cb9c807729 100644
---
a/regression-test/suites/cold_heat_separation/use_policy/modify_partition_add_policy.groovy
+++
b/regression-test/suites/cold_heat_separation/use_policy/modify_partition_add_policy.groovy
@@ -19,8 +19,6 @@ import java.text.SimpleDateFormat;
import java.util.Date;
suite("add_table_policy_by_modify_partition") {
- sql """ADMIN SET FRONTEND CONFIG ("enable_storage_policy" = "true");"""
-
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
Date date = new Date(System.currentTimeMillis() + 3600000)
def cooldownTime = format.format(date)
diff --git
a/regression-test/suites/cold_heat_separation/use_policy/use_default_storage_policy.groovy
b/regression-test/suites/cold_heat_separation/use_policy/use_default_storage_policy.groovy
index 45bd4a77d3..b11344198e 100644
---
a/regression-test/suites/cold_heat_separation/use_policy/use_default_storage_policy.groovy
+++
b/regression-test/suites/cold_heat_separation/use_policy/use_default_storage_policy.groovy
@@ -16,8 +16,6 @@
// under the License.
suite("use_default_storage_policy") {
- sql """ADMIN SET FRONTEND CONFIG ("enable_storage_policy" = "true");"""
-
def storage_exist = { name ->
def show_storage_policy = sql """
SHOW STORAGE POLICY;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]