luoyuxia commented on code in PR #2185:
URL: https://github.com/apache/fluss/pull/2185#discussion_r2646596284


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -236,6 +277,53 @@ public void handleSourceEvent(int subtaskId, SourceEvent 
sourceEvent) {
         }
     }
 
+    private Set<Long> checkTableReachMaxTieringDuration() {
+        Set<Long> tieringReachMaxDurationTables = new HashSet<>();
+        long currentTime = clock.milliseconds();
+        for (Map.Entry<Long, Long> tieringTableDeadline : 
tieringTablesDeadline.entrySet()) {
+            long tableId = tieringTableDeadline.getKey();
+            long deadline = tieringTableDeadline.getValue();
+            if (deadline < currentTime) {
+                tieringReachMaxDurationTables.add(tableId);
+            }
+        }
+        return tieringReachMaxDurationTables;
+    }
+
+    private void handleReachMaxTieringDurationTables(
+            Set<Long> tieringReachMaxDurationTables, Throwable throwable) {
+        if (throwable != null) {
+            LOG.error("Fail to check tiering timeout tables.", throwable);
+            return;
+        }
+
+        for (Long reachMaxDurationTable : tieringReachMaxDurationTables) {
+            for (TieringSplit tieringSplit : pendingSplits) {
+                if (tieringSplit.getTableBucket().getTableId() == 
reachMaxDurationTable) {
+                    // force ignore this tiering split since the tiering for 
this table is timeout,
+                    // we have to force to set to ignore the tiering split so 
that the
+                    // tiering source reader can ignore them directly
+                    tieringSplit.forceIgnore();
+                } else {
+                    // we can break directly, if found any one split's table 
id is not equal to the
+                    // timeout
+                    // table, the following split must be not equal to the 
table id
+                    break;

Review Comment:
   handleReachMaxTieringDurationTables,handleSplitRequest, addSplitsBack is 
done in a single thread



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to