This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 08758e929 [kv] Commit kv snapshot to zk should be done asynchronously
and not block coordinator (#1375)
08758e929 is described below
commit 08758e929ca6b100414b1760e9a010d49f86b60e
Author: yuxia Luo <[email protected]>
AuthorDate: Tue Sep 9 11:20:53 2025 +0800
[kv] Commit kv snapshot to zk should be done asynchronously and not block
coordinator (#1375)
---
.../coordinator/CompletedSnapshotStoreManager.java | 17 ++++---
.../coordinator/CoordinatorEventProcessor.java | 56 +++++++++++++++-------
.../event/NotifyKvSnapshotOffsetEvent.java | 41 ++++++++++++++++
.../coordinator/CoordinatorEventProcessorTest.java | 7 ++-
4 files changed, 96 insertions(+), 25 deletions(-)
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManager.java
index 418cca954..0ea715930 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManager.java
@@ -26,17 +26,18 @@ import org.apache.fluss.server.kv.snapshot.CompletedSnapshotStore;
import org.apache.fluss.server.kv.snapshot.SharedKvFileRegistry;
import
org.apache.fluss.server.kv.snapshot.ZooKeeperCompletedSnapshotHandleStore;
import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.utils.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.concurrent.NotThreadSafe;
+import javax.annotation.concurrent.ThreadSafe;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Function;
@@ -48,13 +49,14 @@ import static org.apache.fluss.utils.Preconditions.checkNotNull;
* {@link CompletedSnapshotStore} not exist for a {@link TableBucket}, it will
create a new {@link
* CompletedSnapshotStore} for it.
*/
-@NotThreadSafe
+@ThreadSafe
public class CompletedSnapshotStoreManager {
private static final Logger LOG =
LoggerFactory.getLogger(CompletedSnapshotStoreManager.class);
private final int maxNumberOfSnapshotsToRetain;
private final ZooKeeperClient zooKeeperClient;
- private final Map<TableBucket, CompletedSnapshotStore>
bucketCompletedSnapshotStores;
+ private final ConcurrentHashMap<TableBucket, CompletedSnapshotStore>
+ bucketCompletedSnapshotStores;
private final Executor ioExecutor;
private final Function<ZooKeeperClient, CompletedSnapshotHandleStore>
makeZookeeperCompletedSnapshotHandleStore;
@@ -67,7 +69,7 @@ public class CompletedSnapshotStoreManager {
maxNumberOfSnapshotsToRetain > 0,
"maxNumberOfSnapshotsToRetain must be positive");
this.maxNumberOfSnapshotsToRetain = maxNumberOfSnapshotsToRetain;
this.zooKeeperClient = zooKeeperClient;
- this.bucketCompletedSnapshotStores = new HashMap<>();
+ this.bucketCompletedSnapshotStores = MapUtils.newConcurrentHashMap();
this.ioExecutor = ioExecutor;
this.makeZookeeperCompletedSnapshotHandleStore =
ZooKeeperCompletedSnapshotHandleStore::new;
}
@@ -83,7 +85,7 @@ public class CompletedSnapshotStoreManager {
maxNumberOfSnapshotsToRetain > 0,
"maxNumberOfSnapshotsToRetain must be positive");
this.maxNumberOfSnapshotsToRetain = maxNumberOfSnapshotsToRetain;
this.zooKeeperClient = zooKeeperClient;
- this.bucketCompletedSnapshotStores = new HashMap<>();
+ this.bucketCompletedSnapshotStores = MapUtils.newConcurrentHashMap();
this.ioExecutor = ioExecutor;
this.makeZookeeperCompletedSnapshotHandleStore =
makeZookeeperCompletedSnapshotHandleStore;
}
@@ -191,7 +193,8 @@ public class CompletedSnapshotStoreManager {
ioExecutor);
}
- public Map<TableBucket, CompletedSnapshotStore>
getBucketCompletedSnapshotStores() {
+ @VisibleForTesting
+ Map<TableBucket, CompletedSnapshotStore>
getBucketCompletedSnapshotStores() {
return bucketCompletedSnapshotStores;
}
}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
index ffc6192b7..6533565a0 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
@@ -56,6 +56,7 @@ import org.apache.fluss.server.coordinator.event.DropTableEvent;
import org.apache.fluss.server.coordinator.event.EventProcessor;
import org.apache.fluss.server.coordinator.event.FencedCoordinatorEvent;
import org.apache.fluss.server.coordinator.event.NewTabletServerEvent;
+import org.apache.fluss.server.coordinator.event.NotifyKvSnapshotOffsetEvent;
import
org.apache.fluss.server.coordinator.event.NotifyLeaderAndIsrResponseReceivedEvent;
import org.apache.fluss.server.coordinator.event.watcher.TableChangeWatcher;
import
org.apache.fluss.server.coordinator.event.watcher.TabletServerChangeWatcher;
@@ -117,6 +118,7 @@ public class CoordinatorEventProcessor implements EventProcessor {
private static final Logger LOG =
LoggerFactory.getLogger(CoordinatorEventProcessor.class);
private final ZooKeeperClient zooKeeperClient;
+ private final ExecutorService ioExecutor;
private final CoordinatorContext coordinatorContext;
private final ReplicaStateMachine replicaStateMachine;
private final TableBucketStateMachine tableBucketStateMachine;
@@ -190,6 +192,7 @@ public class CoordinatorEventProcessor implements EventProcessor {
this.lakeTableTieringManager = lakeTableTieringManager;
this.coordinatorMetricGroup = coordinatorMetricGroup;
this.internalListenerName =
conf.getString(ConfigOptions.INTERNAL_LISTENER_NAME);
+ this.ioExecutor = ioExecutor;
}
public CoordinatorEventManager getCoordinatorEventManager() {
@@ -455,9 +458,10 @@ public class CoordinatorEventProcessor implements EventProcessor {
adjustIsrReceivedEvent.getLeaderAndIsrMap())));
} else if (event instanceof CommitKvSnapshotEvent) {
CommitKvSnapshotEvent commitKvSnapshotEvent =
(CommitKvSnapshotEvent) event;
- CompletableFuture<CommitKvSnapshotResponse> callback =
- commitKvSnapshotEvent.getRespCallback();
- completeFromCallable(callback, () ->
tryProcessCommitKvSnapshot(commitKvSnapshotEvent));
+ tryProcessCommitKvSnapshot(
+ commitKvSnapshotEvent,
commitKvSnapshotEvent.getRespCallback());
+ } else if (event instanceof NotifyKvSnapshotOffsetEvent) {
+ processNotifyKvSnapshotOffsetEvent((NotifyKvSnapshotOffsetEvent)
event);
} else if (event instanceof CommitRemoteLogManifestEvent) {
CommitRemoteLogManifestEvent commitRemoteLogManifestEvent =
(CommitRemoteLogManifestEvent) event;
@@ -936,21 +940,40 @@ public class CoordinatorEventProcessor implements EventProcessor {
}
}
- private CommitKvSnapshotResponse
tryProcessCommitKvSnapshot(CommitKvSnapshotEvent event)
- throws Exception {
+ private void tryProcessCommitKvSnapshot(
+ CommitKvSnapshotEvent event,
CompletableFuture<CommitKvSnapshotResponse> callback) {
// validate
- validateFencedEvent(event);
+ try {
+ validateFencedEvent(event);
+ } catch (Exception e) {
+ callback.completeExceptionally(e);
+ return;
+ }
+ // commit the kv snapshot asynchronously
+ ioExecutor.execute(
+ () -> {
+ try {
+ TableBucket tb = event.getTableBucket();
+ CompletedSnapshot completedSnapshot =
+
event.getAddCompletedSnapshotData().getCompletedSnapshot();
+ // add completed snapshot
+ CompletedSnapshotStore completedSnapshotStore =
+
completedSnapshotStoreManager.getOrCreateCompletedSnapshotStore(tb);
+ // this involves IO operation (ZK), so we do it in
ioExecutor
+ completedSnapshotStore.add(completedSnapshot);
+ coordinatorEventManager.put(
+ new NotifyKvSnapshotOffsetEvent(
+ tb, completedSnapshot.getLogOffset()));
+ callback.complete(new CommitKvSnapshotResponse());
+ } catch (Exception e) {
+ callback.completeExceptionally(e);
+ }
+ });
+ }
+ private void
processNotifyKvSnapshotOffsetEvent(NotifyKvSnapshotOffsetEvent event) {
TableBucket tb = event.getTableBucket();
- CompletedSnapshot completedSnapshot =
- event.getAddCompletedSnapshotData().getCompletedSnapshot();
- // add completed snapshot
- CompletedSnapshotStore completedSnapshotStore =
-
completedSnapshotStoreManager.getOrCreateCompletedSnapshotStore(tb);
- completedSnapshotStore.add(completedSnapshot);
-
- // send notify snapshot request to all replicas.
- // TODO: this should be moved after sending
AddCompletedSnapshotResponse
+ long logOffset = event.getLogOffset();
coordinatorRequestBatch.newBatch();
coordinatorContext
.getBucketLeaderAndIsr(tb)
@@ -961,10 +984,9 @@ public class CoordinatorEventProcessor implements EventProcessor {
coordinatorContext.getFollowers(
tb,
leaderAndIsr.leader()),
tb,
-
completedSnapshot.getLogOffset()));
+ logOffset));
coordinatorRequestBatch.sendNotifyKvSnapshotOffsetRequest(
coordinatorContext.getCoordinatorEpoch());
- return new CommitKvSnapshotResponse();
}
private CommitRemoteLogManifestResponse tryProcessCommitRemoteLogManifest(
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/NotifyKvSnapshotOffsetEvent.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/NotifyKvSnapshotOffsetEvent.java
new file mode 100644
index 000000000..da6a42067
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/NotifyKvSnapshotOffsetEvent.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.event;
+
+import org.apache.fluss.metadata.TableBucket;
+
+/** Event to notify the followers of a bucket about the log offset of a committed kv snapshot. */
+public class NotifyKvSnapshotOffsetEvent implements CoordinatorEvent {
+
+ private final TableBucket tableBucket; // bucket whose kv snapshot was committed
+ private final long logOffset; // log offset taken from the committed kv snapshot
+
+ public NotifyKvSnapshotOffsetEvent(TableBucket tableBucket, long
logOffset) {
+ this.tableBucket = tableBucket;
+ this.logOffset = logOffset;
+ }
+
+ public TableBucket getTableBucket() {
+ return tableBucket;
+ }
+
+ public long getLogOffset() {
+ return logOffset;
+ }
+}
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
index b448c4126..82cd8f1a3 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
@@ -763,7 +763,11 @@ class CoordinatorEventProcessorTest {
completedSnapshot, coordinatorEpoch,
bucketLeaderEpoch),
responseCompletableFuture2));
responseCompletableFuture2.get();
- verifyReceiveRequestExceptFor(3, leader,
NotifyKvSnapshotOffsetRequest.class);
+ retry(
+ Duration.ofMinutes(1),
+ () ->
+ verifyReceiveRequestExceptFor(
+ 3, leader,
NotifyKvSnapshotOffsetRequest.class));
}
@Test
@@ -1082,6 +1086,7 @@ class CoordinatorEventProcessorTest {
.hasMessage("No requests pending for inbound
response.");
} else {
// should contain NotifyKvSnapshotOffsetRequest
+
assertThat(testTabletServerGateway.pendingRequestSize()).isNotZero();
assertThat(testTabletServerGateway.getRequest(0)).isInstanceOf(requestClass);
}
}