Jackeyzhe commented on code in PR #2134:
URL: https://github.com/apache/fluss/pull/2134#discussion_r2633377433


##########
fluss-client/src/main/java/org/apache/fluss/client/write/ClientColumnLockManager.java:
##########
@@ -0,0 +1,1214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.client.write;
+
+import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.client.metadata.MetadataUpdater;
+import org.apache.fluss.cluster.ServerNode;
+import org.apache.fluss.rpc.gateway.TabletServerGateway;
+import org.apache.fluss.rpc.messages.AcquireColumnLockRequest;
+import org.apache.fluss.rpc.messages.ReleaseColumnLockRequest;
+import org.apache.fluss.rpc.messages.RenewColumnLockRequest;
+import org.apache.fluss.rpc.protocol.Errors;
+import org.apache.fluss.utils.MapUtils;
+import org.apache.fluss.utils.concurrent.FutureUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Client-side column lock manager for write operations.
+ *
+ * <p>This class handles acquiring, renewing, and releasing column locks for 
tables. It maintains a
+ * background thread that automatically renews locks before they expire.
+ *
+ * <p>Locks are table-scoped, not bucket-scoped or partition-scoped. A lock 
acquired for a table
+ * applies to the specified columns (or all columns if none specified) across 
all buckets and
+ * partitions in that table.
+ */
+@ThreadSafe
+@Internal
+public class ClientColumnLockManager {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ClientColumnLockManager.class);
+
+    private static final String LOCK_RENEW_THREAD_NAME = "fluss-lock-renewer";
+    // 30 seconds default TTL
+    private static final long DEFAULT_LOCK_TTL_MS = 30000;
+    // Check every 5 seconds
+    private static final long RENEWAL_CHECK_INTERVAL_MS = 5000;
+    // Renew lock when half of TTL has elapsed
+    private static final double RENEWAL_THRESHOLD_RATIO = 0.5;
+    // Timeout for executor shutdown during close
+    private static final long EXECUTOR_SHUTDOWN_TIMEOUT_SECONDS = 5;
+    // Timeout for releasing all locks during close
+    private static final long CLOSE_RELEASE_TIMEOUT_SECONDS = 10;
+
+    /** Clock abstraction for testability. */
+    @VisibleForTesting
+    interface Clock {
+        long currentTimeMillis();
+
+        Clock SYSTEM = System::currentTimeMillis;
+    }
+
+    /** Lock state enum to track reacquisition status. */
+    private enum LockState {
+        ACTIVE,
+        REACQUIRING
+    }
+
+    private final MetadataUpdater metadataUpdater;
+    private final Map<LockKey, LockInfo> activeLocks;
+    private final Map<LockKey, LockState> lockStates;
+    private final ScheduledExecutorService renewalExecutor;
+    private final Clock clock;
+    private volatile boolean closed = false;
+
+    public ClientColumnLockManager(MetadataUpdater metadataUpdater) {
+        this(metadataUpdater, Clock.SYSTEM);
+    }
+
+    @VisibleForTesting
+    ClientColumnLockManager(MetadataUpdater metadataUpdater, Clock clock) {
+        this.metadataUpdater = metadataUpdater;
+        this.activeLocks = MapUtils.newConcurrentHashMap();
+        this.lockStates = MapUtils.newConcurrentHashMap();
+        this.clock = clock;
+        this.renewalExecutor =
+                Executors.newSingleThreadScheduledExecutor(
+                        r -> {
+                            Thread t = new Thread(r, LOCK_RENEW_THREAD_NAME);
+                            t.setDaemon(true);
+                            return t;
+                        });
+
+        // Start the renewal task
+        renewalExecutor.scheduleAtFixedRate(
+                this::renewLocks,
+                RENEWAL_CHECK_INTERVAL_MS,
+                RENEWAL_CHECK_INTERVAL_MS,
+                TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Lock key that uniquely identifies a column lock. Includes table ID and 
column indexes to
+     * support fine-grained column-level locking.
+     *
+     * <p>This is used by writers to identify and release their locks. For 
partitioned tables, the
+     * lock is table-scoped (applies to all partitions).
+     */
+    public static class LockKey {
+        private final long tableId;
+        private final int[] columnIndexes; // null means all columns
+        private final int hashCode;
+
+        LockKey(long tableId, @Nullable int[] columnIndexes) {
+            this.tableId = tableId;
+            // Defensive copy to prevent external modification
+            this.columnIndexes =
+                    columnIndexes != null
+                            ? Arrays.copyOf(columnIndexes, 
columnIndexes.length)
+                            : null;
+            // Pre-compute hash code for better performance
+            this.hashCode = computeHashCode(tableId, this.columnIndexes);
+        }
+
+        private static int computeHashCode(long tableId, @Nullable int[] 
columnIndexes) {
+            int result = Long.hashCode(tableId);
+            result = 31 * result + Arrays.hashCode(columnIndexes);
+            return result;
+        }
+
+        public long getTableId() {
+            return tableId;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            LockKey lockKey = (LockKey) o;
+            return tableId == lockKey.tableId
+                    && Arrays.equals(columnIndexes, lockKey.columnIndexes);
+        }
+
+        @Override
+        public int hashCode() {
+            return hashCode;
+        }
+
+        @Override
+        public String toString() {
+            return "LockKey{"
+                    + "tableId="
+                    + tableId
+                    + ", columns="
+                    + (columnIndexes == null ? "all" : 
Arrays.toString(columnIndexes))
+                    + '}';
+        }
+    }
+
+    /** Internal class to hold lock state information. */
+    private static class LockInfo {
+        private final long tableId;
+        private final String ownerId;
+        private final int schemaId;
+        private final @Nullable int[] columnIndexes;
+        private final long ttlMs;
+        private final long acquiredTimeMs;
+        private volatile long lastRenewTimeMs;
+        // Store all TabletServer IDs that have been locked
+        private final List<Integer> lockedServerIds;
+
+        LockInfo(
+                long tableId,
+                List<Integer> lockedServerIds,
+                String ownerId,
+                int schemaId,
+                @Nullable int[] columnIndexes,
+                long ttlMs,
+                long acquiredTimeMs) {
+            this.tableId = tableId;
+            this.lockedServerIds = lockedServerIds;
+            this.ownerId = ownerId;
+            this.schemaId = schemaId;
+            this.columnIndexes = columnIndexes;
+            this.ttlMs = ttlMs;
+            this.acquiredTimeMs = acquiredTimeMs;
+            this.lastRenewTimeMs = acquiredTimeMs;
+        }
+
+        public long getTableId() {
+            return tableId;
+        }
+
+        public List<Integer> getLockedServerIds() {
+            return lockedServerIds;
+        }
+
+        void updateRenewTime(long renewTimeMs) {
+            this.lastRenewTimeMs = renewTimeMs;
+        }
+
+        boolean needsRenewal(long currentTimeMs) {
+            return (currentTimeMs - lastRenewTimeMs) > (ttlMs * 
RENEWAL_THRESHOLD_RATIO);
+        }
+
+        boolean isExpired(long currentTimeMs) {
+            return (currentTimeMs - lastRenewTimeMs) > ttlMs;
+        }
+
+        @Override
+        public String toString() {
+            return "LockInfo{"
+                    + "tableId="
+                    + tableId
+                    + ", lockedServers="
+                    + lockedServerIds.size()
+                    + ", ownerId='"
+                    + ownerId
+                    + '\''
+                    + ", schemaId="
+                    + schemaId
+                    + ", columnIndexes="
+                    + (columnIndexes == null ? "all" : columnIndexes.length)
+                    + ", ttlMs="
+                    + ttlMs
+                    + ", acquiredTimeMs="
+                    + acquiredTimeMs
+                    + ", lastRenewTimeMs="
+                    + lastRenewTimeMs
+                    + '}';
+        }
+    }
+
+    /**
+     * Acquire a column lock for the specified table.
+     *
+     * <p>Note: Column locks are table-scoped, not bucket-scoped or 
partition-scoped. The lock
+     * acquired applies to all buckets and partitions in the table.
+     *
+     * <p>This method acquires locks on ALL TabletServers in the cluster in 
sequential order
+     * (ordered by server ID) to ensure global consistency.
+     *
+     * @param tableId the table ID
+     * @param ownerId the unique identifier of the lock owner
+     * @param schemaId the schema id that the column indexes belong to
+     * @param columnIndexes the column indexes to lock, null means lock all 
columns
+     * @param ttlMs the Time-To-Live for the lock, null to use default
+     * @return a CompletableFuture that completes with the LockKey when lock 
is acquired on all
+     *     TabletServers
+     */
+    public CompletableFuture<LockKey> acquireLock(
+            long tableId,
+            String ownerId,
+            int schemaId,
+            @Nullable int[] columnIndexes,
+            @Nullable Long ttlMs) {
+        if (closed) {
+            return FutureUtils.completedExceptionally(
+                    new IllegalStateException("ClientColumnLockManager has 
been closed"));
+        }
+
+        long effectiveTtl = ttlMs != null ? ttlMs : DEFAULT_LOCK_TTL_MS;
+        LockKey lockKey = new LockKey(tableId, columnIndexes);
+
+        // Get sorted list of all alive TabletServers
+        List<Integer> sortedServerIds = getSortedServerIds();
+        if (sortedServerIds.isEmpty()) {
+            return FutureUtils.completedExceptionally(
+                    new IllegalStateException("No alive TabletServers found in 
cluster"));
+        }
+
+        LOG.debug(
+                "Acquiring column lock for table {} on {} TabletServers: {}",
+                tableId,
+                sortedServerIds.size(),
+                sortedServerIds);
+
+        // Build acquire request
+        AcquireColumnLockRequest request =
+                buildAcquireRequest(tableId, ownerId, schemaId, columnIndexes, 
effectiveTtl);
+
+        // Acquire locks sequentially with automatic rollback on failure
+        return acquireLocksSequentially(sortedServerIds, request)
+                .thenApply(
+                        acquiredServerIds -> {
+                            // Store lock info after successful acquisition
+                            LockInfo lockInfo =
+                                    new LockInfo(
+                                            tableId,
+                                            acquiredServerIds,
+                                            ownerId,
+                                            schemaId,
+                                            columnIndexes,
+                                            effectiveTtl,
+                                            clock.currentTimeMillis());
+
+                            activeLocks.put(lockKey, lockInfo);
+                            lockStates.put(lockKey, LockState.ACTIVE);
+                            LOG.debug(
+                                    "Acquired column lock on all 
TabletServers: {}, lockKey: {}",
+                                    lockInfo,
+                                    lockKey);
+                            return lockKey;
+                        });
+    }
+
+    /**
+     * Acquire locks on multiple TabletServers sequentially.
+     *
+     * <p>Locks are acquired one by one in the order of server IDs. If any 
acquisition fails, all
+     * previously acquired locks are automatically rolled back.
+     *
+     * <p>Thread safety: acquiredServerIds is modified only within 
thenCompose() callbacks which
+     * execute sequentially, ensuring thread-safe access without explicit 
synchronization.
+     *
+     * @param sortedServerIds server IDs in sorted order
+     * @param request the acquire lock request
+     * @return a CompletableFuture that completes with the list of 
successfully acquired server IDs
+     */
+    private CompletableFuture<List<Integer>> acquireLocksSequentially(
+            List<Integer> sortedServerIds, AcquireColumnLockRequest request) {
+        // Track acquired servers for rollback on failure
+        // Safe to use ArrayList: modified only in sequential thenCompose 
callbacks
+        List<Integer> acquiredServerIds = new ArrayList<>();
+
+        // Build sequential acquisition chain
+        CompletableFuture<Void> chain = 
CompletableFuture.completedFuture(null);
+        for (Integer serverId : sortedServerIds) {
+            chain =
+                    chain.thenCompose(
+                            v -> acquireLockOnServer(serverId, request, 
acquiredServerIds));
+        }
+
+        // Handle failure with rollback
+        return handleAcquisitionResult(chain, acquiredServerIds, request);
+    }
+
+    /**
+     * Handle the result of lock acquisition chain, performing rollback on 
failure.
+     *
+     * @param chain the acquisition chain future
+     * @param acquiredServerIds list of successfully acquired server IDs
+     * @param request the acquire lock request
+     * @return a CompletableFuture that completes with the list of acquired 
server IDs on success,
+     *     or fails with the original error after rollback
+     */
+    private CompletableFuture<List<Integer>> handleAcquisitionResult(
+            CompletableFuture<Void> chain,
+            List<Integer> acquiredServerIds,
+            AcquireColumnLockRequest request) {
+        return chain.handle(
+                        (result, error) -> {
+                            if (error != null) {
+                                return rollbackAndFail(acquiredServerIds, 
request, error);
+                            }
+                            return CompletableFuture.completedFuture(result);
+                        })
+                .thenCompose(f -> f)
+                .thenApply(v -> acquiredServerIds);
+    }
+
+    /**
+     * Rollback acquired locks and propagate the original error.
+     *
+     * @param acquiredServerIds list of server IDs where locks were acquired
+     * @param request the acquire lock request
+     * @param originalError the original error that triggered rollback
+     * @return a CompletableFuture that fails with the original error after 
rollback completes
+     */
+    private CompletableFuture<Void> rollbackAndFail(
+            List<Integer> acquiredServerIds,
+            AcquireColumnLockRequest request,
+            Throwable originalError) {
+        LOG.warn(
+                "Failed to acquire lock on some servers, rolling back {} 
acquired locks",
+                acquiredServerIds.size());
+        return rollbackAcquiredLocks(acquiredServerIds, request.getTableId(), 
request.getOwnerId())
+                .thenCompose(
+                        v -> {
+                            // Re-throw original error after rollback
+                            return FutureUtils.completedExceptionally(
+                                    unwrapException(originalError));
+                        });
+    }
+
+    /**
+     * Acquire lock on a single TabletServer.
+     *
+     * @param serverId the server ID
+     * @param request the acquire lock request
+     * @param acquiredServerIds list to track acquired servers (modified on 
success)
+     * @return a CompletableFuture that completes when lock is acquired
+     */
+    private CompletableFuture<Void> acquireLockOnServer(
+            Integer serverId, AcquireColumnLockRequest request, List<Integer> 
acquiredServerIds) {
+        TabletServerGateway gateway = 
metadataUpdater.newTabletServerClientForNode(serverId);
+        if (gateway == null) {
+            return FutureUtils.completedExceptionally(
+                    new IllegalStateException(
+                            formatLockError(
+                                    "TabletServer gateway not found",
+                                    request.getTableId(),
+                                    request.getOwnerId(),
+                                    serverId)));
+        }
+
+        return gateway.acquireColumnLock(request)
+                .thenApply(
+                        response -> {
+                            validateResponse(
+                                    response.hasErrorCode(),
+                                    () -> 
Errors.forCode(response.getErrorCode()),
+                                    () ->
+                                            response.hasErrorMessage()
+                                                    ? 
response.getErrorMessage()
+                                                    : formatLockError(
+                                                            "Failed to acquire 
column lock",
+                                                            
request.getTableId(),
+                                                            
request.getOwnerId(),
+                                                            serverId));
+
+                            acquiredServerIds.add(serverId);
+                            LOG.debug("Acquired column lock on TabletServer 
{}", serverId);
+                            return null;
+                        });
+    }
+
+    /**
+     * Rollback locks that have been acquired on some servers.
+     *
+     * <p>This method is called when lock acquisition fails on some servers 
after successfully
+     * acquiring locks on other servers. It attempts to release the locks that 
were acquired to
+     * prevent resource leakage.
+     *
+     * <p>Locks are released in reverse order (LIFO) to maintain consistency 
and avoid potential
+     * deadlock scenarios. Failed releases are logged but do not fail the 
overall rollback
+     * operation.
+     *
+     * @param acquiredServerIds list of server IDs where locks were 
successfully acquired
+     * @param tableId the table ID
+     * @param ownerId the owner ID of the locks
+     * @return a CompletableFuture that completes when all rollback operations 
finish
+     */
+    private CompletableFuture<Void> rollbackAcquiredLocks(
+            List<Integer> acquiredServerIds, long tableId, String ownerId) {
+        if (acquiredServerIds.isEmpty()) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        ReleaseColumnLockRequest request = buildReleaseRequest(tableId, 
ownerId);
+
+        // Release locks in reverse order (LIFO)
+        List<Integer> reversedServerIds = new ArrayList<>(acquiredServerIds);
+        Collections.reverse(reversedServerIds);
+
+        List<Integer> failedServers = new ArrayList<>();
+
+        // Build sequential release chain in reverse order
+        CompletableFuture<Void> chain = 
CompletableFuture.completedFuture(null);
+        for (Integer serverId : reversedServerIds) {
+            chain =
+                    chain.thenCompose(
+                            v ->
+                                    releaseLockOnServer(serverId, request)
+                                            .handle(
+                                                    (result, error) -> {
+                                                        if (error != null) {
+                                                            
failedServers.add(serverId);
+                                                            LOG.warn(
+                                                                    "Failed to 
release lock on server {} during rollback: {}",
+                                                                    serverId,
+                                                                    
error.getMessage());
+                                                        } else {
+                                                            LOG.debug(
+                                                                    "Released 
column lock on TabletServer {} during rollback",
+                                                                    serverId);
+                                                        }
+                                                        // Always succeed to 
allow other releases to
+                                                        // continue
+                                                        return null;
+                                                    }));
+        }
+
+        // Wait for all releases and log summary
+        return chain.thenRun(
+                () -> {
+                    if (!failedServers.isEmpty()) {

Review Comment:
   Do the `failedServers` need to be retried until their locks are released successfully?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to