This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e80b9927c9e Subscription: fix ConcurrentModificationException for
`ConsumerGroupMeta` & fix the logic of `isTopicSubscribedByConsumerGroup` &
avoid consumer from outputting too much content in string form (#14425)
e80b9927c9e is described below
commit e80b9927c9e0ca94e2a6b08ddedcf8d09f624c58
Author: VGalaxies <[email protected]>
AuthorDate: Sat Dec 14 15:52:54 2024 +0800
Subscription: fix ConcurrentModificationException for `ConsumerGroupMeta` &
fix the logic of `isTopicSubscribedByConsumerGroup` & avoid consumer from
outputting too much content in string form (#14425)
---
.../consumer/SubscriptionConsumer.java | 8 +++--
.../session/subscription/util/CollectionUtils.java | 31 ++++++++++++++++++++
.../meta/consumer/ConsumerGroupMeta.java | 34 +++++++++++++---------
.../meta/consumer/ConsumerGroupMetaKeeper.java | 7 ++---
4 files changed, 60 insertions(+), 20 deletions(-)
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
index 70738a5aece..f02ae3b2448 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
@@ -41,6 +41,7 @@ import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseTy
import org.apache.iotdb.rpc.subscription.payload.poll.TabletsPayload;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
import org.apache.iotdb.session.subscription.payload.SubscriptionMessageType;
+import org.apache.iotdb.session.subscription.util.CollectionUtils;
import org.apache.iotdb.session.subscription.util.IdentifierUtils;
import org.apache.iotdb.session.subscription.util.PollTimer;
import org.apache.iotdb.session.subscription.util.RandomStringGenerator;
@@ -1463,8 +1464,11 @@ abstract class SubscriptionConsumer implements
AutoCloseable {
result.put("consumerGroupId", consumerGroupId);
result.put("isClosed", isClosed.toString());
result.put("fileSaveDir", fileSaveDir);
- result.put("inFlightFilesCommitContextSet", inFlightFilesCommitContextSet.toString());
- result.put("subscribedTopicNames", subscribedTopics.keySet().toString());
+ result.put(
+ "inFlightFilesCommitContextSet",
+ CollectionUtils.getLimitedString(inFlightFilesCommitContextSet, 32));
+ result.put(
+ "subscribedTopicNames", CollectionUtils.getLimitedString(subscribedTopics.keySet(), 32));
return result;
}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/util/CollectionUtils.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/util/CollectionUtils.java
new file mode 100644
index 00000000000..5f2cfb5ba60
--- /dev/null
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/util/CollectionUtils.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session.subscription.util;
+
+import java.util.Collection;
+import java.util.stream.Collectors;
+
+public class CollectionUtils {
+
+ public static String getLimitedString(final Collection<?> collection, final int limit) {
+ return collection.stream().limit(limit).collect(Collectors.toList())
+ + (collection.size() > limit ? " ... (" + (collection.size() - limit) + " more)" : "");
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
index e902368cc2d..f7d4901884c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
@@ -30,29 +30,30 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
public class ConsumerGroupMeta {
private String consumerGroupId;
private long creationTime;
- private Map<String, Set<String>> topicNameToSubscribedConsumerIdSet = new HashMap<>();
- private Map<String, ConsumerMeta> consumerIdToConsumerMeta = new HashMap<>();
+ private Map<String, Set<String>> topicNameToSubscribedConsumerIdSet;
+ private Map<String, ConsumerMeta> consumerIdToConsumerMeta;
public ConsumerGroupMeta() {
- // Empty constructor
+ this.topicNameToSubscribedConsumerIdSet = new ConcurrentHashMap<>();
+ this.consumerIdToConsumerMeta = new ConcurrentHashMap<>();
}
public ConsumerGroupMeta(
final String consumerGroupId, final long creationTime, final
ConsumerMeta firstConsumerMeta) {
+ this();
+
this.consumerGroupId = consumerGroupId;
this.creationTime = creationTime;
- this.topicNameToSubscribedConsumerIdSet = new HashMap<>();
- this.consumerIdToConsumerMeta = new HashMap<>();
consumerIdToConsumerMeta.put(firstConsumerMeta.getConsumerId(),
firstConsumerMeta);
}
@@ -61,8 +62,9 @@ public class ConsumerGroupMeta {
final ConsumerGroupMeta copied = new ConsumerGroupMeta();
copied.consumerGroupId = consumerGroupId;
copied.creationTime = creationTime;
- copied.topicNameToSubscribedConsumerIdSet = new HashMap<>(topicNameToSubscribedConsumerIdSet);
- copied.consumerIdToConsumerMeta = new HashMap<>(consumerIdToConsumerMeta);
+ copied.topicNameToSubscribedConsumerIdSet =
+ new ConcurrentHashMap<>(topicNameToSubscribedConsumerIdSet);
+ copied.consumerIdToConsumerMeta = new ConcurrentHashMap<>(consumerIdToConsumerMeta);
return copied;
}
@@ -149,8 +151,12 @@ public class ConsumerGroupMeta {
return topics;
}
- public Set<String> getTopicsSubscribedByConsumerGroup() {
- return topicNameToSubscribedConsumerIdSet.keySet();
+ public boolean isTopicSubscribedByConsumerGroup(final String topic) {
+ final Set<String> subscribedConsumerIdSet = topicNameToSubscribedConsumerIdSet.get(topic);
+ if (Objects.isNull(subscribedConsumerIdSet)) {
+ return false;
+ }
+ return !subscribedConsumerIdSet.isEmpty();
}
public void addSubscription(final String consumerId, final Set<String>
topics) {
@@ -228,7 +234,7 @@ public class ConsumerGroupMeta {
consumerGroupMeta.consumerGroupId =
ReadWriteIOUtils.readString(inputStream);
consumerGroupMeta.creationTime = ReadWriteIOUtils.readLong(inputStream);
- consumerGroupMeta.topicNameToSubscribedConsumerIdSet = new HashMap<>();
+ consumerGroupMeta.topicNameToSubscribedConsumerIdSet = new ConcurrentHashMap<>();
int size = ReadWriteIOUtils.readInt(inputStream);
for (int i = 0; i < size; ++i) {
final String key = ReadWriteIOUtils.readString(inputStream);
@@ -242,7 +248,7 @@ public class ConsumerGroupMeta {
consumerGroupMeta.topicNameToSubscribedConsumerIdSet.put(key, value);
}
- consumerGroupMeta.consumerIdToConsumerMeta = new HashMap<>();
+ consumerGroupMeta.consumerIdToConsumerMeta = new ConcurrentHashMap<>();
size = ReadWriteIOUtils.readInt(inputStream);
for (int i = 0; i < size; ++i) {
final String key = ReadWriteIOUtils.readString(inputStream);
@@ -259,7 +265,7 @@ public class ConsumerGroupMeta {
consumerGroupMeta.consumerGroupId =
ReadWriteIOUtils.readString(byteBuffer);
consumerGroupMeta.creationTime = ReadWriteIOUtils.readLong(byteBuffer);
- consumerGroupMeta.topicNameToSubscribedConsumerIdSet = new HashMap<>();
+ consumerGroupMeta.topicNameToSubscribedConsumerIdSet = new ConcurrentHashMap<>();
int size = ReadWriteIOUtils.readInt(byteBuffer);
for (int i = 0; i < size; ++i) {
final String key = ReadWriteIOUtils.readString(byteBuffer);
@@ -273,7 +279,7 @@ public class ConsumerGroupMeta {
consumerGroupMeta.topicNameToSubscribedConsumerIdSet.put(key, value);
}
- consumerGroupMeta.consumerIdToConsumerMeta = new HashMap<>();
+ consumerGroupMeta.consumerIdToConsumerMeta = new ConcurrentHashMap<>();
size = ReadWriteIOUtils.readInt(byteBuffer);
for (int i = 0; i < size; ++i) {
final String key = ReadWriteIOUtils.readString(byteBuffer);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMetaKeeper.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMetaKeeper.java
index 84166834570..c2775b204a9 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMetaKeeper.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMetaKeeper.java
@@ -119,7 +119,7 @@ public class ConsumerGroupMetaKeeper {
public Set<String> getSubscribedConsumerGroupIds(final String topicName) {
return consumerGroupIdToConsumerGroupMetaMap.entrySet().stream()
- .filter(entry -> entry.getValue().getTopicsSubscribedByConsumerGroup().contains(topicName))
+ .filter(entry -> entry.getValue().isTopicSubscribedByConsumerGroup(topicName))
.map(Entry::getKey)
.collect(Collectors.toSet());
}
@@ -129,13 +129,12 @@ public class ConsumerGroupMetaKeeper {
return consumerGroupIdToConsumerGroupMetaMap.containsKey(consumerGroupId)
&& consumerGroupIdToConsumerGroupMetaMap
.get(consumerGroupId)
- .getTopicsSubscribedByConsumerGroup()
- .contains(topicName);
+ .isTopicSubscribedByConsumerGroup(topicName);
}
public boolean isTopicSubscribedByConsumerGroup(final String topicName) {
return consumerGroupIdToConsumerGroupMetaMap.values().stream()
- .anyMatch(meta -> meta.getTopicsSubscribedByConsumerGroup().contains(topicName));
+ .anyMatch(meta -> meta.isTopicSubscribedByConsumerGroup(topicName));
}
///////////////////////////////// Snapshot
/////////////////////////////////