This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e1f1386395 [fix](cooldown) Rewrite update cooldown conf (#16488)
e1f1386395 is described below

commit e1f13863950237d96e8488dd42fa288cc6df94b7
Author: plat1ko <[email protected]>
AuthorDate: Thu Feb 9 09:12:55 2023 +0800

    [fix](cooldown) Rewrite update cooldown conf (#16488)
    
    Remove error-prone CooldownJob, and use CooldownConfHandler to update 
Tablet's cooldown conf.
    Some bug fix about cooldown.
---
 be/src/agent/task_worker_pool.cpp                  |  11 +-
 be/src/olap/olap_server.cpp                        |   2 +-
 be/src/olap/tablet.cpp                             |  16 +-
 be/src/olap/tablet.h                               |   7 +-
 .../main/java/org/apache/doris/common/Config.java  |  14 +-
 .../doris/alter/MaterializedViewHandler.java       |   2 +-
 .../java/org/apache/doris/alter/RollupJobV2.java   |   2 +-
 .../apache/doris/alter/SchemaChangeHandler.java    |   2 +-
 .../org/apache/doris/alter/SchemaChangeJobV2.java  |   2 +-
 .../java/org/apache/doris/backup/RestoreJob.java   |   9 +-
 .../apache/doris/catalog/CatalogRecycleBin.java    |   6 +-
 .../main/java/org/apache/doris/catalog/Env.java    |  30 +-
 .../main/java/org/apache/doris/catalog/Tablet.java |  30 +-
 .../apache/doris/catalog/TabletInvertedIndex.java  | 103 ++++--
 .../java/org/apache/doris/catalog/TabletMeta.java  |  60 +--
 .../org/apache/doris/cooldown/CooldownConf.java    |  76 +---
 .../apache/doris/cooldown/CooldownConfHandler.java | 138 +++++++
 .../apache/doris/cooldown/CooldownConfList.java    |  53 +++
 .../apache/doris/cooldown/CooldownException.java   |  32 --
 .../org/apache/doris/cooldown/CooldownHandler.java | 192 ----------
 .../org/apache/doris/cooldown/CooldownJob.java     | 401 ---------------------
 .../apache/doris/datasource/InternalCatalog.java   |  12 +-
 .../org/apache/doris/journal/JournalEntity.java    |   6 +-
 .../org/apache/doris/master/ReportHandler.java     |  31 +-
 .../java/org/apache/doris/persist/EditLog.java     |  15 +-
 .../org/apache/doris/persist/OperationType.java    |   4 +-
 .../doris/persist/meta/MetaPersistMethod.java      |   6 -
 .../doris/persist/meta/PersistMetaModules.java     |   2 +-
 .../org/apache/doris/backup/CatalogMocker.java     |  10 +-
 .../org/apache/doris/catalog/CatalogTestUtil.java  |   4 +-
 .../java/org/apache/doris/catalog/TabletTest.java  |   2 +-
 .../doris/clone/ClusterLoadStatisticsTest.java     |   6 +-
 .../org/apache/doris/clone/RebalancerTestUtil.java |   2 +-
 .../org/apache/doris/common/util/UnitTestUtil.java |   2 +-
 .../org/apache/doris/cooldown/CooldownJobTest.java | 130 -------
 .../org/apache/doris/http/DorisHttpTestCase.java   |   2 +-
 .../org/apache/doris/load/DeleteHandlerTest.java   |   2 +-
 gensrc/thrift/MasterService.thrift                 |   4 +-
 .../cold_heat_separation/policy/alter.groovy       |   2 -
 .../cold_heat_separation/policy/create.groovy      |   2 -
 .../suites/cold_heat_separation/policy/drop.groovy |   2 -
 .../suites/cold_heat_separation/policy/show.groovy |   2 -
 .../use_policy/alter_table_add_policy.groovy       |   2 -
 .../create_table_use_partition_policy.groovy       |   2 -
 .../use_policy/create_table_use_policy.groovy      |   2 -
 .../use_policy/modify_partition_add_policy.groovy  |   2 -
 .../use_policy/use_default_storage_policy.groovy   |   2 -
 47 files changed, 401 insertions(+), 1045 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp 
b/be/src/agent/task_worker_pool.cpp
index a61ea17ba9..2c6b757c0f 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -268,7 +268,8 @@ void TaskWorkerPool::notify_thread() {
 }
 
 bool TaskWorkerPool::_register_task_info(const TTaskType::type task_type, 
int64_t signature) {
-    if (task_type == TTaskType::type::PUSH_STORAGE_POLICY) {
+    if (task_type == TTaskType::type::PUSH_STORAGE_POLICY ||
+        task_type == TTaskType::type::PUSH_COOLDOWN_CONF) {
         // no need to report task of these types
         return true;
     }
@@ -1716,14 +1717,6 @@ void 
TaskWorkerPool::_push_cooldown_conf_worker_thread_callback() {
             agent_task_req = _tasks.front();
             _tasks.pop_front();
         }
-        // FIXME(plat1ko): no need to save cooldown conf job state in FE
-        TFinishTaskRequest finish_task_request;
-        finish_task_request.__set_backend(_backend);
-        finish_task_request.__set_task_type(agent_task_req.task_type);
-        finish_task_request.__set_signature(agent_task_req.signature);
-        finish_task_request.__set_task_status(Status::OK().to_thrift());
-        _finish_task(finish_task_request);
-        _remove_task_info(agent_task_req.task_type, agent_task_req.signature);
 
         TPushCooldownConfReq& push_cooldown_conf_req = 
agent_task_req.push_cooldown_conf;
         for (auto& cooldown_conf : push_cooldown_conf_req.cooldown_confs) {
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 98dca4ea1c..9764680ff9 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -729,7 +729,7 @@ void StorageEngine::_cooldown_tasks_producer_callback() {
                 Status st = tablet->cooldown();
                 if (!st.ok()) {
                     LOG(WARNING) << "failed to cooldown, tablet: " << 
tablet->tablet_id()
-                                 << " err: " << st.to_string();
+                                 << " err: " << st;
                 } else {
                     LOG(INFO) << "succeed to cooldown, tablet: " << 
tablet->tablet_id()
                               << " cooldown progress ("
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 009513748f..92b203b339 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -1392,9 +1392,9 @@ void Tablet::build_tablet_report_info(TTabletInfo* 
tablet_info,
     
tablet_info->__set_is_in_memory(_tablet_meta->tablet_schema()->is_in_memory());
     tablet_info->__set_replica_id(replica_id());
     tablet_info->__set_remote_data_size(_tablet_meta->tablet_remote_size());
-    tablet_info->__set_is_cooldown(_tablet_meta->storage_policy_id() > 0);
-    if (tablet_info->is_cooldown) {
+    if (tablet_state() == TABLET_RUNNING && _tablet_meta->storage_policy_id() 
> 0) {
         tablet_info->__set_cooldown_replica_id(_cooldown_replica_id);
+        tablet_info->__set_cooldown_term(_cooldown_term);
     }
 }
 
@@ -1645,7 +1645,7 @@ Status Tablet::cooldown() {
     }
     int64_t cooldown_replica_id = _cooldown_replica_id;
     if (cooldown_replica_id <= 0) { // wait for FE to push cooldown conf
-        return Status::OK();
+        return Status::InternalError("invalid cooldown_replica_id");
     }
     auto storage_policy = get_storage_policy(storage_policy_id());
     if (storage_policy == nullptr) {
@@ -1768,6 +1768,8 @@ Status Tablet::_write_cooldown_meta(io::RemoteFileSystem* 
fs, RowsetMeta* new_rs
 }
 
 Status Tablet::_follow_cooldowned_data(io::RemoteFileSystem* fs, int64_t 
cooldown_replica_id) {
+    LOG(INFO) << "try to follow cooldowned data. tablet_id=" << tablet_id()
+              << " cooldown_replica_id=" << cooldown_replica_id;
     TabletMetaPB cooldown_meta_pb;
     RETURN_IF_ERROR(_read_cooldown_meta(fs, cooldown_replica_id, 
&cooldown_meta_pb));
     DCHECK(cooldown_meta_pb.rs_metas_size() > 0);
@@ -1788,12 +1790,13 @@ Status 
Tablet::_follow_cooldowned_data(io::RemoteFileSystem* fs, int64_t cooldow
         }
     }
     if (!version_aligned) {
-        LOG(INFO) << "cooldowned version is not aligned";
-        return Status::OK();
+        return Status::InternalError("cooldowned version is not aligned");
     }
     for (auto& [v, rs] : _rs_version_map) {
         if (v.second <= cooldowned_version) {
             overlap_rowsets.push_back(rs);
+        } else if (!rs->is_local()) {
+            return Status::InternalError("cooldowned version larger than that 
to follow");
         }
     }
     std::sort(overlap_rowsets.begin(), overlap_rowsets.end(), 
Rowset::comparator);
@@ -1854,6 +1857,9 @@ RowsetSharedPtr Tablet::pick_cooldown_rowset() {
             }
         }
     }
+    if (!rowset) {
+        return nullptr;
+    }
     if (min_local_version != cooldowned_version + 1) { // ensure version 
continuity
         if (UNLIKELY(cooldowned_version != -1)) {
             LOG(WARNING) << "version not continuous. tablet_id=" << tablet_id()
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 934dd04298..c56808e8b5 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -305,9 +305,10 @@ public:
 
     void update_cooldown_conf(int64_t cooldown_term, int64_t 
cooldown_replica_id) {
         if (cooldown_term > _cooldown_term) {
-            LOG(INFO) << "update cooldown conf. cooldown_replica_id: " << 
_cooldown_replica_id
-                      << " -> " << cooldown_replica_id << ", cooldown_term: " 
<< _cooldown_term
-                      << " -> " << cooldown_term;
+            LOG(INFO) << "update cooldown conf. tablet_id=" << tablet_id()
+                      << " cooldown_replica_id: " << _cooldown_replica_id << " 
-> "
+                      << cooldown_replica_id << ", cooldown_term: " << 
_cooldown_term << " -> "
+                      << cooldown_term;
             _cooldown_replica_id = cooldown_replica_id;
             _cooldown_term = cooldown_term;
         }
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 6edbc5a160..ef325d357c 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -802,13 +802,6 @@ public class Config extends ConfigBase {
      */
     @ConfField(mutable = true, masterOnly = true)
     public static int alter_table_timeout_second = 86400 * 30; // 1month
-    /**
-     * Maximal timeout of push cooldown conf request.
-     */
-    @ConfField(mutable = false, masterOnly = true)
-    public static boolean cooldown_single_remote_file = false;
-    @ConfField(mutable = false, masterOnly = true)
-    public static int push_cooldown_conf_timeout_second = 600; // 10 min
     /**
      * If a backend is down for *max_backend_down_time_second*, a BACKEND_DOWN 
event will be triggered.
      * Do not set this if you know what you are doing.
@@ -1932,11 +1925,10 @@ public class Config extends ConfigBase {
     public static int max_same_name_catalog_trash_num = 3;
 
     /**
-     * The storage policy is still under developement.
-     * Disable it by default.
+     * NOTE: The storage policy is still under developement.
      */
-    @ConfField(mutable = true, masterOnly = true)
-    public static boolean enable_storage_policy = false;
+    @ConfField(mutable = false, masterOnly = true)
+    public static boolean enable_storage_policy = true;
 
     /**
      * This config is mainly used in the k8s cluster environment.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
index ef760bd0a6..851e4c7016 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java
@@ -379,7 +379,7 @@ public class MaterializedViewHandler extends AlterHandler {
             short replicationNum = 
olapTable.getPartitionInfo().getReplicaAllocation(partitionId).getTotalReplicaNum();
             for (Tablet baseTablet : baseIndex.getTablets()) {
                 TabletMeta mvTabletMeta = new TabletMeta(
-                        dbId, tableId, partitionId, mvIndexId, mvSchemaHash, 
medium, -1, 0);
+                        dbId, tableId, partitionId, mvIndexId, mvSchemaHash, 
medium);
                 long baseTabletId = baseTablet.getId();
                 long mvTabletId = idGeneratorBuffer.getNextId();
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
index d83c9a6f70..b47052c601 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -658,7 +658,7 @@ public class RollupJobV2 extends AlterJobV2 implements 
GsonPostProcessable {
 
             for (Tablet rollupTablet : rollupIndex.getTablets()) {
                 TabletMeta rollupTabletMeta = new TabletMeta(dbId, tableId, 
partitionId, rollupIndexId,
-                        rollupSchemaHash, medium, -1, 0);
+                        rollupSchemaHash, medium);
                 invertedIndex.addTablet(rollupTablet.getId(), 
rollupTabletMeta);
                 for (Replica rollupReplica : rollupTablet.getReplicas()) {
                     invertedIndex.addReplica(rollupTablet.getId(), 
rollupReplica);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
index a330923582..29ff80ec79 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java
@@ -1493,7 +1493,7 @@ public class SchemaChangeHandler extends AlterHandler {
                 Short totalReplicaNum = replicaAlloc.getTotalReplicaNum();
                 for (Tablet originTablet : originIndex.getTablets()) {
                     TabletMeta shadowTabletMeta = new TabletMeta(dbId, 
tableId, partitionId, shadowIndexId,
-                            newSchemaHash, medium, -1, 0);
+                            newSchemaHash, medium);
                     long originTabletId = originTablet.getId();
                     long shadowTabletId = idGeneratorBuffer.getNextId();
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 3731f7cb88..757898d0ca 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -746,7 +746,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
 
                 for (Tablet shadownTablet : shadowIndex.getTablets()) {
                     TabletMeta shadowTabletMeta = new TabletMeta(dbId, 
tableId, partitionId, shadowIndexId,
-                            
indexSchemaVersionAndHashMap.get(shadowIndexId).schemaHash, medium, -1, 0);
+                            
indexSchemaVersionAndHashMap.get(shadowIndexId).schemaHash, medium);
                     invertedIndex.addTablet(shadownTablet.getId(), 
shadowTabletMeta);
                     for (Replica shadowReplica : shadownTablet.getReplicas()) {
                         invertedIndex.addReplica(shadownTablet.getId(), 
shadowReplica);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index c01aa2b685..8b894fc920 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -988,8 +988,7 @@ public class RestoreJob extends AbstractJob {
             MaterializedIndexMeta indexMeta = 
localTbl.getIndexMetaByIndexId(restoredIdx.getId());
             for (Tablet restoreTablet : restoredIdx.getTablets()) {
                 TabletMeta tabletMeta = new TabletMeta(db.getId(), 
localTbl.getId(), restorePart.getId(),
-                        restoredIdx.getId(), indexMeta.getSchemaHash(), 
TStorageMedium.HDD,
-                        restoreTablet.getCooldownReplicaId(), 
restoreTablet.getCooldownTerm());
+                        restoredIdx.getId(), indexMeta.getSchemaHash(), 
TStorageMedium.HDD);
                 Env.getCurrentInvertedIndex().addTablet(restoreTablet.getId(), 
tabletMeta);
                 for (Replica restoreReplica : restoreTablet.getReplicas()) {
                     
Env.getCurrentInvertedIndex().addReplica(restoreTablet.getId(), restoreReplica);
@@ -1177,8 +1176,7 @@ public class RestoreJob extends AbstractJob {
                 int schemaHash = 
localTbl.getSchemaHashByIndexId(restoreIdx.getId());
                 for (Tablet restoreTablet : restoreIdx.getTablets()) {
                     TabletMeta tabletMeta = new TabletMeta(db.getId(), 
localTbl.getId(), restorePart.getId(),
-                            restoreIdx.getId(), schemaHash, 
TStorageMedium.HDD, restoreTablet.getCooldownReplicaId(),
-                            restoreTablet.getCooldownTerm());
+                            restoreIdx.getId(), schemaHash, 
TStorageMedium.HDD);
                     
Env.getCurrentInvertedIndex().addTablet(restoreTablet.getId(), tabletMeta);
                     for (Replica restoreReplica : restoreTablet.getReplicas()) 
{
                         
Env.getCurrentInvertedIndex().addReplica(restoreTablet.getId(), restoreReplica);
@@ -1210,8 +1208,7 @@ public class RestoreJob extends AbstractJob {
                         int schemaHash = 
olapRestoreTbl.getSchemaHashByIndexId(restoreIdx.getId());
                         for (Tablet restoreTablet : restoreIdx.getTablets()) {
                             TabletMeta tabletMeta = new TabletMeta(db.getId(), 
restoreTbl.getId(), restorePart.getId(),
-                                    restoreIdx.getId(), schemaHash, 
TStorageMedium.HDD,
-                                    restoreTablet.getCooldownReplicaId(), 
restoreTablet.getCooldownTerm());
+                                    restoreIdx.getId(), schemaHash, 
TStorageMedium.HDD);
                             
Env.getCurrentInvertedIndex().addTablet(restoreTablet.getId(), tabletMeta);
                             for (Replica restoreReplica : 
restoreTablet.getReplicas()) {
                                 
Env.getCurrentInvertedIndex().addReplica(restoreTablet.getId(), restoreReplica);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java
index ec2588ae2e..dadb0f722e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/CatalogRecycleBin.java
@@ -840,8 +840,7 @@ public class CatalogRecycleBin extends MasterDaemon 
implements Writable {
                     long indexId = index.getId();
                     int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
                     for (Tablet tablet : index.getTablets()) {
-                        TabletMeta tabletMeta = new TabletMeta(dbId, tableId, 
partitionId, indexId, schemaHash, medium,
-                                tablet.getCooldownReplicaId(), 
tablet.getCooldownTerm());
+                        TabletMeta tabletMeta = new TabletMeta(dbId, tableId, 
partitionId, indexId, schemaHash, medium);
                         long tabletId = tablet.getId();
                         invertedIndex.addTablet(tabletId, tabletMeta);
                         for (Replica replica : tablet.getReplicas()) {
@@ -893,8 +892,7 @@ public class CatalogRecycleBin extends MasterDaemon 
implements Writable {
                 long indexId = index.getId();
                 int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
                 for (Tablet tablet : index.getTablets()) {
-                    TabletMeta tabletMeta = new TabletMeta(dbId, tableId, 
partitionId, indexId, schemaHash, medium,
-                            tablet.getCooldownReplicaId(), 
tablet.getCooldownTerm());
+                    TabletMeta tabletMeta = new TabletMeta(dbId, tableId, 
partitionId, indexId, schemaHash, medium);
                     long tabletId = tablet.getId();
                     invertedIndex.addTablet(tabletId, tabletMeta);
                     for (Replica replica : tablet.getReplicas()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 34f0dafdd2..15d38f61c6 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -122,7 +122,7 @@ import org.apache.doris.common.util.SmallFileMgr;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.consistency.ConsistencyChecker;
-import org.apache.doris.cooldown.CooldownHandler;
+import org.apache.doris.cooldown.CooldownConfHandler;
 import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.datasource.CatalogMgr;
 import org.apache.doris.datasource.EsExternalCatalog;
@@ -314,7 +314,7 @@ public class Env {
     private DeleteHandler deleteHandler;
     private DbUsedDataQuotaInfoCollector dbUsedDataQuotaInfoCollector;
     private PartitionInMemoryInfoCollector partitionInMemoryInfoCollector;
-    private CooldownHandler cooldownHandler;
+    private CooldownConfHandler cooldownConfHandler;
     private MetastoreEventsProcessor metastoreEventsProcessor;
 
     private MasterDaemon labelCleaner; // To clean old LabelInfo, 
ExportJobInfos
@@ -551,7 +551,9 @@ public class Env {
         this.deleteHandler = new DeleteHandler();
         this.dbUsedDataQuotaInfoCollector = new DbUsedDataQuotaInfoCollector();
         this.partitionInMemoryInfoCollector = new 
PartitionInMemoryInfoCollector();
-        this.cooldownHandler = new CooldownHandler();
+        if (Config.enable_storage_policy) {
+            this.cooldownConfHandler = new CooldownConfHandler();
+        }
         this.metastoreEventsProcessor = new MetastoreEventsProcessor();
 
         this.replayedJournalId = new AtomicLong(0L);
@@ -1398,8 +1400,8 @@ public class Env {
         dbUsedDataQuotaInfoCollector.start();
         // start daemon thread to update global partition in memory 
information periodically
         partitionInMemoryInfoCollector.start();
-        if (Config.cooldown_single_remote_file) {
-            cooldownHandler.start();
+        if (Config.enable_storage_policy) {
+            cooldownConfHandler.start();
         }
         streamLoadRecordMgr.start();
         getInternalCatalog().getIcebergTableCreationRecordMgr().start();
@@ -1738,12 +1740,6 @@ public class Env {
         return checksum;
     }
 
-    public long loadCooldownJob(DataInputStream dis, long checksum) throws 
IOException {
-        cooldownHandler.readField(dis);
-        LOG.info("finished replay loadCooldownJob from image");
-        return checksum;
-    }
-
     public long loadAlterJob(DataInputStream dis, long checksum) throws 
IOException {
         long newChecksum = checksum;
         for (JobType type : JobType.values()) {
@@ -2155,14 +2151,6 @@ public class Env {
         return checksum;
     }
 
-    /**
-     * Save CooldownJob.
-     */
-    public long saveCooldownJob(CountingDataOutputStream out, long checksum) 
throws IOException {
-        Env.getCurrentEnv().getCooldownHandler().write(out);
-        return checksum;
-    }
-
     public long saveMTMVJobManager(CountingDataOutputStream out, long 
checksum) throws IOException {
         if (Config.enable_mtmv_scheduler_framework) {
             Env.getCurrentEnv().getMTMVJobManager().write(out, checksum);
@@ -3387,8 +3375,8 @@ public class Env {
         return (MaterializedViewHandler) 
this.alter.getMaterializedViewHandler();
     }
 
-    public CooldownHandler getCooldownHandler() {
-        return cooldownHandler;
+    public CooldownConfHandler getCooldownConfHandler() {
+        return cooldownConfHandler;
     }
 
     public SystemHandler getClusterHandler() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
index 7f594afa7d..69964693c3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Tablet.java
@@ -49,6 +49,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
 
@@ -90,10 +91,13 @@ public class Tablet extends MetaObject implements Writable {
     private long checkedVersionHash;
     @SerializedName(value = "isConsistent")
     private boolean isConsistent;
+
+    // cooldown conf
     @SerializedName(value = "cooldownReplicaId")
-    private long cooldownReplicaId;
+    private long cooldownReplicaId = -1;
     @SerializedName(value = "cooldownTerm")
-    private long cooldownTerm;
+    private long cooldownTerm = -1;
+    private ReentrantReadWriteLock cooldownConfLock = new 
ReentrantReadWriteLock();
 
     // last time that the tablet checker checks this tablet.
     // no need to persist
@@ -143,20 +147,20 @@ public class Tablet extends MetaObject implements 
Writable {
         return isConsistent;
     }
 
-    public long getCooldownReplicaId() {
-        return cooldownReplicaId;
-    }
-
-    public void setCooldownReplicaId(long cooldownReplicaId) {
+    public void setCooldownConf(long cooldownReplicaId, long cooldownTerm) {
+        cooldownConfLock.writeLock().lock();
         this.cooldownReplicaId = cooldownReplicaId;
+        this.cooldownTerm = cooldownTerm;
+        cooldownConfLock.writeLock().unlock();
     }
 
-    public long getCooldownTerm() {
-        return cooldownTerm;
-    }
-
-    public void setCooldownTerm(long cooldownTerm) {
-        this.cooldownTerm = cooldownTerm;
+    public Pair<Long, Long> getCooldownConf() {
+        cooldownConfLock.readLock().lock();
+        try {
+            return Pair.of(cooldownReplicaId, cooldownTerm);
+        } finally {
+            cooldownConfLock.readLock().unlock();
+        }
     }
 
     private boolean deleteRedundantReplica(long backendId, long version) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
index 98fcf380f6..0ed99185ef 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
@@ -19,6 +19,8 @@ package org.apache.doris.catalog;
 
 import org.apache.doris.catalog.Replica.ReplicaState;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.Pair;
+import org.apache.doris.cooldown.CooldownConf;
 import org.apache.doris.thrift.TPartitionVersionInfo;
 import org.apache.doris.thrift.TStorageMedium;
 import org.apache.doris.thrift.TTablet;
@@ -46,7 +48,6 @@ import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -66,7 +67,7 @@ public class TabletInvertedIndex {
     public static final int NOT_EXIST_VALUE = -1;
 
     public static final TabletMeta NOT_EXIST_TABLET_META = new 
TabletMeta(NOT_EXIST_VALUE, NOT_EXIST_VALUE,
-            NOT_EXIST_VALUE, NOT_EXIST_VALUE, NOT_EXIST_VALUE, 
TStorageMedium.HDD, -1, -1);
+            NOT_EXIST_VALUE, NOT_EXIST_VALUE, NOT_EXIST_VALUE, 
TStorageMedium.HDD);
 
     private StampedLock lock = new StampedLock();
 
@@ -126,7 +127,8 @@ public class TabletInvertedIndex {
                              ListMultimap<Long, Long> transactionsToClear,
                              ListMultimap<Long, Long> tabletRecoveryMap,
                              List<Triple<Long, Integer, Boolean>> 
tabletToInMemory,
-                             Map<Long, TabletMeta> syncCooldownTabletMap) {
+                             List<CooldownConf> cooldownConfToPush,
+                             List<CooldownConf> cooldownConfToUpdate) {
         long stamp = readLock();
         long start = System.currentTimeMillis();
         try {
@@ -188,9 +190,9 @@ public class TabletInvertedIndex {
                                 }
                             }
 
-                            if (Config.cooldown_single_remote_file
-                                    && needChangeCooldownConf(tabletMeta, 
backendTabletInfo)) {
-                                
syncCooldownTabletMap.put(backendTabletInfo.getTabletId(), tabletMeta);
+                            if (Config.enable_storage_policy) {
+                                handleCooldownConf(tabletMeta, 
backendTabletInfo, cooldownConfToPush,
+                                        cooldownConfToUpdate);
                             }
 
                             long partitionId = tabletMeta.getPartitionId();
@@ -335,48 +337,81 @@ public class TabletInvertedIndex {
         return false;
     }
 
-    private boolean needChangeCooldownConf(TabletMeta tabletMeta, TTabletInfo 
beTabletInfo) {
-        if (!beTabletInfo.isIsCooldown()) {
-            return false;
-        }
-        // check cooldown type in fe and be, they need to be the same.
-        if (tabletMeta.getCooldownReplicaId() != 
beTabletInfo.getCooldownReplicaId()) {
-            LOG.warn("cooldownReplicaId is wrong for tablet: {}, Fe: {}, Be: 
{}", beTabletInfo.getTabletId(),
-                    tabletMeta.getCooldownReplicaId(), 
beTabletInfo.getCooldownReplicaId());
-            return true;
+    private void handleCooldownConf(TabletMeta tabletMeta, TTabletInfo 
beTabletInfo,
+            List<CooldownConf> cooldownConfToPush, List<CooldownConf> 
cooldownConfToUpdate) {
+        if (!beTabletInfo.isSetCooldownReplicaId()) {
+            return;
         }
-        // check cooldown type in one tablet, One UPLOAD_DATA is needed in the 
replicas.
-        long stamp = readLock();
+        Tablet tablet;
         try {
-            boolean replicaInvalid = true;
-            Map<Long, Replica> replicaMap = 
replicaMetaTable.row(beTabletInfo.getTabletId());
-            for (Map.Entry<Long, Replica> entry : replicaMap.entrySet()) {
-                if (entry.getValue().getId() == 
beTabletInfo.getCooldownReplicaId()) {
-                    replicaInvalid = false;
-                    break;
-                }
+            OlapTable table = (OlapTable) 
Env.getCurrentInternalCatalog().getDbNullable(tabletMeta.getDbId())
+                    .getTable(tabletMeta.getTableId())
+                    .get();
+            table.readLock();
+            try {
+                tablet = 
table.getPartition(tabletMeta.getPartitionId()).getIndex(tabletMeta.getIndexId())
+                        .getTablet(beTabletInfo.tablet_id);
+            } finally {
+                table.readUnlock();
             }
-            if (replicaInvalid) {
-                return true;
+        } catch (RuntimeException e) {
+            LOG.warn("failed to get tablet. tabletId={}", 
beTabletInfo.tablet_id);
+            return;
+        }
+        Pair<Long, Long> cooldownConf = tablet.getCooldownConf();
+        if (beTabletInfo.getCooldownTerm() > cooldownConf.second) { // should 
not be here
+            LOG.warn("report cooldownTerm({}) > cooldownTerm in 
TabletMeta({}), tabletId={}",
+                    beTabletInfo.getCooldownTerm(), cooldownConf.second, 
beTabletInfo.tablet_id);
+            return;
+        }
+
+        if (cooldownConf.first <= 0) { // invalid cooldownReplicaId
+            CooldownConf conf = new CooldownConf(tabletMeta.getDbId(), 
tabletMeta.getTableId(),
+                    tabletMeta.getPartitionId(), tabletMeta.getIndexId(), 
beTabletInfo.tablet_id, cooldownConf.second);
+            synchronized (cooldownConfToUpdate) {
+                cooldownConfToUpdate.add(conf);
             }
-        } finally {
-            readUnlock(stamp);
+            return;
+        }
+
+        // validate replica is active
+        Map<Long, Replica> replicaMap = 
replicaMetaTable.row(beTabletInfo.getTabletId());
+        if (replicaMap.isEmpty()) {
+            return;
+        }
+        boolean replicaInvalid = true;
+        for (Replica replica : replicaMap.values()) {
+            if (replica.getId() == cooldownConf.first) {
+                replicaInvalid = false;
+                break;
+            }
+        }
+        if (replicaInvalid) {
+            CooldownConf conf = new CooldownConf(tabletMeta.getDbId(), 
tabletMeta.getTableId(),
+                    tabletMeta.getPartitionId(), tabletMeta.getIndexId(), 
beTabletInfo.tablet_id, cooldownConf.second);
+            synchronized (cooldownConfToUpdate) {
+                cooldownConfToUpdate.add(conf);
+            }
+            return;
+        }
+
+        if (cooldownConf.first != beTabletInfo.getCooldownReplicaId()) {
+            CooldownConf conf = new CooldownConf(beTabletInfo.tablet_id, 
cooldownConf.first, cooldownConf.second);
+            synchronized (cooldownConfToPush) {
+                cooldownConfToPush.add(conf);
+            }
+            return;
         }
-        return false;
     }
 
     public List<Replica> getReplicas(Long tabletId) {
-        List<Replica> replicas = new LinkedList<>();
         long stamp = readLock();
         try {
             Map<Long, Replica> replicaMap = replicaMetaTable.row(tabletId);
-            for (Map.Entry<Long, Replica> entry : replicaMap.entrySet()) {
-                replicas.add(entry.getValue());
-            }
+            return replicaMap.values().stream().collect(Collectors.toList());
         } finally {
             readUnlock(stamp);
         }
-        return replicas;
     }
 
     /**
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletMeta.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletMeta.java
index bfaaba3eb2..be291d93ec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletMeta.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletMeta.java
@@ -22,8 +22,6 @@ import org.apache.doris.thrift.TStorageMedium;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 public class TabletMeta {
     private static final Logger LOG = LogManager.getLogger(TabletMeta.class);
 
@@ -37,14 +35,8 @@ public class TabletMeta {
 
     private TStorageMedium storageMedium;
 
-    private long cooldownReplicaId;
-
-    private long cooldownTerm;
-
-    private ReentrantReadWriteLock lock;
-
     public TabletMeta(long dbId, long tableId, long partitionId, long indexId, 
int schemaHash,
-            TStorageMedium storageMedium, long cooldownReplicaId, long 
cooldownTerm) {
+            TStorageMedium storageMedium) {
         this.dbId = dbId;
         this.tableId = tableId;
         this.partitionId = partitionId;
@@ -54,10 +46,6 @@ public class TabletMeta {
         this.newSchemaHash = -1;
 
         this.storageMedium = storageMedium;
-        this.cooldownReplicaId = cooldownReplicaId;
-        this.cooldownTerm = cooldownTerm;
-
-        lock = new ReentrantReadWriteLock();
     }
 
     public long getDbId() {
@@ -84,46 +72,20 @@ public class TabletMeta {
         this.storageMedium = storageMedium;
     }
 
-    public long getCooldownReplicaId() {
-        return cooldownReplicaId;
-    }
-
-    public void setCooldownReplicaId(long cooldownReplicaId) {
-        this.cooldownReplicaId = cooldownReplicaId;
-    }
-
-    public long getCooldownTerm() {
-        return cooldownTerm;
-    }
-
-    public void setCooldownTerm(long cooldownTerm) {
-        this.cooldownTerm = cooldownTerm;
-    }
-
     public int getOldSchemaHash() {
-        lock.readLock().lock();
-        try {
-            return this.oldSchemaHash;
-        } finally {
-            lock.readLock().unlock();
-        }
+        return this.oldSchemaHash;
     }
 
     @Override
     public String toString() {
-        lock.readLock().lock();
-        try {
-            StringBuilder sb = new StringBuilder();
-            sb.append("dbId=").append(dbId);
-            sb.append(" tableId=").append(tableId);
-            sb.append(" partitionId=").append(partitionId);
-            sb.append(" indexId=").append(indexId);
-            sb.append(" oldSchemaHash=").append(oldSchemaHash);
-            sb.append(" newSchemaHash=").append(newSchemaHash);
-
-            return sb.toString();
-        } finally {
-            lock.readLock().unlock();
-        }
+        StringBuilder sb = new StringBuilder();
+        sb.append("dbId=").append(dbId);
+        sb.append(" tableId=").append(tableId);
+        sb.append(" partitionId=").append(partitionId);
+        sb.append(" indexId=").append(indexId);
+        sb.append(" oldSchemaHash=").append(oldSchemaHash);
+        sb.append(" newSchemaHash=").append(newSchemaHash);
+
+        return sb.toString();
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConf.java 
b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConf.java
index 9275a2633c..b0dcec8732 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConf.java
@@ -22,8 +22,7 @@ import org.apache.doris.common.io.Writable;
 import org.apache.doris.persist.gson.GsonUtils;
 
 import com.google.gson.annotations.SerializedName;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+import lombok.Data;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -32,9 +31,8 @@ import java.io.IOException;
 /**
  * This class represents the olap replica related metadata.
  */
+@Data
 public class CooldownConf implements Writable {
-    private static final Logger LOG = LogManager.getLogger(CooldownConf.class);
-
     @SerializedName(value = "dbId")
     protected long dbId;
     @SerializedName(value = "tableId")
@@ -46,74 +44,27 @@ public class CooldownConf implements Writable {
     @SerializedName(value = "tabletId")
     protected long tabletId;
     @SerializedName(value = "cooldownReplicaId")
-    protected long cooldownReplicaId;
+    protected long cooldownReplicaId = -1;
     @SerializedName(value = "cooldownTerm")
-    protected long cooldownTerm;
-
-    public CooldownConf(long dbId, long tableId, long partitionId, long 
indexId, long tabletId, long cooldownReplicaId,
-                        long cooldownTerm) {
-        this.dbId = dbId;
-        this.tableId = tableId;
-        this.partitionId = partitionId;
-        this.indexId = indexId;
-        this.tabletId = tabletId;
-        this.cooldownReplicaId = cooldownReplicaId;
-        this.cooldownTerm = cooldownTerm;
-    }
+    protected long cooldownTerm = -1;
 
-    public long getDbId() {
-        return dbId;
+    public CooldownConf() {
     }
 
-    public void setDbId(long dbId) {
+    // for update
+    public CooldownConf(long dbId, long tableId, long partitionId, long 
indexId, long tabletId, long cooldownTerm) {
         this.dbId = dbId;
-    }
-
-    public long getTableId() {
-        return tableId;
-    }
-
-    public void setTableId(long tableId) {
         this.tableId = tableId;
-    }
-
-    public long getPartitionId() {
-        return partitionId;
-    }
-
-    public void setPartitionId(long partitionId) {
         this.partitionId = partitionId;
-    }
-
-    public long getIndexId() {
-        return indexId;
-    }
-
-    public void setIndexId(long indexId) {
         this.indexId = indexId;
-    }
-
-    public long getTabletId() {
-        return tabletId;
-    }
-
-    public void setTabletId(long tabletId) {
         this.tabletId = tabletId;
+        this.cooldownTerm = cooldownTerm;
     }
 
-    public long getCooldownReplicaId() {
-        return cooldownReplicaId;
-    }
-
-    public void setCooldownReplicaId(long cooldownReplicaId) {
+    // for push
+    public CooldownConf(long tabletId, long cooldownReplicaId, long 
cooldownTerm) {
+        this.tabletId = tabletId;
         this.cooldownReplicaId = cooldownReplicaId;
-    }
-
-    public long getCooldownTerm() {
-        return cooldownTerm;
-    }
-
-    public void setCooldownTerm(long cooldownTerm) {
         this.cooldownTerm = cooldownTerm;
     }
 
@@ -123,8 +74,9 @@ public class CooldownConf implements Writable {
         Text.writeString(out, json);
     }
 
-    public static CooldownJob read(DataInput in) throws IOException {
+    public static CooldownConf read(DataInput in) throws IOException {
         String json = Text.readString(in);
-        return GsonUtils.GSON.fromJson(json, CooldownJob.class);
+        return GsonUtils.GSON.fromJson(json, CooldownConf.class);
     }
 }
+
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConfHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConfHandler.java
new file mode 100644
index 0000000000..f840a304fc
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConfHandler.java
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cooldown;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.TabletInvertedIndex;
+import org.apache.doris.common.util.MasterDaemon;
+
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+public class CooldownConfHandler extends MasterDaemon {
+    private static final Logger LOG = 
LogManager.getLogger(CooldownConfHandler.class);
+
+    // TODO(plat1ko): better to use `Condition`?
+    private static final long INTERVAL_MS = 5000; // 5s
+    private static final int UPDATE_BATCH_SIZE = 512;
+
+    private final Map<Long, CooldownConf> cooldownConfToUpdate = 
Maps.newConcurrentMap();
+
+    public CooldownConfHandler() {
+        super("CooldownConfHandler", INTERVAL_MS);
+    }
+
+    public void addCooldownConfToUpdate(List<CooldownConf> cooldownConfs) {
+        cooldownConfs.forEach(conf -> 
cooldownConfToUpdate.put(conf.getTabletId(), conf));
+    }
+
+    @Override
+    protected void runAfterCatalogReady() {
+        if (cooldownConfToUpdate.isEmpty()) {
+            return;
+        }
+        List<CooldownConf> cooldownConfList = 
cooldownConfToUpdate.values().stream().collect(Collectors.toList());
+        for (int start = 0; start < cooldownConfList.size(); start += 
UPDATE_BATCH_SIZE) {
+            updateCooldownConf(
+                    cooldownConfList.subList(start, Math.min(start + 
UPDATE_BATCH_SIZE, cooldownConfList.size())));
+        }
+    }
+
+    private void updateCooldownConf(List<CooldownConf> confToUpdate) {
+        ArrayList<CooldownConf> updatedConf = new ArrayList<>();
+        updatedConf.ensureCapacity(confToUpdate.size());
+
+        Map<Long, Tablet> tabletMap = new HashMap<>(); // cache tablet
+
+        TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
+        for (CooldownConf conf : confToUpdate) {
+            // choose cooldown replica
+            List<Replica> replicas = 
invertedIndex.getReplicas(conf.getTabletId());
+            if (replicas.isEmpty()) {
+                continue;
+            }
+            Random rand = new Random(System.currentTimeMillis());
+            int index = rand.nextInt(replicas.size());
+            conf.setCooldownReplicaId(replicas.get(index).getId());
+            // find TabletMeta to get cooldown term
+            Tablet tablet = getTablet(conf);
+            if (tablet == null || tablet.getCooldownConf().second != 
conf.cooldownTerm) {
+                // If tablet.cooldownTerm != conf.cooldownTerm, means cooldown 
conf of this tablet has been updated,
+                // should skip this update.
+                continue;
+            }
+            ++conf.cooldownTerm;
+            updatedConf.add(conf);
+            tabletMap.put(conf.tabletId, tablet);
+        }
+
+        // write editlog
+        CooldownConfList list = new CooldownConfList(updatedConf);
+        Env.getCurrentEnv().getEditLog().logUpdateCooldownConf(list);
+
+        // update Tablet
+        for (CooldownConf conf : updatedConf) {
+            Tablet tablet = tabletMap.get(conf.tabletId);
+            tablet.setCooldownConf(conf.cooldownReplicaId, conf.cooldownTerm);
+            LOG.info("update cooldown conf. tabletId={} cooldownReplicaId={} 
cooldownTerm={}", conf.tabletId,
+                    conf.cooldownReplicaId, conf.cooldownTerm);
+        }
+
+        // update finish, remove from map
+        confToUpdate.forEach(conf -> 
cooldownConfToUpdate.remove(conf.getTabletId()));
+
+        // TODO(plat1ko): push CooldownConf to BE?
+    }
+
+    private static Tablet getTablet(CooldownConf conf) {
+        try {
+            OlapTable table = (OlapTable) 
Env.getCurrentInternalCatalog().getDbNullable(conf.dbId)
+                    .getTable(conf.tableId)
+                    .get();
+            table.readLock();
+            try {
+                return 
table.getPartition(conf.partitionId).getIndex(conf.indexId).getTablet(conf.tabletId);
+            } finally {
+                table.readUnlock();
+            }
+        } catch (RuntimeException e) {
+            LOG.warn("failed to get tablet. tabletId={}", conf.tabletId);
+            return null;
+        }
+    }
+
+    public static void replayUpdateCooldownConf(CooldownConfList 
cooldownConfList) {
+        cooldownConfList.getCooldownConf().forEach(conf -> {
+            Tablet tablet = getTablet(conf);
+            if (tablet != null) {
+                tablet.setCooldownConf(conf.cooldownReplicaId, 
conf.cooldownTerm);
+            }
+        });
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConfList.java 
b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConfList.java
new file mode 100644
index 0000000000..06bca7a42b
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConfList.java
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cooldown;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+import org.apache.doris.persist.gson.GsonUtils;
+
+import com.google.gson.annotations.SerializedName;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+public class CooldownConfList implements Writable {
+    @SerializedName(value = "cooldownConf")
+    private List<CooldownConf> cooldownConf;
+
+    CooldownConfList(List<CooldownConf> cooldownConf) {
+        this.cooldownConf = cooldownConf;
+    }
+
+    List<CooldownConf> getCooldownConf() {
+        return cooldownConf;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        String json = GsonUtils.GSON.toJson(this);
+        Text.writeString(out, json);
+    }
+
+    public static CooldownConfList read(DataInput in) throws IOException {
+        String json = Text.readString(in);
+        return GsonUtils.GSON.fromJson(json, CooldownConfList.class);
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownException.java 
b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownException.java
deleted file mode 100644
index c0ec6bd5e7..0000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.cooldown;
-
-import org.apache.doris.common.DdlException;
-
-/*
- * This exception will be thrown when the cooldown job(v2) running failed.
- */
-public class CooldownException extends DdlException {
-
-    private static final long serialVersionUID = 4844951783432954268L;
-
-    public CooldownException(String msg) {
-        super(msg);
-    }
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownHandler.java
deleted file mode 100644
index 4ad601e5f4..0000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownHandler.java
+++ /dev/null
@@ -1,192 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.cooldown;
-
-import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.Replica;
-import org.apache.doris.catalog.TabletMeta;
-import org.apache.doris.common.Config;
-import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.FeMetaVersion;
-import org.apache.doris.common.ThreadPoolManager;
-import org.apache.doris.common.util.MasterDaemon;
-
-import com.google.common.collect.Maps;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ThreadPoolExecutor;
-
-public class CooldownHandler extends MasterDaemon {
-    private static final Logger LOG = 
LogManager.getLogger(CooldownHandler.class);
-    private static final int MAX_ACTIVE_COOLDOWN_JOB_SIZE = 10;
-
-    private static final int MAX_RUNABLE_COOLDOWN_JOB_SIZE = 100;
-
-    private static final int MAX_TABLET_PER_JOB = 100;
-
-    private static final long timeoutMs = 1000L * 
Config.push_cooldown_conf_timeout_second;
-
-    // jobId -> CooldownJob, it is used to hold CooldownJob which is used to 
sent conf to be.
-    private final Map<Long, CooldownJob> runableCooldownJobs = 
Maps.newConcurrentMap();
-    private final Map<Long, Boolean> resetingTablet = Maps.newConcurrentMap();
-    // jobId -> CooldownJob,
-    public final Map<Long, CooldownJob> activeCooldownJobs = 
Maps.newConcurrentMap();
-
-    public final ThreadPoolExecutor cooldownThreadPool = 
ThreadPoolManager.newDaemonCacheThreadPool(
-            MAX_ACTIVE_COOLDOWN_JOB_SIZE, "cooldown-pool", true);
-
-    // syncCooldownTabletMap: tabletId -> TabletMeta
-    public void handleCooldownConf(Map<Long, TabletMeta> 
syncCooldownTabletMap) {
-        List<CooldownConf> cooldownConfList = new LinkedList<>();
-        for (Map.Entry<Long, TabletMeta> entry : 
syncCooldownTabletMap.entrySet()) {
-            if (runableCooldownJobs.size() >= MAX_RUNABLE_COOLDOWN_JOB_SIZE) {
-                return;
-            }
-            Long tabletId = entry.getKey();
-            TabletMeta tabletMeta = entry.getValue();
-            if (resetingTablet.containsKey(tabletId)) {
-                continue;
-            }
-            long cooldownReplicaId = -1;
-            List<Replica> replicas = 
Env.getCurrentInvertedIndex().getReplicas(tabletId);
-            if (replicas.size() == 0) {
-                continue;
-            }
-            for (Replica replica : replicas) {
-                if (tabletMeta.getCooldownReplicaId() == replica.getId()) {
-                    cooldownReplicaId = tabletMeta.getCooldownReplicaId();
-                    break;
-                }
-            }
-            long cooldownTerm = tabletMeta.getCooldownTerm();
-            if (cooldownReplicaId == -1) {
-                Random rand = new Random(System.currentTimeMillis());
-                int index = rand.nextInt(replicas.size());
-                cooldownReplicaId = replicas.get(index).getId();
-                ++cooldownTerm;
-            }
-            CooldownConf cooldownConf = new CooldownConf(tabletMeta.getDbId(), 
tabletMeta.getTableId(),
-                    tabletMeta.getPartitionId(), tabletMeta.getIndexId(), 
tabletId, cooldownReplicaId, cooldownTerm);
-            cooldownConfList.add(cooldownConf);
-            if (cooldownConfList.size() >= MAX_TABLET_PER_JOB) {
-                long jobId = Env.getCurrentEnv().getNextId();
-                CooldownJob cooldownJob = new CooldownJob(jobId, 
cooldownConfList, timeoutMs);
-                runableCooldownJobs.put(jobId, cooldownJob);
-                for (CooldownConf conf : cooldownConfList) {
-                    resetingTablet.put(conf.getTabletId(), true);
-                }
-                cooldownConfList = new LinkedList<>();
-            }
-        }
-        if (cooldownConfList.size() > 0) {
-            long jobId = Env.getCurrentEnv().getNextId();
-            CooldownJob cooldownJob = new CooldownJob(jobId, cooldownConfList, 
timeoutMs);
-            runableCooldownJobs.put(jobId, cooldownJob);
-            for (CooldownConf conf : cooldownConfList) {
-                resetingTablet.put(conf.getTabletId(), true);
-            }
-        }
-    }
-
-    @Override
-    protected void runAfterCatalogReady() {
-        clearFinishedOrCancelledCooldownJob();
-        runableCooldownJobs.values().forEach(cooldownJob -> {
-            if (!cooldownJob.isDone() && 
!activeCooldownJobs.containsKey(cooldownJob.getJobId())
-                    && activeCooldownJobs.size() < 
MAX_ACTIVE_COOLDOWN_JOB_SIZE) {
-                if (FeConstants.runningUnitTest) {
-                    cooldownJob.run();
-                } else {
-                    cooldownThreadPool.submit(() -> {
-                        if 
(activeCooldownJobs.putIfAbsent(cooldownJob.getJobId(), cooldownJob) == null) {
-                            try {
-                                cooldownJob.run();
-                            } finally {
-                                
activeCooldownJobs.remove(cooldownJob.getJobId());
-                            }
-                        }
-                    });
-                }
-            }
-        });
-    }
-
-    public void write(DataOutput out) throws IOException {
-        if (Config.cooldown_single_remote_file) {
-            out.writeInt(runableCooldownJobs.size());
-            for (CooldownJob cooldownJob : runableCooldownJobs.values()) {
-                cooldownJob.write(out);
-            }
-        }
-    }
-
-    public void readField(DataInput in) throws IOException {
-        if (Config.cooldown_single_remote_file) {
-            if (Env.getCurrentEnvJournalVersion() >= 
FeMetaVersion.VERSION_115) {
-                int size = in.readInt();
-                for (int i = 0; i < size; i++) {
-                    CooldownJob cooldownJob = CooldownJob.read(in);
-                    replayCooldownJob(cooldownJob);
-                }
-            }
-        }
-    }
-
-    public void replayCooldownJob(CooldownJob cooldownJob) {
-        CooldownJob replayCooldownJob;
-        if (!runableCooldownJobs.containsKey(cooldownJob.getJobId())) {
-            replayCooldownJob = new CooldownJob(cooldownJob.jobId, 
cooldownJob.getCooldownConfList(),
-                    cooldownJob.timeoutMs);
-            runableCooldownJobs.put(cooldownJob.getJobId(), replayCooldownJob);
-            for (CooldownConf cooldownConf : 
cooldownJob.getCooldownConfList()) {
-                resetingTablet.put(cooldownConf.getTabletId(), true);
-            }
-        } else {
-            replayCooldownJob = 
runableCooldownJobs.get(cooldownJob.getJobId());
-        }
-        replayCooldownJob.replay(cooldownJob);
-        if (replayCooldownJob.isDone()) {
-            runableCooldownJobs.remove(cooldownJob.getJobId());
-            for (CooldownConf cooldownConf : 
cooldownJob.getCooldownConfList()) {
-                resetingTablet.remove(cooldownConf.getTabletId());
-            }
-        }
-    }
-
-    private void clearFinishedOrCancelledCooldownJob() {
-        Iterator<Map.Entry<Long, CooldownJob>> iterator = 
runableCooldownJobs.entrySet().iterator();
-        while (iterator.hasNext()) {
-            CooldownJob cooldownJob = iterator.next().getValue();
-            if (cooldownJob.isDone()) {
-                iterator.remove();
-                for (CooldownConf cooldownConf : 
cooldownJob.getCooldownConfList()) {
-                    resetingTablet.remove(cooldownConf.getTabletId());
-                }
-            }
-        }
-    }
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownJob.java
deleted file mode 100644
index 99643b874a..0000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownJob.java
+++ /dev/null
@@ -1,401 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.cooldown;
-
-import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.MaterializedIndex;
-import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.Partition;
-import org.apache.doris.catalog.Replica;
-import org.apache.doris.catalog.TableIf;
-import org.apache.doris.catalog.Tablet;
-import org.apache.doris.common.FeConstants;
-import org.apache.doris.common.FeMetaVersion;
-import org.apache.doris.common.MetaNotFoundException;
-import org.apache.doris.common.io.Text;
-import org.apache.doris.common.io.Writable;
-import org.apache.doris.persist.gson.GsonUtils;
-import org.apache.doris.task.AgentBatchTask;
-import org.apache.doris.task.AgentTask;
-import org.apache.doris.task.AgentTaskExecutor;
-import org.apache.doris.task.AgentTaskQueue;
-import org.apache.doris.task.PushCooldownConfTask;
-import org.apache.doris.thrift.TTaskType;
-
-import com.google.common.base.Preconditions;
-import com.google.gson.annotations.SerializedName;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-public class CooldownJob implements Writable {
-    private static final Logger LOG = LogManager.getLogger(CooldownJob.class);
-
-    public enum JobState {
-        PENDING, // Job is created
-        SEND_CONF, // send cooldown task to BE.
-        RUNNING, // cooldown tasks are sent to BE, and waiting for them 
finished.
-        FINISHED, // job is done
-        CANCELLED; // job is cancelled(failed or be cancelled by user)
-
-        public boolean isFinalState() {
-            return this == CooldownJob.JobState.FINISHED || this == 
CooldownJob.JobState.CANCELLED;
-        }
-    }
-
-    @SerializedName(value = "jobId")
-    protected long jobId;
-    @SerializedName(value = "jobState")
-    protected CooldownJob.JobState jobState;
-    @SerializedName(value = "cooldownConfList")
-    protected List<CooldownConf> cooldownConfList;
-
-    @SerializedName(value = "errMsg")
-    protected String errMsg = "";
-    @SerializedName(value = "createTimeMs")
-    protected long createTimeMs = -1;
-    @SerializedName(value = "finishedTimeMs")
-    protected long finishedTimeMs = -1;
-    @SerializedName(value = "timeoutMs")
-    protected long timeoutMs = -1;
-
-    public long getJobId() {
-        return jobId;
-    }
-
-    public JobState getJobState() {
-        return jobState;
-    }
-
-    public List<CooldownConf> getCooldownConfList() {
-        return cooldownConfList;
-    }
-
-    public long getTimeoutMs() {
-        return timeoutMs;
-    }
-
-    private AgentBatchTask cooldownBatchTask = new AgentBatchTask();
-
-    public CooldownJob(long jobId, List<CooldownConf> cooldownConfList, long 
timeoutMs) {
-        this.jobId = jobId;
-        this.jobState = JobState.PENDING;
-        this.cooldownConfList = cooldownConfList;
-        this.createTimeMs = System.currentTimeMillis();
-        this.timeoutMs = timeoutMs;
-    }
-
-    protected void runPendingJob() throws CooldownException {
-        Preconditions.checkState(jobState == CooldownJob.JobState.PENDING, 
jobState);
-        this.jobState = JobState.SEND_CONF;
-        // write edit log
-        for (CooldownConf cooldownConf : cooldownConfList) {
-            setCooldownReplica(cooldownConf.getDbId(), 
cooldownConf.getTableId(), cooldownConf.getPartitionId(),
-                    cooldownConf.getIndexId(), cooldownConf.getTabletId(), 
cooldownConf.getCooldownReplicaId(),
-                    cooldownConf.getCooldownTerm());
-            
Env.getCurrentInvertedIndex().getTabletMeta(cooldownConf.getTabletId()).setCooldownReplicaId(
-                    cooldownConf.getCooldownReplicaId());
-            
Env.getCurrentInvertedIndex().getTabletMeta(cooldownConf.getTabletId()).setCooldownTerm(
-                    cooldownConf.getCooldownTerm());
-        }
-        Env.getCurrentEnv().getEditLog().logCooldownJob(this);
-        LOG.info("send cooldown job {} state to {}", jobId, this.jobState);
-    }
-
-    protected void runSendJob() throws CooldownException {
-        Preconditions.checkState(jobState == JobState.SEND_CONF, jobState);
-        LOG.info("begin to send cooldown conf tasks. job: {}", jobId);
-        if (!FeConstants.runningUnitTest) {
-            Map<Long, List<CooldownConf>> cooldownMap = new HashMap<>();
-            for (CooldownConf cooldownConf : cooldownConfList) {
-                Database db = Env.getCurrentInternalCatalog()
-                        .getDbOrException(cooldownConf.getDbId(), s -> new 
CooldownException(
-                                "Database " + s + " does not exist"));
-                OlapTable tbl;
-                try {
-                    tbl = (OlapTable) 
db.getTableOrMetaException(cooldownConf.getTableId(), TableIf.TableType.OLAP);
-                } catch (MetaNotFoundException e) {
-                    throw new CooldownException(e.getMessage());
-                }
-                if (tbl == null) {
-                    throw new CooldownException(String.format("No table: %d", 
cooldownConf.getTableId()));
-                }
-                tbl.readLock();
-                try {
-                    Partition partition = 
tbl.getPartition(cooldownConf.getPartitionId());
-                    if (partition == null) {
-                        throw new CooldownException(String.format("No 
partition: %d", cooldownConf.getPartitionId()));
-                    }
-                    MaterializedIndex index = 
partition.getIndex(cooldownConf.getIndexId());
-                    if (index == null) {
-                        throw new CooldownException(String.format("No index: 
%d", cooldownConf.getIndexId()));
-                    }
-                    Tablet tablet = 
index.getTablet(cooldownConf.getTabletId());
-                    if (tablet == null) {
-                        throw new CooldownException(String.format("No tablet: 
%d", cooldownConf.getTabletId()));
-                    }
-                    for (Replica replica : tablet.getReplicas()) {
-                        if (!cooldownMap.containsKey(replica.getBackendId())) {
-                            cooldownMap.put(replica.getBackendId(), new 
LinkedList<>());
-                        }
-                        
cooldownMap.get(replica.getBackendId()).add(cooldownConf);
-                    }
-                } finally {
-                    tbl.readUnlock();
-                }
-            }
-            for (Map.Entry<Long, List<CooldownConf>> entry : 
cooldownMap.entrySet()) {
-                PushCooldownConfTask pushCooldownConfTask = new 
PushCooldownConfTask(entry.getKey(), entry.getValue());
-                cooldownBatchTask.addTask(pushCooldownConfTask);
-            }
-            AgentTaskQueue.addBatchTask(cooldownBatchTask);
-            AgentTaskExecutor.submit(cooldownBatchTask);
-        }
-
-        this.jobState = JobState.RUNNING;
-        // write edit log
-        Env.getCurrentEnv().getEditLog().logCooldownJob(this);
-        LOG.info("send cooldown job {} state to {}", jobId, this.jobState);
-    }
-
-    protected void runRunningJob() throws CooldownException {
-        if (!cooldownBatchTask.isFinished()) {
-            LOG.info("cooldown tasks not finished. job: {}", jobId);
-            List<AgentTask> tasks = cooldownBatchTask.getUnfinishedTasks(2000);
-            for (AgentTask task : tasks) {
-                if (task.getFailedTimes() >= 3) {
-                    task.setFinished(true);
-                    AgentTaskQueue.removeTask(task.getBackendId(), 
TTaskType.PUSH_COOLDOWN_CONF, task.getSignature());
-                    LOG.warn("push cooldown conf task failed after try three 
times: " + task.getErrorMsg());
-                    throw new CooldownException("cooldown tasks failed on 
backend: " + task.getBackendId());
-                }
-            }
-            return;
-        }
-        this.jobState = CooldownJob.JobState.FINISHED;
-        this.finishedTimeMs = System.currentTimeMillis();
-
-        Env.getCurrentEnv().getEditLog().logCooldownJob(this);
-        LOG.info("push cooldown conf job finished: {}", jobId);
-    }
-
-    public boolean isTimeout() {
-        return System.currentTimeMillis() - createTimeMs > timeoutMs;
-    }
-
-    public boolean isDone() {
-        return jobState.isFinalState();
-    }
-
-    /*
-     * cancelImpl() can be called any time any place.
-     * We need to clean any possible residual of this job.
-     */
-    protected synchronized void cancelImpl(String errMsg) {
-        if (jobState.isFinalState()) {
-            return;
-        }
-
-        cancelInternal();
-
-        this.errMsg = errMsg;
-        this.finishedTimeMs = System.currentTimeMillis();
-        LOG.info("cancel cooldown job {}, err: {}", jobId, errMsg);
-        Env.getCurrentEnv().getEditLog().logCooldownJob(this);
-    }
-
-    /**
-     * The keyword 'synchronized' only protects 2 methods:
-     * run() and cancel()
-     * Only these 2 methods can be visited by different thread(internal 
working thread and user connection thread)
-     * So using 'synchronized' to make sure only one thread can run the job at 
one time.
-     *
-     * lock order:
-     *      synchronized
-     *      db lock
-     */
-    public synchronized void run() {
-        if (isTimeout()) {
-            cancelImpl("Timeout");
-            return;
-        }
-
-        try {
-            switch (jobState) {
-                case PENDING:
-                    runPendingJob();
-                    break;
-                case SEND_CONF:
-                    runSendJob();
-                    break;
-                case RUNNING:
-                    runRunningJob();
-                    break;
-                default:
-                    break;
-            }
-        } catch (CooldownException e) {
-            cancelImpl(e.getMessage());
-        }
-    }
-
-    public void replay(CooldownJob replayedJob) {
-        try {
-            switch (replayedJob.jobState) {
-                case PENDING:
-                    replayCreateJob(replayedJob);
-                    break;
-                case SEND_CONF:
-                    replayPendingJob();
-                    break;
-                case FINISHED:
-                    replayRunningJob(replayedJob);
-                    break;
-                case CANCELLED:
-                    replayCancelled(replayedJob);
-                    break;
-                default:
-                    break;
-            }
-        } catch (CooldownException e) {
-            LOG.warn("[INCONSISTENT META] replay cooldown job failed {}", 
replayedJob.jobId, e);
-        }
-    }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-        String json = GsonUtils.GSON.toJson(this);
-        Text.writeString(out, json);
-    }
-
-    public static CooldownJob read(DataInput in) throws IOException {
-        if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_115) {
-            String json = Text.readString(in);
-            return GsonUtils.GSON.fromJson(json, CooldownJob.class);
-        }
-        return null;
-    }
-
-    /**
-     * Replay job in PENDING state.
-     * Should replay all changes before this job's state transfer to PENDING.
-     * These changes should be same as changes in CooldownHandler.createJob()
-     */
-    private void replayCreateJob(CooldownJob replayedJob) {
-        jobId = replayedJob.jobId;
-        for (CooldownConf replayedConf : replayedJob.cooldownConfList) {
-            CooldownConf cooldownConf = new 
CooldownConf(replayedConf.getDbId(), replayedConf.getTableId(),
-                    replayedConf.getPartitionId(), replayedConf.getIndexId(), 
replayedConf.getTabletId(),
-                    replayedConf.getCooldownReplicaId(), 
replayedConf.getCooldownTerm());
-            cooldownConfList.add(cooldownConf);
-        }
-        createTimeMs = replayedJob.createTimeMs;
-        timeoutMs = replayedJob.timeoutMs;
-        jobState = JobState.PENDING;
-        LOG.info("replay create cooldown job: {}, conf size: {}", jobId, 
cooldownConfList.size());
-    }
-
-    /**
-     * Replay job in PENDING state. set cooldown type in Replica
-     */
-    private void replayPendingJob() throws CooldownException {
-        for (CooldownConf cooldownConf : cooldownConfList) {
-            setCooldownReplica(cooldownConf.getDbId(), 
cooldownConf.getTableId(), cooldownConf.getPartitionId(),
-                    cooldownConf.getIndexId(), cooldownConf.getTabletId(), 
cooldownConf.getCooldownReplicaId(),
-                    cooldownConf.getCooldownTerm());
-            if 
(Env.getCurrentInvertedIndex().getTabletMeta(cooldownConf.getTabletId()) != 
null) {
-                
Env.getCurrentInvertedIndex().getTabletMeta(cooldownConf.getTabletId()).setCooldownReplicaId(
-                        cooldownConf.getCooldownReplicaId());
-                
Env.getCurrentInvertedIndex().getTabletMeta(cooldownConf.getTabletId()).setCooldownTerm(
-                        cooldownConf.getCooldownTerm());
-            }
-        }
-        jobState = JobState.SEND_CONF;
-        LOG.info("replay send cooldown conf, job: {}", jobId);
-    }
-
-    /**
-     * Replay job in FINISHED state.
-     * Should replay all changes in runRunningJob()
-     */
-    private void replayRunningJob(CooldownJob replayedJob) throws 
CooldownException {
-        jobState = CooldownJob.JobState.FINISHED;
-        this.finishedTimeMs = replayedJob.finishedTimeMs;
-        LOG.info("replay finished cooldown job: {}", jobId);
-    }
-
-    private void setCooldownReplica(long dbId, long tableId, long partitionId, 
long indexId, long tabletId,
-                                    long cooldownReplicaId, long cooldownTerm) 
throws CooldownException {
-        Database db = Env.getCurrentInternalCatalog()
-                .getDbOrException(dbId, s -> new CooldownException("Database " 
+ s + " does not exist"));
-        OlapTable tbl;
-        try {
-            tbl = (OlapTable) db.getTableOrMetaException(tableId, 
TableIf.TableType.OLAP);
-        } catch (MetaNotFoundException e) {
-            throw new CooldownException(e.getMessage());
-        }
-        if (tbl != null) {
-            tbl.writeLock();
-            try {
-                Partition partition = tbl.getPartition(partitionId);
-                if (partition != null) {
-                    MaterializedIndex index = partition.getIndex(indexId);
-                    if (index != null) {
-                        Tablet tablet = index.getTablet(tabletId);
-                        if (tablet != null) {
-                            tablet.setCooldownReplicaId(cooldownReplicaId);
-                            tablet.setCooldownTerm(cooldownTerm);
-                            LOG.info("setCooldownReplicaId to {} when cancel 
job: {}:{}", cooldownReplicaId,
-                                    tablet.getId(), jobId);
-                            return;
-                        }
-                    }
-                }
-                throw new CooldownException("set cooldown type failed.");
-            } finally {
-                tbl.writeUnlock();
-            }
-        }
-    }
-
-    private void cancelInternal() {
-        // clear tasks if has
-        AgentTaskQueue.removeBatchTask(cooldownBatchTask, 
TTaskType.PUSH_COOLDOWN_CONF);
-        jobState = CooldownJob.JobState.CANCELLED;
-    }
-
-    /**
-     * Replay job in CANCELLED state.
-     */
-    private void replayCancelled(CooldownJob replayedJob) {
-        cancelInternal();
-        this.jobState = CooldownJob.JobState.CANCELLED;
-        this.finishedTimeMs = replayedJob.finishedTimeMs;
-        this.errMsg = replayedJob.errMsg;
-        LOG.info("replay cancelled cooldown job: {}", jobId);
-    }
-
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 6a9392ffee..10562b9a37 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -361,7 +361,7 @@ public class InternalCatalog implements CatalogIf<Database> 
{
                         int schemaHash = 
olapTable.getSchemaHashByIndexId(indexId);
                         for (Tablet tablet : index.getTablets()) {
                             TabletMeta tabletMeta = new TabletMeta(dbId, 
tableId, partitionId, indexId, schemaHash,
-                                    medium, tablet.getCooldownReplicaId(), 
tablet.getCooldownTerm());
+                                    medium);
                             long tabletId = tablet.getId();
                             invertedIndex.addTablet(tabletId, tabletMeta);
                             for (Replica replica : tablet.getReplicas()) {
@@ -1290,7 +1290,7 @@ public class InternalCatalog implements 
CatalogIf<Database> {
                         int schemaHash = 
olapTable.getSchemaHashByIndexId(indexId);
                         for (Tablet tablet : mIndex.getTablets()) {
                             TabletMeta tabletMeta = new TabletMeta(dbId, 
tableId, partitionId, indexId, schemaHash,
-                                    medium, tablet.getCooldownReplicaId(), 
tablet.getCooldownTerm());
+                                    medium);
                             long tabletId = tablet.getId();
                             invertedIndex.addTablet(tabletId, tabletMeta);
                             for (Replica replica : tablet.getReplicas()) {
@@ -1557,8 +1557,7 @@ public class InternalCatalog implements 
CatalogIf<Database> {
                     int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
                     for (Tablet tablet : index.getTablets()) {
                         TabletMeta tabletMeta = new TabletMeta(info.getDbId(), 
info.getTableId(), partition.getId(),
-                                index.getId(), schemaHash, 
info.getDataProperty().getStorageMedium(),
-                                tablet.getCooldownReplicaId(), 
tablet.getCooldownTerm());
+                                index.getId(), schemaHash, 
info.getDataProperty().getStorageMedium());
                         long tabletId = tablet.getId();
                         invertedIndex.addTablet(tabletId, tabletMeta);
                         for (Replica replica : tablet.getReplicas()) {
@@ -1709,8 +1708,7 @@ public class InternalCatalog implements 
CatalogIf<Database> {
 
             // create tablets
             int schemaHash = indexMeta.getSchemaHash();
-            TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, 
indexId, schemaHash, storageMedium, -1,
-                    -1);
+            TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, 
indexId, schemaHash, storageMedium);
             createTablets(clusterName, index, ReplicaState.NORMAL, 
distributionInfo, version, replicaAlloc, tabletMeta,
                     tabletIdSet, idGeneratorBuffer);
 
@@ -2710,7 +2708,7 @@ public class InternalCatalog implements 
CatalogIf<Database> {
                         int schemaHash = 
olapTable.getSchemaHashByIndexId(indexId);
                         for (Tablet tablet : mIndex.getTablets()) {
                             TabletMeta tabletMeta = new TabletMeta(db.getId(), 
olapTable.getId(), partitionId, indexId,
-                                    schemaHash, medium, 
tablet.getCooldownReplicaId(), tablet.getCooldownTerm());
+                                    schemaHash, medium);
                             long tabletId = tablet.getId();
                             invertedIndex.addTablet(tabletId, tabletMeta);
                             for (Replica replica : tablet.getReplicas()) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java 
b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index 0c0e49f617..2eb0b8dbe3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -36,7 +36,7 @@ import org.apache.doris.cluster.Cluster;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.SmallFileMgr.SmallFile;
-import org.apache.doris.cooldown.CooldownJob;
+import org.apache.doris.cooldown.CooldownConfList;
 import org.apache.doris.datasource.CatalogLog;
 import org.apache.doris.datasource.ExternalObjectLog;
 import org.apache.doris.datasource.InitCatalogLog;
@@ -580,8 +580,8 @@ public class JournalEntity implements Writable {
                 isRead = true;
                 break;
             }
-            case OperationType.OP_PUSH_COOLDOWN_CONF: {
-                data = CooldownJob.read(in);
+            case OperationType.OP_UPDATE_COOLDOWN_CONF: {
+                data = CooldownConfList.read(in);
                 isRead = true;
                 break;
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index e14f87d375..d45d787ed0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -41,6 +41,7 @@ import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.util.Daemon;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.cooldown.CooldownConf;
 import org.apache.doris.metric.GaugeMetric;
 import org.apache.doris.metric.Metric.MetricUnit;
 import org.apache.doris.metric.MetricRepo;
@@ -62,6 +63,7 @@ import org.apache.doris.task.CreateReplicaTask;
 import org.apache.doris.task.DropReplicaTask;
 import org.apache.doris.task.MasterTask;
 import org.apache.doris.task.PublishVersionTask;
+import org.apache.doris.task.PushCooldownConfTask;
 import org.apache.doris.task.PushStoragePolicyTask;
 import org.apache.doris.task.StorageMediaMigrationTask;
 import org.apache.doris.task.UpdateTabletMetaInfoTask;
@@ -94,6 +96,7 @@ import org.apache.thrift.TException;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -263,6 +266,17 @@ public class ReportHandler extends Daemon {
         }
     }
 
+    private static void handlePushCooldownConf(long backendId, 
List<CooldownConf> cooldownConfToPush) {
+        final int PUSH_BATCH_SIZE = 1024;
+        AgentBatchTask batchTask = new AgentBatchTask();
+        for (int start = 0; start < cooldownConfToPush.size(); start += 
PUSH_BATCH_SIZE) {
+            PushCooldownConfTask task = new PushCooldownConfTask(backendId,
+                    cooldownConfToPush.subList(start, Math.min(start + 
PUSH_BATCH_SIZE, cooldownConfToPush.size())));
+            batchTask.addTask(task);
+        }
+        AgentTaskExecutor.submit(batchTask);
+    }
+
     private static void handlePushStoragePolicy(long backendId, List<Policy> 
policyToPush,
                                                 List<Resource> resourceToPush, 
List<Long> policyToDrop) {
         AgentBatchTask batchTask = new AgentBatchTask();
@@ -396,8 +410,6 @@ public class ReportHandler extends Daemon {
         Set<Long> tabletFoundInMeta = Sets.newConcurrentHashSet();
         // storage medium -> tablet id
         ListMultimap<TStorageMedium, Long> tabletMigrationMap = 
LinkedListMultimap.create();
-        // the cooldown type of replicas which need to be sync. tabletId -> 
TabletMeta
-        Map<Long, TabletMeta> syncCooldownTabletMap = new HashMap<>();
 
         // dbid -> txn id -> [partition info]
         Map<Long, ListMultimap<Long, TPartitionVersionInfo>> 
transactionsToPublish = Maps.newHashMap();
@@ -408,6 +420,9 @@ public class ReportHandler extends Daemon {
 
         List<Triple<Long, Integer, Boolean>> tabletToInMemory = 
Lists.newArrayList();
 
+        List<CooldownConf> cooldownConfToPush = new LinkedList<>();
+        List<CooldownConf> cooldownConfToUpdate = new LinkedList<>();
+
         // 1. do the diff. find out (intersection) / (be - meta) / (meta - be)
         Env.getCurrentInvertedIndex().tabletReport(backendId, backendTablets, 
storageMediumMap,
                 tabletSyncMap,
@@ -418,7 +433,8 @@ public class ReportHandler extends Daemon {
                 transactionsToClear,
                 tabletRecoveryMap,
                 tabletToInMemory,
-                syncCooldownTabletMap);
+                cooldownConfToPush,
+                cooldownConfToUpdate);
 
         // 2. sync
         if (!tabletSyncMap.isEmpty()) {
@@ -461,9 +477,12 @@ public class ReportHandler extends Daemon {
             handleSetTabletInMemory(backendId, tabletToInMemory);
         }
 
-        // 10. send cooldownType which need sync to CooldownHandler
-        if (!syncCooldownTabletMap.isEmpty()) {
-            
Env.getCurrentEnv().getCooldownHandler().handleCooldownConf(syncCooldownTabletMap);
+        // handle cooldown conf
+        if (!cooldownConfToPush.isEmpty()) {
+            handlePushCooldownConf(backendId, cooldownConfToPush);
+        }
+        if (!cooldownConfToUpdate.isEmpty()) {
+            
Env.getCurrentEnv().getCooldownConfHandler().addCooldownConfToUpdate(cooldownConfToUpdate);
         }
 
         final SystemInfoService currentSystemInfo = Env.getCurrentSystemInfo();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index f6a71b5377..7a1f9a967e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -41,7 +41,8 @@ import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.SmallFileMgr.SmallFile;
-import org.apache.doris.cooldown.CooldownJob;
+import org.apache.doris.cooldown.CooldownConfHandler;
+import org.apache.doris.cooldown.CooldownConfList;
 import org.apache.doris.datasource.CatalogLog;
 import org.apache.doris.datasource.ExternalObjectLog;
 import org.apache.doris.datasource.InitCatalogLog;
@@ -720,11 +721,9 @@ public class EditLog {
                     }
                     break;
                 }
-                case OperationType.OP_PUSH_COOLDOWN_CONF:
-                    if (Config.cooldown_single_remote_file) {
-                        CooldownJob cooldownJob = (CooldownJob) 
journal.getData();
-                        
env.getCooldownHandler().replayCooldownJob(cooldownJob);
-                    }
+                case OperationType.OP_UPDATE_COOLDOWN_CONF:
+                    CooldownConfList cooldownConfList = (CooldownConfList) 
journal.getData();
+                    
CooldownConfHandler.replayUpdateCooldownConf(cooldownConfList);
                     break;
                 case OperationType.OP_BATCH_ADD_ROLLUP: {
                     BatchAlterJobPersistInfo batchAlterJobV2 = 
(BatchAlterJobPersistInfo) journal.getData();
@@ -1517,8 +1516,8 @@ public class EditLog {
         logEdit(OperationType.OP_ALTER_JOB_V2, alterJob);
     }
 
-    public void logCooldownJob(CooldownJob cooldownJob) {
-        logEdit(OperationType.OP_PUSH_COOLDOWN_CONF, cooldownJob);
+    public void logUpdateCooldownConf(CooldownConfList cooldownConf) {
+        logEdit(OperationType.OP_UPDATE_COOLDOWN_CONF, cooldownConf);
     }
 
     public void logBatchAlterJob(BatchAlterJobPersistInfo batchAlterJobV2) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index 73ead272e4..256331444f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -76,8 +76,6 @@ public class OperationType {
 
     //schema change for add and drop columns
     public static final short OP_MODIFY_TABLE_ADD_OR_DROP_COLUMNS = 128;
-    // set cooldown conf in replica
-    public static final short OP_PUSH_COOLDOWN_CONF = 129;
 
     // 30~39 130~139 230~239 ...
     // load job for only hadoop load
@@ -266,6 +264,8 @@ public class OperationType {
     public static final short OP_REFRESH_EXTERNAL_PARTITIONS = 356;
 
     public static final short OP_ALTER_USER = 400;
+    // cooldown conf
+    public static final short OP_UPDATE_COOLDOWN_CONF = 401;
 
     /**
      * Get opcode name by op code.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java
index 0198a6b20f..c01b868f02 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java
@@ -203,12 +203,6 @@ public class MetaPersistMethod {
                 metaPersistMethod.writeMethod = 
Env.class.getDeclaredMethod("saveMTMVJobManager",
                         CountingDataOutputStream.class, long.class);
                 break;
-            case "cooldownJob":
-                metaPersistMethod.readMethod = 
Env.class.getDeclaredMethod("loadCooldownJob", DataInputStream.class,
-                        long.class);
-                metaPersistMethod.writeMethod = 
Env.class.getDeclaredMethod("saveCooldownJob",
-                        CountingDataOutputStream.class, long.class);
-                break;
             default:
                 break;
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java
 
b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java
index 2074ffe539..2737c052f3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java
@@ -38,7 +38,7 @@ public class PersistMetaModules {
             "masterInfo", "frontends", "backends", "datasource", "db", 
"alterJob", "recycleBin",
             "globalVariable", "cluster", "broker", "resources", "exportJob", 
"syncJob", "backupHandler",
             "paloAuth", "transactionState", "colocateTableIndex", 
"routineLoadJobs", "loadJobV2", "smallFiles",
-            "plugins", "deleteHandler", "sqlBlockRule", "policy", 
"mtmvJobManager", "cooldownJob");
+            "plugins", "deleteHandler", "sqlBlockRule", "policy", 
"mtmvJobManager");
 
     static {
         MODULES_MAP = Maps.newHashMap();
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/backup/CatalogMocker.java 
b/fe/fe-core/src/test/java/org/apache/doris/backup/CatalogMocker.java
index 42e788b537..1c8a0130cc 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/backup/CatalogMocker.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/backup/CatalogMocker.java
@@ -249,7 +249,7 @@ public class CatalogMocker {
 
         Tablet tablet0 = new Tablet(TEST_TABLET0_ID);
         TabletMeta tabletMeta = new TabletMeta(TEST_DB_ID, TEST_TBL_ID, 
TEST_SINGLE_PARTITION_ID,
-                                               TEST_TBL_ID, SCHEMA_HASH, 
TStorageMedium.HDD, -1, -1);
+                                               TEST_TBL_ID, SCHEMA_HASH, 
TStorageMedium.HDD);
         baseIndex.addTablet(tablet0, tabletMeta);
         Replica replica0 = new Replica(TEST_REPLICA0_ID, BACKEND1_ID, 0, 
ReplicaState.NORMAL);
         Replica replica1 = new Replica(TEST_REPLICA1_ID, BACKEND2_ID, 0, 
ReplicaState.NORMAL);
@@ -323,7 +323,7 @@ public class CatalogMocker {
 
         Tablet baseTabletP1 = new Tablet(TEST_BASE_TABLET_P1_ID);
         TabletMeta tabletMetaBaseTabletP1 = new TabletMeta(TEST_DB_ID, 
TEST_TBL2_ID, TEST_PARTITION1_ID,
-                                                           TEST_TBL2_ID, 
SCHEMA_HASH, TStorageMedium.HDD, -1, -1);
+                                                           TEST_TBL2_ID, 
SCHEMA_HASH, TStorageMedium.HDD);
         baseIndexP1.addTablet(baseTabletP1, tabletMetaBaseTabletP1);
         Replica replica3 = new Replica(TEST_REPLICA3_ID, BACKEND1_ID, 0, 
ReplicaState.NORMAL);
         Replica replica4 = new Replica(TEST_REPLICA4_ID, BACKEND2_ID, 0, 
ReplicaState.NORMAL);
@@ -335,7 +335,7 @@ public class CatalogMocker {
 
         Tablet baseTabletP2 = new Tablet(TEST_BASE_TABLET_P2_ID);
         TabletMeta tabletMetaBaseTabletP2 = new TabletMeta(TEST_DB_ID, 
TEST_TBL2_ID, TEST_PARTITION2_ID,
-                                                           TEST_TBL2_ID, 
SCHEMA_HASH, TStorageMedium.HDD, -1, -1);
+                                                           TEST_TBL2_ID, 
SCHEMA_HASH, TStorageMedium.HDD);
         baseIndexP2.addTablet(baseTabletP2, tabletMetaBaseTabletP2);
         Replica replica6 = new Replica(TEST_REPLICA6_ID, BACKEND1_ID, 0, 
ReplicaState.NORMAL);
         Replica replica7 = new Replica(TEST_REPLICA7_ID, BACKEND2_ID, 0, 
ReplicaState.NORMAL);
@@ -356,7 +356,7 @@ public class CatalogMocker {
         Tablet rollupTabletP1 = new Tablet(TEST_ROLLUP_TABLET_P1_ID);
         TabletMeta tabletMetaRollupTabletP1 = new TabletMeta(TEST_DB_ID, 
TEST_TBL2_ID, TEST_PARTITION1_ID,
                                                              
TEST_ROLLUP_TABLET_P1_ID, ROLLUP_SCHEMA_HASH,
-                                                             
TStorageMedium.HDD, -1, -1);
+                                                             
TStorageMedium.HDD);
         rollupIndexP1.addTablet(rollupTabletP1, tabletMetaRollupTabletP1);
         Replica replica9 = new Replica(TEST_REPLICA9_ID, BACKEND1_ID, 0, 
ReplicaState.NORMAL);
         Replica replica10 = new Replica(TEST_REPLICA10_ID, BACKEND2_ID, 0, 
ReplicaState.NORMAL);
@@ -373,7 +373,7 @@ public class CatalogMocker {
         Tablet rollupTabletP2 = new Tablet(TEST_ROLLUP_TABLET_P2_ID);
         TabletMeta tabletMetaRollupTabletP2 = new TabletMeta(TEST_DB_ID, 
TEST_TBL2_ID, TEST_PARTITION1_ID,
                                                              
TEST_ROLLUP_TABLET_P2_ID, ROLLUP_SCHEMA_HASH,
-                                                             
TStorageMedium.HDD, -1, -1);
+                                                             
TStorageMedium.HDD);
         rollupIndexP2.addTablet(rollupTabletP2, tabletMetaRollupTabletP2);
         Replica replica12 = new Replica(TEST_REPLICA12_ID, BACKEND1_ID, 0, 
ReplicaState.NORMAL);
         Replica replica13 = new Replica(TEST_REPLICA13_ID, BACKEND2_ID, 0, 
ReplicaState.NORMAL);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java 
b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
index d79855ce67..5e022a56e2 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CatalogTestUtil.java
@@ -193,7 +193,7 @@ public class CatalogTestUtil {
 
         // index
         MaterializedIndex index = new MaterializedIndex(indexId, 
IndexState.NORMAL);
-        TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, 
indexId, 0, TStorageMedium.HDD, -1, -1);
+        TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, 
indexId, 0, TStorageMedium.HDD);
         index.addTablet(tablet, tabletMeta);
 
         tablet.addReplica(replica1);
@@ -261,7 +261,7 @@ public class CatalogTestUtil {
         // index
         MaterializedIndex index = new MaterializedIndex(testIndexId2, 
IndexState.NORMAL);
         TabletMeta tabletMeta = new TabletMeta(testDbId1, testTableId2, 
testPartitionId2, testIndexId2, 0,
-                TStorageMedium.HDD, -1, -1);
+                TStorageMedium.HDD);
         index.addTablet(tablet, tabletMeta);
 
         tablet.addReplica(replica);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java
index 14e20a1bd7..d7fdb2694a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java
@@ -66,7 +66,7 @@ public class TabletTest {
         };
 
         tablet = new Tablet(1);
-        TabletMeta tabletMeta = new TabletMeta(10, 20, 30, 40, 1, 
TStorageMedium.HDD, -1, -1);
+        TabletMeta tabletMeta = new TabletMeta(10, 20, 30, 40, 1, 
TStorageMedium.HDD);
         invertedIndex.addTablet(1, tabletMeta);
         replica1 = new Replica(1L, 1L, 100L, 0, 200000L, 0, 3000L, 
ReplicaState.NORMAL, 0, 0);
         replica2 = new Replica(2L, 2L, 100L, 0, 200000L, 0, 3000L, 
ReplicaState.NORMAL, 0, 0);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java
index 1ac66444ef..c81c3839c4 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/clone/ClusterLoadStatisticsTest.java
@@ -145,16 +145,16 @@ public class ClusterLoadStatisticsTest {
         // tablet
         invertedIndex = new TabletInvertedIndex();
 
-        invertedIndex.addTablet(50000, new TabletMeta(1, 2, 3, 4, 5, 
TStorageMedium.HDD, -1, -1));
+        invertedIndex.addTablet(50000, new TabletMeta(1, 2, 3, 4, 5, 
TStorageMedium.HDD));
         invertedIndex.addReplica(50000, new Replica(50001, be1.getId(), 0, 
ReplicaState.NORMAL));
         invertedIndex.addReplica(50000, new Replica(50002, be2.getId(), 0, 
ReplicaState.NORMAL));
         invertedIndex.addReplica(50000, new Replica(50003, be3.getId(), 0, 
ReplicaState.NORMAL));
 
-        invertedIndex.addTablet(60000, new TabletMeta(1, 2, 3, 4, 5, 
TStorageMedium.HDD, -1, -1));
+        invertedIndex.addTablet(60000, new TabletMeta(1, 2, 3, 4, 5, 
TStorageMedium.HDD));
         invertedIndex.addReplica(60000, new Replica(60002, be2.getId(), 0, 
ReplicaState.NORMAL));
         invertedIndex.addReplica(60000, new Replica(60003, be3.getId(), 0, 
ReplicaState.NORMAL));
 
-        invertedIndex.addTablet(70000, new TabletMeta(1, 2, 3, 4, 5, 
TStorageMedium.HDD, -1, -1));
+        invertedIndex.addTablet(70000, new TabletMeta(1, 2, 3, 4, 5, 
TStorageMedium.HDD));
         invertedIndex.addReplica(70000, new Replica(70002, be2.getId(), 0, 
ReplicaState.NORMAL));
         invertedIndex.addReplica(70000, new Replica(70003, be3.getId(), 0, 
ReplicaState.NORMAL));
     }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java 
b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
index 756510de77..88f02df4cb 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RebalancerTestUtil.java
@@ -81,7 +81,7 @@ public class RebalancerTestUtil {
         int schemaHash = olapTable.getSchemaHashByIndexId(baseIndex.getId());
 
         TabletMeta tabletMeta = new TabletMeta(db.getId(), olapTable.getId(),
-                partition.getId(), baseIndex.getId(), schemaHash, medium, -1, 
-1);
+                partition.getId(), baseIndex.getId(), schemaHash, medium);
         Tablet tablet = new Tablet(tabletId);
 
         // add tablet to olapTable
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/common/util/UnitTestUtil.java 
b/fe/fe-core/src/test/java/org/apache/doris/common/util/UnitTestUtil.java
index d933b80982..4460fcb31f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/common/util/UnitTestUtil.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/UnitTestUtil.java
@@ -78,7 +78,7 @@ public class UnitTestUtil {
 
         // index
         MaterializedIndex index = new MaterializedIndex(indexId, 
IndexState.NORMAL);
-        TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, 
indexId, 0, TStorageMedium.HDD, -1, -1);
+        TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, 
indexId, 0, TStorageMedium.HDD);
         index.addTablet(tablet, tabletMeta);
 
         tablet.addReplica(replica1);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/cooldown/CooldownJobTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/cooldown/CooldownJobTest.java
deleted file mode 100644
index 6e16e9e945..0000000000
--- a/fe/fe-core/src/test/java/org/apache/doris/cooldown/CooldownJobTest.java
+++ /dev/null
@@ -1,130 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.cooldown;
-
-import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.KeysType;
-import org.apache.doris.catalog.MaterializedIndex;
-import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.Partition;
-import org.apache.doris.catalog.PartitionInfo;
-import org.apache.doris.catalog.Replica;
-import org.apache.doris.catalog.Tablet;
-import org.apache.doris.catalog.TabletMeta;
-import org.apache.doris.cluster.Cluster;
-import org.apache.doris.persist.EditLog;
-import org.apache.doris.thrift.TStorageMedium;
-
-import mockit.Mocked;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-public class CooldownJobTest {
-
-    private static long jobId = 100L;
-    private static long dbId = 101L;
-    private static long tableId = 102L;
-    private static long partitionId = 103L;
-    private static long indexId = 104L;
-    private static long tabletId = 105L;
-    private static long replicaId = 106L;
-    private static long backendId = 107L;
-    private static long cooldownReplicaId = 106L;
-    private static long cooldownTerm = 109L;
-    private static long timeoutMs = 10000L;
-    private static Tablet tablet = new Tablet(tabletId);
-    private static Replica replica = new Replica(replicaId, backendId, 1, 
Replica.ReplicaState.NORMAL);
-
-    private static CooldownConf cooldownConf = new CooldownConf(dbId, tableId, 
partitionId, indexId, tabletId,
-            cooldownReplicaId, cooldownTerm);
-
-    private static List<CooldownConf> cooldownConfList = new LinkedList<>();
-
-    @Mocked
-    private EditLog editLog;
-
-    public static CooldownJob createCooldownJob() {
-        tablet.setCooldownReplicaId(cooldownReplicaId);
-        tablet.setCooldownTerm(cooldownTerm);
-        cooldownConfList.add(cooldownConf);
-        return new CooldownJob(jobId, cooldownConfList, timeoutMs);
-    }
-
-    @Before
-    public void setUp() {
-        Cluster testCluster = new Cluster("test_cluster", 0);
-        Database db = new Database(dbId, "db1");
-        db.setClusterName("test_cluster");
-        Env.getCurrentEnv().addCluster(testCluster);
-        Env.getCurrentEnv().unprotectCreateDb(db);
-        OlapTable table = new OlapTable(tableId, "testTable", new 
ArrayList<>(), KeysType.DUP_KEYS,
-                new PartitionInfo(), null);
-        table.setId(tableId);
-        db.createTable(table);
-        MaterializedIndex baseIndex = new MaterializedIndex();
-        baseIndex.setIdForRestore(indexId);
-        Partition partition = new Partition(partitionId, "part1", baseIndex, 
null);
-        table.addPartition(partition);
-        TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, 
indexId, 1, TStorageMedium.HDD,
-                cooldownReplicaId, cooldownTerm);
-        baseIndex.addTablet(tablet, tabletMeta);
-        tablet.addReplica(replica);
-        Env.getCurrentEnv().setEditLog(editLog);
-    }
-
-    @Test
-    public void testPending() throws Exception {
-        CooldownJob cooldownJob = createCooldownJob();
-        cooldownJob.runPendingJob();
-        Assert.assertEquals(CooldownJob.JobState.SEND_CONF, 
cooldownJob.jobState);
-        for (CooldownConf conf : cooldownJob.getCooldownConfList()) {
-            Assert.assertEquals(conf.getCooldownReplicaId(), replica.getId());
-        }
-        CooldownJob job1 = createCooldownJob();
-        job1.replay(cooldownJob);
-        Assert.assertEquals(CooldownJob.JobState.SEND_CONF, job1.jobState);
-        // run send job
-        cooldownJob.runSendJob();
-        Assert.assertEquals(CooldownJob.JobState.RUNNING, 
cooldownJob.jobState);
-        // run replay finish job
-        cooldownJob.jobState = CooldownJob.JobState.FINISHED;
-        job1.replay(cooldownJob);
-        Assert.assertEquals(CooldownJob.JobState.FINISHED, job1.jobState);
-    }
-
-    @Test
-    public void testCancelJob() throws Exception {
-        CooldownJob cooldownJob = createCooldownJob();
-        cooldownJob.runPendingJob();
-        Assert.assertEquals(CooldownJob.JobState.SEND_CONF, 
cooldownJob.jobState);
-        Assert.assertEquals(cooldownReplicaId, replica.getId());
-        // run send job
-        cooldownJob.runSendJob();
-        Assert.assertEquals(CooldownJob.JobState.RUNNING, 
cooldownJob.jobState);
-        // run cancel job
-        cooldownJob.cancelImpl("test cancel");
-        Assert.assertEquals(CooldownJob.JobState.CANCELLED, 
cooldownJob.jobState);
-    }
-
-}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java 
b/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java
index df8ed5a316..2871ff3596 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java
@@ -157,7 +157,7 @@ public abstract class DorisHttpTestCase {
         // index
         MaterializedIndex baseIndex = new MaterializedIndex(testIndexId, 
MaterializedIndex.IndexState.NORMAL);
         TabletMeta tabletMeta = new TabletMeta(testDbId, testTableId, 
testPartitionId, testIndexId, testSchemaHash,
-                TStorageMedium.HDD, -1, -1);
+                TStorageMedium.HDD);
         baseIndex.addTablet(tablet, tabletMeta);
 
         tablet.addReplica(replica1);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
index e464517d7f..5c2c005972 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/DeleteHandlerTest.java
@@ -116,7 +116,7 @@ public class DeleteHandlerTest {
         auth = AccessTestUtil.fetchAdminAccess();
         analyzer = AccessTestUtil.fetchAdminAnalyzer(false);
         db = CatalogMocker.mockDb();
-        TabletMeta tabletMeta = new TabletMeta(DB_ID, TBL_ID, PARTITION_ID, 
TBL_ID, 0, null, -1, -1);
+        TabletMeta tabletMeta = new TabletMeta(DB_ID, TBL_ID, PARTITION_ID, 
TBL_ID, 0, null);
         invertedIndex.addTablet(TABLET_ID, tabletMeta);
         invertedIndex.addReplica(TABLET_ID, new Replica(REPLICA_ID_1, 
BACKEND_ID_1, 0, Replica.ReplicaState.NORMAL));
         invertedIndex.addReplica(TABLET_ID, new Replica(REPLICA_ID_2, 
BACKEND_ID_2, 0, Replica.ReplicaState.NORMAL));
diff --git a/gensrc/thrift/MasterService.thrift 
b/gensrc/thrift/MasterService.thrift
index 70112f4a5c..fe98c7ef4e 100644
--- a/gensrc/thrift/MasterService.thrift
+++ b/gensrc/thrift/MasterService.thrift
@@ -43,8 +43,8 @@ struct TTabletInfo {
     // data size on remote storage
     16: optional Types.TSize remote_data_size
     17: optional Types.TReplicaId cooldown_replica_id
-    18: optional bool is_cooldown = false
-    19: optional i64 cooldown_term = -1
+    // 18: optional bool is_cooldown
+    19: optional i64 cooldown_term
 }
 
 struct TFinishTaskRequest {
diff --git a/regression-test/suites/cold_heat_separation/policy/alter.groovy 
b/regression-test/suites/cold_heat_separation/policy/alter.groovy
index e56c235bb6..781a619ad9 100644
--- a/regression-test/suites/cold_heat_separation/policy/alter.groovy
+++ b/regression-test/suites/cold_heat_separation/policy/alter.groovy
@@ -16,8 +16,6 @@
 // under the License.
 
 suite("alter_policy") {
-    sql """ADMIN SET FRONTEND CONFIG ("enable_storage_policy" = "true");"""
-
     def has_resouce_policy_alter = sql """
         SHOW RESOURCES WHERE NAME = "has_resouce_policy_alter";
     """
diff --git a/regression-test/suites/cold_heat_separation/policy/create.groovy 
b/regression-test/suites/cold_heat_separation/policy/create.groovy
index 19f259065e..111dc00f27 100644
--- a/regression-test/suites/cold_heat_separation/policy/create.groovy
+++ b/regression-test/suites/cold_heat_separation/policy/create.groovy
@@ -16,8 +16,6 @@
 // under the License.
 
 suite("create_policy") {
-    sql """ADMIN SET FRONTEND CONFIG ("enable_storage_policy" = "true");"""
-
     def has_created_1 = sql """
         SHOW RESOURCES WHERE NAME = "crete_policy_1";
     """
diff --git a/regression-test/suites/cold_heat_separation/policy/drop.groovy 
b/regression-test/suites/cold_heat_separation/policy/drop.groovy
index c04b1afdcb..f29d27c530 100644
--- a/regression-test/suites/cold_heat_separation/policy/drop.groovy
+++ b/regression-test/suites/cold_heat_separation/policy/drop.groovy
@@ -16,8 +16,6 @@
 // under the License.
 
 suite("drop_policy") {
-    sql """ADMIN SET FRONTEND CONFIG ("enable_storage_policy" = "true");"""
-
     def storage_exist = { name ->
         def show_storage_policy = sql """
         SHOW STORAGE POLICY;
diff --git a/regression-test/suites/cold_heat_separation/policy/show.groovy 
b/regression-test/suites/cold_heat_separation/policy/show.groovy
index c18f3cab8b..5d750e0f3b 100644
--- a/regression-test/suites/cold_heat_separation/policy/show.groovy
+++ b/regression-test/suites/cold_heat_separation/policy/show.groovy
@@ -16,8 +16,6 @@
 // under the License.
 
 suite("show_policy") {
-    sql """ADMIN SET FRONTEND CONFIG ("enable_storage_policy" = "true");"""
-
     def get_storage_policy = { name ->
         def show_storage_policy = sql """
         SHOW STORAGE POLICY;
diff --git 
a/regression-test/suites/cold_heat_separation/use_policy/alter_table_add_policy.groovy
 
b/regression-test/suites/cold_heat_separation/use_policy/alter_table_add_policy.groovy
index 0c02dc2888..b793465f46 100644
--- 
a/regression-test/suites/cold_heat_separation/use_policy/alter_table_add_policy.groovy
+++ 
b/regression-test/suites/cold_heat_separation/use_policy/alter_table_add_policy.groovy
@@ -16,8 +16,6 @@
 // under the License.
 
 suite("add_table_policy_by_alter_table") {
-    sql """ADMIN SET FRONTEND CONFIG ("enable_storage_policy" = "true");"""
-
     def create_table_not_have_policy_result = try_sql """
         CREATE TABLE IF NOT EXISTS create_table_not_have_policy
         (
diff --git 
a/regression-test/suites/cold_heat_separation/use_policy/create_table_use_partition_policy.groovy
 
b/regression-test/suites/cold_heat_separation/use_policy/create_table_use_partition_policy.groovy
index 8f1fcba080..37adbbd2ef 100644
--- 
a/regression-test/suites/cold_heat_separation/use_policy/create_table_use_partition_policy.groovy
+++ 
b/regression-test/suites/cold_heat_separation/use_policy/create_table_use_partition_policy.groovy
@@ -16,8 +16,6 @@
 // under the License.
 
 suite("create_table_use_partition_policy") {
-    sql """ADMIN SET FRONTEND CONFIG ("enable_storage_policy" = "true");"""
-
     def cooldown_ttl = "10"
 
     def create_table_partition_use_not_create_policy = try_sql """
diff --git 
a/regression-test/suites/cold_heat_separation/use_policy/create_table_use_policy.groovy
 
b/regression-test/suites/cold_heat_separation/use_policy/create_table_use_policy.groovy
index 021ffd33e1..c94065f5ae 100644
--- 
a/regression-test/suites/cold_heat_separation/use_policy/create_table_use_policy.groovy
+++ 
b/regression-test/suites/cold_heat_separation/use_policy/create_table_use_policy.groovy
@@ -16,8 +16,6 @@
 // under the License.
 
 suite("create_table_use_policy") {
-    sql """ADMIN SET FRONTEND CONFIG ("enable_storage_policy" = "true");"""
-
     def cooldown_ttl = "10"
 
     def create_table_use_not_create_policy = try_sql """
diff --git 
a/regression-test/suites/cold_heat_separation/use_policy/modify_partition_add_policy.groovy
 
b/regression-test/suites/cold_heat_separation/use_policy/modify_partition_add_policy.groovy
index 0998cc4b1d..cb9c807729 100644
--- 
a/regression-test/suites/cold_heat_separation/use_policy/modify_partition_add_policy.groovy
+++ 
b/regression-test/suites/cold_heat_separation/use_policy/modify_partition_add_policy.groovy
@@ -19,8 +19,6 @@ import java.text.SimpleDateFormat;
 import java.util.Date;
 
 suite("add_table_policy_by_modify_partition") {
-    sql """ADMIN SET FRONTEND CONFIG ("enable_storage_policy" = "true");"""
-
     SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
     Date date = new Date(System.currentTimeMillis() + 3600000)
     def cooldownTime = format.format(date)
diff --git 
a/regression-test/suites/cold_heat_separation/use_policy/use_default_storage_policy.groovy
 
b/regression-test/suites/cold_heat_separation/use_policy/use_default_storage_policy.groovy
index 45bd4a77d3..b11344198e 100644
--- 
a/regression-test/suites/cold_heat_separation/use_policy/use_default_storage_policy.groovy
+++ 
b/regression-test/suites/cold_heat_separation/use_policy/use_default_storage_policy.groovy
@@ -16,8 +16,6 @@
 // under the License.
 
 suite("use_default_storage_policy") {
-    sql """ADMIN SET FRONTEND CONFIG ("enable_storage_policy" = "true");"""
-
     def storage_exist = { name ->
         def show_storage_policy = sql """
         SHOW STORAGE POLICY;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to