This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 83f8db0a8b3 Subscription: avoid incomplete resource release of
prefetching queue caused by restarting subscription pipe (#13079)
83f8db0a8b3 is described below
commit 83f8db0a8b3763dc02b4bcae76f3a9e10439c9c1
Author: V_Galaxy <[email protected]>
AuthorDate: Fri Aug 2 18:08:29 2024 +0800
Subscription: avoid incomplete resource release of prefetching queue caused
by restarting subscription pipe (#13079)
---
.../agent/SubscriptionBrokerAgent.java | 15 ++-
.../agent/SubscriptionConsumerAgent.java | 4 +-
.../db/subscription/broker/SubscriptionBroker.java | 102 +++++++++++++++------
.../broker/SubscriptionPrefetchingQueue.java | 69 +++++---------
.../broker/SubscriptionPrefetchingTabletQueue.java | 6 +-
.../broker/SubscriptionPrefetchingTsFileQueue.java | 24 ++++-
.../SubscriptionConnectorSubtaskLifeCycle.java | 2 +-
.../iotdb/commons/pipe/event/EnrichedEvent.java | 19 +++-
8 files changed, 152 insertions(+), 89 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
index 538004a3ed8..43b16cf2f4c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
@@ -158,15 +158,24 @@ public class SubscriptionBrokerAgent {
broker.bindPrefetchingQueue(subtask.getTopicName(),
subtask.getInputPendingQueue());
}
- public void unbindPrefetchingQueue(
- final String consumerGroupId, final String topicName, final boolean
doRemove) {
+ public void unbindPrefetchingQueue(final String consumerGroupId, final
String topicName) {
final SubscriptionBroker broker =
consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
if (Objects.isNull(broker)) {
LOGGER.warn(
"Subscription: broker bound to consumer group [{}] does not exist",
consumerGroupId);
return;
}
- broker.unbindPrefetchingQueue(topicName, doRemove);
+ broker.unbindPrefetchingQueue(topicName);
+ }
+
+ public void removePrefetchingQueue(final String consumerGroupId, final
String topicName) {
+ final SubscriptionBroker broker =
consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
+ if (Objects.isNull(broker)) {
+ LOGGER.warn(
+ "Subscription: broker bound to consumer group [{}] does not exist",
consumerGroupId);
+ return;
+ }
+ broker.removePrefetchingQueue(topicName);
}
public void executePrefetch(final String consumerGroupId, final String
topicName) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java
index eee14943df0..075098f8f93 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java
@@ -117,11 +117,11 @@ public class SubscriptionConsumerAgent {
return;
}
- // unbind and remove prefetching queue
+ // remove prefetching queue
final Set<String> topicsUnsubByGroup =
ConsumerGroupMeta.getTopicsUnsubByGroup(metaInAgent,
metaFromCoordinator);
for (final String topicName : topicsUnsubByGroup) {
- SubscriptionAgent.broker().unbindPrefetchingQueue(consumerGroupId,
topicName, true);
+ SubscriptionAgent.broker().removePrefetchingQueue(consumerGroupId,
topicName);
}
// TODO: Currently we fully replace the entire ConsumerGroupMeta without
carefully checking the
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
index 43acf12f180..acb4882a4e4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
@@ -20,13 +20,19 @@
package org.apache.iotdb.db.subscription.broker;
import
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
+import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeEmptyEvent;
import
org.apache.iotdb.db.subscription.metric.SubscriptionPrefetchingQueueMetrics;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.rpc.subscription.config.TopicConstant;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
+import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
+import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
+import org.apache.iotdb.rpc.subscription.payload.poll.TerminationPayload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,6 +44,9 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext.INVALID_COMMIT_ID;
public class SubscriptionBroker {
@@ -46,14 +55,22 @@ public class SubscriptionBroker {
private final String brokerId; // consumer group id
private final Map<String, SubscriptionPrefetchingQueue>
topicNameToPrefetchingQueue;
+ private final Map<String, String> completedTopicNames;
+
+ // The subscription pipe that was restarted should reuse the previous commit
ID.
+ private final Map<String, AtomicLong> topicNameToCommitIdGenerator;
public SubscriptionBroker(final String brokerId) {
this.brokerId = brokerId;
this.topicNameToPrefetchingQueue = new ConcurrentHashMap<>();
+ this.completedTopicNames = new ConcurrentHashMap<>();
+ this.topicNameToCommitIdGenerator = new ConcurrentHashMap<>();
}
public boolean isEmpty() {
- return topicNameToPrefetchingQueue.isEmpty();
+ return topicNameToPrefetchingQueue.isEmpty()
+ && completedTopicNames.isEmpty()
+ && topicNameToCommitIdGenerator.isEmpty();
}
//////////////////////////// provided for SubscriptionBrokerAgent
////////////////////////////
@@ -64,21 +81,32 @@ public class SubscriptionBroker {
final SubscriptionPrefetchingQueue prefetchingQueue =
topicNameToPrefetchingQueue.get(topicName);
if (Objects.isNull(prefetchingQueue)) {
+ // check if completed
+ if (completedTopicNames.containsKey(topicName)) {
+ LOGGER.info(
+ "Subscription: prefetching queue bound to topic [{}] for
consumer group [{}] is completed, return termination response to client",
+ topicName,
+ brokerId);
+ events.add(
+ new SubscriptionEvent(
+ new SubscriptionPipeEmptyEvent(),
+ new SubscriptionPollResponse(
+ SubscriptionPollResponseType.TERMINATION.getType(),
+ new TerminationPayload(),
+ new SubscriptionCommitContext(
+
IoTDBDescriptor.getInstance().getConfig().getDataNodeId(),
+ PipeDataNodeAgent.runtime().getRebootTimes(),
+ topicName,
+ brokerId,
+ INVALID_COMMIT_ID))));
+ continue;
+ }
// There are two reasons for not printing logs here:
// 1. There will be a delay in the creation of the prefetching queue
after subscription.
// 2. There is no corresponding prefetching queue on this DN
(currently the consumer is
// fully connected to all DNs).
continue;
}
- // check if completed before closed
- if (prefetchingQueue.isCompleted()) {
- LOGGER.info(
- "Subscription: prefetching queue bound to topic [{}] for consumer
group [{}] is completed, return termination response to client",
- topicName,
- brokerId);
-
events.add(prefetchingQueue.generateSubscriptionPollTerminationResponse());
- continue;
- }
if (prefetchingQueue.isClosed()) {
LOGGER.warn(
"Subscription: prefetching queue bound to topic [{}] for consumer
group [{}] is closed",
@@ -186,11 +214,17 @@ public class SubscriptionBroker {
if (TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE.equals(topicFormat)) {
prefetchingQueue =
new SubscriptionPrefetchingTsFileQueue(
- brokerId, topicName, new
TsFileDeduplicationBlockingPendingQueue(inputPendingQueue));
+ brokerId,
+ topicName,
+ new TsFileDeduplicationBlockingPendingQueue(inputPendingQueue),
+ topicNameToCommitIdGenerator.computeIfAbsent(topicName, (key) ->
new AtomicLong()));
} else {
prefetchingQueue =
new SubscriptionPrefetchingTabletQueue(
- brokerId, topicName, new
TsFileDeduplicationBlockingPendingQueue(inputPendingQueue));
+ brokerId,
+ topicName,
+ new TsFileDeduplicationBlockingPendingQueue(inputPendingQueue),
+ topicNameToCommitIdGenerator.computeIfAbsent(topicName, (key) ->
new AtomicLong()));
}
SubscriptionPrefetchingQueueMetrics.getInstance().register(prefetchingQueue);
topicNameToPrefetchingQueue.put(topicName, prefetchingQueue);
@@ -200,7 +234,7 @@ public class SubscriptionBroker {
brokerId);
}
- public void unbindPrefetchingQueue(final String topicName, final boolean
doRemove) {
+ public void unbindPrefetchingQueue(final String topicName) {
final SubscriptionPrefetchingQueue prefetchingQueue =
topicNameToPrefetchingQueue.get(topicName);
if (Objects.isNull(prefetchingQueue)) {
@@ -214,28 +248,40 @@ public class SubscriptionBroker {
// mark prefetching queue closed first
prefetchingQueue.markClosed();
- // mark prefetching queue completed only for topic of snapshot mode
- if (SubscriptionAgent.topic()
- .getTopicMode(topicName)
- .equals(TopicConstant.MODE_SNAPSHOT_VALUE)) {
- prefetchingQueue.markCompleted();
+ // mark topic name completed only for topic of snapshot mode
+ if
(SubscriptionAgent.topic().getTopicMode(topicName).equals(TopicConstant.MODE_SNAPSHOT_VALUE)
+ && prefetchingQueue.isCompleted()) {
+ completedTopicNames.put(topicName, topicName);
}
- if (doRemove) {
- // clean up events in prefetching queue
- prefetchingQueue.cleanup();
+ // clean up events in prefetching queue
+ prefetchingQueue.cleanup();
- // deregister metrics
- SubscriptionPrefetchingQueueMetrics.getInstance()
- .deregister(prefetchingQueue.getPrefetchingQueueId());
+ // deregister metrics
+ SubscriptionPrefetchingQueueMetrics.getInstance()
+ .deregister(prefetchingQueue.getPrefetchingQueueId());
- // remove prefetching queue
- topicNameToPrefetchingQueue.remove(topicName);
- LOGGER.info(
- "Subscription: drop prefetching queue bound to topic [{}] for
consumer group [{}]",
+ // remove prefetching queue
+ topicNameToPrefetchingQueue.remove(topicName);
+ LOGGER.info(
+ "Subscription: drop prefetching queue bound to topic [{}] for consumer
group [{}]",
+ topicName,
+ brokerId);
+ }
+
+ public void removePrefetchingQueue(final String topicName) {
+ final SubscriptionPrefetchingQueue prefetchingQueue =
+ topicNameToPrefetchingQueue.get(topicName);
+ if (Objects.nonNull(prefetchingQueue)) {
+ LOGGER.warn(
+ "Subscription: prefetching queue bound to topic [{}] for consumer
group [{}] still exists",
topicName,
brokerId);
+ return;
}
+
+ completedTopicNames.remove(topicName);
+ topicNameToCommitIdGenerator.remove(topicName);
}
public void executePrefetch(final String topicName) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index 749bcf13530..d4e441eb0fe 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -27,14 +27,9 @@ import
org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
-import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeEmptyEvent;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import org.apache.iotdb.rpc.subscription.payload.poll.ErrorPayload;
import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
-import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
-import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
-import org.apache.iotdb.rpc.subscription.payload.poll.TerminationPayload;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,19 +42,18 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import static
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext.INVALID_COMMIT_ID;
-
public abstract class SubscriptionPrefetchingQueue {
private static final Logger LOGGER =
LoggerFactory.getLogger(SubscriptionPrefetchingQueue.class);
protected final String brokerId; // consumer group id
protected final String topicName;
+
protected final SubscriptionBlockingPendingQueue inputPendingQueue;
protected final LinkedBlockingQueue<SubscriptionEvent> prefetchingQueue;
-
protected final Map<SubscriptionCommitContext, SubscriptionEvent>
uncommittedEvents;
- private final AtomicLong subscriptionCommitIdGenerator = new AtomicLong(0);
+
+ private final AtomicLong commitIdGenerator;
private volatile boolean isCompleted = false;
private volatile boolean isClosed = false;
@@ -67,13 +61,15 @@ public abstract class SubscriptionPrefetchingQueue {
public SubscriptionPrefetchingQueue(
final String brokerId,
final String topicName,
- final SubscriptionBlockingPendingQueue inputPendingQueue) {
+ final SubscriptionBlockingPendingQueue inputPendingQueue,
+ final AtomicLong commitIdGenerator) {
this.brokerId = brokerId;
this.topicName = topicName;
this.inputPendingQueue = inputPendingQueue;
this.prefetchingQueue = new LinkedBlockingQueue<>();
this.uncommittedEvents = new ConcurrentHashMap<>();
+ this.commitIdGenerator = commitIdGenerator;
}
public void cleanup() {
@@ -170,13 +166,20 @@ public abstract class SubscriptionPrefetchingQueue {
}
if (event instanceof PipeTerminateEvent) {
- LOGGER.info(
- "Subscription: SubscriptionPrefetchingQueue {} commit
PipeTerminateEvent {}",
- this,
- event);
+ final PipeTerminateEvent terminateEvent = (PipeTerminateEvent) event;
+ // add mark completed hook
+ terminateEvent.addOnCommittedHook(
+ () -> {
+ markCompleted();
+ return null;
+ });
// commit directly
((PipeTerminateEvent) event)
.decreaseReferenceCount(SubscriptionPrefetchingQueue.class.getName(), true);
+ LOGGER.info(
+ "Subscription: SubscriptionPrefetchingQueue {} commit
PipeTerminateEvent {}",
+ this,
+ terminateEvent);
continue;
}
@@ -318,16 +321,7 @@ public abstract class SubscriptionPrefetchingQueue {
PipeDataNodeAgent.runtime().getRebootTimes(),
topicName,
brokerId,
- subscriptionCommitIdGenerator.getAndIncrement());
- }
-
- private SubscriptionCommitContext generateInvalidSubscriptionCommitContext()
{
- return new SubscriptionCommitContext(
- IoTDBDescriptor.getInstance().getConfig().getDataNodeId(),
- PipeDataNodeAgent.runtime().getRebootTimes(),
- topicName,
- brokerId,
- INVALID_COMMIT_ID);
+ commitIdGenerator.getAndIncrement());
}
//////////////////////////// APIs provided for metric framework
////////////////////////////
@@ -346,10 +340,10 @@ public abstract class SubscriptionPrefetchingQueue {
}
public long getCurrentCommitId() {
- return subscriptionCommitIdGenerator.get();
+ return commitIdGenerator.get();
}
- /////////////////////////////// termination ///////////////////////////////
+ /////////////////////////////// close & termination
///////////////////////////////
public boolean isClosed() {
return isClosed;
@@ -367,25 +361,6 @@ public abstract class SubscriptionPrefetchingQueue {
isCompleted = true;
}
- public SubscriptionEvent generateSubscriptionPollTerminationResponse() {
- return new SubscriptionEvent(
- new SubscriptionPipeEmptyEvent(),
- new SubscriptionPollResponse(
- SubscriptionPollResponseType.TERMINATION.getType(),
- new TerminationPayload(),
- generateInvalidSubscriptionCommitContext()));
- }
-
- public SubscriptionEvent generateSubscriptionPollErrorResponse(
- final String errorMessage, final boolean critical) {
- return new SubscriptionEvent(
- new SubscriptionPipeEmptyEvent(),
- new SubscriptionPollResponse(
- SubscriptionPollResponseType.ERROR.getType(),
- new ErrorPayload(errorMessage, critical),
- generateInvalidSubscriptionCommitContext()));
- }
-
/////////////////////////////// stringify ///////////////////////////////
protected Map<String, String> coreReportMessage() {
@@ -393,7 +368,7 @@ public abstract class SubscriptionPrefetchingQueue {
result.put("brokerId", brokerId);
result.put("topicName", topicName);
result.put("size of uncommittedEvents",
String.valueOf(uncommittedEvents.size()));
- result.put("subscriptionCommitIdGenerator",
subscriptionCommitIdGenerator.toString());
+ result.put("commitIdGenerator", commitIdGenerator.toString());
result.put("isCompleted", String.valueOf(isCompleted));
result.put("isClosed", String.valueOf(isClosed));
return result;
@@ -406,7 +381,7 @@ public abstract class SubscriptionPrefetchingQueue {
result.put("size of inputPendingQueue",
String.valueOf(inputPendingQueue.size()));
result.put("size of prefetchingQueue",
String.valueOf(prefetchingQueue.size()));
result.put("uncommittedEvents", uncommittedEvents.toString());
- result.put("subscriptionCommitIdGenerator",
subscriptionCommitIdGenerator.toString());
+ result.put("commitIdGenerator", commitIdGenerator.toString());
result.put("isCompleted", String.valueOf(isCompleted));
result.put("isClosed", String.valueOf(isClosed));
return result;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
index 1c327d2671d..47c57cb1897 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletQueue.java
@@ -35,6 +35,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
public class SubscriptionPrefetchingTabletQueue extends
SubscriptionPrefetchingQueue {
@@ -53,8 +54,9 @@ public class SubscriptionPrefetchingTabletQueue extends
SubscriptionPrefetchingQ
public SubscriptionPrefetchingTabletQueue(
final String brokerId,
final String topicName,
- final SubscriptionBlockingPendingQueue inputPendingQueue) {
- super(brokerId, topicName, inputPendingQueue);
+ final SubscriptionBlockingPendingQueue inputPendingQueue,
+ final AtomicLong commitIdGenerator) {
+ super(brokerId, topicName, inputPendingQueue, commitIdGenerator);
this.currentBatchRef.set(
new SubscriptionPipeTabletEventBatch(this, BATCH_MAX_DELAY_IN_MS,
BATCH_MAX_SIZE_IN_BYTES));
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
index f0604eca157..d8ccc8fed69 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
@@ -20,11 +20,15 @@
package org.apache.iotdb.db.subscription.broker;
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
import
org.apache.iotdb.db.subscription.event.batch.SubscriptionPipeTsFileEventBatch;
+import org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeEmptyEvent;
import
org.apache.iotdb.db.subscription.event.pipe.SubscriptionPipeTsFilePlainEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.rpc.subscription.payload.poll.ErrorPayload;
import org.apache.iotdb.rpc.subscription.payload.poll.FileInitPayload;
import org.apache.iotdb.rpc.subscription.payload.poll.FilePiecePayload;
import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
@@ -46,8 +50,11 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import static
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext.INVALID_COMMIT_ID;
+
public class SubscriptionPrefetchingTsFileQueue extends
SubscriptionPrefetchingQueue {
private static final Logger LOGGER =
@@ -66,8 +73,9 @@ public class SubscriptionPrefetchingTsFileQueue extends
SubscriptionPrefetchingQ
public SubscriptionPrefetchingTsFileQueue(
final String brokerId,
final String topicName,
- final SubscriptionBlockingPendingQueue inputPendingQueue) {
- super(brokerId, topicName, inputPendingQueue);
+ final SubscriptionBlockingPendingQueue inputPendingQueue,
+ final AtomicLong commitIdGenerator) {
+ super(brokerId, topicName, inputPendingQueue, commitIdGenerator);
this.consumerIdToSubscriptionEventMap = new ConcurrentHashMap<>();
this.currentBatchRef.set(
@@ -429,7 +437,17 @@ public class SubscriptionPrefetchingTsFileQueue extends
SubscriptionPrefetchingQ
private SubscriptionEvent generateSubscriptionPollErrorResponse(final String
errorMessage) {
// consider non-critical by default, meaning the client can retry
- return super.generateSubscriptionPollErrorResponse(errorMessage, false);
+ return new SubscriptionEvent(
+ new SubscriptionPipeEmptyEvent(),
+ new SubscriptionPollResponse(
+ SubscriptionPollResponseType.ERROR.getType(),
+ new ErrorPayload(errorMessage, false),
+ new SubscriptionCommitContext(
+ IoTDBDescriptor.getInstance().getConfig().getDataNodeId(),
+ PipeDataNodeAgent.runtime().getRebootTimes(),
+ topicName,
+ brokerId,
+ INVALID_COMMIT_ID)));
}
/////////////////////////////// stringify ///////////////////////////////
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
index 41e690cbce9..c05bd13f07c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
@@ -103,6 +103,6 @@ public class SubscriptionConnectorSubtaskLifeCycle extends
PipeConnectorSubtaskL
// when dropping the subscription.
final String consumerGroupId = ((SubscriptionConnectorSubtask)
subtask).getConsumerGroupId();
final String topicName = ((SubscriptionConnectorSubtask)
subtask).getTopicName();
- SubscriptionAgent.broker().unbindPrefetchingQueue(consumerGroupId,
topicName, false);
+ SubscriptionAgent.broker().unbindPrefetchingQueue(consumerGroupId,
topicName);
}
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
index 6d44d26c332..cbdbef1e4e9 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
@@ -29,9 +29,12 @@ import org.apache.iotdb.pipe.api.event.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
/**
* {@link EnrichedEvent} is an {@link Event} that can be enriched with
additional runtime
@@ -65,6 +68,7 @@ public abstract class EnrichedEvent implements Event {
protected boolean isTimeParsed;
protected boolean shouldReportOnCommit = true;
+ protected List<Supplier<Void>> onCommittedHooks = new ArrayList<>();
protected EnrichedEvent(
final String pipeName,
@@ -83,6 +87,13 @@ public abstract class EnrichedEvent implements Event {
this.endTime = endTime;
isPatternParsed = this.pipePattern == null || this.pipePattern.isRoot();
isTimeParsed = Long.MIN_VALUE == startTime && Long.MAX_VALUE == endTime;
+ addOnCommittedHook(
+ () -> {
+ if (shouldReportOnCommit) {
+ reportProgress();
+ }
+ return null;
+ });
}
/**
@@ -346,9 +357,11 @@ public abstract class EnrichedEvent implements Event {
}
public void onCommitted() {
- if (shouldReportOnCommit) {
- reportProgress();
- }
+ onCommittedHooks.forEach(Supplier::get);
+ }
+
+ public void addOnCommittedHook(final Supplier<Void> hook) {
+ onCommittedHooks.add(hook);
}
public boolean isReleased() {