This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
new ffad66625e3 Pipe: discard batched events before restarting pipes
(#13284)
ffad66625e3 is described below
commit ffad66625e34cc23080f5f44676382a60d106a51
Author: Zikun Ma <[email protected]>
AuthorDate: Mon Aug 26 11:49:38 2024 +0800
Pipe: discard batched events before restarting pipes (#13284)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../payload/evolvable/batch/PipeTabletEventBatch.java | 19 ++++++++++++++++++-
.../evolvable/batch/PipeTransferBatchReqBuilder.java | 5 +++++
.../thrift/async/IoTDBDataRegionAsyncConnector.java | 6 ++----
.../thrift/sync/IoTDBDataRegionSyncConnector.java | 5 +++++
.../task/subtask/connector/PipeConnectorSubtask.java | 5 +++--
.../connector/PipeConnectorSubtaskLifeCycle.java | 2 +-
.../pipe/connector/protocol/IoTDBConnector.java | 8 ++++++++
7 files changed, 42 insertions(+), 8 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
index db5ba85d7ba..41135281339 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.connector.payload.evolvable.batch;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -115,7 +116,23 @@ public abstract class PipeTabletEventBatch implements
AutoCloseable {
events.clear();
}
- public void decreaseEventsReferenceCount(final String holderMessage, final
boolean shouldReport) {
+ /**
+ * Discard all events of the given pipe. This method only clears the
reference count of the events
+ * and discard them, but do not modify other objects (such as buffers) for
simplicity.
+ */
+ public synchronized void discardEventsOfPipe(final String pipeNameToDrop) {
+ events.removeIf(
+ event -> {
+ if (pipeNameToDrop.equals(event.getPipeName())) {
+
event.clearReferenceCount(IoTDBDataRegionAsyncConnector.class.getName());
+ return true;
+ }
+ return false;
+ });
+ }
+
+ public synchronized void decreaseEventsReferenceCount(
+ final String holderMessage, final boolean shouldReport) {
events.forEach(event -> event.decreaseReferenceCount(holderMessage,
shouldReport));
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
index f28f19e4da1..77fd73f8aba 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTransferBatchReqBuilder.java
@@ -184,6 +184,11 @@ public class PipeTransferBatchReqBuilder implements
AutoCloseable {
&&
endPointToBatch.values().stream().allMatch(PipeTabletEventPlainBatch::isEmpty);
}
+ public synchronized void discardEventsOfPipe(final String pipeNameToDrop) {
+ defaultBatch.discardEventsOfPipe(pipeNameToDrop);
+ endPointToBatch.values().forEach(batch ->
batch.discardEventsOfPipe(pipeNameToDrop));
+ }
+
@Override
public synchronized void close() {
defaultBatch.close();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 43c5fa6eff2..3f37b247869 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -515,11 +515,9 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
//////////////////////////// Operations for close
////////////////////////////
- /**
- * When a pipe is dropped, the connector maybe reused and will not be
closed. So we just discard
- * its queued events in the output pipe connector.
- */
+ @Override
public synchronized void discardEventsOfPipe(final String pipeNameToDrop) {
+ tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop);
retryEventQueue.removeIf(
event -> {
if (event instanceof EnrichedEvent
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
index fca40f21d64..a92790c5e80 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
@@ -492,6 +492,11 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
LOGGER.info("Successfully transferred file {}.", tsFile);
}
+ @Override
+ public synchronized void discardEventsOfPipe(final String pipeNameToDrop) {
+ tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop);
+ }
+
@Override
public void close() {
if (tabletBatchBuilder != null) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index cc78ebdea54..a832d43f73e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.task.subtask.connector;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBConnector;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.commons.pipe.task.subtask.PipeAbstractConnectorSubtask;
@@ -268,8 +269,8 @@ public class PipeConnectorSubtask extends
PipeAbstractConnectorSubtask {
}
}
- if (outputPipeConnector instanceof IoTDBDataRegionAsyncConnector) {
- ((IoTDBDataRegionAsyncConnector)
outputPipeConnector).discardEventsOfPipe(pipeNameToDrop);
+ if (outputPipeConnector instanceof IoTDBConnector) {
+ ((IoTDBConnector)
outputPipeConnector).discardEventsOfPipe(pipeNameToDrop);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
index a28a0289b08..e3d4d35a171 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
@@ -89,7 +89,7 @@ public class PipeConnectorSubtaskLifeCycle implements
AutoCloseable {
* the {@link PipeConnectorSubtask} should never be used again
* @throws IllegalStateException if {@link
PipeConnectorSubtaskLifeCycle#registeredTaskCount} <= 0
*/
- public synchronized boolean deregister(String pipeNameToDeregister) {
+ public synchronized boolean deregister(final String pipeNameToDeregister) {
if (registeredTaskCount <= 0) {
throw new IllegalStateException("registeredTaskCount <= 0");
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
index 9dc08f93402..55231032e5a 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java
@@ -452,6 +452,14 @@ public abstract class IoTDBConnector implements
PipeConnector {
GLOBAL_RATE_LIMITER.acquire(bytesLength);
}
+ /**
+ * When a pipe is dropped, the connector maybe reused and will not be
closed. We need to discard
+ * its batched or queued events in the output pipe connector.
+ */
+ public synchronized void discardEventsOfPipe(final String pipeName) {
+ // Do nothing by default
+ }
+
public PipeReceiverStatusHandler statusHandler() {
return receiverStatusHandler;
}