github-actions[bot] commented on code in PR #62384:
URL: https://github.com/apache/doris/pull/62384#discussion_r3338533820


##########
fe/fe-core/src/main/java/org/apache/doris/cloud/CacheHotspotManager.java:
##########
@@ -148,10 +154,185 @@ public String toString() {
         }
     }
 
+    private static class OncePendingJobKey {
+        private final JobType jobType;
+        private final String srcName;
+        private final String dstName;
+        private final List<String> normalizedTables;
+        private final boolean force;
+
+        OncePendingJobKey(JobType jobType, String srcName, String dstName,
+                List<String> normalizedTables, boolean force) {
+            this.jobType = jobType;
+            this.srcName = normalizeNullableName(srcName);
+            this.dstName = normalizeNullableName(dstName);
+            this.normalizedTables = normalizedTables.isEmpty()
+                    ? Collections.emptyList()
+                    : Collections.unmodifiableList(new 
ArrayList<>(normalizedTables));
+            this.force = force;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof OncePendingJobKey)) {
+                return false;
+            }
+            OncePendingJobKey jobKey = (OncePendingJobKey) o;
+            return force == jobKey.force
+                    && jobType == jobKey.jobType
+                    && Objects.equals(srcName, jobKey.srcName)
+                    && Objects.equals(dstName, jobKey.dstName)
+                    && Objects.equals(normalizedTables, 
jobKey.normalizedTables);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(jobType, srcName, dstName, normalizedTables, 
force);
+        }
+
+        @Override
+        public String toString() {
+            return "OncePendingWarmUpJob{"
+                    + "jobType=" + jobType
+                    + ", src='" + srcName + '\''
+                    + ", dst='" + dstName + '\''
+                    + ", tables=" + normalizedTables
+                    + ", force=" + force
+                    + '}';
+        }
+    }
+
+    private static class RefCountedPendingCreateLock {
+        private final ReentrantLock lock = new ReentrantLock();
+
+        // Tracks holders and waiters that retained the entry before locking.
+        private volatile int refCount = 1;
+
+        void retain() {
+            ++refCount;
+        }
+
+        int release() {
+            Preconditions.checkState(refCount > 0, "once pending create lock 
ref count underflow");
+            return --refCount;
+        }
+
+        void lock() {
+            lock.lock();
+        }
+
+        void unlock() {
+            lock.unlock();
+        }
+    }
+
     // Tracks long-running jobs (event-driven and periodic).
     // Ensures only one active job exists per <source, destination, sync_mode> 
tuple.
     private Set<JobKey> repeatJobDetectionSet = ConcurrentHashMap.newKeySet();
 
+    private static String normalizeNullableName(String value) {
+        return value == null ? "" : value;
+    }
+
+    private static String normalizeTableKey(Triple<String, String, String> 
tableTriple) {
+        String dbName = normalizeNullableName(tableTriple.getLeft());
+        String tableName = normalizeNullableName(tableTriple.getMiddle());
+        String partitionName = normalizeNullableName(tableTriple.getRight());
+        if (partitionName.isEmpty()) {
+            return dbName + "." + tableName;
+        }
+        return dbName + "." + tableName + "." + partitionName;
+    }
+
+    private static List<String> normalizeTables(List<Triple<String, String, 
String>> tables) {
+        if (tables == null || tables.isEmpty()) {
+            return Collections.emptyList();
+        }
+        HashSet<String> normalizedTables = new HashSet<>();
+        for (Triple<String, String, String> table : tables) {
+            normalizedTables.add(normalizeTableKey(table));
+        }
+        List<String> sortedTables = new ArrayList<>(normalizedTables);
+        Collections.sort(sortedTables);
+        return sortedTables;
+    }
+
+    private boolean isClusterOnceCommand(WarmUpClusterCommand command) {
+        Map<String, String> properties = command.getProperties();
+        if (properties == null) {
+            return true;
+        }
+        String syncMode = properties.get("sync_mode");
+        return !"periodic".equals(syncMode) && 
!"event_driven".equals(syncMode);
+    }
+
+    private OncePendingJobKey buildOncePendingJobKey(WarmUpClusterCommand 
command) {
+        if (command.isWarmUpWithTable()) {
+            return new OncePendingJobKey(JobType.TABLE, "", 
command.getDstCluster(),
+                    normalizeTables(command.getTables()), command.isForce());
+        }

Review Comment:
   This makes TABLE once-job reuse depend only on the db/table/partition names. 
   However, TABLE warm-up jobs materialize `beToTabletIdBatches` immediately in 
   `createJobInternal()` via `warmUpNewClusterByTable()` and persist those 
   batches in the pending `CloudWarmUpJob`, so a reused job keeps the tablet 
   batches that were computed when it was first created. A concrete failure: 
   create `WARM UP TABLE db.tbl` without a partition name; while that job is 
   still PENDING, a new partition is added (or the table/partition is dropped 
   and recreated with the same name); then the user submits the same warm-up 
   command again. The new command should warm the current table contents, but 
   this dedupe path returns the old job id, whose tablet batches were computed 
   before the metadata change, so the new tablets are never warmed. Please 
   either key TABLE dedupe on resolved, stable table/partition metadata or 
   version (and store enough of it on the job to compare), or avoid reusing 
   TABLE jobs when the current metadata may differ from the precomputed 
   batches.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to