luoyuxia commented on code in PR #2037:
URL: https://github.com/apache/fluss/pull/2037#discussion_r2583357153


##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java:
##########
@@ -1146,50 +1172,52 @@ private <T> void processAccessContext(AccessContextEvent<T> event) {
         }
     }
 
-    private CommitLakeTableSnapshotResponse tryProcessCommitLakeTableSnapshot(
-            CommitLakeTableSnapshotEvent commitLakeTableSnapshotEvent) {
+    private void tryProcessCommitLakeTableSnapshot(
+            CommitLakeTableSnapshotEvent commitLakeTableSnapshotEvent,
+            CompletableFuture<CommitLakeTableSnapshotResponse> callback) {
+        // commit the lake table snapshot asynchronously
         CommitLakeTableSnapshotData commitLakeTableSnapshotData =
                 commitLakeTableSnapshotEvent.getCommitLakeTableSnapshotData();
-        CommitLakeTableSnapshotResponse response = new CommitLakeTableSnapshotResponse();
         Map<Long, LakeTableSnapshot> lakeTableSnapshots =
                 commitLakeTableSnapshotData.getLakeTableSnapshot();
-        for (Map.Entry<Long, LakeTableSnapshot> lakeTableSnapshotEntry :
-                lakeTableSnapshots.entrySet()) {
-            Long tableId = lakeTableSnapshotEntry.getKey();
-
-            PbCommitLakeTableSnapshotRespForTable tableResp = response.addTableResp();
-            tableResp.setTableId(tableId);
-
-            try {
-                zooKeeperClient.upsertLakeTableSnapshot(tableId, lakeTableSnapshotEntry.getValue());
-            } catch (Exception e) {
-                ApiError error = ApiError.fromThrowable(e);
-                tableResp.setError(error.error().code(), error.message());
-            }
-        }
+        ioExecutor.execute(
+                () -> {
+                    try {
+                        CommitLakeTableSnapshotResponse response =
+                                new CommitLakeTableSnapshotResponse();
+                        for (Map.Entry<Long, LakeTableSnapshot> lakeTableSnapshotEntry :
+                                lakeTableSnapshots.entrySet()) {
+                            Long tableId = lakeTableSnapshotEntry.getKey();
+
+                            PbCommitLakeTableSnapshotRespForTable tableResp =
+                                    response.addTableResp();
+                            tableResp.setTableId(tableId);
+
+                            try {
+                                TablePath tablePath = coordinatorContext.getTablePathById(tableId);
+                                if (tablePath == null) {
+                                    throw new RuntimeException(
+                                            String.format(
+                                                    "Failed to find table path for table id: %d",
+                                                    tableId));
+                                }
+                                // this involves IO operation (ZK), so we do it in ioExecutor
+                                zooKeeperClient.upsertLakeTableSnapshot(
+                                        tableId, tablePath, lakeTableSnapshotEntry.getValue());
+                            } catch (Exception e) {
+                                ApiError error = ApiError.fromThrowable(e);
+                                tableResp.setError(error.error().code(), error.message());
+                            }
+                        }
 
-        // send notify lakehouse data request to all replicas.
-        coordinatorRequestBatch.newBatch();
-        for (Map.Entry<Long, LakeTableSnapshot> lakeTableSnapshotEntry :
-                lakeTableSnapshots.entrySet()) {
-            LakeTableSnapshot lakeTableSnapshot = lakeTableSnapshotEntry.getValue();
-            for (Map.Entry<TableBucket, Long> bucketLogEndOffsetEntry :
-                    lakeTableSnapshot.getBucketLogEndOffset().entrySet()) {
-                TableBucket tb = bucketLogEndOffsetEntry.getKey();
-                coordinatorContext
-                        .getBucketLeaderAndIsr(bucketLogEndOffsetEntry.getKey())
-                        .ifPresent(
-                                leaderAndIsr ->
-                                        coordinatorRequestBatch
-                                                .addNotifyLakeTableOffsetRequestForTableServers(
-                                                        coordinatorContext.getAssignment(tb),
-                                                        tb,
-                                                        lakeTableSnapshot));
-            }
-        }
-        coordinatorRequestBatch.sendNotifyLakeTableOffsetRequest(
-                coordinatorContext.getCoordinatorEpoch());
-        return response;
+                        // send notify lakehouse data request to all replicas via coordinator event
+                        coordinatorEventManager.put(
+                                new NotifyLakeTableOffsetEvent(lakeTableSnapshots));
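The core pattern in this hunk is moving the blocking ZooKeeper write off the coordinator event thread onto an I/O executor, and delivering the result through a `CompletableFuture` callback instead of a synchronous return value. A minimal, self-contained sketch of that pattern is below; the class and field names (`AsyncCommitSketch`, the in-memory `store` standing in for the ZooKeeper upsert) are illustrative stand-ins, not the Fluss API:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncCommitSketch {
    // Stand-in for the blocking store (the real code upserts into ZooKeeper).
    private final Map<Long, String> store = new HashMap<>();
    private final ExecutorService ioExecutor = Executors.newSingleThreadExecutor();

    /**
     * Offloads the blocking per-table writes to the I/O executor and completes
     * the returned future when all entries are processed, so the caller's
     * event-processing thread is never blocked on I/O.
     */
    public CompletableFuture<Map<Long, String>> commitAsync(Map<Long, String> snapshots) {
        CompletableFuture<Map<Long, String>> callback = new CompletableFuture<>();
        ioExecutor.execute(
                () -> {
                    try {
                        Map<Long, String> response = new HashMap<>();
                        for (Map.Entry<Long, String> e : snapshots.entrySet()) {
                            // Blocking write happens here, on the I/O thread.
                            synchronized (store) {
                                store.put(e.getKey(), e.getValue());
                            }
                            // Record a per-table result, mirroring the
                            // per-table error handling in the PR.
                            response.put(e.getKey(), "ok");
                        }
                        callback.complete(response);
                    } catch (Exception e) {
                        callback.completeExceptionally(e);
                    }
                });
        return callback;
    }

    /** Releases the executor thread so the JVM can exit. */
    public void close() {
        ioExecutor.shutdown();
    }

    public static void main(String[] args) throws Exception {
        AsyncCommitSketch sketch = new AsyncCommitSketch();
        Map<Long, String> snapshots = new HashMap<>();
        snapshots.put(1L, "snapshot-1");
        System.out.println(sketch.commitAsync(snapshots).get().get(1L)); // prints "ok"
        sketch.close();
    }
}
```

Note that, as in the PR, a failure for one table is captured per entry rather than failing the whole batch; only an unexpected error outside the loop fails the callback exceptionally.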

Review Comment:
   > @luoyuxia, is the snapshot ID still a `long` value and guaranteed to be unique? Is it the same in Delta Lake?
   
   Yes, the snapshot ID is still a `long` value and guaranteed to be unique. Delta Lake's snapshot ID (called "version" in Delta) is strictly increasing.
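Since snapshot IDs are strictly increasing `long` values, a receiver can reject stale or duplicate commits with a simple monotonicity check. A hedged sketch of such a guard (illustrative only, not part of the Fluss codebase) using a CAS loop:

```java
import java.util.concurrent.atomic.AtomicLong;

public class SnapshotIdGuard {
    // Highest snapshot ID committed so far; -1 means "nothing committed yet".
    private final AtomicLong lastCommitted = new AtomicLong(-1L);

    /**
     * Accepts the commit only if snapshotId is strictly greater than every
     * previously committed ID; returns false for stale or duplicate IDs.
     * The CAS retry loop makes the check safe under concurrent callers.
     */
    public boolean tryCommit(long snapshotId) {
        while (true) {
            long prev = lastCommitted.get();
            if (snapshotId <= prev) {
                return false; // stale or duplicate snapshot
            }
            if (lastCommitted.compareAndSet(prev, snapshotId)) {
                return true; // we advanced the committed ID
            }
            // another thread advanced it concurrently; re-check
        }
    }

    public static void main(String[] args) {
        SnapshotIdGuard guard = new SnapshotIdGuard();
        System.out.println(guard.tryCommit(0)); // true: first commit
        System.out.println(guard.tryCommit(2)); // true: advances past 0
        System.out.println(guard.tryCommit(1)); // false: stale, 1 <= 2
    }
}
```

This relies only on the property confirmed above (unique, strictly increasing IDs); it does not assume anything about how Fluss or Delta Lake actually validate commits.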



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
