Copilot commented on code in PR #2179:
URL: https://github.com/apache/fluss/pull/2179#discussion_r2621949912


##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/KvSnapshotConsumerManager.java:
##########
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.ConsumeKvSnapshotForBucket;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.KvSnapshotConsumer;
+import org.apache.fluss.utils.MapUtils;
+import org.apache.fluss.utils.clock.Clock;
+import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
+import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
+
+/** A manager to manage kv snapshot consumer register/unregister/clear. */
+@ThreadSafe
+public class KvSnapshotConsumerManager {
+    private static final Logger LOG = 
LoggerFactory.getLogger(KvSnapshotConsumerManager.class);
+
+    private final ZooKeeperClient zkClient;
+    private final CoordinatorContext coordinatorContext;
+    private final Clock clock;
+    private final ScheduledExecutorService scheduledExecutor;
+    private final Configuration conf;
+
+    private final Map<String, ReadWriteLock> consumerLocks = 
MapUtils.newConcurrentHashMap();
+    /** Consumer id to Consumer. */
+    @GuardedBy("consumerLocks")
+    private final Map<String, KvSnapshotConsumer> consumers;
+
+    private final ReadWriteLock refCountLock = new ReentrantReadWriteLock();
+
+    /**
+     * ConsumeKvSnapshotForBucket to the ref count, which means this table 
bucket + snapshotId has
+     * been consumed by how many consumers.
+     */
+    @GuardedBy("refCountLock")
+    private final Map<ConsumeKvSnapshotForBucket, AtomicInteger> refCount =
+            MapUtils.newConcurrentHashMap();
+
+    /** For metrics. */
+    private final AtomicInteger consumedBucketCount = new AtomicInteger(0);
+
+    public KvSnapshotConsumerManager(
+            Configuration conf,
+            ZooKeeperClient zkClient,
+            CoordinatorContext coordinatorContext,
+            Clock clock,
+            CoordinatorMetricGroup coordinatorMetricGroup) {
+        this(
+                conf,
+                zkClient,
+                coordinatorContext,
+                Executors.newScheduledThreadPool(
+                        1, new 
ExecutorThreadFactory("kv-snapshot-consumer-cleaner")),
+                clock,
+                coordinatorMetricGroup);
+    }
+
+    @VisibleForTesting
+    public KvSnapshotConsumerManager(
+            Configuration conf,
+            ZooKeeperClient zkClient,
+            CoordinatorContext coordinatorContext,
+            ScheduledExecutorService scheduledExecutor,
+            Clock clock,
+            CoordinatorMetricGroup coordinatorMetricGroup) {
+        this.zkClient = zkClient;
+        this.conf = conf;
+        this.scheduledExecutor = scheduledExecutor;
+        this.coordinatorContext = coordinatorContext;
+        this.clock = clock;
+        this.consumers = MapUtils.newConcurrentHashMap();
+
+        registerMetrics(coordinatorMetricGroup);
+    }
+
+    public void start() {
+        scheduledExecutor.scheduleWithFixedDelay(
+                this::expireConsumers,
+                0L,
+                
conf.get(ConfigOptions.KV_SNAPSHOT_CONSUMER_EXPIRATION_CHECK_INTERVAL).toMillis(),
+                TimeUnit.MILLISECONDS);
+    }
+
+    public void initialize() throws Exception {
+        List<String> consumers = zkClient.getKvSnapshotConsumerList();
+        for (String consumer : consumers) {
+            Optional<KvSnapshotConsumer> kvSnapshotConsumerOpt =
+                    zkClient.getKvSnapshotConsumer(consumer);
+            if (kvSnapshotConsumerOpt.isPresent()) {
+                KvSnapshotConsumer kvSnapshotConsumer = 
kvSnapshotConsumerOpt.get();
+                this.consumerLocks.put(consumer, new ReentrantReadWriteLock());
+                this.consumers.put(consumer, kvSnapshotConsumer);

Review Comment:
   This access to the `consumers` field (reachable from the public method 
`initialize()`) is not protected by any monitor, but the class is annotated as 
@ThreadSafe.



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/KvSnapshotConsumerManager.java:
##########
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.ConsumeKvSnapshotForBucket;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.KvSnapshotConsumer;
+import org.apache.fluss.utils.MapUtils;
+import org.apache.fluss.utils.clock.Clock;
+import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
+import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
+
+/** A manager to manage kv snapshot consumer register/unregister/clear. */
+@ThreadSafe
+public class KvSnapshotConsumerManager {
+    private static final Logger LOG = 
LoggerFactory.getLogger(KvSnapshotConsumerManager.class);
+
+    private final ZooKeeperClient zkClient;
+    private final CoordinatorContext coordinatorContext;
+    private final Clock clock;
+    private final ScheduledExecutorService scheduledExecutor;
+    private final Configuration conf;
+
+    private final Map<String, ReadWriteLock> consumerLocks = 
MapUtils.newConcurrentHashMap();
+    /** Consumer id to Consumer. */
+    @GuardedBy("consumerLocks")
+    private final Map<String, KvSnapshotConsumer> consumers;
+
+    private final ReadWriteLock refCountLock = new ReentrantReadWriteLock();
+
+    /**
+     * ConsumeKvSnapshotForBucket to the ref count, which means this table 
bucket + snapshotId has
+     * been consumed by how many consumers.
+     */
+    @GuardedBy("refCountLock")
+    private final Map<ConsumeKvSnapshotForBucket, AtomicInteger> refCount =
+            MapUtils.newConcurrentHashMap();
+
+    /** For metrics. */
+    private final AtomicInteger consumedBucketCount = new AtomicInteger(0);
+
+    public KvSnapshotConsumerManager(
+            Configuration conf,
+            ZooKeeperClient zkClient,
+            CoordinatorContext coordinatorContext,
+            Clock clock,
+            CoordinatorMetricGroup coordinatorMetricGroup) {
+        this(
+                conf,
+                zkClient,
+                coordinatorContext,
+                Executors.newScheduledThreadPool(
+                        1, new 
ExecutorThreadFactory("kv-snapshot-consumer-cleaner")),
+                clock,
+                coordinatorMetricGroup);
+    }
+
+    @VisibleForTesting
+    public KvSnapshotConsumerManager(
+            Configuration conf,
+            ZooKeeperClient zkClient,
+            CoordinatorContext coordinatorContext,
+            ScheduledExecutorService scheduledExecutor,
+            Clock clock,
+            CoordinatorMetricGroup coordinatorMetricGroup) {
+        this.zkClient = zkClient;
+        this.conf = conf;
+        this.scheduledExecutor = scheduledExecutor;
+        this.coordinatorContext = coordinatorContext;
+        this.clock = clock;
+        this.consumers = MapUtils.newConcurrentHashMap();
+
+        registerMetrics(coordinatorMetricGroup);
+    }
+
+    public void start() {
+        scheduledExecutor.scheduleWithFixedDelay(
+                this::expireConsumers,
+                0L,
+                
conf.get(ConfigOptions.KV_SNAPSHOT_CONSUMER_EXPIRATION_CHECK_INTERVAL).toMillis(),
+                TimeUnit.MILLISECONDS);
+    }
+
+    public void initialize() throws Exception {
+        List<String> consumers = zkClient.getKvSnapshotConsumerList();
+        for (String consumer : consumers) {
+            Optional<KvSnapshotConsumer> kvSnapshotConsumerOpt =
+                    zkClient.getKvSnapshotConsumer(consumer);
+            if (kvSnapshotConsumerOpt.isPresent()) {
+                KvSnapshotConsumer kvSnapshotConsumer = 
kvSnapshotConsumerOpt.get();
+                this.consumerLocks.put(consumer, new ReentrantReadWriteLock());

Review Comment:
   This access to the `consumerLocks` field (reachable from the public method 
`initialize()`) is not protected by any monitor, but the class is annotated as 
@ThreadSafe.



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/KvSnapshotConsumerManager.java:
##########
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.ConsumeKvSnapshotForBucket;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.KvSnapshotConsumer;
+import org.apache.fluss.utils.MapUtils;
+import org.apache.fluss.utils.clock.Clock;
+import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
+import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
+
+/** A manager to manage kv snapshot consumer register/unregister/clear. */
+@ThreadSafe
+public class KvSnapshotConsumerManager {
+    private static final Logger LOG = 
LoggerFactory.getLogger(KvSnapshotConsumerManager.class);
+
+    private final ZooKeeperClient zkClient;
+    private final CoordinatorContext coordinatorContext;
+    private final Clock clock;
+    private final ScheduledExecutorService scheduledExecutor;
+    private final Configuration conf;
+
+    private final Map<String, ReadWriteLock> consumerLocks = 
MapUtils.newConcurrentHashMap();
+    /** Consumer id to Consumer. */
+    @GuardedBy("consumerLocks")
+    private final Map<String, KvSnapshotConsumer> consumers;
+
+    private final ReadWriteLock refCountLock = new ReentrantReadWriteLock();
+
+    /**
+     * ConsumeKvSnapshotForBucket to the ref count, which means this table 
bucket + snapshotId has
+     * been consumed by how many consumers.
+     */
+    @GuardedBy("refCountLock")
+    private final Map<ConsumeKvSnapshotForBucket, AtomicInteger> refCount =
+            MapUtils.newConcurrentHashMap();
+
+    /** For metrics. */
+    private final AtomicInteger consumedBucketCount = new AtomicInteger(0);
+
+    public KvSnapshotConsumerManager(
+            Configuration conf,
+            ZooKeeperClient zkClient,
+            CoordinatorContext coordinatorContext,
+            Clock clock,
+            CoordinatorMetricGroup coordinatorMetricGroup) {
+        this(
+                conf,
+                zkClient,
+                coordinatorContext,
+                Executors.newScheduledThreadPool(
+                        1, new 
ExecutorThreadFactory("kv-snapshot-consumer-cleaner")),
+                clock,
+                coordinatorMetricGroup);
+    }
+
+    @VisibleForTesting
+    public KvSnapshotConsumerManager(
+            Configuration conf,
+            ZooKeeperClient zkClient,
+            CoordinatorContext coordinatorContext,
+            ScheduledExecutorService scheduledExecutor,
+            Clock clock,
+            CoordinatorMetricGroup coordinatorMetricGroup) {
+        this.zkClient = zkClient;
+        this.conf = conf;
+        this.scheduledExecutor = scheduledExecutor;
+        this.coordinatorContext = coordinatorContext;
+        this.clock = clock;
+        this.consumers = MapUtils.newConcurrentHashMap();
+
+        registerMetrics(coordinatorMetricGroup);
+    }
+
+    public void start() {
+        scheduledExecutor.scheduleWithFixedDelay(
+                this::expireConsumers,
+                0L,
+                
conf.get(ConfigOptions.KV_SNAPSHOT_CONSUMER_EXPIRATION_CHECK_INTERVAL).toMillis(),
+                TimeUnit.MILLISECONDS);
+    }
+
+    public void initialize() throws Exception {
+        List<String> consumers = zkClient.getKvSnapshotConsumerList();
+        for (String consumer : consumers) {
+            Optional<KvSnapshotConsumer> kvSnapshotConsumerOpt =
+                    zkClient.getKvSnapshotConsumer(consumer);
+            if (kvSnapshotConsumerOpt.isPresent()) {
+                KvSnapshotConsumer kvSnapshotConsumer = 
kvSnapshotConsumerOpt.get();
+                this.consumerLocks.put(consumer, new ReentrantReadWriteLock());
+                this.consumers.put(consumer, kvSnapshotConsumer);
+
+                initializeRefCount(kvSnapshotConsumer);
+
+                
consumedBucketCount.getAndAdd(kvSnapshotConsumer.getConsumedSnapshotCount());
+            }
+        }
+    }
+
+    public boolean snapshotConsumerNotExist(ConsumeKvSnapshotForBucket 
consumeKvSnapshotForBucket) {
+        return inReadLock(
+                refCountLock,
+                () ->
+                        !refCount.containsKey(consumeKvSnapshotForBucket)
+                                || 
refCount.get(consumeKvSnapshotForBucket).get() <= 0);
+    }
+
+    public void register(
+            String consumerId,
+            long expirationTime,
+            Map<Long, List<ConsumeKvSnapshotForBucket>> 
tableIdToRegisterBucket)
+            throws Exception {
+        ReadWriteLock lock =
+                consumerLocks.computeIfAbsent(consumerId, k -> new 
ReentrantReadWriteLock());
+        inWriteLock(
+                lock,
+                () -> {
+                    boolean update = consumers.containsKey(consumerId);
+                    KvSnapshotConsumer consumer;
+                    if (!update) {
+                        // set the expiration time as: current time + 
expirationTime
+                        consumer = new KvSnapshotConsumer(clock.milliseconds() 
+ expirationTime);
+                        consumers.put(consumerId, consumer);
+                        LOG.info("kv snapshot consumer '" + consumerId + "' 
has been registered.");
+                    } else {
+                        consumer = consumers.get(consumerId);
+                    }
+
+                    for (Map.Entry<Long, List<ConsumeKvSnapshotForBucket>> 
entry :
+                            tableIdToRegisterBucket.entrySet()) {
+                        Long tableId = entry.getKey();
+                        TableInfo tableInfo = 
coordinatorContext.getTableInfoById(tableId);
+                        int numBuckets = tableInfo.getNumBuckets();
+                        List<ConsumeKvSnapshotForBucket> buckets = 
entry.getValue();
+                        for (ConsumeKvSnapshotForBucket bucket : buckets) {
+                            boolean isUpdate =
+                                    consumer.registerBucket(
+                                            bucket.getTableBucket(),
+                                            bucket.getKvSnapshotId(),
+                                            numBuckets);
+                            if (!isUpdate) {
+                                consumedBucketCount.getAndIncrement();
+                                inWriteLock(
+                                        refCountLock,
+                                        () -> {
+                                            refCount.computeIfAbsent(
+                                                            bucket, k -> new 
AtomicInteger(0))
+                                                    .getAndIncrement();
+                                        });
+                            }
+                        }
+                    }
+
+                    if (update) {
+                        zkClient.updateKvSnapshotConsumer(consumerId, 
consumer);
+                    } else {
+                        zkClient.registerKvSnapshotConsumer(consumerId, 
consumer);
+                    }
+                });
+    }
+
+    public void unregister(
+            String consumerId, Map<Long, List<TableBucket>> 
tableIdToUnregisterBucket)
+            throws Exception {
+        ReadWriteLock lock = consumerLocks.get(consumerId);

Review Comment:
   This access to the `consumerLocks` field (reachable from the public method 
`unregister(...)`) is not protected by any monitor, but the class is annotated 
as @ThreadSafe.



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/KvSnapshotConsumerManager.java:
##########
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.ConsumeKvSnapshotForBucket;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.KvSnapshotConsumer;
+import org.apache.fluss.utils.MapUtils;
+import org.apache.fluss.utils.clock.Clock;
+import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
+import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
+
+/** A manager to manage kv snapshot consumer register/unregister/clear. */
+@ThreadSafe
+public class KvSnapshotConsumerManager {
+    private static final Logger LOG = 
LoggerFactory.getLogger(KvSnapshotConsumerManager.class);
+
+    private final ZooKeeperClient zkClient;
+    private final CoordinatorContext coordinatorContext;
+    private final Clock clock;
+    private final ScheduledExecutorService scheduledExecutor;
+    private final Configuration conf;
+
+    private final Map<String, ReadWriteLock> consumerLocks = 
MapUtils.newConcurrentHashMap();
+    /** Consumer id to Consumer. */
+    @GuardedBy("consumerLocks")
+    private final Map<String, KvSnapshotConsumer> consumers;
+
+    private final ReadWriteLock refCountLock = new ReentrantReadWriteLock();
+
+    /**
+     * ConsumeKvSnapshotForBucket to the ref count, which means this table 
bucket + snapshotId has
+     * been consumed by how many consumers.
+     */
+    @GuardedBy("refCountLock")
+    private final Map<ConsumeKvSnapshotForBucket, AtomicInteger> refCount =
+            MapUtils.newConcurrentHashMap();
+
+    /** For metrics. */
+    private final AtomicInteger consumedBucketCount = new AtomicInteger(0);
+
+    public KvSnapshotConsumerManager(
+            Configuration conf,
+            ZooKeeperClient zkClient,
+            CoordinatorContext coordinatorContext,
+            Clock clock,
+            CoordinatorMetricGroup coordinatorMetricGroup) {
+        this(
+                conf,
+                zkClient,
+                coordinatorContext,
+                Executors.newScheduledThreadPool(
+                        1, new 
ExecutorThreadFactory("kv-snapshot-consumer-cleaner")),
+                clock,
+                coordinatorMetricGroup);
+    }
+
+    @VisibleForTesting
+    public KvSnapshotConsumerManager(
+            Configuration conf,
+            ZooKeeperClient zkClient,
+            CoordinatorContext coordinatorContext,
+            ScheduledExecutorService scheduledExecutor,
+            Clock clock,
+            CoordinatorMetricGroup coordinatorMetricGroup) {
+        this.zkClient = zkClient;
+        this.conf = conf;
+        this.scheduledExecutor = scheduledExecutor;
+        this.coordinatorContext = coordinatorContext;
+        this.clock = clock;
+        this.consumers = MapUtils.newConcurrentHashMap();
+
+        registerMetrics(coordinatorMetricGroup);
+    }
+
+    public void start() {
+        scheduledExecutor.scheduleWithFixedDelay(
+                this::expireConsumers,
+                0L,
+                
conf.get(ConfigOptions.KV_SNAPSHOT_CONSUMER_EXPIRATION_CHECK_INTERVAL).toMillis(),
+                TimeUnit.MILLISECONDS);
+    }
+
+    public void initialize() throws Exception {
+        List<String> consumers = zkClient.getKvSnapshotConsumerList();
+        for (String consumer : consumers) {
+            Optional<KvSnapshotConsumer> kvSnapshotConsumerOpt =
+                    zkClient.getKvSnapshotConsumer(consumer);
+            if (kvSnapshotConsumerOpt.isPresent()) {
+                KvSnapshotConsumer kvSnapshotConsumer = 
kvSnapshotConsumerOpt.get();
+                this.consumerLocks.put(consumer, new ReentrantReadWriteLock());
+                this.consumers.put(consumer, kvSnapshotConsumer);
+
+                initializeRefCount(kvSnapshotConsumer);
+
+                
consumedBucketCount.getAndAdd(kvSnapshotConsumer.getConsumedSnapshotCount());
+            }
+        }
+    }
+
+    public boolean snapshotConsumerNotExist(ConsumeKvSnapshotForBucket 
consumeKvSnapshotForBucket) {
+        return inReadLock(
+                refCountLock,
+                () ->
+                        !refCount.containsKey(consumeKvSnapshotForBucket)
+                                || 
refCount.get(consumeKvSnapshotForBucket).get() <= 0);
+    }
+
+    public void register(
+            String consumerId,
+            long expirationTime,
+            Map<Long, List<ConsumeKvSnapshotForBucket>> 
tableIdToRegisterBucket)
+            throws Exception {
+        ReadWriteLock lock =
+                consumerLocks.computeIfAbsent(consumerId, k -> new 
ReentrantReadWriteLock());

Review Comment:
   This access to the `consumerLocks` field (reachable from the public method 
`register(...)`) is not protected by any monitor, but the class is annotated as 
@ThreadSafe.



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/KvSnapshotConsumerJsonSerde.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data;
+
+import 
org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import 
org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.fluss.utils.json.JsonDeserializer;
+import org.apache.fluss.utils.json.JsonSerializer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/** Json serializer and deserializer for {@link KvSnapshotConsumer}. */
+public class KvSnapshotConsumerJsonSerde
+        implements JsonSerializer<KvSnapshotConsumer>, 
JsonDeserializer<KvSnapshotConsumer> {
+
+    public static final KvSnapshotConsumerJsonSerde INSTANCE = new 
KvSnapshotConsumerJsonSerde();
+
+    private static final String VERSION_KEY = "version";
+    private static final String EXPIRATION_TIME = "expiration_time";
+    private static final String TABLES = "tables";
+
+    private static final int VERSION = 1;
+
+    @Override
+    public void serialize(KvSnapshotConsumer kvSnapshotConsumer, JsonGenerator 
generator)
+            throws IOException {
+        generator.writeStartObject();
+        generator.writeNumberField(VERSION_KEY, VERSION);
+        generator.writeNumberField(EXPIRATION_TIME, 
kvSnapshotConsumer.getExpirationTime());
+
+        generator.writeFieldName(TABLES);
+        generator.writeStartObject();
+
+        // for none-partitioned table.
+        for (Map.Entry<Long, Long[]> entry :
+                kvSnapshotConsumer.getTableIdToSnapshots().entrySet()) {
+            generator.writeFieldName(entry.getKey().toString());
+            writeLongArray(generator, entry.getValue());
+        }
+
+        // for partitioned table.
+        Map<Long, Long[]> partitionToSortedSnapshotIds =
+                kvSnapshotConsumer.getPartitionIdToSnapshots();
+        for (Map.Entry<Long, Set<Long>> entry :
+                kvSnapshotConsumer.getTableIdToPartitions().entrySet()) {
+            Long tableId = entry.getKey();
+
+            generator.writeFieldName(tableId.toString());
+            generator.writeStartObject();
+
+            for (Long partitionId : entry.getValue()) {
+                Long[] snapshotIds = 
partitionToSortedSnapshotIds.get(partitionId);
+                if (snapshotIds != null) {
+                    generator.writeFieldName(partitionId.toString());
+                    writeLongArray(generator, snapshotIds);
+                }
+            }
+
+            generator.writeEndObject();
+        }
+        generator.writeEndObject(); // tables
+
+        generator.writeEndObject(); // root
+    }
+
+    @Override
+    public KvSnapshotConsumer deserialize(JsonNode node) {
+        long expirationTime = node.get(EXPIRATION_TIME).asLong();
+        JsonNode tablesNode = node.get(TABLES);
+
+        Map<Long, Long[]> tableIdToSnapshots = new HashMap<>();
+        Map<Long, Set<Long>> tableIdToPartitions = new HashMap<>();
+        Map<Long, Long[]> partitionIdToSnapshots = new HashMap<>();
+
+        ObjectNode tablesObj = (ObjectNode) tablesNode;
+        Iterator<Map.Entry<String, JsonNode>> tableFields = tablesObj.fields();
+        while (tableFields.hasNext()) {
+            Map.Entry<String, JsonNode> tableEntry = tableFields.next();
+            Long tableId = Long.parseLong(tableEntry.getKey());
+            JsonNode tableValue = tableEntry.getValue();
+
+            if (tableValue.isArray()) {
+                // Non-partitioned table, like: "1": [1, -1, 1, 2]
+                List<Long> snapshotIds = new ArrayList<>();
+                for (JsonNode elem : tableValue) {
+                    snapshotIds.add(elem.asLong());
+                }
+                tableIdToSnapshots.put(tableId, snapshotIds.toArray(new 
Long[0]));
+            } else if (tableValue.isObject()) {
+                // Partitioned table, like: "2": { "1001": [...], "1002": 
[...] }
+                Set<Long> partitions = new HashSet<>();
+                ObjectNode partitionObj = (ObjectNode) tableValue;
+                Iterator<Map.Entry<String, JsonNode>> partFields = 
partitionObj.fields();
+                while (partFields.hasNext()) {
+                    Map.Entry<String, JsonNode> partEntry = partFields.next();
+                    Long partitionId = Long.parseLong(partEntry.getKey());

Review Comment:
   The variable 'partitionId' is only assigned values of primitive type and is 
never 'null', but it is declared with the boxed type 'Long'.



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/KvSnapshotConsumerManager.java:
##########
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.ConsumeKvSnapshotForBucket;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.KvSnapshotConsumer;
+import org.apache.fluss.utils.MapUtils;
+import org.apache.fluss.utils.clock.Clock;
+import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
+import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
+
+/** A manager to manage kv snapshot consumer register/unregister/clear. */
+@ThreadSafe
+public class KvSnapshotConsumerManager {
+    private static final Logger LOG = 
LoggerFactory.getLogger(KvSnapshotConsumerManager.class);
+
+    private final ZooKeeperClient zkClient;
+    private final CoordinatorContext coordinatorContext;
+    private final Clock clock;
+    private final ScheduledExecutorService scheduledExecutor;
+    private final Configuration conf;
+
+    private final Map<String, ReadWriteLock> consumerLocks = 
MapUtils.newConcurrentHashMap();
+    /** Consumer id to Consumer. */
+    @GuardedBy("consumerLocks")
+    private final Map<String, KvSnapshotConsumer> consumers;
+
+    private final ReadWriteLock refCountLock = new ReentrantReadWriteLock();
+
+    /**
+     * Maps each {@link ConsumeKvSnapshotForBucket} (table bucket + snapshot id) to its reference
+     * count, i.e. the number of consumers that currently consume that snapshot of the bucket.
+     */
+    @GuardedBy("refCountLock")
+    private final Map<ConsumeKvSnapshotForBucket, AtomicInteger> refCount =
+            MapUtils.newConcurrentHashMap();
+
+    /** For metrics. */
+    private final AtomicInteger consumedBucketCount = new AtomicInteger(0);
+
+    /**
+     * Creates a manager whose scheduled executor is a one-thread scheduled pool built with an
+     * {@code ExecutorThreadFactory("kv-snapshot-consumer-cleaner")}. All other arguments are
+     * passed through unchanged to the constructor that accepts an explicit executor.
+     */
+    public KvSnapshotConsumerManager(
+            Configuration conf,
+            ZooKeeperClient zkClient,
+            CoordinatorContext coordinatorContext,
+            Clock clock,
+            CoordinatorMetricGroup coordinatorMetricGroup) {
+        this(
+                conf,
+                zkClient,
+                coordinatorContext,
+                Executors.newScheduledThreadPool(
+                        1, new 
ExecutorThreadFactory("kv-snapshot-consumer-cleaner")),
+                clock,
+                coordinatorMetricGroup);
+    }
+
+    /**
+     * Creates a manager that uses the supplied scheduled executor (lets tests inject their own).
+     *
+     * <p>Stores the collaborators, creates the (initially empty) consumer map and registers the
+     * metrics on the given metric group. Nothing is scheduled or loaded here.
+     */
+    @VisibleForTesting
+    public KvSnapshotConsumerManager(
+            Configuration conf,
+            ZooKeeperClient zkClient,
+            CoordinatorContext coordinatorContext,
+            ScheduledExecutorService scheduledExecutor,
+            Clock clock,
+            CoordinatorMetricGroup coordinatorMetricGroup) {
+        this.zkClient = zkClient;
+        this.conf = conf;
+        this.scheduledExecutor = scheduledExecutor;
+        this.coordinatorContext = coordinatorContext;
+        this.clock = clock;
+        this.consumers = MapUtils.newConcurrentHashMap();
+
+        registerMetrics(coordinatorMetricGroup);
+    }
+
+    public void start() {
+        scheduledExecutor.scheduleWithFixedDelay(
+                this::expireConsumers,
+                0L,
+                
conf.get(ConfigOptions.KV_SNAPSHOT_CONSUMER_EXPIRATION_CHECK_INTERVAL).toMillis(),
+                TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Loads every kv snapshot consumer listed by the ZooKeeper client into memory.
+     *
+     * <p>For each consumer id whose data is present, a fresh lock and the consumer itself are
+     * stored, the reference counts are initialized from it, and its consumed snapshot count is
+     * added to the consumed-bucket metric counter. Ids without data are skipped.
+     *
+     * @throws Exception if reading from the ZooKeeper client or initializing ref counts fails
+     */
+    public void initialize() throws Exception {
+        for (String consumerId : zkClient.getKvSnapshotConsumerList()) {
+            Optional<KvSnapshotConsumer> maybeConsumer =
+                    zkClient.getKvSnapshotConsumer(consumerId);
+            if (!maybeConsumer.isPresent()) {
+                continue;
+            }
+
+            KvSnapshotConsumer restored = maybeConsumer.get();
+            consumerLocks.put(consumerId, new ReentrantReadWriteLock());
+            consumers.put(consumerId, restored);
+
+            initializeRefCount(restored);
+
+            consumedBucketCount.getAndAdd(restored.getConsumedSnapshotCount());
+        }
+    }
+
+    public boolean snapshotConsumerNotExist(ConsumeKvSnapshotForBucket 
consumeKvSnapshotForBucket) {
+        return inReadLock(
+                refCountLock,
+                () ->
+                        !refCount.containsKey(consumeKvSnapshotForBucket)
+                                || 
refCount.get(consumeKvSnapshotForBucket).get() <= 0);
+    }
+
+    /**
+     * Registers buckets, each with the kv snapshot id being consumed, for a consumer. The
+     * consumer is created if it is not known yet, and the resulting consumer state is written
+     * through the ZooKeeper client (register for a new consumer, update for an existing one).
+     *
+     * <p>Reference counts follow the consumer state: a newly registered bucket adds one
+     * reference to its snapshot, and re-registering a bucket with a different snapshot id moves
+     * the reference from the previously registered snapshot to the new one.
+     *
+     * @param consumerId the consumer id
+     * @param expirationTime milliseconds added to the current clock time to obtain the expiration
+     *     time of a newly created consumer; not applied to an already existing consumer
+     * @param tableIdToRegisterBucket buckets to register, grouped by table id
+     * @throws Exception if registering a bucket or writing the consumer state fails
+     */
+    public void register(
+            String consumerId,
+            long expirationTime,
+            Map<Long, List<ConsumeKvSnapshotForBucket>> tableIdToRegisterBucket)
+            throws Exception {
+        ReadWriteLock lock =
+                consumerLocks.computeIfAbsent(consumerId, k -> new ReentrantReadWriteLock());
+        inWriteLock(
+                lock,
+                () -> {
+                    boolean update = consumers.containsKey(consumerId);
+                    KvSnapshotConsumer consumer;
+                    if (!update) {
+                        // set the expiration time as: current time + expirationTime
+                        consumer = new KvSnapshotConsumer(clock.milliseconds() + expirationTime);
+                        consumers.put(consumerId, consumer);
+                        LOG.info("kv snapshot consumer '{}' has been registered.", consumerId);
+                    } else {
+                        consumer = consumers.get(consumerId);
+                    }
+
+                    for (Map.Entry<Long, List<ConsumeKvSnapshotForBucket>> entry :
+                            tableIdToRegisterBucket.entrySet()) {
+                        Long tableId = entry.getKey();
+                        TableInfo tableInfo = coordinatorContext.getTableInfoById(tableId);
+                        int numBuckets = tableInfo.getNumBuckets();
+                        List<ConsumeKvSnapshotForBucket> buckets = entry.getValue();
+                        for (ConsumeKvSnapshotForBucket bucket : buckets) {
+                            TableBucket tableBucket = bucket.getTableBucket();
+                            long newSnapshotId = bucket.getKvSnapshotId();
+                            // Must be read before registerBucket(), which overwrites the slot.
+                            long oldSnapshotId = registeredSnapshotId(consumer, tableBucket);
+                            boolean isUpdate =
+                                    consumer.registerBucket(tableBucket, newSnapshotId, numBuckets);
+                            if (!isUpdate) {
+                                consumedBucketCount.getAndIncrement();
+                            } else if (oldSnapshotId == newSnapshotId) {
+                                // Same snapshot registered again: references are unchanged.
+                                continue;
+                            }
+
+                            inWriteLock(
+                                    refCountLock,
+                                    () -> {
+                                        if (isUpdate) {
+                                            // Release the reference held on the old snapshot.
+                                            AtomicInteger oldCount =
+                                                    refCount.get(
+                                                            new ConsumeKvSnapshotForBucket(
+                                                                    tableBucket, oldSnapshotId));
+                                            if (oldCount != null) {
+                                                oldCount.getAndDecrement();
+                                            }
+                                        }
+                                        refCount.computeIfAbsent(bucket, k -> new AtomicInteger(0))
+                                                .getAndIncrement();
+                                    });
+                        }
+                    }
+
+                    if (update) {
+                        zkClient.updateKvSnapshotConsumer(consumerId, consumer);
+                    } else {
+                        zkClient.registerKvSnapshotConsumer(consumerId, consumer);
+                    }
+                });
+    }
+
+    /**
+     * Returns the snapshot id the consumer currently holds for the bucket, or -1 if the consumer
+     * tracks nothing for the bucket's table/partition or the bucket index is out of range.
+     */
+    private static long registeredSnapshotId(KvSnapshotConsumer consumer, TableBucket tableBucket) {
+        Long partitionId = tableBucket.getPartitionId();
+        Long[] snapshotIds =
+                partitionId == null
+                        ? consumer.getTableIdToSnapshots().get(tableBucket.getTableId())
+                        : consumer.getPartitionIdToSnapshots().get(partitionId);
+        int bucketId = tableBucket.getBucket();
+        if (snapshotIds == null || bucketId < 0 || bucketId >= snapshotIds.length) {
+            return -1L;
+        }
+        return snapshotIds[bucketId];
+    }
+
+    /**
+     * Unregisters the given buckets from a consumer and releases the references they held.
+     *
+     * <p>Does nothing if the consumer id has no lock or no consumer entry. For every bucket that
+     * actually had a snapshot registered, the consumed-bucket counter and the reference count of
+     * that bucket snapshot are decremented. Afterwards the consumer is cleared if it became
+     * empty, otherwise its updated state is written through the ZooKeeper client.
+     *
+     * @param consumerId the consumer id
+     * @param tableIdToUnregisterBucket buckets to unregister, grouped by table id
+     * @throws Exception if clearing the consumer or writing the consumer state fails
+     */
+    public void unregister(
+            String consumerId, Map<Long, List<TableBucket>> tableIdToUnregisterBucket)
+            throws Exception {
+        ReadWriteLock lock = consumerLocks.get(consumerId);
+        if (lock == null) {
+            return;
+        }
+
+        inWriteLock(
+                lock,
+                () -> {
+                    KvSnapshotConsumer consumer = consumers.get(consumerId);
+
+                    if (consumer == null) {
+                        return;
+                    }
+
+                    for (Map.Entry<Long, List<TableBucket>> entry :
+                            tableIdToUnregisterBucket.entrySet()) {
+                        List<TableBucket> buckets = entry.getValue();
+                        for (TableBucket bucket : buckets) {
+                            long snapshotId = consumer.unregisterBucket(bucket);
+                            if (snapshotId != -1L) {
+                                consumedBucketCount.getAndDecrement();
+                                inWriteLock(
+                                        refCountLock,
+                                        () -> {
+                                            AtomicInteger count =
+                                                    refCount.get(
+                                                            new ConsumeKvSnapshotForBucket(
+                                                                    bucket, snapshotId));
+                                            // A missing entry must not abort the whole
+                                            // unregistration with a NullPointerException.
+                                            if (count != null) {
+                                                count.getAndDecrement();
+                                            } else {
+                                                LOG.warn(
+                                                        "No ref count found for bucket {} with "
+                                                                + "kv snapshot {} of consumer '{}'.",
+                                                        bucket,
+                                                        snapshotId,
+                                                        consumerId);
+                                            }
+                                        });
+                            }
+                        }
+                    }
+
+                    if (consumer.isEmpty()) {
+                        clear(consumerId);
+                    } else {
+                        zkClient.updateKvSnapshotConsumer(consumerId, consumer);
+                    }
+                });
+    }
+
+    /**
+     * Clear kv snapshot consumer.
+     *
+     * @param consumerId the consumer id
+     * @return true if clear success, false if consumer not exist
+     */
+    public boolean clear(String consumerId) throws Exception {
+        ReadWriteLock lock = consumerLocks.get(consumerId);

Review Comment:
   This field access (publicly accessible via [this expression](1)) is not 
protected by any monitor, but the class is annotated as @ThreadSafe.



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/KvSnapshotConsumer.java:
##########
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data;
+
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.utils.MapUtils;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/** The zkNode data of kv snapshot consumer. */
+@NotThreadSafe
+public class KvSnapshotConsumer {
+    private final long expirationTime;
+
+    /**
+     * Mapping from table id to snapshot ids for a non-partitioned table. The value is an array of
+     * consumed snapshot ids indexed by bucket id; -1 marks a bucket without a consumed snapshot.
+     */
+    private final Map<Long, Long[]> tableIdToSnapshots;
+
+    /** tableId to partition ids for partitioned table. */
+    private final Map<Long, Set<Long>> tableIdToPartitions;
+
+    /**
+     * Mapping from partition id to snapshot ids for a partitioned table. The value is an array of
+     * consumed snapshot ids indexed by bucket id; -1 marks a bucket without a consumed snapshot.
+     */
+    private final Map<Long, Long[]> partitionIdToSnapshots;
+
+    /**
+     * Creates a consumer with the given expiration time and three fresh, empty maps obtained
+     * from {@code MapUtils.newConcurrentHashMap()}.
+     */
+    public KvSnapshotConsumer(long expirationTime) {
+        this(
+                expirationTime,
+                MapUtils.newConcurrentHashMap(),
+                MapUtils.newConcurrentHashMap(),
+                MapUtils.newConcurrentHashMap());
+    }
+
+    /**
+     * Creates a consumer from existing state. The maps are stored as passed (no defensive copy),
+     * so later register/unregister calls mutate the caller's map instances.
+     */
+    public KvSnapshotConsumer(
+            long expirationTime,
+            Map<Long, Long[]> tableIdToSnapshots,
+            Map<Long, Set<Long>> tableIdToPartitions,
+            Map<Long, Long[]> partitionIdToSnapshots) {
+        this.expirationTime = expirationTime;
+        this.tableIdToSnapshots = tableIdToSnapshots;
+        this.tableIdToPartitions = tableIdToPartitions;
+        this.partitionIdToSnapshots = partitionIdToSnapshots;
+    }
+
+    /** Returns the expiration time this consumer was constructed with. */
+    public long getExpirationTime() {
+        return expirationTime;
+    }
+
+    /** Returns the internal table id to snapshot ids map itself, not a copy. */
+    public Map<Long, Long[]> getTableIdToSnapshots() {
+        return tableIdToSnapshots;
+    }
+
+    /** Returns the internal table id to partition ids map itself, not a copy. */
+    public Map<Long, Set<Long>> getTableIdToPartitions() {
+        return tableIdToPartitions;
+    }
+
+    /** Returns the internal partition id to snapshot ids map itself, not a copy. */
+    public Map<Long, Long[]> getPartitionIdToSnapshots() {
+        return partitionIdToSnapshots;
+    }
+
+    /**
+     * Records {@code snapshotId} as the snapshot consumed for the given bucket.
+     *
+     * <p>The per-bucket array of the table (or of the partition, for a partitioned table) is
+     * created on first use with {@code bucketNum} slots, all set to -1. For a partitioned table
+     * the partition is also added to the partition set of its table.
+     *
+     * @param tableBucket table bucket
+     * @param snapshotId snapshot id
+     * @param bucketNum bucket number of this table or partition
+     * @return true if the bucket already had a snapshot registered (update), false if this is
+     *     the first registration of the bucket (insert)
+     * @throws IllegalArgumentException if an existing array has a length other than {@code
+     *     bucketNum}
+     */
+    public boolean registerBucket(TableBucket tableBucket, long snapshotId, int bucketNum) {
+        final long tableId = tableBucket.getTableId();
+        final Long partitionId = tableBucket.getPartitionId();
+        final int bucketId = tableBucket.getBucket();
+
+        final Map<Long, Long[]> snapshotsByOwner;
+        final Long ownerId;
+        if (partitionId != null) {
+            // Partitioned table: link the partition to its table before tracking its buckets.
+            tableIdToPartitions.computeIfAbsent(tableId, k -> new HashSet<>()).add(partitionId);
+            snapshotsByOwner = partitionIdToSnapshots;
+            ownerId = partitionId;
+        } else {
+            // Non-partitioned table: buckets are tracked directly under the table id.
+            snapshotsByOwner = tableIdToSnapshots;
+            ownerId = tableId;
+        }
+
+        Long[] slots =
+                snapshotsByOwner.computeIfAbsent(
+                        ownerId,
+                        k -> {
+                            Long[] fresh = new Long[bucketNum];
+                            Arrays.fill(fresh, -1L);
+                            return fresh;
+                        });
+
+        if (slots.length != bucketNum) {
+            throw new IllegalArgumentException(
+                    "The input bucket number is not equal to the bucket number of the table.");
+        }
+
+        boolean alreadyRegistered = slots[bucketId] != -1L;
+        slots[bucketId] = snapshotId;
+        return alreadyRegistered;
+    }
+
+    /**
+     * Unregister a bucket from the consumer.
+     *
+     * <p>The bucket's slot is reset to -1. If no bucket of the table (or partition) has a
+     * snapshot left afterwards, the table (or partition) entry is removed. For a partitioned
+     * table only the emptied partition is removed from its table's partition set; the table
+     * entry is dropped only once its last partition is gone, so other partitions of the same
+     * table stay tracked.
+     *
+     * @param tableBucket table bucket
+     * @return the snapshot id of the unregistered bucket, or -1 if nothing is tracked for the
+     *     table/partition or the bucket had no snapshot registered
+     */
+    public long unregisterBucket(TableBucket tableBucket) {
+        long tableId = tableBucket.getTableId();
+        Long partitionId = tableBucket.getPartitionId();
+        int bucketId = tableBucket.getBucket();
+
+        Long[] bucketIndex =
+                partitionId == null
+                        ? tableIdToSnapshots.get(tableId)
+                        : partitionIdToSnapshots.get(partitionId);
+        if (bucketIndex == null) {
+            return -1L;
+        }
+
+        long snapshotId = bucketIndex[bucketId];
+        bucketIndex[bucketId] = -1L;
+
+        for (Long consumed : bucketIndex) {
+            if (consumed != -1L) {
+                // Another bucket is still consumed: keep the table/partition entry.
+                return snapshotId;
+            }
+        }
+
+        if (partitionId == null) {
+            tableIdToSnapshots.remove(tableId);
+        } else {
+            partitionIdToSnapshots.remove(partitionId);
+            // Detach only this partition; sibling partitions of the table must stay linked.
+            Set<Long> partitions = tableIdToPartitions.get(tableId);
+            if (partitions != null) {
+                partitions.remove(partitionId);
+                if (partitions.isEmpty()) {
+                    tableIdToPartitions.remove(tableId);
+                }
+            }
+        }
+        return snapshotId;
+    }
+
+    /**
+     * Returns true when all three maps (table snapshots, table partitions, partition snapshots)
+     * are empty.
+     */
+    public boolean isEmpty() {
+        return tableIdToSnapshots.isEmpty()
+                && tableIdToPartitions.isEmpty()
+                && partitionIdToSnapshots.isEmpty();
+    }
+
+    /**
+     * Returns the number of buckets, over all tables and partitions, whose slot holds a snapshot
+     * id other than -1.
+     */
+    public int getConsumedSnapshotCount() {
+        return countConsumedSlots(tableIdToSnapshots) + countConsumedSlots(partitionIdToSnapshots);
+    }
+
+    /** Counts the array slots in the map's values that differ from -1. */
+    private static int countConsumedSlots(Map<Long, Long[]> snapshotsById) {
+        int consumed = 0;
+        for (Long[] snapshotIds : snapshotsById.values()) {
+            for (Long snapshotId : snapshotIds) {
+                if (snapshotId != -1L) {
+                    consumed++;
+                }
+            }
+        }
+        return consumed;
+    }
+
+    /**
+     * Renders the expiration time and the three maps; the two array-valued maps are formatted
+     * with {@code formatLongArrayMap}.
+     */
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder("KvSnapshotConsumer{");
+        sb.append("expirationTime=").append(expirationTime);
+        sb.append(", tableIdToSnapshots=").append(formatLongArrayMap(tableIdToSnapshots));
+        sb.append(", tableIdToPartitions=").append(tableIdToPartitions);
+        sb.append(", partitionIdToSnapshots=").append(formatLongArrayMap(partitionIdToSnapshots));
+        return sb.append('}').toString();
+    }
+
+    @Override
+    public boolean equals(Object o) {

Review Comment:
   This 'equals()' method does not check argument type.



##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/event/FinishedKvSnapshotConsumeEvent.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.source.event;
+
+import org.apache.fluss.metadata.TableBucket;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+
+import java.util.Objects;
+import java.util.Set;
+
+/** SourceEvent indicating that a Fluss table bucket has finished consuming its kv snapshot. */
+public class FinishedKvSnapshotConsumeEvent implements SourceEvent {

Review Comment:
   Class FinishedKvSnapshotConsumeEvent overrides [hashCode](1) but not equals.



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/KvSnapshotConsumerJsonSerde.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data;
+
+import 
org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import 
org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.fluss.utils.json.JsonDeserializer;
+import org.apache.fluss.utils.json.JsonSerializer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/** Json serializer and deserializer for {@link KvSnapshotConsumer}. */
+public class KvSnapshotConsumerJsonSerde
+        implements JsonSerializer<KvSnapshotConsumer>, 
JsonDeserializer<KvSnapshotConsumer> {
+
+    public static final KvSnapshotConsumerJsonSerde INSTANCE = new 
KvSnapshotConsumerJsonSerde();
+
+    private static final String VERSION_KEY = "version";
+    private static final String EXPIRATION_TIME = "expiration_time";
+    private static final String TABLES = "tables";
+
+    private static final int VERSION = 1;
+
+    /**
+     * Writes the consumer as a JSON object with the version, the expiration time and a
+     * {@code tables} object keyed by table id.
+     *
+     * <p>A non-partitioned table maps to an array of snapshot ids. A partitioned table maps to
+     * an object keyed by partition id, each value being an array of snapshot ids; partitions
+     * without a snapshot array are left out.
+     */
+    @Override
+    public void serialize(KvSnapshotConsumer kvSnapshotConsumer, JsonGenerator generator)
+            throws IOException {
+        generator.writeStartObject();
+        generator.writeNumberField(VERSION_KEY, VERSION);
+        generator.writeNumberField(EXPIRATION_TIME, kvSnapshotConsumer.getExpirationTime());
+
+        generator.writeObjectFieldStart(TABLES);
+
+        // Non-partitioned tables: "<tableId>": [snapshot ids].
+        for (Map.Entry<Long, Long[]> tableSnapshots :
+                kvSnapshotConsumer.getTableIdToSnapshots().entrySet()) {
+            generator.writeFieldName(tableSnapshots.getKey().toString());
+            writeLongArray(generator, tableSnapshots.getValue());
+        }
+
+        // Partitioned tables: "<tableId>": { "<partitionId>": [snapshot ids], ... }.
+        Map<Long, Long[]> snapshotsByPartition = kvSnapshotConsumer.getPartitionIdToSnapshots();
+        for (Map.Entry<Long, Set<Long>> tablePartitions :
+                kvSnapshotConsumer.getTableIdToPartitions().entrySet()) {
+            generator.writeObjectFieldStart(tablePartitions.getKey().toString());
+
+            for (Long partitionId : tablePartitions.getValue()) {
+                Long[] snapshotIds = snapshotsByPartition.get(partitionId);
+                if (snapshotIds == null) {
+                    continue;
+                }
+                generator.writeFieldName(partitionId.toString());
+                writeLongArray(generator, snapshotIds);
+            }
+
+            generator.writeEndObject();
+        }
+        generator.writeEndObject(); // tables
+
+        generator.writeEndObject(); // root
+    }
+
+    @Override
+    public KvSnapshotConsumer deserialize(JsonNode node) {
+        long expirationTime = node.get(EXPIRATION_TIME).asLong();
+        JsonNode tablesNode = node.get(TABLES);
+
+        Map<Long, Long[]> tableIdToSnapshots = new HashMap<>();
+        Map<Long, Set<Long>> tableIdToPartitions = new HashMap<>();
+        Map<Long, Long[]> partitionIdToSnapshots = new HashMap<>();
+
+        ObjectNode tablesObj = (ObjectNode) tablesNode;
+        Iterator<Map.Entry<String, JsonNode>> tableFields = tablesObj.fields();
+        while (tableFields.hasNext()) {
+            Map.Entry<String, JsonNode> tableEntry = tableFields.next();
+            Long tableId = Long.parseLong(tableEntry.getKey());

Review Comment:
   The variable 'tableId' is only assigned values of primitive type and is 
never 'null', but it is declared with the boxed type 'Long'.



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/KvSnapshotConsumerManager.java:
##########
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.metadata.ConsumeKvSnapshotForBucket;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.KvSnapshotConsumer;
+import org.apache.fluss.utils.MapUtils;
+import org.apache.fluss.utils.clock.Clock;
+import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
+import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
+
+/** A manager to manage kv snapshot consumer register/unregister/clear. */
+@ThreadSafe
+public class KvSnapshotConsumerManager {
+    private static final Logger LOG = 
LoggerFactory.getLogger(KvSnapshotConsumerManager.class);
+
+    private final ZooKeeperClient zkClient;
+    private final CoordinatorContext coordinatorContext;
+    private final Clock clock;
+    private final ScheduledExecutorService scheduledExecutor;
+    private final Configuration conf;
+
+    private final Map<String, ReadWriteLock> consumerLocks = 
MapUtils.newConcurrentHashMap();
+    /** Consumer id to Consumer. */
+    @GuardedBy("consumerLocks")
+    private final Map<String, KvSnapshotConsumer> consumers;
+
+    private final ReadWriteLock refCountLock = new ReentrantReadWriteLock();
+
+    /**
+     * Maps each {@link ConsumeKvSnapshotForBucket} (table bucket + snapshot id) to its reference
+     * count, i.e. the number of consumers that currently consume that snapshot of the bucket.
+     */
+    @GuardedBy("refCountLock")
+    private final Map<ConsumeKvSnapshotForBucket, AtomicInteger> refCount =
+            MapUtils.newConcurrentHashMap();
+
+    /** For metrics. */
+    private final AtomicInteger consumedBucketCount = new AtomicInteger(0);
+
+    /**
+     * Creates a manager whose scheduled executor is a one-thread scheduled pool built with an
+     * {@code ExecutorThreadFactory("kv-snapshot-consumer-cleaner")}. All other arguments are
+     * passed through unchanged to the constructor that accepts an explicit executor.
+     */
+    public KvSnapshotConsumerManager(
+            Configuration conf,
+            ZooKeeperClient zkClient,
+            CoordinatorContext coordinatorContext,
+            Clock clock,
+            CoordinatorMetricGroup coordinatorMetricGroup) {
+        this(
+                conf,
+                zkClient,
+                coordinatorContext,
+                Executors.newScheduledThreadPool(
+                        1, new 
ExecutorThreadFactory("kv-snapshot-consumer-cleaner")),
+                clock,
+                coordinatorMetricGroup);
+    }
+
+    /**
+     * Creates a manager that uses the supplied scheduled executor (lets tests inject their own).
+     *
+     * <p>Stores the collaborators, creates the (initially empty) consumer map and registers the
+     * metrics on the given metric group. Nothing is scheduled or loaded here.
+     */
+    @VisibleForTesting
+    public KvSnapshotConsumerManager(
+            Configuration conf,
+            ZooKeeperClient zkClient,
+            CoordinatorContext coordinatorContext,
+            ScheduledExecutorService scheduledExecutor,
+            Clock clock,
+            CoordinatorMetricGroup coordinatorMetricGroup) {
+        this.zkClient = zkClient;
+        this.conf = conf;
+        this.scheduledExecutor = scheduledExecutor;
+        this.coordinatorContext = coordinatorContext;
+        this.clock = clock;
+        this.consumers = MapUtils.newConcurrentHashMap();
+
+        registerMetrics(coordinatorMetricGroup);
+    }
+
+    public void start() {
+        scheduledExecutor.scheduleWithFixedDelay(
+                this::expireConsumers,
+                0L,
+                
conf.get(ConfigOptions.KV_SNAPSHOT_CONSUMER_EXPIRATION_CHECK_INTERVAL).toMillis(),
+                TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Loads every kv snapshot consumer listed by the ZooKeeper client into memory.
+     *
+     * <p>For each consumer id whose data is present, a fresh lock and the consumer itself are
+     * stored, the reference counts are initialized from it, and its consumed snapshot count is
+     * added to the consumed-bucket metric counter. Ids without data are skipped.
+     *
+     * @throws Exception if reading from the ZooKeeper client or initializing ref counts fails
+     */
+    public void initialize() throws Exception {
+        for (String consumerId : zkClient.getKvSnapshotConsumerList()) {
+            Optional<KvSnapshotConsumer> maybeConsumer =
+                    zkClient.getKvSnapshotConsumer(consumerId);
+            if (!maybeConsumer.isPresent()) {
+                continue;
+            }
+
+            KvSnapshotConsumer restored = maybeConsumer.get();
+            consumerLocks.put(consumerId, new ReentrantReadWriteLock());
+            consumers.put(consumerId, restored);
+
+            initializeRefCount(restored);
+
+            consumedBucketCount.getAndAdd(restored.getConsumedSnapshotCount());
+        }
+    }
+
+    public boolean snapshotConsumerNotExist(ConsumeKvSnapshotForBucket 
consumeKvSnapshotForBucket) {
+        return inReadLock(
+                refCountLock,
+                () ->
+                        !refCount.containsKey(consumeKvSnapshotForBucket)
+                                || 
refCount.get(consumeKvSnapshotForBucket).get() <= 0);
+    }
+
+    public void register(
+            String consumerId,
+            long expirationTime,
+            Map<Long, List<ConsumeKvSnapshotForBucket>> 
tableIdToRegisterBucket)
+            throws Exception {
+        ReadWriteLock lock =
+                consumerLocks.computeIfAbsent(consumerId, k -> new 
ReentrantReadWriteLock());
+        inWriteLock(
+                lock,
+                () -> {
+                    boolean update = consumers.containsKey(consumerId);
+                    KvSnapshotConsumer consumer;
+                    if (!update) {
+                        // set the expiration time as: current time + 
expirationTime
+                        consumer = new KvSnapshotConsumer(clock.milliseconds() 
+ expirationTime);
+                        consumers.put(consumerId, consumer);
+                        LOG.info("kv snapshot consumer '" + consumerId + "' 
has been registered.");
+                    } else {
+                        consumer = consumers.get(consumerId);
+                    }
+
+                    for (Map.Entry<Long, List<ConsumeKvSnapshotForBucket>> 
entry :
+                            tableIdToRegisterBucket.entrySet()) {
+                        Long tableId = entry.getKey();
+                        TableInfo tableInfo = 
coordinatorContext.getTableInfoById(tableId);
+                        int numBuckets = tableInfo.getNumBuckets();
+                        List<ConsumeKvSnapshotForBucket> buckets = 
entry.getValue();
+                        for (ConsumeKvSnapshotForBucket bucket : buckets) {
+                            boolean isUpdate =
+                                    consumer.registerBucket(
+                                            bucket.getTableBucket(),
+                                            bucket.getKvSnapshotId(),
+                                            numBuckets);
+                            if (!isUpdate) {
+                                consumedBucketCount.getAndIncrement();
+                                inWriteLock(
+                                        refCountLock,
+                                        () -> {
+                                            refCount.computeIfAbsent(
+                                                            bucket, k -> new 
AtomicInteger(0))
+                                                    .getAndIncrement();
+                                        });
+                            }
+                        }
+                    }
+
+                    if (update) {
+                        zkClient.updateKvSnapshotConsumer(consumerId, 
consumer);
+                    } else {
+                        zkClient.registerKvSnapshotConsumer(consumerId, 
consumer);
+                    }
+                });
+    }
+
+    public void unregister(
+            String consumerId, Map<Long, List<TableBucket>> 
tableIdToUnregisterBucket)
+            throws Exception {
+        ReadWriteLock lock = consumerLocks.get(consumerId);
+        if (lock == null) {
+            return;
+        }
+
+        inWriteLock(
+                lock,
+                () -> {
+                    KvSnapshotConsumer consumer = consumers.get(consumerId);
+
+                    if (consumer == null) {
+                        return;
+                    }
+
+                    for (Map.Entry<Long, List<TableBucket>> entry :
+                            tableIdToUnregisterBucket.entrySet()) {
+                        List<TableBucket> buckets = entry.getValue();
+                        for (TableBucket bucket : buckets) {
+                            long snapshotId = 
consumer.unregisterBucket(bucket);
+                            if (snapshotId != -1L) {
+                                consumedBucketCount.getAndDecrement();
+                                inWriteLock(
+                                        refCountLock,
+                                        () -> {
+                                            refCount.get(
+                                                            new 
ConsumeKvSnapshotForBucket(
+                                                                    bucket, 
snapshotId))
+                                                    .getAndDecrement();
+                                        });
+                            }
+                        }
+                    }
+
+                    if (consumer.isEmpty()) {
+                        clear(consumerId);
+                    } else {
+                        zkClient.updateKvSnapshotConsumer(consumerId, 
consumer);
+                    }
+                });
+    }
+
+    /**
+     * Clear kv snapshot consumer.
+     *
+     * @param consumerId the consumer id
+     * @return true if clear success, false if consumer not exist
+     */
+    public boolean clear(String consumerId) throws Exception {
+        ReadWriteLock lock = consumerLocks.get(consumerId);
+        if (lock == null) {
+            return false;
+        }
+
+        boolean exist =
+                inWriteLock(
+                        lock,
+                        () -> {
+                            KvSnapshotConsumer kvSnapshotConsumer = 
consumers.remove(consumerId);
+                            if (kvSnapshotConsumer == null) {
+                                return false;
+                            }
+
+                            clearRefCount(kvSnapshotConsumer);
+                            zkClient.deleteKvSnapshotConsumer(consumerId);
+
+                            LOG.info("kv snapshot consumer '" + consumerId + 
"' has been cleared.");
+                            return true;
+                        });
+
+        consumerLocks.remove(consumerId);

Review Comment:
   This field access (`consumerLocks.remove(consumerId)`, publicly accessible via
   the expression linked in the original alert) is not protected by any monitor,
   but the class is annotated as @ThreadSafe.



##########
fluss-common/src/main/java/org/apache/fluss/metadata/ConsumeKvSnapshotForBucket.java:
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.metadata;
+
+import java.util.Objects;
+
+/** An entity for consume kv snapshot for bucket. */
+public class ConsumeKvSnapshotForBucket {
+    private final TableBucket tableBucket;
+    private final long kvSnapshotId;
+
+    public ConsumeKvSnapshotForBucket(TableBucket tableBucket, long 
kvSnapshotId) {
+        this.tableBucket = tableBucket;
+        this.kvSnapshotId = kvSnapshotId;
+    }
+
+    public TableBucket getTableBucket() {
+        return tableBucket;
+    }
+
+    public long getKvSnapshotId() {
+        return kvSnapshotId;
+    }
+
+    @Override
+    public String toString() {
+        return "ConsumeKvSnapshotForBucket{"
+                + "tableBucket="
+                + tableBucket
+                + ", kvSnapshotId="
+                + kvSnapshotId
+                + '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {

Review Comment:
   This `equals(Object o)` method does not check the type of its argument.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to