wuchong commented on code in PR #2434:
URL: https://github.com/apache/fluss/pull/2434#discussion_r2725142077
##########
fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java:
##########
@@ -588,6 +594,83 @@ public CompletableFuture<Void> cancelRebalance(@Nullable
String rebalanceId) {
return gateway.cancelRebalance(request).thenApply(r -> null);
}
+ //
==================================================================================
+ // Producer Offset Management APIs (for Exactly-Once Semantics)
+ //
==================================================================================
+
+ @Override
+ public CompletableFuture<RegisterResult> registerProducerOffsets(
+ String producerId, Map<TableBucket, Long> offsets) {
+ checkNotNull(producerId, "producerId must not be null");
+ checkNotNull(offsets, "offsets must not be null");
+
+ RegisterProducerOffsetsRequest request = new
RegisterProducerOffsetsRequest();
+ request.setProducerId(producerId);
+
+ // Convert TableBucket offsets to PbTableBucketOffset
+ for (Map.Entry<TableBucket, Long> entry : offsets.entrySet()) {
+ TableBucket bucket = entry.getKey();
+ PbTableBucketOffset pbOffset =
+ request.addBucketOffset()
+ .setTableId(bucket.getTableId())
+ .setBucketId(bucket.getBucket())
+ .setOffset(entry.getValue());
+ if (bucket.getPartitionId() != null) {
+ pbOffset.setPartitionId(bucket.getPartitionId());
+ }
+ }
+
+ return gateway.registerProducerOffsets(request)
+ .thenApply(
+ response -> {
+ int code =
+ response.hasResult()
+ ? response.getResult()
+ : RegisterResult.CREATED.getCode();
+ return RegisterResult.fromCode(code);
+ });
+ }
+
+ @Override
+ public CompletableFuture<ProducerOffsetsResult> getProducerOffsets(String
producerId) {
+ checkNotNull(producerId, "producerId must not be null");
+
+ GetProducerOffsetsRequest request = new GetProducerOffsetsRequest();
+ request.setProducerId(producerId);
+
+ return gateway.getProducerOffsets(request)
+ .thenApply(
+ response -> {
+ if (!response.hasProducerId()) {
+ return null;
+ }
+
+ Map<Long, Map<TableBucket, Long>> tableOffsets =
new HashMap<>();
+ for (PbProducerTableOffsets pbTableOffsets :
+ response.getTableOffsetsList()) {
+ long tableId = pbTableOffsets.getTableId();
+ tableOffsets.put(
+ tableId,
+
ClientRpcMessageUtils.toTableBucketOffsets(pbTableOffsets));
+ }
+
+ long expirationTime =
+ response.hasExpirationTime() ?
response.getExpirationTime() : 0;
+ return new ProducerOffsetsResult(
+ response.getProducerId(), tableOffsets,
expirationTime);
Review Comment:
nit: Move this conversion to `ClientRpcMessageUtils` to improve the
readability of `FlussAdmin`.
##########
fluss-rpc/src/main/proto/FlussApi.proto:
##########
@@ -1062,4 +1062,53 @@ message PbBucketOffset {
optional int64 partition_id = 1;
required int32 bucket_id = 2;
optional int64 log_end_offset = 4;
+}
+
+//
------------------------------------------------------------------------------------------
+// Producer Offset Snapshot Management
+//
------------------------------------------------------------------------------------------
+
+// Register producer offset snapshot request and response
+message RegisterProducerOffsetsRequest {
Review Comment:
Move the `...Request` and `...Response` messages before the inner classes section (L622).
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/producer/ProducerSnapshotManager.java:
##########
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.producer;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.fs.FileStatus;
+import org.apache.fluss.fs.FileSystem;
+import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.producer.ProducerSnapshot;
+import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
+import org.apache.fluss.utils.types.Tuple2;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Manager for producer offset snapshots lifecycle.
+ *
+ * <p>This manager handles:
+ *
+ * <ul>
+ * <li>Registering new producer snapshots with atomic "check and register"
semantics
+ * <li>Retrieving producer snapshot offsets
+ * <li>Deleting producer snapshots
+ * <li>Periodic cleanup of expired snapshots
+ * <li>Cleanup of orphan files in remote storage
+ * </ul>
+ *
+ * <p>The manager uses a background scheduler to periodically clean up expired
snapshots and orphan
+ * files. The cleanup interval and snapshot TTL are configurable.
+ *
+ * @see ProducerSnapshotStore for low-level storage operations
+ */
+public class ProducerSnapshotManager implements AutoCloseable {
Review Comment:
renaming:
- `ProducerSnapshotManager` -> `ProducerOffsetsManager`
- `ProducerSnapshotStore` -> `ProducerOffsetsStore`
- `ProducerSnapshot` -> `ProducerOffsets`
- `ProducerSnapshotJsonSerde` -> `ProducerOffsetsJsonSerde`
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java:
##########
@@ -983,4 +1008,127 @@ public FlussPrincipal getFlussPrincipal() {
return flussPrincipal;
}
}
+
+ //
==================================================================================
+ // Producer Offset Management APIs (for Exactly-Once Semantics)
+ //
==================================================================================
+
+ @Override
+ public CompletableFuture<RegisterProducerOffsetsResponse>
registerProducerOffsets(
+ RegisterProducerOffsetsRequest request) {
+ // Authorization: require WRITE permission on cluster
+ if (authorizer != null) {
+ authorizer.authorize(currentSession(), OperationType.WRITE,
Resource.cluster());
+ }
+
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ String producerId = request.getProducerId();
+ Map<TableBucket, Long> offsets = new HashMap<>();
+
+ // Convert PbTableBucketOffset to TableBucket offsets
+ for (PbTableBucketOffset pbOffset :
request.getBucketOffsetsList()) {
+ Long partitionId =
+ pbOffset.hasPartitionId() ?
pbOffset.getPartitionId() : null;
+ TableBucket bucket =
+ new TableBucket(
+ pbOffset.getTableId(),
+ partitionId,
+ pbOffset.getBucketId());
+ offsets.put(bucket, pbOffset.getOffset());
+ }
+
+ // Use custom TTL if provided, otherwise use default
(null means use
+ // manager's default)
+ Long ttlMs = request.hasTtlMs() ? request.getTtlMs() :
null;
+
+ // Register with atomic "check and register" semantics
+ boolean created =
+ producerSnapshotManager.registerSnapshot(
+ producerId, offsets, ttlMs);
+
+ RegisterProducerOffsetsResponse response =
+ new RegisterProducerOffsetsResponse();
+ response.setResult(
+ created
+ ? RegisterResult.CREATED.getCode()
+ :
RegisterResult.ALREADY_EXISTS.getCode());
+ return response;
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to register producer offsets for
producer "
+ + request.getProducerId(),
+ e);
+ }
+ },
+ ioExecutor);
+ }
+
+ @Override
+ public CompletableFuture<GetProducerOffsetsResponse> getProducerOffsets(
+ GetProducerOffsetsRequest request) {
+ // Authorization: require READ permission on cluster
+ if (authorizer != null) {
+ authorizer.authorize(currentSession(), OperationType.READ,
Resource.cluster());
Review Comment:
ditto. I think we should handle this like `listTables`.
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java:
##########
@@ -983,4 +1008,127 @@ public FlussPrincipal getFlussPrincipal() {
return flussPrincipal;
}
}
+
+ //
==================================================================================
+ // Producer Offset Management APIs (for Exactly-Once Semantics)
+ //
==================================================================================
+
+ @Override
+ public CompletableFuture<RegisterProducerOffsetsResponse>
registerProducerOffsets(
+ RegisterProducerOffsetsRequest request) {
+ // Authorization: require WRITE permission on cluster
+ if (authorizer != null) {
+ authorizer.authorize(currentSession(), OperationType.WRITE,
Resource.cluster());
+ }
+
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ String producerId = request.getProducerId();
+ Map<TableBucket, Long> offsets = new HashMap<>();
+
+ // Convert PbTableBucketOffset to TableBucket offsets
+ for (PbTableBucketOffset pbOffset :
request.getBucketOffsetsList()) {
+ Long partitionId =
+ pbOffset.hasPartitionId() ?
pbOffset.getPartitionId() : null;
+ TableBucket bucket =
+ new TableBucket(
+ pbOffset.getTableId(),
+ partitionId,
+ pbOffset.getBucketId());
+ offsets.put(bucket, pbOffset.getOffset());
+ }
+
+ // Use custom TTL if provided, otherwise use default
(null means use
+ // manager's default)
+ Long ttlMs = request.hasTtlMs() ? request.getTtlMs() :
null;
+
+ // Register with atomic "check and register" semantics
+ boolean created =
+ producerSnapshotManager.registerSnapshot(
+ producerId, offsets, ttlMs);
+
+ RegisterProducerOffsetsResponse response =
+ new RegisterProducerOffsetsResponse();
+ response.setResult(
+ created
+ ? RegisterResult.CREATED.getCode()
+ :
RegisterResult.ALREADY_EXISTS.getCode());
+ return response;
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to register producer offsets for
producer "
+ + request.getProducerId(),
+ e);
+ }
+ },
+ ioExecutor);
+ }
+
+ @Override
+ public CompletableFuture<GetProducerOffsetsResponse> getProducerOffsets(
+ GetProducerOffsetsRequest request) {
+ // Authorization: require READ permission on cluster
+ if (authorizer != null) {
+ authorizer.authorize(currentSession(), OperationType.READ,
Resource.cluster());
+ }
+
+ String producerId = request.getProducerId();
+
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ Optional<ProducerSnapshot> optSnapshot =
+
producerSnapshotManager.getSnapshotMetadata(producerId);
+ if (!optSnapshot.isPresent()) {
+ return new GetProducerOffsetsResponse();
+ }
+
+ ProducerSnapshot snapshot = optSnapshot.get();
+ Map<TableBucket, Long> allOffsets =
+ producerSnapshotManager.getOffsets(producerId);
+ Map<Long, Map<TableBucket, Long>> offsetsByTable =
+ groupOffsetsByTableId(allOffsets);
+
+ GetProducerOffsetsResponse response = new
GetProducerOffsetsResponse();
+ response.setProducerId(producerId);
+
response.setExpirationTime(snapshot.getExpirationTime());
+
+ for (Map.Entry<Long, Map<TableBucket, Long>> entry :
+ offsetsByTable.entrySet()) {
+ addTableOffsetsToResponse(response,
entry.getKey(), entry.getValue());
+ }
+
+ return response;
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to get producer offsets for producer "
+ producerId, e);
+ }
+ },
+ ioExecutor);
+ }
+
+ @Override
+ public CompletableFuture<DeleteProducerOffsetsResponse>
deleteProducerOffsets(
+ DeleteProducerOffsetsRequest request) {
+ // Authorization: require WRITE permission on cluster
+ if (authorizer != null) {
+ authorizer.authorize(currentSession(), OperationType.WRITE,
Resource.cluster());
Review Comment:
ditto. We should check whether the user has WRITE permission on all the
tableIds under this producer id.
##########
fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java:
##########
@@ -542,4 +542,50 @@ public static List<ConfigEntry>
toConfigEntries(List<PbDescribeConfig> pbDescrib
pbDescribeConfig.getConfigSource())))
.collect(Collectors.toList());
}
+
+ /**
+ * Parses a PbTableOffsets into a map of TableBucket to offset.
+ *
+ * @param pbTableOffsets the protobuf table offsets
+ * @return map of TableBucket to offset
+ */
+ public static Map<TableBucket, Long> toTableBucketOffsets(
Review Comment:
not used, can be removed?
##########
fluss-common/src/main/java/org/apache/fluss/rpc/messages/RegisterResult.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.rpc.messages;
+
+import org.apache.fluss.annotation.PublicEvolving;
+
+/**
+ * Result of producer offset registration.
+ *
+ * <p>This enum indicates whether a producer offset snapshot was newly created
or already existed
+ * when calling registerProducerOffsets API.
+ *
+ * @since 0.9
+ */
+@PublicEvolving
+public enum RegisterResult {
Review Comment:
`RegisterResult` is not an RPC message that is (de)serialized through the
netty RPC. I think a better package location for this would be
`org.apache.fluss.client.admin`, as it is used as a return type of an `Admin`
method.
##########
fluss-rpc/src/main/proto/FlussApi.proto:
##########
@@ -1062,4 +1062,53 @@ message PbBucketOffset {
optional int64 partition_id = 1;
required int32 bucket_id = 2;
optional int64 log_end_offset = 4;
+}
+
+//
------------------------------------------------------------------------------------------
+// Producer Offset Snapshot Management
+//
------------------------------------------------------------------------------------------
+
+// Register producer offset snapshot request and response
+message RegisterProducerOffsetsRequest {
+ required string producer_id = 1;
+ repeated PbTableBucketOffset bucket_offsets = 2;
+ optional int64 ttl_ms = 3;
+}
+
+message RegisterProducerOffsetsResponse {
+ // Result of registration: 0 = CREATED (new snapshot), 1 = ALREADY_EXISTS
(snapshot existed)
+ optional int32 result = 1;
+}
+
+// Get producer offset snapshot request and response
+message GetProducerOffsetsRequest {
+ required string producer_id = 1;
+}
+
+message GetProducerOffsetsResponse {
+ optional string producer_id = 1;
+ optional int64 expiration_time = 2;
+ repeated PbProducerTableOffsets table_offsets = 3;
+}
+
+// Delete producer offset snapshot request and response
+message DeleteProducerOffsetsRequest {
+ required string producer_id = 1;
+}
+
+message DeleteProducerOffsetsResponse {
+}
+
+// Helper message for table offsets in producer snapshot (without table_path)
+message PbProducerTableOffsets {
+ required int64 table_id = 1;
+ repeated PbBucketOffset bucket_offsets = 2;
+}
+
+// Helper message for bucket offsets in producer snapshot
+message PbTableBucketOffset {
Review Comment:
Can we reuse the `PbProducerTableOffsets` in
`RegisterProducerOffsetsRequest`? Or vice-versa, reuse `PbTableBucketOffset` in
`GetProducerOffsetsResponse`. This can avoid introducing too many PB messages.
##########
fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java:
##########
@@ -354,6 +354,31 @@ public class ConfigOptions {
+ "The default value is 10. "
+ "This option is deprecated. Please use
server.io-pool.size instead.");
+ /**
+ * The TTL (time-to-live) for producer offset snapshots. Snapshots older
than this TTL will be
+ * automatically cleaned up by the coordinator server.
+ */
+ public static final ConfigOption<Duration>
COORDINATOR_PRODUCER_SNAPSHOT_TTL =
+ key("coordinator.producer-snapshot.ttl")
Review Comment:
How about using `coordinator.producer-offsets.ttl` and
`coordinator.producer-offsets.cleanup-interval`? These names align more closely
with the established concepts of consumer/producer offsets.
The term "snapshot" can be misleading, as it typically refers to data
snapshots, such as KV snapshots or lake snapshots. While we currently store
producer offsets in a global snapshot-like structure, the long-term design may
allow each producer to persist its offsets independently. Using offset-centric
naming now better reflects the semantic intent and keeps the configuration
consistent with future extensibility.
##########
fluss-common/src/test/java/org/apache/fluss/utils/RetryUtilsTest.java:
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.utils;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Unit tests for {@link RetryUtils}. */
+class RetryUtilsTest {
+
+ @Test
+ void testSuccessOnFirstAttempt() throws IOException {
+ AtomicInteger attempts = new AtomicInteger(0);
+
+ String result =
+ RetryUtils.executeWithRetry(
+ () -> {
+ attempts.incrementAndGet();
+ return "success";
+ },
+ "testOp",
+ 3,
+ 10,
+ 100,
+ e -> true);
+
+ assertThat(result).isEqualTo("success");
+ assertThat(attempts.get()).isEqualTo(1);
+ }
+
+ @Test
+ void testSuccessAfterRetries() throws IOException {
+ AtomicInteger attempts = new AtomicInteger(0);
+
+ String result =
+ RetryUtils.executeWithRetry(
+ () -> {
+ int attempt = attempts.incrementAndGet();
+ if (attempt < 3) {
+ throw new IOException("transient failure");
+ }
+ return "success";
+ },
+ "testOp",
+ 5,
+ 10,
+ 100,
+ e -> e instanceof IOException);
+
+ assertThat(result).isEqualTo("success");
+ assertThat(attempts.get()).isEqualTo(3);
+ }
+
+ @Test
+ void testExhaustedRetries() {
+ AtomicInteger attempts = new AtomicInteger(0);
+
+ assertThatThrownBy(
+ () ->
+ RetryUtils.executeWithRetry(
+ () -> {
+ attempts.incrementAndGet();
+ throw new IOException("persistent
failure");
+ },
+ "testOp",
+ 3,
+ 10,
+ 100,
+ e -> e instanceof IOException))
+ .isInstanceOf(IOException.class)
+ .hasMessageContaining("testOp failed after 3 attempts")
+ .hasCauseInstanceOf(IOException.class);
+
+ assertThat(attempts.get()).isEqualTo(3);
+ }
+
+ @Test
+ void testNonRetryableException() {
+ AtomicInteger attempts = new AtomicInteger(0);
+
+ assertThatThrownBy(
+ () ->
+ RetryUtils.executeWithRetry(
+ () -> {
+ attempts.incrementAndGet();
+ throw new
IllegalArgumentException("bad argument");
+ },
+ "testOp",
+ 5,
+ 10,
+ 100,
+ e -> e instanceof IOException))
+ .isInstanceOf(IOException.class)
+ .hasMessageContaining("testOp failed")
+ .hasCauseInstanceOf(IllegalArgumentException.class);
+
+ // Should fail immediately without retries
+ assertThat(attempts.get()).isEqualTo(1);
+ }
+
+ @Test
+ void testIOExceptionPassedThrough() {
+ IOException originalException = new IOException("original");
+
+ assertThatThrownBy(
+ () ->
+ RetryUtils.executeWithRetry(
+ () -> {
+ throw originalException;
+ },
+ "testOp",
+ 1,
+ 10,
+ 100,
+ e -> false))
+ .isInstanceOf(IOException.class)
+ .isSameAs(originalException);
+ }
+
+ @Test
+ void testExecuteIOWithRetry() throws IOException {
+ AtomicInteger attempts = new AtomicInteger(0);
+
+ String result =
+ RetryUtils.executeIOWithRetry(
+ () -> {
+ int attempt = attempts.incrementAndGet();
+ if (attempt < 2) {
+ throw new IOException("transient");
+ }
+ return "done";
+ },
+ "ioOp",
+ 3,
+ 10,
+ 100);
+
+ assertThat(result).isEqualTo("done");
+ assertThat(attempts.get()).isEqualTo(2);
+ }
+
+ @Test
+ void testExecuteIOWithRetryNonIOException() {
+ AtomicInteger attempts = new AtomicInteger(0);
+
+ assertThatThrownBy(
+ () ->
+ RetryUtils.executeIOWithRetry(
+ () -> {
+ attempts.incrementAndGet();
+ throw new RuntimeException("not
IO");
+ },
+ "ioOp",
+ 3,
+ 10,
+ 100))
+ .isInstanceOf(IOException.class)
+ .hasCauseInstanceOf(RuntimeException.class);
+
+ // RuntimeException is not retryable for executeIOWithRetry
+ assertThat(attempts.get()).isEqualTo(1);
+ }
+
+ @Test
+ void testBackoffCapped() throws IOException {
+ AtomicInteger attempts = new AtomicInteger(0);
+ long startTime = System.currentTimeMillis();
+
+ RetryUtils.executeWithRetry(
+ () -> {
+ int attempt = attempts.incrementAndGet();
+ if (attempt < 4) {
+ throw new IOException("fail");
+ }
+ return "ok";
+ },
+ "backoffTest",
+ 5,
+ 10, // initial backoff
+ 50, // max backoff (caps exponential growth)
+ e -> e instanceof IOException);
+
+ long elapsed = System.currentTimeMillis() - startTime;
+ // 3 retries: 10ms + 20ms + 40ms (capped to 50ms) = 80ms minimum
+ // But with cap: 10ms + 20ms + 50ms = 80ms minimum
+ // Allow some tolerance for test execution
+ assertThat(elapsed).isGreaterThanOrEqualTo(70);
+ assertThat(attempts.get()).isEqualTo(4);
+ }
+
+ @Test
+ void testInterruptedExceptionFromOperation() {
+ AtomicInteger attempts = new AtomicInteger(0);
+
+ assertThatThrownBy(
+ () ->
+ RetryUtils.executeWithRetry(
+ () -> {
+ attempts.incrementAndGet();
+ throw new
InterruptedException("interrupted");
+ },
+ "interruptOp",
+ 5,
+ 10,
+ 100,
+ e -> true))
+ .isInstanceOf(InterruptedIOException.class)
+ .hasMessageContaining("interruptOp was interrupted on attempt
1");
+
+ // Should fail immediately without retries
+ assertThat(attempts.get()).isEqualTo(1);
+ // Verify interrupt status is preserved
+ assertThat(Thread.currentThread().isInterrupted()).isTrue();
+ // Clear interrupt status for other tests
+ Thread.interrupted();
+ }
+
+ @Test
+ void testInterruptedDuringBackoff() throws Exception {
Review Comment:
This test is unstable. It fails in my local env when repeated 20 times. I
think we can remove this test as it is not a critical feature.
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/producer/ProducerSnapshotStore.java:
##########
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.producer;
+
+import org.apache.fluss.fs.FSDataInputStream;
+import org.apache.fluss.fs.FSDataOutputStream;
+import org.apache.fluss.fs.FileStatus;
+import org.apache.fluss.fs.FileSystem;
+import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.producer.ProducerSnapshot;
+import org.apache.fluss.utils.IOUtils;
+import org.apache.fluss.utils.RetryUtils;
+import org.apache.fluss.utils.json.TableBucketOffsets;
+import org.apache.fluss.utils.types.Tuple2;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+
+/**
+ * Low-level storage operations for producer offset snapshots.
+ *
+ * <p>This class handles:
+ *
+ * <ul>
+ * <li>Writing offset data to remote storage (OSS/S3/HDFS)
+ * <li>Reading offset data from remote storage
+ * <li>Registering snapshot metadata in ZooKeeper
+ * <li>Deleting snapshots from both ZK and remote storage
+ * </ul>
+ *
+ * <p>This class is stateless and thread-safe. Lifecycle management should be
handled by {@link
+ * ProducerSnapshotManager}.
+ */
+public class ProducerSnapshotStore {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ProducerSnapshotStore.class);
+
+ private static final int MAX_RETRIES = 3;
+ private static final long INITIAL_BACKOFF_MS = 100;
+ private static final long MAX_BACKOFF_MS = 1000;
+
+ private final ZooKeeperClient zkClient;
+ private final String remoteDataDir;
+
+ public ProducerSnapshotStore(ZooKeeperClient zkClient, String
remoteDataDir) {
+ this.zkClient = zkClient;
+ this.remoteDataDir = remoteDataDir;
+ }
+
+ // ------------------------------------------------------------------------
+ // Core CRUD Operations
+ // ------------------------------------------------------------------------
+
+ /**
+ * Atomically stores a new producer offset snapshot.
+ *
+ * <p>This method first writes offset files to remote storage, then
attempts to atomically
+ * create the ZK metadata. If the ZK node already exists, this method
returns false and cleans
+ * up the remote files.
+ *
+ * <p><b>Note on potential orphan files:</b> There is a small window where
orphan files can be
+ * created if the process crashes after writing remote files but before
creating ZK metadata, or
+ * if ZK metadata creation fails. This is an acceptable trade-off because:
+ *
+ * <ul>
+ * <li>The alternative (ZK first, then files) would leave ZK metadata
pointing to non-existent
+ * files, which is worse
+ * <li>Orphan files are harmless and will be cleaned up by {@link
+ * ProducerSnapshotManager#cleanupOrphanFiles()}
+ * <li>A unified orphan file cleanup mechanism will handle these cases
in the future
+ * </ul>
+ *
+ * @param producerId the producer ID
+ * @param offsets map of TableBucket to offset
+ * @param expirationTime the expiration timestamp in milliseconds
+ * @return true if created, false if already existed
+ * @throws Exception if the operation fails
+ */
+ public boolean tryStoreSnapshot(
+ String producerId, Map<TableBucket, Long> offsets, long
expirationTime)
+ throws Exception {
+
+ Map<Long, Map<TableBucket, Long>> offsetsByTable =
groupOffsetsByTable(offsets);
+ List<ProducerSnapshot.TableOffsetMetadata> tableMetadatas = new
ArrayList<>();
+ List<FsPath> createdFiles = new ArrayList<>();
+
+ try {
+ // Write offset files to remote storage
+ for (Map.Entry<Long, Map<TableBucket, Long>> entry :
offsetsByTable.entrySet()) {
+ FsPath path = writeOffsetsFile(producerId, entry.getKey(),
entry.getValue());
+ createdFiles.add(path);
+ tableMetadatas.add(new
ProducerSnapshot.TableOffsetMetadata(entry.getKey(), path));
+ }
+
+ // Atomically create ZK metadata
+ ProducerSnapshot snapshot = new ProducerSnapshot(expirationTime,
tableMetadatas);
+ boolean created = zkClient.tryRegisterProducerSnapshot(producerId,
snapshot);
+
+ if (!created) {
+ LOG.info(
+ "Snapshot already exists for producer {}, cleaning up
{} files",
+ producerId,
+ createdFiles.size());
+ cleanupFilesSafely(createdFiles);
+ return false;
+ }
+
+ LOG.info(
+ "Stored snapshot for producer {} with {} tables, expires
at {}",
+ producerId,
+ tableMetadatas.size(),
+ expirationTime);
+ return true;
+
+ } catch (Exception e) {
+ LOG.warn(
+ "Failed to store snapshot for producer {}, cleaning up {}
files",
+ producerId,
+ createdFiles.size(),
+ e);
+ cleanupFilesSafely(createdFiles);
+ throw e;
+ }
+ }
+
+ /** Gets the snapshot metadata for a producer. */
+ public Optional<ProducerSnapshot> getSnapshotMetadata(String producerId)
throws Exception {
+ return zkClient.getProducerSnapshot(producerId);
+ }
+
+ /**
+ * Gets the snapshot metadata for a producer along with its ZK version.
+ *
+ * <p>The version can be used for conditional deletes to handle concurrent
modifications safely.
+ *
+ * @param producerId the producer ID
+ * @return Optional containing a Tuple2 of (ProducerSnapshot, version) if
exists
+ * @throws Exception if the operation fails
+ */
+ public Optional<Tuple2<ProducerSnapshot, Integer>>
getSnapshotMetadataWithVersion(
+ String producerId) throws Exception {
+ return zkClient.getProducerSnapshotWithVersion(producerId);
+ }
+
+ /** Reads all offsets for a producer from remote storage. */
+ public Map<TableBucket, Long> readOffsets(String producerId) throws
Exception {
+ Optional<ProducerSnapshot> optSnapshot =
zkClient.getProducerSnapshot(producerId);
+ if (!optSnapshot.isPresent()) {
+ return new HashMap<>();
+ }
+
+ Map<TableBucket, Long> allOffsets = new HashMap<>();
+ for (ProducerSnapshot.TableOffsetMetadata metadata :
optSnapshot.get().getTableOffsets()) {
+ allOffsets.putAll(readOffsetsFile(metadata.getOffsetsPath()));
+ }
+ return allOffsets;
+ }
+
+ /** Deletes a producer snapshot (both ZK metadata and remote files). */
+ public void deleteSnapshot(String producerId) throws Exception {
+ Optional<ProducerSnapshot> optSnapshot =
zkClient.getProducerSnapshot(producerId);
+ if (!optSnapshot.isPresent()) {
+ LOG.debug("No snapshot found for producer {}", producerId);
+ return;
+ }
+
+ // Delete remote files
+ for (ProducerSnapshot.TableOffsetMetadata metadata :
optSnapshot.get().getTableOffsets()) {
+ deleteRemoteFile(metadata.getOffsetsPath());
+ }
+
+ // Delete ZK metadata
+ zkClient.deleteProducerSnapshot(producerId);
+ LOG.info("Deleted snapshot for producer {}", producerId);
+ }
+
+ /**
+ * Deletes a producer snapshot only if the ZK version matches.
+ *
+ * <p>This provides optimistic concurrency control - the delete will only
succeed if no other
+ * process has modified the snapshot since it was read.
+ *
+ * @param producerId the producer ID
+ * @param snapshot the snapshot to delete (used to get file paths)
+ * @param expectedVersion the expected ZK version
+ * @return true if deleted successfully, false if version mismatch
+ * @throws Exception if the operation fails for reasons other than version
mismatch
+ */
+ public boolean deleteSnapshotIfVersion(
+ String producerId, ProducerSnapshot snapshot, int expectedVersion)
throws Exception {
+ // First try to delete ZK metadata with version check
+ boolean deleted = zkClient.deleteProducerSnapshotIfVersion(producerId,
expectedVersion);
+ if (!deleted) {
+ LOG.debug(
+ "Failed to delete snapshot for producer {} - version
mismatch, "
+ + "snapshot was modified by another process",
+ producerId);
+ return false;
+ }
+
+ // ZK metadata deleted successfully, now clean up remote files
+ // Even if file deletion fails, the snapshot is already gone from ZK
+ // Orphan files will be cleaned up by the periodic cleanup task
+ for (ProducerSnapshot.TableOffsetMetadata metadata :
snapshot.getTableOffsets()) {
+ deleteRemoteFile(metadata.getOffsetsPath());
+ }
+
+ LOG.info("Deleted snapshot for producer {} with version {}",
producerId, expectedVersion);
+ return true;
+ }
+
+ /** Lists all producer IDs with registered snapshots. */
+ public List<String> listProducerIds() throws Exception {
+ return zkClient.listProducerIds();
+ }
+
+ // ------------------------------------------------------------------------
+ // Remote Storage Operations
+ // ------------------------------------------------------------------------
+
+ /** Gets the base directory for producer snapshots in remote storage. */
+ public FsPath getProducersDirectory() {
+ return new FsPath(remoteDataDir + "/producers");
+ }
+
+ /**
+ * Deletes a remote file, logging but not throwing on failure.
+ *
+ * @param filePath the file path to delete
+ * @return true if deleted successfully or file didn't exist, false if
deletion failed
+ */
+ public boolean deleteRemoteFile(FsPath filePath) {
+ try {
+ FileSystem fs = filePath.getFileSystem();
+ if (fs.exists(filePath)) {
+ fs.delete(filePath, false);
+ LOG.debug("Deleted remote file: {}", filePath);
+ }
+ return true;
+ } catch (IOException e) {
+ LOG.warn("Failed to delete remote file: {}", filePath, e);
+ return false;
+ }
+ }
+
+ /** Recursively deletes a directory, returning the count of deleted files.
*/
+ public int deleteDirectoryRecursively(FileSystem fs, FsPath path) {
+ int count = 0;
+ try {
+ FileStatus[] contents = fs.listStatus(path);
+ if (contents != null) {
+ for (FileStatus status : contents) {
+ if (status.isDir()) {
+ count += deleteDirectoryRecursively(fs,
status.getPath());
+ } else if (fs.delete(status.getPath(), false)) {
+ count++;
+ }
+ }
+ }
+ fs.delete(path, false);
+ } catch (IOException e) {
+ LOG.warn("Failed to delete directory: {}", path, e);
+ }
+ return count;
+ }
+
+ // ------------------------------------------------------------------------
+ // Private Helpers
+ // ------------------------------------------------------------------------
+
+ private Map<Long, Map<TableBucket, Long>>
groupOffsetsByTable(Map<TableBucket, Long> offsets) {
+ Map<Long, Map<TableBucket, Long>> result = new HashMap<>();
+ for (Map.Entry<TableBucket, Long> entry : offsets.entrySet()) {
+ result.computeIfAbsent(entry.getKey().getTableId(), k -> new
HashMap<>())
+ .put(entry.getKey(), entry.getValue());
+ }
+ return result;
+ }
+
+ private FsPath writeOffsetsFile(String producerId, long tableId,
Map<TableBucket, Long> offsets)
+ throws IOException {
+
+ String fileName = UUID.randomUUID() + ".offsets";
+ FsPath path =
+ new FsPath(
+ new FsPath(
+ new FsPath(new FsPath(remoteDataDir,
"producers"), producerId),
+ String.valueOf(tableId)),
+ fileName);
+
+ byte[] data = new TableBucketOffsets(tableId, offsets).toJsonBytes();
+
+ return RetryUtils.executeIOWithRetry(
+ () -> {
+ FileSystem fs = path.getFileSystem();
+ FsPath parentDir = path.getParent();
+ if (parentDir != null && !fs.exists(parentDir)) {
+ fs.mkdirs(parentDir);
+ }
+ try (FSDataOutputStream out = fs.create(path,
FileSystem.WriteMode.OVERWRITE)) {
+ out.write(data);
+ }
+ LOG.debug(
+ "Wrote offsets for producer {} table {} at {}",
+ producerId,
+ tableId,
+ path);
+ return path;
+ },
+ String.format("write offsets file for producer %s table %d",
producerId, tableId),
+ MAX_RETRIES,
+ INITIAL_BACKOFF_MS,
+ MAX_BACKOFF_MS);
+ }
+
+ private Map<TableBucket, Long> readOffsetsFile(FsPath path) throws
IOException {
+ return RetryUtils.executeIOWithRetry(
+ () -> {
+ FileSystem fs = path.getFileSystem();
+ if (!fs.exists(path)) {
+ throw new IOException("Offsets file not found: " +
path);
Review Comment:
Throwing `IOException` will lead to retrying. However, I don't think we need to
retry if the file is not found. Should we use a specific exception type to avoid
retrying here?
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java:
##########
@@ -983,4 +1008,127 @@ public FlussPrincipal getFlussPrincipal() {
return flussPrincipal;
}
}
+
+ //
==================================================================================
+ // Producer Offset Management APIs (for Exactly-Once Semantics)
+ //
==================================================================================
+
+ @Override
+ public CompletableFuture<RegisterProducerOffsetsResponse>
registerProducerOffsets(
+ RegisterProducerOffsetsRequest request) {
+ // Authorization: require WRITE permission on cluster
+ if (authorizer != null) {
+ authorizer.authorize(currentSession(), OperationType.WRITE,
Resource.cluster());
Review Comment:
We should authorize `WRITE` permission on the `tableId` in the
request. Only the cluster manager has the `WRITE` permission on `cluster`.
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/producer/ProducerSnapshotManager.java:
##########
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.producer;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.fs.FileStatus;
+import org.apache.fluss.fs.FileSystem;
+import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.producer.ProducerSnapshot;
+import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
+import org.apache.fluss.utils.types.Tuple2;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Manager for producer offset snapshots lifecycle.
+ *
+ * <p>This manager handles:
+ *
+ * <ul>
+ * <li>Registering new producer snapshots with atomic "check and register"
semantics
+ * <li>Retrieving producer snapshot offsets
+ * <li>Deleting producer snapshots
+ * <li>Periodic cleanup of expired snapshots
+ * <li>Cleanup of orphan files in remote storage
+ * </ul>
+ *
+ * <p>The manager uses a background scheduler to periodically clean up expired
snapshots and orphan
+ * files. The cleanup interval and snapshot TTL are configurable.
+ *
+ * @see ProducerSnapshotStore for low-level storage operations
+ */
+public class ProducerSnapshotManager implements AutoCloseable {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ProducerSnapshotManager.class);
+
+ /** Maximum number of attempts for snapshot registration to avoid infinite
loops. */
+ private static final int MAX_REGISTER_ATTEMPTS = 3;
+
+ private final ProducerSnapshotStore snapshotStore;
+ private final long defaultTtlMs;
+ private final long cleanupIntervalMs;
+ private final ScheduledExecutorService cleanupScheduler;
+
+ public ProducerSnapshotManager(Configuration conf, ZooKeeperClient
zkClient) {
+ this(
+ new ProducerSnapshotStore(zkClient,
conf.getString(ConfigOptions.REMOTE_DATA_DIR)),
+
conf.get(ConfigOptions.COORDINATOR_PRODUCER_SNAPSHOT_TTL).toMillis(),
+
conf.get(ConfigOptions.COORDINATOR_PRODUCER_SNAPSHOT_CLEANUP_INTERVAL).toMillis());
+ }
+
+ @VisibleForTesting
+ ProducerSnapshotManager(
+ ProducerSnapshotStore snapshotStore, long defaultTtlMs, long
cleanupIntervalMs) {
+ this.snapshotStore = snapshotStore;
+ this.defaultTtlMs = defaultTtlMs;
+ this.cleanupIntervalMs = cleanupIntervalMs;
+ this.cleanupScheduler =
+ Executors.newSingleThreadScheduledExecutor(
+ new
ExecutorThreadFactory("producer-snapshot-cleanup"));
+ }
+
+ /** Starts the background cleanup task. */
+ public void start() {
+ cleanupScheduler.scheduleAtFixedRate(
+ this::runCleanup, cleanupIntervalMs, cleanupIntervalMs,
TimeUnit.MILLISECONDS);
+ LOG.info(
+ "Started producer snapshot manager with TTL={} ms, cleanup
interval={} ms",
+ defaultTtlMs,
+ cleanupIntervalMs);
+ }
+
+ // ------------------------------------------------------------------------
+ // Public API
+ // ------------------------------------------------------------------------
+
+ /**
+ * Registers a new producer offset snapshot with atomic "check and
register" semantics.
+ *
+ * <p>This method uses ZooKeeper's version mechanism to handle concurrent
requests safely:
+ *
+ * <ul>
+ * <li>Atomicity: Uses ZK's atomic create for new snapshots
+ * <li>Idempotency: Concurrent requests with same producerId will have
exactly one succeed
+ * with CREATED, others return ALREADY_EXISTS
+ * <li>Version-based cleanup: Uses ZK version to safely delete expired
snapshots without race
+ * conditions
+ * </ul>
+ *
+ * @param producerId the producer ID (typically Flink job ID)
+ * @param offsets map of TableBucket to offset for all tables
+ * @param ttlMs TTL in milliseconds for the snapshot, or null to use
default
+ * @return true if a new snapshot was created (CREATED), false if snapshot
already existed
+ * (ALREADY_EXISTS)
+ * @throws Exception if the operation fails
+ */
+ public boolean registerSnapshot(String producerId, Map<TableBucket, Long>
offsets, Long ttlMs)
+ throws Exception {
+ long effectiveTtlMs = ttlMs != null ? ttlMs : defaultTtlMs;
+
+ // Use loop instead of recursion to avoid stack overflow risk
+ for (int attempt = 0; attempt < MAX_REGISTER_ATTEMPTS; attempt++) {
+ long currentTimeMs = System.currentTimeMillis();
+ long expirationTime = currentTimeMs + effectiveTtlMs;
+
+ // Step 1: Try to atomically create the snapshot (common case)
+ if (snapshotStore.tryStoreSnapshot(producerId, offsets,
expirationTime)) {
+ return true;
+ }
+
+ // Step 2: Snapshot exists - check if it's valid or expired
+ RegisterAttemptResult result =
+ handleExistingSnapshot(producerId, offsets,
expirationTime, currentTimeMs);
+
+ switch (result) {
+ case ALREADY_EXISTS:
+ return false;
+ case CREATED:
+ return true;
+ case RETRY:
+ // Continue to next iteration
+ LOG.debug(
+ "Retrying snapshot registration for producer {}
(attempt {}/{})",
+ producerId,
+ attempt + 1,
+ MAX_REGISTER_ATTEMPTS);
+ break;
+ }
+ }
+
+ // Exhausted all attempts
+ LOG.warn(
+ "Failed to register snapshot for producer {} after {}
attempts, "
+ + "concurrent modifications may have interfered",
+ producerId,
+ MAX_REGISTER_ATTEMPTS);
+ return false;
+ }
+
+ /**
+ * Handles the case where a snapshot already exists for the producer.
+ *
+ * @return the result indicating next action: ALREADY_EXISTS, CREATED, or
RETRY
+ */
+ private RegisterAttemptResult handleExistingSnapshot(
+ String producerId,
+ Map<TableBucket, Long> offsets,
+ long expirationTime,
+ long currentTimeMs)
+ throws Exception {
+
+ Optional<Tuple2<ProducerSnapshot, Integer>> existingWithVersion =
+ snapshotStore.getSnapshotMetadataWithVersion(producerId);
+
+ // Case 1: Snapshot was deleted between our create attempt and this
check
+ if (!existingWithVersion.isPresent()) {
+ return snapshotStore.tryStoreSnapshot(producerId, offsets,
expirationTime)
+ ? RegisterAttemptResult.CREATED
+ : RegisterAttemptResult.RETRY;
+ }
+
+ ProducerSnapshot existingSnapshot = existingWithVersion.get().f0;
+ int version = existingWithVersion.get().f1;
+
+ // Case 2: Valid (non-expired) snapshot exists
+ if (!existingSnapshot.isExpired(currentTimeMs)) {
+ LOG.info(
+ "Producer snapshot already exists for producer {} (expires
at {}, version {})",
+ producerId,
+ existingSnapshot.getExpirationTime(),
+ version);
+ return RegisterAttemptResult.ALREADY_EXISTS;
+ }
+
+ // Case 3: Expired snapshot - try to clean up and create new one
+ return tryReplaceExpiredSnapshot(
+ producerId, offsets, expirationTime, existingSnapshot,
version, currentTimeMs);
+ }
+
+ /**
+ * Attempts to replace an expired snapshot with a new one.
+ *
+ * @return the result indicating next action
+ */
+ private RegisterAttemptResult tryReplaceExpiredSnapshot(
+ String producerId,
+ Map<TableBucket, Long> offsets,
+ long expirationTime,
+ ProducerSnapshot expiredSnapshot,
+ int version,
+ long currentTimeMs)
+ throws Exception {
+
+ LOG.info(
+ "Found expired snapshot for producer {} (version {}),
attempting cleanup",
+ producerId,
+ version);
+
+ // Try version-based conditional delete
+ boolean deleted =
+ snapshotStore.deleteSnapshotIfVersion(producerId,
expiredSnapshot, version);
+
+ if (!deleted) {
+ // Version mismatch - another process modified the snapshot
+ LOG.debug(
+ "Version mismatch during delete for producer {}, checking
current state",
+ producerId);
+ return checkCurrentSnapshotState(producerId, currentTimeMs);
+ }
+
+ // Successfully deleted, try to create new snapshot
+ if (snapshotStore.tryStoreSnapshot(producerId, offsets,
expirationTime)) {
+ return RegisterAttemptResult.CREATED;
+ }
+
+ // Another concurrent request created a snapshot after our delete
+ LOG.debug(
+ "Concurrent creation detected for producer {} after delete,
checking state",
+ producerId);
+ return checkCurrentSnapshotState(producerId, currentTimeMs);
+ }
+
+ /**
+ * Checks the current snapshot state and returns the appropriate result.
+ *
+ * <p>This method is used after detecting concurrent modifications to
determine whether a valid
+ * snapshot now exists or if we should retry.
+ *
+ * @param producerId the producer ID
+ * @param currentTimeMs the current time for expiration check
+ * @return ALREADY_EXISTS if a valid snapshot exists, RETRY otherwise
+ */
+ private RegisterAttemptResult checkCurrentSnapshotState(String producerId,
long currentTimeMs)
+ throws Exception {
+ Optional<ProducerSnapshot> snapshot =
snapshotStore.getSnapshotMetadata(producerId);
+
+ if (isValidSnapshot(snapshot, currentTimeMs)) {
+ LOG.info(
+ "Valid snapshot exists for producer {} after concurrent
modification",
+ producerId);
+ return RegisterAttemptResult.ALREADY_EXISTS;
+ }
+
+ LOG.debug("No valid snapshot for producer {} after concurrent
modification", producerId);
+ return RegisterAttemptResult.RETRY;
+ }
+
+ /**
+ * Checks if a snapshot is present and not expired.
+ *
+ * @param snapshot the optional snapshot
+ * @param currentTimeMs the current time for expiration check
+ * @return true if snapshot exists and is not expired
+ */
+ private boolean isValidSnapshot(Optional<ProducerSnapshot> snapshot, long
currentTimeMs) {
+ return snapshot.isPresent() &&
!snapshot.get().isExpired(currentTimeMs);
+ }
+
+ /** Result of a single registration attempt. */
+ private enum RegisterAttemptResult {
+ /** Snapshot was successfully created. */
+ CREATED,
+ /** A valid snapshot already exists. */
+ ALREADY_EXISTS,
+ /** Should retry the registration. */
+ RETRY
+ }
+
+ /**
+ * Gets the snapshot metadata for a producer.
+ *
+ * @param producerId the producer ID
+ * @return Optional containing the snapshot if exists
+ * @throws Exception if the operation fails
+ */
+ public Optional<ProducerSnapshot> getSnapshotMetadata(String producerId)
throws Exception {
+ return snapshotStore.getSnapshotMetadata(producerId);
+ }
+
+ /**
+ * Reads all offsets for a producer.
+ *
+ * @param producerId the producer ID
+ * @return map of TableBucket to offset, or empty map if no snapshot exists
+ * @throws Exception if the operation fails
+ */
+ public Map<TableBucket, Long> getOffsets(String producerId) throws
Exception {
+ return snapshotStore.readOffsets(producerId);
+ }
+
+ /**
+ * Deletes a producer snapshot.
+ *
+ * @param producerId the producer ID
+ * @throws Exception if the operation fails
+ */
+ public void deleteSnapshot(String producerId) throws Exception {
+ snapshotStore.deleteSnapshot(producerId);
+ }
+
+ /**
+ * Gets the default TTL in milliseconds.
+ *
+ * @return the default TTL
+ */
+ public long getDefaultTtlMs() {
+ return defaultTtlMs;
+ }
+
+ // ------------------------------------------------------------------------
+ // Cleanup Operations
+ // ------------------------------------------------------------------------
+
+ /** Runs the cleanup task (expired snapshots and orphan files). */
+ @VisibleForTesting
+ void runCleanup() {
+ try {
+ int expiredCount = cleanupExpiredSnapshots();
+ if (expiredCount > 0) {
+ LOG.info("Producer snapshot cleanup: removed {} expired
snapshots", expiredCount);
+ }
+
+ int orphanCount = cleanupOrphanFiles();
+ if (orphanCount > 0) {
+ LOG.info("Producer snapshot cleanup: removed {} orphan files",
orphanCount);
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to cleanup producer snapshots", e);
+ }
+ }
+
+ /**
+ * Cleans up expired producer snapshots.
+ *
+ * @return number of snapshots cleaned up
+ * @throws Exception if the operation fails
+ */
+ @VisibleForTesting
+ int cleanupExpiredSnapshots() throws Exception {
+ List<String> producerIds = snapshotStore.listProducerIds();
+ int cleanedCount = 0;
+ long currentTime = System.currentTimeMillis();
+
+ for (String producerId : producerIds) {
+ try {
+ Optional<ProducerSnapshot> optSnapshot =
+ snapshotStore.getSnapshotMetadata(producerId);
+ if (isExpiredSnapshot(optSnapshot, currentTime)) {
+ snapshotStore.deleteSnapshot(producerId);
Review Comment:
Should we use `getSnapshotMetadataWithVersion` and `deleteSnapshotIfVersion`
here? Otherwise, this may delete a snapshot that was updated just now.
##########
fluss-server/src/test/java/org/apache/fluss/server/coordinator/producer/ProducerSnapshotManagerTest.java:
##########
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.producer;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.producer.ProducerSnapshot;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link ProducerSnapshotManager}. */
+class ProducerSnapshotManagerTest {
+
+ @RegisterExtension
+ public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
+ FlussClusterExtension.builder()
+ .setNumOfTabletServers(1)
+ .setClusterConf(new Configuration())
+ .build();
+
+ @TempDir Path tempDir;
+
+ private ProducerSnapshotManager manager;
+ private ProducerSnapshotStore store;
+ private ZooKeeperClient zkClient;
+
+ private static final long DEFAULT_TTL_MS = 3600000; // 1 hour
+ private static final long CLEANUP_INTERVAL_MS = 60000; // 1 minute
+
+ @BeforeEach
+ void setUp() {
+ zkClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
+ store = new ProducerSnapshotStore(zkClient, tempDir.toString());
+ manager = new ProducerSnapshotManager(store, DEFAULT_TTL_MS,
CLEANUP_INTERVAL_MS);
+ }
+
+ @AfterEach
+ void tearDown() {
+ if (manager != null) {
+ manager.close();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Basic Registration Tests
+ // ------------------------------------------------------------------------
+
+ @Test
+ void testRegisterSnapshotSuccess() throws Exception {
+ String producerId = "test-manager-register";
+ Map<TableBucket, Long> offsets = createTestOffsets();
+
+ boolean created = manager.registerSnapshot(producerId, offsets, null);
+
+ assertThat(created).isTrue();
+ Optional<ProducerSnapshot> snapshot =
manager.getSnapshotMetadata(producerId);
+ assertThat(snapshot).isPresent();
+
assertThat(snapshot.get().getExpirationTime()).isGreaterThan(System.currentTimeMillis());
Review Comment:
`isGreaterThanOrEqualTo`?
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/producer/ProducerSnapshotStore.java:
##########
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.producer;
+
+import org.apache.fluss.fs.FSDataInputStream;
+import org.apache.fluss.fs.FSDataOutputStream;
+import org.apache.fluss.fs.FileStatus;
+import org.apache.fluss.fs.FileSystem;
+import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.producer.ProducerSnapshot;
+import org.apache.fluss.utils.IOUtils;
+import org.apache.fluss.utils.RetryUtils;
+import org.apache.fluss.utils.json.TableBucketOffsets;
+import org.apache.fluss.utils.types.Tuple2;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+
+/**
+ * Low-level storage operations for producer offset snapshots.
+ *
+ * <p>This class handles:
+ *
+ * <ul>
+ * <li>Writing offset data to remote storage (OSS/S3/HDFS)
+ * <li>Reading offset data from remote storage
+ * <li>Registering snapshot metadata in ZooKeeper
+ * <li>Deleting snapshots from both ZK and remote storage
+ * </ul>
+ *
+ * <p>This class is stateless and thread-safe. Lifecycle management should be
handled by {@link
+ * ProducerSnapshotManager}.
+ */
+public class ProducerSnapshotStore {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ProducerSnapshotStore.class);
+
+ private static final int MAX_RETRIES = 3;
+ private static final long INITIAL_BACKOFF_MS = 100;
+ private static final long MAX_BACKOFF_MS = 1000;
+
+ private final ZooKeeperClient zkClient;
+ private final String remoteDataDir;
+
+ public ProducerSnapshotStore(ZooKeeperClient zkClient, String
remoteDataDir) {
+ this.zkClient = zkClient;
+ this.remoteDataDir = remoteDataDir;
+ }
+
+ // ------------------------------------------------------------------------
+ // Core CRUD Operations
+ // ------------------------------------------------------------------------
+
+ /**
+ * Atomically stores a new producer offset snapshot.
+ *
+ * <p>This method first writes offset files to remote storage, then
attempts to atomically
+ * create the ZK metadata. If the ZK node already exists, this method
returns false and cleans
+ * up the remote files.
+ *
+ * <p><b>Note on potential orphan files:</b> There is a small window where
orphan files can be
+ * created if the process crashes after writing remote files but before
creating ZK metadata, or
+ * if ZK metadata creation fails. This is an acceptable trade-off because:
+ *
+ * <ul>
+ * <li>The alternative (ZK first, then files) would leave ZK metadata
pointing to non-existent
+ * files, which is worse
+ * <li>Orphan files are harmless and will be cleaned up by {@link
+ * ProducerSnapshotManager#cleanupOrphanFiles()}
+ * <li>A unified orphan file cleanup mechanism will handle these cases
in the future
+ * </ul>
+ *
+ * @param producerId the producer ID
+ * @param offsets map of TableBucket to offset
+ * @param expirationTime the expiration timestamp in milliseconds
+ * @return true if created, false if already existed
+ * @throws Exception if the operation fails
+ */
+ public boolean tryStoreSnapshot(
+ String producerId, Map<TableBucket, Long> offsets, long
expirationTime)
+ throws Exception {
+
+ Map<Long, Map<TableBucket, Long>> offsetsByTable =
groupOffsetsByTable(offsets);
+ List<ProducerSnapshot.TableOffsetMetadata> tableMetadatas = new
ArrayList<>();
+ List<FsPath> createdFiles = new ArrayList<>();
+
+ try {
+ // Write offset files to remote storage
+ for (Map.Entry<Long, Map<TableBucket, Long>> entry :
offsetsByTable.entrySet()) {
+ FsPath path = writeOffsetsFile(producerId, entry.getKey(),
entry.getValue());
+ createdFiles.add(path);
+ tableMetadatas.add(new
ProducerSnapshot.TableOffsetMetadata(entry.getKey(), path));
+ }
+
+ // Atomically create ZK metadata
+ ProducerSnapshot snapshot = new ProducerSnapshot(expirationTime,
tableMetadatas);
+ boolean created = zkClient.tryRegisterProducerSnapshot(producerId,
snapshot);
+
+ if (!created) {
+ LOG.info(
+ "Snapshot already exists for producer {}, cleaning up
{} files",
+ producerId,
+ createdFiles.size());
+ cleanupFilesSafely(createdFiles);
+ return false;
+ }
+
+ LOG.info(
+ "Stored snapshot for producer {} with {} tables, expires
at {}",
+ producerId,
+ tableMetadatas.size(),
+ expirationTime);
+ return true;
+
+ } catch (Exception e) {
+ LOG.warn(
+ "Failed to store snapshot for producer {}, cleaning up {}
files",
+ producerId,
+ createdFiles.size(),
+ e);
+ cleanupFilesSafely(createdFiles);
+ throw e;
+ }
+ }
+
+ /** Gets the snapshot metadata for a producer. */
+ public Optional<ProducerSnapshot> getSnapshotMetadata(String producerId)
throws Exception {
+ return zkClient.getProducerSnapshot(producerId);
+ }
+
+ /**
+ * Gets the snapshot metadata for a producer along with its ZK version.
+ *
+ * <p>The version can be used for conditional deletes to handle concurrent
modifications safely.
+ *
+ * @param producerId the producer ID
+ * @return Optional containing a Tuple2 of (ProducerSnapshot, version) if
exists
+ * @throws Exception if the operation fails
+ */
+ public Optional<Tuple2<ProducerSnapshot, Integer>>
getSnapshotMetadataWithVersion(
+ String producerId) throws Exception {
+ return zkClient.getProducerSnapshotWithVersion(producerId);
+ }
+
+ /** Reads all offsets for a producer from remote storage. */
+ public Map<TableBucket, Long> readOffsets(String producerId) throws
Exception {
+ Optional<ProducerSnapshot> optSnapshot =
zkClient.getProducerSnapshot(producerId);
+ if (!optSnapshot.isPresent()) {
+ return new HashMap<>();
+ }
+
+ Map<TableBucket, Long> allOffsets = new HashMap<>();
+ for (ProducerSnapshot.TableOffsetMetadata metadata :
optSnapshot.get().getTableOffsets()) {
+ allOffsets.putAll(readOffsetsFile(metadata.getOffsetsPath()));
+ }
+ return allOffsets;
+ }
+
+ /** Deletes a producer snapshot (both ZK metadata and remote files). */
+ public void deleteSnapshot(String producerId) throws Exception {
+ Optional<ProducerSnapshot> optSnapshot =
zkClient.getProducerSnapshot(producerId);
+ if (!optSnapshot.isPresent()) {
+ LOG.debug("No snapshot found for producer {}", producerId);
+ return;
+ }
+
+ // Delete remote files
+ for (ProducerSnapshot.TableOffsetMetadata metadata :
optSnapshot.get().getTableOffsets()) {
+ deleteRemoteFile(metadata.getOffsetsPath());
+ }
+
+ // Delete ZK metadata
+ zkClient.deleteProducerSnapshot(producerId);
+ LOG.info("Deleted snapshot for producer {}", producerId);
+ }
+
+ /**
+ * Deletes a producer snapshot only if the ZK version matches.
+ *
+ * <p>This provides optimistic concurrency control - the delete will only
succeed if no other
+ * process has modified the snapshot since it was read.
+ *
+ * @param producerId the producer ID
+ * @param snapshot the snapshot to delete (used to get file paths)
+ * @param expectedVersion the expected ZK version
+ * @return true if deleted successfully, false if version mismatch
+ * @throws Exception if the operation fails for reasons other than version
mismatch
+ */
+ public boolean deleteSnapshotIfVersion(
+ String producerId, ProducerSnapshot snapshot, int expectedVersion)
throws Exception {
+ // First try to delete ZK metadata with version check
+ boolean deleted = zkClient.deleteProducerSnapshotIfVersion(producerId,
expectedVersion);
+ if (!deleted) {
+ LOG.debug(
+ "Failed to delete snapshot for producer {} - version
mismatch, "
+ + "snapshot was modified by another process",
+ producerId);
+ return false;
+ }
+
+ // ZK metadata deleted successfully, now clean up remote files
+ // Even if file deletion fails, the snapshot is already gone from ZK
+ // Orphan files will be cleaned up by the periodic cleanup task
+ for (ProducerSnapshot.TableOffsetMetadata metadata :
snapshot.getTableOffsets()) {
+ deleteRemoteFile(metadata.getOffsetsPath());
+ }
+
+ LOG.info("Deleted snapshot for producer {} with version {}",
producerId, expectedVersion);
+ return true;
+ }
+
+ /** Lists all producer IDs with registered snapshots. */
+ public List<String> listProducerIds() throws Exception {
+ return zkClient.listProducerIds();
+ }
+
+ // ------------------------------------------------------------------------
+ // Remote Storage Operations
+ // ------------------------------------------------------------------------
+
+ /** Gets the base directory for producer snapshots in remote storage. */
+ public FsPath getProducersDirectory() {
+ return new FsPath(remoteDataDir + "/producers");
+ }
+
+ /**
+ * Deletes a remote file, logging but not throwing on failure.
+ *
+ * @param filePath the file path to delete
+ * @return true if deleted successfully or file didn't exist, false if
deletion failed
+ */
+ public boolean deleteRemoteFile(FsPath filePath) {
+ try {
+ FileSystem fs = filePath.getFileSystem();
+ if (fs.exists(filePath)) {
+ fs.delete(filePath, false);
+ LOG.debug("Deleted remote file: {}", filePath);
+ }
+ return true;
+ } catch (IOException e) {
+ LOG.warn("Failed to delete remote file: {}", filePath, e);
+ return false;
+ }
+ }
+
+ /** Recursively deletes a directory, returning the count of deleted files.
*/
+ public int deleteDirectoryRecursively(FileSystem fs, FsPath path) {
+ int count = 0;
+ try {
+ FileStatus[] contents = fs.listStatus(path);
+ if (contents != null) {
+ for (FileStatus status : contents) {
+ if (status.isDir()) {
+ count += deleteDirectoryRecursively(fs,
status.getPath());
+ } else if (fs.delete(status.getPath(), false)) {
+ count++;
+ }
+ }
+ }
+ fs.delete(path, false);
+ } catch (IOException e) {
+ LOG.warn("Failed to delete directory: {}", path, e);
+ }
+ return count;
+ }
+
+ // ------------------------------------------------------------------------
+ // Private Helpers
+ // ------------------------------------------------------------------------
+
+ private Map<Long, Map<TableBucket, Long>>
groupOffsetsByTable(Map<TableBucket, Long> offsets) {
+ Map<Long, Map<TableBucket, Long>> result = new HashMap<>();
+ for (Map.Entry<TableBucket, Long> entry : offsets.entrySet()) {
+ result.computeIfAbsent(entry.getKey().getTableId(), k -> new
HashMap<>())
+ .put(entry.getKey(), entry.getValue());
+ }
+ return result;
+ }
+
+ private FsPath writeOffsetsFile(String producerId, long tableId,
Map<TableBucket, Long> offsets)
+ throws IOException {
+
+ String fileName = UUID.randomUUID() + ".offsets";
+ FsPath path =
+ new FsPath(
+ new FsPath(
+ new FsPath(new FsPath(remoteDataDir,
"producers"), producerId),
+ String.valueOf(tableId)),
+ fileName);
Review Comment:
We maintain both local and remote paths in a centralized location,
`FlussPaths`. This helps ensure consistency, avoids errors from manual path
concatenation, and simplifies debugging.
Could you add this path to `FlussPaths` by introducing a method like
`remoteProducerOffsetsPath(remoteDataDir, producerId, tableId, UUID)`, along
with Javadoc that clearly explains the structure and meaning of the resulting
path string?
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/producer/ProducerSnapshotManager.java:
##########
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.producer;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.fs.FileStatus;
+import org.apache.fluss.fs.FileSystem;
+import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.producer.ProducerSnapshot;
+import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
+import org.apache.fluss.utils.types.Tuple2;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Manager for producer offset snapshots lifecycle.
+ *
+ * <p>This manager handles:
+ *
+ * <ul>
+ * <li>Registering new producer snapshots with atomic "check and register"
semantics
+ * <li>Retrieving producer snapshot offsets
+ * <li>Deleting producer snapshots
+ * <li>Periodic cleanup of expired snapshots
+ * <li>Cleanup of orphan files in remote storage
+ * </ul>
+ *
+ * <p>The manager uses a background scheduler to periodically clean up expired
snapshots and orphan
+ * files. The cleanup interval and snapshot TTL are configurable.
+ *
+ * @see ProducerSnapshotStore for low-level storage operations
+ */
+public class ProducerSnapshotManager implements AutoCloseable {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ProducerSnapshotManager.class);
+
+ /** Maximum number of attempts for snapshot registration to avoid infinite
loops. */
+ private static final int MAX_REGISTER_ATTEMPTS = 3;
+
+ private final ProducerSnapshotStore snapshotStore;
+ private final long defaultTtlMs;
+ private final long cleanupIntervalMs;
+ private final ScheduledExecutorService cleanupScheduler;
+
+ public ProducerSnapshotManager(Configuration conf, ZooKeeperClient
zkClient) {
+ this(
+ new ProducerSnapshotStore(zkClient,
conf.getString(ConfigOptions.REMOTE_DATA_DIR)),
+
conf.get(ConfigOptions.COORDINATOR_PRODUCER_SNAPSHOT_TTL).toMillis(),
+
conf.get(ConfigOptions.COORDINATOR_PRODUCER_SNAPSHOT_CLEANUP_INTERVAL).toMillis());
+ }
+
+ @VisibleForTesting
+ ProducerSnapshotManager(
+ ProducerSnapshotStore snapshotStore, long defaultTtlMs, long
cleanupIntervalMs) {
+ this.snapshotStore = snapshotStore;
+ this.defaultTtlMs = defaultTtlMs;
+ this.cleanupIntervalMs = cleanupIntervalMs;
+ this.cleanupScheduler =
+ Executors.newSingleThreadScheduledExecutor(
+ new
ExecutorThreadFactory("producer-snapshot-cleanup"));
+ }
+
+ /** Starts the background cleanup task. */
+ public void start() {
+ cleanupScheduler.scheduleAtFixedRate(
+ this::runCleanup, cleanupIntervalMs, cleanupIntervalMs,
TimeUnit.MILLISECONDS);
+ LOG.info(
+ "Started producer snapshot manager with TTL={} ms, cleanup
interval={} ms",
+ defaultTtlMs,
+ cleanupIntervalMs);
+ }
+
+ // ------------------------------------------------------------------------
+ // Public API
+ // ------------------------------------------------------------------------
+
+ /**
+ * Registers a new producer offset snapshot with atomic "check and
register" semantics.
+ *
+ * <p>This method uses ZooKeeper's version mechanism to handle concurrent
requests safely:
+ *
+ * <ul>
+ * <li>Atomicity: Uses ZK's atomic create for new snapshots
+ * <li>Idempotency: Concurrent requests with same producerId will have
exactly one succeed
+ * with CREATED, others return ALREADY_EXISTS
+ * <li>Version-based cleanup: Uses ZK version to safely delete expired
snapshots without race
+ * conditions
+ * </ul>
+ *
+ * @param producerId the producer ID (typically Flink job ID)
+ * @param offsets map of TableBucket to offset for all tables
+ * @param ttlMs TTL in milliseconds for the snapshot, or null to use
default
+ * @return true if a new snapshot was created (CREATED), false if snapshot
already existed
+ * (ALREADY_EXISTS)
+ * @throws Exception if the operation fails
+ */
+ public boolean registerSnapshot(String producerId, Map<TableBucket, Long>
offsets, Long ttlMs)
Review Comment:
We should validate that `producerId` is a valid string, as it will be used
as both a file name and a ZooKeeper node name. An invalid name could lead to
serious issues such as file system corruption or ZK operation failures.
We can reuse the existing utility
`org.apache.fluss.metadata.TablePath#detectInvalidName` for this validation.
##########
fluss-server/src/test/java/org/apache/fluss/server/coordinator/producer/ProducerSnapshotStoreTest.java:
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.producer;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.producer.ProducerSnapshot;
+import org.apache.fluss.utils.types.Tuple2;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link ProducerSnapshotStore}. */
+class ProducerSnapshotStoreTest {
+
+ @RegisterExtension
+ public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
+ FlussClusterExtension.builder()
+ .setNumOfTabletServers(1)
+ .setClusterConf(new Configuration())
+ .build();
+
+ @TempDir Path tempDir;
+
+ private ProducerSnapshotStore store;
+ private ZooKeeperClient zkClient;
+
+ @BeforeEach
+ void setUp() {
+ zkClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
+ store = new ProducerSnapshotStore(zkClient, tempDir.toString());
+ }
+
+ @Test
+ void testTryStoreSnapshotSuccess() throws Exception {
+ String producerId = "test-producer-store";
+ Map<TableBucket, Long> offsets = new HashMap<>();
+ offsets.put(new TableBucket(1L, 0), 100L);
+ offsets.put(new TableBucket(1L, 1), 200L);
+ offsets.put(new TableBucket(2L, 0), 300L);
+
+ long expirationTime = System.currentTimeMillis() + 3600000;
+
+ // First store should succeed
+ boolean created = store.tryStoreSnapshot(producerId, offsets,
expirationTime);
+ assertThat(created).isTrue();
+
+ // Verify metadata was stored
+ Optional<ProducerSnapshot> snapshot =
store.getSnapshotMetadata(producerId);
+ assertThat(snapshot).isPresent();
+
assertThat(snapshot.get().getExpirationTime()).isEqualTo(expirationTime);
+ assertThat(snapshot.get().getTableOffsets()).hasSize(2); // 2 tables
+ }
+
+ @Test
+ void testTryStoreSnapshotAlreadyExists() throws Exception {
+ String producerId = "test-producer-exists";
+ Map<TableBucket, Long> offsets1 = new HashMap<>();
+ offsets1.put(new TableBucket(1L, 0), 100L);
+
+ Map<TableBucket, Long> offsets2 = new HashMap<>();
+ offsets2.put(new TableBucket(1L, 0), 999L);
+
+ long expirationTime = System.currentTimeMillis() + 3600000;
+
+ // First store should succeed
+ assertThat(store.tryStoreSnapshot(producerId, offsets1,
expirationTime)).isTrue();
+
+ // Second store should return false (already exists)
+ assertThat(store.tryStoreSnapshot(producerId, offsets2,
expirationTime)).isFalse();
+
+ // Original offsets should be preserved
+ Map<TableBucket, Long> retrieved = store.readOffsets(producerId);
+ assertThat(retrieved.get(new TableBucket(1L, 0))).isEqualTo(100L);
+ }
+
+ @Test
+ void testReadOffsets() throws Exception {
+ String producerId = "test-producer-read";
+ Map<TableBucket, Long> offsets = new HashMap<>();
+ offsets.put(new TableBucket(1L, 0), 100L);
+ offsets.put(new TableBucket(1L, 1), 200L);
+ offsets.put(new TableBucket(2L, 0), 300L);
+ offsets.put(new TableBucket(3L, 100L, 0), 400L); // partitioned table
+
+ long expirationTime = System.currentTimeMillis() + 3600000;
+ store.tryStoreSnapshot(producerId, offsets, expirationTime);
+
+ // Read offsets back
+ Map<TableBucket, Long> retrieved = store.readOffsets(producerId);
+ assertThat(retrieved).hasSize(4);
+ assertThat(retrieved.get(new TableBucket(1L, 0))).isEqualTo(100L);
+ assertThat(retrieved.get(new TableBucket(1L, 1))).isEqualTo(200L);
+ assertThat(retrieved.get(new TableBucket(2L, 0))).isEqualTo(300L);
+ assertThat(retrieved.get(new TableBucket(3L, 100L,
0))).isEqualTo(400L);
+ }
+
+ @Test
+ void testReadOffsetsNonExistent() throws Exception {
+ Map<TableBucket, Long> offsets =
store.readOffsets("non-existent-producer");
+ assertThat(offsets).isEmpty();
+ }
+
+ @Test
+ void testDeleteSnapshot() throws Exception {
+ String producerId = "test-producer-delete";
+ Map<TableBucket, Long> offsets = new HashMap<>();
+ offsets.put(new TableBucket(1L, 0), 100L);
+
+ long expirationTime = System.currentTimeMillis() + 3600000;
+ store.tryStoreSnapshot(producerId, offsets, expirationTime);
+
+ // Verify it exists
+ assertThat(store.getSnapshotMetadata(producerId)).isPresent();
+
+ // Delete
+ store.deleteSnapshot(producerId);
+
+ // Verify it's gone
+ assertThat(store.getSnapshotMetadata(producerId)).isEmpty();
+ assertThat(store.readOffsets(producerId)).isEmpty();
+ }
+
+ @Test
+ void testDeleteNonExistentSnapshot() throws Exception {
+ // Should not throw
+ store.deleteSnapshot("non-existent-producer");
+ }
+
+ @Test
+ void testListProducerIds() throws Exception {
+ // Use unique prefix to avoid conflicts with other tests
+ String prefix = "list-test-" + System.currentTimeMillis() + "-";
+
+ // Create multiple snapshots
+ Map<TableBucket, Long> offsets = new HashMap<>();
+ offsets.put(new TableBucket(1L, 0), 100L);
+ long expirationTime = System.currentTimeMillis() + 3600000;
+
+ store.tryStoreSnapshot(prefix + "producer-1", offsets, expirationTime);
+ store.tryStoreSnapshot(prefix + "producer-2", offsets, expirationTime);
+ store.tryStoreSnapshot(prefix + "producer-3", offsets, expirationTime);
+
+ List<String> producerIds = store.listProducerIds();
+ assertThat(producerIds)
+ .contains(prefix + "producer-1", prefix + "producer-2", prefix
+ "producer-3");
+ }
+
+ @Test
+ void testGetProducersDirectory() {
+ FsPath producersDir = store.getProducersDirectory();
+ assertThat(producersDir.toString()).isEqualTo(tempDir.toString() +
"/producers");
+ }
+
+ @Test
+ void testSnapshotWithPartitionedTable() throws Exception {
+ String producerId = "test-producer-partitioned";
+ Map<TableBucket, Long> offsets = new HashMap<>();
+ // Non-partitioned table
+ offsets.put(new TableBucket(1L, 0), 100L);
+ // Partitioned table with different partitions
+ offsets.put(new TableBucket(2L, 10L, 0), 200L);
+ offsets.put(new TableBucket(2L, 10L, 1), 201L);
+ offsets.put(new TableBucket(2L, 20L, 0), 300L);
+
+ long expirationTime = System.currentTimeMillis() + 3600000;
+ store.tryStoreSnapshot(producerId, offsets, expirationTime);
+
+ Map<TableBucket, Long> retrieved = store.readOffsets(producerId);
+ assertThat(retrieved).hasSize(4);
+ assertThat(retrieved.get(new TableBucket(1L, 0))).isEqualTo(100L);
+ assertThat(retrieved.get(new TableBucket(2L, 10L, 0))).isEqualTo(200L);
+ assertThat(retrieved.get(new TableBucket(2L, 10L, 1))).isEqualTo(201L);
+ assertThat(retrieved.get(new TableBucket(2L, 20L, 0))).isEqualTo(300L);
Review Comment:
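Ditto: the per-key assertions here can be replaced with a single
`assertThat(retrieved).isEqualTo(offsets);`.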
##########
fluss-server/src/test/java/org/apache/fluss/server/coordinator/producer/ProducerSnapshotStoreTest.java:
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.producer;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.producer.ProducerSnapshot;
+import org.apache.fluss.utils.types.Tuple2;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link ProducerSnapshotStore}. */
+class ProducerSnapshotStoreTest {
+
+ @RegisterExtension
+ public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
+ FlussClusterExtension.builder()
+ .setNumOfTabletServers(1)
+ .setClusterConf(new Configuration())
+ .build();
+
+ @TempDir Path tempDir;
+
+ private ProducerSnapshotStore store;
+ private ZooKeeperClient zkClient;
+
+ @BeforeEach
+ void setUp() {
+ zkClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
+ store = new ProducerSnapshotStore(zkClient, tempDir.toString());
+ }
+
+ @Test
+ void testTryStoreSnapshotSuccess() throws Exception {
+ String producerId = "test-producer-store";
+ Map<TableBucket, Long> offsets = new HashMap<>();
+ offsets.put(new TableBucket(1L, 0), 100L);
+ offsets.put(new TableBucket(1L, 1), 200L);
+ offsets.put(new TableBucket(2L, 0), 300L);
+
+ long expirationTime = System.currentTimeMillis() + 3600000;
+
+ // First store should succeed
+ boolean created = store.tryStoreSnapshot(producerId, offsets,
expirationTime);
+ assertThat(created).isTrue();
+
+ // Verify metadata was stored
+ Optional<ProducerSnapshot> snapshot =
store.getSnapshotMetadata(producerId);
+ assertThat(snapshot).isPresent();
+
assertThat(snapshot.get().getExpirationTime()).isEqualTo(expirationTime);
+ assertThat(snapshot.get().getTableOffsets()).hasSize(2); // 2 tables
+ }
+
+ @Test
+ void testTryStoreSnapshotAlreadyExists() throws Exception {
+ String producerId = "test-producer-exists";
+ Map<TableBucket, Long> offsets1 = new HashMap<>();
+ offsets1.put(new TableBucket(1L, 0), 100L);
+
+ Map<TableBucket, Long> offsets2 = new HashMap<>();
+ offsets2.put(new TableBucket(1L, 0), 999L);
+
+ long expirationTime = System.currentTimeMillis() + 3600000;
+
+ // First store should succeed
+ assertThat(store.tryStoreSnapshot(producerId, offsets1,
expirationTime)).isTrue();
+
+ // Second store should return false (already exists)
+ assertThat(store.tryStoreSnapshot(producerId, offsets2,
expirationTime)).isFalse();
+
+ // Original offsets should be preserved
+ Map<TableBucket, Long> retrieved = store.readOffsets(producerId);
+ assertThat(retrieved.get(new TableBucket(1L, 0))).isEqualTo(100L);
+ }
+
+ @Test
+ void testReadOffsets() throws Exception {
+ String producerId = "test-producer-read";
+ Map<TableBucket, Long> offsets = new HashMap<>();
+ offsets.put(new TableBucket(1L, 0), 100L);
+ offsets.put(new TableBucket(1L, 1), 200L);
+ offsets.put(new TableBucket(2L, 0), 300L);
+ offsets.put(new TableBucket(3L, 100L, 0), 400L); // partitioned table
+
+ long expirationTime = System.currentTimeMillis() + 3600000;
+ store.tryStoreSnapshot(producerId, offsets, expirationTime);
+
+ // Read offsets back
+ Map<TableBucket, Long> retrieved = store.readOffsets(producerId);
+ assertThat(retrieved).hasSize(4);
+ assertThat(retrieved.get(new TableBucket(1L, 0))).isEqualTo(100L);
+ assertThat(retrieved.get(new TableBucket(1L, 1))).isEqualTo(200L);
+ assertThat(retrieved.get(new TableBucket(2L, 0))).isEqualTo(300L);
+ assertThat(retrieved.get(new TableBucket(3L, 100L,
0))).isEqualTo(400L);
Review Comment:
Could this simply be asserted with `assertThat(retrieved).isEqualTo(offsets);`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]