luoyuxia commented on code in PR #2185:
URL: https://github.com/apache/fluss/pull/2185#discussion_r2646602752
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java:
##########
@@ -236,6 +277,53 @@ public void handleSourceEvent(int subtaskId, SourceEvent
sourceEvent) {
}
}
+ private Set<Long> checkTableReachMaxTieringDuration() {
+ Set<Long> tieringReachMaxDurationTables = new HashSet<>();
+ long currentTime = clock.milliseconds();
+ for (Map.Entry<Long, Long> tieringTableDeadline :
tieringTablesDeadline.entrySet()) {
+ long tableId = tieringTableDeadline.getKey();
+ long deadline = tieringTableDeadline.getValue();
+ if (deadline < currentTime) {
+ tieringReachMaxDurationTables.add(tableId);
+ }
+ }
+ return tieringReachMaxDurationTables;
+ }
+
+ private void handleReachMaxTieringDurationTables(
+ Set<Long> tieringReachMaxDurationTables, Throwable throwable) {
+ if (throwable != null) {
+ LOG.error("Fail to check tiering timeout tables.", throwable);
+ return;
+ }
+
+ for (Long reachMaxDurationTable : tieringReachMaxDurationTables) {
+ for (TieringSplit tieringSplit : pendingSplits) {
+ if (tieringSplit.getTableBucket().getTableId() ==
reachMaxDurationTable) {
+ // force ignore this tiering split since the tiering for
this table is timeout,
+ // we have to force to set to ignore the tiering split so
that the
+ // tiering source reader can ignore them directly
+ tieringSplit.forceIgnore();
+ } else {
+ // we can break directly, if found any one split's table
id is not equal to the
+ // timeout
+ // table, the following split must be not equal to the
table id
+ break;
Review Comment:
The shuffle is based on table, won't cause different tables to be
interleaved.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]