This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
new 84a6c27a3a1 Pipe: Fix reference count leak when tasks restart (#13250) (#13351)
84a6c27a3a1 is described below
commit 84a6c27a3a18abf40e460b0585635ed6aa8f3622
Author: Steve Yurong Su <[email protected]>
AuthorDate: Fri Aug 30 10:27:25 2024 +0800
Pipe: Fix reference count leak when tasks restart (#13250) (#13351)
(cherry picked from commit b7369ee2e17fd90e9b03457506461299be4d64ab)
---
.../protocol/IoTDBConfigRegionAirGapConnector.java | 20 ++--
.../protocol/IoTDBConfigRegionConnector.java | 20 ++--
.../pipe/execution/PipeConfigNodeSubtask.java | 14 ++-
.../db/pipe/agent/task/PipeDataNodeTaskAgent.java | 97 ++++++----------
.../evolvable/batch/PipeTabletEventBatch.java | 7 +-
.../airgap/IoTDBDataNodeAirGapConnector.java | 10 +-
.../airgap/IoTDBDataRegionAirGapConnector.java | 30 ++---
.../airgap/IoTDBSchemaRegionAirGapConnector.java | 10 +-
.../protocol/legacy/IoTDBLegacyPipeConnector.java | 30 ++---
.../connector/protocol/opcua/OpcUaConnector.java | 18 +--
.../pipeconsensus/PipeConsensusAsyncConnector.java | 4 -
.../pipeconsensus/PipeConsensusSyncConnector.java | 10 +-
.../async/IoTDBDataRegionAsyncConnector.java | 6 -
.../thrift/sync/IoTDBDataNodeSyncConnector.java | 10 +-
.../thrift/sync/IoTDBDataRegionSyncConnector.java | 30 ++---
.../thrift/sync/IoTDBSchemaRegionConnector.java | 10 +-
.../protocol/websocket/WebSocketConnector.java | 9 +-
.../protocol/writeback/WriteBackConnector.java | 18 +--
.../PipeHistoricalDataRegionTsFileExtractor.java | 42 ++++---
.../realtime/assigner/PipeDataRegionAssigner.java | 25 +++-
.../db/pipe/metric/PipeDataRegionEventCounter.java | 12 +-
.../pipe/task/connection/PipeEventCollector.java | 17 ++-
.../subtask/connector/PipeConnectorSubtask.java | 34 ++----
.../PipeRealtimePriorityBlockingQueue.java | 34 +++++-
.../subtask/processor/PipeProcessorSubtask.java | 9 +-
.../apache/iotdb/commons/conf/CommonConfig.java | 10 ++
.../iotdb/commons/conf/CommonDescriptor.java | 5 +
.../iotdb/commons/pipe/config/PipeConfig.java | 7 ++
.../iotdb/commons/pipe/event/EnrichedEvent.java | 128 +++++++++++++--------
.../commons/pipe/metric/PipeEventCounter.java | 6 +-
.../pipe/task/connection/BlockingPendingQueue.java | 32 +++++-
.../task/subtask/PipeAbstractConnectorSubtask.java | 5 +-
.../pipe/task/subtask/PipeReportableSubtask.java | 2 +-
.../commons/pipe/task/subtask/PipeSubtask.java | 31 ++++-
34 files changed, 447 insertions(+), 305 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
index 0d466605d8f..e615616e2ba 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionAirGapConnector.java
@@ -137,12 +137,12 @@ public class IoTDBConfigRegionAirGapConnector extends
IoTDBAirGapConnector {
final AirGapSocket socket,
final PipeConfigRegionWritePlanEvent pipeConfigRegionWritePlanEvent)
throws PipeException, IOException {
+ // We increase the reference count for this event to determine if the
event may be released.
+ if (!pipeConfigRegionWritePlanEvent.increaseReferenceCount(
+ IoTDBConfigRegionAirGapConnector.class.getName())) {
+ return;
+ }
try {
- // We increase the reference count for this event to determine if the
event may be released.
- if (!pipeConfigRegionWritePlanEvent.increaseReferenceCount(
- IoTDBConfigRegionAirGapConnector.class.getName())) {
- return;
- }
doTransfer(socket, pipeConfigRegionWritePlanEvent);
} finally {
pipeConfigRegionWritePlanEvent.decreaseReferenceCount(
@@ -178,12 +178,12 @@ public class IoTDBConfigRegionAirGapConnector extends
IoTDBAirGapConnector {
private void doTransferWrapper(
final AirGapSocket socket, final PipeConfigRegionSnapshotEvent
pipeConfigRegionSnapshotEvent)
throws PipeException, IOException {
+ // We increase the reference count for this event to determine if the
event may be released.
+ if (!pipeConfigRegionSnapshotEvent.increaseReferenceCount(
+ IoTDBConfigRegionAirGapConnector.class.getName())) {
+ return;
+ }
try {
- // We increase the reference count for this event to determine if the
event may be released.
- if (!pipeConfigRegionSnapshotEvent.increaseReferenceCount(
- IoTDBConfigRegionAirGapConnector.class.getName())) {
- return;
- }
doTransfer(socket, pipeConfigRegionSnapshotEvent);
} finally {
pipeConfigRegionSnapshotEvent.decreaseReferenceCount(
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
index 1e2f3c19e5a..3e4ca8ac9c5 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/connector/protocol/IoTDBConfigRegionConnector.java
@@ -114,12 +114,12 @@ public class IoTDBConfigRegionConnector extends
IoTDBSslSyncConnector {
private void doTransferWrapper(
final PipeConfigRegionWritePlanEvent pipeConfigRegionWritePlanEvent)
throws PipeException {
+ // We increase the reference count for this event to determine if the
event may be released.
+ if (!pipeConfigRegionWritePlanEvent.increaseReferenceCount(
+ IoTDBConfigRegionConnector.class.getName())) {
+ return;
+ }
try {
- // We increase the reference count for this event to determine if the
event may be released.
- if (!pipeConfigRegionWritePlanEvent.increaseReferenceCount(
- IoTDBConfigRegionConnector.class.getName())) {
- return;
- }
doTransfer(pipeConfigRegionWritePlanEvent);
} finally {
pipeConfigRegionWritePlanEvent.decreaseReferenceCount(
@@ -175,12 +175,12 @@ public class IoTDBConfigRegionConnector extends
IoTDBSslSyncConnector {
private void doTransferWrapper(final PipeConfigRegionSnapshotEvent
pipeConfigRegionSnapshotEvent)
throws PipeException, IOException {
+ // We increase the reference count for this event to determine if the
event may be released.
+ if (!pipeConfigRegionSnapshotEvent.increaseReferenceCount(
+ IoTDBConfigRegionConnector.class.getName())) {
+ return;
+ }
try {
- // We increase the reference count for this event to determine if the
event may be released.
- if (!pipeConfigRegionSnapshotEvent.increaseReferenceCount(
- IoTDBConfigRegionConnector.class.getName())) {
- return;
- }
doTransfer(pipeConfigRegionSnapshotEvent);
} finally {
pipeConfigRegionSnapshotEvent.decreaseReferenceCount(
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
index 02cb5373359..cfd16480f62 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
@@ -25,6 +25,7 @@ import
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeC
import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.commons.pipe.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
@@ -181,10 +182,11 @@ public class PipeConfigNodeSubtask extends
PipeAbstractConnectorSubtask {
return false;
}
- outputPipeConnector.transfer(event);
- decreaseReferenceCountAndReleaseLastEvent(true);
-
- PipeConfigRegionConnectorMetrics.getInstance().markConfigEvent(taskID);
+ if (!(event instanceof ProgressReportEvent)) {
+ outputPipeConnector.transfer(event);
+ PipeConfigRegionConnectorMetrics.getInstance().markConfigEvent(taskID);
+ }
+ decreaseReferenceCountAndReleaseLastEvent(event, true);
} catch (final PipeException e) {
setLastExceptionEvent(event);
if (!isClosed.get()) {
@@ -194,7 +196,7 @@ public class PipeConfigNodeSubtask extends
PipeAbstractConnectorSubtask {
"{} in pipe transfer, ignored because pipe is dropped.",
e.getClass().getSimpleName(),
e);
- clearReferenceCountAndReleaseLastEvent();
+ clearReferenceCountAndReleaseLastEvent(event);
}
} catch (final Exception e) {
setLastExceptionEvent(event);
@@ -205,7 +207,7 @@ public class PipeConfigNodeSubtask extends
PipeAbstractConnectorSubtask {
e);
} else {
LOGGER.info("Exception in pipe transfer, ignored because pipe is
dropped.", e);
- clearReferenceCountAndReleaseLastEvent();
+ clearReferenceCountAndReleaseLastEvent(event);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 888f7c50c96..8a73a19684f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -83,6 +83,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@@ -92,6 +93,9 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
protected static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
+ private static final AtomicLong LAST_FORCED_RESTART_TIME =
+ new AtomicLong(System.currentTimeMillis());
+
////////////////////////// Pipe Task Management Entry
//////////////////////////
@Override
@@ -475,18 +479,32 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
if (!tryWriteLockWithTimeOut(5)) {
return;
}
+
+ final Set<PipeMeta> stuckPipes;
try {
- restartAllStuckPipesInternal();
+ stuckPipes = findAllStuckPipes();
} finally {
releaseWriteLock();
}
+
+ // Restart all stuck pipes
+ stuckPipes.parallelStream().forEach(this::restartStuckPipe);
}
- private void restartAllStuckPipesInternal() {
+ private Set<PipeMeta> findAllStuckPipes() {
+ final Set<PipeMeta> stuckPipes = new HashSet<>();
+
+ if (System.currentTimeMillis() - LAST_FORCED_RESTART_TIME.get()
+ >
PipeConfig.getInstance().getPipeSubtaskExecutorForcedRestartIntervalMs()) {
+ LAST_FORCED_RESTART_TIME.set(System.currentTimeMillis());
+ for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
+ stuckPipes.add(pipeMeta);
+ }
+ return stuckPipes;
+ }
+
final Map<String, IoTDBDataRegionExtractor> taskId2ExtractorMap =
PipeDataRegionExtractorMetrics.getInstance().getExtractorMap();
-
- final Set<PipeMeta> stuckPipes = new HashSet<>();
for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) {
final String pipeName = pipeMeta.getStaticMeta().getPipeName();
final List<IoTDBDataRegionExtractor> extractors =
@@ -525,8 +543,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
}
}
- // Restart all stuck pipes
- stuckPipes.parallelStream().forEach(this::restartStuckPipe);
+ return stuckPipes;
}
private boolean mayDeletedTsFileSizeReachDangerousThreshold() {
@@ -565,59 +582,21 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
private void restartStuckPipe(final PipeMeta pipeMeta) {
LOGGER.warn("Pipe {} will be restarted because of stuck.",
pipeMeta.getStaticMeta());
- final long startTime = System.currentTimeMillis();
- changePipeStatusBeforeRestart(pipeMeta.getStaticMeta().getPipeName());
- handleSinglePipeMetaChangesInternal(pipeMeta);
- LOGGER.warn(
- "Pipe {} was restarted because of stuck, time cost: {} ms.",
- pipeMeta.getStaticMeta(),
- System.currentTimeMillis() - startTime);
- }
-
- private void changePipeStatusBeforeRestart(final String pipeName) {
- final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
- final Map<Integer, PipeTask> pipeTasks =
pipeTaskManager.getPipeTasks(pipeMeta.getStaticMeta());
- final Set<Integer> taskRegionIds = new HashSet<>(pipeTasks.keySet());
- final Set<Integer> dataRegionIds =
- StorageEngine.getInstance().getAllDataRegionIds().stream()
- .map(DataRegionId::getId)
- .collect(Collectors.toSet());
- final Set<PipeTask> dataRegionPipeTasks =
- taskRegionIds.stream()
- .filter(dataRegionIds::contains)
- .map(regionId ->
pipeTaskManager.removePipeTask(pipeMeta.getStaticMeta(), regionId))
- .filter(Objects::nonNull)
- .collect(Collectors.toSet());
-
- // Drop data region tasks
- dataRegionPipeTasks.parallelStream().forEach(PipeTask::drop);
-
- // Stop schema region tasks
-
pipeTaskManager.getPipeTasks(pipeMeta.getStaticMeta()).values().parallelStream()
- .forEach(PipeTask::stop);
-
- // Re-create data region tasks
- dataRegionPipeTasks.parallelStream()
- .forEach(
- pipeTask -> {
- final PipeTask newPipeTask =
- new PipeDataNodeTaskBuilder(
- pipeMeta.getStaticMeta(),
- ((PipeDataNodeTask) pipeTask).getRegionId(),
- pipeMeta
- .getRuntimeMeta()
- .getConsensusGroupId2TaskMetaMap()
- .get(((PipeDataNodeTask)
pipeTask).getRegionId()))
- .build();
- newPipeTask.create();
- pipeTaskManager.addPipeTask(
- pipeMeta.getStaticMeta(),
- ((PipeDataNodeTask) pipeTask).getRegionId(),
- newPipeTask);
- });
-
- // Set pipe meta status to STOPPED
- pipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.STOPPED);
+ acquireWriteLock();
+ try {
+ final long startTime = System.currentTimeMillis();
+ final PipeMeta originalPipeMeta = pipeMeta.deepCopy();
+ handleDropPipe(pipeMeta.getStaticMeta().getPipeName());
+ handleSinglePipeMetaChanges(originalPipeMeta);
+ LOGGER.warn(
+ "Pipe {} was restarted because of stuck, time cost: {} ms.",
+ originalPipeMeta.getStaticMeta(),
+ System.currentTimeMillis() - startTime);
+ } catch (final Exception e) {
+ LOGGER.warn("Failed to restart stuck pipe {}.",
pipeMeta.getStaticMeta(), e);
+ } finally {
+ releaseWriteLock();
+ }
}
///////////////////////// Terminate Logic /////////////////////////
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
index bae98a9efa2..75d4b2f9e3c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
@@ -26,6 +26,8 @@ import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.tsfile.exception.write.WriteProcessException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
@@ -34,6 +36,8 @@ import java.util.Objects;
public abstract class PipeTabletEventBatch implements AutoCloseable {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTabletEventBatch.class);
+
protected final List<EnrichedEvent> events = new ArrayList<>();
private final int maxDelayInMs;
@@ -74,8 +78,7 @@ public abstract class PipeTabletEventBatch implements
AutoCloseable {
firstEventProcessingTime = System.currentTimeMillis();
}
} else {
- ((EnrichedEvent) event)
-
.decreaseReferenceCount(PipeTransferBatchReqBuilder.class.getName(), false);
+ LOGGER.warn("Cannot increase reference count for event: {}, ignore it
in batch.", event);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
index 221fa516448..33be8e002f4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
@@ -113,12 +113,12 @@ public abstract class IoTDBDataNodeAirGapConnector
extends IoTDBAirGapConnector
final AirGapSocket socket,
final PipeSchemaRegionWritePlanEvent pipeSchemaRegionWritePlanEvent)
throws PipeException, IOException {
+ // We increase the reference count for this event to determine if the
event may be released.
+ if (!pipeSchemaRegionWritePlanEvent.increaseReferenceCount(
+ IoTDBDataNodeAirGapConnector.class.getName())) {
+ return;
+ }
try {
- // We increase the reference count for this event to determine if the
event may be released.
- if (!pipeSchemaRegionWritePlanEvent.increaseReferenceCount(
- IoTDBDataNodeAirGapConnector.class.getName())) {
- return;
- }
doTransfer(socket, pipeSchemaRegionWritePlanEvent);
} finally {
pipeSchemaRegionWritePlanEvent.decreaseReferenceCount(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
index 9874741c7f0..7b6c4376d0a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
@@ -150,12 +150,12 @@ public class IoTDBDataRegionAirGapConnector extends
IoTDBDataNodeAirGapConnector
final AirGapSocket socket,
final PipeInsertNodeTabletInsertionEvent
pipeInsertNodeTabletInsertionEvent)
throws PipeException, WALPipeException, IOException {
+ // We increase the reference count for this event to determine if the
event may be released.
+ if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
+ IoTDBDataRegionAirGapConnector.class.getName())) {
+ return;
+ }
try {
- // We increase the reference count for this event to determine if the
event may be released.
- if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
- IoTDBDataRegionAirGapConnector.class.getName())) {
- return;
- }
doTransfer(socket, pipeInsertNodeTabletInsertionEvent);
} finally {
pipeInsertNodeTabletInsertionEvent.decreaseReferenceCount(
@@ -195,12 +195,12 @@ public class IoTDBDataRegionAirGapConnector extends
IoTDBDataNodeAirGapConnector
private void doTransferWrapper(
final AirGapSocket socket, final PipeRawTabletInsertionEvent
pipeRawTabletInsertionEvent)
throws PipeException, IOException {
+ // We increase the reference count for this event to determine if the
event may be released.
+ if (!pipeRawTabletInsertionEvent.increaseReferenceCount(
+ IoTDBDataRegionAirGapConnector.class.getName())) {
+ return;
+ }
try {
- // We increase the reference count for this event to determine if the
event may be released.
- if (!pipeRawTabletInsertionEvent.increaseReferenceCount(
- IoTDBDataRegionAirGapConnector.class.getName())) {
- return;
- }
doTransfer(socket, pipeRawTabletInsertionEvent);
} finally {
pipeRawTabletInsertionEvent.decreaseReferenceCount(
@@ -233,12 +233,12 @@ public class IoTDBDataRegionAirGapConnector extends
IoTDBDataNodeAirGapConnector
private void doTransferWrapper(
final AirGapSocket socket, final PipeTsFileInsertionEvent
pipeTsFileInsertionEvent)
throws PipeException, IOException {
+ // We increase the reference count for this event to determine if the
event may be released.
+ if (!pipeTsFileInsertionEvent.increaseReferenceCount(
+ IoTDBDataRegionAirGapConnector.class.getName())) {
+ return;
+ }
try {
- // We increase the reference count for this event to determine if the
event may be released.
- if (!pipeTsFileInsertionEvent.increaseReferenceCount(
- IoTDBDataRegionAirGapConnector.class.getName())) {
- return;
- }
doTransfer(socket, pipeTsFileInsertionEvent);
} finally {
pipeTsFileInsertionEvent.decreaseReferenceCount(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java
index e53423779d0..342807a5dfb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBSchemaRegionAirGapConnector.java
@@ -86,12 +86,12 @@ public class IoTDBSchemaRegionAirGapConnector extends
IoTDBDataNodeAirGapConnect
private void doTransferWrapper(
final AirGapSocket socket, final PipeSchemaRegionSnapshotEvent
pipeSchemaRegionSnapshotEvent)
throws PipeException, IOException {
+ // We increase the reference count for this event to determine if the
event may be released.
+ if (!pipeSchemaRegionSnapshotEvent.increaseReferenceCount(
+ IoTDBSchemaRegionAirGapConnector.class.getName())) {
+ return;
+ }
try {
- // We increase the reference count for this event to determine if the
event may be released.
- if (!pipeSchemaRegionSnapshotEvent.increaseReferenceCount(
- IoTDBSchemaRegionAirGapConnector.class.getName())) {
- return;
- }
doTransfer(socket, pipeSchemaRegionSnapshotEvent);
} finally {
pipeSchemaRegionSnapshotEvent.decreaseReferenceCount(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
index 5edf485adfb..62a1f040624 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
@@ -313,12 +313,12 @@ public class IoTDBLegacyPipeConnector implements
PipeConnector {
private void doTransferWrapper(
final PipeInsertNodeTabletInsertionEvent pipeInsertNodeInsertionEvent)
throws IoTDBConnectionException, StatementExecutionException {
+ // We increase the reference count for this event to determine if the
event may be released.
+ if (!pipeInsertNodeInsertionEvent.increaseReferenceCount(
+ IoTDBLegacyPipeConnector.class.getName())) {
+ return;
+ }
try {
- // We increase the reference count for this event to determine if the
event may be released.
- if (!pipeInsertNodeInsertionEvent.increaseReferenceCount(
- IoTDBLegacyPipeConnector.class.getName())) {
- return;
- }
doTransfer(pipeInsertNodeInsertionEvent);
} finally {
pipeInsertNodeInsertionEvent.decreaseReferenceCount(
@@ -344,12 +344,12 @@ public class IoTDBLegacyPipeConnector implements
PipeConnector {
private void doTransferWrapper(final PipeRawTabletInsertionEvent
pipeRawTabletInsertionEvent)
throws PipeException, IoTDBConnectionException,
StatementExecutionException {
+ // We increase the reference count for this event to determine if the
event may be released.
+ if (!pipeRawTabletInsertionEvent.increaseReferenceCount(
+ IoTDBLegacyPipeConnector.class.getName())) {
+ return;
+ }
try {
- // We increase the reference count for this event to determine if the
event may be released.
- if (!pipeRawTabletInsertionEvent.increaseReferenceCount(
- IoTDBLegacyPipeConnector.class.getName())) {
- return;
- }
doTransfer(pipeRawTabletInsertionEvent);
} finally {
pipeRawTabletInsertionEvent.decreaseReferenceCount(
@@ -369,12 +369,12 @@ public class IoTDBLegacyPipeConnector implements
PipeConnector {
private void doTransferWrapper(final PipeTsFileInsertionEvent
pipeTsFileInsertionEvent)
throws PipeException, TException, IOException {
+ // We increase the reference count for this event to determine if the
event may be released.
+ if (!pipeTsFileInsertionEvent.increaseReferenceCount(
+ IoTDBLegacyPipeConnector.class.getName())) {
+ return;
+ }
try {
- // We increase the reference count for this event to determine if the
event may be released.
- if (!pipeTsFileInsertionEvent.increaseReferenceCount(
- IoTDBLegacyPipeConnector.class.getName())) {
- return;
- }
doTransfer(pipeTsFileInsertionEvent);
} finally {
pipeTsFileInsertionEvent.decreaseReferenceCount(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
index 390b2fe73bb..1885f181677 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/opcua/OpcUaConnector.java
@@ -204,12 +204,12 @@ public class OpcUaConnector implements PipeConnector {
private void transferTabletWrapper(
final PipeInsertNodeTabletInsertionEvent
pipeInsertNodeTabletInsertionEvent)
throws UaException {
+ // We increase the reference count for this event to determine if the
event may be released.
+ if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
+ OpcUaConnector.class.getName())) {
+ return;
+ }
try {
- // We increase the reference count for this event to determine if the
event may be released.
- if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
- OpcUaConnector.class.getName())) {
- return;
- }
for (final Tablet tablet :
pipeInsertNodeTabletInsertionEvent.convertToTablets()) {
nameSpace.transfer(tablet);
}
@@ -221,11 +221,11 @@ public class OpcUaConnector implements PipeConnector {
private void transferTabletWrapper(final PipeRawTabletInsertionEvent
pipeRawTabletInsertionEvent)
throws UaException {
+ // We increase the reference count for this event to determine if the
event may be released.
+ if
(!pipeRawTabletInsertionEvent.increaseReferenceCount(OpcUaConnector.class.getName()))
{
+ return;
+ }
try {
- // We increase the reference count for this event to determine if the
event may be released.
- if
(!pipeRawTabletInsertionEvent.increaseReferenceCount(OpcUaConnector.class.getName()))
{
- return;
- }
nameSpace.transfer(pipeRawTabletInsertionEvent.convertToTablet());
} finally {
pipeRawTabletInsertionEvent.decreaseReferenceCount(OpcUaConnector.class.getName(),
false);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
index 85ba921e24d..e4489799652 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
@@ -274,8 +274,6 @@ public class PipeConsensusAsyncConnector extends
IoTDBConnector implements Conse
// We increase the reference count for this event to determine if the
event may be released.
if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
PipeConsensusAsyncConnector.class.getName())) {
- pipeInsertNodeTabletInsertionEvent.decreaseReferenceCount(
- PipeConsensusAsyncConnector.class.getName(), false);
return;
}
@@ -354,8 +352,6 @@ public class PipeConsensusAsyncConnector extends
IoTDBConnector implements Conse
// We increase the reference count for this event to determine if the
event may be released.
if (!pipeTsFileInsertionEvent.increaseReferenceCount(
PipeConsensusAsyncConnector.class.getName())) {
- pipeTsFileInsertionEvent.decreaseReferenceCount(
- PipeConsensusAsyncConnector.class.getName(), false);
return;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
index ad7ae47b302..3d274812091 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusSyncConnector.java
@@ -224,12 +224,12 @@ public class PipeConsensusSyncConnector extends
IoTDBConnector {
private void doTransferWrapper(
final PipeInsertNodeTabletInsertionEvent
pipeInsertNodeTabletInsertionEvent)
throws PipeException {
+ // We increase the reference count for this event to determine if the
event may be released.
+ if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
+ PipeConsensusSyncConnector.class.getName())) {
+ return;
+ }
try {
- // We increase the reference count for this event to determine if the
event may be released.
- if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
- PipeConsensusSyncConnector.class.getName())) {
- return;
- }
doTransfer(pipeInsertNodeTabletInsertionEvent);
} finally {
pipeInsertNodeTabletInsertionEvent.decreaseReferenceCount(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 083f12485ba..5e13c4e0243 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -214,8 +214,6 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
// We increase the reference count for this event to determine if the
event may be released.
if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
IoTDBDataRegionAsyncConnector.class.getName())) {
- pipeInsertNodeTabletInsertionEvent.decreaseReferenceCount(
- IoTDBDataRegionAsyncConnector.class.getName(), false);
return;
}
@@ -240,8 +238,6 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
// We increase the reference count for this event to determine if the
event may be released.
if (!pipeRawTabletInsertionEvent.increaseReferenceCount(
IoTDBDataRegionAsyncConnector.class.getName())) {
- pipeRawTabletInsertionEvent.decreaseReferenceCount(
- IoTDBDataRegionAsyncConnector.class.getName(), false);
return;
}
@@ -318,8 +314,6 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
// We increase the reference count for this event to determine if the
event may be released.
if (!pipeTsFileInsertionEvent.increaseReferenceCount(
IoTDBDataRegionAsyncConnector.class.getName())) {
- pipeTsFileInsertionEvent.decreaseReferenceCount(
- IoTDBDataRegionAsyncConnector.class.getName(), false);
return;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
index 9580346d25b..294e69a769a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
@@ -104,12 +104,12 @@ public abstract class IoTDBDataNodeSyncConnector extends
IoTDBSslSyncConnector {
protected void doTransferWrapper(
final PipeSchemaRegionWritePlanEvent pipeSchemaRegionWritePlanEvent)
throws PipeException {
+ // We increase the reference count for this event to determine if the
event may be released.
+ if (!pipeSchemaRegionWritePlanEvent.increaseReferenceCount(
+ IoTDBDataNodeSyncConnector.class.getName())) {
+ return;
+ }
try {
- // We increase the reference count for this event to determine if the
event may be released.
- if (!pipeSchemaRegionWritePlanEvent.increaseReferenceCount(
- IoTDBDataNodeSyncConnector.class.getName())) {
- return;
- }
doTransfer(pipeSchemaRegionWritePlanEvent);
} finally {
pipeSchemaRegionWritePlanEvent.decreaseReferenceCount(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
index a92790c5e80..772a55b0e17 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
@@ -268,12 +268,12 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
private void doTransferWrapper(
final PipeInsertNodeTabletInsertionEvent
pipeInsertNodeTabletInsertionEvent)
throws PipeException {
+ // We increase the reference count for this event to determine if the
event may be released.
+ if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
+ IoTDBDataRegionSyncConnector.class.getName())) {
+ return;
+ }
try {
- // We increase the reference count for this event to determine if the
event may be released.
- if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
- IoTDBDataRegionSyncConnector.class.getName())) {
- return;
- }
doTransfer(pipeInsertNodeTabletInsertionEvent);
} finally {
pipeInsertNodeTabletInsertionEvent.decreaseReferenceCount(
@@ -336,12 +336,12 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
private void doTransferWrapper(final PipeRawTabletInsertionEvent
pipeRawTabletInsertionEvent)
throws PipeException {
+ // We increase the reference count for this event to determine if the
event may be released.
+ if (!pipeRawTabletInsertionEvent.increaseReferenceCount(
+ IoTDBDataRegionSyncConnector.class.getName())) {
+ return;
+ }
try {
- // We increase the reference count for this event to determine if the
event may be released.
- if (!pipeRawTabletInsertionEvent.increaseReferenceCount(
- IoTDBDataRegionSyncConnector.class.getName())) {
- return;
- }
doTransfer(pipeRawTabletInsertionEvent);
} finally {
pipeRawTabletInsertionEvent.decreaseReferenceCount(
@@ -395,12 +395,12 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
private void doTransferWrapper(final PipeTsFileInsertionEvent
pipeTsFileInsertionEvent)
throws PipeException, IOException {
+ // We increase the reference count for this event to determine if the
event may be released.
+ if (!pipeTsFileInsertionEvent.increaseReferenceCount(
+ IoTDBDataRegionSyncConnector.class.getName())) {
+ return;
+ }
try {
- // We increase the reference count for this event to determine if the
event may be released.
- if (!pipeTsFileInsertionEvent.increaseReferenceCount(
- IoTDBDataRegionSyncConnector.class.getName())) {
- return;
- }
doTransfer(
Collections.singletonMap(
new Pair<>(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
index 878430e027b..f70e18c0651 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBSchemaRegionConnector.java
@@ -74,12 +74,12 @@ public class IoTDBSchemaRegionConnector extends
IoTDBDataNodeSyncConnector {
private void doTransferWrapper(final PipeSchemaRegionSnapshotEvent
pipeSchemaRegionSnapshotEvent)
throws PipeException, IOException {
+ // We increase the reference count for this event to determine if the
event may be released.
+ if (!pipeSchemaRegionSnapshotEvent.increaseReferenceCount(
+ IoTDBSchemaRegionConnector.class.getName())) {
+ return;
+ }
try {
- // We increase the reference count for this event to determine if the
event may be released.
- if (!pipeSchemaRegionSnapshotEvent.increaseReferenceCount(
- IoTDBSchemaRegionConnector.class.getName())) {
- return;
- }
doTransfer(pipeSchemaRegionSnapshotEvent);
} finally {
pipeSchemaRegionSnapshotEvent.decreaseReferenceCount(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
index e1e5a5dd44c..cf258452346 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java
@@ -118,8 +118,13 @@ public class WebSocketConnector implements PipeConnector {
return;
}
- ((EnrichedEvent) tabletInsertionEvent)
- .increaseReferenceCount(WebSocketConnector.class.getName());
+ if (!((EnrichedEvent) tabletInsertionEvent)
+ .increaseReferenceCount(WebSocketConnector.class.getName())) {
+ LOGGER.warn(
+ "WebsocketConnector failed to increase the reference count of the
event. Ignore it. Current event: {}.",
+ tabletInsertionEvent);
+ return;
+ }
server.addEvent(tabletInsertionEvent, this);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
index 68789bd3ef5..7889b778d86 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java
@@ -111,12 +111,12 @@ public class WriteBackConnector implements PipeConnector {
private void doTransferWrapper(
final PipeInsertNodeTabletInsertionEvent
pipeInsertNodeTabletInsertionEvent)
throws PipeException, WALPipeException {
+ // We increase the reference count for this event to determine if the
event may be released.
+ if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
+ WriteBackConnector.class.getName())) {
+ return;
+ }
try {
- // We increase the reference count for this event to determine if the
event may be released.
- if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
- WriteBackConnector.class.getName())) {
- return;
- }
doTransfer(pipeInsertNodeTabletInsertionEvent);
} finally {
pipeInsertNodeTabletInsertionEvent.decreaseReferenceCount(
@@ -155,11 +155,11 @@ public class WriteBackConnector implements PipeConnector {
private void doTransferWrapper(final PipeRawTabletInsertionEvent
pipeRawTabletInsertionEvent)
throws PipeException {
+ // We increase the reference count for this event to determine if the
event may be released.
+ if
(!pipeRawTabletInsertionEvent.increaseReferenceCount(WriteBackConnector.class.getName()))
{
+ return;
+ }
try {
- // We increase the reference count for this event to determine if the
event may be released.
- if
(!pipeRawTabletInsertionEvent.increaseReferenceCount(WriteBackConnector.class.getName()))
{
- return;
- }
doTransfer(pipeRawTabletInsertionEvent);
} finally {
pipeRawTabletInsertionEvent.decreaseReferenceCount(WriteBackConnector.class.getName(),
false);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index ca1888b0200..16f6c6b7d91 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -574,11 +574,17 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
final TsFileResource resource = pendingQueue.poll();
if (resource == null) {
- isTerminateSignalSent = true;
final PipeTerminateEvent terminateEvent =
new PipeTerminateEvent(pipeName, creationTime, pipeTaskMeta,
dataRegionId);
- terminateEvent.increaseReferenceCount(
- PipeHistoricalDataRegionTsFileExtractor.class.getName());
+ if (!terminateEvent.increaseReferenceCount(
+ PipeHistoricalDataRegionTsFileExtractor.class.getName())) {
+ LOGGER.warn(
+ "Pipe {}@{}: failed to increase reference count for terminate
event, will resend it",
+ pipeName,
+ dataRegionId);
+ return null;
+ }
+ isTerminateSignalSent = true;
return terminateEvent;
}
@@ -603,18 +609,28 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
event.skipParsingTime();
}
-
event.increaseReferenceCount(PipeHistoricalDataRegionTsFileExtractor.class.getName());
try {
- PipeDataNodeResourceManager.tsfile().unpinTsFileResource(resource);
- } catch (final IOException e) {
- LOGGER.warn(
- "Pipe {}@{}: failed to unpin TsFileResource after creating event,
original path: {}",
- pipeName,
- dataRegionId,
- resource.getTsFilePath());
+ final boolean isReferenceCountIncreased =
+
event.increaseReferenceCount(PipeHistoricalDataRegionTsFileExtractor.class.getName());
+ if (!isReferenceCountIncreased) {
+ LOGGER.warn(
+ "Pipe {}@{}: failed to increase reference count for historical
event {}, will discard it",
+ pipeName,
+ dataRegionId,
+ event);
+ }
+ return isReferenceCountIncreased ? event : null;
+ } finally {
+ try {
+ PipeDataNodeResourceManager.tsfile().unpinTsFileResource(resource);
+ } catch (final IOException e) {
+ LOGGER.warn(
+ "Pipe {}@{}: failed to unpin TsFileResource after creating event,
original path: {}",
+ pipeName,
+ dataRegionId,
+ resource.getTsFilePath());
+ }
}
-
- return event;
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index 762577d2f84..c36a8877dff 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -32,10 +32,15 @@ import
org.apache.iotdb.db.pipe.pattern.CachedSchemaPatternMatcher;
import org.apache.iotdb.db.pipe.pattern.PipeDataRegionMatcher;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.Closeable;
public class PipeDataRegionAssigner implements Closeable {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeDataRegionAssigner.class);
+
private static final int nonForwardingEventsProgressReportInterval =
PipeConfig.getInstance().getPipeNonForwardingEventsProgressReportInterval();
@@ -64,7 +69,11 @@ public class PipeDataRegionAssigner implements Closeable {
}
public void publishToAssign(final PipeRealtimeEvent event) {
- event.increaseReferenceCount(PipeDataRegionAssigner.class.getName());
+ if (!event.increaseReferenceCount(PipeDataRegionAssigner.class.getName()))
{
+ LOGGER.warn(
+ "The reference count of the realtime event {} cannot be increased,
skipping it.", event);
+ return;
+ }
disruptor.publish(event);
@@ -99,7 +108,12 @@ public class PipeDataRegionAssigner implements Closeable {
extractor.getRealtimeDataExtractionStartTime(),
extractor.getRealtimeDataExtractionEndTime());
reportEvent.bindProgressIndex(event.getProgressIndex());
-
reportEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName());
+ if
(!reportEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName())) {
+ LOGGER.warn(
+ "The reference count of the event {} cannot be
increased, skipping it.",
+ reportEvent);
+ return;
+ }
extractor.extract(PipeRealtimeEventFactory.createRealtimeEvent(reportEvent));
return;
}
@@ -118,7 +132,12 @@ public class PipeDataRegionAssigner implements Closeable {
.disableMod4NonTransferPipes(extractor.isShouldTransferModFile());
}
-
copiedEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName());
+ if
(!copiedEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName())) {
+ LOGGER.warn(
+ "The reference count of the event {} cannot be increased,
skipping it.",
+ copiedEvent);
+ return;
+ }
extractor.extract(copiedEvent);
if (innerEvent instanceof PipeHeartbeatEvent) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataRegionEventCounter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataRegionEventCounter.java
index 0aed61801ef..b1405628f37 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataRegionEventCounter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataRegionEventCounter.java
@@ -36,17 +36,17 @@ public class PipeDataRegionEventCounter extends
PipeEventCounter {
private final AtomicInteger pipeHeartbeatEventCount = new AtomicInteger(0);
@Override
- public Integer getTsFileInsertionEventCount() {
+ public int getTsFileInsertionEventCount() {
return tsFileInsertionEventCount.get();
}
@Override
- public Integer getTabletInsertionEventCount() {
+ public int getTabletInsertionEventCount() {
return tabletInsertionEventCount.get();
}
@Override
- public Integer getPipeHeartbeatEventCount() {
+ public int getPipeHeartbeatEventCount() {
return pipeHeartbeatEventCount.get();
}
@@ -76,11 +76,11 @@ public class PipeDataRegionEventCounter extends
PipeEventCounter {
return;
}
if (event instanceof PipeHeartbeatEvent) {
- pipeHeartbeatEventCount.decrementAndGet();
+ pipeHeartbeatEventCount.getAndUpdate(count -> count > 0 ? count - 1 : 0);
} else if (event instanceof TabletInsertionEvent) {
- tabletInsertionEventCount.decrementAndGet();
+ tabletInsertionEventCount.getAndUpdate(count -> count > 0 ? count - 1 :
0);
} else if (event instanceof TsFileInsertionEvent) {
- tsFileInsertionEventCount.decrementAndGet();
+ tsFileInsertionEventCount.getAndUpdate(count -> count > 0 ? count - 1 :
0);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index c4c91f80f58..de0ede99623 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -56,6 +56,7 @@ public class PipeEventCollector implements EventCollector {
private final AtomicInteger collectInvocationCount = new AtomicInteger(0);
private boolean hasNoGeneratedEvent = true;
+ private boolean isFailedToIncreaseReferenceCount = false;
public PipeEventCollector(
final UnboundedBlockingPendingQueue<Event> pendingQueue,
@@ -162,10 +163,12 @@ public class PipeEventCollector implements EventCollector
{
}
private void collectEvent(final Event event) {
- collectInvocationCount.incrementAndGet();
-
if (event instanceof EnrichedEvent) {
- ((EnrichedEvent)
event).increaseReferenceCount(PipeEventCollector.class.getName());
+ if (!((EnrichedEvent)
event).increaseReferenceCount(PipeEventCollector.class.getName())) {
+ LOGGER.warn("PipeEventCollector: The event {} is already released,
skipping it.", event);
+ isFailedToIncreaseReferenceCount = true;
+ return;
+ }
// Assign a commit id for this event in order to report progress in
order.
PipeEventCommitManager.getInstance()
@@ -180,11 +183,13 @@ public class PipeEventCollector implements EventCollector
{
}
pendingQueue.directOffer(event);
+ collectInvocationCount.incrementAndGet();
}
- public void resetCollectInvocationCountAndGenerateFlag() {
+ public void resetFlags() {
collectInvocationCount.set(0);
hasNoGeneratedEvent = true;
+ isFailedToIncreaseReferenceCount = false;
}
public long getCollectInvocationCount() {
@@ -198,4 +203,8 @@ public class PipeEventCollector implements EventCollector {
public boolean hasNoGeneratedEvent() {
return hasNoGeneratedEvent;
}
+
+ public boolean isFailedToIncreaseReferenceCount() {
+ return isFailedToIncreaseReferenceCount;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index a832d43f73e..d55f18f9838 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -32,7 +32,6 @@ import
org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
import org.apache.iotdb.db.pipe.metric.PipeDataRegionConnectorMetrics;
import org.apache.iotdb.db.pipe.metric.PipeSchemaRegionConnectorMetrics;
-import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.utils.ErrorHandlingUtils;
import org.apache.iotdb.pipe.api.PipeConnector;
@@ -100,6 +99,10 @@ public class PipeConnectorSubtask extends
PipeAbstractConnectorSubtask {
: UserDefinedEnrichedEvent.maybeOf(inputPendingQueue.waitedPoll());
// Record this event for retrying on connection failure or other exceptions
setLastEvent(event);
+ if (event instanceof EnrichedEvent && ((EnrichedEvent)
event).isReleased()) {
+ lastEvent = null;
+ return true;
+ }
try {
if (event == null) {
@@ -133,7 +136,7 @@ public class PipeConnectorSubtask extends
PipeAbstractConnectorSubtask {
: event);
}
- decreaseReferenceCountAndReleaseLastEvent(true);
+ decreaseReferenceCountAndReleaseLastEvent(event, true);
} catch (final PipeException e) {
if (!isClosed.get()) {
setLastExceptionEvent(event);
@@ -143,7 +146,7 @@ public class PipeConnectorSubtask extends
PipeAbstractConnectorSubtask {
"{} in pipe transfer, ignored because the connector subtask is
dropped.",
e.getClass().getSimpleName(),
e);
- clearReferenceCountAndReleaseLastEvent();
+ clearReferenceCountAndReleaseLastEvent(event);
}
} catch (final Exception e) {
if (!isClosed.get()) {
@@ -160,7 +163,7 @@ public class PipeConnectorSubtask extends
PipeAbstractConnectorSubtask {
} else {
LOGGER.info(
"Exception in pipe transfer, ignored because the connector subtask
is dropped.", e);
- clearReferenceCountAndReleaseLastEvent();
+ clearReferenceCountAndReleaseLastEvent(event);
}
}
@@ -204,13 +207,7 @@ public class PipeConnectorSubtask extends
PipeAbstractConnectorSubtask {
ErrorHandlingUtils.getRootCause(e).getMessage(),
e);
} finally {
- inputPendingQueue.forEach(
- event -> {
- if (event instanceof EnrichedEvent) {
- ((EnrichedEvent)
event).clearReferenceCount(PipeEventCollector.class.getName());
- }
- });
- inputPendingQueue.clear();
+ inputPendingQueue.discardAllEvents();
// Should be called after outputPipeConnector.close()
super.close();
@@ -223,18 +220,9 @@ public class PipeConnectorSubtask extends
PipeAbstractConnectorSubtask {
*/
public void discardEventsOfPipe(final String pipeNameToDrop) {
// Try to remove the events as much as possible
- inputPendingQueue.removeIf(
- event -> {
- if (event instanceof EnrichedEvent
- && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName()))
{
- ((EnrichedEvent) event)
-
.clearReferenceCount(IoTDBDataRegionAsyncConnector.class.getName());
- return true;
- }
- return false;
- });
-
- // synchronized to use the lastEvent and lastExceptionEvent
+ inputPendingQueue.discardEventsOfPipe(pipeNameToDrop);
+
+ // synchronized to use the lastEvent & lastExceptionEvent
synchronized (this) {
// Here we discard the last event, and re-submit the pipe task to avoid
that the pipe task has
// stopped submission but will not be stopped by critical exceptions,
because when it acquires
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
index a169cb6a338..6b33c248bfb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.pipe.task.subtask.connector;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.pipe.task.connection.BlockingPendingQueue;
import
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.metric.PipeDataRegionEventCounter;
@@ -33,7 +34,6 @@ import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
-import java.util.function.Predicate;
public class PipeRealtimePriorityBlockingQueue extends
UnboundedBlockingPendingQueue<Event> {
@@ -153,9 +153,35 @@ public class PipeRealtimePriorityBlockingQueue extends
UnboundedBlockingPendingQ
}
@Override
- public void removeIf(final Predicate<? super Event> filter) {
- super.removeIf(filter);
- pendingQueue.removeIf(filter);
+ public void discardAllEvents() {
+ super.discardAllEvents();
+ tsfileInsertEventDeque.removeIf(
+ event -> {
+ if (event instanceof EnrichedEvent) {
+ if (((EnrichedEvent)
event).clearReferenceCount(BlockingPendingQueue.class.getName())) {
+ eventCounter.decreaseEventCount(event);
+ }
+ }
+ return true;
+ });
+ eventCounter.reset();
+ }
+
+ @Override
+ public void discardEventsOfPipe(final String pipeNameToDrop) {
+ super.discardEventsOfPipe(pipeNameToDrop);
+ tsfileInsertEventDeque.removeIf(
+ event -> {
+ if (event instanceof EnrichedEvent
+ && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName()))
{
+ if (((EnrichedEvent) event)
+
.clearReferenceCount(PipeRealtimePriorityBlockingQueue.class.getName())) {
+ eventCounter.decreaseEventCount(event);
+ }
+ return true;
+ }
+ return false;
+ });
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
index 812b47a978a..ff17a11d8b1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
@@ -129,7 +129,7 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
return false;
}
- outputEventCollector.resetCollectInvocationCountAndGenerateFlag();
+ outputEventCollector.resetFlags();
try {
// event can be supplied after the subtask is closed, so we need to
check isClosed here
if (!isClosed.get()) {
@@ -168,6 +168,9 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
// of the event must be zero in the processor stage, at this
time, the progress of the
// event needs to be reported.
&& outputEventCollector.hasNoGeneratedEvent()
+ // If the event's reference count cannot be increased, it means
that the event has
+ // been released, and the progress of the event can not be
reported.
+ && !outputEventCollector.isFailedToIncreaseReferenceCount()
// Events generated from consensusPipe's transferred data should
never be reported.
&& !(pipeProcessor instanceof PipeConsensusProcessor);
if (shouldReport
@@ -182,7 +185,7 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
PipeEventCommitManager.getInstance()
.enrichWithCommitterKeyAndCommitId((EnrichedEvent) event,
creationTime, regionId);
}
- decreaseReferenceCountAndReleaseLastEvent(shouldReport);
+ decreaseReferenceCountAndReleaseLastEvent(event, shouldReport);
} catch (final PipeRuntimeOutOfMemoryCriticalException e) {
LOGGER.info(
"Temporarily out of memory in pipe event processing, will wait for
the memory to release.",
@@ -201,7 +204,7 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
e);
} else {
LOGGER.info("Exception in pipe event processing, ignored because pipe
is dropped.", e);
- clearReferenceCountAndReleaseLastEvent();
+ clearReferenceCountAndReleaseLastEvent(event);
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index d3dba643e26..ef8f013f7a1 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -208,6 +208,7 @@ public class CommonConfig {
private long pipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration = 10 *
1000L;
private long pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs = 1000;
private long pipeSubtaskExecutorCronHeartbeatEventIntervalSeconds = 20;
+ private long pipeSubtaskExecutorForcedRestartIntervalMs = Long.MAX_VALUE;
private int pipeExtractorAssignerDisruptorRingBufferSize = 65536;
private long pipeExtractorAssignerDisruptorRingBufferEntrySizeInBytes = 50;
// 50B
@@ -883,6 +884,15 @@ public class CommonConfig {
pipeSubtaskExecutorCronHeartbeatEventIntervalSeconds;
}
+ public long getPipeSubtaskExecutorForcedRestartIntervalMs() {
+ return pipeSubtaskExecutorForcedRestartIntervalMs;
+ }
+
+ public void setPipeSubtaskExecutorForcedRestartIntervalMs(
+ long pipeSubtaskExecutorForcedRestartIntervalMs) {
+ this.pipeSubtaskExecutorForcedRestartIntervalMs =
pipeSubtaskExecutorForcedRestartIntervalMs;
+ }
+
public int getPipeRealTimeQueuePollHistoryThreshold() {
return pipeRealTimeQueuePollHistoryThreshold;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 5183ab1b776..76e0a853dc2 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -318,6 +318,11 @@ public class CommonDescriptor {
properties.getProperty(
"pipe_subtask_executor_cron_heartbeat_event_interval_seconds",
String.valueOf(config.getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds()))));
+ config.setPipeSubtaskExecutorForcedRestartIntervalMs(
+ Long.parseLong(
+ properties.getProperty(
+ "pipe_subtask_executor_forced_restart_interval_ms",
+
String.valueOf(config.getPipeSubtaskExecutorForcedRestartIntervalMs()))));
config.setPipeExtractorAssignerDisruptorRingBufferSize(
Integer.parseInt(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index fc9fb0911a0..449bd149e67 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -92,6 +92,10 @@ public class PipeConfig {
return
COMMON_CONFIG.getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds();
}
+ public long getPipeSubtaskExecutorForcedRestartIntervalMs() {
+ return COMMON_CONFIG.getPipeSubtaskExecutorForcedRestartIntervalMs();
+ }
+
/////////////////////////////// Extractor ///////////////////////////////
public int getPipeExtractorAssignerDisruptorRingBufferSize() {
@@ -341,6 +345,9 @@ public class PipeConfig {
LOGGER.info(
"PipeSubtaskExecutorCronHeartbeatEventIntervalSeconds: {}",
getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds());
+ LOGGER.info(
+ "PipeSubtaskExecutorForcedRestartIntervalMs: {}",
+ getPipeSubtaskExecutorForcedRestartIntervalMs());
LOGGER.info(
"PipeExtractorAssignerDisruptorRingBufferSize: {}",
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
index cbdbef1e4e9..57a2ee6a6eb 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
@@ -67,7 +67,7 @@ public abstract class EnrichedEvent implements Event {
protected boolean isPatternParsed;
protected boolean isTimeParsed;
- protected boolean shouldReportOnCommit = true;
+ protected volatile boolean shouldReportOnCommit = true;
protected List<Supplier<Void>> onCommittedHooks = new ArrayList<>();
protected EnrichedEvent(
@@ -106,29 +106,32 @@ public abstract class EnrichedEvent implements Event {
* {@code false} otherwise; {@link EnrichedEvent#referenceCount} will be
incremented
* regardless of the circumstances
*/
- public boolean increaseReferenceCount(final String holderMessage) {
+ public synchronized boolean increaseReferenceCount(final String
holderMessage) {
boolean isSuccessful = true;
- synchronized (this) {
- if (isReleased.get()) {
- LOGGER.warn(
- "re-increase reference count to event that has already been
released: {}, stack trace: {}",
- coreReportMessage(),
- Thread.currentThread().getStackTrace());
- isSuccessful = false;
- // Here we still increase the reference count, to remain consistent
with the behavior after
- // internal increase failure.
- referenceCount.incrementAndGet();
- } else {
- if (referenceCount.get() == 0) {
- // We assume that this function will not throw any exceptions.
- isSuccessful =
internallyIncreaseResourceReferenceCount(holderMessage);
- }
- referenceCount.incrementAndGet();
- }
+
+ if (isReleased.get()) {
+ LOGGER.warn(
+ "re-increase reference count to event that has already been
released: {}, stack trace: {}",
+ coreReportMessage(),
+ Thread.currentThread().getStackTrace());
+ isSuccessful = false;
+ return isSuccessful;
}
- if (!isSuccessful) {
- LOGGER.warn("increase reference count failed, EnrichedEvent: {}",
coreReportMessage());
+
+ if (referenceCount.get() == 0) {
+ // We assume that this function will not throw any exceptions.
+ isSuccessful = internallyIncreaseResourceReferenceCount(holderMessage);
+ }
+
+ if (isSuccessful) {
+ referenceCount.incrementAndGet();
+ } else {
+ LOGGER.warn(
+ "increase reference count failed, EnrichedEvent: {}, stack trace:
{}",
+ coreReportMessage(),
+ Thread.currentThread().getStackTrace());
}
+
return isSuccessful;
}
@@ -156,31 +159,53 @@ public abstract class EnrichedEvent implements Event {
* {@code false} otherwise; {@link EnrichedEvent#referenceCount} will be
decremented
* regardless of the circumstances
*/
- public boolean decreaseReferenceCount(final String holderMessage, final
boolean shouldReport) {
+ public synchronized boolean decreaseReferenceCount(
+ final String holderMessage, final boolean shouldReport) {
boolean isSuccessful = true;
- synchronized (this) {
- if (referenceCount.get() == 1 && !isReleased.get()) {
- // We assume that this function will not throw any exceptions.
- isSuccessful = internallyDecreaseResourceReferenceCount(holderMessage);
- if (!shouldReport) {
- shouldReportOnCommit = false;
- }
- PipeEventCommitManager.getInstance().commit(this, committerKey);
+
+ if (isReleased.get()) {
+ LOGGER.warn(
+ "decrease reference count to event that has already been released:
{}, stack trace: {}",
+ coreReportMessage(),
+ Thread.currentThread().getStackTrace());
+ isSuccessful = false;
+ return isSuccessful;
+ }
+
+ if (referenceCount.get() == 1) {
+ // We assume that this function will not throw any exceptions.
+ if (!internallyDecreaseResourceReferenceCount(holderMessage)) {
+ LOGGER.warn(
+ "resource reference count is decreased to 0, but failed to release
the resource, EnrichedEvent: {}, stack trace: {}",
+ coreReportMessage(),
+ Thread.currentThread().getStackTrace());
}
- final int newReferenceCount = referenceCount.decrementAndGet();
- if (newReferenceCount == 0) {
- isReleased.set(true);
+ if (!shouldReport) {
+ shouldReportOnCommit = false;
}
+ PipeEventCommitManager.getInstance().commit(this, committerKey);
+ }
+
+ // No matter whether the resource is released, we should decrease the
reference count.
+ final int newReferenceCount = referenceCount.decrementAndGet();
+ if (newReferenceCount <= 0) {
+ isReleased.set(true);
+ isSuccessful = newReferenceCount == 0;
if (newReferenceCount < 0) {
- LOGGER.debug(
+ LOGGER.warn(
"reference count is decreased to {}, event: {}, stack trace: {}",
newReferenceCount,
coreReportMessage(),
Thread.currentThread().getStackTrace());
+ referenceCount.set(0);
}
}
+
if (!isSuccessful) {
- LOGGER.warn("decrease reference count failed, EnrichedEvent: {}",
coreReportMessage());
+ LOGGER.warn(
+ "decrease reference count failed, EnrichedEvent: {}, stack trace:
{}",
+ coreReportMessage(),
+ Thread.currentThread().getStackTrace());
}
return isSuccessful;
}
@@ -195,20 +220,29 @@ public abstract class EnrichedEvent implements Event {
* {@code false} otherwise; {@link EnrichedEvent#referenceCount} will be
reset to zero
* regardless of the circumstances
*/
- public boolean clearReferenceCount(final String holderMessage) {
- boolean isSuccessful = true;
- synchronized (this) {
- if (referenceCount.get() >= 1 && !isReleased.get()) {
- // We assume that this function will not throw any exceptions.
- isSuccessful = internallyDecreaseResourceReferenceCount(holderMessage);
- isReleased.set(true);
- }
- referenceCount.set(0);
+ public synchronized boolean clearReferenceCount(final String holderMessage) {
+ if (isReleased.get()) {
+ LOGGER.warn(
+ "clear reference count to event that has already been released: {},
stack trace: {}",
+ coreReportMessage(),
+ Thread.currentThread().getStackTrace());
+ return false;
}
- if (!isSuccessful) {
- LOGGER.warn("clear reference count failed, EnrichedEvent: {}",
coreReportMessage());
+
+ if (referenceCount.get() >= 1) {
+ shouldReportOnCommit = false;
+ // We assume that this function will not throw any exceptions.
+ if (!internallyDecreaseResourceReferenceCount(holderMessage)) {
+ LOGGER.warn(
+ "resource reference count is decreased to 0, but failed to release
the resource, EnrichedEvent: {}, stack trace: {}",
+ coreReportMessage(),
+ Thread.currentThread().getStackTrace());
+ }
}
- return isSuccessful;
+
+ referenceCount.set(0);
+ isReleased.set(true);
+ return true;
}
/**
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/metric/PipeEventCounter.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/metric/PipeEventCounter.java
index c62e1ef60ff..591ec500dae 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/metric/PipeEventCounter.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/metric/PipeEventCounter.java
@@ -23,15 +23,15 @@ import org.apache.iotdb.pipe.api.event.Event;
public abstract class PipeEventCounter {
- public Integer getTsFileInsertionEventCount() {
+ public int getTsFileInsertionEventCount() {
return 0;
}
- public Integer getTabletInsertionEventCount() {
+ public int getTabletInsertionEventCount() {
return 0;
}
- public Integer getPipeHeartbeatEventCount() {
+ public int getPipeHeartbeatEventCount() {
return 0;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/BlockingPendingQueue.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/BlockingPendingQueue.java
index 04983a984d9..c67e80181f9 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/BlockingPendingQueue.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/connection/BlockingPendingQueue.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.commons.pipe.task.connection;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.metric.PipeEventCounter;
import org.apache.iotdb.pipe.api.event.Event;
@@ -29,7 +30,6 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
-import java.util.function.Predicate;
public abstract class BlockingPendingQueue<E extends Event> {
@@ -40,7 +40,7 @@ public abstract class BlockingPendingQueue<E extends Event> {
protected final BlockingQueue<E> pendingQueue;
- private final PipeEventCounter eventCounter;
+ protected final PipeEventCounter eventCounter;
protected BlockingPendingQueue(
final BlockingQueue<E> pendingQueue, final PipeEventCounter
eventCounter) {
@@ -106,12 +106,36 @@ public abstract class BlockingPendingQueue<E extends
Event> {
eventCounter.reset();
}
+ /** DO NOT FORGET to set eventCounter to new value after invoking this
method. */
public void forEach(final Consumer<? super E> action) {
pendingQueue.forEach(action);
}
- public void removeIf(final Predicate<? super E> filter) {
- pendingQueue.removeIf(filter);
+ public void discardAllEvents() {
+ pendingQueue.removeIf(
+ event -> {
+ if (event instanceof EnrichedEvent) {
+ if (((EnrichedEvent)
event).clearReferenceCount(BlockingPendingQueue.class.getName())) {
+ eventCounter.decreaseEventCount(event);
+ }
+ }
+ return true;
+ });
+ eventCounter.reset();
+ }
+
+ public void discardEventsOfPipe(final String pipeNameToDrop) {
+ pendingQueue.removeIf(
+ event -> {
+ if (event instanceof EnrichedEvent
+ && pipeNameToDrop.equals(((EnrichedEvent) event).getPipeName()))
{
+ if (((EnrichedEvent)
event).clearReferenceCount(BlockingPendingQueue.class.getName())) {
+ eventCounter.decreaseEventCount(event);
+ }
+ return true;
+ }
+ return false;
+ });
}
public boolean isEmpty() {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java
index 25ddb438806..6e520de1b76 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java
@@ -84,7 +84,7 @@ public abstract class PipeAbstractConnectorSubtask extends
PipeReportableSubtask
LOGGER.info(
"onFailure in pipe transfer, ignored because the connector subtask
is dropped.",
throwable);
- clearReferenceCountAndReleaseLastEvent();
+ clearReferenceCountAndReleaseLastEvent(null);
return;
}
@@ -227,7 +227,8 @@ public abstract class PipeAbstractConnectorSubtask extends
PipeReportableSubtask
protected synchronized void
clearReferenceCountAndReleaseLastExceptionEvent() {
if (lastExceptionEvent != null) {
- if (lastExceptionEvent instanceof EnrichedEvent) {
+ if (lastExceptionEvent instanceof EnrichedEvent
+ && !((EnrichedEvent) lastExceptionEvent).isReleased()) {
((EnrichedEvent)
lastExceptionEvent).clearReferenceCount(PipeSubtask.class.getName());
}
lastExceptionEvent = null;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java
index c366b4d62e4..cff90b88481 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java
@@ -39,7 +39,7 @@ public abstract class PipeReportableSubtask extends
PipeSubtask {
public synchronized void onFailure(final Throwable throwable) {
if (isClosed.get()) {
LOGGER.info("onFailure in pipe subtask, ignored because pipe is
dropped.", throwable);
- clearReferenceCountAndReleaseLastEvent();
+ clearReferenceCountAndReleaseLastEvent(null);
return;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeSubtask.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeSubtask.java
index a6f11bcae9b..493e272e799 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeSubtask.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeSubtask.java
@@ -148,26 +148,47 @@ public abstract class PipeSubtask
@Override
public void close() {
- clearReferenceCountAndReleaseLastEvent();
+ clearReferenceCountAndReleaseLastEvent(null);
}
protected synchronized void decreaseReferenceCountAndReleaseLastEvent(
- final boolean shouldReport) {
+ final Event actualLastEvent, final boolean shouldReport) {
+ // lastEvent may be set to null due to
PipeConnectorSubtask#discardEventsOfPipe
if (lastEvent != null) {
- if (lastEvent instanceof EnrichedEvent) {
+ if (lastEvent instanceof EnrichedEvent && !((EnrichedEvent)
lastEvent).isReleased()) {
((EnrichedEvent) lastEvent)
.decreaseReferenceCount(PipeSubtask.class.getName(), shouldReport);
}
lastEvent = null;
+ return;
+ }
+
+ // If lastEvent is set to null due to
PipeConnectorSubtask#discardEventsOfPipe (connector close)
+ // and finally exception occurs, we need to release the actual last event
from the connector
+ // given by the parameter
+ if (actualLastEvent instanceof EnrichedEvent
+ && !((EnrichedEvent) actualLastEvent).isReleased()) {
+ ((EnrichedEvent) actualLastEvent)
+ .decreaseReferenceCount(PipeSubtask.class.getName(), shouldReport);
}
}
- protected synchronized void clearReferenceCountAndReleaseLastEvent() {
+ protected synchronized void clearReferenceCountAndReleaseLastEvent(final
Event actualLastEvent) {
+ // lastEvent may be set to null due to
PipeConnectorSubtask#discardEventsOfPipe
if (lastEvent != null) {
- if (lastEvent instanceof EnrichedEvent) {
+ if (lastEvent instanceof EnrichedEvent && !((EnrichedEvent)
lastEvent).isReleased()) {
((EnrichedEvent)
lastEvent).clearReferenceCount(PipeSubtask.class.getName());
}
lastEvent = null;
+ return;
+ }
+
+ // If lastEvent is set to null due to
PipeConnectorSubtask#discardEventsOfPipe (connector close)
+ // and finally exception occurs, we need to release the actual last event
from the connector
+ // given by the parameter
+ if (actualLastEvent instanceof EnrichedEvent
+ && !((EnrichedEvent) actualLastEvent).isReleased()) {
+ ((EnrichedEvent)
actualLastEvent).clearReferenceCount(PipeSubtask.class.getName());
}
}