swuferhong commented on code in PR #2179:
URL: https://github.com/apache/fluss/pull/2179#discussion_r2703138735


##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java:
##########
@@ -531,22 +543,96 @@ private List<SourceSplitBase> 
initPrimaryKeyTablePartitionSplits(
         List<SourceSplitBase> splits = new ArrayList<>();
         for (Partition partition : newPartitions) {
             String partitionName = partition.getPartitionName();
-            // get the table snapshot info
-            final KvSnapshots kvSnapshots;
-            try {
-                kvSnapshots = flussAdmin.getLatestKvSnapshots(tablePath, 
partitionName).get();
-            } catch (Exception e) {
-                throw new FlinkRuntimeException(
-                        String.format(
-                                "Failed to get table snapshot for table %s and 
partition %s",
-                                tablePath, partitionName),
-                        ExceptionUtils.stripCompletionException(e));
-            }
-            splits.addAll(getSnapshotAndLogSplits(kvSnapshots, partitionName));
+            splits.addAll(
+                    getSnapshotAndLogSplits(
+                            getLatestKvSnapshotsAndRegister(partitionName), 
partitionName));
         }
         return splits;
     }
 
+    private KvSnapshots getLatestKvSnapshotsAndRegister(@Nullable String 
partitionName) {
+        long tableId;
+        Long partitionId;
+        Map<Integer, Long> snapshotIds = new HashMap<>();
+        Map<Integer, Long> logOffsets = new HashMap<>();
+
+        // retry to get the latest kv snapshots and acquire kvSnapshot lease 
util all buckets
+        // acquire success. The reason is that getLatestKvSnapshots and 
acquireKvSnapshotLease
+        // are not atomic operations, the latest kv snapshot obtained via get 
may become outdated by
+        // the time it is passed to acquire. Therefore, this logic must 
implement a retry
+        // mechanism: the unavailable tableBuckets in the 
AcquiredKvSnapshotLeaseResult returned by
+        // acquireKvSnapshotLease must be retried repeatedly until all buckets 
are successfully
+        // acquired.
+        try {
+            Set<TableBucket> remainingTableBuckets;
+            do {
+                KvSnapshots kvSnapshots = getLatestKvSnapshots(partitionName);
+                remainingTableBuckets = new 
HashSet<>(kvSnapshots.getTableBuckets());
+
+                tableId = kvSnapshots.getTableId();
+                partitionId = kvSnapshots.getPartitionId();
+
+                Set<TableBucket> ignoreBuckets = new HashSet<>();
+                Map<TableBucket, Long> bucketsToLease = new HashMap<>();
+                for (TableBucket tb : remainingTableBuckets) {
+                    int bucket = tb.getBucket();
+                    OptionalLong snapshotIdOpt = 
kvSnapshots.getSnapshotId(bucket);
+                    OptionalLong logOffsetOpt = 
kvSnapshots.getLogOffset(bucket);
+                    if (snapshotIdOpt.isPresent() && !ignoreTableBucket(tb)) {
+                        bucketsToLease.put(tb, snapshotIdOpt.getAsLong());
+                    } else {
+                        ignoreBuckets.add(tb);
+                    }
+
+                    snapshotIds.put(
+                            bucket, snapshotIdOpt.isPresent() ? 
snapshotIdOpt.getAsLong() : null);
+                    logOffsets.put(
+                            bucket, logOffsetOpt.isPresent() ? 
logOffsetOpt.getAsLong() : null);
+                }
+
+                if (!ignoreBuckets.isEmpty()) {
+                    remainingTableBuckets.removeAll(ignoreBuckets);
+                }
+
+                if (!bucketsToLease.isEmpty()) {
+                    String kvSnapshotLeaseId = 
leaseContext.getKvSnapshotLeaseId();
+                    LOG.info(
+                            "Try to acquire kv snapshot lease {} for table {}",
+                            kvSnapshotLeaseId,
+                            tablePath);
+                    remainingTableBuckets =
+                            flussAdmin
+                                    .acquireKvSnapshotLease(
+                                            kvSnapshotLeaseId,
+                                            bucketsToLease,
+                                            
leaseContext.getKvSnapshotLeaseDurationMs())
+                                    .get()
+                                    .getUnavailableTableBucketSet();
+                    if (!remainingTableBuckets.isEmpty()) {
+                        LOG.info(
+                                "Failed to acquire kv snapshot lease for table 
{}: {}. Retry to re-acquire",
+                                tablePath,
+                                remainingTableBuckets);
+                    }
+                }
+            } while (!remainingTableBuckets.isEmpty());

Review Comment:
   I will remove this retry loop for now, since we have chosen to retain more 
than one KV snapshot.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to