This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 6fa11115049 branch-4.1: [fix](cloud) Deduplicate pending one-shot warm up jobs(pick#62384) (#64236)
6fa11115049 is described below

commit 6fa11115049dc9054096246f0c6daaa1d21cb452
Author: zhengyu <[email protected]>
AuthorDate: Tue Jun 9 10:22:26 2026 +0800

    branch-4.1: [fix](cloud) Deduplicate pending one-shot warm up jobs(pick#62384) (#64236)
    
    Original PR: https://github.com/apache/doris/pull/62384
    Picked to: branch-4.1
    Pick branch: freemandealer:pick-branch-4.1-pr-62384
    
    Validation:
    - git diff --check
    - ./build.sh --fe
    
    Notes:
    Resolved CacheHotspotManagerTest.java conflict by using the PR
    Mockito-based test structure.
---
 .../apache/doris/cloud/CacheHotspotManager.java    | 210 +++++++++-
 .../doris/cloud/cache/CacheHotspotManagerTest.java | 435 ++++++++++++++++++---
 2 files changed, 580 insertions(+), 65 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java
index cac253243ff..42826af77ab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java
@@ -52,6 +52,7 @@ import org.apache.doris.thrift.THotTableMessage;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TStatusCode;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.logging.log4j.LogManager;
@@ -67,6 +68,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -82,6 +84,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReentrantLock;
 
 public class CacheHotspotManager extends MasterDaemon {
     public static final int MAX_SHOW_ENTRIES = 2000;
@@ -109,6 +112,9 @@ public class CacheHotspotManager extends MasterDaemon {
 
     private ConcurrentMap<Long, CloudWarmUpJob> runnableCloudWarmUpJobs = Maps.newConcurrentMap();
 
+    private final ConcurrentMap<OncePendingJobKey, RefCountedPendingCreateLock> oncePendingCreateLocks
+            = Maps.newConcurrentMap();
+
     private final ThreadPoolExecutor cloudWarmUpThreadPool = ThreadPoolManager.newDaemonCacheThreadPool(
             Config.max_active_cloud_warm_up_job, "cloud-warm-up-pool", true);
 
@@ -148,10 +154,185 @@ public class CacheHotspotManager extends MasterDaemon {
         }
     }
 
+    private static class OncePendingJobKey {
+        private final JobType jobType;
+        private final String srcName;
+        private final String dstName;
+        private final List<String> normalizedTables;
+        private final boolean force;
+
+        OncePendingJobKey(JobType jobType, String srcName, String dstName,
+                List<String> normalizedTables, boolean force) {
+            this.jobType = jobType;
+            this.srcName = normalizeNullableName(srcName);
+            this.dstName = normalizeNullableName(dstName);
+            this.normalizedTables = normalizedTables.isEmpty()
+                    ? Collections.emptyList()
+                    : Collections.unmodifiableList(new ArrayList<>(normalizedTables));
+            this.force = force;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof OncePendingJobKey)) {
+                return false;
+            }
+            OncePendingJobKey jobKey = (OncePendingJobKey) o;
+            return force == jobKey.force
+                    && jobType == jobKey.jobType
+                    && Objects.equals(srcName, jobKey.srcName)
+                    && Objects.equals(dstName, jobKey.dstName)
+                    && Objects.equals(normalizedTables, jobKey.normalizedTables);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(jobType, srcName, dstName, normalizedTables, force);
+        }
+
+        @Override
+        public String toString() {
+            return "OncePendingWarmUpJob{"
+                    + "jobType=" + jobType
+                    + ", src='" + srcName + '\''
+                    + ", dst='" + dstName + '\''
+                    + ", tables=" + normalizedTables
+                    + ", force=" + force
+                    + '}';
+        }
+    }
+
+    private static class RefCountedPendingCreateLock {
+        private final ReentrantLock lock = new ReentrantLock();
+
+        // Tracks holders and waiters that retained the entry before locking.
+        private volatile int refCount = 1;
+
+        void retain() {
+            ++refCount;
+        }
+
+        int release() {
+            Preconditions.checkState(refCount > 0, "once pending create lock ref count underflow");
+            return --refCount;
+        }
+
+        void lock() {
+            lock.lock();
+        }
+
+        void unlock() {
+            lock.unlock();
+        }
+    }
+
     // Tracks long-running jobs (event-driven and periodic).
     // Ensures only one active job exists per <source, destination, sync_mode> tuple.
     private Set<JobKey> repeatJobDetectionSet = ConcurrentHashMap.newKeySet();
 
+    private static String normalizeNullableName(String value) {
+        return value == null ? "" : value;
+    }
+
+    private static String normalizeTableKey(Triple<String, String, String> tableTriple) {
+        String dbName = normalizeNullableName(tableTriple.getLeft());
+        String tableName = normalizeNullableName(tableTriple.getMiddle());
+        String partitionName = normalizeNullableName(tableTriple.getRight());
+        if (partitionName.isEmpty()) {
+            return dbName + "." + tableName;
+        }
+        return dbName + "." + tableName + "." + partitionName;
+    }
+
+    private static List<String> normalizeTables(List<Triple<String, String, String>> tables) {
+        if (tables == null || tables.isEmpty()) {
+            return Collections.emptyList();
+        }
+        HashSet<String> normalizedTables = new HashSet<>();
+        for (Triple<String, String, String> table : tables) {
+            normalizedTables.add(normalizeTableKey(table));
+        }
+        List<String> sortedTables = new ArrayList<>(normalizedTables);
+        Collections.sort(sortedTables);
+        return sortedTables;
+    }
+
+    private boolean isClusterOnceCommand(WarmUpClusterCommand command) {
+        Map<String, String> properties = command.getProperties();
+        if (properties == null) {
+            return true;
+        }
+        String syncMode = properties.get("sync_mode");
+        return !"periodic".equals(syncMode) && !"event_driven".equals(syncMode);
+    }
+
+    private OncePendingJobKey buildOncePendingJobKey(WarmUpClusterCommand command) {
+        if (command.isWarmUpWithTable()) {
+            return new OncePendingJobKey(JobType.TABLE, "", command.getDstCluster(),
+                    normalizeTables(command.getTables()), command.isForce());
+        }
+        if (!isClusterOnceCommand(command)) {
+            return null;
+        }
+        return new OncePendingJobKey(JobType.CLUSTER, command.getSrcCluster(),
+                command.getDstCluster(), Collections.emptyList(), false);
+    }
+
+    private OncePendingJobKey buildOncePendingJobKey(CloudWarmUpJob job) {
+        if (!job.isOnce()) {
+            return null;
+        }
+        if (job.getJobType() == JobType.TABLE) {
+            return new OncePendingJobKey(JobType.TABLE, "", job.getDstClusterName(),
+                    normalizeTables(job.tables), job.force);
+        }
+        if (job.getJobType() == JobType.CLUSTER) {
+            return new OncePendingJobKey(JobType.CLUSTER, job.getSrcClusterName(),
+                    job.getDstClusterName(), Collections.emptyList(), false);
+        }
+        return null;
+    }
+
+    private CloudWarmUpJob findExistingPendingOnceJob(OncePendingJobKey key) {
+        CloudWarmUpJob selectedJob = null;
+        for (CloudWarmUpJob job : cloudWarmUpJobs.values()) {
+            if (job.getJobState() != JobState.PENDING || !job.isOnce()) {
+                continue;
+            }
+            OncePendingJobKey existingKey = buildOncePendingJobKey(job);
+            if (!key.equals(existingKey)) {
+                continue;
+            }
+            if (selectedJob == null
+                    || job.getCreateTimeMs() < selectedJob.getCreateTimeMs()
+                    || (job.getCreateTimeMs() == selectedJob.getCreateTimeMs()
+                        && job.getJobId() < selectedJob.getJobId())) {
+                selectedJob = job;
+            }
+        }
+        return selectedJob;
+    }
+
+    private RefCountedPendingCreateLock retainOncePendingCreateLock(OncePendingJobKey key) {
+        return oncePendingCreateLocks.compute(key, (ignored, existingLock) -> {
+            if (existingLock == null) {
+                return new RefCountedPendingCreateLock();
+            }
+            existingLock.retain();
+            return existingLock;
+        });
+    }
+
+    private void releaseOncePendingCreateLock(OncePendingJobKey key, RefCountedPendingCreateLock lock) {
+        oncePendingCreateLocks.compute(key, (ignored, existingLock) -> {
+            Preconditions.checkState(existingLock == lock, "unexpected once pending create lock entry");
+            return existingLock.release() == 0 ? null : existingLock;
+        });
+    }
+
     private void registerJobForRepeatDetection(CloudWarmUpJob job, boolean replay) throws AnalysisException {
         if (job.isDone()) {
             return;
@@ -781,6 +962,31 @@ public class CacheHotspotManager extends MasterDaemon {
     }
 
     public long createJob(WarmUpClusterCommand stmt) throws AnalysisException {
+        OncePendingJobKey oncePendingJobKey = buildOncePendingJobKey(stmt);
+        if (oncePendingJobKey != null) {
+            RefCountedPendingCreateLock createLock = retainOncePendingCreateLock(oncePendingJobKey);
+            createLock.lock();
+            try {
+                CloudWarmUpJob existingPendingJob = findExistingPendingOnceJob(oncePendingJobKey);
+                if (existingPendingJob != null) {
+                    long existingJobId = existingPendingJob.getJobId();
+                    if (stmt.isWarmUpWithTable()) {
+                        throw new AnalysisException("Table warm up job already has a pending job, job id: "
+                                + existingJobId + ". Please retry later.");
+                    }
+                    LOG.info("reuse existing pending warm up job {} for key {}", existingJobId, oncePendingJobKey);
+                    return existingJobId;
+                }
+                return createJobInternal(stmt);
+            } finally {
+                createLock.unlock();
+                releaseOncePendingCreateLock(oncePendingJobKey, createLock);
+            }
+        }
+        return createJobInternal(stmt);
+    }
+
+    private long createJobInternal(WarmUpClusterCommand stmt) throws AnalysisException {
         long jobId = Env.getCurrentEnv().getNextId();
         CloudWarmUpJob warmUpJob;
         if (stmt.isWarmUpWithTable()) {
@@ -800,6 +1006,9 @@ public class CacheHotspotManager extends MasterDaemon {
                     .setJobType(JobType.CLUSTER);
 
             Map<String, String> properties = stmt.getProperties();
+            if (properties == null) {
+                properties = Collections.emptyMap();
+            }
             if ("periodic".equals(properties.get("sync_mode"))) {
                 String syncIntervalSecStr = properties.get("sync_interval_sec");
                 if (syncIntervalSecStr == null) {
@@ -831,7 +1040,6 @@ public class CacheHotspotManager extends MasterDaemon {
             }
             warmUpJob = builder.build();
         }
-
         addCloudWarmUpJob(warmUpJob);
 
         Env.getCurrentEnv().getEditLog().logModifyCloudWarmUpJob(warmUpJob);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/cloud/cache/CacheHotspotManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/cloud/cache/CacheHotspotManagerTest.java
index 3bb5a93c864..e25eb98d140 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/cloud/cache/CacheHotspotManagerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/cloud/cache/CacheHotspotManagerTest.java
@@ -17,109 +17,416 @@
 
 package org.apache.doris.cloud.cache;
 
+import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MaterializedIndex;
 import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.Tablet;
 import org.apache.doris.cloud.CacheHotspotManager;
+import org.apache.doris.cloud.CloudWarmUpJob;
+import org.apache.doris.cloud.CloudWarmUpJob.JobState;
+import org.apache.doris.cloud.CloudWarmUpJob.JobType;
+import org.apache.doris.cloud.CloudWarmUpJob.SyncEvent;
+import org.apache.doris.cloud.CloudWarmUpJob.SyncMode;
 import org.apache.doris.cloud.catalog.CloudTablet;
 import org.apache.doris.cloud.system.CloudSystemInfoService;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Triple;
+import org.apache.doris.nereids.trees.plans.commands.WarmUpClusterCommand;
+import org.apache.doris.persist.EditLog;
 import org.apache.doris.system.Backend;
 
-import mockit.Mock;
-import mockit.MockUp;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
 
+import java.lang.reflect.Field;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class CacheHotspotManagerTest {
     private CacheHotspotManager cacheHotspotManager;
     private CloudSystemInfoService cloudSystemInfoService;
-    private Partition partition;
+    private boolean originalRunningUnitTest;
+    private AtomicLong nextJobId;
+    private Env env;
+    private EditLog editLog;
+    private MockedStatic<Env> envMockedStatic;
+
+    @Before
+    public void setUp() {
+        originalRunningUnitTest = FeConstants.runningUnitTest;
+        FeConstants.runningUnitTest = true;
+        nextJobId = new AtomicLong(1000L);
+        env = Mockito.mock(Env.class);
+        editLog = Mockito.mock(EditLog.class);
+        Mockito.when(env.getNextId()).thenAnswer(invocation -> nextJobId.getAndIncrement());
+        Mockito.when(env.getEditLog()).thenReturn(editLog);
+        envMockedStatic = Mockito.mockStatic(Env.class);
+        envMockedStatic.when(Env::getCurrentEnv).thenReturn(env);
+        cloudSystemInfoService = new CloudSystemInfoService();
+        cacheHotspotManager = new CacheHotspotManager(cloudSystemInfoService);
+    }
+
+    @After
+    public void tearDown() {
+        envMockedStatic.close();
+        FeConstants.runningUnitTest = originalRunningUnitTest;
+    }
 
     @Test
     public void testWarmUpNewClusterByTable() {
-        partition = new Partition(0, null, null, null);
-        new MockUp<Partition>() {
-
-            @Mock
-            public long getDataSize(boolean singleReplica) {
-                return 10000000L;
+        cloudSystemInfoService = new CloudSystemInfoService();
+        // Use mock with CALLS_REAL_METHODS to avoid package-private access issues
+        // (CacheHotspotManager's helper methods are package-private)
+        cacheHotspotManager = Mockito.mock(CacheHotspotManager.class, invocation -> {
+            String methodName = invocation.getMethod().getName();
+            switch (methodName) {
+                case "getFileCacheCapacity":
+                    return 100L;
+                case "getPartitionsFromTriple": {
+                    List<Partition> partitions = new ArrayList<>();
+                    Partition spyPartition = Mockito.spy(new Partition(1, "p1", null, null));
+                    Mockito.doReturn(10000000L).when(spyPartition).getDataSize(Mockito.anyBoolean());
+                    List<MaterializedIndex> list = new ArrayList<>();
+                    list.add(new MaterializedIndex());
+                    Mockito.doReturn(list).when(spyPartition)
+                            .getMaterializedIndices(Mockito.any(IndexExtState.class));
+                    partitions.add(spyPartition);
+                    return partitions;
+                }
+                case "getBackendsFromCluster": {
+                    String dstClusterName = (String) invocation.getArgument(0);
+                    List<Backend> backends = new ArrayList<>();
+                    backends.add(new Backend(11, dstClusterName, 0));
+                    return backends;
+                }
+                case "getTabletsFromIndexs": {
+                    List<Tablet> list = new ArrayList<>();
+                    list.add(new CloudTablet(1001L));
+                    return list;
+                }
+                case "getTabletIdsFromBe": {
+                    Set<Long> tabletIds = new HashSet<>();
+                    tabletIds.add(1001L);
+                    return tabletIds;
+                }
+                default:
+                    return invocation.callRealMethod();
             }
+        });
+
+        long jobId = 1L;
+        String dstClusterName = "test_cluster";
+        List<Triple<String, String, String>> tables = new ArrayList<>();
+        tables.add(Triple.of("test_db", "test_table", ""));
+
+        Map<Long, List<Tablet>> result = cacheHotspotManager.warmUpNewClusterByTable(
+                jobId, dstClusterName, tables, true);
+        Assert.assertEquals(1, result.size());
+        Assert.assertEquals(1001L, result.get(11L).get(0).getId());
+
+        RuntimeException exception = Assert.assertThrows(RuntimeException.class, () ->
+                cacheHotspotManager.warmUpNewClusterByTable(jobId, dstClusterName, tables, false));
+        Assert.assertEquals("The cluster " + dstClusterName + " cache size is not enough", exception.getMessage());
+    }
+
+    @Test
+    public void testCreateTableOnceJobRejectsPendingDuplicateOrderDifference() throws AnalysisException {
+        long firstJobId = cacheHotspotManager.createJob(newTableStmt("dst", false,
+                Triple.of("db1", "tbl1", ""),
+                Triple.of("db2", "tbl2", "p1")));
+        AnalysisException exception = Assert.assertThrows(AnalysisException.class, () ->
+                cacheHotspotManager.createJob(newTableStmt("dst", false,
+                Triple.of("db2", "tbl2", "p1"),
+                Triple.of("db1", "tbl1", ""))));
+
+        Assert.assertTrue(exception.getMessage().contains("already has a pending job"));
+        Assert.assertTrue(exception.getMessage().contains("job id: " + firstJobId));
+        Assert.assertEquals(1, cacheHotspotManager.getCloudWarmUpJobs().size());
+        Mockito.verify(env, Mockito.times(1)).getNextId();
+        Mockito.verify(editLog, Mockito.times(1)).logModifyCloudWarmUpJob(Mockito.any(CloudWarmUpJob.class));
+    }
+
+    @Test
+    public void testCreateTableOnceJobRejectsPendingDuplicateTableEntries() throws AnalysisException {
+        long firstJobId = cacheHotspotManager.createJob(newTableStmt("dst", false,
+                Triple.of("db1", "tbl1", ""),
+                Triple.of("db1", "tbl1", "")));
+        AnalysisException exception = Assert.assertThrows(AnalysisException.class, () ->
+                cacheHotspotManager.createJob(newTableStmt("dst", false, Triple.of("db1", "tbl1", ""))));
+
+        Assert.assertTrue(exception.getMessage().contains("already has a pending job"));
+        Assert.assertTrue(exception.getMessage().contains("job id: " + firstJobId));
+        Assert.assertEquals(1, cacheHotspotManager.getCloudWarmUpJobs().size());
+    }
+
+    @Test
+    public void testCreateTableOnceJobDoesNotDedupDifferentForce() throws AnalysisException {
+        long forceFalseJobId = cacheHotspotManager.createJob(newTableStmt("dst", false,
+                Triple.of("db1", "tbl1", "")));
+        long forceTrueJobId = cacheHotspotManager.createJob(newTableStmt("dst", true,
+                Triple.of("db1", "tbl1", "")));
+
+        Assert.assertNotEquals(forceFalseJobId, forceTrueJobId);
+        Assert.assertEquals(2, cacheHotspotManager.getCloudWarmUpJobs().size());
+    }
+
+    @Test
+    public void testCreateClusterOnceJobDedupesPendingJob() throws AnalysisException {
+        long firstJobId = cacheHotspotManager.createJob(newClusterStmt("dst", "src", false));
+        long reusedJobId = cacheHotspotManager.createJob(newClusterStmt("dst", "src", false));
 
-            @Mock
-            public List<MaterializedIndex> getMaterializedIndices(IndexExtState extState) {
-                List<MaterializedIndex> list = new ArrayList<>();
-                MaterializedIndex ind = new MaterializedIndex();
-                list.add(ind);
-                return list;
+        Assert.assertEquals(firstJobId, reusedJobId);
+        Assert.assertEquals(1, cacheHotspotManager.getCloudWarmUpJobs().size());
+    }
+
+    @Test
+    public void testCreateClusterOnceJobDedupesRegardlessOfForceFlag() throws AnalysisException {
+        long firstJobId = cacheHotspotManager.createJob(newClusterStmt("dst", "src", false));
+        long reusedJobId = cacheHotspotManager.createJob(newClusterStmt("dst", "src", true));
+
+        Assert.assertEquals(firstJobId, reusedJobId);
+        Assert.assertEquals(1, cacheHotspotManager.getCloudWarmUpJobs().size());
+    }
+
+    @Test
+    public void testCreateClusterOnceJobAllowsNewPendingWhenOnlyRunningExists() throws AnalysisException {
+        CloudWarmUpJob runningJob = newClusterJob(10L, "src", "dst", SyncMode.ONCE, JobState.RUNNING, 100L);
+        cacheHotspotManager.addCloudWarmUpJob(runningJob);
+
+        long newJobId = cacheHotspotManager.createJob(newClusterStmt("dst", "src", false));
+
+        Assert.assertNotEquals(runningJob.getJobId(), newJobId);
+        Assert.assertEquals(2, cacheHotspotManager.getCloudWarmUpJobs().size());
+        Assert.assertEquals(JobState.PENDING, cacheHotspotManager.getCloudWarmUpJob(newJobId).getJobState());
+    }
+
+    @Test
+    public void testCreateClusterOnceJobReusesPendingWhenRunningAndPendingExist() throws AnalysisException {
+        CloudWarmUpJob runningJob = newClusterJob(10L, "src", "dst", SyncMode.ONCE, JobState.RUNNING, 100L);
+        CloudWarmUpJob pendingJob = newClusterJob(11L, "src", "dst", SyncMode.ONCE, JobState.PENDING, 200L);
+        cacheHotspotManager.addCloudWarmUpJob(runningJob);
+        cacheHotspotManager.addCloudWarmUpJob(pendingJob);
+
+        long reusedJobId = cacheHotspotManager.createJob(newClusterStmt("dst", "src", false));
+
+        Assert.assertEquals(pendingJob.getJobId(), reusedJobId);
+        Assert.assertEquals(2, cacheHotspotManager.getCloudWarmUpJobs().size());
+    }
+
+    @Test
+    public void testCreateOnceJobIgnoresFinishedHistory() throws AnalysisException {
+        CloudWarmUpJob finishedJob = newClusterJob(10L, "src", "dst", SyncMode.ONCE, JobState.FINISHED, 100L);
+        cacheHotspotManager.addCloudWarmUpJob(finishedJob);
+
+        long newJobId = cacheHotspotManager.createJob(newClusterStmt("dst", "src", false));
+
+        Assert.assertNotEquals(finishedJob.getJobId(), newJobId);
+        Assert.assertEquals(2, cacheHotspotManager.getCloudWarmUpJobs().size());
+        Assert.assertEquals(JobState.PENDING, cacheHotspotManager.getCloudWarmUpJob(newJobId).getJobState());
+    }
+
+    @Test
+    public void testCreateClusterOnceJobReusesOldestHistoricalPendingDuplicateAfterReplay() throws Exception {
+        CloudWarmUpJob newerPendingJob = newClusterJob(20L, "src", "dst", SyncMode.ONCE, JobState.PENDING, 200L);
+        CloudWarmUpJob olderPendingJob = newClusterJob(30L, "src", "dst", SyncMode.ONCE, JobState.PENDING, 100L);
+        cacheHotspotManager.replayCloudWarmUpJob(newerPendingJob);
+        cacheHotspotManager.replayCloudWarmUpJob(olderPendingJob);
+
+        long reusedJobId = cacheHotspotManager.createJob(newClusterStmt("dst", "src", false));
+
+        Assert.assertEquals(olderPendingJob.getJobId(), reusedJobId);
+        Assert.assertEquals(2, cacheHotspotManager.getCloudWarmUpJobs().size());
+    }
+
+    @Test
+    public void testCreateTableOnceJobRemovesLockEntryWhenCreateFails() throws Exception {
+        boolean previousRunningUnitTest = FeConstants.runningUnitTest;
+        FeConstants.runningUnitTest = false;
+        cacheHotspotManager = new CacheHotspotManager(cloudSystemInfoService) {
+            @Override
+            public Map<Long, List<Tablet>> warmUpNewClusterByTable(long jobId, String dstClusterName,
+                    List<Triple<String, String, String>> tables, boolean isForce) {
+                throw new RuntimeException("mock create failure");
             }
         };
 
-        cloudSystemInfoService = new CloudSystemInfoService();
-        cacheHotspotManager = new CacheHotspotManager(cloudSystemInfoService);
-        new MockUp<CacheHotspotManager>() {
+        try {
+            RuntimeException exception = Assert.assertThrows(RuntimeException.class, () ->
+                    cacheHotspotManager.createJob(newTableStmt("dst", false, Triple.of("db1", "tbl1", ""))));
 
-            @Mock
-            Long getFileCacheCapacity(String clusterName) throws RuntimeException {
-                return 100L;
-            }
+            Assert.assertEquals("mock create failure", exception.getMessage());
+            Assert.assertEquals(0, getOncePendingCreateLockCount());
+            Assert.assertEquals(0, cacheHotspotManager.getCloudWarmUpJobs().size());
+        } finally {
+            FeConstants.runningUnitTest = previousRunningUnitTest;
+        }
+    }
 
-            @Mock
-            List<Partition> getPartitionsFromTriple(Triple<String, String, String> tableTriple) {
-                List<Partition> partitions = new ArrayList<>();
-                partition = new Partition(1, "p1", null, null);
-                partitions.add(partition);
-                return partitions;
+    @Test
+    public void testConcurrentCreateClusterOnceJobReleasesRefCountedLockAfterWaiterCompletes() throws Exception {
+        CountDownLatch firstCreateEntered = new CountDownLatch(1);
+        CountDownLatch allowFirstCreateToContinue = new CountDownLatch(1);
+        AtomicInteger getNextIdCalls = new AtomicInteger();
+        Mockito.when(env.getNextId()).thenAnswer(invocation -> {
+            if (getNextIdCalls.incrementAndGet() == 1) {
+                firstCreateEntered.countDown();
+                Assert.assertTrue(allowFirstCreateToContinue.await(5, TimeUnit.SECONDS));
             }
+            return nextJobId.getAndIncrement();
+        });
 
-            @Mock
-            List<Backend> getBackendsFromCluster(String dstClusterName) {
-                List<Backend> backends = new ArrayList<>();
-                Backend backend = new Backend(11, dstClusterName, 0);
-                backends.add(backend);
-                return backends;
-            }
+        ExecutorService executor = Executors.newFixedThreadPool(2);
+        try {
+            Future<Long> firstCreate = executor.submit(() -> createJobWithThreadLocalEnv(
+                    newClusterStmt("dst", "src", false)));
+            Assert.assertTrue(firstCreateEntered.await(5, TimeUnit.SECONDS));
 
-            @Mock
-            public List<Tablet> getTabletsFromIndexs(List<MaterializedIndex> indexes) {
-                List<Tablet> list = new ArrayList<>();
-                Tablet tablet = new CloudTablet(1001L);
-                list.add(tablet);
-                return list;
-            }
+            Future<Long> secondCreate = executor.submit(() -> createJobWithThreadLocalEnv(
+                    newClusterStmt("dst", "src", false)));
+            waitForOncePendingCreateLockRefCount(2, 5000L);
 
-            @Mock
-            Set<Long> getTabletIdsFromBe(long beId) {
-                Set<Long> tabletIds = new HashSet<Long>();
-                tabletIds.add(1001L);
-                return tabletIds;
-            }
-        };
+            allowFirstCreateToContinue.countDown();
 
-        // Setup mock data
-        long jobId = 1L;
-        String dstClusterName = "test_cluster";
-        List<Triple<String, String, String>> tables = new ArrayList<>();
-        tables.add(Triple.of("test_db", "test_table", ""));
+            long firstJobId = firstCreate.get(5, TimeUnit.SECONDS);
+            long secondJobId = secondCreate.get(5, TimeUnit.SECONDS);
+            Assert.assertEquals(firstJobId, secondJobId);
+            Assert.assertEquals(1, getNextIdCalls.get());
+            Assert.assertEquals(1, cacheHotspotManager.getCloudWarmUpJobs().size());
+            Assert.assertEquals(0, getOncePendingCreateLockCount());
+        } finally {
+            allowFirstCreateToContinue.countDown();
+            executor.shutdownNow();
+        }
+    }
 
+    @Test
+    public void testCreatePeriodicJobUnaffected() throws AnalysisException {
+        WarmUpClusterCommand periodicStmt = newClusterStmt("dst", "src", false, periodicProperties(60));
+        long firstJobId = cacheHotspotManager.createJob(periodicStmt);
+        AnalysisException exception = Assert.assertThrows(AnalysisException.class, () ->
+                cacheHotspotManager.createJob(newClusterStmt("dst", "src", false, periodicProperties(60))));
 
-        // force = true
-        Map<Long, List<Tablet>> result = cacheHotspotManager.warmUpNewClusterByTable(
-                jobId, dstClusterName, tables, true);
-        Assert.assertEquals(result.size(), 1);
-        Assert.assertEquals(result.get(11L).get(0).getId(), 1001L);
+        Assert.assertEquals(1000L, firstJobId);
+        Assert.assertTrue(exception.getMessage().contains("already has a runnable job"));
+        Assert.assertEquals(1, cacheHotspotManager.getCloudWarmUpJobs().size());
+    }
 
-        // force = false
-        RuntimeException exception = Assert.assertThrows(RuntimeException.class, () -> {
-            cacheHotspotManager.warmUpNewClusterByTable(jobId, dstClusterName, tables, false);
-        });
-        Assert.assertEquals("The cluster " + dstClusterName + " cache size is not enough", exception.getMessage());
+    @Test
+    public void testCreateEventDrivenJobUnaffected() throws AnalysisException {
+        WarmUpClusterCommand eventDrivenStmt = newClusterStmt("dst", "src", false, eventDrivenProperties("load"));
+        long firstJobId = cacheHotspotManager.createJob(eventDrivenStmt);
+        AnalysisException exception = Assert.assertThrows(AnalysisException.class, () ->
+                cacheHotspotManager.createJob(newClusterStmt("dst", "src", false, eventDrivenProperties("load"))));
+
+        Assert.assertEquals(1000L, firstJobId);
+        Assert.assertTrue(exception.getMessage().contains("already has a runnable job"));
+        Assert.assertEquals(1, cacheHotspotManager.getCloudWarmUpJobs().size());
+    }
+
+    private WarmUpClusterCommand newTableStmt(String dstClusterName, boolean force,
+            Triple<String, String, String>... tables) {
+        WarmUpClusterCommand stmt = new WarmUpClusterCommand(new ArrayList<>(),
+                null, dstClusterName, force, true);
+        for (Triple<String, String, String> table : tables) {
+            stmt.getTables().add(table);
+        }
+        return stmt;
+    }
+
+    private WarmUpClusterCommand newClusterStmt(String dstClusterName, String srcClusterName, boolean force) {
+        return newClusterStmt(dstClusterName, srcClusterName, force, new HashMap<>());
+    }
+
+    private WarmUpClusterCommand newClusterStmt(String dstClusterName, String srcClusterName,
+            boolean force, Map<String, String> properties) {
+        return new WarmUpClusterCommand(null, srcClusterName, dstClusterName, force, false, properties);
+    }
+
+    private Map<String, String> periodicProperties(long syncIntervalSec) {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("sync_mode", "periodic");
+        properties.put("sync_interval_sec", String.valueOf(syncIntervalSec));
+        return properties;
+    }
+
+    private Map<String, String> eventDrivenProperties(String syncEvent) {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("sync_mode", "event_driven");
+        properties.put("sync_event", syncEvent);
+        return properties;
+    }
+
+    private CloudWarmUpJob newClusterJob(long jobId, String srcClusterName, String dstClusterName,
+            SyncMode syncMode, JobState jobState, long createTimeMs) {
+        CloudWarmUpJob.Builder builder = new CloudWarmUpJob.Builder()
+                .setJobId(jobId)
+                .setSrcClusterName(srcClusterName)
+                .setDstClusterName(dstClusterName)
+                .setJobType(JobType.CLUSTER)
+                .setSyncMode(syncMode);
+        if (syncMode == SyncMode.PERIODIC) {
+            builder.setSyncInterval(60L);
+        } else if (syncMode == SyncMode.EVENT_DRIVEN) {
+            builder.setSyncEvent(SyncEvent.LOAD);
+        }
+        CloudWarmUpJob job = builder.build();
+        job.setJobState(jobState);
+        job.setCreateTimeMs(createTimeMs);
+        return job;
+    }
+
+    private int getOncePendingCreateLockCount() throws Exception {
+        return getOncePendingCreateLocks().size();
+    }
+
+    private long createJobWithThreadLocalEnv(WarmUpClusterCommand command) throws AnalysisException {
+        try (MockedStatic<Env> threadLocalEnvMock = Mockito.mockStatic(Env.class)) {
+            threadLocalEnvMock.when(Env::getCurrentEnv).thenReturn(env);
+            return cacheHotspotManager.createJob(command);
+        }
+    }
+
+    private int getOnlyOncePendingCreateLockRefCount() throws Exception {
+        Map<?, ?> locks = getOncePendingCreateLocks();
+        Assert.assertEquals(1, locks.size());
+        Object lockEntry = locks.values().iterator().next();
+        Field refCountField = lockEntry.getClass().getDeclaredField("refCount");
+        refCountField.setAccessible(true);
+        return refCountField.getInt(lockEntry);
+    }
+
+    private Map<?, ?> getOncePendingCreateLocks() throws Exception {
+        Field locksField = CacheHotspotManager.class.getDeclaredField("oncePendingCreateLocks");
+        locksField.setAccessible(true);
+        return (Map<?, ?>) locksField.get(cacheHotspotManager);
+    }
+
+    private void waitForOncePendingCreateLockRefCount(int expectedRefCount, long timeoutMs) throws Exception {
+        long deadlineMs = System.currentTimeMillis() + timeoutMs;
+        while (System.currentTimeMillis() < deadlineMs) {
+            if (getOncePendingCreateLockCount() == 1
+                    && getOnlyOncePendingCreateLockRefCount() == expectedRefCount) {
+                return;
+            }
+            Thread.sleep(10L);
+        }
+        Assert.fail("Timed out waiting for once pending create lock ref count " + expectedRefCount);
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to