This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e7ab13faf2d Fix pipe drop event discard with restart-aware committer
keys (#17748)
e7ab13faf2d is described below
commit e7ab13faf2dc5377e29723ec2338b3f5144aa3bc
Author: Caideyipi <[email protected]>
AuthorDate: Wed May 27 15:52:25 2026 +0800
Fix pipe drop event discard with restart-aware committer keys (#17748)
* Fix
* fix
* fix
* fix
* fix
---
.../agent/task/connection/PipeEventCollector.java | 5 ++-
.../sink/PipeRealtimePriorityBlockingQueue.java | 10 +++--
.../agent/task/subtask/sink/PipeSinkSubtask.java | 25 ++++++-----
.../subtask/sink/PipeSinkSubtaskLifeCycle.java | 9 ++--
.../task/subtask/sink/PipeSinkSubtaskManager.java | 6 ++-
.../evolvable/batch/PipeTabletEventBatch.java | 17 ++++++--
.../batch/PipeTransferBatchReqBuilder.java | 11 +++--
.../protocol/airgap/IoTDBDataRegionAirGapSink.java | 8 +++-
.../thrift/async/IoTDBDataRegionAsyncSink.java | 41 ++++++++---------
.../thrift/sync/IoTDBDataRegionSyncSink.java | 8 +++-
.../websocket/WebSocketConnectorServer.java | 51 ++++++++++------------
.../sink/protocol/websocket/WebSocketSink.java | 8 ++++
.../subtask/SubscriptionSinkSubtaskLifeCycle.java | 4 +-
.../subtask/SubscriptionSinkSubtaskManager.java | 7 ++-
.../task/subtask/sink/PipeSinkSubtaskTest.java | 6 ++-
.../task/connection/BlockingPendingQueue.java | 39 ++++++++++++-----
.../task/progress/PipeEventCommitManager.java | 5 +++
.../protocol/PipeConnectorWithEventDiscard.java | 7 +++
18 files changed, 171 insertions(+), 96 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
index 7e6b0aa7781..2324a3d2707 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
@@ -247,7 +247,10 @@ public class PipeEventCollector implements EventCollector {
enrichedEvent.setRebootTimes(PipeDataNodeAgent.runtime().getRebootTimes());
if (enrichedEvent.getPipeName() != null
- && pendingQueue.isPipeDropped(enrichedEvent.getPipeName(),
creationTime, regionId)) {
+ && (pendingQueue.isEventFromDroppedPipe(enrichedEvent)
+ || (enrichedEvent.getCommitterKey() == null
+ && pendingQueue.isPipeDropped(
+ enrichedEvent.getPipeName(), creationTime, regionId)))) {
enrichedEvent.clearReferenceCount(PipeEventCollector.class.getName());
return;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
index 4b65746b3ab..8641dbc7867 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java
@@ -356,12 +356,16 @@ public class PipeRealtimePriorityBlockingQueue extends
UnboundedBlockingPendingQ
@Override
public void discardEventsOfPipe(
final String pipeNameToDrop, final long creationTimeToDrop, final int
regionId) {
- super.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
+ discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop,
regionId, -1));
+ }
+
+ @Override
+ public void discardEventsOfPipe(final CommitterKey committerKey) {
+ super.discardEventsOfPipe(committerKey);
tsfileInsertEventDeque.removeIf(
event -> {
if (event instanceof EnrichedEvent
- && isEventFromPipe(
- ((EnrichedEvent) event), pipeNameToDrop, creationTimeToDrop,
regionId)) {
+ && isEventFromPipe((EnrichedEvent) event, committerKey)) {
if (((EnrichedEvent) event)
.clearReferenceCount(PipeRealtimePriorityBlockingQueue.class.getName())) {
eventCounter.decreaseEventCount(event);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
index 560512521e7..90d325f6d23 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.agent.task.subtask.sink;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import
org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
@@ -201,10 +202,9 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
* When a pipe is dropped, the connector maybe reused and will not be
closed. So we just discard
* its queued events in the output pipe connector.
*/
- public void discardEventsOfPipe(
- final String pipeNameToDrop, final long creationTimeToDrop, final int
regionId) {
+ public void discardEventsOfPipe(final CommitterKey committerKey) {
// Try to remove the events as much as possible
- inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop,
regionId);
+ inputPendingQueue.discardEventsOfPipe(committerKey);
try {
increaseHighPriorityTaskCount();
@@ -217,9 +217,7 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
// use a new thread to stop all the pipes, we will not encounter
deadlock here. Or else we
// will.
if (lastEvent instanceof EnrichedEvent
- && pipeNameToDrop.equals(((EnrichedEvent) lastEvent).getPipeName())
- && creationTimeToDrop == ((EnrichedEvent)
lastEvent).getCreationTime()
- && regionId == ((EnrichedEvent) lastEvent).getRegionId()) {
+ && isEventFromPipe((EnrichedEvent) lastEvent, committerKey)) {
// Do not clear the last event's reference counts because it may be
on transferring
lastEvent = null;
// Submit self to avoid that the lastEvent has been retried "max
times" times and has
@@ -241,9 +239,7 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
// clear the lastExceptionEvent. It's safe to potentially clear it
twice because we have the
// "nonnull" detection.
if (lastExceptionEvent instanceof EnrichedEvent
- && pipeNameToDrop.equals(((EnrichedEvent)
lastExceptionEvent).getPipeName())
- && creationTimeToDrop == ((EnrichedEvent)
lastExceptionEvent).getCreationTime()
- && regionId == ((EnrichedEvent) lastExceptionEvent).getRegionId())
{
+ && isEventFromPipe((EnrichedEvent) lastExceptionEvent,
committerKey)) {
clearReferenceCountAndReleaseLastExceptionEvent();
}
}
@@ -252,11 +248,18 @@ public class PipeSinkSubtask extends
PipeAbstractSinkSubtask {
}
if (outputPipeSink instanceof PipeConnectorWithEventDiscard) {
- ((PipeConnectorWithEventDiscard) outputPipeSink)
- .discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId);
+ ((PipeConnectorWithEventDiscard)
outputPipeSink).discardEventsOfPipe(committerKey);
}
}
+ private static boolean isEventFromPipe(
+ final EnrichedEvent event, final CommitterKey committerKey) {
+ return committerKey.getPipeName().equals(event.getPipeName())
+ && committerKey.getCreationTime() == event.getCreationTime()
+ && committerKey.getRegionId() == event.getRegionId()
+ && (committerKey.getRestartTimes() < 0 ||
committerKey.equals(event.getCommitterKey()));
+ }
+
//////////////////////////// APIs provided for metric framework
////////////////////////////
public String getAttributeSortedString() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
index 85634277627..42b1ae91366 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.agent.task.subtask.sink;
import
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
import org.apache.iotdb.db.pipe.agent.task.execution.PipeSinkSubtaskExecutor;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
@@ -87,19 +88,17 @@ public class PipeSinkSubtaskLifeCycle implements
AutoCloseable {
* Otherwise, the {@link PipeSinkSubtaskLifeCycle#runningTaskCount} might be
inconsistent with the
* {@link PipeSinkSubtaskLifeCycle#registeredTaskCount} because of parallel
connector scheduling.
*
- * @param pipeNameToDeregister pipe name
- * @param regionId region id
+ * @param committerKey committer key of the pipe task to deregister
* @return {@code true} if the {@link PipeSinkSubtask} is out of life cycle,
indicating that the
* {@link PipeSinkSubtask} should never be used again
* @throws IllegalStateException if {@link
PipeSinkSubtaskLifeCycle#registeredTaskCount} <= 0
*/
- public synchronized boolean deregister(
- final String pipeNameToDeregister, final long creationTimeToDeregister,
final int regionId) {
+ public synchronized boolean deregister(final CommitterKey committerKey) {
if (registeredTaskCount <= 0) {
throw new
IllegalStateException(DataNodePipeMessages.REGISTEREDTASKCOUNT_0_1);
}
- subtask.discardEventsOfPipe(pipeNameToDeregister,
creationTimeToDeregister, regionId);
+ subtask.discardEventsOfPipe(committerKey);
try {
if (registeredTaskCount > 1) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
index 367b9210406..3ad99ca5c06 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
import
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
@@ -211,7 +212,10 @@ public class PipeSinkSubtaskManager {
// Shall not be empty
final PipeSinkSubtaskExecutor executor = lifeCycles.get(0).executor;
- lifeCycles.removeIf(o -> o.deregister(pipeName, creationTime, regionId));
+ final CommitterKey committerKey =
+ PipeEventCommitManager.getInstance().getCommitterKey(pipeName,
creationTime, regionId);
+
+ lifeCycles.removeIf(o -> o.deregister(committerKey));
if (lifeCycles.isEmpty()) {
attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
index 8bf69e6e6b0..aede0e994d9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.sink.payload.evolvable.batch;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
@@ -157,11 +158,13 @@ public abstract class PipeTabletEventBatch implements
AutoCloseable {
*/
public synchronized void discardEventsOfPipe(
final String pipeNameToDrop, final long creationTimeToDrop, final int
regionId) {
+ discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop,
regionId, -1));
+ }
+
+ public synchronized void discardEventsOfPipe(final CommitterKey
committerKey) {
events.removeIf(
event -> {
- if (pipeNameToDrop.equals(event.getPipeName())
- && creationTimeToDrop == event.getCreationTime()
- && regionId == event.getRegionId()) {
+ if (isEventFromPipe(event, committerKey)) {
event.clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
return true;
}
@@ -169,6 +172,14 @@ public abstract class PipeTabletEventBatch implements
AutoCloseable {
});
}
+ private static boolean isEventFromPipe(
+ final EnrichedEvent event, final CommitterKey committerKey) {
+ return committerKey.getPipeName().equals(event.getPipeName())
+ && committerKey.getCreationTime() == event.getCreationTime()
+ && committerKey.getRegionId() == event.getRegionId()
+ && (committerKey.getRestartTimes() < 0 ||
committerKey.equals(event.getCommitterKey()));
+ }
+
public synchronized void decreaseEventsReferenceCount(
final String holderMessage, final boolean shouldReport) {
events.forEach(event -> event.decreaseReferenceCount(holderMessage,
shouldReport));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
index 3bec537614c..b3a8884a146 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.sink.payload.evolvable.batch;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
@@ -201,10 +202,12 @@ public class PipeTransferBatchReqBuilder implements
AutoCloseable {
public synchronized void discardEventsOfPipe(
final String pipeNameToDrop, final long creationTimeToDrop, final int
regionId) {
- defaultBatch.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop,
regionId);
- endPointToBatch
- .values()
- .forEach(batch -> batch.discardEventsOfPipe(pipeNameToDrop,
creationTimeToDrop, regionId));
+ discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop,
regionId, -1));
+ }
+
+ public synchronized void discardEventsOfPipe(final CommitterKey
committerKey) {
+ defaultBatch.discardEventsOfPipe(committerKey);
+ endPointToBatch.values().forEach(batch ->
batch.discardEventsOfPipe(committerKey));
}
public int size() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
index 649ef35c4ce..ea83524988b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.sink.protocol.airgap;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.sink.limiter.TsFileSendRateLimiter;
import org.apache.iotdb.commons.utils.RetryUtils;
@@ -613,8 +614,13 @@ public class IoTDBDataRegionAirGapSink extends
IoTDBDataNodeAirGapSink {
@Override
public synchronized void discardEventsOfPipe(
final String pipeNameToDrop, final long creationTimeToDrop, final int
regionId) {
+ discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop,
regionId, -1));
+ }
+
+ @Override
+ public synchronized void discardEventsOfPipe(final CommitterKey
committerKey) {
if (Objects.nonNull(tabletBatchBuilder)) {
- tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop,
creationTimeToDrop, regionId);
+ tabletBatchBuilder.discardEventsOfPipe(committerKey);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index 9adbcf6cf16..32a2c191048 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -23,8 +23,8 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.audit.UserEntity;
import org.apache.iotdb.commons.client.ThriftClient;
import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import org.apache.iotdb.commons.pipe.datastructure.Triple;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink;
@@ -130,9 +130,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
private final Map<PipeTransferTrackableHandler,
PipeTransferTrackableHandler> pendingHandlers =
new ConcurrentHashMap<>();
- // Pipe name, creation time, region id
- private final Set<Triple<String, Long, Integer>> droppedPipeTaskKeys =
- ConcurrentHashMap.newKeySet();
+ private final Set<CommitterKey> droppedPipeTaskKeys =
ConcurrentHashMap.newKeySet();
private boolean enableSendTsFileLimit;
private volatile boolean isConnectionException;
@@ -749,16 +747,20 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
@Override
public synchronized void discardEventsOfPipe(
final String pipeNameToDrop, final long creationTimeToDrop, final int
regionId) {
- droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop,
regionId));
+ discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop,
regionId, -1));
+ }
+
+ @Override
+ public synchronized void discardEventsOfPipe(final CommitterKey
committerKey) {
+ droppedPipeTaskKeys.add(committerKey);
if (isTabletBatchModeEnabled && Objects.nonNull(tabletBatchBuilder)) {
- tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop,
creationTimeToDrop, regionId);
+ tabletBatchBuilder.discardEventsOfPipe(committerKey);
}
retryEventQueue.removeIf(
event -> {
if (event instanceof EnrichedEvent
- && isDroppedPipe(
- (EnrichedEvent) event, pipeNameToDrop, creationTimeToDrop,
regionId)) {
+ && isDroppedPipe((EnrichedEvent) event, committerKey)) {
((EnrichedEvent)
event).clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
retryEventQueueEventCounter.decreaseEventCount(event);
return true;
@@ -769,8 +771,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
retryTsFileQueue.removeIf(
event -> {
if (event instanceof EnrichedEvent
- && isDroppedPipe(
- (EnrichedEvent) event, pipeNameToDrop, creationTimeToDrop,
regionId)) {
+ && isDroppedPipe((EnrichedEvent) event, committerKey)) {
((EnrichedEvent)
event).clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName());
retryEventQueueEventCounter.decreaseEventCount(event);
return true;
@@ -872,18 +873,14 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink {
}
private boolean isDroppedPipe(final EnrichedEvent event) {
- return droppedPipeTaskKeys.contains(
- new Triple<>(event.getPipeName(), event.getCreationTime(),
event.getRegionId()));
- }
-
- private static boolean isDroppedPipe(
- final EnrichedEvent event,
- final String pipeNameToDrop,
- final long creationTimeToDrop,
- final int regionId) {
- return pipeNameToDrop.equals(event.getPipeName())
- && creationTimeToDrop == event.getCreationTime()
- && regionId == event.getRegionId();
+ return droppedPipeTaskKeys.stream().anyMatch(key -> isDroppedPipe(event,
key));
+ }
+
+ private static boolean isDroppedPipe(final EnrichedEvent event, final
CommitterKey committerKey) {
+ return committerKey.getPipeName().equals(event.getPipeName())
+ && committerKey.getCreationTime() == event.getCreationTime()
+ && committerKey.getRegionId() == event.getRegionId()
+ && (committerKey.getRestartTimes() < 0 ||
committerKey.equals(event.getCommitterKey()));
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
index 5e6297d8438..d9e25f5e09f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.sink.protocol.thrift.sync;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient;
import org.apache.iotdb.commons.pipe.sink.limiter.TsFileSendRateLimiter;
@@ -604,8 +605,13 @@ public class IoTDBDataRegionSyncSink extends
IoTDBDataNodeSyncSink {
@Override
public synchronized void discardEventsOfPipe(
final String pipeNameToDrop, final long creationTimeToDrop, final int
regionId) {
+ discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop,
regionId, -1));
+ }
+
+ @Override
+ public synchronized void discardEventsOfPipe(final CommitterKey
committerKey) {
if (Objects.nonNull(tabletBatchBuilder)) {
- tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop,
creationTimeToDrop, regionId);
+ tabletBatchBuilder.discardEventsOfPipe(committerKey);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
index bbb4cb9a3a8..baddf4727d6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.pipe.sink.protocol.websocket;
import org.apache.iotdb.commons.external.collections4.BidiMap;
import org.apache.iotdb.commons.external.collections4.bidimap.DualTreeBidiMap;
-import org.apache.iotdb.commons.pipe.datastructure.Triple;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
@@ -60,9 +60,7 @@ public class WebSocketConnectorServer extends WebSocketServer
{
private final ConcurrentHashMap<String, ConcurrentHashMap<Long,
EventWaitingForAck>>
eventsWaitingForAck = new ConcurrentHashMap<>();
- // Pipe name, creation time, region id
- private final Set<Triple<String, Long, Integer>> droppedPipeTaskKeys =
- ConcurrentHashMap.newKeySet();
+ private final Set<CommitterKey> droppedPipeTaskKeys =
ConcurrentHashMap.newKeySet();
private final BidiMap<String, WebSocket> router =
new DualTreeBidiMap<String, WebSocket>(null,
Comparator.comparing(Object::hashCode)) {};
@@ -118,33 +116,33 @@ public class WebSocketConnectorServer extends
WebSocketServer {
.forEach((eventId, eventWrapper) ->
discardEvent(eventWrapper.event));
}
- droppedPipeTaskKeys.removeIf(key -> key.getFirst().equals(pipeName));
+ droppedPipeTaskKeys.removeIf(key -> key.getPipeName().equals(pipeName));
}
public synchronized void discardEventsOfPipe(
final String pipeNameToDrop, final long creationTimeToDrop, final int
regionId) {
- droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop,
regionId));
+ discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop,
regionId, -1));
+ }
+
+ public synchronized void discardEventsOfPipe(final CommitterKey
committerKey) {
+ droppedPipeTaskKeys.add(committerKey);
final PriorityBlockingQueue<EventWaitingForTransfer> eventTransferQueue =
- eventsWaitingForTransfer.get(pipeNameToDrop);
+ eventsWaitingForTransfer.get(committerKey.getPipeName());
if (eventTransferQueue != null) {
eventTransferQueue.removeIf(
- eventWrapper ->
- discardIfMatches(eventWrapper.event, pipeNameToDrop,
creationTimeToDrop, regionId));
+ eventWrapper -> discardIfMatches(eventWrapper.event, committerKey));
synchronized (eventTransferQueue) {
eventTransferQueue.notifyAll();
}
}
final ConcurrentHashMap<Long, EventWaitingForAck> eventId2EventMap =
- eventsWaitingForAck.get(pipeNameToDrop);
+ eventsWaitingForAck.get(committerKey.getPipeName());
if (eventId2EventMap != null) {
eventId2EventMap
.entrySet()
- .removeIf(
- entry ->
- discardIfMatches(
- entry.getValue().event, pipeNameToDrop,
creationTimeToDrop, regionId));
+ .removeIf(entry -> discardIfMatches(entry.getValue().event,
committerKey));
}
}
@@ -506,19 +504,13 @@ public class WebSocketConnectorServer extends
WebSocketServer {
}
}
- private boolean discardIfMatches(
- final Event event,
- final String pipeNameToDrop,
- final long creationTimeToDrop,
- final int regionId) {
+ private boolean discardIfMatches(final Event event, final CommitterKey
committerKey) {
if (!(event instanceof EnrichedEvent)) {
return false;
}
final EnrichedEvent enrichedEvent = (EnrichedEvent) event;
- if (!pipeNameToDrop.equals(enrichedEvent.getPipeName())
- || creationTimeToDrop != enrichedEvent.getCreationTime()
- || regionId != enrichedEvent.getRegionId()) {
+ if (!isEventFromPipe(enrichedEvent, committerKey)) {
return false;
}
@@ -528,11 +520,16 @@ public class WebSocketConnectorServer extends
WebSocketServer {
private boolean isDroppedPipe(final Event event) {
return event instanceof EnrichedEvent
- && droppedPipeTaskKeys.contains(
- new Triple<>(
- ((EnrichedEvent) event).getPipeName(),
- ((EnrichedEvent) event).getCreationTime(),
- ((EnrichedEvent) event).getRegionId()));
+ && droppedPipeTaskKeys.stream()
+ .anyMatch(key -> isEventFromPipe((EnrichedEvent) event, key));
+ }
+
+ private static boolean isEventFromPipe(
+ final EnrichedEvent event, final CommitterKey committerKey) {
+ return committerKey.getPipeName().equals(event.getPipeName())
+ && committerKey.getCreationTime() == event.getCreationTime()
+ && committerKey.getRegionId() == event.getRegionId()
+ && (committerKey.getRestartTimes() < 0 ||
committerKey.equals(event.getCommitterKey()));
}
private boolean isQueueAvailable(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java
index 3c487ff1356..06eab035b4e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.sink.protocol.websocket;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import
org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard;
@@ -177,6 +178,13 @@ public class WebSocketSink implements PipeConnector,
PipeConnectorWithEventDisca
}
}
+ @Override
+ public void discardEventsOfPipe(final CommitterKey committerKey) {
+ if (server != null) {
+ server.discardEventsOfPipe(committerKey);
+ }
+ }
+
public void commit(EnrichedEvent enrichedEvent) {
Optional.ofNullable(enrichedEvent)
.ifPresent(event ->
event.decreaseReferenceCount(WebSocketSink.class.getName(), true));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java
index 41026a18048..ca04c1a6d5f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.subscription.task.subtask;
import
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import org.apache.iotdb.db.i18n.DataNodeMiscMessages;
import org.apache.iotdb.db.pipe.agent.task.execution.PipeSinkSubtaskExecutor;
import org.apache.iotdb.db.pipe.agent.task.subtask.sink.PipeSinkSubtask;
@@ -64,8 +65,7 @@ public class SubscriptionSinkSubtaskLifeCycle extends
PipeSinkSubtaskLifeCycle {
}
@Override
- public synchronized boolean deregister(
- final String pipeNameToDeregister, final long creationTimeToDeregister,
final int regionId) {
+ public synchronized boolean deregister(final CommitterKey committerKey) {
if (registeredTaskCount <= 0) {
throw new
IllegalStateException(DataNodeMiscMessages.REGISTERED_TASK_COUNT_LE_ZERO);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
index 3a1ef0d74ad..a41b8a09558 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.subscription.task.subtask;
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
import
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
@@ -170,7 +171,11 @@ public class SubscriptionSinkSubtaskManager {
final PipeSinkSubtaskLifeCycle lifeCycle =
attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString);
- if (lifeCycle.deregister(pipeName, creationTime, regionId)) {
+
+ final CommitterKey committerKey =
+ PipeEventCommitManager.getInstance().getCommitterKey(pipeName,
creationTime, regionId);
+
+ if (lifeCycle.deregister(committerKey)) {
attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java
index ddfc699721b..2a15fb9ea18 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.agent.task.subtask.sink;
import
org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import
org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard;
import org.apache.iotdb.pipe.api.PipeConnector;
@@ -51,9 +52,10 @@ public class PipeSinkSubtaskTest {
connector));
try {
- subtask.discardEventsOfPipe("pipe", 1L, 1);
+ final CommitterKey committerKey = new CommitterKey("pipe", 1L, 1, -1);
+ subtask.discardEventsOfPipe(committerKey);
- verify((PipeConnectorWithEventDiscard)
connector).discardEventsOfPipe("pipe", 1L, 1);
+ verify((PipeConnectorWithEventDiscard)
connector).discardEventsOfPipe(committerKey);
} finally {
subtask.close();
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
index c56e8143ef5..c430e3f6b06 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java
@@ -20,8 +20,8 @@
package org.apache.iotdb.commons.pipe.agent.task.connection;
import org.apache.iotdb.commons.i18n.PipeMessages;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import org.apache.iotdb.commons.pipe.datastructure.Triple;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.metric.PipeEventCounter;
import org.apache.iotdb.pipe.api.event.Event;
@@ -48,9 +48,7 @@ public abstract class BlockingPendingQueue<E extends Event> {
protected final AtomicBoolean isClosed = new AtomicBoolean(false);
- // Pipe name, creation time, region id
- protected final Set<Triple<String, Long, Integer>> droppedPipeTaskKeys =
- ConcurrentHashMap.newKeySet();
+ protected final Set<CommitterKey> droppedPipeTaskKeys =
ConcurrentHashMap.newKeySet();
protected BlockingPendingQueue(
final BlockingQueue<E> pendingQueue, final PipeEventCounter
eventCounter) {
@@ -139,12 +137,15 @@ public abstract class BlockingPendingQueue<E extends
Event> {
public void discardEventsOfPipe(
final String pipeNameToDrop, final long creationTimeToDrop, final int
regionId) {
- droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop,
regionId));
+ discardEventsOfPipe(new CommitterKey(pipeNameToDrop, creationTimeToDrop,
regionId, -1));
+ }
+
+ public void discardEventsOfPipe(final CommitterKey committerKey) {
+ droppedPipeTaskKeys.add(committerKey);
pendingQueue.removeIf(
event -> {
if (event instanceof EnrichedEvent
- && isEventFromPipe(
- ((EnrichedEvent) event), pipeNameToDrop, creationTimeToDrop,
regionId)) {
+ && isEventFromPipe((EnrichedEvent) event, committerKey)) {
if (((EnrichedEvent)
event).clearReferenceCount(BlockingPendingQueue.class.getName())) {
eventCounter.decreaseEventCount(event);
}
@@ -192,16 +193,30 @@ public abstract class BlockingPendingQueue<E extends
Event> {
&& regionId == event.getRegionId();
}
+ protected static boolean isEventFromPipe(
+ final EnrichedEvent event, final CommitterKey committerKey) {
+ return committerKey.getPipeName().equals(event.getPipeName())
+ && committerKey.getCreationTime() == event.getCreationTime()
+ && committerKey.getRegionId() == event.getRegionId()
+ && (committerKey.getRestartTimes() < 0 ||
committerKey.equals(event.getCommitterKey()));
+ }
+
protected boolean isEventFromDroppedPipe(final E event) {
return event instanceof EnrichedEvent
&& ((EnrichedEvent) event).getPipeName() != null
- && isPipeDropped(
- ((EnrichedEvent) event).getPipeName(),
- ((EnrichedEvent) event).getCreationTime(),
- ((EnrichedEvent) event).getRegionId());
+ && isEventFromDroppedPipe((EnrichedEvent) event);
+ }
+
+ public boolean isEventFromDroppedPipe(final EnrichedEvent event) {
+ return droppedPipeTaskKeys.stream().anyMatch(key -> isEventFromPipe(event,
key));
}
public boolean isPipeDropped(final String pipeName, final long creationTime,
final int regionId) {
- return droppedPipeTaskKeys.contains(new Triple<>(pipeName, creationTime,
regionId));
+ return droppedPipeTaskKeys.stream()
+ .anyMatch(
+ key ->
+ key.getPipeName().equals(pipeName)
+ && key.getCreationTime() == creationTime
+ && key.getRegionId() == regionId);
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java
index f2c3a73e18c..26e7ea305d5 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java
@@ -168,6 +168,11 @@ public class PipeEventCommitManager {
return true;
}
+ public CommitterKey getCommitterKey(
+ final String pipeName, final long creationTime, final int regionId) {
+ return generateCommitterKey(pipeName, creationTime, regionId);
+ }
+
private CommitterKey generateCommitterKey(
final String pipeName, final long creationTime, final int regionId) {
return taskAgent.getCommitterKey(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java
index ab4dbcf9075..4ffc0c25ed2 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java
@@ -19,7 +19,14 @@
package org.apache.iotdb.commons.pipe.sink.protocol;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
+
public interface PipeConnectorWithEventDiscard {
void discardEventsOfPipe(String pipeName, long creationTime, int regionId);
+
+ default void discardEventsOfPipe(final CommitterKey committerKey) {
+ discardEventsOfPipe(
+ committerKey.getPipeName(), committerKey.getCreationTime(),
committerKey.getRegionId());
+ }
}