wuchong commented on code in PR #2434: URL: https://github.com/apache/fluss/pull/2434#discussion_r2728094983
########## fluss-common/src/main/java/org/apache/fluss/rpc/messages/RegisterResult.java: ########## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.rpc.messages; + +import org.apache.fluss.annotation.PublicEvolving; + +/** + * Result of producer offset registration. + * + * <p>This enum indicates whether a producer offset snapshot was newly created or already existed + * when calling registerProducerOffsets API. + * + * @since 0.9 + */ +@PublicEvolving +public enum RegisterResult { Review Comment: This class duplicates `org.apache.fluss.client.admin.RegisterResult`. We can remove this class and just use `response.setResult(created ? 0 : 1)` in `CoordinatorService`. 
########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java: ########## @@ -983,4 +1010,140 @@ public FlussPrincipal getFlussPrincipal() { return flussPrincipal; } } + + // ================================================================================== + // Producer Offset Management APIs (for Exactly-Once Semantics) + // ================================================================================== + + @Override + public CompletableFuture<RegisterProducerOffsetsResponse> registerProducerOffsets( + RegisterProducerOffsetsRequest request) { + // Authorization: require WRITE permission on all tables in the request + if (authorizer != null) { + for (PbProducerTableOffsets tableOffsets : request.getTableOffsetsList()) { + long tableId = tableOffsets.getTableId(); + authorizeTable(OperationType.WRITE, tableId); + } + } + + return CompletableFuture.supplyAsync( + () -> { + try { + String producerId = request.getProducerId(); + Map<TableBucket, Long> offsets = + ServerRpcMessageUtils.toTableBucketOffsets( + request.getTableOffsetsList()); + + // Use custom TTL if provided, otherwise use default (null means use + // manager's default) + Long ttlMs = request.hasTtlMs() ? request.getTtlMs() : null; + + // Register with atomic "check and register" semantics + boolean created = + producerOffsetsManager.registerSnapshot(producerId, offsets, ttlMs); + + RegisterProducerOffsetsResponse response = + new RegisterProducerOffsetsResponse(); + response.setResult( + created + ? 
RegisterResult.CREATED.getCode() + : RegisterResult.ALREADY_EXISTS.getCode()); + return response; + } catch (Exception e) { + throw new RuntimeException( + "Failed to register producer offsets for producer " + + request.getProducerId(), + e); + } + }, + ioExecutor); + } + + @Override + public CompletableFuture<GetProducerOffsetsResponse> getProducerOffsets( + GetProducerOffsetsRequest request) { + String producerId = request.getProducerId(); + + return CompletableFuture.supplyAsync( + () -> { + try { + Optional<ProducerOffsets> optSnapshot = + producerOffsetsManager.getSnapshotMetadata(producerId); + if (!optSnapshot.isPresent()) { + return new GetProducerOffsetsResponse(); + } + + ProducerOffsets snapshot = optSnapshot.get(); + Map<TableBucket, Long> allOffsets = + producerOffsetsManager.getOffsets(producerId); + Map<Long, Map<TableBucket, Long>> offsetsByTable = + groupOffsetsByTableId(allOffsets); + + // Authorization: filter tables by READ permission + if (authorizer != null) { + offsetsByTable + .keySet() + .removeIf( + tableId -> { + try { + authorizeTable(OperationType.READ, tableId); + return false; // keep this table + } catch (Exception e) { + return true; // remove this table + } + }); + } + + GetProducerOffsetsResponse response = new GetProducerOffsetsResponse(); + response.setProducerId(producerId); + response.setExpirationTime(snapshot.getExpirationTime()); + + for (Map.Entry<Long, Map<TableBucket, Long>> entry : + offsetsByTable.entrySet()) { + addTableOffsetsToResponse(response, entry.getKey(), entry.getValue()); + } + + return response; + } catch (Exception e) { + throw new RuntimeException( + "Failed to get producer offsets for producer " + producerId, e); + } + }, + ioExecutor); + } + + @Override + public CompletableFuture<DeleteProducerOffsetsResponse> deleteProducerOffsets( + DeleteProducerOffsetsRequest request) { + return CompletableFuture.supplyAsync( + () -> { + try { + String producerId = request.getProducerId(); + + // Authorization: 
require WRITE permission on all tables in the snapshot + if (authorizer != null) { + Map<TableBucket, Long> offsets = + producerOffsetsManager.getOffsets(producerId); + // Extract unique table IDs from the snapshot + Set<Long> tableIds = + offsets.keySet().stream() + .map(TableBucket::getTableId) + .collect(Collectors.toSet()); + // Check WRITE permission for each table + for (Long tableId : tableIds) { + authorizeTable(OperationType.WRITE, tableId); + } + } + + producerOffsetsManager.deleteSnapshot(producerId); + return new DeleteProducerOffsetsResponse(); + } catch (Exception e) { Review Comment: We should re-throw all `ApiException`s as-is; otherwise, exceptions such as `AuthorizationException` or `UnknownTableOrBucketException` thrown by `authorizeTable` will not be properly recognized by the client as their original types. Please add a permissions test in `ProducerOffsetsITCase` to verify the exception type and message for all producer offsets operations. ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/producer/ProducerOffsetsManager.java: ########## @@ -0,0 +1,611 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.coordinator.producer; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.fs.FileStatus; +import org.apache.fluss.fs.FileSystem; +import org.apache.fluss.fs.FsPath; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.producer.ProducerOffsets; +import org.apache.fluss.utils.concurrent.ExecutorThreadFactory; +import org.apache.fluss.utils.types.Tuple2; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Manager for producer offset snapshots lifecycle. + * + * <p>This manager handles: + * + * <ul> + * <li>Registering new producer snapshots with atomic "check and register" semantics + * <li>Retrieving producer snapshot offsets + * <li>Deleting producer snapshots + * <li>Periodic cleanup of expired snapshots + * <li>Cleanup of orphan files in remote storage + * </ul> + * + * <p>The manager uses a background scheduler to periodically clean up expired snapshots and orphan + * files. The cleanup interval and snapshot TTL are configurable. + * + * @see ProducerOffsetsStore for low-level storage operations + */ +public class ProducerOffsetsManager implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(ProducerOffsetsManager.class); + + /** Maximum number of attempts for snapshot registration to avoid infinite loops. 
*/ + private static final int MAX_REGISTER_ATTEMPTS = 3; + + private final ProducerOffsetsStore offsetsStore; + private final long defaultTtlMs; + private final long cleanupIntervalMs; + private final ScheduledExecutorService cleanupScheduler; + + public ProducerOffsetsManager(Configuration conf, ZooKeeperClient zkClient) { + this( + new ProducerOffsetsStore(zkClient, conf.getString(ConfigOptions.REMOTE_DATA_DIR)), + conf.get(ConfigOptions.COORDINATOR_PRODUCER_OFFSETS_TTL).toMillis(), + conf.get(ConfigOptions.COORDINATOR_PRODUCER_OFFSETS_CLEANUP_INTERVAL).toMillis()); + } + + @VisibleForTesting + ProducerOffsetsManager( + ProducerOffsetsStore offsetsStore, long defaultTtlMs, long cleanupIntervalMs) { + this.offsetsStore = offsetsStore; + this.defaultTtlMs = defaultTtlMs; + this.cleanupIntervalMs = cleanupIntervalMs; + this.cleanupScheduler = + Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("producer-snapshot-cleanup")); + } + + /** Starts the background cleanup task. */ + public void start() { + cleanupScheduler.scheduleAtFixedRate( + this::runCleanup, cleanupIntervalMs, cleanupIntervalMs, TimeUnit.MILLISECONDS); + LOG.info( + "Started producer snapshot manager with TTL={} ms, cleanup interval={} ms", + defaultTtlMs, + cleanupIntervalMs); + } + + // ------------------------------------------------------------------------ + // Public API + // ------------------------------------------------------------------------ + + /** + * Registers a new producer offset snapshot with atomic "check and register" semantics. 
+ * + * <p>This method uses ZooKeeper's version mechanism to handle concurrent requests safely: + * + * <ul> + * <li>Atomicity: Uses ZK's atomic create for new snapshots + * <li>Idempotency: Concurrent requests with same producerId will have exactly one succeed + * with CREATED, others return ALREADY_EXISTS + * <li>Version-based cleanup: Uses ZK version to safely delete expired snapshots without race + * conditions + * </ul> + * + * @param producerId the producer ID (typically Flink job ID) + * @param offsets map of TableBucket to offset for all tables + * @param ttlMs TTL in milliseconds for the snapshot, or null to use default + * @return true if a new snapshot was created (CREATED), false if snapshot already existed + * (ALREADY_EXISTS) + * @throws IllegalArgumentException if producerId is invalid (null, empty, contains invalid + * characters, etc.) + * @throws Exception if the operation fails + */ + public boolean registerSnapshot(String producerId, Map<TableBucket, Long> offsets, Long ttlMs) + throws Exception { + // Validate producerId as it will be used as both file name and ZK node name + validateProducerId(producerId); + + long effectiveTtlMs = ttlMs != null ? 
ttlMs : defaultTtlMs; + + // Use loop instead of recursion to avoid stack overflow risk + for (int attempt = 0; attempt < MAX_REGISTER_ATTEMPTS; attempt++) { + long currentTimeMs = System.currentTimeMillis(); + long expirationTime = currentTimeMs + effectiveTtlMs; + + // Step 1: Try to atomically create the snapshot (common case) + if (offsetsStore.tryStoreSnapshot(producerId, offsets, expirationTime)) { + return true; + } + + // Step 2: Snapshot exists - check if it's valid or expired + RegisterAttemptResult result = + handleExistingSnapshot(producerId, offsets, expirationTime, currentTimeMs); + + switch (result) { + case ALREADY_EXISTS: + return false; + case CREATED: + return true; + case RETRY: + // Continue to next iteration + LOG.debug( + "Retrying snapshot registration for producer {} (attempt {}/{})", + producerId, + attempt + 1, + MAX_REGISTER_ATTEMPTS); + break; + } + } + + // Exhausted all attempts + LOG.warn( + "Failed to register snapshot for producer {} after {} attempts, " + + "concurrent modifications may have interfered", + producerId, + MAX_REGISTER_ATTEMPTS); + return false; + } + + /** + * Handles the case where a snapshot already exists for the producer. + * + * @return the result indicating next action: ALREADY_EXISTS, CREATED, or RETRY + */ + private RegisterAttemptResult handleExistingSnapshot( + String producerId, + Map<TableBucket, Long> offsets, + long expirationTime, + long currentTimeMs) + throws Exception { + + Optional<Tuple2<ProducerOffsets, Integer>> existingWithVersion = + offsetsStore.getSnapshotMetadataWithVersion(producerId); + + // Case 1: Snapshot was deleted between our create attempt and this check + if (!existingWithVersion.isPresent()) { + return offsetsStore.tryStoreSnapshot(producerId, offsets, expirationTime) + ? 
RegisterAttemptResult.CREATED + : RegisterAttemptResult.RETRY; + } + + ProducerOffsets existingSnapshot = existingWithVersion.get().f0; + int version = existingWithVersion.get().f1; + + // Case 2: Valid (non-expired) snapshot exists + if (!existingSnapshot.isExpired(currentTimeMs)) { + LOG.info( + "Producer snapshot already exists for producer {} (expires at {}, version {})", + producerId, + existingSnapshot.getExpirationTime(), + version); + return RegisterAttemptResult.ALREADY_EXISTS; + } + + // Case 3: Expired snapshot - try to clean up and create new one + return tryReplaceExpiredSnapshot( + producerId, offsets, expirationTime, existingSnapshot, version, currentTimeMs); + } + + /** + * Attempts to replace an expired snapshot with a new one. + * + * @return the result indicating next action + */ + private RegisterAttemptResult tryReplaceExpiredSnapshot( + String producerId, + Map<TableBucket, Long> offsets, + long expirationTime, + ProducerOffsets expiredSnapshot, + int version, + long currentTimeMs) + throws Exception { + + LOG.info( + "Found expired snapshot for producer {} (version {}), attempting cleanup", + producerId, + version); + + // Try version-based conditional delete + boolean deleted = + offsetsStore.deleteSnapshotIfVersion(producerId, expiredSnapshot, version); + + if (!deleted) { + // Version mismatch - another process modified the snapshot + LOG.debug( + "Version mismatch during delete for producer {}, checking current state", + producerId); + return checkCurrentSnapshotState(producerId, currentTimeMs); + } + + // Successfully deleted, try to create new snapshot + if (offsetsStore.tryStoreSnapshot(producerId, offsets, expirationTime)) { + return RegisterAttemptResult.CREATED; + } + + // Another concurrent request created a snapshot after our delete + LOG.debug( + "Concurrent creation detected for producer {} after delete, checking state", + producerId); + return checkCurrentSnapshotState(producerId, currentTimeMs); + } + + /** + * Checks the 
current snapshot state and returns the appropriate result. + * + * <p>This method is used after detecting concurrent modifications to determine whether a valid + * snapshot now exists or if we should retry. + * + * @param producerId the producer ID + * @param currentTimeMs the current time for expiration check + * @return ALREADY_EXISTS if a valid snapshot exists, RETRY otherwise + */ + private RegisterAttemptResult checkCurrentSnapshotState(String producerId, long currentTimeMs) + throws Exception { + Optional<ProducerOffsets> producerOffsets = offsetsStore.getSnapshotMetadata(producerId); + + if (isValidSnapshot(producerOffsets, currentTimeMs)) { + LOG.info( + "Valid snapshot exists for producer {} after concurrent modification", + producerId); + return RegisterAttemptResult.ALREADY_EXISTS; + } + + LOG.debug("No valid snapshot for producer {} after concurrent modification", producerId); + return RegisterAttemptResult.RETRY; + } + + /** + * Checks if a snapshot is present and not expired. + * + * @param producerOffsets the optional snapshot + * @param currentTimeMs the current time for expiration check + * @return true if snapshot exists and is not expired + */ + private boolean isValidSnapshot(Optional<ProducerOffsets> producerOffsets, long currentTimeMs) { + return producerOffsets.isPresent() && !producerOffsets.get().isExpired(currentTimeMs); + } + + /** Result of a single registration attempt. */ + private enum RegisterAttemptResult { + /** Snapshot was successfully created. */ + CREATED, + /** A valid snapshot already exists. */ + ALREADY_EXISTS, + /** Should retry the registration. */ + RETRY + } + + /** + * Gets the snapshot metadata for a producer. 
+ * + * @param producerId the producer ID + * @return Optional containing the snapshot if exists + * @throws IllegalArgumentException if producerId is invalid + * @throws Exception if the operation fails + */ + public Optional<ProducerOffsets> getSnapshotMetadata(String producerId) throws Exception { + validateProducerId(producerId); + return offsetsStore.getSnapshotMetadata(producerId); + } + + /** + * Reads all offsets for a producer. + * + * @param producerId the producer ID + * @return map of TableBucket to offset, or empty map if no snapshot exists + * @throws IllegalArgumentException if producerId is invalid + * @throws Exception if the operation fails + */ + public Map<TableBucket, Long> getOffsets(String producerId) throws Exception { + validateProducerId(producerId); + return offsetsStore.readOffsets(producerId); + } + + /** + * Deletes a producer snapshot. + * + * @param producerId the producer ID + * @throws IllegalArgumentException if producerId is invalid + * @throws Exception if the operation fails + */ + public void deleteSnapshot(String producerId) throws Exception { + validateProducerId(producerId); + offsetsStore.deleteSnapshot(producerId); + } + + /** + * Gets the default TTL in milliseconds. + * + * @return the default TTL + */ + public long getDefaultTtlMs() { + return defaultTtlMs; + } + + // ------------------------------------------------------------------------ + // Validation + // ------------------------------------------------------------------------ + + /** + * Validates that a producer ID is valid for use as both a file name and ZK node name. 
+ * + * @param producerId the producer ID to validate + * @throws IllegalArgumentException if producerId is invalid + */ + private void validateProducerId(String producerId) { + String invalidReason = TablePath.detectInvalidName(producerId); + if (invalidReason != null) { + throw new IllegalArgumentException( Review Comment: We should use `ApiException` whenever an exception needs to be propagated to the client. Otherwise, it will be wrapped as an `UnknownServerException`, which we should avoid. In this case, maybe we can introduce an `InvalidProducerIdException`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
