platinumhamburg commented on code in PR #2179:
URL: https://github.com/apache/fluss/pull/2179#discussion_r2684178818
##########
fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java:
##########
@@ -412,6 +415,56 @@ CompletableFuture<Void> dropPartition(
CompletableFuture<KvSnapshotMetadata> getKvSnapshotMetadata(
TableBucket bucket, long snapshotId);
+ /**
+ * Acquires a lease for specific KV snapshots of the given tableBuckets asynchronously.
+ *
+ * <p>Once acquired, the specified KV snapshots will be protected from garbage collection for
+ * the duration of the {@code leaseDuration}. The client must call {@link
+ * #releaseKvSnapshotLease} to release the lock early when reading is finished.
+ *
+ * <p>If the lease expires (no renew received within duration), the server is free to delete the
+ * snapshot files.
+ *
+ * <p>The following exceptions can be anticipated when calling {@code get()} on returned future:
+ *
+ * <ul>
+ * <li>{@link TableNotExistException} if the table does not exist.
+ * <li>{@link PartitionNotExistException} if the partition does not exist.
+ * </ul>
+ *
+ * @param leaseId The unique ID for this lease session (usually a UUID generated by client).
+ * @param snapshotIds The snapshots to lease, a map from TableBucket to kvSnapshotId.
+ * @param leaseDuration The duration (in milliseconds) for which the snapshots should be kept.
+ * @return The result of the acquire operation, containing any buckets that failed to be locked.
+ */
+ CompletableFuture<AcquireKvSnapshotLeaseResult> acquireKvSnapshotLease(
+ String leaseId, Map<TableBucket, Long> snapshotIds, long leaseDuration);
+
+ /**
+ * Releases the lease for specific tableBuckets asynchronously.
+ *
+ * <p>This is typically called when a client finishes reading a specific bucket (or a batch of
+ * buckets) but is still reading others under the same leaseId.
+ *
+ * <p>If {@code bucketsToRelease} contains all buckets under this leaseId, the lease itself will
+ * be removed.
+ *
+ * @param leaseId The lease id.
+ * @param bucketsToRelease The specific tableBuckets to release.
+ */
+ CompletableFuture<Void> releaseKvSnapshotLease(
+ String leaseId, Set<TableBucket> bucketsToRelease);
+
+ /**
+ * Releases the entire lease asynchronously.
+ *
+ * <p>All snapshots locked under this {@code leaseId} will be released immediately. This is
+ * equivalent to calling {@link #releaseKvSnapshotLease} with all held buckets.
+ *
+ * @param leaseId The lease id to release.
+ */
+ CompletableFuture<Void> dropKvSnapshotLease(String leaseId);
Review Comment:
Rename to `releaseAllKvSnapshotLeases()`.
##########
fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java:
##########
@@ -387,6 +392,33 @@ public CompletableFuture<KvSnapshotMetadata> getKvSnapshotMetadata(
.thenApply(ClientRpcMessageUtils::toKvSnapshotMetadata);
}
+ @Override
+ public CompletableFuture<AcquireKvSnapshotLeaseResult> acquireKvSnapshotLease(
+ String leaseId, Map<TableBucket, Long> snapshotIds, long leaseDuration) {
+ if (snapshotIds.isEmpty()) {
+ throw new IllegalArgumentException(
+ "The snapshotIds to acquire kv snapshot lease is empty");
+ }
+
+ return gateway.acquireKvSnapshotLease(
+ makeAcquireKvSnapshotLeaseRequest(leaseId, snapshotIds, leaseDuration))
+ .thenApply(ClientRpcMessageUtils::toAcquireKvSnapshotLeaseResult);
+ }
+
+ @Override
+ public CompletableFuture<Void> releaseKvSnapshotLease(
+ String leaseId, Set<TableBucket> bucketsToRelease) {
+ return gateway.releaseKvSnapshotLease(
+ makeReleaseKvSnapshotLeaseRequest(leaseId, bucketsToRelease))
+ .thenApply(r -> null);
+ }
+
+ @Override
+ public CompletableFuture<Void> dropKvSnapshotLease(String leaseId) {
Review Comment:
rename to releaseAllKvSnapshotLeases()
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/KvSnapshotLeaseManager.java:
##########
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.KvSnapshotLeaseForBucket;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLease;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLeaseMetadataManager;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotTableLease;
+import org.apache.fluss.utils.MapUtils;
+import org.apache.fluss.utils.clock.Clock;
+import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
+import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
+
+/** A manager to manage kv snapshot lease acquire, renew, release and drop. */
+@ThreadSafe
+public class KvSnapshotLeaseManager {
+ private static final Logger LOG =
LoggerFactory.getLogger(KvSnapshotLeaseManager.class);
+ private static final AtomicInteger ZERO = new AtomicInteger(0);
+
+ private final KvSnapshotLeaseMetadataManager metadataHelper;
+ private final CoordinatorContext coordinatorContext;
+ private final Clock clock;
+ private final ScheduledExecutorService scheduledExecutor;
+ private final Configuration conf;
+
+ private final Map<String, ReadWriteLock> leaseLocks =
MapUtils.newConcurrentHashMap();
+ /** lease id to kv snapshot lease. */
+ @GuardedBy("leaseLocks")
+ private final Map<String, KvSnapshotLease> kvSnapshotLeaseMap;
+
+ private final ReadWriteLock refCountLock = new ReentrantReadWriteLock();
+
+ /**
+ * KvSnapshotLeaseForBucket to the ref count, which means this table
bucket + snapshotId has
+ * been leased by how many lease id.
+ */
+ private final Map<KvSnapshotLeaseForBucket, AtomicInteger> refCount =
+ MapUtils.newConcurrentHashMap();
+
+ /** For metrics. */
+ private final AtomicInteger leasedBucketCount = new AtomicInteger(0);
+
+ public KvSnapshotLeaseManager(
+ Configuration conf,
+ KvSnapshotLeaseMetadataManager metadataHelper,
+ CoordinatorContext coordinatorContext,
+ Clock clock,
+ CoordinatorMetricGroup coordinatorMetricGroup) {
+ this(
+ conf,
+ metadataHelper,
+ coordinatorContext,
+ Executors.newScheduledThreadPool(
+ 1, new
ExecutorThreadFactory("kv-snapshot-lease-cleaner")),
+ clock,
+ coordinatorMetricGroup);
+ }
+
+ @VisibleForTesting
+ public KvSnapshotLeaseManager(
+ Configuration conf,
+ KvSnapshotLeaseMetadataManager metadataHelper,
+ CoordinatorContext coordinatorContext,
+ ScheduledExecutorService scheduledExecutor,
+ Clock clock,
+ CoordinatorMetricGroup coordinatorMetricGroup) {
+ this.metadataHelper = metadataHelper;
+ this.conf = conf;
+ this.scheduledExecutor = scheduledExecutor;
+ this.coordinatorContext = coordinatorContext;
+ this.clock = clock;
+ this.kvSnapshotLeaseMap = MapUtils.newConcurrentHashMap();
+
+ registerMetrics(coordinatorMetricGroup);
+ }
+
+ public void start() {
+ LOG.info("kv snapshot lease manager has been started.");
+ scheduledExecutor.scheduleWithFixedDelay(
+ this::expireLeases,
+ 0L,
+
conf.get(ConfigOptions.KV_SNAPSHOT_LEASE_EXPIRATION_CHECK_INTERVAL).toMillis(),
+ TimeUnit.MILLISECONDS);
+ }
+
+ public void initialize() throws Exception {
+ for (String leaseId : metadataHelper.getLeasesList()) {
+ Optional<KvSnapshotLease> kvSnapshotLeaseOpt = metadataHelper.getLease(leaseId);
Review Comment:
In high-availability mode, can this initialization model ensure distributed
concurrency safety?
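
To make the concern concrete, here is a minimal sketch of one common safeguard: fencing writes by leader epoch, so that a coordinator which lost leadership cannot overwrite lease state after a new leader has loaded it. It assumes only the elected coordinator mutates lease metadata. `FencedLeaseStore`, its methods, and the epoch parameter are hypothetical stand-ins, not Fluss or ZooKeeper APIs, and the PR may rely on a different mechanism.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for a shared metadata store (assumption, not Fluss API). */
final class FencedLeaseStore {
    private long highestEpoch = 0L;
    private final Map<String, Long> leaseExpiration = new HashMap<>();

    /** Called when a coordinator becomes leader; returns a copy of all persisted leases. */
    synchronized Map<String, Long> initialize(long leaderEpoch) {
        checkEpoch(leaderEpoch);
        highestEpoch = leaderEpoch;
        return new HashMap<>(leaseExpiration);
    }

    /** Persists a lease; rejected if the caller's epoch is older than the current leader's. */
    synchronized void putLease(long leaderEpoch, String leaseId, long expirationTimeMs) {
        checkEpoch(leaderEpoch);
        leaseExpiration.put(leaseId, expirationTimeMs);
    }

    private void checkEpoch(long leaderEpoch) {
        if (leaderEpoch < highestEpoch) {
            throw new IllegalStateException(
                    "Stale leader epoch " + leaderEpoch + ", current is " + highestEpoch);
        }
    }
}

final class FencedLeaseStoreDemo {
    public static void main(String[] args) {
        FencedLeaseStore store = new FencedLeaseStore();
        store.initialize(1L);
        store.putLease(1L, "lease-a", 100L);

        // A new leader takes over with a higher epoch and sees the existing lease.
        Map<String, Long> view = store.initialize(2L);
        System.out.println("leases seen by new leader: " + view.keySet());

        try {
            // The old leader is still running and tries to write with its stale epoch.
            store.putLease(1L, "lease-b", 200L);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Without a check of this kind, the in-memory map built by `initialize()` on the new leader can diverge from what a lingering old leader keeps writing.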
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/KvSnapshotLeaseManager.java:
##########
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.KvSnapshotLeaseForBucket;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLease;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLeaseMetadataManager;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotTableLease;
+import org.apache.fluss.utils.MapUtils;
+import org.apache.fluss.utils.clock.Clock;
+import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
+import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
+
+/** A manager to manage kv snapshot lease acquire, renew, release and drop. */
+@ThreadSafe
+public class KvSnapshotLeaseManager {
+ private static final Logger LOG = LoggerFactory.getLogger(KvSnapshotLeaseManager.class);
+ private static final AtomicInteger ZERO = new AtomicInteger(0);
Review Comment:
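It's very odd for an atomic type to be used as a constant: `AtomicInteger` is mutable, so a shared `static final` instance can be modified by any code that touches it. If only a zero default is needed, a plain `int` is enough.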
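
As a rough illustration of the alternative, the sketch below looks up a ref count with a null check and a plain `int` default, so no shared `AtomicInteger` constant and no throwaway `new AtomicInteger(0)` is needed. `SnapshotRefCounts` and its `String` keys are hypothetical simplifications of the PR's `refCount` map, which is keyed by `KvSnapshotLeaseForBucket`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the ref-count bookkeeping; not the PR's class. */
final class SnapshotRefCounts {
    private final Map<String, AtomicInteger> refCount = new ConcurrentHashMap<>();

    void increment(String key) {
        refCount.computeIfAbsent(key, k -> new AtomicInteger()).incrementAndGet();
    }

    void decrement(String key) {
        // Returning null from the remapping function removes the entry once it reaches zero.
        refCount.computeIfPresent(key, (k, v) -> v.decrementAndGet() <= 0 ? null : v);
    }

    /** Returns the current count; an absent key is reported as a plain int zero. */
    int count(String key) {
        AtomicInteger counter = refCount.get(key);
        return counter == null ? 0 : counter.get();
    }

    boolean notLeased(String key) {
        return count(key) <= 0;
    }
}

final class SnapshotRefCountsDemo {
    public static void main(String[] args) {
        SnapshotRefCounts counts = new SnapshotRefCounts();
        counts.increment("bucket-0@snapshot-5");
        System.out.println("leased: " + !counts.notLeased("bucket-0@snapshot-5"));
        counts.decrement("bucket-0@snapshot-5");
        System.out.println("leased: " + !counts.notLeased("bucket-0@snapshot-5"));
    }
}
```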
##########
fluss-server/src/test/java/org/apache/fluss/server/zk/data/lease/KvSnapshotLeaseMetadataManagerTest.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data.lease;
+
+import org.apache.fluss.fs.FSDataInputStream;
+import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.server.zk.NOPErrorHandler;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.ZooKeeperExtension;
+import org.apache.fluss.testutils.common.AllCallbackWrapper;
+import org.apache.fluss.utils.IOUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link KvSnapshotLeaseMetadataManager}. */
+public class KvSnapshotLeaseMetadataManagerTest {
+
+ @RegisterExtension
+ public static final AllCallbackWrapper<ZooKeeperExtension>
ZOO_KEEPER_EXTENSION_WRAPPER =
+ new AllCallbackWrapper<>(new ZooKeeperExtension());
+
+ protected static ZooKeeperClient zookeeperClient;
+ private @TempDir Path tempDir;
+ private KvSnapshotLeaseMetadataManager metadataHelper;
+
+ @BeforeAll
+ static void beforeAll() {
+ zookeeperClient =
+ ZOO_KEEPER_EXTENSION_WRAPPER
+ .getCustomExtension()
+ .getZooKeeperClient(NOPErrorHandler.INSTANCE);
+ }
+
+ @BeforeEach
+ void beforeEach() {
+ metadataHelper = new KvSnapshotLeaseMetadataManager(zookeeperClient,
tempDir.toString());
+ }
+
+ @AfterEach
+ void afterEach() {
+ ZOO_KEEPER_EXTENSION_WRAPPER.getCustomExtension().cleanupRoot();
+ }
+
+ @Test
+ void testGetLeasesList() throws Exception {
+ List<String> leasesList = metadataHelper.getLeasesList();
+ assertThat(leasesList).isEmpty();
+
+ metadataHelper.registerLease("leaseId1", new KvSnapshotLease(1000L));
+
+ Map<Long, KvSnapshotTableLease> tableIdToTableLease = new HashMap<>();
+ tableIdToTableLease.put(1L, new KvSnapshotTableLease(1L, new Long[]
{100L, -1L}));
+ metadataHelper.registerLease("leaseId2", new KvSnapshotLease(2000L,
tableIdToTableLease));
+ leasesList = metadataHelper.getLeasesList();
+ assertThat(leasesList).containsExactlyInAnyOrder("leaseId1",
"leaseId2");
+ }
+
+ @Test
+ void testRegisterAndUpdateLease() throws Exception {
+ Map<Long, KvSnapshotTableLease> tableIdToTableLease = new HashMap<>();
+ tableIdToTableLease.put(1L, new KvSnapshotTableLease(1L, new Long[]
{100L, -1L}));
+
+ Map<Long, Long[]> partitionSnapshots = new HashMap<>();
+ partitionSnapshots.put(1000L, new Long[] {111L, 122L});
+ partitionSnapshots.put(1001L, new Long[] {122L, -1L});
+ tableIdToTableLease.put(2L, new KvSnapshotTableLease(2L,
partitionSnapshots));
+
+ KvSnapshotLease expectedLease = new KvSnapshotLease(1000L, tableIdToTableLease);
+ metadataHelper.registerLease("leaseId1", expectedLease);
Review Comment:
The `metadataHelper` field can be renamed to `leaseMetadataManager` throughout the code.
##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lease/KvSnapshotLease.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data.lease;
+
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.utils.MapUtils;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+
+/** The entity of kv snapshot lease. */
+@NotThreadSafe
+public class KvSnapshotLease {
+ private long expirationTime;
+
+ /** A map from table id to kv snapshot lease for one table. */
+ private final Map<Long, KvSnapshotTableLease> tableIdToTableLease;
+
+ public KvSnapshotLease(long leaseDuration) {
+ this(leaseDuration, MapUtils.newConcurrentHashMap());
+ }
+
+ public KvSnapshotLease(
+ long expirationTime, Map<Long, KvSnapshotTableLease> tableIdToTableLease) {
Review Comment:
`expirationTime` is a more precise variable and parameter name than `duration`.
However, the codebase extensively mixes the two; for example, the
`KvSnapshotLease(long leaseDuration)` constructor passes its argument on as
`expirationTime`. Please standardize them.
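
A minimal sketch of one way to keep the two concepts apart: the relative duration appears only at the API boundary, and the entity stores just the absolute expiration time. `LeaseSketch` and its method names are hypothetical and are not taken from the PR.

```java
import java.time.Duration;

/** Hypothetical, simplified lease entity; names are illustrative only. */
final class LeaseSketch {
    /** Absolute time (epoch millis) after which the lease is considered expired. */
    private long expirationTimeMs;

    private LeaseSketch(long expirationTimeMs) {
        this.expirationTimeMs = expirationTimeMs;
    }

    /** Converts the relative duration into an absolute expiration time in one place. */
    static LeaseSketch acquire(long nowMs, Duration leaseDuration) {
        return new LeaseSketch(nowMs + leaseDuration.toMillis());
    }

    /** Renewing recomputes the expiration time from the current time and the duration. */
    void renew(long nowMs, Duration leaseDuration) {
        this.expirationTimeMs = nowMs + leaseDuration.toMillis();
    }

    /** Mirrors the strict comparison used by the expiry check in the diff. */
    boolean isExpired(long nowMs) {
        return expirationTimeMs < nowMs;
    }

    long getExpirationTimeMs() {
        return expirationTimeMs;
    }
}

final class LeaseSketchDemo {
    public static void main(String[] args) {
        LeaseSketch lease = LeaseSketch.acquire(1_000L, Duration.ofMillis(500));
        System.out.println("expires at " + lease.getExpirationTimeMs());
        System.out.println("expired at t=1501? " + lease.isExpired(1_501L));
    }
}
```

With this split, a name such as `newLeaseDuration` for a value computed as "current time + duration" would become `expirationTime`, which matches what the value actually holds.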
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/DropKvSnapshotLeaseProcedure.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.procedure;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/** Procedure to release kv snapshot lease. */
+public class DropKvSnapshotLeaseProcedure extends ProcedureBase {
+
+ @ProcedureHint(
+ argument = {
+ @ArgumentHint(name = "leaseId", type = @DataTypeHint("STRING")),
+ })
+ public String[] call(ProcedureContext context, String leaseId) throws Exception {
Review Comment:
Please add a description of this procedure to the `procedures.md` document.
##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lease/KvSnapshotLease.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data.lease;
+
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.utils.MapUtils;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Objects;
+
+/** The entity of kv snapshot lease. */
+@NotThreadSafe
+public class KvSnapshotLease {
+ private long expirationTime;
+
+ /** A map from table id to kv snapshot lease for one table. */
+ private final Map<Long, KvSnapshotTableLease> tableIdToTableLease;
+
+ public KvSnapshotLease(long leaseDuration) {
+ this(leaseDuration, MapUtils.newConcurrentHashMap());
+ }
+
+ public KvSnapshotLease(
+ long expirationTime, Map<Long, KvSnapshotTableLease> tableIdToTableLease) {
Review Comment:
See also `FlinkConnectorOptions.SCAN_KV_SNAPSHOT_LEASE_DURATION`.
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/KvSnapshotLeaseManager.java:
##########
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.KvSnapshotLeaseForBucket;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLease;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLeaseMetadataManager;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotTableLease;
+import org.apache.fluss.utils.MapUtils;
+import org.apache.fluss.utils.clock.Clock;
+import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
+import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
+
+/** A manager to manage kv snapshot lease acquire, renew, release and drop. */
+@ThreadSafe
+public class KvSnapshotLeaseManager {
+ private static final Logger LOG =
LoggerFactory.getLogger(KvSnapshotLeaseManager.class);
+ private static final AtomicInteger ZERO = new AtomicInteger(0);
+
+ private final KvSnapshotLeaseMetadataManager metadataHelper;
+ private final CoordinatorContext coordinatorContext;
+ private final Clock clock;
+ private final ScheduledExecutorService scheduledExecutor;
+ private final Configuration conf;
+
+ private final Map<String, ReadWriteLock> leaseLocks =
MapUtils.newConcurrentHashMap();
+ /** lease id to kv snapshot lease. */
+ @GuardedBy("leaseLocks")
+ private final Map<String, KvSnapshotLease> kvSnapshotLeaseMap;
+
+ private final ReadWriteLock refCountLock = new ReentrantReadWriteLock();
+
+ /**
+ * KvSnapshotLeaseForBucket to the ref count, which means this table
bucket + snapshotId has
+ * been leased by how many lease id.
+ */
+ private final Map<KvSnapshotLeaseForBucket, AtomicInteger> refCount =
+ MapUtils.newConcurrentHashMap();
+
+ /** For metrics. */
+ private final AtomicInteger leasedBucketCount = new AtomicInteger(0);
+
+ public KvSnapshotLeaseManager(
+ Configuration conf,
+ KvSnapshotLeaseMetadataManager metadataHelper,
+ CoordinatorContext coordinatorContext,
+ Clock clock,
+ CoordinatorMetricGroup coordinatorMetricGroup) {
+ this(
+ conf,
+ metadataHelper,
+ coordinatorContext,
+ Executors.newScheduledThreadPool(
+ 1, new
ExecutorThreadFactory("kv-snapshot-lease-cleaner")),
+ clock,
+ coordinatorMetricGroup);
+ }
+
+ @VisibleForTesting
+ public KvSnapshotLeaseManager(
+ Configuration conf,
+ KvSnapshotLeaseMetadataManager metadataHelper,
+ CoordinatorContext coordinatorContext,
+ ScheduledExecutorService scheduledExecutor,
+ Clock clock,
+ CoordinatorMetricGroup coordinatorMetricGroup) {
+ this.metadataHelper = metadataHelper;
+ this.conf = conf;
+ this.scheduledExecutor = scheduledExecutor;
+ this.coordinatorContext = coordinatorContext;
+ this.clock = clock;
+ this.kvSnapshotLeaseMap = MapUtils.newConcurrentHashMap();
+
+ registerMetrics(coordinatorMetricGroup);
+ }
+
+ public void start() {
+ LOG.info("kv snapshot lease manager has been started.");
+ scheduledExecutor.scheduleWithFixedDelay(
+ this::expireLeases,
+ 0L,
+
conf.get(ConfigOptions.KV_SNAPSHOT_LEASE_EXPIRATION_CHECK_INTERVAL).toMillis(),
+ TimeUnit.MILLISECONDS);
+ }
+
+ public void initialize() throws Exception {
+ for (String leaseId : metadataHelper.getLeasesList()) {
+ Optional<KvSnapshotLease> kvSnapshotLeaseOpt =
metadataHelper.getLease(leaseId);
+ if (kvSnapshotLeaseOpt.isPresent()) {
+ KvSnapshotLease kvSnapshotLease = kvSnapshotLeaseOpt.get();
+ this.leaseLocks.put(leaseId, new ReentrantReadWriteLock());
+ this.kvSnapshotLeaseMap.put(leaseId, kvSnapshotLease);
+
+ initializeRefCount(kvSnapshotLease);
+
+
leasedBucketCount.addAndGet(kvSnapshotLease.getLeasedSnapshotCount());
+ }
+ }
+ }
+
+ public boolean snapshotLeaseNotExist(KvSnapshotLeaseForBucket
kvSnapshotLeaseForBucket) {
+ return inReadLock(
+ refCountLock,
+ () ->
+ refCount.getOrDefault(kvSnapshotLeaseForBucket, new
AtomicInteger(0)).get()
+ <= 0);
+ }
+
+ /**
+ * Acquire kv snapshot lease.
+ *
+ * @param leaseId the lease id
+ * @param leaseDuration the lease duration
+ * @param tableIdToLeaseBucket the table id to lease bucket
+ * @return the map of unavailable snapshots that failed to be leased
+ */
+ public Map<TableBucket, Long> acquireLease(
+ String leaseId,
+ long leaseDuration,
+ Map<Long, List<KvSnapshotLeaseForBucket>> tableIdToLeaseBucket)
+ throws Exception {
+ ReadWriteLock lock = leaseLocks.computeIfAbsent(leaseId, k -> new
ReentrantReadWriteLock());
+ return inWriteLock(
+ lock,
+ () -> {
+ // To record the unavailable snapshots such as the kv
snapshotId to lease not
+ // exists.
+ Map<TableBucket, Long> unavailableSnapshots = new
HashMap<>();
+
+ boolean update = kvSnapshotLeaseMap.containsKey(leaseId);
+ KvSnapshotLease kvSnapshotLease;
+ long newLeaseDuration = clock.milliseconds() +
leaseDuration;
+ if (!update) {
+ // set the expiration time as: current time +
leaseDuration
+ kvSnapshotLease = new
KvSnapshotLease(newLeaseDuration);
+ kvSnapshotLeaseMap.put(leaseId, kvSnapshotLease);
+ LOG.info(
+ "kv snapshot lease '"
+ + leaseId
+ + "' has been acquired. The lease
expiration time is "
+ + kvSnapshotLease.getExpirationTime());
+ } else {
+ kvSnapshotLease = kvSnapshotLeaseMap.get(leaseId);
+ kvSnapshotLease.setExpirationTime(newLeaseDuration);
+ }
+
+ for (Map.Entry<Long, List<KvSnapshotLeaseForBucket>> entry
:
+ tableIdToLeaseBucket.entrySet()) {
+ Long tableId = entry.getKey();
+ TableInfo tableInfo =
coordinatorContext.getTableInfoById(tableId);
+ int numBuckets = tableInfo.getNumBuckets();
+ List<KvSnapshotLeaseForBucket> buckets =
entry.getValue();
+ for (KvSnapshotLeaseForBucket bucket : buckets) {
+
+ TableBucket tableBucket = bucket.getTableBucket();
+ long kvSnapshotId = bucket.getKvSnapshotId();
+ try {
+ boolean snapshotExists =
+
metadataHelper.isSnapshotExists(tableBucket, kvSnapshotId);
+ if (!snapshotExists) {
+ unavailableSnapshots.put(tableBucket,
kvSnapshotId);
+ continue;
+ }
+ } catch (Exception e) {
+ LOG.error(
+ "Failed to check snapshotExists for
tableBucket when acquire kv "
+ + "snapshot kvSnapshotLease
{}.",
+ tableBucket,
+ e);
+ unavailableSnapshots.put(tableBucket,
kvSnapshotId);
+ continue;
+ }
+
+ long originalSnapshotId =
+ kvSnapshotLease.acquireBucket(
+ tableBucket, kvSnapshotId,
numBuckets);
+ if (originalSnapshotId == -1L) {
+ leasedBucketCount.incrementAndGet();
+ } else {
+ // clear the original ref.
+ decrementRefCount(
+ new KvSnapshotLeaseForBucket(
+ tableBucket,
originalSnapshotId));
+ }
+ incrementRefCount(bucket);
+ }
+ }
+
+ if (update) {
+ metadataHelper.updateLease(leaseId, kvSnapshotLease);
+ } else {
+ metadataHelper.registerLease(leaseId, kvSnapshotLease);
+ }
+
+ return unavailableSnapshots;
+ });
+ }
+
+ public void release(String leaseId, Map<Long, List<TableBucket>>
tableIdToUnregisterBucket)
+ throws Exception {
+ ReadWriteLock lock = leaseLocks.get(leaseId);
+ if (lock == null) {
+ return;
+ }
+
+ inWriteLock(
+ lock,
+ () -> {
+ KvSnapshotLease lease = kvSnapshotLeaseMap.get(leaseId);
+ if (lease == null) {
+ return;
+ }
+
+ for (Map.Entry<Long, List<TableBucket>> entry :
+ tableIdToUnregisterBucket.entrySet()) {
+ List<TableBucket> buckets = entry.getValue();
+ for (TableBucket bucket : buckets) {
+ long snapshotId = lease.releaseBucket(bucket);
+ if (snapshotId != -1L) {
+ leasedBucketCount.decrementAndGet();
+ decrementRefCount(new
KvSnapshotLeaseForBucket(bucket, snapshotId));
+ }
+ }
+ }
+
+ if (lease.isEmpty()) {
+ drop(leaseId);
+ } else {
+ metadataHelper.updateLease(leaseId, lease);
+ }
+ });
+ }
+
+ /**
+ * Drop kv snapshot lease.
+ *
+ * @param leaseId the lease id
+ * @return true if clear success, false if lease not exist
+ */
+ public boolean drop(String leaseId) throws Exception {
+ ReadWriteLock lock = leaseLocks.get(leaseId);
+ if (lock == null) {
+ return false;
+ }
+
+ boolean exist =
+ inWriteLock(
+ lock,
+ () -> {
+ KvSnapshotLease kvSnapshotLease =
kvSnapshotLeaseMap.remove(leaseId);
+ if (kvSnapshotLease == null) {
+ return false;
+ }
+
+ clearRefCount(kvSnapshotLease);
+ metadataHelper.deleteLease(leaseId);
+
+ LOG.info("kv snapshot lease '" + leaseId + "' has
been dropped.");
+ return true;
+ });
+
+ leaseLocks.remove(leaseId);
+ return exist;
+ }
+
+ private void initializeRefCount(KvSnapshotLease lease) {
+ for (Map.Entry<Long, KvSnapshotTableLease> tableEntry :
+ lease.getTableIdToTableLease().entrySet()) {
+ long tableId = tableEntry.getKey();
+ KvSnapshotTableLease tableLease = tableEntry.getValue();
+ if (tableLease.getBucketSnapshots() != null) {
+ Long[] snapshots = tableLease.getBucketSnapshots();
+ for (int i = 0; i < snapshots.length; i++) {
+ if (snapshots[i] == -1L) {
+ continue;
+ }
+
+ incrementRefCount(
+ new KvSnapshotLeaseForBucket(
+ new TableBucket(tableId, i),
snapshots[i]));
+ }
+ } else {
+ Map<Long, Long[]> partitionSnapshots =
tableLease.getPartitionSnapshots();
+ for (Map.Entry<Long, Long[]> entry :
partitionSnapshots.entrySet()) {
+ Long partitionId = entry.getKey();
+ Long[] snapshots = entry.getValue();
+ for (int i = 0; i < snapshots.length; i++) {
+ if (snapshots[i] == -1L) {
+ continue;
+ }
+
+ incrementRefCount(
+ new KvSnapshotLeaseForBucket(
+ new TableBucket(tableId, partitionId,
i), snapshots[i]));
+ }
+ }
+ }
+ }
+ }
+
+ private void clearRefCount(KvSnapshotLease lease) {
+ for (Map.Entry<Long, KvSnapshotTableLease> tableEntry :
+ lease.getTableIdToTableLease().entrySet()) {
+ long tableId = tableEntry.getKey();
+ KvSnapshotTableLease tableLease = tableEntry.getValue();
+ if (tableLease.getBucketSnapshots() != null) {
+ Long[] snapshots = tableLease.getBucketSnapshots();
+ for (int i = 0; i < snapshots.length; i++) {
+ if (snapshots[i] == -1L) {
+ continue;
+ }
+ decrementRefCount(
+ new KvSnapshotLeaseForBucket(
+ new TableBucket(tableId, i),
snapshots[i]));
+ leasedBucketCount.decrementAndGet();
+ }
+ } else {
+ Map<Long, Long[]> partitionSnapshots =
tableLease.getPartitionSnapshots();
+ for (Map.Entry<Long, Long[]> entry :
partitionSnapshots.entrySet()) {
+ Long partitionId = entry.getKey();
+ Long[] snapshots = entry.getValue();
+ for (int i = 0; i < snapshots.length; i++) {
+ if (snapshots[i] == -1L) {
+ continue;
+ }
+
+ decrementRefCount(
+ new KvSnapshotLeaseForBucket(
+ new TableBucket(tableId, partitionId,
i), snapshots[i]));
+ leasedBucketCount.decrementAndGet();
+ }
+ }
+ }
+ }
+ }
+
+ private void incrementRefCount(KvSnapshotLeaseForBucket
kvSnapshotLeaseForBucket) {
+ inWriteLock(
+ refCountLock,
+ () ->
+ refCount.computeIfAbsent(
+ kvSnapshotLeaseForBucket, k -> new
AtomicInteger(0))
+ .incrementAndGet());
+ }
+
+ private void decrementRefCount(KvSnapshotLeaseForBucket
kvSnapshotLeaseForBucket) {
+ inWriteLock(
+ refCountLock,
+ () -> {
+ AtomicInteger atomicInteger =
refCount.get(kvSnapshotLeaseForBucket);
+ if (atomicInteger != null) {
+ int decrementAndGet = atomicInteger.decrementAndGet();
+ if (decrementAndGet <= 0) {
+ refCount.remove(kvSnapshotLeaseForBucket);
+ }
+ }
+ });
+ }
+
+ private void expireLeases() {
+ long currentTime = clock.milliseconds();
+ kvSnapshotLeaseMap.entrySet().stream()
+ .filter(entry -> entry.getValue().getExpirationTime() <
currentTime)
+ .map(Map.Entry::getKey)
+ .forEach(
+ leaseId -> {
+ try {
+ drop(leaseId);
Review Comment:
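Although ConcurrentHashMap is used, calling drop() here removes entries from
kvSnapshotLeaseMap while the stream is still iterating over it. ConcurrentHashMap
iterators are weakly consistent, so this will not throw, but the traversal may or
may not observe the concurrent removals, which makes the behavior hard to reason about.
Recommendation: first collect the lease IDs that need to be removed, then
drop them after the iteration has finished.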
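
The recommendation above could look roughly like the following. This is a minimal sketch, not the PR's code: `ExpireLeasesSketch` is a hypothetical stand-in for the manager, and the lease is reduced to an expiration timestamp so the example stays self-contained.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/** Hypothetical stand-in for the lease manager; only the expiry path is modelled. */
final class ExpireLeasesSketch {
    // leaseId -> expiration timestamp in milliseconds (simplified lease representation).
    private final Map<String, Long> leases = new ConcurrentHashMap<>();

    void put(String leaseId, long expirationTime) {
        leases.put(leaseId, expirationTime);
    }

    boolean drop(String leaseId) {
        return leases.remove(leaseId) != null;
    }

    int size() {
        return leases.size();
    }

    /** Phase 1 collects the expired ids; phase 2 drops them once iteration has finished. */
    List<String> expireLeases(long currentTime) {
        List<String> expired =
                leases.entrySet().stream()
                        .filter(entry -> entry.getValue() < currentTime)
                        .map(Map.Entry::getKey)
                        .collect(Collectors.toList());
        for (String leaseId : expired) {
            drop(leaseId);
        }
        return expired;
    }

    public static void main(String[] args) {
        ExpireLeasesSketch manager = new ExpireLeasesSketch();
        manager.put("lease-a", 10L);
        manager.put("lease-b", 200L);
        System.out.println("expired: " + manager.expireLeases(100L));
        System.out.println("remaining: " + manager.size());
    }
}
```

Separating the two phases means the map is never mutated while it is being traversed, and the collected list can also be logged or reported as a batch.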
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java:
##########
@@ -132,6 +140,13 @@ public class FlinkSourceEnumerator
private final OffsetsInitializer startingOffsetsInitializer;
private final OffsetsInitializer stoppingOffsetsInitializer;
+ private final LeaseContext leaseContext;
+
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+ /** checkpointId -> tableBuckets who finished consume kv snapshots. */
+ @GuardedBy("lock")
+ private final TreeMap<Long, Set<TableBucket>> consumedKvSnapshotMap = new
TreeMap<>();
Review Comment:
Is there a concurrency safety issue here? Why is a read-write lock needed?
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/DropKvSnapshotLeaseProcedure.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.procedure;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/** Procedure to release kv snapshot lease. */
+public class DropKvSnapshotLeaseProcedure extends ProcedureBase {
Review Comment:
Please expand the class Javadoc and include a usage example.
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/KvSnapshotLeaseManager.java:
##########
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.KvSnapshotLeaseForBucket;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLease;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLeaseMetadataManager;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotTableLease;
+import org.apache.fluss.utils.MapUtils;
+import org.apache.fluss.utils.clock.Clock;
+import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
+import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
+
+/** A manager to manage kv snapshot lease acquire, renew, release and drop. */
+@ThreadSafe
+public class KvSnapshotLeaseManager {
+ private static final Logger LOG =
LoggerFactory.getLogger(KvSnapshotLeaseManager.class);
+ private static final AtomicInteger ZERO = new AtomicInteger(0);
+
+ private final KvSnapshotLeaseMetadataManager metadataHelper;
+ private final CoordinatorContext coordinatorContext;
Review Comment:
You've annotated KvSnapshotLeaseManager with @ThreadSafe, which implies that
it may be used in a multi-threaded environment and makes no guarantee that it
shares a thread with CoordinatorEventProcessor. Under this assumption, directly
accessing CoordinatorContext—which is marked as @NotThreadSafe—introduces a
thread-safety model violation.
##########
website/docs/maintenance/configuration.md:
##########
@@ -157,7 +158,7 @@ during the Fluss cluster working.
| kv.rocksdb.use-bloom-filter | Boolean | true
| If true, every newly created SST file will contain a Bloom
filter. It is enabled by default.
|
| kv.rocksdb.bloom-filter.bits-per-key | Double | 10.0
| Bits per key that bloom filter will use, this only take
effect when bloom filter is used. The default value is 10.0.
|
| kv.rocksdb.bloom-filter.block-based-mode | Boolean | false
| If true, RocksDB will use block-based filter instead of
full filter, this only take effect when bloom filter is used. The default value
is `false`.
|
-| kv.rocksdb.shared-rate-limiter-bytes-per-sec | MemorySize |
Long.MAX_VALUE | The bytes per second rate limit for RocksDB
flush and compaction operations shared across all RocksDB instances on the
TabletServer. The rate limiter is always enabled. The default value is
Long.MAX_VALUE (effectively unlimited). Set to a lower value (e.g., 100MB) to
limit the rate. This configuration can be updated dynamically without server
restart. See [Updating Configs](operations/updating-configs.md) for more
details.
|
+| kv.rocksdb.shared-rate-limiter-bytes-per-sec | MemorySize |
Long.MAX_VALUE | The bytes per second rate limit for RocksDB
flush and compaction operations shared across all RocksDB instances on the
TabletServer. The rate limiter is always enabled. The default value is
Long.MAX_VALUE (effectively unlimited). Set to a lower value (e.g., 100MB) to
limit the rate. This configuration can be updated dynamically without server
restart. See [Updating Configs](operations/updating-configs.md) for more
details.
|
Review Comment:
Clean up unrelated changes.
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/KvSnapshotLeaseManager.java:
##########
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.KvSnapshotLeaseForBucket;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLease;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLeaseMetadataManager;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotTableLease;
+import org.apache.fluss.utils.MapUtils;
+import org.apache.fluss.utils.clock.Clock;
+import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
+import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
+
+/** A manager to manage kv snapshot lease acquire, renew, release and drop. */
+@ThreadSafe
+public class KvSnapshotLeaseManager {
+ private static final Logger LOG =
LoggerFactory.getLogger(KvSnapshotLeaseManager.class);
+ private static final AtomicInteger ZERO = new AtomicInteger(0);
+
+ private final KvSnapshotLeaseMetadataManager metadataHelper;
Review Comment:
Rename to metadataManager
##########
website/docs/maintenance/observability/monitor-metrics.md:
##########
@@ -325,6 +325,16 @@ Some metrics might not be exposed when using other JVM
implementations (e.g. IBM
<td>The total number of partitions in this cluster.</td>
<td>Gauge</td>
</tr>
+ <tr>
+ <td>kvSnapshotLeaseCount</td>
+ <td>The total number of kv snapshots in this cluster.</td>
Review Comment:
The metric name (kvSnapshotLeaseCount) does not match its description (the total number of kv snapshots).
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java:
##########
@@ -531,22 +543,96 @@ private List<SourceSplitBase>
initPrimaryKeyTablePartitionSplits(
List<SourceSplitBase> splits = new ArrayList<>();
for (Partition partition : newPartitions) {
String partitionName = partition.getPartitionName();
- // get the table snapshot info
- final KvSnapshots kvSnapshots;
- try {
- kvSnapshots = flussAdmin.getLatestKvSnapshots(tablePath,
partitionName).get();
- } catch (Exception e) {
- throw new FlinkRuntimeException(
- String.format(
- "Failed to get table snapshot for table %s and
partition %s",
- tablePath, partitionName),
- ExceptionUtils.stripCompletionException(e));
- }
- splits.addAll(getSnapshotAndLogSplits(kvSnapshots, partitionName));
+ splits.addAll(
+ getSnapshotAndLogSplits(
+ getLatestKvSnapshotsAndRegister(partitionName),
partitionName));
}
return splits;
}
+ private KvSnapshots getLatestKvSnapshotsAndRegister(@Nullable String
partitionName) {
+ long tableId;
+ Long partitionId;
+ Map<Integer, Long> snapshotIds = new HashMap<>();
+ Map<Integer, Long> logOffsets = new HashMap<>();
+
+ // retry to get the latest kv snapshots and acquire kvSnapshot lease
util all buckets
+ // acquire success. The reason is that getLatestKvSnapshots and
acquireKvSnapshotLease
+ // are not atomic operations, the latest kv snapshot obtained via get
may become outdated by
+ // the time it is passed to acquire. Therefore, this logic must
implement a retry
+ // mechanism: the unavailable tableBuckets in the
AcquiredKvSnapshotLeaseResult returned by
+ // acquireKvSnapshotLease must be retried repeatedly until all buckets
are successfully
+ // acquired.
+ try {
+ Set<TableBucket> remainingTableBuckets;
+ do {
+ KvSnapshots kvSnapshots = getLatestKvSnapshots(partitionName);
+ remainingTableBuckets = new
HashSet<>(kvSnapshots.getTableBuckets());
+
+ tableId = kvSnapshots.getTableId();
+ partitionId = kvSnapshots.getPartitionId();
+
+ Set<TableBucket> ignoreBuckets = new HashSet<>();
+ Map<TableBucket, Long> bucketsToLease = new HashMap<>();
+ for (TableBucket tb : remainingTableBuckets) {
+ int bucket = tb.getBucket();
+ OptionalLong snapshotIdOpt =
kvSnapshots.getSnapshotId(bucket);
+ OptionalLong logOffsetOpt =
kvSnapshots.getLogOffset(bucket);
+ if (snapshotIdOpt.isPresent() && !ignoreTableBucket(tb)) {
+ bucketsToLease.put(tb, snapshotIdOpt.getAsLong());
+ } else {
+ ignoreBuckets.add(tb);
+ }
+
+ snapshotIds.put(
+ bucket, snapshotIdOpt.isPresent() ?
snapshotIdOpt.getAsLong() : null);
+ logOffsets.put(
+ bucket, logOffsetOpt.isPresent() ?
logOffsetOpt.getAsLong() : null);
+ }
+
+ if (!ignoreBuckets.isEmpty()) {
+ remainingTableBuckets.removeAll(ignoreBuckets);
+ }
+
+ if (!bucketsToLease.isEmpty()) {
+ String kvSnapshotLeaseId =
leaseContext.getKvSnapshotLeaseId();
+ LOG.info(
+ "Try to acquire kv snapshot lease {} for table {}",
+ kvSnapshotLeaseId,
+ tablePath);
+ remainingTableBuckets =
+ flussAdmin
+ .acquireKvSnapshotLease(
+ kvSnapshotLeaseId,
+ bucketsToLease,
+
leaseContext.getKvSnapshotLeaseDurationMs())
Review Comment:
acquireKvSnapshotLease() may throw NullPointerException when
leaseContext.getKvSnapshotLeaseDurationMs() returns null.
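
To illustrate the concern: if the getter returns a boxed `Long` (an assumption here, the getter is not shown in this hunk) and the target parameter is a primitive `long`, Java unboxes implicitly and a null value fails at the call site. The sketch below uses hypothetical names (`LeaseDurationSketch`, `acquire`, `resolveDuration`), and the one-day fallback is only an assumption based on the documented default of `scan.kv.snapshot.lease.duration`.

```java
/** Hypothetical illustration of the null-unboxing hazard and one possible guard. */
final class LeaseDurationSketch {
    // Assumed fallback of one day; not taken from the PR's code.
    static final long DEFAULT_LEASE_DURATION_MS = 24L * 60 * 60 * 1000;

    /** Stand-in for an API that takes a primitive long lease duration. */
    static long acquire(long leaseDurationMs) {
        return leaseDurationMs;
    }

    /** Returns true if passing the boxed value straight through throws. */
    static boolean unboxingThrows(Long configured) {
        try {
            acquire(configured); // implicit configured.longValue()
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    /** Resolves the duration before the call so the primitive parameter never sees null. */
    static long resolveDuration(Long configured) {
        return configured != null ? configured : DEFAULT_LEASE_DURATION_MS;
    }

    public static void main(String[] args) {
        System.out.println("null unboxing throws: " + unboxingThrows(null));
        System.out.println("resolved duration: " + acquire(resolveDuration(null)));
    }
}
```

Whether a default or a fail-fast check with a clear message is the better fix depends on whether a missing duration is a valid state for the lease context.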
##########
website/docs/engine-flink/options.md:
##########
@@ -91,20 +91,22 @@ See more details about [ALTER TABLE ...
SET](engine-flink/ddl.md#set-properties)
## Read Options
-| Option | Type | Default
| Description
|
-|-----------------------------------------------------|------------|-------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| scan.startup.mode | Enum | full
| The scan startup mode enables you to
specify the starting point for data consumption. Fluss currently supports the
following `scan.startup.mode` options: `full` (default), earliest, latest,
timestamp. See the [Start Reading
Position](engine-flink/reads.md#start-reading-position) for more details.
|
-| scan.startup.timestamp | Long | (None)
| The timestamp to start reading the data
from. This option is only valid when `scan.startup.mode` is set to `timestamp`.
The format is 'milli-second-since-epoch' or `yyyy-MM-dd HH:mm:ss`, like
`1678883047356` or `2023-12-09 23:09:12`.
|
-| scan.partition.discovery.interval | Duration | 1min
| The time interval for the Fluss source
to discover the new partitions for partitioned table while scanning. A
non-positive value disables the partition discovery. The default value is 1
minute. Currently, since Fluss Admin#listPartitions(TablePath tablePath)
requires a large number of requests to ZooKeeper in server, this option cannot
be set too small, as a small value would cause frequent requests and increase
server load. In the future, once list partitions is optimized, the default
value of this parameter can be reduced. |
-| client.scanner.log.check-crc | Boolean | true
| Automatically check the CRC3 of the
read records for LogScanner. This ensures no on-the-wire or on-disk corruption
to the messages occurred. This check adds some overhead, so it may be disabled
in cases seeking extreme performance.
|
-| client.scanner.log.max-poll-records | Integer | 500
| The maximum number of records returned
in a single call to poll() for LogScanner. Note that this config doesn't impact
the underlying fetching behavior. The Scanner will cache the records from each
fetch request and returns them incrementally from each poll.
|
-| client.scanner.log.fetch.max-bytes | MemorySize | 16mb
| The maximum amount of data the server
should return for a fetch request from client. Records are fetched in batches,
and if the first record batch in the first non-empty bucket of the fetch is
larger than this value, the record batch will still be returned to ensure that
the fetch can make progress. As such, this is not a absolute maximum.
|
-| client.scanner.log.fetch.max-bytes-for-bucket | MemorySize | 1mb
| The maximum amount of data the server
should return for a table bucket in fetch request fom client. Records are
fetched in batches, and the max bytes size is config by this option.
|
-| client.scanner.log.fetch.min-bytes | MemorySize | 1b
| The minimum bytes expected for each
fetch log request from client to response. If not enough bytes, wait up to
client.scanner.log.fetch-wait-max-time time to return.
|
-| client.scanner.log.fetch.wait-max-time | Duration | 500ms
| The maximum time to wait for enough
bytes to be available for a fetch log request from client to response.
|
-| client.scanner.io.tmpdir | String |
System.getProperty("java.io.tmpdir") + "/fluss" | Local directory that is used
by client for storing the data files (like kv snapshot, log segment files) to
read temporarily
|
-| client.scanner.remote-log.prefetch-num | Integer | 4
| The number of remote log segments to
keep in local temp file for LogScanner, which download from remote storage. The
default setting is 4.
|
-| client.remote-file.download-thread-num | Integer | 3
| The number of threads the client uses
to download remote files.
|
+| Option | Type | Default
| Description
|
+|-----------------------------------------------|------------|-------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| scan.startup.mode | Enum | full
| The scan startup mode enables you to specify
the starting point for data consumption. Fluss currently supports the following
`scan.startup.mode` options: `full` (default), earliest, latest, timestamp. See
the [Start Reading Position](engine-flink/reads.md#start-reading-position) for
more details.
|
+| scan.startup.timestamp | Long | (None)
| The timestamp to start reading the data from.
This option is only valid when `scan.startup.mode` is set to `timestamp`. The
format is 'milli-second-since-epoch' or `yyyy-MM-dd HH:mm:ss`, like
`1678883047356` or `2023-12-09 23:09:12`.
|
+| scan.partition.discovery.interval | Duration | 1min
| The time interval for the Fluss source to
discover the new partitions for partitioned table while scanning. A
non-positive value disables the partition discovery. The default value is 1
minute. Currently, since Fluss Admin#listPartitions(TablePath tablePath)
requires a large number of requests to ZooKeeper in server, this option cannot
be set too small, as a small value would cause frequent requests and increase
server load. In the future, once list partitions is optimized, the default
value of this parameter can be reduced. |
+| scan.kv.snapshot.lease.id | String | UUID
| The lease id to lease kv snapshots. If set,
the acquired kv snapshots will not be deleted until the consumer finished
consuming all the snapshots or the lease duration time is reached. If not set,
an UUID will be set.
|
+| scan.kv.snapshot.lease.duration | Duration | 1day
| The time period how long to wait before
expiring the kv snapshot lease to avoid kv snapshot blocking to delete.
|
+| client.scanner.log.check-crc | Boolean | true
| Automatically check the CRC3 of the read
records for LogScanner. This ensures no on-the-wire or on-disk corruption to
the messages occurred. This check adds some overhead, so it may be disabled in
cases seeking extreme performance.
|
+| client.scanner.log.max-poll-records | Integer | 500
| The maximum number of records returned in a
single call to poll() for LogScanner. Note that this config doesn't impact the
underlying fetching behavior. The Scanner will cache the records from each
fetch request and returns them incrementally from each poll.
|
+| client.scanner.log.fetch.max-bytes | MemorySize | 16mb
| The maximum amount of data the server should
return for a fetch request from client. Records are fetched in batches, and if
the first record batch in the first non-empty bucket of the fetch is larger
than this value, the record batch will still be returned to ensure that the
fetch can make progress. As such, this is not a absolute maximum.
|
+| client.scanner.log.fetch.max-bytes-for-bucket | MemorySize | 1mb
| The maximum amount of data the server should
return for a table bucket in fetch request fom client. Records are fetched in
batches, and the max bytes size is config by this option.
|
+| client.scanner.log.fetch.min-bytes | MemorySize | 1b
| The minimum bytes expected for each fetch log
request from client to response. If not enough bytes, wait up to
client.scanner.log.fetch-wait-max-time time to return.
|
+| client.scanner.log.fetch.wait-max-time | Duration | 500ms
| The maximum time to wait for enough bytes to
be available for a fetch log request from client to response.
|
+| client.scanner.io.tmpdir | String |
System.getProperty("java.io.tmpdir") + "/fluss" | Local directory that is used
by client for storing the data files (like kv snapshot, log segment files) to
read temporarily
|
+| client.scanner.remote-log.prefetch-num | Integer | 4
| The number of remote log segments to keep in
local temp file for LogScanner, which download from remote storage. The default
setting is 4.
|
Review Comment:
Clean up unrelated changes.
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/KvSnapshotLeaseManager.java:
##########
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.KvSnapshotLeaseForBucket;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLease;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLeaseMetadataManager;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotTableLease;
+import org.apache.fluss.utils.MapUtils;
+import org.apache.fluss.utils.clock.Clock;
+import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
+import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
+
+/** A manager to manage kv snapshot lease acquire, renew, release and drop. */
+@ThreadSafe
+public class KvSnapshotLeaseManager {
+ private static final Logger LOG =
LoggerFactory.getLogger(KvSnapshotLeaseManager.class);
+ private static final AtomicInteger ZERO = new AtomicInteger(0);
+
+ private final KvSnapshotLeaseMetadataManager metadataHelper;
+ private final CoordinatorContext coordinatorContext;
+ private final Clock clock;
+ private final ScheduledExecutorService scheduledExecutor;
+ private final Configuration conf;
+
+ private final Map<String, ReadWriteLock> leaseLocks =
MapUtils.newConcurrentHashMap();
+ /** lease id to kv snapshot lease. */
+ @GuardedBy("leaseLocks")
+ private final Map<String, KvSnapshotLease> kvSnapshotLeaseMap;
+
+ private final ReadWriteLock refCountLock = new ReentrantReadWriteLock();
+
+ /**
+ * KvSnapshotLeaseForBucket to the ref count, which means this table
bucket + snapshotId has
+ * been leased by how many lease id.
+ */
+ private final Map<KvSnapshotLeaseForBucket, AtomicInteger> refCount =
+ MapUtils.newConcurrentHashMap();
+
+ /** For metrics. */
+ private final AtomicInteger leasedBucketCount = new AtomicInteger(0);
+
+ public KvSnapshotLeaseManager(
+ Configuration conf,
+ KvSnapshotLeaseMetadataManager metadataHelper,
+ CoordinatorContext coordinatorContext,
+ Clock clock,
+ CoordinatorMetricGroup coordinatorMetricGroup) {
+ this(
+ conf,
+ metadataHelper,
+ coordinatorContext,
+ Executors.newScheduledThreadPool(
+ 1, new
ExecutorThreadFactory("kv-snapshot-lease-cleaner")),
+ clock,
+ coordinatorMetricGroup);
+ }
+
+ @VisibleForTesting
+ public KvSnapshotLeaseManager(
+ Configuration conf,
+ KvSnapshotLeaseMetadataManager metadataHelper,
+ CoordinatorContext coordinatorContext,
+ ScheduledExecutorService scheduledExecutor,
+ Clock clock,
+ CoordinatorMetricGroup coordinatorMetricGroup) {
+ this.metadataHelper = metadataHelper;
+ this.conf = conf;
+ this.scheduledExecutor = scheduledExecutor;
+ this.coordinatorContext = coordinatorContext;
+ this.clock = clock;
+ this.kvSnapshotLeaseMap = MapUtils.newConcurrentHashMap();
+
+ registerMetrics(coordinatorMetricGroup);
+ }
+
+ public void start() {
+ LOG.info("kv snapshot lease manager has been started.");
+ scheduledExecutor.scheduleWithFixedDelay(
+ this::expireLeases,
+ 0L,
+
conf.get(ConfigOptions.KV_SNAPSHOT_LEASE_EXPIRATION_CHECK_INTERVAL).toMillis(),
+ TimeUnit.MILLISECONDS);
+ }
+
+ public void initialize() throws Exception {
+ for (String leaseId : metadataHelper.getLeasesList()) {
+ Optional<KvSnapshotLease> kvSnapshotLeaseOpt =
metadataHelper.getLease(leaseId);
+ if (kvSnapshotLeaseOpt.isPresent()) {
+ KvSnapshotLease kvSnapshotLease = kvSnapshotLeaseOpt.get();
+ this.leaseLocks.put(leaseId, new ReentrantReadWriteLock());
+ this.kvSnapshotLeaseMap.put(leaseId, kvSnapshotLease);
+
+ initializeRefCount(kvSnapshotLease);
+
+
leasedBucketCount.addAndGet(kvSnapshotLease.getLeasedSnapshotCount());
+ }
+ }
+ }
+
+ public boolean snapshotLeaseNotExist(KvSnapshotLeaseForBucket
kvSnapshotLeaseForBucket) {
+ return inReadLock(
+ refCountLock,
+ () ->
+ refCount.getOrDefault(kvSnapshotLeaseForBucket, new
AtomicInteger(0)).get()
+ <= 0);
+ }
+
+ /**
+ * Acquire kv snapshot lease.
+ *
+ * @param leaseId the lease id
+ * @param leaseDuration the lease duration
+ * @param tableIdToLeaseBucket the table id to lease bucket
+ * @return the map of unavailable snapshots that failed to be leased
+ */
+ public Map<TableBucket, Long> acquireLease(
+ String leaseId,
+ long leaseDuration,
+ Map<Long, List<KvSnapshotLeaseForBucket>> tableIdToLeaseBucket)
+ throws Exception {
+ ReadWriteLock lock = leaseLocks.computeIfAbsent(leaseId, k -> new
ReentrantReadWriteLock());
+ return inWriteLock(
+ lock,
+ () -> {
+ // To record the unavailable snapshots such as the kv
snapshotId to lease not
+ // exists.
+ Map<TableBucket, Long> unavailableSnapshots = new
HashMap<>();
+
+ boolean update = kvSnapshotLeaseMap.containsKey(leaseId);
+ KvSnapshotLease kvSnapshotLease;
+ long newLeaseDuration = clock.milliseconds() +
leaseDuration;
+ if (!update) {
+ // set the expiration time as: current time +
leaseDuration
+ kvSnapshotLease = new
KvSnapshotLease(newLeaseDuration);
+ kvSnapshotLeaseMap.put(leaseId, kvSnapshotLease);
+ LOG.info(
+ "kv snapshot lease '"
+ + leaseId
+ + "' has been acquired. The lease
expiration time is "
+ + kvSnapshotLease.getExpirationTime());
+ } else {
+ kvSnapshotLease = kvSnapshotLeaseMap.get(leaseId);
Review Comment:
Using containsKey() followed by get() is a bad pattern, as it queries the map
twice for the same key. The branching is also more complex than it needs to be
and can be simplified using computeIfAbsent().
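
A minimal sketch of the suggested simplification, assuming the lease exposes a setter for its expiration time (the `Lease` class and `acquireOrRenew` below are hypothetical stand-ins, not the PR's types):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical illustration of replacing containsKey() + get() with computeIfAbsent(). */
final class LeaseLookupSketch {
    /** Simplified lease that only tracks its expiration time. */
    static final class Lease {
        private long expirationTime;

        Lease(long expirationTime) {
            this.expirationTime = expirationTime;
        }

        long getExpirationTime() {
            return expirationTime;
        }

        void setExpirationTime(long expirationTime) {
            this.expirationTime = expirationTime;
        }
    }

    private final Map<String, Lease> leases = new ConcurrentHashMap<>();

    /** Creates the lease on first use, otherwise renews the existing one, with one lookup. */
    Lease acquireOrRenew(String leaseId, long expirationTime) {
        Lease lease = leases.computeIfAbsent(leaseId, id -> new Lease(expirationTime));
        // Harmless for a freshly created lease, and a renewal for an existing one.
        lease.setExpirationTime(expirationTime);
        return lease;
    }

    public static void main(String[] args) {
        LeaseLookupSketch sketch = new LeaseLookupSketch();
        Lease first = sketch.acquireOrRenew("lease-1", 100L);
        Lease second = sketch.acquireOrRenew("lease-1", 250L);
        System.out.println("same instance: " + (first == second));
        System.out.println("expiration: " + second.getExpirationTime());
    }
}
```

If the caller still needs to know whether the lease was newly created (for example, to log the acquisition only once), a single get() with a null check provides that information with one lookup as well.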
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/KvSnapshotLeaseManager.java:
##########
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.KvSnapshotLeaseForBucket;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLease;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLeaseMetadataManager;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotTableLease;
+import org.apache.fluss.utils.MapUtils;
+import org.apache.fluss.utils.clock.Clock;
+import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
+import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
+
+/** A manager to manage kv snapshot lease acquire, renew, release and drop. */
+@ThreadSafe
+public class KvSnapshotLeaseManager {
+ private static final Logger LOG =
LoggerFactory.getLogger(KvSnapshotLeaseManager.class);
+ private static final AtomicInteger ZERO = new AtomicInteger(0);
+
+ private final KvSnapshotLeaseMetadataManager metadataHelper;
+ private final CoordinatorContext coordinatorContext;
+ private final Clock clock;
+ private final ScheduledExecutorService scheduledExecutor;
+ private final Configuration conf;
+
+ private final Map<String, ReadWriteLock> leaseLocks =
MapUtils.newConcurrentHashMap();
+ /** lease id to kv snapshot lease. */
+ @GuardedBy("leaseLocks")
+ private final Map<String, KvSnapshotLease> kvSnapshotLeaseMap;
+
+ private final ReadWriteLock refCountLock = new ReentrantReadWriteLock();
+
+ /**
+ * KvSnapshotLeaseForBucket to the ref count, which means this table
bucket + snapshotId has
+ * been leased by how many lease id.
+ */
+ private final Map<KvSnapshotLeaseForBucket, AtomicInteger> refCount =
+ MapUtils.newConcurrentHashMap();
+
+ /** For metrics. */
+ private final AtomicInteger leasedBucketCount = new AtomicInteger(0);
+
+ public KvSnapshotLeaseManager(
+ Configuration conf,
+ KvSnapshotLeaseMetadataManager metadataHelper,
+ CoordinatorContext coordinatorContext,
+ Clock clock,
+ CoordinatorMetricGroup coordinatorMetricGroup) {
+ this(
+ conf,
+ metadataHelper,
+ coordinatorContext,
+ Executors.newScheduledThreadPool(
+ 1, new
ExecutorThreadFactory("kv-snapshot-lease-cleaner")),
+ clock,
+ coordinatorMetricGroup);
+ }
+
+ @VisibleForTesting
+ public KvSnapshotLeaseManager(
+ Configuration conf,
+ KvSnapshotLeaseMetadataManager metadataHelper,
+ CoordinatorContext coordinatorContext,
+ ScheduledExecutorService scheduledExecutor,
+ Clock clock,
+ CoordinatorMetricGroup coordinatorMetricGroup) {
+ this.metadataHelper = metadataHelper;
+ this.conf = conf;
+ this.scheduledExecutor = scheduledExecutor;
+ this.coordinatorContext = coordinatorContext;
+ this.clock = clock;
+ this.kvSnapshotLeaseMap = MapUtils.newConcurrentHashMap();
+
+ registerMetrics(coordinatorMetricGroup);
+ }
+
+ public void start() {
+ LOG.info("kv snapshot lease manager has been started.");
+ scheduledExecutor.scheduleWithFixedDelay(
+ this::expireLeases,
+ 0L,
+
conf.get(ConfigOptions.KV_SNAPSHOT_LEASE_EXPIRATION_CHECK_INTERVAL).toMillis(),
+ TimeUnit.MILLISECONDS);
+ }
+
+ public void initialize() throws Exception {
+ for (String leaseId : metadataHelper.getLeasesList()) {
+ Optional<KvSnapshotLease> kvSnapshotLeaseOpt =
metadataHelper.getLease(leaseId);
+ if (kvSnapshotLeaseOpt.isPresent()) {
+ KvSnapshotLease kvSnapshotLease = kvSnapshotLeaseOpt.get();
+ this.leaseLocks.put(leaseId, new ReentrantReadWriteLock());
+ this.kvSnapshotLeaseMap.put(leaseId, kvSnapshotLease);
+
+ initializeRefCount(kvSnapshotLease);
+
+
leasedBucketCount.addAndGet(kvSnapshotLease.getLeasedSnapshotCount());
+ }
+ }
+ }
+
+ public boolean snapshotLeaseNotExist(KvSnapshotLeaseForBucket
kvSnapshotLeaseForBucket) {
+ return inReadLock(
+ refCountLock,
+ () ->
+ refCount.getOrDefault(kvSnapshotLeaseForBucket, new
AtomicInteger(0)).get()
Review Comment:
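The default argument is evaluated on every call, so this allocates a new
AtomicInteger(0) even when the key is present. Reuse the ZERO constant already
declared in this class, or null-check the result of get().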
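
One allocation-free way to write the check, sketched with a String key
standing in for KvSnapshotLeaseForBucket (class and method names are
illustrative only):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class RefCountLookupSketch {
    private final Map<String, AtomicInteger> refCount = new ConcurrentHashMap<>();

    /** Increments the count for key, creating the counter on first use. */
    void retain(String key) {
        refCount.computeIfAbsent(key, k -> new AtomicInteger()).incrementAndGet();
    }

    /** True when no lease references key; allocates nothing on the read path. */
    boolean notLeased(String key) {
        AtomicInteger count = refCount.get(key);
        return count == null || count.get() <= 0;
    }
}
```

A null check also avoids handing out a shared mutable default that a caller
could modify by accident.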
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/LeaseContext.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source.reader;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/** Context for lease. */
+public class LeaseContext implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ // kv snapshot lease id. null for log table.
+ private final @Nullable String kvSnapshotLeaseId;
+
+ // kv snapshot lease duration. null for log table.
+ private final @Nullable Long kvSnapshotLeaseDurationMs;
+
+ public LeaseContext(
+ @Nullable String kvSnapshotLeaseId, @Nullable Long
kvSnapshotLeaseDurationMs) {
+ this.kvSnapshotLeaseId = kvSnapshotLeaseId;
+ this.kvSnapshotLeaseDurationMs = kvSnapshotLeaseDurationMs;
+ }
+
+ public @Nullable String getKvSnapshotLeaseId() {
+ return kvSnapshotLeaseId;
+ }
+
+ public @Nullable Long getKvSnapshotLeaseDurationMs() {
Review Comment:
Why is this allowed to be nullable?
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/procedure/DropKvSnapshotLeaseProcedure.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.procedure;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/** Procedure to drop kv snapshot lease. */
+public class DropKvSnapshotLeaseProcedure extends ProcedureBase {
Review Comment:
I recommend updating the API name. A slight difference from the FIP should be
fine as long as there is an explanatory comment.
##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lease/KvSnapshotLeaseMetadataManager.java:
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data.lease;
+
+import org.apache.fluss.fs.FSDataInputStream;
+import org.apache.fluss.fs.FSDataOutputStream;
+import org.apache.fluss.fs.FileSystem;
+import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.BucketSnapshot;
+import org.apache.fluss.utils.FlussPaths;
+import org.apache.fluss.utils.IOUtils;
+import org.apache.fluss.utils.types.Tuple2;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+
+/**
+ * The manager to handle {@link KvSnapshotLease} to register/update/delete
metadata from zk and
+ * remote fs.
+ */
+public class KvSnapshotLeaseMetadataManager {
+ private static final Logger LOG =
LoggerFactory.getLogger(KvSnapshotLeaseMetadataManager.class);
+
+ private final ZooKeeperClient zkClient;
+ private final String remoteDataDir;
+
+ public KvSnapshotLeaseMetadataManager(ZooKeeperClient zkClient, String
remoteDataDir) {
+ this.zkClient = zkClient;
+ this.remoteDataDir = remoteDataDir;
+ }
+
+ public List<String> getLeasesList() throws Exception {
+ return zkClient.getKvSnapshotLeasesList();
+ }
+
+ /**
+ * Register a new kv snapshot lease to zk and remote fs.
+ *
+ * @param leaseId the lease id.
+ * @param lease the kv snapshot lease.
+ */
+ public void registerLease(String leaseId, KvSnapshotLease lease) throws
Exception {
+ Map<Long, FsPath> tableIdToRemoteMetadataFsPath =
generateMetadataFile(leaseId, lease);
+
+ // generate remote fsPath of metadata.
+ KvSnapshotLeaseMetadata leaseMetadata =
+ new KvSnapshotLeaseMetadata(
+ lease.getExpirationTime(),
tableIdToRemoteMetadataFsPath);
+
+ // register kv snapshot metadata to zk.
+ try {
+ zkClient.registerKvSnapshotLeaseMetadata(leaseId, leaseMetadata);
+ } catch (Exception e) {
+ LOG.warn("Failed to register kv snapshot lease metadata to zk.",
e);
+ leaseMetadata.discard();
+ throw e;
+ }
+ }
+
+ /**
+ * Update a kv snapshot lease to zk and remote fs.
+ *
+ * @param leaseId the lease id.
+ * @param kvSnapshotLease the kv snapshot lease.
+ */
+ public void updateLease(String leaseId, KvSnapshotLease kvSnapshotLease)
throws Exception {
+ // TODO change this to incremental update to avoid create too many
remote metadata files.
+
+ Optional<KvSnapshotLeaseMetadata> originalLeaseMetadata =
+ zkClient.getKvSnapshotLeaseMetadata(leaseId);
+
+ Map<Long, FsPath> tableIdToNewRemoteMetadataFsPath =
+ generateMetadataFile(leaseId, kvSnapshotLease);
+
+ // generate new kv snapshot lease metadata.
+ KvSnapshotLeaseMetadata newLeaseMetadata =
+ new KvSnapshotLeaseMetadata(
+ kvSnapshotLease.getExpirationTime(),
tableIdToNewRemoteMetadataFsPath);
+ // register new snapshot metadata to zk.
+ try {
+ zkClient.updateKvSnapshotLeaseMetadata(leaseId, newLeaseMetadata);
+ } catch (Exception e) {
+ LOG.warn("Failed to update kv snapshot lease metadata to zk.", e);
+ newLeaseMetadata.discard();
Review Comment:
Perhaps we should have a mechanism to scan and clean up orphaned lease files
during initialization.
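
A rough sketch of such a cleanup pass, assuming the set of metadata file paths
still referenced from zk has already been collected. It uses java.nio.file on
a local directory purely for illustration; the real code would go through the
Fluss FileSystem/FsPath abstractions, and every name below is hypothetical:

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

final class OrphanLeaseFileCleanerSketch {
    /**
     * Deletes every regular file directly under leaseDir that is not in referenced,
     * and returns the deleted paths.
     */
    static List<Path> cleanOrphans(Path leaseDir, Set<Path> referenced) throws IOException {
        // Collect first, so nothing is deleted while the directory is being listed.
        List<Path> orphans = new ArrayList<>();
        try (DirectoryStream<Path> files = Files.newDirectoryStream(leaseDir)) {
            for (Path file : files) {
                if (Files.isRegularFile(file) && !referenced.contains(file)) {
                    orphans.add(file);
                }
            }
        }
        for (Path orphan : orphans) {
            Files.deleteIfExists(orphan);
        }
        return orphans;
    }
}
```

Running this once during initialization, before new leases are accepted,
would avoid racing with files that are being written for in-flight updates.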
##########
fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStore.java:
##########
@@ -200,14 +208,21 @@ interface SubsumeAction {
void subsume(CompletedSnapshot snapshot) throws Exception;
}
- private static boolean canSubsume(CompletedSnapshot next,
CompletedSnapshot latest) {
+ /** A function to determine whether a snapshot can be subsumed. */
+ @FunctionalInterface
+ public interface CanSubsume {
Review Comment:
“CanSubsume” isn’t an ideal type name: it reads like a predicate method
(e.g., canSubsume()) rather than a noun that represents a type or entity. In
object-oriented design, type names should typically be nouns that clearly
convey what the type is or represents, e.g., SubsumptionChecker.
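
For illustration, a noun-named functional interface could look like the sketch
below. The generic parameter and the ID-based rule are assumptions made to
keep the example self-contained; the actual rule in CompletedSnapshotStore may
differ:

```java
@FunctionalInterface
interface SubsumptionChecker<S> {
    /** Returns true if candidate may be subsumed, given the latest snapshot. */
    boolean canSubsume(S candidate, S latest);
}

final class SubsumptionCheckerSketch {
    /** Hypothetical rule over snapshot IDs: only strictly older snapshots are subsumed. */
    static final SubsumptionChecker<Long> OLDER_THAN_LATEST =
            (candidate, latest) -> candidate < latest;
}
```

The predicate-style name then lives on the method, where it reads naturally
at the call site.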
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java:
##########
@@ -531,22 +543,96 @@ private List<SourceSplitBase>
initPrimaryKeyTablePartitionSplits(
List<SourceSplitBase> splits = new ArrayList<>();
for (Partition partition : newPartitions) {
String partitionName = partition.getPartitionName();
- // get the table snapshot info
- final KvSnapshots kvSnapshots;
- try {
- kvSnapshots = flussAdmin.getLatestKvSnapshots(tablePath,
partitionName).get();
- } catch (Exception e) {
- throw new FlinkRuntimeException(
- String.format(
- "Failed to get table snapshot for table %s and
partition %s",
- tablePath, partitionName),
- ExceptionUtils.stripCompletionException(e));
- }
- splits.addAll(getSnapshotAndLogSplits(kvSnapshots, partitionName));
+ splits.addAll(
+ getSnapshotAndLogSplits(
+ getLatestKvSnapshotsAndRegister(partitionName),
partitionName));
}
return splits;
}
+ private KvSnapshots getLatestKvSnapshotsAndRegister(@Nullable String
partitionName) {
+ long tableId;
+ Long partitionId;
+ Map<Integer, Long> snapshotIds = new HashMap<>();
+ Map<Integer, Long> logOffsets = new HashMap<>();
+
+ // retry to get the latest kv snapshots and acquire kvSnapshot lease
util all buckets
+ // acquire success. The reason is that getLatestKvSnapshots and
acquireKvSnapshotLease
+ // are not atomic operations, the latest kv snapshot obtained via get
may become outdated by
+ // the time it is passed to acquire. Therefore, this logic must
implement a retry
+ // mechanism: the unavailable tableBuckets in the
AcquiredKvSnapshotLeaseResult returned by
+ // acquireKvSnapshotLease must be retried repeatedly until all buckets
are successfully
+ // acquired.
+ try {
+ Set<TableBucket> remainingTableBuckets;
+ do {
+ KvSnapshots kvSnapshots = getLatestKvSnapshots(partitionName);
+ remainingTableBuckets = new
HashSet<>(kvSnapshots.getTableBuckets());
+
+ tableId = kvSnapshots.getTableId();
+ partitionId = kvSnapshots.getPartitionId();
+
+ Set<TableBucket> ignoreBuckets = new HashSet<>();
+ Map<TableBucket, Long> bucketsToLease = new HashMap<>();
+ for (TableBucket tb : remainingTableBuckets) {
+ int bucket = tb.getBucket();
+ OptionalLong snapshotIdOpt =
kvSnapshots.getSnapshotId(bucket);
+ OptionalLong logOffsetOpt =
kvSnapshots.getLogOffset(bucket);
+ if (snapshotIdOpt.isPresent() && !ignoreTableBucket(tb)) {
+ bucketsToLease.put(tb, snapshotIdOpt.getAsLong());
+ } else {
+ ignoreBuckets.add(tb);
+ }
+
+ snapshotIds.put(
+ bucket, snapshotIdOpt.isPresent() ?
snapshotIdOpt.getAsLong() : null);
+ logOffsets.put(
+ bucket, logOffsetOpt.isPresent() ?
logOffsetOpt.getAsLong() : null);
+ }
+
+ if (!ignoreBuckets.isEmpty()) {
+ remainingTableBuckets.removeAll(ignoreBuckets);
+ }
+
+ if (!bucketsToLease.isEmpty()) {
+ String kvSnapshotLeaseId =
leaseContext.getKvSnapshotLeaseId();
+ LOG.info(
+ "Try to acquire kv snapshot lease {} for table {}",
+ kvSnapshotLeaseId,
+ tablePath);
+ remainingTableBuckets =
+ flussAdmin
+ .acquireKvSnapshotLease(
+ kvSnapshotLeaseId,
+ bucketsToLease,
+
leaseContext.getKvSnapshotLeaseDurationMs())
+ .get()
+ .getUnavailableTableBucketSet();
+ if (!remainingTableBuckets.isEmpty()) {
+ LOG.info(
+ "Failed to acquire kv snapshot lease for table
{}: {}. Retry to re-acquire",
+ tablePath,
+ remainingTableBuckets);
+ }
+ }
+ } while (!remainingTableBuckets.isEmpty());
Review Comment:
Do not retry indefinitely here: bound the number of attempts, and add a delay
between requests to avoid retry storms.
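
A minimal sketch of a bounded retry with exponential backoff. The `acquire`
callback is a hypothetical stand-in for "fetch the latest snapshots, call
acquireKvSnapshotLease, and return the buckets that are still unavailable";
the attempt limit and backoff values are assumptions, not settings from the
PR:

```java
import java.util.Set;
import java.util.function.Function;

final class BoundedRetrySketch {
    /**
     * Calls acquire up to maxAttempts times, sleeping between attempts.
     * Returns whatever is still unacquired (empty on full success).
     */
    static <T> Set<T> acquireWithRetry(
            Set<T> wanted,
            Function<Set<T>, Set<T>> acquire,
            int maxAttempts,
            long initialBackoffMs,
            long maxBackoffMs)
            throws InterruptedException {
        Set<T> remaining = wanted;
        long backoffMs = initialBackoffMs;
        for (int attempt = 1; attempt <= maxAttempts && !remaining.isEmpty(); attempt++) {
            remaining = acquire.apply(remaining);
            if (!remaining.isEmpty() && attempt < maxAttempts) {
                Thread.sleep(backoffMs);
                // Double the delay each round, capped at maxBackoffMs.
                backoffMs = Math.min(backoffMs * 2, maxBackoffMs);
            }
        }
        return remaining;
    }
}
```

When the returned set is non-empty, the caller can fail with a clear error
instead of spinning.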
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceReader.java:
##########
@@ -51,6 +57,11 @@ public class FlinkSourceReader<OUT>
extends SingleThreadMultiplexSourceReaderBaseAdapter<
RecordAndPos, OUT, SourceSplitBase, SourceSplitState> {
+ private static final Logger LOG =
LoggerFactory.getLogger(FlinkSourceReader.class);
+
+ /** the tableBuckets ignore to send FinishedKvSnapshotConsumeEvent as it
already sending. */
+ private final Set<TableBucket> ignoreBuckets;
Review Comment:
The variable name does not clearly indicate its purpose or usage.
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/KvSnapshotLeaseManager.java:
##########
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.KvSnapshotLeaseForBucket;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLease;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLeaseMetadataManager;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotTableLease;
+import org.apache.fluss.utils.MapUtils;
+import org.apache.fluss.utils.clock.Clock;
+import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
+import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
+
+/** A manager to manage kv snapshot lease acquire, renew, release and drop. */
+@ThreadSafe
+public class KvSnapshotLeaseManager {
+ private static final Logger LOG =
LoggerFactory.getLogger(KvSnapshotLeaseManager.class);
+ private static final AtomicInteger ZERO = new AtomicInteger(0);
+
+ private final KvSnapshotLeaseMetadataManager metadataHelper;
+ private final CoordinatorContext coordinatorContext;
+ private final Clock clock;
+ private final ScheduledExecutorService scheduledExecutor;
+ private final Configuration conf;
+
+ private final Map<String, ReadWriteLock> leaseLocks =
MapUtils.newConcurrentHashMap();
+ /** lease id to kv snapshot lease. */
+ @GuardedBy("leaseLocks")
+ private final Map<String, KvSnapshotLease> kvSnapshotLeaseMap;
+
+ private final ReadWriteLock refCountLock = new ReentrantReadWriteLock();
+
+ /**
+ * KvSnapshotLeaseForBucket to the ref count, which means this table
bucket + snapshotId has
+ * been leased by how many lease id.
+ */
+ private final Map<KvSnapshotLeaseForBucket, AtomicInteger> refCount =
+ MapUtils.newConcurrentHashMap();
Review Comment:
The current state management design using Map<KvSnapshotLeaseForBucket,
AtomicInteger> refCount is inefficient. I strongly recommend using
Map<TableBucket, ConcurrentSet<String /* leaseId */>> instead, for two reasons:
① Every method that accesses this map in the code must instantiate a new
KvSnapshotLeaseForBucket object, creating unnecessary GC pressure.
② Maintaining the correctness of refCount is extremely difficult, which
severely degrades code readability and leads to code rot.
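
A minimal sketch of the suggested shape, assuming ConcurrentHashMap.newKeySet()
as the concurrent set and a generic key in place of TableBucket. All names are
hypothetical, and how the leased snapshot ID is tracked per lease is left out:

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class BucketLeaseIndexSketch<B> {
    private final Map<B, Set<String>> leaseIdsByBucket = new ConcurrentHashMap<>();

    /** Records that leaseId holds bucket; repeating the call has no extra effect. */
    void acquire(B bucket, String leaseId) {
        leaseIdsByBucket.compute(bucket, (b, ids) -> {
            Set<String> target = ids;
            if (target == null) {
                target = ConcurrentHashMap.newKeySet();
            }
            target.add(leaseId);
            return target;
        });
    }

    /** Drops leaseId from bucket and removes the entry once no lease is left. */
    void release(B bucket, String leaseId) {
        leaseIdsByBucket.computeIfPresent(bucket, (b, ids) -> {
            ids.remove(leaseId);
            return ids.isEmpty() ? null : ids;
        });
    }

    boolean isLeased(B bucket) {
        return leaseIdsByBucket.containsKey(bucket);
    }

    int leaseCount(B bucket) {
        Set<String> ids = leaseIdsByBucket.get(bucket);
        return ids == null ? 0 : ids.size();
    }
}
```

Because membership is a set, acquiring the same lease twice cannot inflate the
count, and doing both updates inside compute()/computeIfPresent() keeps each
one atomic per key.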
##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/lease/KvSnapshotTableLease.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data.lease;
+
+import org.apache.fluss.utils.MapUtils;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+
+/** The lease of kv snapshot for a table. */
+@NotThreadSafe
+public class KvSnapshotTableLease {
+ private final long tableId;
+ private final @Nullable Long[] bucketSnapshots;
+ private final Map<Long, Long[]> partitionSnapshots;
+
+ public KvSnapshotTableLease(long tableId) {
+ this(tableId, null, MapUtils.newConcurrentHashMap());
+ }
+
+ public KvSnapshotTableLease(long tableId, Long[] bucketSnapshots) {
+ this(tableId, bucketSnapshots, Collections.emptyMap());
+ }
+
+ public KvSnapshotTableLease(long tableId, Map<Long, Long[]>
partitionSnapshots) {
+ this(tableId, null, partitionSnapshots);
+ }
+
+ public KvSnapshotTableLease(
+ long tableId, @Nullable Long[] bucketSnapshots, Map<Long, Long[]>
partitionSnapshots) {
+ this.tableId = tableId;
+ this.bucketSnapshots = bucketSnapshots;
+ this.partitionSnapshots = partitionSnapshots;
+ }
+
+ public long getTableId() {
+ return tableId;
+ }
+
+ public @Nullable Long[] getBucketSnapshots() {
+ return bucketSnapshots;
+ }
+
+ public @Nullable Long[] getBucketSnapshots(long partitionId) {
+ return partitionSnapshots.get(partitionId);
+ }
+
+ public Map<Long, Long[]> getPartitionSnapshots() {
+ return partitionSnapshots;
+ }
+
+ public void addPartitionSnapshots(long partitionId, Long[] snapshots) {
+ if (bucketSnapshots != null) {
+ throw new IllegalStateException("This is an none partition table
lease.");
+ }
+ partitionSnapshots.put(partitionId, snapshots);
+ }
+
+ public int getLeasedSnapshotCount() {
+ int count = 0;
+ if (bucketSnapshots != null) {
+ for (Long snapshot : bucketSnapshots) {
+ if (snapshot != -1L) {
+ count++;
+ }
+ }
+ } else {
+ for (Long[] snapshots : partitionSnapshots.values()) {
+ for (Long snapshot : snapshots) {
+ if (snapshot != -1L) {
+ count++;
+ }
+ }
+ }
+ }
+ return count;
+ }
+
+ @Override
+ public String toString() {
+ String partitionSnapshotsStr = formatLongArrayMap(partitionSnapshots);
+ return "KvSnapshotTableLease{"
+ + "tableId="
+ + tableId
+ + ", bucketSnapshots="
+ + Arrays.toString(bucketSnapshots)
+ + ", partitionSnapshots="
+ + partitionSnapshotsStr
+ + '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ KvSnapshotTableLease that = (KvSnapshotTableLease) o;
+ return tableId == that.tableId
+ && Arrays.equals(bucketSnapshots, that.bucketSnapshots)
+ && deepEqualsMapOfArrays(partitionSnapshots,
that.partitionSnapshots);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = Objects.hash(tableId);
+ result = 31 * result + Arrays.hashCode(bucketSnapshots);
+ result = 31 * result + deepHashCodeMapOfArrays(partitionSnapshots);
+ return result;
+ }
+
+ private static String formatLongArrayMap(Map<Long, Long[]> map) {
+ if (map == null) {
+ return "null";
+ }
+ StringBuilder sb = new StringBuilder("{");
+ boolean first = true;
+ for (Map.Entry<Long, Long[]> entry : map.entrySet()) {
+ if (!first) {
+ sb.append(", ");
+ }
+
sb.append(entry.getKey()).append("=").append(Arrays.toString(entry.getValue()));
+ first = false;
+ }
+ sb.append("}");
+ return sb.toString();
+ }
+
+ private static boolean deepEqualsMapOfArrays(Map<Long, Long[]> map1,
Map<Long, Long[]> map2) {
+ if (map1 == map2) {
+ return true;
+ }
+ if (map1 == null || map2 == null || map1.size() != map2.size()) {
+ return false;
+ }
+
+ for (Map.Entry<Long, Long[]> entry : map1.entrySet()) {
+ Long key = entry.getKey();
+ Long[] value1 = entry.getValue();
+ Long[] value2 = map2.get(key);
+
+ if (value2 == null) {
+ return false;
+ }
+
+ if (!Arrays.equals(value1, value2)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private static int deepHashCodeMapOfArrays(Map<Long, Long[]> map) {
+ if (map == null) {
+ return 0;
+ }
+ int hash = 0;
+ for (Map.Entry<Long, Long[]> entry : map.entrySet()) {
+ Long key = entry.getKey();
+ Long[] value = entry.getValue();
+ // Combine key hash and array content hash
+ hash += Objects.hashCode(key) ^ Arrays.hashCode(value);
Review Comment:
hash = 31 * hash + (Objects.hashCode(key) ^ Arrays.hashCode(value));
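
A small sketch comparing the two accumulation styles. One point that may be
worth checking before switching: deepEqualsMapOfArrays() compares entries
regardless of iteration order, so an order-dependent hash is only safe if
equal maps always iterate in the same order. The helper names below are
hypothetical:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

final class MapOfArraysHashSketch {
    /** Order-independent: sums per-entry hashes, as java.util.AbstractMap#hashCode does. */
    static int summedHash(Map<Long, Long[]> map) {
        int hash = 0;
        for (Map.Entry<Long, Long[]> e : map.entrySet()) {
            hash += Objects.hashCode(e.getKey()) ^ Arrays.hashCode(e.getValue());
        }
        return hash;
    }

    /** Order-dependent: earlier entries are multiplied by 31 once per later entry. */
    static int chainedHash(Map<Long, Long[]> map) {
        int hash = 0;
        for (Map.Entry<Long, Long[]> e : map.entrySet()) {
            hash = 31 * hash + (Objects.hashCode(e.getKey()) ^ Arrays.hashCode(e.getValue()));
        }
        return hash;
    }

    /** Builds a two-entry map that iterates in the given key order. */
    static Map<Long, Long[]> ordered(long firstKey, long secondKey) {
        Map<Long, Long[]> map = new LinkedHashMap<>();
        map.put(firstKey, new Long[] {firstKey * 10});
        map.put(secondKey, new Long[] {secondKey * 10});
        return map;
    }
}
```

For `ordered(1, 2)` and `ordered(2, 1)`, which hold the same entries, the
summed form returns the same value while the chained form does not.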
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/KvSnapshotLeaseManager.java:
##########
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.KvSnapshotLeaseForBucket;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLease;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLeaseMetadataManager;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotTableLease;
+import org.apache.fluss.utils.MapUtils;
+import org.apache.fluss.utils.clock.Clock;
+import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
+import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
+
+/** A manager to manage kv snapshot lease acquire, renew, release and drop. */
+@ThreadSafe
+public class KvSnapshotLeaseManager {
+ private static final Logger LOG =
LoggerFactory.getLogger(KvSnapshotLeaseManager.class);
+ private static final AtomicInteger ZERO = new AtomicInteger(0);
+
+ private final KvSnapshotLeaseMetadataManager metadataHelper;
+ private final CoordinatorContext coordinatorContext;
+ private final Clock clock;
+ private final ScheduledExecutorService scheduledExecutor;
+ private final Configuration conf;
+
+ private final Map<String, ReadWriteLock> leaseLocks =
MapUtils.newConcurrentHashMap();
+ /** lease id to kv snapshot lease. */
+ @GuardedBy("leaseLocks")
+ private final Map<String, KvSnapshotLease> kvSnapshotLeaseMap;
+
+ private final ReadWriteLock refCountLock = new ReentrantReadWriteLock();
+
+ /**
+ * KvSnapshotLeaseForBucket to the ref count, which means this table
bucket + snapshotId has
+ * been leased by how many lease id.
+ */
+ private final Map<KvSnapshotLeaseForBucket, AtomicInteger> refCount =
+ MapUtils.newConcurrentHashMap();
+
+ /** For metrics. */
+ private final AtomicInteger leasedBucketCount = new AtomicInteger(0);
+
+ public KvSnapshotLeaseManager(
+ Configuration conf,
+ KvSnapshotLeaseMetadataManager metadataHelper,
+ CoordinatorContext coordinatorContext,
+ Clock clock,
+ CoordinatorMetricGroup coordinatorMetricGroup) {
+ this(
+ conf,
+ metadataHelper,
+ coordinatorContext,
+ Executors.newScheduledThreadPool(
+ 1, new
ExecutorThreadFactory("kv-snapshot-lease-cleaner")),
+ clock,
+ coordinatorMetricGroup);
+ }
+
+ @VisibleForTesting
+ public KvSnapshotLeaseManager(
+ Configuration conf,
+ KvSnapshotLeaseMetadataManager metadataHelper,
+ CoordinatorContext coordinatorContext,
+ ScheduledExecutorService scheduledExecutor,
+ Clock clock,
+ CoordinatorMetricGroup coordinatorMetricGroup) {
+ this.metadataHelper = metadataHelper;
+ this.conf = conf;
+ this.scheduledExecutor = scheduledExecutor;
+ this.coordinatorContext = coordinatorContext;
+ this.clock = clock;
+ this.kvSnapshotLeaseMap = MapUtils.newConcurrentHashMap();
+
+ registerMetrics(coordinatorMetricGroup);
+ }
+
+ public void start() {
+ LOG.info("kv snapshot lease manager has been started.");
+ scheduledExecutor.scheduleWithFixedDelay(
+ this::expireLeases,
+ 0L,
+ conf.get(ConfigOptions.KV_SNAPSHOT_LEASE_EXPIRATION_CHECK_INTERVAL).toMillis(),
+ TimeUnit.MILLISECONDS);
+ }
+
+ public void initialize() throws Exception {
+ for (String leaseId : metadataHelper.getLeasesList()) {
+ Optional<KvSnapshotLease> kvSnapshotLeaseOpt = metadataHelper.getLease(leaseId);
Review Comment:
What I mean is, after a standby node is initialized, can it stay consistent
with the primary node? And if a failover occurs, can it correctly handle the
business logic?
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/KvSnapshotLeaseManager.java:
##########
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.KvSnapshotLeaseForBucket;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLease;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLeaseMetadataManager;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotTableLease;
+import org.apache.fluss.utils.MapUtils;
+import org.apache.fluss.utils.clock.Clock;
+import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
+import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
+
+/** A manager to manage kv snapshot lease acquire, renew, release and drop. */
+@ThreadSafe
+public class KvSnapshotLeaseManager {
+ private static final Logger LOG = LoggerFactory.getLogger(KvSnapshotLeaseManager.class);
+ private static final AtomicInteger ZERO = new AtomicInteger(0);
+
+ private final KvSnapshotLeaseMetadataManager metadataHelper;
+ private final CoordinatorContext coordinatorContext;
+ private final Clock clock;
+ private final ScheduledExecutorService scheduledExecutor;
+ private final Configuration conf;
+
+ private final Map<String, ReadWriteLock> leaseLocks = MapUtils.newConcurrentHashMap();
+ /** lease id to kv snapshot lease. */
+ @GuardedBy("leaseLocks")
+ private final Map<String, KvSnapshotLease> kvSnapshotLeaseMap;
+
+ private final ReadWriteLock refCountLock = new ReentrantReadWriteLock();
+
+ /**
+ * KvSnapshotLeaseForBucket to the ref count, which means this table bucket + snapshotId has
+ * been leased by how many lease id.
+ */
+ private final Map<KvSnapshotLeaseForBucket, AtomicInteger> refCount =
+ MapUtils.newConcurrentHashMap();
+
+ /** For metrics. */
+ private final AtomicInteger leasedBucketCount = new AtomicInteger(0);
+
+ public KvSnapshotLeaseManager(
+ Configuration conf,
+ KvSnapshotLeaseMetadataManager metadataHelper,
+ CoordinatorContext coordinatorContext,
+ Clock clock,
+ CoordinatorMetricGroup coordinatorMetricGroup) {
+ this(
+ conf,
+ metadataHelper,
+ coordinatorContext,
+ Executors.newScheduledThreadPool(
+ 1, new ExecutorThreadFactory("kv-snapshot-lease-cleaner")),
+ clock,
+ coordinatorMetricGroup);
+ }
+
+ @VisibleForTesting
+ public KvSnapshotLeaseManager(
+ Configuration conf,
+ KvSnapshotLeaseMetadataManager metadataHelper,
+ CoordinatorContext coordinatorContext,
+ ScheduledExecutorService scheduledExecutor,
+ Clock clock,
+ CoordinatorMetricGroup coordinatorMetricGroup) {
+ this.metadataHelper = metadataHelper;
+ this.conf = conf;
+ this.scheduledExecutor = scheduledExecutor;
+ this.coordinatorContext = coordinatorContext;
+ this.clock = clock;
+ this.kvSnapshotLeaseMap = MapUtils.newConcurrentHashMap();
+
+ registerMetrics(coordinatorMetricGroup);
+ }
+
+ public void start() {
+ LOG.info("kv snapshot lease manager has been started.");
+ scheduledExecutor.scheduleWithFixedDelay(
+ this::expireLeases,
+ 0L,
+ conf.get(ConfigOptions.KV_SNAPSHOT_LEASE_EXPIRATION_CHECK_INTERVAL).toMillis(),
+ TimeUnit.MILLISECONDS);
+ }
+
+ public void initialize() throws Exception {
+ for (String leaseId : metadataHelper.getLeasesList()) {
+ Optional<KvSnapshotLease> kvSnapshotLeaseOpt = metadataHelper.getLease(leaseId);
+ if (kvSnapshotLeaseOpt.isPresent()) {
+ KvSnapshotLease kvSnapshotLease = kvSnapshotLeaseOpt.get();
+ this.leaseLocks.put(leaseId, new ReentrantReadWriteLock());
+ this.kvSnapshotLeaseMap.put(leaseId, kvSnapshotLease);
+
+ initializeRefCount(kvSnapshotLease);
+
+ leasedBucketCount.addAndGet(kvSnapshotLease.getLeasedSnapshotCount());
+ }
+ }
+ }
+
+ public boolean snapshotLeaseNotExist(KvSnapshotLeaseForBucket kvSnapshotLeaseForBucket) {
+ return inReadLock(
+ refCountLock,
+ () ->
+ refCount.getOrDefault(kvSnapshotLeaseForBucket, new AtomicInteger(0)).get()
+ <= 0);
+ }
+
+ /**
+ * Acquire kv snapshot lease.
+ *
+ * @param leaseId the lease id
+ * @param leaseDuration the lease duration
+ * @param tableIdToLeaseBucket the table id to lease bucket
+ * @return the map of unavailable snapshots that failed to be leased
+ */
+ public Map<TableBucket, Long> acquireLease(
+ String leaseId,
+ long leaseDuration,
+ Map<Long, List<KvSnapshotLeaseForBucket>> tableIdToLeaseBucket)
+ throws Exception {
+ ReadWriteLock lock = leaseLocks.computeIfAbsent(leaseId, k -> new ReentrantReadWriteLock());
+ return inWriteLock(
+ lock,
+ () -> {
+ // To record the unavailable snapshots such as the kv snapshotId to lease not
+ // exists.
+ Map<TableBucket, Long> unavailableSnapshots = new HashMap<>();
+
+ boolean update = kvSnapshotLeaseMap.containsKey(leaseId);
+ KvSnapshotLease kvSnapshotLease;
+ long newLeaseDuration = clock.milliseconds() + leaseDuration;
+ if (!update) {
+ // set the expiration time as: current time + leaseDuration
+ kvSnapshotLease = new KvSnapshotLease(newLeaseDuration);
+ kvSnapshotLeaseMap.put(leaseId, kvSnapshotLease);
+ LOG.info(
Review Comment:
Don't use string concatenation to build log messages; pass the values as
arguments to a parameterized message (`{}` placeholders) instead.
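
To illustrate why concatenation in log calls is discouraged: the string is built (and every argument's `toString()` is called) before the logger can check whether the level is enabled. With SLF4J, which this file imports, the usual fix is a template plus arguments, for example `LOG.info("... {}", leaseId)` (message text here is only a placeholder, since the actual statement is not shown above). The following is a minimal, self-contained sketch of the same effect. As an assumption made only to keep it runnable without extra dependencies, it uses `java.util.logging`, whose placeholders are written `{0}` rather than `{}`; `ParameterizedLoggingSketch` and `CountingLeaseId` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

class ParameterizedLoggingSketch {

    /** Hypothetical stand-in for a lease id; counts how often it is rendered to a string. */
    static final class CountingLeaseId {
        final AtomicInteger toStringCalls = new AtomicInteger();
        private final String id;

        CountingLeaseId(String id) {
            this.id = id;
        }

        @Override
        public String toString() {
            toStringCalls.incrementAndGet();
            return id;
        }
    }

    /** Returns {toString calls after the concatenated call, calls after the parameterized call}. */
    static int[] demo() {
        Logger log = Logger.getLogger("lease-logging-sketch");
        log.setUseParentHandlers(false); // keep the sketch silent
        log.setLevel(Level.WARNING); // FINE is disabled for this logger

        CountingLeaseId leaseId = new CountingLeaseId("lease-1");

        // Concatenation: the message is built before the logger can check the level,
        // so toString() runs even though nothing is logged.
        log.fine("Created kv snapshot lease " + leaseId);
        int afterConcatenation = leaseId.toStringCalls.get();

        // Parameterized: template and argument are passed separately; the level check
        // happens first, so the argument is never rendered for a disabled level.
        log.log(Level.FINE, "Created kv snapshot lease {0}", leaseId);
        int afterParameterized = leaseId.toStringCalls.get();

        return new int[] {afterConcatenation, afterParameterized};
    }

    public static void main(String[] args) {
        int[] calls = demo();
        System.out.println("toString calls after concatenation: " + calls[0]);
        System.out.println("toString calls after parameterized call: " + calls[1]);
    }
}
```

The counter increases only for the concatenated call; the parameterized call adds no rendering work when the level is disabled.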
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/KvSnapshotLeaseManager.java:
##########
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.KvSnapshotLeaseForBucket;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLease;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotLeaseMetadataManager;
+import org.apache.fluss.server.zk.data.lease.KvSnapshotTableLease;
+import org.apache.fluss.utils.MapUtils;
+import org.apache.fluss.utils.clock.Clock;
+import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
+import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
+
+/** A manager to manage kv snapshot lease acquire, renew, release and drop. */
+@ThreadSafe
+public class KvSnapshotLeaseManager {
+ private static final Logger LOG = LoggerFactory.getLogger(KvSnapshotLeaseManager.class);
+ private static final AtomicInteger ZERO = new AtomicInteger(0);
+
+ private final KvSnapshotLeaseMetadataManager metadataHelper;
+ private final CoordinatorContext coordinatorContext;
Review Comment:
In practice, coordinatorContext is accessed only within acquireLease, and
acquireLease is invoked exclusively by coordinatorProcessor, so there is no
actual thread-safety issue. However, this guarantee is implicit and easy to
miss, which is a code smell: other maintainers could misunderstand or
inadvertently modify the code and break the safety assumption. Refactoring
this properly might be costly, but we can at least add comments that
explicitly document the thread-safety constraints here.
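
One possible shape for such documentation, as a minimal sketch only: the field and the method both state the confinement rule in comments. The runtime thread check goes beyond what is asked for above and is an optional extra. All names (`ThreadConfinementSketch`, `ContextSketch`, `LeaseManagerSketch`, `eventThread`) are hypothetical stand-ins, not the actual Fluss classes.

```java
import java.util.HashMap;
import java.util.Map;

class ThreadConfinementSketch {

    /** Hypothetical stand-in for a context object that is not thread-safe. */
    static final class ContextSketch {
        private final Map<Long, String> tablePaths = new HashMap<>();

        void putTable(long tableId, String path) {
            tablePaths.put(tableId, path);
        }

        String getTablePath(long tableId) {
            return tablePaths.get(tableId);
        }
    }

    static final class LeaseManagerSketch {
        /**
         * NOT thread-safe. Only read inside acquireLease(...), which must run on the single
         * event-processing thread. Do not access this field from any other method or thread.
         */
        private final ContextSketch context;

        private final Thread eventThread;

        LeaseManagerSketch(ContextSketch context, Thread eventThread) {
            this.context = context;
            this.eventThread = eventThread;
        }

        /** Must be called from the event-processing thread only; fails fast otherwise. */
        String acquireLease(long tableId) {
            if (Thread.currentThread() != eventThread) {
                throw new IllegalStateException(
                        "acquireLease must run on thread " + eventThread.getName());
            }
            return context.getTablePath(tableId);
        }
    }

    /** Returns true if a call made from a different thread is rejected. */
    static boolean foreignThreadIsRejected(LeaseManagerSketch manager) {
        boolean[] rejected = new boolean[1];
        Thread other =
                new Thread(
                        () -> {
                            try {
                                manager.acquireLease(1L);
                            } catch (IllegalStateException e) {
                                rejected[0] = true;
                            }
                        },
                        "foreign-thread");
        other.start();
        try {
            other.join(); // join() makes the write to rejected[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return rejected[0];
    }

    public static void main(String[] args) {
        ContextSketch context = new ContextSketch();
        context.putTable(1L, "db.orders");
        LeaseManagerSketch manager = new LeaseManagerSketch(context, Thread.currentThread());
        System.out.println("owner thread result: " + manager.acquireLease(1L));
        System.out.println("foreign thread rejected: " + foreignThreadIsRejected(manager));
    }
}
```

Comments alone keep the change cheap; the explicit check turns a silent assumption into an immediate failure if a later change calls the method from another thread.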