This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 99769d1a6a0 Subscription: fix unexpected cancellation of workers
during consumer startup & optimize server-side subscription logs & add
synchronized modifier (#13032)
99769d1a6a0 is described below
commit 99769d1a6a0a9e4aec5b5f4f0cd3d486b0d628c7
Author: V_Galaxy <[email protected]>
AuthorDate: Mon Jul 29 12:19:34 2024 +0800
Subscription: fix unexpected cancellation of workers during consumer
startup & optimize server-side subscription logs & add synchronized modifier
(#13032)
---
.../consumer/SubscriptionConsumer.java | 9 ++++---
.../consumer/SubscriptionPullConsumer.java | 6 +++--
.../consumer/SubscriptionPushConsumer.java | 8 +++++-
.../agent/SubscriptionBrokerAgent.java | 2 ++
.../db/subscription/broker/SubscriptionBroker.java | 31 +++++++++++++---------
.../broker/SubscriptionPrefetchingQueue.java | 4 +--
.../broker/SubscriptionPrefetchingTabletQueue.java | 3 ++-
.../broker/SubscriptionPrefetchingTsFileQueue.java | 3 ++-
8 files changed, 42 insertions(+), 24 deletions(-)
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
index 2c09fabb0c2..c1a89ded983 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
@@ -234,13 +234,14 @@ abstract class SubscriptionConsumer implements
AutoCloseable {
providers.releaseWriteLock();
}
+ // set isClosed to false before submitting workers
+ isClosed.set(false);
+
// submit heartbeat worker
submitHeartbeatWorker();
// submit endpoints syncer
submitEndpointsSyncer();
-
- isClosed.set(false);
}
@Override
@@ -1068,7 +1069,7 @@ abstract class SubscriptionConsumer implements
AutoCloseable {
/////////////////////////////// stringify ///////////////////////////////
protected Map<String, String> coreReportMessage() {
- Map<String, String> result = new HashMap<>(5);
+ final Map<String, String> result = new HashMap<>(5);
result.put("consumerId", consumerId);
result.put("consumerGroupId", consumerGroupId);
result.put("isClosed", isClosed.toString());
@@ -1078,7 +1079,7 @@ abstract class SubscriptionConsumer implements
AutoCloseable {
}
protected Map<String, String> allReportMessage() {
- Map<String, String> result = new HashMap<>(10);
+ final Map<String, String> result = new HashMap<>(10);
result.put("consumerId", consumerId);
result.put("consumerGroupId", consumerGroupId);
result.put("heartbeatIntervalMs", String.valueOf(heartbeatIntervalMs));
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
index 31148dc09fc..37734187404 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java
@@ -111,12 +111,14 @@ public class SubscriptionPullConsumer extends
SubscriptionConsumer {
super.open();
+ // set isClosed to false before submitting workers
+ isClosed.set(false);
+
+ // submit auto poll worker if enabling auto commit
if (autoCommit) {
uncommittedMessages = new ConcurrentSkipListMap<>();
submitAutoCommitWorker();
}
-
- isClosed.set(false);
}
@Override
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
index 5143958b89a..69cf13f9bef 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java
@@ -51,6 +51,7 @@ public class SubscriptionPushConsumer extends
SubscriptionConsumer {
private final AckStrategy ackStrategy;
private final ConsumeListener consumeListener;
+ // avoid interval less than or equal to zero
private final long autoPollIntervalMs;
private final long autoPollTimeoutMs;
@@ -115,8 +116,12 @@ public class SubscriptionPushConsumer extends
SubscriptionConsumer {
}
super.open();
- submitAutoPollWorker();
+
+ // set isClosed to false before submitting workers
isClosed.set(false);
+
+ // submit auto poll worker
+ submitAutoPollWorker();
}
@Override
@@ -289,6 +294,7 @@ public class SubscriptionPushConsumer extends
SubscriptionConsumer {
}
public Builder autoPollIntervalMs(final long autoPollIntervalMs) {
+ // avoid interval less than or equal to zero
this.autoPollIntervalMs = Math.max(autoPollIntervalMs, 1);
return this;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
index 87c7edfaf8a..538004a3ed8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
@@ -116,6 +116,7 @@ public class SubscriptionBrokerAgent {
public void createBroker(final String consumerGroupId) {
final SubscriptionBroker broker = new SubscriptionBroker(consumerGroupId);
consumerGroupIdToSubscriptionBroker.put(consumerGroupId, broker);
+ LOGGER.info("Subscription: create broker bound to consumer group [{}]",
consumerGroupId);
}
/**
@@ -140,6 +141,7 @@ public class SubscriptionBrokerAgent {
return false;
}
consumerGroupIdToSubscriptionBroker.remove(consumerGroupId);
+ LOGGER.info("Subscription: drop broker bound to consumer group [{}]",
consumerGroupId);
return true;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
index b9ae1ad7197..43acf12f180 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
@@ -64,10 +64,10 @@ public class SubscriptionBroker {
final SubscriptionPrefetchingQueue prefetchingQueue =
topicNameToPrefetchingQueue.get(topicName);
if (Objects.isNull(prefetchingQueue)) {
- LOGGER.warn(
- "Subscription: prefetching queue bound to topic [{}] for consumer
group [{}] does not exist",
- topicName,
- brokerId);
+ // There are two reasons for not printing logs here:
+ // 1. There will be a delay in the creation of the prefetching queue
after subscription.
+ // 2. There is no corresponding prefetching queue on this DN
(currently the consumer is
+ // fully connected to all DNs).
continue;
}
// check if completed before closed
@@ -174,9 +174,7 @@ public class SubscriptionBroker {
public void bindPrefetchingQueue(
final String topicName, final UnboundedBlockingPendingQueue<Event>
inputPendingQueue) {
- final SubscriptionPrefetchingQueue prefetchingQueue =
- topicNameToPrefetchingQueue.get(topicName);
- if (Objects.nonNull(prefetchingQueue)) {
+ if (Objects.nonNull(topicNameToPrefetchingQueue.get(topicName))) {
LOGGER.warn(
"Subscription: prefetching queue bound to topic [{}] for consumer
group [{}] has already existed",
topicName,
@@ -184,19 +182,22 @@ public class SubscriptionBroker {
return;
}
final String topicFormat =
SubscriptionAgent.topic().getTopicFormat(topicName);
+ final SubscriptionPrefetchingQueue prefetchingQueue;
if (TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE.equals(topicFormat)) {
- final SubscriptionPrefetchingQueue queue =
+ prefetchingQueue =
new SubscriptionPrefetchingTsFileQueue(
brokerId, topicName, new
TsFileDeduplicationBlockingPendingQueue(inputPendingQueue));
- SubscriptionPrefetchingQueueMetrics.getInstance().register(queue);
- topicNameToPrefetchingQueue.put(topicName, queue);
} else {
- final SubscriptionPrefetchingQueue queue =
+ prefetchingQueue =
new SubscriptionPrefetchingTabletQueue(
brokerId, topicName, new
TsFileDeduplicationBlockingPendingQueue(inputPendingQueue));
- SubscriptionPrefetchingQueueMetrics.getInstance().register(queue);
- topicNameToPrefetchingQueue.put(topicName, queue);
}
+
SubscriptionPrefetchingQueueMetrics.getInstance().register(prefetchingQueue);
+ topicNameToPrefetchingQueue.put(topicName, prefetchingQueue);
+ LOGGER.info(
+ "Subscription: create prefetching queue bound to topic [{}] for
consumer group [{}]",
+ topicName,
+ brokerId);
}
public void unbindPrefetchingQueue(final String topicName, final boolean
doRemove) {
@@ -230,6 +231,10 @@ public class SubscriptionBroker {
// remove prefetching queue
topicNameToPrefetchingQueue.remove(topicName);
+ LOGGER.info(
+ "Subscription: drop prefetching queue bound to topic [{}] for
consumer group [{}]",
+ topicName,
+ brokerId);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index 83549212ed6..749bcf13530 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -389,7 +389,7 @@ public abstract class SubscriptionPrefetchingQueue {
/////////////////////////////// stringify ///////////////////////////////
protected Map<String, String> coreReportMessage() {
- Map<String, String> result = new HashMap<>(6);
+ final Map<String, String> result = new HashMap<>(6);
result.put("brokerId", brokerId);
result.put("topicName", topicName);
result.put("size of uncommittedEvents",
String.valueOf(uncommittedEvents.size()));
@@ -400,7 +400,7 @@ public abstract class SubscriptionPrefetchingQueue {
}
protected Map<String, String> allReportMessage() {
- Map<String, String> result = new HashMap<>(8);
+ final Map<String, String> result = new HashMap<>(8);
result.put("brokerId", brokerId);
result.put("topicName", topicName);
result.put("size of inputPendingQueue",
String.valueOf(inputPendingQueue.size()));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
index ddd38f50a59..1c327d2671d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
@@ -97,7 +97,8 @@ public class SubscriptionPrefetchingTabletQueue extends
SubscriptionPrefetchingQ
return onEventInternal(null);
}
- private boolean onEventInternal(@Nullable final EnrichedEvent event) {
+ // missing synchronization can cause IoTDBSubscriptionSharingIT to lose data
+ private synchronized boolean onEventInternal(@Nullable final EnrichedEvent
event) {
final AtomicBoolean result = new AtomicBoolean(false);
currentBatchRef.getAndUpdate(
(batch) -> {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
index 2b70ea7d56a..f0604eca157 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
@@ -327,7 +327,8 @@ public class SubscriptionPrefetchingTsFileQueue extends
SubscriptionPrefetchingQ
return onEventInternal(null);
}
- private boolean onEventInternal(@Nullable final TabletInsertionEvent event) {
+ // missing synchronization can cause IoTDBSubscriptionSharingIT to lose data
+ private synchronized boolean onEventInternal(@Nullable final
TabletInsertionEvent event) {
final AtomicBoolean result = new AtomicBoolean(false);
currentBatchRef.getAndUpdate(
(batch) -> {