This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 199342da8df Pipe: clear event reference count instead of decreasing
when closing to improve idempotence & add creation time for committer key to
avoid outdated commit & detect re-increasing reference count to avoid NPE
(#12315)
199342da8df is described below
commit 199342da8df903ffa7ba821fd94fa4e17bbac11a
Author: V_Galaxy <[email protected]>
AuthorDate: Fri Apr 12 23:59:53 2024 +0800
Pipe: clear event reference count instead of decreasing when closing to
improve idempotence & add creation time for committer key to avoid outdated
commit & detect re-increasing reference count to avoid NPE (#12315)
---
.../pipe/execution/PipeConfigNodeSubtask.java | 9 ++----
.../builder/PipeTransferBatchReqBuilder.java | 37 ++++++++++++++++------
.../async/IoTDBDataRegionAsyncConnector.java | 26 +++++++++++++++
.../PipeTransferTabletBatchEventHandler.java | 12 +++++--
.../PipeTransferTabletInsertionEventHandler.java | 5 ---
.../PipeTransferTsFileInsertionEventHandler.java | 2 --
.../db/pipe/resource/memory/PipeMemoryBlock.java | 7 +++-
.../pipe/task/connection/PipeEventCollector.java | 8 +++--
.../db/pipe/task/stage/PipeTaskConnectorStage.java | 3 +-
.../db/pipe/task/stage/PipeTaskProcessorStage.java | 2 +-
.../subtask/connector/PipeConnectorSubtask.java | 6 ++--
.../connector/PipeConnectorSubtaskManager.java | 10 ++++--
.../subtask/processor/PipeProcessorSubtask.java | 5 +--
.../task/stage/SubscriptionTaskConnectorStage.java | 2 +-
.../SubscriptionConnectorSubtaskManager.java | 10 ++++--
.../iotdb/commons/pipe/event/EnrichedEvent.java | 24 +++++++++++++-
.../pipe/progress/PipeEventCommitManager.java | 26 +++++++++------
.../commons/pipe/progress/PipeEventCommitter.java | 15 +++++++--
.../task/subtask/PipeAbstractConnectorSubtask.java | 2 +-
.../pipe/task/subtask/PipeReportableSubtask.java | 2 +-
.../commons/pipe/task/subtask/PipeSubtask.java | 18 +++++++----
21 files changed, 168 insertions(+), 63 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
index bbe0ead0edd..376548b15d8 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/execution/PipeConfigNodeSubtask.java
@@ -150,7 +150,7 @@ public class PipeConfigNodeSubtask extends
PipeAbstractConnectorSubtask {
outputPipeConnector.transfer(event);
- releaseLastEvent(true);
+ decreaseReferenceCountAndReleaseLastEvent(true);
} catch (PipeException e) {
if (!isClosed.get()) {
throw e;
@@ -159,7 +159,7 @@ public class PipeConfigNodeSubtask extends
PipeAbstractConnectorSubtask {
"{} in pipe transfer, ignored because pipe is dropped.",
e.getClass().getSimpleName(),
e);
- releaseLastEvent(false);
+ clearReferenceCountAndReleaseLastEvent();
}
} catch (Exception e) {
if (!isClosed.get()) {
@@ -169,16 +169,13 @@ public class PipeConfigNodeSubtask extends
PipeAbstractConnectorSubtask {
e);
} else {
LOGGER.info("Exception in pipe transfer, ignored because pipe is
dropped.", e);
- releaseLastEvent(false);
+ clearReferenceCountAndReleaseLastEvent();
}
}
return true;
}
- // synchronized for close() and releaseLastEvent(). make sure that the lastEvent
- // will not be updated after pipeProcessor.close() to avoid resource leak
- // because of the lastEvent is not released.
@Override
public void close() {
isClosed.set(true);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
index 0ea50ae73ae..225f39c149c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
@@ -43,6 +43,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_BATCH_DELAY_KEY;
@@ -70,6 +71,8 @@ public abstract class PipeTransferBatchReqBuilder implements
AutoCloseable {
protected final PipeMemoryBlock allocatedMemoryBlock;
protected long totalBufferSize = 0;
+ private final AtomicBoolean isClosed = new AtomicBoolean(true);
+
protected PipeTransferBatchReqBuilder(PipeParameters parameters) {
maxDelayInMs =
parameters.getIntOrDefault(
@@ -104,6 +107,8 @@ public abstract class PipeTransferBatchReqBuilder
implements AutoCloseable {
requestMaxBatchSizeInBytes,
getMaxBatchSizeInBytes());
}
+
+ isClosed.set(false);
}
/**
@@ -123,17 +128,23 @@ public abstract class PipeTransferBatchReqBuilder
implements AutoCloseable {
// The deduplication logic here is to avoid the accumulation of the same event in a batch when
// retrying.
if ((events.isEmpty() || !events.get(events.size() - 1).equals(event))) {
- events.add(event);
- requestCommitIds.add(requestCommitId);
- final int bufferSize = buildTabletInsertionBuffer(event);
-
- ((EnrichedEvent)
event).increaseReferenceCount(PipeTransferBatchReqBuilder.class.getName());
-
- if (firstEventProcessingTime == Long.MIN_VALUE) {
- firstEventProcessingTime = System.currentTimeMillis();
+ // We increase the reference count for this event to determine if the event may be released.
+ if (((EnrichedEvent) event)
+
.increaseReferenceCount(PipeTransferBatchReqBuilder.class.getName())) {
+ events.add(event);
+ requestCommitIds.add(requestCommitId);
+
+ final int bufferSize = buildTabletInsertionBuffer(event);
+ totalBufferSize += bufferSize;
+
+ if (firstEventProcessingTime == Long.MIN_VALUE) {
+ firstEventProcessingTime = System.currentTimeMillis();
+ }
+ } else {
+ LOGGER.error(
+ "TabletInsertionEvent {} can not be transferred because the
reference count can not be increased, the data represented by this event is
lost",
+ ((EnrichedEvent) event).coreReportMessage());
}
-
- totalBufferSize += bufferSize;
}
return totalBufferSize >= getMaxBatchSizeInBytes()
@@ -203,6 +214,8 @@ public abstract class PipeTransferBatchReqBuilder
implements AutoCloseable {
@Override
public synchronized void close() {
+ isClosed.set(true);
+
for (final Event event : events) {
if (event instanceof EnrichedEvent) {
((EnrichedEvent) event).clearReferenceCount(this.getClass().getName());
@@ -210,4 +223,8 @@ public abstract class PipeTransferBatchReqBuilder
implements AutoCloseable {
}
allocatedMemoryBlock.close();
}
+
+ public boolean isClosed() {
+ return isClosed.get();
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index dd6b002c523..2607415325f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -170,6 +170,15 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) {
final PipeInsertNodeTabletInsertionEvent
pipeInsertNodeTabletInsertionEvent =
(PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent;
+ // We increase the reference count for this event to determine if the event may be released.
+ if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
+ IoTDBDataRegionAsyncConnector.class.getName())) {
+ LOGGER.error(
+ "PipeInsertNodeTabletInsertionEvent {} can not be transferred
because the reference count can not be increased, the data represented by this
event is lost",
+ pipeInsertNodeTabletInsertionEvent.coreReportMessage());
+ return;
+ }
+
final InsertNode insertNode =
pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();
final TPipeTransferReq pipeTransferReq =
@@ -185,6 +194,15 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
} else { // tabletInsertionEvent instanceof PipeRawTabletInsertionEvent
final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
(PipeRawTabletInsertionEvent) tabletInsertionEvent;
+ // We increase the reference count for this event to determine if the event may be released.
+ if (!pipeRawTabletInsertionEvent.increaseReferenceCount(
+ IoTDBDataRegionAsyncConnector.class.getName())) {
+ LOGGER.error(
+ "PipeRawTabletInsertionEvent {} can not be transferred because
the reference count can not be increased, the data represented by this event is
lost",
+ pipeRawTabletInsertionEvent.coreReportMessage());
+ return;
+ }
+
final PipeTransferTabletRawReq pipeTransferTabletRawReq =
PipeTransferTabletRawReq.toTPipeTransferReq(
pipeRawTabletInsertionEvent.convertToTablet(),
@@ -245,6 +263,14 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
final PipeTsFileInsertionEvent pipeTsFileInsertionEvent =
(PipeTsFileInsertionEvent) tsFileInsertionEvent;
+ // We increase the reference count for this event to determine if the event may be released.
+ if (!pipeTsFileInsertionEvent.increaseReferenceCount(
+ IoTDBDataRegionAsyncConnector.class.getName())) {
+ LOGGER.error(
+ "PipeTsFileInsertionEvent {} can not be transferred because the
reference count can not be increased, the data represented by this event is
lost",
+ pipeTsFileInsertionEvent.coreReportMessage());
+ return;
+ }
// Just in case. To avoid the case that exception occurred when constructing the handler.
if (!pipeTsFileInsertionEvent.getTsFile().exists()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
index f45c012bb1b..668757201f0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
@@ -49,6 +49,7 @@ public class PipeTransferTabletBatchEventHandler implements
AsyncMethodCallback<
private final TPipeTransferReq req;
private final IoTDBDataRegionAsyncConnector connector;
+ private final IoTDBThriftAsyncPipeTransferBatchReqBuilder batchReqBuilder;
public PipeTransferTabletBatchEventHandler(
IoTDBThriftAsyncPipeTransferBatchReqBuilder batchBuilder,
@@ -60,6 +61,7 @@ public class PipeTransferTabletBatchEventHandler implements
AsyncMethodCallback<
req = batchBuilder.toTPipeTransferReq();
this.connector = connector;
+ this.batchReqBuilder = batchBuilder;
}
public void transfer(AsyncPipeDataTransferServiceClient client) throws
TException {
@@ -83,10 +85,16 @@ public class PipeTransferTabletBatchEventHandler implements
AsyncMethodCallback<
.statusHandler()
.handle(status, response.getStatus().getMessage(),
events.toString());
}
+ final boolean isClosed = batchReqBuilder.isClosed();
for (final Event event : events) {
if (event instanceof EnrichedEvent) {
- ((EnrichedEvent) event)
-
.decreaseReferenceCount(PipeTransferTabletBatchEventHandler.class.getName(),
true);
+ if (isClosed) {
+ ((EnrichedEvent) event)
+
.clearReferenceCount(PipeTransferTabletBatchEventHandler.class.getName());
+ } else {
+ ((EnrichedEvent) event)
+
.decreaseReferenceCount(PipeTransferTabletBatchEventHandler.class.getName(),
true);
+ }
}
}
} catch (Exception e) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
index 8eb32d5a746..5d834440d7f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
@@ -50,11 +50,6 @@ public abstract class
PipeTransferTabletInsertionEventHandler<E extends TPipeTra
this.event = event;
this.req = req;
this.connector = connector;
-
- if (this.event instanceof EnrichedEvent) {
- ((EnrichedEvent) this.event)
-
.increaseReferenceCount(PipeTransferTabletInsertionEventHandler.class.getName());
- }
}
public void transfer(AsyncPipeDataTransferServiceClient client) throws
TException {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
index 75fa051fe2b..467405cf82a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileInsertionEventHandler.java
@@ -91,8 +91,6 @@ public class PipeTransferTsFileInsertionEventHandler
: new RandomAccessFile(tsFile, "r");
isSealSignalSent = new AtomicBoolean(false);
-
-
event.increaseReferenceCount(PipeTransferTsFileInsertionEventHandler.class.getName());
}
public void transfer(AsyncPipeDataTransferServiceClient client) throws
TException, IOException {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
index 98947caab56..06dfc0f64a8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java
@@ -160,7 +160,12 @@ public class PipeMemoryBlock implements AutoCloseable {
@Override
public String toString() {
- return "PipeMemoryBlock{" + "memoryUsageInBytes=" +
memoryUsageInBytes.get() + '}';
+ return "PipeMemoryBlock{"
+ + "memoryUsageInBytes="
+ + memoryUsageInBytes.get()
+ + ", isReleased="
+ + isReleased
+ + '}';
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index bc6f1bb6291..730cc5bb661 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -46,14 +46,18 @@ public class PipeEventCollector implements EventCollector,
AutoCloseable {
private final EnrichedDeque<Event> bufferQueue;
+ private final long creationTime;
+
private final int regionId;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final AtomicInteger collectInvocationCount = new AtomicInteger(0);
- public PipeEventCollector(BoundedBlockingPendingQueue<Event> pendingQueue,
int regionId) {
+ public PipeEventCollector(
+ BoundedBlockingPendingQueue<Event> pendingQueue, long creationTime, int
regionId) {
this.pendingQueue = pendingQueue;
+ this.creationTime = creationTime;
this.regionId = regionId;
bufferQueue = new EnrichedDeque<>(new LinkedList<>());
}
@@ -129,7 +133,7 @@ public class PipeEventCollector implements EventCollector,
AutoCloseable {
// Assign a commit id for this event in order to report progress in order.
PipeEventCommitManager.getInstance()
- .enrichWithCommitterKeyAndCommitId((EnrichedEvent) event, regionId);
+ .enrichWithCommitterKeyAndCommitId((EnrichedEvent) event,
creationTime, regionId);
}
if (event instanceof PipeHeartbeatEvent) {
((PipeHeartbeatEvent) event).recordBufferQueueSize(bufferQueue);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
index becb0c78dd7..001978ebe90 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
@@ -79,7 +79,8 @@ public class PipeTaskConnectorStage extends PipeTaskStage {
@Override
public void dropSubtask() throws PipeException {
- PipeConnectorSubtaskManager.instance().deregister(pipeName, regionId,
connectorSubtaskId);
+ PipeConnectorSubtaskManager.instance()
+ .deregister(pipeName, creationTime, regionId, connectorSubtaskId);
}
public BoundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 2540debdd8d..d88bfb0530c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -98,7 +98,7 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
// old one, so we need creationTime to make their hash code different in the map.
final String taskId = pipeName + "_" + regionId + "_" + creationTime;
final PipeEventCollector pipeConnectorOutputEventCollector =
- new PipeEventCollector(pipeConnectorOutputPendingQueue, regionId);
+ new PipeEventCollector(pipeConnectorOutputPendingQueue, creationTime,
regionId);
this.pipeProcessorSubtask =
new PipeProcessorSubtask(
taskId,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index 0f0bdeee689..c45f2b6aede 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -113,7 +113,7 @@ public class PipeConnectorSubtask extends
PipeAbstractConnectorSubtask {
: event);
}
- releaseLastEvent(true);
+ decreaseReferenceCountAndReleaseLastEvent(true);
} catch (PipeException e) {
if (!isClosed.get()) {
throw e;
@@ -122,7 +122,7 @@ public class PipeConnectorSubtask extends
PipeAbstractConnectorSubtask {
"{} in pipe transfer, ignored because pipe is dropped.",
e.getClass().getSimpleName(),
e);
- releaseLastEvent(false);
+ clearReferenceCountAndReleaseLastEvent();
}
} catch (Exception e) {
if (!isClosed.get()) {
@@ -137,7 +137,7 @@ public class PipeConnectorSubtask extends
PipeAbstractConnectorSubtask {
e);
} else {
LOGGER.info("Exception in pipe transfer, ignored because pipe is
dropped.", e);
- releaseLastEvent(false);
+ clearReferenceCountAndReleaseLastEvent();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
index 124260a434e..968c73b4c5d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
@@ -65,7 +65,11 @@ public class PipeConnectorSubtaskManager {
// for matching in `CONNECTOR_CONSTRUCTORS`
.toLowerCase();
PipeEventCommitManager.getInstance()
- .register(environment.getPipeName(), environment.getRegionId(),
connectorKey);
+ .register(
+ environment.getPipeName(),
+ environment.getCreationTime(),
+ environment.getRegionId(),
+ connectorKey);
final boolean isDataRegionConnector =
StorageEngine.getInstance()
@@ -145,7 +149,7 @@ public class PipeConnectorSubtaskManager {
}
public synchronized void deregister(
- String pipeName, int dataRegionId, String attributeSortedString) {
+ String pipeName, long creationTime, int dataRegionId, String
attributeSortedString) {
if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE +
attributeSortedString);
}
@@ -158,7 +162,7 @@ public class PipeConnectorSubtaskManager {
attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);
}
- PipeEventCommitManager.getInstance().deregister(pipeName, dataRegionId);
+ PipeEventCommitManager.getInstance().deregister(pipeName, creationTime,
dataRegionId);
}
public synchronized void start(String attributeSortedString) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
index 6645f2c3d05..021099cac91 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
@@ -142,7 +142,8 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
outputEventCollector);
}
}
- releaseLastEvent(!isClosed.get() &&
outputEventCollector.hasNoCollectInvocationAfterReset());
+ decreaseReferenceCountAndReleaseLastEvent(
+ !isClosed.get() &&
outputEventCollector.hasNoCollectInvocationAfterReset());
} catch (PipeRuntimeOutOfMemoryCriticalException e) {
LOGGER.info(
"Temporarily out of memory in pipe event processing, will wait for
the memory to release.",
@@ -157,7 +158,7 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
e);
} else {
LOGGER.info("Exception in pipe event processing, ignored because pipe
is dropped.", e);
- releaseLastEvent(false);
+ clearReferenceCountAndReleaseLastEvent();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskConnectorStage.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskConnectorStage.java
index 1bbcc153beb..e5f965f7176 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskConnectorStage.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskConnectorStage.java
@@ -67,7 +67,7 @@ public class SubscriptionTaskConnectorStage extends
PipeTaskConnectorStage {
@Override
public void dropSubtask() throws PipeException {
SubscriptionConnectorSubtaskManager.instance()
- .deregister(pipeName, regionId, connectorSubtaskId);
+ .deregister(pipeName, creationTime, regionId, connectorSubtaskId);
}
public BoundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
index ef4c7e578b1..6627bf2948a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
@@ -73,7 +73,11 @@ public class SubscriptionConnectorSubtaskManager {
}
PipeEventCommitManager.getInstance()
- .register(environment.getPipeName(), environment.getRegionId(),
connectorKey);
+ .register(
+ environment.getPipeName(),
+ environment.getCreationTime(),
+ environment.getRegionId(),
+ connectorKey);
String attributeSortedString = new
TreeMap<>(pipeConnectorParameters.getAttribute()).toString();
attributeSortedString = "__subscription_" + attributeSortedString;
@@ -137,7 +141,7 @@ public class SubscriptionConnectorSubtaskManager {
}
public synchronized void deregister(
- String pipeName, int dataRegionId, String attributeSortedString) {
+ String pipeName, long creationTime, int dataRegionId, String
attributeSortedString) {
if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
throw new PipeException(FAILED_TO_DEREGISTER_EXCEPTION_MESSAGE +
attributeSortedString);
}
@@ -148,7 +152,7 @@ public class SubscriptionConnectorSubtaskManager {
attributeSortedString2SubtaskLifeCycleMap.remove(attributeSortedString);
}
- PipeEventCommitManager.getInstance().deregister(pipeName, dataRegionId);
+ PipeEventCommitManager.getInstance().deregister(pipeName, creationTime,
dataRegionId);
}
public synchronized void start(String attributeSortedString) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
index 3a147a38682..1f346c8f72d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.pipe.api.event.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -41,6 +42,7 @@ public abstract class EnrichedEvent implements Event {
private static final Logger LOGGER =
LoggerFactory.getLogger(EnrichedEvent.class);
protected final AtomicInteger referenceCount;
+ protected final AtomicBoolean isReleased;
protected final String pipeName;
protected final PipeTaskMeta pipeTaskMeta;
@@ -66,6 +68,7 @@ public abstract class EnrichedEvent implements Event {
long startTime,
long endTime) {
referenceCount = new AtomicInteger(0);
+ isReleased = new AtomicBoolean(false);
this.pipeName = pipeName;
this.pipeTaskMeta = pipeTaskMeta;
this.pipePattern = pipePattern;
@@ -87,6 +90,13 @@ public abstract class EnrichedEvent implements Event {
public boolean increaseReferenceCount(String holderMessage) {
boolean isSuccessful = true;
synchronized (this) {
+ if (isReleased.get()) {
+ LOGGER.warn(
+ "re-increase reference count to event that has already been
released: {}, stack trace: {}",
+ coreReportMessage(),
+ Thread.currentThread().getStackTrace());
+ return false;
+ }
if (referenceCount.get() == 0) {
isSuccessful = internallyIncreaseResourceReferenceCount(holderMessage);
}
@@ -127,8 +137,15 @@ public abstract class EnrichedEvent implements Event {
PipeEventCommitManager.getInstance().commit(this, committerKey);
}
final int newReferenceCount = referenceCount.decrementAndGet();
+ if (newReferenceCount == 0) {
+ isReleased.set(true);
+ }
if (newReferenceCount < 0) {
- LOGGER.warn("reference count is decreased to {}.", newReferenceCount);
+ LOGGER.warn(
+ "reference count is decreased to {}, event: {}, stack trace: {}",
+ newReferenceCount,
+ coreReportMessage(),
+ Thread.currentThread().getStackTrace());
}
}
return isSuccessful;
@@ -150,6 +167,7 @@ public abstract class EnrichedEvent implements Event {
isSuccessful = internallyDecreaseResourceReferenceCount(holderMessage);
}
referenceCount.set(0);
+ isReleased.set(true);
}
return isSuccessful;
}
@@ -289,6 +307,8 @@ public abstract class EnrichedEvent implements Event {
return "EnrichedEvent{"
+ "referenceCount="
+ referenceCount.get()
+ + ", isReleased="
+ + isReleased.get()
+ ", pipeName='"
+ pipeName
+ "', pipeTaskMeta="
@@ -316,6 +336,8 @@ public abstract class EnrichedEvent implements Event {
return "EnrichedEvent{"
+ "referenceCount="
+ referenceCount.get()
+ + ", isReleased="
+ + isReleased.get()
+ ", pipeName='"
+ pipeName
+ "', committerKey='"
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitManager.java
index 89eb7b2868c..bb985248854 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitManager.java
@@ -35,25 +35,25 @@ public class PipeEventCommitManager {
// key: pipeName_dataRegionId
private final Map<String, PipeEventCommitter> eventCommitterMap = new
ConcurrentHashMap<>();
- public void register(String pipeName, int regionId, String pipePluginName) {
+ public void register(String pipeName, long creationTime, int regionId,
String pipePluginName) {
if (pipeName == null || pipePluginName == null) {
return;
}
- final String committerKey = generateCommitterKey(pipeName, regionId);
+ final String committerKey = generateCommitterKey(pipeName, creationTime,
regionId);
if (eventCommitterMap.containsKey(committerKey)) {
LOGGER.warn(
"Pipe with same name is already registered on this data region,
overwriting: {}",
committerKey);
}
- PipeEventCommitter eventCommitter = new PipeEventCommitter(pipeName,
regionId);
+ PipeEventCommitter eventCommitter = new PipeEventCommitter(pipeName,
creationTime, regionId);
eventCommitterMap.put(committerKey, eventCommitter);
PipeEventCommitMetrics.getInstance().register(eventCommitter,
committerKey);
LOGGER.info("Pipe committer registered for pipe on data region: {}",
committerKey);
}
- public void deregister(String pipeName, int regionId) {
- final String committerKey = generateCommitterKey(pipeName, regionId);
+ public void deregister(String pipeName, long creationTime, int regionId) {
+ final String committerKey = generateCommitterKey(pipeName, creationTime,
regionId);
eventCommitterMap.remove(committerKey);
PipeEventCommitMetrics.getInstance().deregister(committerKey);
LOGGER.info("Pipe committer deregistered for pipe on data region: {}",
committerKey);
@@ -63,12 +63,13 @@ public class PipeEventCommitManager {
* Assign a commit id and a key for commit. Make sure {@code EnrichedEvent.pipeName} is set before
* calling this.
*/
- public void enrichWithCommitterKeyAndCommitId(EnrichedEvent event, int
regionId) {
+ public void enrichWithCommitterKeyAndCommitId(
+ EnrichedEvent event, long creationTime, int regionId) {
if (event == null || event.getPipeName() == null || !event.needToCommit())
{
return;
}
- final String committerKey = generateCommitterKey(event.getPipeName(),
regionId);
+ final String committerKey = generateCommitterKey(event.getPipeName(),
creationTime, regionId);
final PipeEventCommitter committer = eventCommitterMap.get(committerKey);
if (committer == null) {
return;
@@ -86,13 +87,20 @@ public class PipeEventCommitManager {
final PipeEventCommitter committer = eventCommitterMap.get(committerKey);
if (committer == null) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "missing PipeEventCommitter({}) when commit event: {}, stack
trace: {}",
+ committerKey,
+ event.coreReportMessage(),
+ Thread.currentThread().getStackTrace());
+ }
return;
}
committer.commit(event);
}
- private static String generateCommitterKey(String pipeName, int regionId) {
- return String.format("%s_%s", pipeName, regionId);
+ private static String generateCommitterKey(String pipeName, long
creationTime, int regionId) {
+ return String.format("%s_%s_%s", pipeName, regionId, creationTime);
}
private PipeEventCommitManager() {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitter.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitter.java
index 04664baf1ea..eeec9c8be25 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitter.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/progress/PipeEventCommitter.java
@@ -36,6 +36,7 @@ public class PipeEventCommitter {
private static final Logger LOGGER = LoggerFactory.getLogger(PipeEventCommitter.class);
private final String pipeName;
+ private final long creationTime;
private final int dataRegionId;
private final AtomicLong commitIdGenerator = new AtomicLong(0);
@@ -48,9 +49,10 @@ public class PipeEventCommitter {
event ->
Objects.requireNonNull(event, "committable event cannot be null").getCommitId()));
- PipeEventCommitter(String pipeName, int dataRegionId) {
+ PipeEventCommitter(String pipeName, long creationTime, int dataRegionId) {
// make it package-private
this.pipeName = pipeName;
+ this.creationTime = creationTime;
this.dataRegionId = dataRegionId;
}
@@ -66,9 +68,11 @@ public class PipeEventCommitter {
if (e.getCommitId() <= lastCommitId.get()) {
LOGGER.warn(
- "commit id must be monotonically increasing, lastCommitId: {}, event: {}",
+ "commit id must be monotonically increasing, current commit id: {}, last commit id: {}, event: {}, stack trace: {}",
+ e.getCommitId(),
lastCommitId.get(),
- e);
+ e.coreReportMessage(),
+ Thread.currentThread().getStackTrace());
commitQueue.poll();
continue;
}
@@ -89,6 +93,11 @@ public class PipeEventCommitter {
return pipeName;
}
+ // TODO: waiting called by PipeEventCommitMetrics
+ public long getCreationTime() {
+ return creationTime;
+ }
+
public int getDataRegionId() {
return dataRegionId;
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java
index 32ee1b292f0..4d0bae8e0e9 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeAbstractConnectorSubtask.java
@@ -90,7 +90,7 @@ public abstract class PipeAbstractConnectorSubtask extends PipeReportableSubtask
if (isClosed.get()) {
LOGGER.info("onFailure in pipe transfer, ignored because pipe is dropped.", throwable);
- releaseLastEvent(false);
+ clearReferenceCountAndReleaseLastEvent();
return;
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java
index b04478b5834..63d807ba347 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeReportableSubtask.java
@@ -39,7 +39,7 @@ public abstract class PipeReportableSubtask extends PipeSubtask {
public synchronized void onFailure(Throwable throwable) {
if (isClosed.get()) {
LOGGER.info("onFailure in pipe subtask, ignored because pipe is dropped.", throwable);
- releaseLastEvent(false);
+ clearReferenceCountAndReleaseLastEvent();
return;
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeSubtask.java
index baa018059a4..a3834e94dd0 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeSubtask.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/subtask/PipeSubtask.java
@@ -90,7 +90,7 @@ public abstract class PipeSubtask
return hasAtLeastOneEventProcessed;
}
- /** Should be synchronized with {@link PipeSubtask#releaseLastEvent} */
+ /** Should be synchronized with {@link PipeSubtask#decreaseReferenceCountAndReleaseLastEvent} */
protected synchronized void setLastEvent(Event event) {
lastEvent = event;
}
@@ -146,15 +146,12 @@ public abstract class PipeSubtask
return !shouldStopSubmittingSelf.get();
}
- // synchronized for close() and releaseLastEvent(). make sure that the lastEvent
- // will not be updated after pipeProcessor.close() to avoid resource leak
- // because of the lastEvent is not released.
@Override
public void close() {
- releaseLastEvent(false);
+ clearReferenceCountAndReleaseLastEvent();
}
- protected synchronized void releaseLastEvent(boolean shouldReport) {
+ protected synchronized void decreaseReferenceCountAndReleaseLastEvent(boolean shouldReport) {
if (lastEvent != null) {
if (lastEvent instanceof EnrichedEvent) {
((EnrichedEvent) lastEvent).decreaseReferenceCount(this.getClass().getName(), shouldReport);
@@ -163,6 +160,15 @@ public abstract class PipeSubtask
}
}
+ protected synchronized void clearReferenceCountAndReleaseLastEvent() {
+ if (lastEvent != null) {
+ if (lastEvent instanceof EnrichedEvent) {
+ ((EnrichedEvent) lastEvent).clearReferenceCount(this.getClass().getName());
+ }
+ lastEvent = null;
+ }
+ }
+
public String getTaskID() {
return taskID;
}