This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e80b9927c9e Subscription: fix ConcurrentModificationException for 
`ConsumerGroupMeta` & fix the logic of `isTopicSubscribedByConsumerGroup` & 
prevent the consumer from outputting too much content in string form (#14425)
e80b9927c9e is described below

commit e80b9927c9e0ca94e2a6b08ddedcf8d09f624c58
Author: VGalaxies <[email protected]>
AuthorDate: Sat Dec 14 15:52:54 2024 +0800

    Subscription: fix ConcurrentModificationException for `ConsumerGroupMeta` & 
fix the logic of `isTopicSubscribedByConsumerGroup` & prevent the consumer from 
outputting too much content in string form (#14425)
---
 .../consumer/SubscriptionConsumer.java             |  8 +++--
 .../session/subscription/util/CollectionUtils.java | 31 ++++++++++++++++++++
 .../meta/consumer/ConsumerGroupMeta.java           | 34 +++++++++++++---------
 .../meta/consumer/ConsumerGroupMetaKeeper.java     |  7 ++---
 4 files changed, 60 insertions(+), 20 deletions(-)

diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
index 70738a5aece..f02ae3b2448 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
@@ -41,6 +41,7 @@ import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseTy
 import org.apache.iotdb.rpc.subscription.payload.poll.TabletsPayload;
 import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
 import org.apache.iotdb.session.subscription.payload.SubscriptionMessageType;
+import org.apache.iotdb.session.subscription.util.CollectionUtils;
 import org.apache.iotdb.session.subscription.util.IdentifierUtils;
 import org.apache.iotdb.session.subscription.util.PollTimer;
 import org.apache.iotdb.session.subscription.util.RandomStringGenerator;
@@ -1463,8 +1464,11 @@ abstract class SubscriptionConsumer implements 
AutoCloseable {
     result.put("consumerGroupId", consumerGroupId);
     result.put("isClosed", isClosed.toString());
     result.put("fileSaveDir", fileSaveDir);
-    result.put("inFlightFilesCommitContextSet", 
inFlightFilesCommitContextSet.toString());
-    result.put("subscribedTopicNames", subscribedTopics.keySet().toString());
+    result.put(
+        "inFlightFilesCommitContextSet",
+        CollectionUtils.getLimitedString(inFlightFilesCommitContextSet, 32));
+    result.put(
+        "subscribedTopicNames", 
CollectionUtils.getLimitedString(subscribedTopics.keySet(), 32));
     return result;
   }
 
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/util/CollectionUtils.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/util/CollectionUtils.java
new file mode 100644
index 00000000000..5f2cfb5ba60
--- /dev/null
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/util/CollectionUtils.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session.subscription.util;
+
+import java.util.Collection;
+import java.util.stream.Collectors;
+
+public class CollectionUtils {
+
+  public static String getLimitedString(final Collection<?> collection, final 
int limit) {
+    return collection.stream().limit(limit).collect(Collectors.toList())
+        + (collection.size() > limit ? " ... (" + (collection.size() - limit) 
+ " more)" : "");
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
index e902368cc2d..f7d4901884c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
@@ -30,29 +30,30 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class ConsumerGroupMeta {
 
   private String consumerGroupId;
   private long creationTime;
-  private Map<String, Set<String>> topicNameToSubscribedConsumerIdSet = new 
HashMap<>();
-  private Map<String, ConsumerMeta> consumerIdToConsumerMeta = new HashMap<>();
+  private Map<String, Set<String>> topicNameToSubscribedConsumerIdSet;
+  private Map<String, ConsumerMeta> consumerIdToConsumerMeta;
 
   public ConsumerGroupMeta() {
-    // Empty constructor
+    this.topicNameToSubscribedConsumerIdSet = new ConcurrentHashMap<>();
+    this.consumerIdToConsumerMeta = new ConcurrentHashMap<>();
   }
 
   public ConsumerGroupMeta(
       final String consumerGroupId, final long creationTime, final 
ConsumerMeta firstConsumerMeta) {
+    this();
+
     this.consumerGroupId = consumerGroupId;
     this.creationTime = creationTime;
-    this.topicNameToSubscribedConsumerIdSet = new HashMap<>();
-    this.consumerIdToConsumerMeta = new HashMap<>();
 
     consumerIdToConsumerMeta.put(firstConsumerMeta.getConsumerId(), 
firstConsumerMeta);
   }
@@ -61,8 +62,9 @@ public class ConsumerGroupMeta {
     final ConsumerGroupMeta copied = new ConsumerGroupMeta();
     copied.consumerGroupId = consumerGroupId;
     copied.creationTime = creationTime;
-    copied.topicNameToSubscribedConsumerIdSet = new 
HashMap<>(topicNameToSubscribedConsumerIdSet);
-    copied.consumerIdToConsumerMeta = new HashMap<>(consumerIdToConsumerMeta);
+    copied.topicNameToSubscribedConsumerIdSet =
+        new ConcurrentHashMap<>(topicNameToSubscribedConsumerIdSet);
+    copied.consumerIdToConsumerMeta = new 
ConcurrentHashMap<>(consumerIdToConsumerMeta);
     return copied;
   }
 
@@ -149,8 +151,12 @@ public class ConsumerGroupMeta {
     return topics;
   }
 
-  public Set<String> getTopicsSubscribedByConsumerGroup() {
-    return topicNameToSubscribedConsumerIdSet.keySet();
+  public boolean isTopicSubscribedByConsumerGroup(final String topic) {
+    final Set<String> subscribedConsumerIdSet = 
topicNameToSubscribedConsumerIdSet.get(topic);
+    if (Objects.isNull(subscribedConsumerIdSet)) {
+      return false;
+    }
+    return !subscribedConsumerIdSet.isEmpty();
   }
 
   public void addSubscription(final String consumerId, final Set<String> 
topics) {
@@ -228,7 +234,7 @@ public class ConsumerGroupMeta {
     consumerGroupMeta.consumerGroupId = 
ReadWriteIOUtils.readString(inputStream);
     consumerGroupMeta.creationTime = ReadWriteIOUtils.readLong(inputStream);
 
-    consumerGroupMeta.topicNameToSubscribedConsumerIdSet = new HashMap<>();
+    consumerGroupMeta.topicNameToSubscribedConsumerIdSet = new 
ConcurrentHashMap<>();
     int size = ReadWriteIOUtils.readInt(inputStream);
     for (int i = 0; i < size; ++i) {
       final String key = ReadWriteIOUtils.readString(inputStream);
@@ -242,7 +248,7 @@ public class ConsumerGroupMeta {
       consumerGroupMeta.topicNameToSubscribedConsumerIdSet.put(key, value);
     }
 
-    consumerGroupMeta.consumerIdToConsumerMeta = new HashMap<>();
+    consumerGroupMeta.consumerIdToConsumerMeta = new ConcurrentHashMap<>();
     size = ReadWriteIOUtils.readInt(inputStream);
     for (int i = 0; i < size; ++i) {
       final String key = ReadWriteIOUtils.readString(inputStream);
@@ -259,7 +265,7 @@ public class ConsumerGroupMeta {
     consumerGroupMeta.consumerGroupId = 
ReadWriteIOUtils.readString(byteBuffer);
     consumerGroupMeta.creationTime = ReadWriteIOUtils.readLong(byteBuffer);
 
-    consumerGroupMeta.topicNameToSubscribedConsumerIdSet = new HashMap<>();
+    consumerGroupMeta.topicNameToSubscribedConsumerIdSet = new 
ConcurrentHashMap<>();
     int size = ReadWriteIOUtils.readInt(byteBuffer);
     for (int i = 0; i < size; ++i) {
       final String key = ReadWriteIOUtils.readString(byteBuffer);
@@ -273,7 +279,7 @@ public class ConsumerGroupMeta {
       consumerGroupMeta.topicNameToSubscribedConsumerIdSet.put(key, value);
     }
 
-    consumerGroupMeta.consumerIdToConsumerMeta = new HashMap<>();
+    consumerGroupMeta.consumerIdToConsumerMeta = new ConcurrentHashMap<>();
     size = ReadWriteIOUtils.readInt(byteBuffer);
     for (int i = 0; i < size; ++i) {
       final String key = ReadWriteIOUtils.readString(byteBuffer);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMetaKeeper.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMetaKeeper.java
index 84166834570..c2775b204a9 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMetaKeeper.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMetaKeeper.java
@@ -119,7 +119,7 @@ public class ConsumerGroupMetaKeeper {
 
   public Set<String> getSubscribedConsumerGroupIds(final String topicName) {
     return consumerGroupIdToConsumerGroupMetaMap.entrySet().stream()
-        .filter(entry -> 
entry.getValue().getTopicsSubscribedByConsumerGroup().contains(topicName))
+        .filter(entry -> 
entry.getValue().isTopicSubscribedByConsumerGroup(topicName))
         .map(Entry::getKey)
         .collect(Collectors.toSet());
   }
@@ -129,13 +129,12 @@ public class ConsumerGroupMetaKeeper {
     return consumerGroupIdToConsumerGroupMetaMap.containsKey(consumerGroupId)
         && consumerGroupIdToConsumerGroupMetaMap
             .get(consumerGroupId)
-            .getTopicsSubscribedByConsumerGroup()
-            .contains(topicName);
+            .isTopicSubscribedByConsumerGroup(topicName);
   }
 
   public boolean isTopicSubscribedByConsumerGroup(final String topicName) {
     return consumerGroupIdToConsumerGroupMetaMap.values().stream()
-        .anyMatch(meta -> 
meta.getTopicsSubscribedByConsumerGroup().contains(topicName));
+        .anyMatch(meta -> meta.isTopicSubscribedByConsumerGroup(topicName));
   }
 
   /////////////////////////////////  Snapshot  
/////////////////////////////////

Reply via email to