This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch drop-fix in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4b8a274a526f590ae688b98c9c8e5d5356269fca Author: Caideyipi <[email protected]> AuthorDate: Wed May 6 17:16:23 2026 +0800 Pipe: Fixed the event clear logic of drop pipe (#17560) * drop-1 * wd * drop * fix * local * triple * by * spt * bug-fix * no-pipe-task-key * Update IoTDBDataRegionAsyncSink.java * triple * Fix --- .../agent/task/connection/PipeEventCollector.java | 20 ++- .../sink/PipeRealtimePriorityBlockingQueue.java | 13 +- .../agent/task/subtask/sink/PipeSinkSubtask.java | 14 +- .../subtask/sink/PipeSinkSubtaskLifeCycle.java | 5 +- .../task/subtask/sink/PipeSinkSubtaskManager.java | 2 +- .../evolvable/batch/PipeTabletEventBatch.java | 7 +- .../batch/PipeTransferBatchReqBuilder.java | 9 +- .../thrift/async/IoTDBDataRegionAsyncSink.java | 50 +++++-- .../thrift/sync/IoTDBDataRegionSyncSink.java | 5 +- .../websocket/WebSocketConnectorServer.java | 143 +++++++++++++++++---- .../sink/protocol/websocket/WebSocketSink.java | 11 +- .../subtask/SubscriptionSinkSubtaskLifeCycle.java | 3 +- .../subtask/SubscriptionSinkSubtaskManager.java | 2 +- .../task/connection/PipeEventCollectorTest.java | 97 ++++++++++++++ .../task/subtask/sink/PipeSinkSubtaskTest.java | 61 +++++++++ .../apache/iotdb/db/pipe/sink/PipeSinkTest.java | 105 +++++++++++++++ .../task/connection/BlockingPendingQueue.java | 55 +++++++- .../iotdb/commons/pipe/datastructure/Triple.java | 63 +++++++++ .../commons/pipe/sink/protocol/IoTDBSink.java | 5 +- .../protocol/PipeConnectorWithEventDiscard.java | 25 ++++ 20 files changed, 623 insertions(+), 72 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java index 4728f62c94f..387d4ff7ec6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java @@ -178,7 +178,8 @@ public class PipeEventCollector implements EventCollector { private void collectEvent(final Event event) { if (event instanceof EnrichedEvent) { - if (!((EnrichedEvent) event).increaseReferenceCount(PipeEventCollector.class.getName())) { + final EnrichedEvent enrichedEvent = (EnrichedEvent) event; + if (!enrichedEvent.increaseReferenceCount(PipeEventCollector.class.getName())) { LOGGER.warn("PipeEventCollector: The event {} is already released, skipping it.", event); isFailedToIncreaseReferenceCount = true; return; @@ -186,18 +187,25 @@ public class PipeEventCollector implements EventCollector { // Assign a commit id for this event in order to report progress in order. PipeEventCommitManager.getInstance() - .enrichWithCommitterKeyAndCommitId((EnrichedEvent) event, creationTime, regionId); + .enrichWithCommitterKeyAndCommitId(enrichedEvent, creationTime, regionId); - // Assign a rebootTime for pipeConsensus - ((EnrichedEvent) event).setRebootTimes(PipeDataNodeAgent.runtime().getRebootTimes()); + // Assign a rebootTime for iotConsensusV2 + enrichedEvent.setRebootTimes(PipeDataNodeAgent.runtime().getRebootTimes()); + + if (enrichedEvent.getPipeName() != null + && pendingQueue.isPipeDropped(enrichedEvent.getPipeName(), creationTime, regionId)) { + enrichedEvent.clearReferenceCount(PipeEventCollector.class.getName()); + return; + } } if (event instanceof PipeHeartbeatEvent) { ((PipeHeartbeatEvent) event).recordConnectorQueueSize(pendingQueue); } - pendingQueue.offer(event); - collectInvocationCount.incrementAndGet(); + if (pendingQueue.offer(event)) { + collectInvocationCount.incrementAndGet(); + } } public void resetFlags() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java index 6d227ac31fd..f972bba0e6e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java @@ -73,7 +73,9 @@ public class PipeRealtimePriorityBlockingQueue extends UnboundedBlockingPendingQ @Override public boolean offer(final Event event) { - checkBeforeOffer(event); + if (!checkBeforeOffer(event)) { + return false; + } if (event instanceof TsFileInsertionEvent) { tsfileInsertEventDeque.add((TsFileInsertionEvent) event); @@ -356,13 +358,14 @@ public class PipeRealtimePriorityBlockingQueue extends UnboundedBlockingPendingQ } @Override - public void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) { - super.discardEventsOfPipe(pipeNameToDrop, regionId); + public void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + super.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); tsfileInsertEventDeque.removeIf( event -> { if (event instanceof EnrichedEvent - && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName()) - && regionId == ((EnrichedEvent) event).getRegionId()) { + && isEventFromPipe( + ((EnrichedEvent) event), pipeNameToDrop, creationTimeToDrop, regionId)) { if (((EnrichedEvent) event) .clearReferenceCount(PipeRealtimePriorityBlockingQueue.class.getName())) { eventCounter.decreaseEventCount(event); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java index 4b4794891f6..80ddeae56db 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java @@ -25,6 +25,8 @@ import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractSinkSubtask; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink; +import org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard; +import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent; @@ -199,9 +201,10 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask { * When a pipe is dropped, the connector maybe reused and will not be closed. So we just discard * its queued events in the output pipe connector. */ - public void discardEventsOfPipe(final String pipeNameToDrop, int regionId) { + public void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { // Try to remove the events as much as possible - inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, regionId); + inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); try { increaseHighPriorityTaskCount(); @@ -215,6 +218,7 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask { // will. if (lastEvent instanceof EnrichedEvent && pipeNameToDrop.equals(((EnrichedEvent) lastEvent).getPipeName()) + && creationTimeToDrop == ((EnrichedEvent) lastEvent).getCreationTime() && regionId == ((EnrichedEvent) lastEvent).getRegionId()) { // Do not clear the last event's reference counts because it may be on transferring lastEvent = null; @@ -238,6 +242,7 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask { // "nonnull" detection. if (lastExceptionEvent instanceof EnrichedEvent && pipeNameToDrop.equals(((EnrichedEvent) lastExceptionEvent).getPipeName()) + && creationTimeToDrop == ((EnrichedEvent) lastExceptionEvent).getCreationTime() && regionId == ((EnrichedEvent) lastExceptionEvent).getRegionId()) { clearReferenceCountAndReleaseLastExceptionEvent(); } @@ -246,8 +251,9 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask { decreaseHighPriorityTaskCount(); } - if (outputPipeConnector instanceof IoTDBSink) { - ((IoTDBSink) outputPipeConnector).discardEventsOfPipe(pipeNameToDrop, regionId); + if (outputPipeConnector instanceof PipeConnectorWithEventDiscard) { + ((PipeConnectorWithEventDiscard) outputPipeConnector) + .discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java index 35f7983075d..1780f5a87ef 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskLifeCycle.java @@ -92,12 +92,13 @@ public class PipeSinkSubtaskLifeCycle implements AutoCloseable { * {@link PipeSinkSubtask} should never be used again * @throws IllegalStateException if {@link PipeSinkSubtaskLifeCycle#registeredTaskCount} <= 0 */ - public synchronized boolean deregister(final String pipeNameToDeregister, int regionId) { + public synchronized boolean deregister( + final String pipeNameToDeregister, final long creationTimeToDeregister, final int regionId) { if (registeredTaskCount <= 0) { throw new IllegalStateException("registeredTaskCount <= 0"); } - subtask.discardEventsOfPipe(pipeNameToDeregister, regionId); + subtask.discardEventsOfPipe(pipeNameToDeregister, creationTimeToDeregister, regionId); try { if (registeredTaskCount > 1) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java index caff425f790..d7f81c12dbc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java @@ -209,7 +209,7 @@ public class PipeSinkSubtaskManager { // Shall not be empty final PipeSinkSubtaskExecutor executor = lifeCycles.get(0).executor; - lifeCycles.removeIf(o -> o.deregister(pipeName, regionId)); + lifeCycles.removeIf(o -> o.deregister(pipeName, creationTime, regionId)); if (lifeCycles.isEmpty()) { attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java index 96bddd0d672..c44e12a4bbf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java @@ -154,10 +154,13 @@ public abstract class PipeTabletEventBatch implements AutoCloseable { * Discard all events of the given pipe. This method only clears the reference count of the events * and discard them, but do not modify other objects (such as buffers) for simplicity. */ - public synchronized void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) { + public synchronized void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { events.removeIf( event -> { - if (pipeNameToDrop.equals(event.getPipeName()) && regionId == event.getRegionId()) { + if (pipeNameToDrop.equals(event.getPipeName()) + && creationTimeToDrop == event.getCreationTime() + && regionId == event.getRegionId()) { event.clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName()); return true; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java index 9fa706e985f..ac5f568f1c6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTransferBatchReqBuilder.java @@ -195,9 +195,12 @@ public class PipeTransferBatchReqBuilder implements AutoCloseable { && endPointToBatch.values().stream().allMatch(PipeTabletEventPlainBatch::isEmpty); } - public synchronized void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) { - defaultBatch.discardEventsOfPipe(pipeNameToDrop, regionId); - endPointToBatch.values().forEach(batch -> batch.discardEventsOfPipe(pipeNameToDrop, regionId)); + public synchronized void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + defaultBatch.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); + endPointToBatch + .values() + .forEach(batch -> batch.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId)); } public int size() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java index 07381d15e28..fe3d44bedb4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java @@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.client.ThriftClient; import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient; import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.commons.pipe.datastructure.Triple; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; import org.apache.iotdb.commons.pipe.sink.protocol.IoTDBSink; @@ -74,6 +75,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -121,6 +123,10 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { private final Map<PipeTransferTrackableHandler, PipeTransferTrackableHandler> pendingHandlers = new ConcurrentHashMap<>(); + // Pipe name, creation time, region id + private final Set<Triple<String, Long, Integer>> droppedPipeTaskKeys = + ConcurrentHashMap.newKeySet(); + private boolean enableSendTsFileLimit; private volatile boolean isConnectionException; @@ -660,8 +666,15 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { public void addFailureEventToRetryQueue(final Event event, final Exception e) { isConnectionException = e instanceof PipeConnectionException || ThriftClient.isConnectionBroken(e); - if (event instanceof EnrichedEvent && ((EnrichedEvent) event).isReleased()) { - return; + if (event instanceof EnrichedEvent) { + final EnrichedEvent enrichedEvent = (EnrichedEvent) event; + if (enrichedEvent.isReleased()) { + return; + } + if (isDroppedPipe(enrichedEvent)) { + enrichedEvent.clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName()); + return; + } } if (isClosed.get()) { @@ -707,15 +720,18 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { //////////////////////////// Operations for close //////////////////////////// @Override - public synchronized void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) { - if (isTabletBatchModeEnabled) { - tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, regionId); + public synchronized void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop, regionId)); + + if (isTabletBatchModeEnabled && Objects.nonNull(tabletBatchBuilder)) { + tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); } retryEventQueue.removeIf( event -> { if (event instanceof EnrichedEvent - && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName()) - && regionId == ((EnrichedEvent) event).getRegionId()) { + && isDroppedPipe( + (EnrichedEvent) event, pipeNameToDrop, creationTimeToDrop, regionId)) { ((EnrichedEvent) event).clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName()); retryEventQueueEventCounter.decreaseEventCount(event); return true; @@ -726,8 +742,8 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { retryTsFileQueue.removeIf( event -> { if (event instanceof EnrichedEvent - && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName()) - && regionId == ((EnrichedEvent) event).getRegionId()) { + && isDroppedPipe( + (EnrichedEvent) event, pipeNameToDrop, creationTimeToDrop, regionId)) { ((EnrichedEvent) event).clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName()); retryEventQueueEventCounter.decreaseEventCount(event); return true; @@ -771,6 +787,7 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { // clear reference count of events in retry queue after closing async client clearRetryEventsReferenceCount(); + droppedPipeTaskKeys.clear(); super.close(); } @@ -827,6 +844,21 @@ public class IoTDBDataRegionAsyncSink extends IoTDBSink { this.transferTsFileCounter = transferTsFileCounter; } + private boolean isDroppedPipe(final EnrichedEvent event) { + return droppedPipeTaskKeys.contains( + new Triple<>(event.getPipeName(), event.getCreationTime(), event.getRegionId())); + } + + private static boolean isDroppedPipe( + final EnrichedEvent event, + final String pipeNameToDrop, + final long creationTimeToDrop, + final int regionId) { + return pipeNameToDrop.equals(event.getPipeName()) + && creationTimeToDrop == event.getCreationTime() + && regionId == event.getRegionId(); + } + @Override public void setTabletBatchSizeHistogram(Histogram tabletBatchSizeHistogram) { if (tabletBatchBuilder != null) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java index a13f40b1b83..552b8cf1cae 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java @@ -521,9 +521,10 @@ public class IoTDBDataRegionSyncSink extends IoTDBDataNodeSyncSink { } @Override - public synchronized void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) { + public synchronized void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { if (Objects.nonNull(tabletBatchBuilder)) { - tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, regionId); + tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java index 6da64460495..43e2ab6dbc9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketConnectorServer.java @@ -19,6 +19,9 @@ package org.apache.iotdb.db.pipe.sink.protocol.websocket; +import org.apache.iotdb.commons.external.collections4.BidiMap; +import org.apache.iotdb.commons.external.collections4.bidimap.DualTreeBidiMap; +import org.apache.iotdb.commons.pipe.datastructure.Triple; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.pipe.api.event.Event; @@ -39,6 +42,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; @@ -57,6 +61,10 @@ public class WebSocketConnectorServer extends WebSocketServer { private final ConcurrentHashMap<String, ConcurrentHashMap<Long, EventWaitingForAck>> eventsWaitingForAck = new ConcurrentHashMap<>(); + // Pipe name, creation time, region id + private final Set<Triple<String, Long, Integer>> droppedPipeTaskKeys = + ConcurrentHashMap.newKeySet(); + private final BidiMap<String, WebSocket> router = new DualTreeBidiMap<String, WebSocket>(null, Comparator.comparing(Object::hashCode)) {}; @@ -97,13 +105,8 @@ public class WebSocketConnectorServer extends WebSocketServer { eventWrappers = new ArrayList<>(eventTransferQueue); eventTransferQueue.clear(); } - eventWrappers.forEach( - (eventWrapper) -> { - if (eventWrapper.event instanceof EnrichedEvent) { - ((EnrichedEvent) eventWrapper.event) - .decreaseReferenceCount(WebSocketConnectorServer.class.getName(), false); - } - }); + eventWrappers.forEach(eventWrapper -> discardEvent(eventWrapper.event)); + eventWrappers.clear(); synchronized (eventTransferQueue) { eventTransferQueue.notifyAll(); } @@ -113,13 +116,36 @@ public class WebSocketConnectorServer extends WebSocketServer { if (eventsWaitingForAck.containsKey(pipeName)) { eventsWaitingForAck .remove(pipeName) - .forEach( - (eventId, eventWrapper) -> { - if (eventWrapper.event instanceof EnrichedEvent) { - ((EnrichedEvent) eventWrapper.event) - .decreaseReferenceCount(WebSocketConnectorServer.class.getName(), false); - } - }); + .forEach((eventId, eventWrapper) -> discardEvent(eventWrapper.event)); + } + + droppedPipeTaskKeys.removeIf(key -> key.getFirst().equals(pipeName)); + } + + public synchronized void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop, regionId)); + + final PriorityBlockingQueue<EventWaitingForTransfer> eventTransferQueue = + eventsWaitingForTransfer.get(pipeNameToDrop); + if (eventTransferQueue != null) { + eventTransferQueue.removeIf( + eventWrapper -> + discardIfMatches(eventWrapper.event, pipeNameToDrop, creationTimeToDrop, regionId)); + synchronized (eventTransferQueue) { + eventTransferQueue.notifyAll(); + } + } + + final ConcurrentHashMap<Long, EventWaitingForAck> eventId2EventMap = + eventsWaitingForAck.get(pipeNameToDrop); + if (eventId2EventMap != null) { + eventId2EventMap + .entrySet() + .removeIf( + entry -> + discardIfMatches( + entry.getValue().event, pipeNameToDrop, creationTimeToDrop, regionId)); } } @@ -300,21 +326,24 @@ public class WebSocketConnectorServer extends WebSocketServer { } public void addEvent(Event event, WebSocketSink connector) { + if (isDroppedPipe(event)) { + discardEvent(event); + return; + } + + final String pipeName = connector.getPipeName(); final PriorityBlockingQueue<EventWaitingForTransfer> queue = - eventsWaitingForTransfer.get(connector.getPipeName()); + eventsWaitingForTransfer.get(pipeName); if (queue == null) { LOGGER.warn("The pipe {} was dropped so the event {} will be dropped.", connector, event); - if (event instanceof EnrichedEvent) { - ((EnrichedEvent) event) - .decreaseReferenceCount(WebSocketConnectorServer.class.getName(), false); - } + discardEvent(event); return; } if (queue.size() >= 5) { synchronized (queue) { - while (queue.size() >= 5) { + while (queue.size() >= 5 && isQueueAvailable(pipeName, queue) && !isDroppedPipe(event)) { try { queue.wait(); } catch (InterruptedException e) { @@ -323,12 +352,22 @@ public class WebSocketConnectorServer extends WebSocketServer { } } + if (!isQueueAvailable(pipeName, queue) || isDroppedPipe(event)) { + discardEvent(event); + return; + } + queue.put( new EventWaitingForTransfer(eventIdGenerator.incrementAndGet(), connector, event)); return; } } + if (!isQueueAvailable(pipeName, queue) || isDroppedPipe(event)) { + discardEvent(event); + return; + } + synchronized (queue) { queue.put(new EventWaitingForTransfer(eventIdGenerator.incrementAndGet(), connector, event)); } @@ -377,6 +416,11 @@ public class WebSocketConnectorServer extends WebSocketServer { final WebSocketSink connector = element.connector; try { + if (isDroppedPipe(event)) { + discardEvent(event); + return; + } + ByteBuffer tabletBuffer; if (event instanceof PipeRawTabletInsertionEvent) { tabletBuffer = ((PipeRawTabletInsertionEvent) event).convertToTablet().serialize(); @@ -387,7 +431,11 @@ public class WebSocketConnectorServer extends WebSocketServer { } if (tabletBuffer == null) { - connector.commit((EnrichedEvent) event); + if (isDroppedPipe(event)) { + discardEvent(event); + } else { + connector.commit((EnrichedEvent) event); + } return; } @@ -398,11 +446,17 @@ public class WebSocketConnectorServer extends WebSocketServer { server.broadcast(payload, Collections.singletonList(router.get(pipeName))); + if (isDroppedPipe(event)) { + discardEvent(event); + return; + } + final ConcurrentHashMap<Long, EventWaitingForAck> eventId2EventMap = eventsWaitingForAck.get(pipeName); if (eventId2EventMap == null) { LOGGER.warn( "The pipe {} was dropped so the event ack {} will be ignored.", pipeName, eventId); + discardEvent(event); return; } eventId2EventMap.put(eventId, new EventWaitingForAck(connector, event)); @@ -410,13 +464,10 @@ public class WebSocketConnectorServer extends WebSocketServer { synchronized (server) { final PriorityBlockingQueue<EventWaitingForTransfer> queue = eventsWaitingForTransfer.get(pipeName); - if (queue == null) { + if (queue == null || isDroppedPipe(event)) { LOGGER.warn( "The pipe {} was dropped so the event {} will be dropped.", pipeName, eventId); - if (event instanceof EnrichedEvent) { - ((EnrichedEvent) event) - .decreaseReferenceCount(WebSocketConnectorServer.class.getName(), false); - } + discardEvent(event); return; } @@ -465,4 +516,44 @@ public class WebSocketConnectorServer extends WebSocketServer { this.event = event; } } + + private boolean discardIfMatches( + final Event event, + final String pipeNameToDrop, + final long creationTimeToDrop, + final int regionId) { + if (!(event instanceof EnrichedEvent)) { + return false; + } + + final EnrichedEvent enrichedEvent = (EnrichedEvent) event; + if (!pipeNameToDrop.equals(enrichedEvent.getPipeName()) + || creationTimeToDrop != enrichedEvent.getCreationTime() + || regionId != enrichedEvent.getRegionId()) { + return false; + } + + discardEvent(enrichedEvent); + return true; + } + + private boolean isDroppedPipe(final Event event) { + return event instanceof EnrichedEvent + && droppedPipeTaskKeys.contains( + new Triple<>( + ((EnrichedEvent) event).getPipeName(), + ((EnrichedEvent) event).getCreationTime(), + ((EnrichedEvent) event).getRegionId())); + } + + private boolean isQueueAvailable( + final String pipeName, final PriorityBlockingQueue<EventWaitingForTransfer> queue) { + return eventsWaitingForTransfer.get(pipeName) == queue; + } + + private void discardEvent(final Event event) { + if (event instanceof EnrichedEvent) { + ((EnrichedEvent) event).clearReferenceCount(WebSocketSink.class.getName()); + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java index c89486bc2c8..40fccc12c99 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/websocket/WebSocketSink.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.sink.protocol.websocket; import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; @@ -39,7 +40,7 @@ import org.slf4j.LoggerFactory; import java.util.Arrays; import java.util.Optional; -public class WebSocketSink implements PipeConnector { +public class WebSocketSink implements PipeConnector, PipeConnectorWithEventDiscard { private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketSink.class); @@ -164,6 +165,14 @@ public class WebSocketSink implements PipeConnector { } } + @Override + public void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + if (server != null) { + server.discardEventsOfPipe(pipeNameToDrop, creationTimeToDrop, regionId); + } + } + public void commit(EnrichedEvent enrichedEvent) { Optional.ofNullable(enrichedEvent) .ifPresent(event -> event.decreaseReferenceCount(WebSocketSink.class.getName(), true)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java index 98163697374..af871feaa7e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskLifeCycle.java @@ -63,7 +63,8 @@ public class SubscriptionSinkSubtaskLifeCycle extends PipeSinkSubtaskLifeCycle { } @Override - public synchronized boolean deregister(final String pipeNameToDeregister, int regionId) { + public synchronized boolean deregister( + final String pipeNameToDeregister, final long creationTimeToDeregister, final int regionId) { if (registeredTaskCount <= 0) { throw new IllegalStateException("registeredTaskCount <= 0"); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java index 07def3ff4d3..f4547673eaa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionSinkSubtaskManager.java @@ -167,7 +167,7 @@ public class SubscriptionSinkSubtaskManager { final PipeSinkSubtaskLifeCycle lifeCycle = attributeSortedString2SubtaskLifeCycleMap.get(attributeSortedString); - if (lifeCycle.deregister(pipeName, regionId)) { + if (lifeCycle.deregister(pipeName, creationTime, regionId)) { attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollectorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollectorTest.java new file mode 100644 index 00000000000..029a722c8a9 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollectorTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.agent.task.connection; + +import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue; +import org.apache.iotdb.db.pipe.agent.task.subtask.sink.PipeRealtimePriorityBlockingQueue; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; +import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter; +import org.apache.iotdb.pipe.api.event.Event; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +public class PipeEventCollectorTest { + + @Test + public void testCollectorDoesNotOfferEventsOfDroppedPipeToUnboundedPendingQueue() { + verifyCollectorDoesNotOfferEventsOfDroppedPipe( + new UnboundedBlockingPendingQueue<>(new PipeDataRegionEventCounter())); + } + + @Test + public void testCollectorDoesNotOfferEventsOfDroppedPipeToRealtimePendingQueue() { + verifyCollectorDoesNotOfferEventsOfDroppedPipe(new PipeRealtimePriorityBlockingQueue()); + } + + private void verifyCollectorDoesNotOfferEventsOfDroppedPipe( + final UnboundedBlockingPendingQueue<Event> pendingQueue) { + pendingQueue.discardEventsOfPipe("pipe", 1L, 1); + + final PipeEventCollector droppedPipeCollector = + new PipeEventCollector(pendingQueue, 1L, 1, false, false, false); + final PipeRawTabletInsertionEvent droppedPipeEvent = + createPipeRawTabletInsertionEvent("pipe", 1L); + droppedPipeCollector.collect(droppedPipeEvent); + + Assert.assertTrue(droppedPipeEvent.isReleased()); + Assert.assertEquals(0, pendingQueue.size()); + + final PipeEventCollector recreatedPipeCollector = + new PipeEventCollector(pendingQueue, 2L, 1, false, false, false); + final PipeRawTabletInsertionEvent recreatedPipeEvent = + createPipeRawTabletInsertionEvent("pipe", 2L); + recreatedPipeCollector.collect(recreatedPipeEvent); + + Assert.assertFalse(recreatedPipeEvent.isReleased()); + Assert.assertEquals(1, pendingQueue.size()); + + pendingQueue.discardAllEvents(); + Assert.assertTrue(recreatedPipeEvent.isReleased()); + } + + private PipeRawTabletInsertionEvent createPipeRawTabletInsertionEvent( + final String pipeName, final long creationTime) { + final List<IMeasurementSchema> schemaList = + Arrays.asList(new MeasurementSchema("s1", TSDataType.INT64)); + final Tablet tablet = new Tablet("root.db.d1", schemaList, 1); + tablet.addTimestamp(0, 1L); + tablet.addValue("s1", 0, 1L); + return new PipeRawTabletInsertionEvent( + false, + "root.db", + "db", + "root.db", + tablet, + false, + pipeName, + creationTime, + null, + null, + false); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java new file mode 100644 index 00000000000..ddfc699721b --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.agent.task.subtask.sink; + +import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue; +import org.apache.iotdb.commons.pipe.sink.protocol.PipeConnectorWithEventDiscard; +import org.apache.iotdb.pipe.api.PipeConnector; + +import org.junit.Test; +import org.mockito.Mockito; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.withSettings; + +public class PipeSinkSubtaskTest { + + @Test + public void testDiscardEventsOfPipeDelegatesToConnector() { + final PipeConnector connector = + mock( + PipeConnector.class, + withSettings().extraInterfaces(PipeConnectorWithEventDiscard.class)); + final UnboundedBlockingPendingQueue<?> pendingQueue = mock(UnboundedBlockingPendingQueue.class); + + final PipeSinkSubtask subtask = + Mockito.spy( + new PipeSinkSubtask( + "PipeSinkSubtaskTest", + System.currentTimeMillis(), + "data_test", + 0, + (UnboundedBlockingPendingQueue) pendingQueue, + connector)); + + try { + subtask.discardEventsOfPipe("pipe", 1L, 1); + + verify((PipeConnectorWithEventDiscard) connector).discardEventsOfPipe("pipe", 1L, 1); + } finally { + subtask.close(); + } + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java index fae64308762..5816caaf65d 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.sink; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; +import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; import org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration; import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSinkRuntimeEnvironment; @@ -28,14 +29,18 @@ import org.apache.iotdb.db.pipe.sink.protocol.legacy.IoTDBLegacyPipeSink; import org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink; import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink; import org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink; +import org.apache.iotdb.db.pipe.sink.protocol.websocket.WebSocketConnectorServer; +import org.apache.iotdb.db.pipe.sink.protocol.websocket.WebSocketSink; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; +import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.write.record.Tablet; import org.apache.tsfile.write.schema.MeasurementSchema; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; import java.security.SecureRandom; import java.util.Arrays; @@ -103,6 +108,85 @@ public class PipeSinkTest { } } + @Test + public void testAsyncSinkDropDoesNotRequeueDroppedPipeEvents() throws Exception { + try (final IoTDBDataRegionAsyncSink connector = new IoTDBDataRegionAsyncSink()) { + final PipeParameters parameters = + new PipeParameters( + new HashMap<String, String>() { + { + put( + PipeSinkConstant.CONNECTOR_KEY, + BuiltinPipePlugin.IOTDB_THRIFT_ASYNC_CONNECTOR.getPipePluginName()); + put(PipeSinkConstant.CONNECTOR_IOTDB_NODE_URLS_KEY, "127.0.0.1:6668"); + } + }); + connector.validate(new PipeParameterValidator(parameters)); + connector.customize( + parameters, + new PipeTaskRuntimeConfiguration(new PipeTaskSinkRuntimeEnvironment("pipe", 1L, 1))); + + final PipeRawTabletInsertionEvent droppedEvent = + createPipeRawTabletInsertionEvent("pipe", 1L, 1); + droppedEvent.increaseReferenceCount("test"); + droppedEvent.setCommitterKeyAndCommitId(new CommitterKey("pipe", 1L, 1, -1), 1L); + + connector.discardEventsOfPipe("pipe", 1L, 1); + connector.addFailureEventToRetryQueue(droppedEvent, new PipeException("test")); + + Assert.assertEquals(0, connector.getRetryEventQueueSize()); + Assert.assertTrue(droppedEvent.isReleased()); + + final PipeRawTabletInsertionEvent recreatedPipeEvent = + createPipeRawTabletInsertionEvent("pipe", 2L, 1); + recreatedPipeEvent.increaseReferenceCount("test"); + recreatedPipeEvent.setCommitterKeyAndCommitId(new CommitterKey("pipe", 2L, 1, -1), 1L); + + connector.addFailureEventToRetryQueue(recreatedPipeEvent, new PipeException("test")); + + Assert.assertEquals(1, connector.getRetryEventQueueSize()); + } + } + + @Test + public void testWebSocketSinkDropDoesNotRequeueDroppedPipeEvents() { + final String pipeName = "pipe_" + System.nanoTime(); + final WebSocketConnectorServer server = WebSocketConnectorServer.getOrCreateInstance(0); + final WebSocketSink connector = Mockito.mock(WebSocketSink.class); + Mockito.when(connector.getPipeName()).thenReturn(pipeName); + + server.register(connector); + try { + final PipeRawTabletInsertionEvent droppedEvent = + createPipeRawTabletInsertionEvent(pipeName, 1L, 1); + droppedEvent.increaseReferenceCount(WebSocketSink.class.getName()); + droppedEvent.setCommitterKeyAndCommitId(new CommitterKey(pipeName, 1L, 1, -1), 1L); + server.addEvent(droppedEvent, connector); + + server.discardEventsOfPipe(pipeName, 1L, 1); + Assert.assertTrue(droppedEvent.isReleased()); + + final PipeRawTabletInsertionEvent recreatedDroppedPipeEvent = + createPipeRawTabletInsertionEvent(pipeName, 1L, 1); + recreatedDroppedPipeEvent.increaseReferenceCount(WebSocketSink.class.getName()); + recreatedDroppedPipeEvent.setCommitterKeyAndCommitId( + new CommitterKey(pipeName, 1L, 1, -1), 2L); + server.addEvent(recreatedDroppedPipeEvent, connector); + + Assert.assertTrue(recreatedDroppedPipeEvent.isReleased()); + + final PipeRawTabletInsertionEvent recreatedPipeEvent = + createPipeRawTabletInsertionEvent(pipeName, 2L, 1); + recreatedPipeEvent.increaseReferenceCount(WebSocketSink.class.getName()); + recreatedPipeEvent.setCommitterKeyAndCommitId(new CommitterKey(pipeName, 2L, 1, -1), 3L); + server.addEvent(recreatedPipeEvent, connector); + + Assert.assertFalse(recreatedPipeEvent.isReleased()); + } finally { + server.unregister(connector); + } + } + @Test public void testOpcUaSink() { final List<MeasurementSchema> schemaList = @@ -181,4 +265,25 @@ public class PipeSinkTest { Assert.fail(); } } + + private PipeRawTabletInsertionEvent createPipeRawTabletInsertionEvent( + final String pipeName, final long creationTime, final int regionId) { + final List<IMeasurementSchema> schemaList = + Arrays.asList(new MeasurementSchema("s1", TSDataType.INT64)); + final Tablet tablet = new Tablet("root.db.d" + regionId, schemaList, 1); + tablet.addTimestamp(0, 1L); + tablet.addValue("s1", 0, 1L); + return new PipeRawTabletInsertionEvent( + false, + "root.db", + "db", + "root.db", + tablet, + false, + pipeName, + creationTime, + null, + null, + false); + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java index 8773b03f9f3..8d920121363 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/connection/BlockingPendingQueue.java @@ -20,6 +20,7 @@ package org.apache.iotdb.commons.pipe.agent.task.connection; import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.commons.pipe.datastructure.Triple; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.metric.PipeEventCounter; import org.apache.iotdb.pipe.api.event.Event; @@ -27,7 +28,9 @@ import org.apache.iotdb.pipe.api.event.Event; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Set; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; @@ -44,6 +47,10 @@ public abstract class BlockingPendingQueue<E extends Event> { protected final AtomicBoolean isClosed = new AtomicBoolean(false); + // Pipe name, creation time, region id + protected final Set<Triple<String, Long, Integer>> droppedPipeTaskKeys = + ConcurrentHashMap.newKeySet(); + protected BlockingPendingQueue( final BlockingQueue<E> pendingQueue, final PipeEventCounter eventCounter) { this.pendingQueue = pendingQueue; @@ -51,7 +58,10 @@ public abstract class BlockingPendingQueue<E extends Event> { } public boolean offer(final E event) { - checkBeforeOffer(event); + if (!checkBeforeOffer(event)) { + return false; + } + final boolean offered = pendingQueue.offer(event); if (offered) { eventCounter.increaseEventCount(event); @@ -60,7 +70,9 @@ public abstract class BlockingPendingQueue<E extends Event> { } public boolean put(final E event) { - checkBeforeOffer(event); + if (!checkBeforeOffer(event)) { + return false; + } try { pendingQueue.put(event); eventCounter.increaseEventCount(event); @@ -101,6 +113,7 @@ public abstract class BlockingPendingQueue<E extends Event> { isClosed.set(true); pendingQueue.clear(); eventCounter.reset(); + droppedPipeTaskKeys.clear(); } /** DO NOT FORGET to set eventCounter to new value after invoking this method. */ @@ -120,14 +133,17 @@ public abstract class BlockingPendingQueue<E extends Event> { return true; }); eventCounter.reset(); + droppedPipeTaskKeys.clear(); } - public void discardEventsOfPipe(final String pipeNameToDrop, final int regionId) { + public void discardEventsOfPipe( + final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { + droppedPipeTaskKeys.add(new Triple<>(pipeNameToDrop, creationTimeToDrop, regionId)); pendingQueue.removeIf( event -> { if (event instanceof EnrichedEvent - && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName()) - && regionId == ((EnrichedEvent) event).getRegionId()) { + && isEventFromPipe( + ((EnrichedEvent) event), pipeNameToDrop, creationTimeToDrop, regionId)) { if (((EnrichedEvent) event).clearReferenceCount(BlockingPendingQueue.class.getName())) { eventCounter.decreaseEventCount(event); } @@ -157,9 +173,34 @@ public abstract class BlockingPendingQueue<E extends Event> { return eventCounter.getPipeHeartbeatEventCount(); } - protected void checkBeforeOffer(final E event) { - if (isClosed.get() && event instanceof EnrichedEvent) { + protected boolean checkBeforeOffer(final E event) { + final boolean shouldReject = isClosed.get() || isEventFromDroppedPipe(event); + if (shouldReject && event instanceof EnrichedEvent) { ((EnrichedEvent) event).clearReferenceCount(BlockingPendingQueue.class.getName()); } + return !shouldReject; + } + + protected static boolean isEventFromPipe( + final EnrichedEvent event, + final String pipeNameToDrop, + final long creationTimeToDrop, + final int regionId) { + return pipeNameToDrop.equals(event.getPipeName()) + && creationTimeToDrop == event.getCreationTime() + && regionId == event.getRegionId(); + } + + protected boolean isEventFromDroppedPipe(final E event) { + return event instanceof EnrichedEvent + && ((EnrichedEvent) event).getPipeName() != null + && isPipeDropped( + ((EnrichedEvent) event).getPipeName(), + ((EnrichedEvent) event).getCreationTime(), + ((EnrichedEvent) event).getRegionId()); + } + + public boolean isPipeDropped(final String pipeName, final long creationTime, final int regionId) { + return droppedPipeTaskKeys.contains(new Triple<>(pipeName, creationTime, regionId)); } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/Triple.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/Triple.java new file mode 100644 index 00000000000..275ccb20ea3 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/Triple.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.datastructure; + +import java.util.Objects; + +public class Triple<L, M, R> { + public final L first; + public final M second; + public final R third; + + public Triple(final L first, final M second, final R third) { + this.first = first; + this.second = second; + this.third = third; + } + + public L getFirst() { + return first; + } + + public M getSecond() { + return second; + } + + public R getThird() { + return third; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final Triple<?, ?, ?> triple = (Triple<?, ?, ?>) o; + return first.equals(triple.first) && second.equals(triple.second) && third.equals(triple.third); + } + + @Override + public int hashCode() { + return Objects.hash(first, second, third); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java index 66f3eda6a1a..2f2d54e7b4f 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java @@ -139,7 +139,7 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SIN import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_RATE_LIMIT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_SKIP_IF_KEY; -public abstract class IoTDBSink implements PipeConnector { +public abstract class IoTDBSink implements PipeConnector, PipeConnectorWithEventDiscard { private static final String PARSE_URL_ERROR_FORMATTER = "Exception occurred while parsing node urls from target servers: {}"; @@ -621,7 +621,8 @@ public abstract class IoTDBSink implements PipeConnector { * When a pipe is dropped, the connector maybe reused and will not be closed. We need to discard * its batched or queued events in the output pipe connector. */ - public synchronized void discardEventsOfPipe(final String pipeName, final int regionId) { + public synchronized void discardEventsOfPipe( + final String pipeName, final long creationTime, final int regionId) { // Do nothing by default } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java new file mode 100644 index 00000000000..ab4dbcf9075 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/PipeConnectorWithEventDiscard.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.sink.protocol; + +public interface PipeConnectorWithEventDiscard { + + void discardEventsOfPipe(String pipeName, long creationTime, int regionId); +}
