Copilot commented on code in PR #2179: URL: https://github.com/apache/fluss/pull/2179#discussion_r2621949912
########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/KvSnapshotConsumerManager.java: ########## @@ -0,0 +1,425 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.metadata.ConsumeKvSnapshotForBucket; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.metrics.MetricNames; +import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.KvSnapshotConsumer; +import org.apache.fluss.utils.MapUtils; +import org.apache.fluss.utils.clock.Clock; +import org.apache.fluss.utils.concurrent.ExecutorThreadFactory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.Executors; +import 
java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock; +import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock; + +/** A manager to manage kv snapshot consumer register/unregister/clear. */ +@ThreadSafe +public class KvSnapshotConsumerManager { + private static final Logger LOG = LoggerFactory.getLogger(KvSnapshotConsumerManager.class); + + private final ZooKeeperClient zkClient; + private final CoordinatorContext coordinatorContext; + private final Clock clock; + private final ScheduledExecutorService scheduledExecutor; + private final Configuration conf; + + private final Map<String, ReadWriteLock> consumerLocks = MapUtils.newConcurrentHashMap(); + /** Consumer id to Consumer. */ + @GuardedBy("consumerLocks") + private final Map<String, KvSnapshotConsumer> consumers; + + private final ReadWriteLock refCountLock = new ReentrantReadWriteLock(); + + /** + * ConsumeKvSnapshotForBucket to the ref count, which means this table bucket + snapshotId has + * been consumed by how many consumers. + */ + @GuardedBy("refCountLock") + private final Map<ConsumeKvSnapshotForBucket, AtomicInteger> refCount = + MapUtils.newConcurrentHashMap(); + + /** For metrics. 
*/ + private final AtomicInteger consumedBucketCount = new AtomicInteger(0); + + public KvSnapshotConsumerManager( + Configuration conf, + ZooKeeperClient zkClient, + CoordinatorContext coordinatorContext, + Clock clock, + CoordinatorMetricGroup coordinatorMetricGroup) { + this( + conf, + zkClient, + coordinatorContext, + Executors.newScheduledThreadPool( + 1, new ExecutorThreadFactory("kv-snapshot-consumer-cleaner")), + clock, + coordinatorMetricGroup); + } + + @VisibleForTesting + public KvSnapshotConsumerManager( + Configuration conf, + ZooKeeperClient zkClient, + CoordinatorContext coordinatorContext, + ScheduledExecutorService scheduledExecutor, + Clock clock, + CoordinatorMetricGroup coordinatorMetricGroup) { + this.zkClient = zkClient; + this.conf = conf; + this.scheduledExecutor = scheduledExecutor; + this.coordinatorContext = coordinatorContext; + this.clock = clock; + this.consumers = MapUtils.newConcurrentHashMap(); + + registerMetrics(coordinatorMetricGroup); + } + + public void start() { + scheduledExecutor.scheduleWithFixedDelay( + this::expireConsumers, + 0L, + conf.get(ConfigOptions.KV_SNAPSHOT_CONSUMER_EXPIRATION_CHECK_INTERVAL).toMillis(), + TimeUnit.MILLISECONDS); + } + + public void initialize() throws Exception { + List<String> consumers = zkClient.getKvSnapshotConsumerList(); + for (String consumer : consumers) { + Optional<KvSnapshotConsumer> kvSnapshotConsumerOpt = + zkClient.getKvSnapshotConsumer(consumer); + if (kvSnapshotConsumerOpt.isPresent()) { + KvSnapshotConsumer kvSnapshotConsumer = kvSnapshotConsumerOpt.get(); + this.consumerLocks.put(consumer, new ReentrantReadWriteLock()); + this.consumers.put(consumer, kvSnapshotConsumer); Review Comment: This field access (publicly accessible via [this expression](1)) is not protected by any monitor, but the class is annotated as @ThreadSafe. 
########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/KvSnapshotConsumerManager.java: ########## @@ -0,0 +1,425 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.metadata.ConsumeKvSnapshotForBucket; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.metrics.MetricNames; +import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.KvSnapshotConsumer; +import org.apache.fluss.utils.MapUtils; +import org.apache.fluss.utils.clock.Clock; +import org.apache.fluss.utils.concurrent.ExecutorThreadFactory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.Executors; +import 
java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock; +import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock; + +/** A manager to manage kv snapshot consumer register/unregister/clear. */ +@ThreadSafe +public class KvSnapshotConsumerManager { + private static final Logger LOG = LoggerFactory.getLogger(KvSnapshotConsumerManager.class); + + private final ZooKeeperClient zkClient; + private final CoordinatorContext coordinatorContext; + private final Clock clock; + private final ScheduledExecutorService scheduledExecutor; + private final Configuration conf; + + private final Map<String, ReadWriteLock> consumerLocks = MapUtils.newConcurrentHashMap(); + /** Consumer id to Consumer. */ + @GuardedBy("consumerLocks") + private final Map<String, KvSnapshotConsumer> consumers; + + private final ReadWriteLock refCountLock = new ReentrantReadWriteLock(); + + /** + * ConsumeKvSnapshotForBucket to the ref count, which means this table bucket + snapshotId has + * been consumed by how many consumers. + */ + @GuardedBy("refCountLock") + private final Map<ConsumeKvSnapshotForBucket, AtomicInteger> refCount = + MapUtils.newConcurrentHashMap(); + + /** For metrics. 
*/ + private final AtomicInteger consumedBucketCount = new AtomicInteger(0); + + public KvSnapshotConsumerManager( + Configuration conf, + ZooKeeperClient zkClient, + CoordinatorContext coordinatorContext, + Clock clock, + CoordinatorMetricGroup coordinatorMetricGroup) { + this( + conf, + zkClient, + coordinatorContext, + Executors.newScheduledThreadPool( + 1, new ExecutorThreadFactory("kv-snapshot-consumer-cleaner")), + clock, + coordinatorMetricGroup); + } + + @VisibleForTesting + public KvSnapshotConsumerManager( + Configuration conf, + ZooKeeperClient zkClient, + CoordinatorContext coordinatorContext, + ScheduledExecutorService scheduledExecutor, + Clock clock, + CoordinatorMetricGroup coordinatorMetricGroup) { + this.zkClient = zkClient; + this.conf = conf; + this.scheduledExecutor = scheduledExecutor; + this.coordinatorContext = coordinatorContext; + this.clock = clock; + this.consumers = MapUtils.newConcurrentHashMap(); + + registerMetrics(coordinatorMetricGroup); + } + + public void start() { + scheduledExecutor.scheduleWithFixedDelay( + this::expireConsumers, + 0L, + conf.get(ConfigOptions.KV_SNAPSHOT_CONSUMER_EXPIRATION_CHECK_INTERVAL).toMillis(), + TimeUnit.MILLISECONDS); + } + + public void initialize() throws Exception { + List<String> consumers = zkClient.getKvSnapshotConsumerList(); + for (String consumer : consumers) { + Optional<KvSnapshotConsumer> kvSnapshotConsumerOpt = + zkClient.getKvSnapshotConsumer(consumer); + if (kvSnapshotConsumerOpt.isPresent()) { + KvSnapshotConsumer kvSnapshotConsumer = kvSnapshotConsumerOpt.get(); + this.consumerLocks.put(consumer, new ReentrantReadWriteLock()); Review Comment: This field access (publicly accessible via [this expression](1)) is not protected by any monitor, but the class is annotated as @ThreadSafe. 
########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/KvSnapshotConsumerManager.java: ########## @@ -0,0 +1,425 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.metadata.ConsumeKvSnapshotForBucket; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.metrics.MetricNames; +import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.KvSnapshotConsumer; +import org.apache.fluss.utils.MapUtils; +import org.apache.fluss.utils.clock.Clock; +import org.apache.fluss.utils.concurrent.ExecutorThreadFactory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.Executors; +import 
java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock; +import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock; + +/** A manager to manage kv snapshot consumer register/unregister/clear. */ +@ThreadSafe +public class KvSnapshotConsumerManager { + private static final Logger LOG = LoggerFactory.getLogger(KvSnapshotConsumerManager.class); + + private final ZooKeeperClient zkClient; + private final CoordinatorContext coordinatorContext; + private final Clock clock; + private final ScheduledExecutorService scheduledExecutor; + private final Configuration conf; + + private final Map<String, ReadWriteLock> consumerLocks = MapUtils.newConcurrentHashMap(); + /** Consumer id to Consumer. */ + @GuardedBy("consumerLocks") + private final Map<String, KvSnapshotConsumer> consumers; + + private final ReadWriteLock refCountLock = new ReentrantReadWriteLock(); + + /** + * ConsumeKvSnapshotForBucket to the ref count, which means this table bucket + snapshotId has + * been consumed by how many consumers. + */ + @GuardedBy("refCountLock") + private final Map<ConsumeKvSnapshotForBucket, AtomicInteger> refCount = + MapUtils.newConcurrentHashMap(); + + /** For metrics. 
*/ + private final AtomicInteger consumedBucketCount = new AtomicInteger(0); + + public KvSnapshotConsumerManager( + Configuration conf, + ZooKeeperClient zkClient, + CoordinatorContext coordinatorContext, + Clock clock, + CoordinatorMetricGroup coordinatorMetricGroup) { + this( + conf, + zkClient, + coordinatorContext, + Executors.newScheduledThreadPool( + 1, new ExecutorThreadFactory("kv-snapshot-consumer-cleaner")), + clock, + coordinatorMetricGroup); + } + + @VisibleForTesting + public KvSnapshotConsumerManager( + Configuration conf, + ZooKeeperClient zkClient, + CoordinatorContext coordinatorContext, + ScheduledExecutorService scheduledExecutor, + Clock clock, + CoordinatorMetricGroup coordinatorMetricGroup) { + this.zkClient = zkClient; + this.conf = conf; + this.scheduledExecutor = scheduledExecutor; + this.coordinatorContext = coordinatorContext; + this.clock = clock; + this.consumers = MapUtils.newConcurrentHashMap(); + + registerMetrics(coordinatorMetricGroup); + } + + public void start() { + scheduledExecutor.scheduleWithFixedDelay( + this::expireConsumers, + 0L, + conf.get(ConfigOptions.KV_SNAPSHOT_CONSUMER_EXPIRATION_CHECK_INTERVAL).toMillis(), + TimeUnit.MILLISECONDS); + } + + public void initialize() throws Exception { + List<String> consumers = zkClient.getKvSnapshotConsumerList(); + for (String consumer : consumers) { + Optional<KvSnapshotConsumer> kvSnapshotConsumerOpt = + zkClient.getKvSnapshotConsumer(consumer); + if (kvSnapshotConsumerOpt.isPresent()) { + KvSnapshotConsumer kvSnapshotConsumer = kvSnapshotConsumerOpt.get(); + this.consumerLocks.put(consumer, new ReentrantReadWriteLock()); + this.consumers.put(consumer, kvSnapshotConsumer); + + initializeRefCount(kvSnapshotConsumer); + + consumedBucketCount.getAndAdd(kvSnapshotConsumer.getConsumedSnapshotCount()); + } + } + } + + public boolean snapshotConsumerNotExist(ConsumeKvSnapshotForBucket consumeKvSnapshotForBucket) { + return inReadLock( + refCountLock, + () -> + 
!refCount.containsKey(consumeKvSnapshotForBucket) + || refCount.get(consumeKvSnapshotForBucket).get() <= 0); + } + + public void register( + String consumerId, + long expirationTime, + Map<Long, List<ConsumeKvSnapshotForBucket>> tableIdToRegisterBucket) + throws Exception { + ReadWriteLock lock = + consumerLocks.computeIfAbsent(consumerId, k -> new ReentrantReadWriteLock()); + inWriteLock( + lock, + () -> { + boolean update = consumers.containsKey(consumerId); + KvSnapshotConsumer consumer; + if (!update) { + // set the expiration time as: current time + expirationTime + consumer = new KvSnapshotConsumer(clock.milliseconds() + expirationTime); + consumers.put(consumerId, consumer); + LOG.info("kv snapshot consumer '" + consumerId + "' has been registered."); + } else { + consumer = consumers.get(consumerId); + } + + for (Map.Entry<Long, List<ConsumeKvSnapshotForBucket>> entry : + tableIdToRegisterBucket.entrySet()) { + Long tableId = entry.getKey(); + TableInfo tableInfo = coordinatorContext.getTableInfoById(tableId); + int numBuckets = tableInfo.getNumBuckets(); + List<ConsumeKvSnapshotForBucket> buckets = entry.getValue(); + for (ConsumeKvSnapshotForBucket bucket : buckets) { + boolean isUpdate = + consumer.registerBucket( + bucket.getTableBucket(), + bucket.getKvSnapshotId(), + numBuckets); + if (!isUpdate) { + consumedBucketCount.getAndIncrement(); + inWriteLock( + refCountLock, + () -> { + refCount.computeIfAbsent( + bucket, k -> new AtomicInteger(0)) + .getAndIncrement(); + }); + } + } + } + + if (update) { + zkClient.updateKvSnapshotConsumer(consumerId, consumer); + } else { + zkClient.registerKvSnapshotConsumer(consumerId, consumer); + } + }); + } + + public void unregister( + String consumerId, Map<Long, List<TableBucket>> tableIdToUnregisterBucket) + throws Exception { + ReadWriteLock lock = consumerLocks.get(consumerId); Review Comment: This field access (publicly accessible via [this expression](1)) is not protected by any monitor, but the class is 
annotated as @ThreadSafe. ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/KvSnapshotConsumerManager.java: ########## @@ -0,0 +1,425 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.metadata.ConsumeKvSnapshotForBucket; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.metrics.MetricNames; +import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.KvSnapshotConsumer; +import org.apache.fluss.utils.MapUtils; +import org.apache.fluss.utils.clock.Clock; +import org.apache.fluss.utils.concurrent.ExecutorThreadFactory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import 
java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock; +import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock; + +/** A manager to manage kv snapshot consumer register/unregister/clear. */ +@ThreadSafe +public class KvSnapshotConsumerManager { + private static final Logger LOG = LoggerFactory.getLogger(KvSnapshotConsumerManager.class); + + private final ZooKeeperClient zkClient; + private final CoordinatorContext coordinatorContext; + private final Clock clock; + private final ScheduledExecutorService scheduledExecutor; + private final Configuration conf; + + private final Map<String, ReadWriteLock> consumerLocks = MapUtils.newConcurrentHashMap(); + /** Consumer id to Consumer. */ + @GuardedBy("consumerLocks") + private final Map<String, KvSnapshotConsumer> consumers; + + private final ReadWriteLock refCountLock = new ReentrantReadWriteLock(); + + /** + * ConsumeKvSnapshotForBucket to the ref count, which means this table bucket + snapshotId has + * been consumed by how many consumers. + */ + @GuardedBy("refCountLock") + private final Map<ConsumeKvSnapshotForBucket, AtomicInteger> refCount = + MapUtils.newConcurrentHashMap(); + + /** For metrics. 
*/ + private final AtomicInteger consumedBucketCount = new AtomicInteger(0); + + public KvSnapshotConsumerManager( + Configuration conf, + ZooKeeperClient zkClient, + CoordinatorContext coordinatorContext, + Clock clock, + CoordinatorMetricGroup coordinatorMetricGroup) { + this( + conf, + zkClient, + coordinatorContext, + Executors.newScheduledThreadPool( + 1, new ExecutorThreadFactory("kv-snapshot-consumer-cleaner")), + clock, + coordinatorMetricGroup); + } + + @VisibleForTesting + public KvSnapshotConsumerManager( + Configuration conf, + ZooKeeperClient zkClient, + CoordinatorContext coordinatorContext, + ScheduledExecutorService scheduledExecutor, + Clock clock, + CoordinatorMetricGroup coordinatorMetricGroup) { + this.zkClient = zkClient; + this.conf = conf; + this.scheduledExecutor = scheduledExecutor; + this.coordinatorContext = coordinatorContext; + this.clock = clock; + this.consumers = MapUtils.newConcurrentHashMap(); + + registerMetrics(coordinatorMetricGroup); + } + + public void start() { + scheduledExecutor.scheduleWithFixedDelay( + this::expireConsumers, + 0L, + conf.get(ConfigOptions.KV_SNAPSHOT_CONSUMER_EXPIRATION_CHECK_INTERVAL).toMillis(), + TimeUnit.MILLISECONDS); + } + + public void initialize() throws Exception { + List<String> consumers = zkClient.getKvSnapshotConsumerList(); + for (String consumer : consumers) { + Optional<KvSnapshotConsumer> kvSnapshotConsumerOpt = + zkClient.getKvSnapshotConsumer(consumer); + if (kvSnapshotConsumerOpt.isPresent()) { + KvSnapshotConsumer kvSnapshotConsumer = kvSnapshotConsumerOpt.get(); + this.consumerLocks.put(consumer, new ReentrantReadWriteLock()); + this.consumers.put(consumer, kvSnapshotConsumer); + + initializeRefCount(kvSnapshotConsumer); + + consumedBucketCount.getAndAdd(kvSnapshotConsumer.getConsumedSnapshotCount()); + } + } + } + + public boolean snapshotConsumerNotExist(ConsumeKvSnapshotForBucket consumeKvSnapshotForBucket) { + return inReadLock( + refCountLock, + () -> + 
!refCount.containsKey(consumeKvSnapshotForBucket) + || refCount.get(consumeKvSnapshotForBucket).get() <= 0); + } + + public void register( + String consumerId, + long expirationTime, + Map<Long, List<ConsumeKvSnapshotForBucket>> tableIdToRegisterBucket) + throws Exception { + ReadWriteLock lock = + consumerLocks.computeIfAbsent(consumerId, k -> new ReentrantReadWriteLock()); Review Comment: This field access (publicly accessible via [this expression](1)) is not protected by any monitor, but the class is annotated as @ThreadSafe. ########## fluss-server/src/main/java/org/apache/fluss/server/zk/data/KvSnapshotConsumerJsonSerde.java: ########## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.zk.data; + +import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; +import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.fluss.utils.json.JsonDeserializer; +import org.apache.fluss.utils.json.JsonSerializer; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** Json serializer and deserializer for {@link KvSnapshotConsumer}. */ +public class KvSnapshotConsumerJsonSerde + implements JsonSerializer<KvSnapshotConsumer>, JsonDeserializer<KvSnapshotConsumer> { + + public static final KvSnapshotConsumerJsonSerde INSTANCE = new KvSnapshotConsumerJsonSerde(); + + private static final String VERSION_KEY = "version"; + private static final String EXPIRATION_TIME = "expiration_time"; + private static final String TABLES = "tables"; + + private static final int VERSION = 1; + + @Override + public void serialize(KvSnapshotConsumer kvSnapshotConsumer, JsonGenerator generator) + throws IOException { + generator.writeStartObject(); + generator.writeNumberField(VERSION_KEY, VERSION); + generator.writeNumberField(EXPIRATION_TIME, kvSnapshotConsumer.getExpirationTime()); + + generator.writeFieldName(TABLES); + generator.writeStartObject(); + + // for none-partitioned table. + for (Map.Entry<Long, Long[]> entry : + kvSnapshotConsumer.getTableIdToSnapshots().entrySet()) { + generator.writeFieldName(entry.getKey().toString()); + writeLongArray(generator, entry.getValue()); + } + + // for partitioned table. 
+ Map<Long, Long[]> partitionToSortedSnapshotIds = + kvSnapshotConsumer.getPartitionIdToSnapshots(); + for (Map.Entry<Long, Set<Long>> entry : + kvSnapshotConsumer.getTableIdToPartitions().entrySet()) { + Long tableId = entry.getKey(); + + generator.writeFieldName(tableId.toString()); + generator.writeStartObject(); + + for (Long partitionId : entry.getValue()) { + Long[] snapshotIds = partitionToSortedSnapshotIds.get(partitionId); + if (snapshotIds != null) { + generator.writeFieldName(partitionId.toString()); + writeLongArray(generator, snapshotIds); + } + } + + generator.writeEndObject(); + } + generator.writeEndObject(); // tables + + generator.writeEndObject(); // root + } + + @Override + public KvSnapshotConsumer deserialize(JsonNode node) { + long expirationTime = node.get(EXPIRATION_TIME).asLong(); + JsonNode tablesNode = node.get(TABLES); + + Map<Long, Long[]> tableIdToSnapshots = new HashMap<>(); + Map<Long, Set<Long>> tableIdToPartitions = new HashMap<>(); + Map<Long, Long[]> partitionIdToSnapshots = new HashMap<>(); + + ObjectNode tablesObj = (ObjectNode) tablesNode; + Iterator<Map.Entry<String, JsonNode>> tableFields = tablesObj.fields(); + while (tableFields.hasNext()) { + Map.Entry<String, JsonNode> tableEntry = tableFields.next(); + Long tableId = Long.parseLong(tableEntry.getKey()); + JsonNode tableValue = tableEntry.getValue(); + + if (tableValue.isArray()) { + // Non-partitioned table, like: "1": [1, -1, 1, 2] + List<Long> snapshotIds = new ArrayList<>(); + for (JsonNode elem : tableValue) { + snapshotIds.add(elem.asLong()); + } + tableIdToSnapshots.put(tableId, snapshotIds.toArray(new Long[0])); + } else if (tableValue.isObject()) { + // Partitioned table, like: "2": { "1001": [...], "1002": [...] 
} + Set<Long> partitions = new HashSet<>(); + ObjectNode partitionObj = (ObjectNode) tableValue; + Iterator<Map.Entry<String, JsonNode>> partFields = partitionObj.fields(); + while (partFields.hasNext()) { + Map.Entry<String, JsonNode> partEntry = partFields.next(); + Long partitionId = Long.parseLong(partEntry.getKey()); Review Comment: The variable 'partitionId' is only assigned values of primitive type and is never 'null', but it is declared with the boxed type 'Long'. ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/KvSnapshotConsumerManager.java: ########## @@ -0,0 +1,425 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.coordinator; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.metadata.ConsumeKvSnapshotForBucket; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.metrics.MetricNames; +import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.KvSnapshotConsumer; +import org.apache.fluss.utils.MapUtils; +import org.apache.fluss.utils.clock.Clock; +import org.apache.fluss.utils.concurrent.ExecutorThreadFactory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock; +import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock; + +/** A manager to manage kv snapshot consumer register/unregister/clear. 
*/ +@ThreadSafe +public class KvSnapshotConsumerManager { + private static final Logger LOG = LoggerFactory.getLogger(KvSnapshotConsumerManager.class); + + private final ZooKeeperClient zkClient; + private final CoordinatorContext coordinatorContext; + private final Clock clock; + private final ScheduledExecutorService scheduledExecutor; + private final Configuration conf; + + private final Map<String, ReadWriteLock> consumerLocks = MapUtils.newConcurrentHashMap(); + /** Consumer id to Consumer. */ + @GuardedBy("consumerLocks") + private final Map<String, KvSnapshotConsumer> consumers; + + private final ReadWriteLock refCountLock = new ReentrantReadWriteLock(); + + /** + * ConsumeKvSnapshotForBucket to the ref count, which means this table bucket + snapshotId has + * been consumed by how many consumers. + */ + @GuardedBy("refCountLock") + private final Map<ConsumeKvSnapshotForBucket, AtomicInteger> refCount = + MapUtils.newConcurrentHashMap(); + + /** For metrics. */ + private final AtomicInteger consumedBucketCount = new AtomicInteger(0); + + public KvSnapshotConsumerManager( + Configuration conf, + ZooKeeperClient zkClient, + CoordinatorContext coordinatorContext, + Clock clock, + CoordinatorMetricGroup coordinatorMetricGroup) { + this( + conf, + zkClient, + coordinatorContext, + Executors.newScheduledThreadPool( + 1, new ExecutorThreadFactory("kv-snapshot-consumer-cleaner")), + clock, + coordinatorMetricGroup); + } + + @VisibleForTesting + public KvSnapshotConsumerManager( + Configuration conf, + ZooKeeperClient zkClient, + CoordinatorContext coordinatorContext, + ScheduledExecutorService scheduledExecutor, + Clock clock, + CoordinatorMetricGroup coordinatorMetricGroup) { + this.zkClient = zkClient; + this.conf = conf; + this.scheduledExecutor = scheduledExecutor; + this.coordinatorContext = coordinatorContext; + this.clock = clock; + this.consumers = MapUtils.newConcurrentHashMap(); + + registerMetrics(coordinatorMetricGroup); + } + + public void start() { + 
scheduledExecutor.scheduleWithFixedDelay( + this::expireConsumers, + 0L, + conf.get(ConfigOptions.KV_SNAPSHOT_CONSUMER_EXPIRATION_CHECK_INTERVAL).toMillis(), + TimeUnit.MILLISECONDS); + } + + public void initialize() throws Exception { + List<String> consumers = zkClient.getKvSnapshotConsumerList(); + for (String consumer : consumers) { + Optional<KvSnapshotConsumer> kvSnapshotConsumerOpt = + zkClient.getKvSnapshotConsumer(consumer); + if (kvSnapshotConsumerOpt.isPresent()) { + KvSnapshotConsumer kvSnapshotConsumer = kvSnapshotConsumerOpt.get(); + this.consumerLocks.put(consumer, new ReentrantReadWriteLock()); + this.consumers.put(consumer, kvSnapshotConsumer); + + initializeRefCount(kvSnapshotConsumer); + + consumedBucketCount.getAndAdd(kvSnapshotConsumer.getConsumedSnapshotCount()); + } + } + } + + public boolean snapshotConsumerNotExist(ConsumeKvSnapshotForBucket consumeKvSnapshotForBucket) { + return inReadLock( + refCountLock, + () -> + !refCount.containsKey(consumeKvSnapshotForBucket) + || refCount.get(consumeKvSnapshotForBucket).get() <= 0); + } + + public void register( + String consumerId, + long expirationTime, + Map<Long, List<ConsumeKvSnapshotForBucket>> tableIdToRegisterBucket) + throws Exception { + ReadWriteLock lock = + consumerLocks.computeIfAbsent(consumerId, k -> new ReentrantReadWriteLock()); + inWriteLock( + lock, + () -> { + boolean update = consumers.containsKey(consumerId); + KvSnapshotConsumer consumer; + if (!update) { + // set the expiration time as: current time + expirationTime + consumer = new KvSnapshotConsumer(clock.milliseconds() + expirationTime); + consumers.put(consumerId, consumer); + LOG.info("kv snapshot consumer '" + consumerId + "' has been registered."); + } else { + consumer = consumers.get(consumerId); + } + + for (Map.Entry<Long, List<ConsumeKvSnapshotForBucket>> entry : + tableIdToRegisterBucket.entrySet()) { + Long tableId = entry.getKey(); + TableInfo tableInfo = coordinatorContext.getTableInfoById(tableId); + int 
numBuckets = tableInfo.getNumBuckets(); + List<ConsumeKvSnapshotForBucket> buckets = entry.getValue(); + for (ConsumeKvSnapshotForBucket bucket : buckets) { + boolean isUpdate = + consumer.registerBucket( + bucket.getTableBucket(), + bucket.getKvSnapshotId(), + numBuckets); + if (!isUpdate) { + consumedBucketCount.getAndIncrement(); + inWriteLock( + refCountLock, + () -> { + refCount.computeIfAbsent( + bucket, k -> new AtomicInteger(0)) + .getAndIncrement(); + }); + } + } + } + + if (update) { + zkClient.updateKvSnapshotConsumer(consumerId, consumer); + } else { + zkClient.registerKvSnapshotConsumer(consumerId, consumer); + } + }); + } + + public void unregister( + String consumerId, Map<Long, List<TableBucket>> tableIdToUnregisterBucket) + throws Exception { + ReadWriteLock lock = consumerLocks.get(consumerId); + if (lock == null) { + return; + } + + inWriteLock( + lock, + () -> { + KvSnapshotConsumer consumer = consumers.get(consumerId); + + if (consumer == null) { + return; + } + + for (Map.Entry<Long, List<TableBucket>> entry : + tableIdToUnregisterBucket.entrySet()) { + List<TableBucket> buckets = entry.getValue(); + for (TableBucket bucket : buckets) { + long snapshotId = consumer.unregisterBucket(bucket); + if (snapshotId != -1L) { + consumedBucketCount.getAndDecrement(); + inWriteLock( + refCountLock, + () -> { + refCount.get( + new ConsumeKvSnapshotForBucket( + bucket, snapshotId)) + .getAndDecrement(); + }); + } + } + } + + if (consumer.isEmpty()) { + clear(consumerId); + } else { + zkClient.updateKvSnapshotConsumer(consumerId, consumer); + } + }); + } + + /** + * Clear kv snapshot consumer. 
+ * + * @param consumerId the consumer id + * @return true if clear success, false if consumer not exist + */ + public boolean clear(String consumerId) throws Exception { + ReadWriteLock lock = consumerLocks.get(consumerId); Review Comment: This field access (publicly accessible via [this expression](1)) is not protected by any monitor, but the class is annotated as @ThreadSafe. ########## fluss-server/src/main/java/org/apache/fluss/server/zk/data/KvSnapshotConsumer.java: ########## @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.zk.data; + +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.utils.MapUtils; + +import javax.annotation.concurrent.NotThreadSafe; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** The zkNode data of kv snapshot consumer. */ +@NotThreadSafe +public class KvSnapshotConsumer { + private final long expirationTime; + + /** + * mapping from table id to sorted snapshot ids for none-partitioned table, the value is a list + * of consumed snapshot ids sorted by bucket id. 
+ */ + private final Map<Long, Long[]> tableIdToSnapshots; + + /** tableId to partition ids for partitioned table. */ + private final Map<Long, Set<Long>> tableIdToPartitions; + + /** + * mapping from partition id to sorted snapshot ids for partitioned table, the value is a list + * of consumed snapshot ids sorted by bucket id. + */ + private final Map<Long, Long[]> partitionIdToSnapshots; + + public KvSnapshotConsumer(long expirationTime) { + this( + expirationTime, + MapUtils.newConcurrentHashMap(), + MapUtils.newConcurrentHashMap(), + MapUtils.newConcurrentHashMap()); + } + + public KvSnapshotConsumer( + long expirationTime, + Map<Long, Long[]> tableIdToSnapshots, + Map<Long, Set<Long>> tableIdToPartitions, + Map<Long, Long[]> partitionIdToSnapshots) { + this.expirationTime = expirationTime; + this.tableIdToSnapshots = tableIdToSnapshots; + this.tableIdToPartitions = tableIdToPartitions; + this.partitionIdToSnapshots = partitionIdToSnapshots; + } + + public long getExpirationTime() { + return expirationTime; + } + + public Map<Long, Long[]> getTableIdToSnapshots() { + return tableIdToSnapshots; + } + + public Map<Long, Set<Long>> getTableIdToPartitions() { + return tableIdToPartitions; + } + + public Map<Long, Long[]> getPartitionIdToSnapshots() { + return partitionIdToSnapshots; + } + + /** + * Register a bucket to the consumer. + * + * @param tableBucket table bucket + * @param snapshotId snapshot id + * @param bucketNum bucket number of this table or partition + * @return true if this operation is update, false if this operation is insert + */ + public boolean registerBucket(TableBucket tableBucket, long snapshotId, int bucketNum) { + Long[] bucketIndex; + Long partitionId = tableBucket.getPartitionId(); + long tableId = tableBucket.getTableId(); + int bucketId = tableBucket.getBucket(); + if (partitionId == null) { + // For none-partitioned table. 
+ bucketIndex = + tableIdToSnapshots.computeIfAbsent( + tableId, + k -> { + Long[] array = new Long[bucketNum]; + Arrays.fill(array, -1L); + return array; + }); + } else { + // For partitioned table. + + // first add partition to table. + Set<Long> partitions = + tableIdToPartitions.computeIfAbsent(tableId, k -> new HashSet<>()); + partitions.add(partitionId); + + // then add bucket to partition. + bucketIndex = + partitionIdToSnapshots.computeIfAbsent( + partitionId, + k -> { + Long[] array = new Long[bucketNum]; + Arrays.fill(array, -1L); + return array; + }); + } + + if (bucketIndex.length != bucketNum) { + throw new IllegalArgumentException( + "The input bucket number is not equal to the bucket number of the table."); + } + boolean isUpdate = bucketIndex[bucketId] != -1L; + bucketIndex[bucketId] = snapshotId; + return isUpdate; + } + + /** + * Unregister a bucket from the consumer. + * + * @param tableBucket table bucket + * @return the snapshot id of the unregistered bucket + */ + public long unregisterBucket(TableBucket tableBucket) { + Long[] bucketIndex; + long tableId = tableBucket.getTableId(); + Long partitionId = tableBucket.getPartitionId(); + int bucketId = tableBucket.getBucket(); + if (partitionId == null) { + // For none-partitioned table. + bucketIndex = tableIdToSnapshots.get(tableId); + } else { + // For partitioned table. 
+ bucketIndex = partitionIdToSnapshots.get(partitionId); + } + + Long snapshotId = -1L; + if (bucketIndex != null) { + snapshotId = bucketIndex[bucketId]; + bucketIndex[bucketId] = -1L; + + boolean needRemove = true; + for (Long bucket : bucketIndex) { + if (bucket != -1L) { + needRemove = false; + break; + } + } + + if (needRemove) { + if (partitionId == null) { + tableIdToSnapshots.remove(tableId); + } else { + tableIdToPartitions.remove(tableId); + partitionIdToSnapshots.remove(partitionId); + } + } + } + return snapshotId; + } + + public boolean isEmpty() { + return tableIdToSnapshots.isEmpty() + && tableIdToPartitions.isEmpty() + && partitionIdToSnapshots.isEmpty(); + } + + public int getConsumedSnapshotCount() { + int count = 0; + for (Long[] buckets : tableIdToSnapshots.values()) { + for (Long bucket : buckets) { + if (bucket != -1L) { + count++; + } + } + } + + for (Long[] buckets : partitionIdToSnapshots.values()) { + for (Long bucket : buckets) { + if (bucket != -1L) { + count++; + } + } + } + return count; + } + + @Override + public String toString() { + String tableSnapshotsStr = formatLongArrayMap(tableIdToSnapshots); + String partitionSnapshotsStr = formatLongArrayMap(partitionIdToSnapshots); + + return "KvSnapshotConsumer{" + + "expirationTime=" + + expirationTime + + ", tableIdToSnapshots=" + + tableSnapshotsStr + + ", tableIdToPartitions=" + + tableIdToPartitions + + ", partitionIdToSnapshots=" + + partitionSnapshotsStr + + '}'; + } + + @Override + public boolean equals(Object o) { Review Comment: This 'equals()' method does not check argument type. ########## fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/event/FinishedKvSnapshotConsumeEvent.java: ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.flink.source.event; + +import org.apache.fluss.metadata.TableBucket; + +import org.apache.flink.api.connector.source.SourceEvent; + +import java.util.Objects; +import java.util.Set; + +/** SourceEvent used to represent a Fluss table bucket has complete consume kv snapshot. */ +public class FinishedKvSnapshotConsumeEvent implements SourceEvent { Review Comment: Class FinishedKvSnapshotConsumeEvent overrides [hashCode](1) but not equals. ########## fluss-server/src/main/java/org/apache/fluss/server/zk/data/KvSnapshotConsumerJsonSerde.java: ########## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.zk.data; + +import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; +import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.fluss.utils.json.JsonDeserializer; +import org.apache.fluss.utils.json.JsonSerializer; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** Json serializer and deserializer for {@link KvSnapshotConsumer}. */ +public class KvSnapshotConsumerJsonSerde + implements JsonSerializer<KvSnapshotConsumer>, JsonDeserializer<KvSnapshotConsumer> { + + public static final KvSnapshotConsumerJsonSerde INSTANCE = new KvSnapshotConsumerJsonSerde(); + + private static final String VERSION_KEY = "version"; + private static final String EXPIRATION_TIME = "expiration_time"; + private static final String TABLES = "tables"; + + private static final int VERSION = 1; + + @Override + public void serialize(KvSnapshotConsumer kvSnapshotConsumer, JsonGenerator generator) + throws IOException { + generator.writeStartObject(); + generator.writeNumberField(VERSION_KEY, VERSION); + generator.writeNumberField(EXPIRATION_TIME, kvSnapshotConsumer.getExpirationTime()); + + generator.writeFieldName(TABLES); + generator.writeStartObject(); + + // for none-partitioned table. + for (Map.Entry<Long, Long[]> entry : + kvSnapshotConsumer.getTableIdToSnapshots().entrySet()) { + generator.writeFieldName(entry.getKey().toString()); + writeLongArray(generator, entry.getValue()); + } + + // for partitioned table. 
+ Map<Long, Long[]> partitionToSortedSnapshotIds = + kvSnapshotConsumer.getPartitionIdToSnapshots(); + for (Map.Entry<Long, Set<Long>> entry : + kvSnapshotConsumer.getTableIdToPartitions().entrySet()) { + Long tableId = entry.getKey(); + + generator.writeFieldName(tableId.toString()); + generator.writeStartObject(); + + for (Long partitionId : entry.getValue()) { + Long[] snapshotIds = partitionToSortedSnapshotIds.get(partitionId); + if (snapshotIds != null) { + generator.writeFieldName(partitionId.toString()); + writeLongArray(generator, snapshotIds); + } + } + + generator.writeEndObject(); + } + generator.writeEndObject(); // tables + + generator.writeEndObject(); // root + } + + @Override + public KvSnapshotConsumer deserialize(JsonNode node) { + long expirationTime = node.get(EXPIRATION_TIME).asLong(); + JsonNode tablesNode = node.get(TABLES); + + Map<Long, Long[]> tableIdToSnapshots = new HashMap<>(); + Map<Long, Set<Long>> tableIdToPartitions = new HashMap<>(); + Map<Long, Long[]> partitionIdToSnapshots = new HashMap<>(); + + ObjectNode tablesObj = (ObjectNode) tablesNode; + Iterator<Map.Entry<String, JsonNode>> tableFields = tablesObj.fields(); + while (tableFields.hasNext()) { + Map.Entry<String, JsonNode> tableEntry = tableFields.next(); + Long tableId = Long.parseLong(tableEntry.getKey()); Review Comment: The variable 'tableId' is only assigned values of primitive type and is never 'null', but it is declared with the boxed type 'Long'. ########## fluss-server/src/main/java/org/apache/fluss/server/coordinator/KvSnapshotConsumerManager.java: ########## @@ -0,0 +1,425 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.metadata.ConsumeKvSnapshotForBucket; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TableInfo; +import org.apache.fluss.metrics.MetricNames; +import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.KvSnapshotConsumer; +import org.apache.fluss.utils.MapUtils; +import org.apache.fluss.utils.clock.Clock; +import org.apache.fluss.utils.concurrent.ExecutorThreadFactory; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock; +import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock; + +/** A manager to manage kv snapshot consumer register/unregister/clear. 
*/ +@ThreadSafe +public class KvSnapshotConsumerManager { + private static final Logger LOG = LoggerFactory.getLogger(KvSnapshotConsumerManager.class); + + private final ZooKeeperClient zkClient; + private final CoordinatorContext coordinatorContext; + private final Clock clock; + private final ScheduledExecutorService scheduledExecutor; + private final Configuration conf; + + private final Map<String, ReadWriteLock> consumerLocks = MapUtils.newConcurrentHashMap(); + /** Consumer id to Consumer. */ + @GuardedBy("consumerLocks") + private final Map<String, KvSnapshotConsumer> consumers; + + private final ReadWriteLock refCountLock = new ReentrantReadWriteLock(); + + /** + * ConsumeKvSnapshotForBucket to the ref count, which means this table bucket + snapshotId has + * been consumed by how many consumers. + */ + @GuardedBy("refCountLock") + private final Map<ConsumeKvSnapshotForBucket, AtomicInteger> refCount = + MapUtils.newConcurrentHashMap(); + + /** For metrics. */ + private final AtomicInteger consumedBucketCount = new AtomicInteger(0); + + public KvSnapshotConsumerManager( + Configuration conf, + ZooKeeperClient zkClient, + CoordinatorContext coordinatorContext, + Clock clock, + CoordinatorMetricGroup coordinatorMetricGroup) { + this( + conf, + zkClient, + coordinatorContext, + Executors.newScheduledThreadPool( + 1, new ExecutorThreadFactory("kv-snapshot-consumer-cleaner")), + clock, + coordinatorMetricGroup); + } + + @VisibleForTesting + public KvSnapshotConsumerManager( + Configuration conf, + ZooKeeperClient zkClient, + CoordinatorContext coordinatorContext, + ScheduledExecutorService scheduledExecutor, + Clock clock, + CoordinatorMetricGroup coordinatorMetricGroup) { + this.zkClient = zkClient; + this.conf = conf; + this.scheduledExecutor = scheduledExecutor; + this.coordinatorContext = coordinatorContext; + this.clock = clock; + this.consumers = MapUtils.newConcurrentHashMap(); + + registerMetrics(coordinatorMetricGroup); + } + + public void start() { + 
scheduledExecutor.scheduleWithFixedDelay( + this::expireConsumers, + 0L, + conf.get(ConfigOptions.KV_SNAPSHOT_CONSUMER_EXPIRATION_CHECK_INTERVAL).toMillis(), + TimeUnit.MILLISECONDS); + } + + public void initialize() throws Exception { + List<String> consumers = zkClient.getKvSnapshotConsumerList(); + for (String consumer : consumers) { + Optional<KvSnapshotConsumer> kvSnapshotConsumerOpt = + zkClient.getKvSnapshotConsumer(consumer); + if (kvSnapshotConsumerOpt.isPresent()) { + KvSnapshotConsumer kvSnapshotConsumer = kvSnapshotConsumerOpt.get(); + this.consumerLocks.put(consumer, new ReentrantReadWriteLock()); + this.consumers.put(consumer, kvSnapshotConsumer); + + initializeRefCount(kvSnapshotConsumer); + + consumedBucketCount.getAndAdd(kvSnapshotConsumer.getConsumedSnapshotCount()); + } + } + } + + public boolean snapshotConsumerNotExist(ConsumeKvSnapshotForBucket consumeKvSnapshotForBucket) { + return inReadLock( + refCountLock, + () -> + !refCount.containsKey(consumeKvSnapshotForBucket) + || refCount.get(consumeKvSnapshotForBucket).get() <= 0); + } + + public void register( + String consumerId, + long expirationTime, + Map<Long, List<ConsumeKvSnapshotForBucket>> tableIdToRegisterBucket) + throws Exception { + ReadWriteLock lock = + consumerLocks.computeIfAbsent(consumerId, k -> new ReentrantReadWriteLock()); + inWriteLock( + lock, + () -> { + boolean update = consumers.containsKey(consumerId); + KvSnapshotConsumer consumer; + if (!update) { + // set the expiration time as: current time + expirationTime + consumer = new KvSnapshotConsumer(clock.milliseconds() + expirationTime); + consumers.put(consumerId, consumer); + LOG.info("kv snapshot consumer '" + consumerId + "' has been registered."); + } else { + consumer = consumers.get(consumerId); + } + + for (Map.Entry<Long, List<ConsumeKvSnapshotForBucket>> entry : + tableIdToRegisterBucket.entrySet()) { + Long tableId = entry.getKey(); + TableInfo tableInfo = coordinatorContext.getTableInfoById(tableId); + int 
numBuckets = tableInfo.getNumBuckets(); + List<ConsumeKvSnapshotForBucket> buckets = entry.getValue(); + for (ConsumeKvSnapshotForBucket bucket : buckets) { + boolean isUpdate = + consumer.registerBucket( + bucket.getTableBucket(), + bucket.getKvSnapshotId(), + numBuckets); + if (!isUpdate) { + consumedBucketCount.getAndIncrement(); + inWriteLock( + refCountLock, + () -> { + refCount.computeIfAbsent( + bucket, k -> new AtomicInteger(0)) + .getAndIncrement(); + }); + } + } + } + + if (update) { + zkClient.updateKvSnapshotConsumer(consumerId, consumer); + } else { + zkClient.registerKvSnapshotConsumer(consumerId, consumer); + } + }); + } + + public void unregister( + String consumerId, Map<Long, List<TableBucket>> tableIdToUnregisterBucket) + throws Exception { + ReadWriteLock lock = consumerLocks.get(consumerId); + if (lock == null) { + return; + } + + inWriteLock( + lock, + () -> { + KvSnapshotConsumer consumer = consumers.get(consumerId); + + if (consumer == null) { + return; + } + + for (Map.Entry<Long, List<TableBucket>> entry : + tableIdToUnregisterBucket.entrySet()) { + List<TableBucket> buckets = entry.getValue(); + for (TableBucket bucket : buckets) { + long snapshotId = consumer.unregisterBucket(bucket); + if (snapshotId != -1L) { + consumedBucketCount.getAndDecrement(); + inWriteLock( + refCountLock, + () -> { + refCount.get( + new ConsumeKvSnapshotForBucket( + bucket, snapshotId)) + .getAndDecrement(); + }); + } + } + } + + if (consumer.isEmpty()) { + clear(consumerId); + } else { + zkClient.updateKvSnapshotConsumer(consumerId, consumer); + } + }); + } + + /** + * Clear kv snapshot consumer. 
+ * + * @param consumerId the consumer id + * @return true if clear success, false if consumer not exist + */ + public boolean clear(String consumerId) throws Exception { + ReadWriteLock lock = consumerLocks.get(consumerId); + if (lock == null) { + return false; + } + + boolean exist = + inWriteLock( + lock, + () -> { + KvSnapshotConsumer kvSnapshotConsumer = consumers.remove(consumerId); + if (kvSnapshotConsumer == null) { + return false; + } + + clearRefCount(kvSnapshotConsumer); + zkClient.deleteKvSnapshotConsumer(consumerId); + + LOG.info("kv snapshot consumer '" + consumerId + "' has been cleared."); + return true; + }); + + consumerLocks.remove(consumerId); Review Comment: This field access (publicly accessible via [this expression](1)) is not protected by any monitor, but the class is annotated as @ThreadSafe. ########## fluss-common/src/main/java/org/apache/fluss/metadata/ConsumeKvSnapshotForBucket.java: ########## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.metadata; + +import java.util.Objects; + +/** An entity for consume kv snapshot for bucket. 
*/ +public class ConsumeKvSnapshotForBucket { + private final TableBucket tableBucket; + private final long kvSnapshotId; + + public ConsumeKvSnapshotForBucket(TableBucket tableBucket, long kvSnapshotId) { + this.tableBucket = tableBucket; + this.kvSnapshotId = kvSnapshotId; + } + + public TableBucket getTableBucket() { + return tableBucket; + } + + public long getKvSnapshotId() { + return kvSnapshotId; + } + + @Override + public String toString() { + return "ConsumeKvSnapshotForBucket{" + + "tableBucket=" + + tableBucket + + ", kvSnapshotId=" + + kvSnapshotId + + '}'; + } + + @Override + public boolean equals(Object o) { Review Comment: This 'equals()' method does not check argument type. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: users@infra.apache.org
