This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 44bb010bd05 Pipe: fix the reference count leak of events in async
connector during restart (#14324) (#14439)
44bb010bd05 is described below
commit 44bb010bd05bfc376955e7971103b7ed4cb300ef
Author: VGalaxies <[email protected]>
AuthorDate: Mon Dec 16 16:46:24 2024 +0800
Pipe: fix the reference count leak of events in async connector during
restart (#14324) (#14439)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../subtask/connector/PipeConnectorSubtask.java | 9 ++
.../async/IoTDBDataRegionAsyncConnector.java | 39 ++++++++-
.../PipeTransferTabletBatchEventHandler.java | 33 +++++---
.../PipeTransferTabletInsertNodeEventHandler.java | 14 ++--
.../PipeTransferTabletInsertionEventHandler.java | 36 ++++----
.../handler/PipeTransferTabletRawEventHandler.java | 15 ++--
.../handler/PipeTransferTrackableHandler.java | 98 ++++++++++++++++++++++
.../async/handler/PipeTransferTsFileHandler.java | 39 ++++++---
.../realtime/assigner/PipeDataRegionAssigner.java | 18 ++--
.../resource/ref/PipePhantomReferenceManager.java | 2 +-
10 files changed, 239 insertions(+), 64 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
index cc2d5b5b346..c0470becb3d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtask.java
@@ -175,6 +175,9 @@ public class PipeConnectorSubtask extends
PipeAbstractConnectorSubtask {
throw new PipeConnectionException(
"PipeConnector: "
+ outputPipeConnector.getClass().getName()
+ + "(id: "
+ + taskID
+ + ")"
+ " heartbeat failed, or encountered failure when transferring
generic event. Failure: "
+ e.getMessage(),
e);
@@ -196,7 +199,13 @@ public class PipeConnectorSubtask extends
PipeAbstractConnectorSubtask {
isClosed.set(true);
try {
+ final long startTime = System.currentTimeMillis();
outputPipeConnector.close();
+ LOGGER.info(
+ "Pipe: connector subtask {} was closed {} within {} ms",
+ taskID,
+ outputPipeConnector,
+ System.currentTimeMillis() - startTime);
} catch (final Exception e) {
LOGGER.info(
"Exception occurred when closing pipe connector subtask {}, root
cause: {}",
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 7ced0ca87d3..7641eb30f8e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -35,6 +35,7 @@ import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransfer
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTabletBatchEventHandler;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTabletInsertNodeEventHandler;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTabletRawEventHandler;
+import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTrackableHandler;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler.PipeTransferTsFileHandler;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.sync.IoTDBDataRegionSyncConnector;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
@@ -67,6 +68,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -87,14 +89,17 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
private static final String THRIFT_ERROR_FORMATTER_WITH_ENDPOINT =
"Exception occurred while sending to receiver %s:%s.";
- private IoTDBDataNodeAsyncClientManager clientManager;
-
private final IoTDBDataRegionSyncConnector retryConnector = new
IoTDBDataRegionSyncConnector();
private final BlockingQueue<Event> retryEventQueue = new
LinkedBlockingQueue<>();
+ private IoTDBDataNodeAsyncClientManager clientManager;
+
private PipeTransferBatchReqBuilder tabletBatchBuilder;
+ // use these variables to prevent reference count leaks under some corner
cases when closing
private final AtomicBoolean isClosed = new AtomicBoolean(false);
+ private final Map<PipeTransferTrackableHandler,
PipeTransferTrackableHandler> pendingHandlers =
+ new ConcurrentHashMap<>();
@Override
public void validate(final PipeParameterValidator validator) throws
Exception {
@@ -536,12 +541,17 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
isClosed.set(true);
retryConnector.close();
- clearRetryEventsReferenceCount();
if (tabletBatchBuilder != null) {
tabletBatchBuilder.close();
}
+ // ensure all on-the-fly handlers have been cleared
+ if (hasPendingHandlers()) {
+ pendingHandlers.forEach((handler, _handler) ->
handler.clearEventsReferenceCount());
+ pendingHandlers.clear();
+ }
+
try {
if (clientManager != null) {
clientManager.close();
@@ -550,10 +560,13 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
LOGGER.warn("Failed to close client manager.", e);
}
+ // clear reference count of events in retry queue after closing async
client
+ clearRetryEventsReferenceCount();
+
super.close();
}
- //////////////////////////// APIs provided for metric framework
////////////////////////////
+ //////////////////////// APIs provided for metric framework
////////////////////////
public int getRetryEventQueueSize() {
return retryEventQueue.size();
@@ -579,4 +592,22 @@ public class IoTDBDataRegionAsyncConnector extends
IoTDBConnector {
return count.get();
}
}
+
+ //////////////////////// APIs provided for PipeTransferTrackableHandler
////////////////////////
+
+ public boolean isClosed() {
+ return isClosed.get();
+ }
+
+ public void trackHandler(final PipeTransferTrackableHandler handler) {
+ pendingHandlers.put(handler, handler);
+ }
+
+ public void eliminateHandler(final PipeTransferTrackableHandler handler) {
+ pendingHandlers.remove(handler);
+ }
+
+ public boolean hasPendingHandlers() {
+ return !pendingHandlers.isEmpty();
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
index 6ef424565e8..8370dd667ab 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletBatchEventHandler.java
@@ -33,7 +33,6 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.thrift.TException;
-import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,7 +42,7 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-public class PipeTransferTabletBatchEventHandler implements
AsyncMethodCallback<TPipeTransferResp> {
+public class PipeTransferTabletBatchEventHandler extends
PipeTransferTrackableHandler {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTransferTabletBatchEventHandler.class);
@@ -54,11 +53,11 @@ public class PipeTransferTabletBatchEventHandler implements
AsyncMethodCallback<
private final TPipeTransferReq req;
private final double reqCompressionRatio;
- private final IoTDBDataRegionAsyncConnector connector;
-
public PipeTransferTabletBatchEventHandler(
final PipeTabletEventPlainBatch batch, final
IoTDBDataRegionAsyncConnector connector)
throws IOException {
+ super(connector);
+
// Deep copy to keep events' reference
events = batch.deepCopyEvents();
pipeName2BytesAccumulated = batch.deepCopyPipeName2BytesAccumulated();
@@ -70,8 +69,6 @@ public class PipeTransferTabletBatchEventHandler implements
AsyncMethodCallback<
uncompressedReq, connector.getCompressors())
: uncompressedReq;
reqCompressionRatio = (double) req.getBody().length /
uncompressedReq.getBody().length;
-
- this.connector = connector;
}
public void transfer(final AsyncPipeDataTransferServiceClient client) throws
TException {
@@ -83,15 +80,15 @@ public class PipeTransferTabletBatchEventHandler implements
AsyncMethodCallback<
(long) (entry.getValue() * reqCompressionRatio));
}
- client.pipeTransfer(req, this);
+ tryTransfer(client, req);
}
@Override
- public void onComplete(final TPipeTransferResp response) {
+ protected boolean onCompleteInternal(final TPipeTransferResp response) {
// Just in case
if (response == null) {
onError(new PipeException("TPipeTransferResp is null"));
- return;
+ return false;
}
try {
@@ -114,11 +111,14 @@ public class PipeTransferTabletBatchEventHandler
implements AsyncMethodCallback<
PipeTransferTabletBatchEventHandler.class.getName(), true));
} catch (final Exception e) {
onError(e);
+ return false;
}
+
+ return true;
}
@Override
- public void onError(final Exception exception) {
+ protected void onErrorInternal(final Exception exception) {
try {
LOGGER.warn(
"Failed to transfer TabletInsertionEvent batch. Total failed events:
{}, related pipe names: {}",
@@ -129,4 +129,17 @@ public class PipeTransferTabletBatchEventHandler
implements AsyncMethodCallback<
connector.addFailureEventsToRetryQueue(events);
}
}
+
+ @Override
+ protected void doTransfer(
+ final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq
req)
+ throws TException {
+ client.pipeTransfer(req, this);
+ }
+
+ @Override
+ public void clearEventsReferenceCount() {
+ events.forEach(
+ event ->
event.clearReferenceCount(PipeTransferTabletBatchEventHandler.class.getName()));
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
index 43dfb6aad29..107dccceec7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertNodeEventHandler.java
@@ -24,28 +24,28 @@ import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
-import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.thrift.TException;
public class PipeTransferTabletInsertNodeEventHandler
- extends PipeTransferTabletInsertionEventHandler<TPipeTransferResp> {
+ extends PipeTransferTabletInsertionEventHandler {
public PipeTransferTabletInsertNodeEventHandler(
- PipeInsertNodeTabletInsertionEvent event,
- TPipeTransferReq req,
- IoTDBDataRegionAsyncConnector connector) {
+ final PipeInsertNodeTabletInsertionEvent event,
+ final TPipeTransferReq req,
+ final IoTDBDataRegionAsyncConnector connector) {
super(event, req, connector);
}
@Override
- protected void doTransfer(AsyncPipeDataTransferServiceClient client,
TPipeTransferReq req)
+ protected void doTransfer(
+ final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq
req)
throws TException {
client.pipeTransfer(req, this);
}
@Override
- protected void updateLeaderCache(TSStatus status) {
+ protected void updateLeaderCache(final TSStatus status) {
connector.updateLeaderCache(
((PipeInsertNodeTabletInsertionEvent) event).getDeviceId(),
status.getRedirectNode());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
index 40561121724..f77d41b6662 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletInsertionEventHandler.java
@@ -30,12 +30,10 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.thrift.TException;
-import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public abstract class PipeTransferTabletInsertionEventHandler<E extends
TPipeTransferResp>
- implements AsyncMethodCallback<E> {
+public abstract class PipeTransferTabletInsertionEventHandler extends
PipeTransferTrackableHandler {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTransferTabletInsertionEventHandler.class);
@@ -43,15 +41,14 @@ public abstract class
PipeTransferTabletInsertionEventHandler<E extends TPipeTra
protected final TabletInsertionEvent event;
protected final TPipeTransferReq req;
- protected final IoTDBDataRegionAsyncConnector connector;
-
protected PipeTransferTabletInsertionEventHandler(
final TabletInsertionEvent event,
final TPipeTransferReq req,
final IoTDBDataRegionAsyncConnector connector) {
+ super(connector);
+
this.event = event;
this.req = req;
- this.connector = connector;
}
public void transfer(final AsyncPipeDataTransferServiceClient client) throws
TException {
@@ -63,19 +60,15 @@ public abstract class
PipeTransferTabletInsertionEventHandler<E extends TPipeTra
req.getBody().length);
}
- doTransfer(client, req);
+ tryTransfer(client, req);
}
- protected abstract void doTransfer(
- final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq
req)
- throws TException;
-
@Override
- public void onComplete(final TPipeTransferResp response) {
+ protected boolean onCompleteInternal(final TPipeTransferResp response) {
// Just in case
if (response == null) {
onError(new PipeException("TPipeTransferResp is null"));
- return;
+ return false;
}
final TSStatus status = response.getStatus();
@@ -96,13 +89,14 @@ public abstract class
PipeTransferTabletInsertionEventHandler<E extends TPipeTra
}
} catch (final Exception e) {
onError(e);
+ return false;
}
- }
- protected abstract void updateLeaderCache(final TSStatus status);
+ return true;
+ }
@Override
- public void onError(final Exception exception) {
+ protected void onErrorInternal(final Exception exception) {
try {
LOGGER.warn(
"Failed to transfer TabletInsertionEvent {} (committer key={},
commit id={}).",
@@ -116,4 +110,14 @@ public abstract class
PipeTransferTabletInsertionEventHandler<E extends TPipeTra
connector.addFailureEventToRetryQueue(event);
}
}
+
+ @Override
+ public void clearEventsReferenceCount() {
+ if (event instanceof EnrichedEvent) {
+ ((EnrichedEvent) event)
+
.clearReferenceCount(PipeTransferTabletInsertionEventHandler.class.getName());
+ }
+ }
+
+ protected abstract void updateLeaderCache(final TSStatus status);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
index 481b4830d93..e8c5c2c2f07 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTabletRawEventHandler.java
@@ -24,28 +24,27 @@ import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
-import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.thrift.TException;
-public class PipeTransferTabletRawEventHandler
- extends PipeTransferTabletInsertionEventHandler<TPipeTransferResp> {
+public class PipeTransferTabletRawEventHandler extends
PipeTransferTabletInsertionEventHandler {
public PipeTransferTabletRawEventHandler(
- PipeRawTabletInsertionEvent event,
- TPipeTransferReq req,
- IoTDBDataRegionAsyncConnector connector) {
+ final PipeRawTabletInsertionEvent event,
+ final TPipeTransferReq req,
+ final IoTDBDataRegionAsyncConnector connector) {
super(event, req, connector);
}
@Override
- protected void doTransfer(AsyncPipeDataTransferServiceClient client,
TPipeTransferReq req)
+ protected void doTransfer(
+ final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq
req)
throws TException {
client.pipeTransfer(req, this);
}
@Override
- protected void updateLeaderCache(TSStatus status) {
+ protected void updateLeaderCache(final TSStatus status) {
connector.updateLeaderCache(
((PipeRawTabletInsertionEvent) event).getDeviceId(),
status.getRedirectNode());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
new file mode 100644
index 00000000000..033ce5353f5
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTrackableHandler.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.protocol.thrift.async.handler;
+
+import
org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient;
+import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
+import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+
+public abstract class PipeTransferTrackableHandler
+ implements AsyncMethodCallback<TPipeTransferResp> {
+
+ protected final IoTDBDataRegionAsyncConnector connector;
+
+ public PipeTransferTrackableHandler(final IoTDBDataRegionAsyncConnector
connector) {
+ this.connector = connector;
+ }
+
+ @Override
+ public void onComplete(final TPipeTransferResp response) {
+ if (!connector.isClosed()) {
+ if (onCompleteInternal(response)) {
+ // eliminate handler only when all transmissions corresponding to the
handler have been
+ // completed
+ connector.eliminateHandler(this);
+ }
+ } else {
+ clearEventsReferenceCount();
+ connector.eliminateHandler(this);
+ }
+ }
+
+ @Override
+ public void onError(final Exception exception) {
+ if (!connector.isClosed()) {
+ onErrorInternal(exception);
+ } else {
+ clearEventsReferenceCount();
+ }
+ connector.eliminateHandler(this);
+ }
+
+ /**
+ * Attempts to transfer data using the provided client and request.
+ *
+ * @param client the client used for data transfer
+ * @param req the request containing transfer details
+ * @return {@code true} if the transfer was initiated successfully, {@code
false} if the connector
+ * is closed
+ * @throws TException if an error occurs during the transfer
+ */
+ protected boolean tryTransfer(
+ final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq
req)
+ throws TException {
+ // track handler before checking if connector is closed
+ connector.trackHandler(this);
+ if (connector.isClosed()) {
+ clearEventsReferenceCount();
+ return false;
+ }
+ doTransfer(client, req);
+ return true;
+ }
+
+ /**
+ * @return {@code true} if all transmissions corresponding to the handler
have been completed,
+ * {@code false} otherwise
+ */
+ protected abstract boolean onCompleteInternal(final TPipeTransferResp
response);
+
+ protected abstract void onErrorInternal(final Exception exception);
+
+ protected abstract void doTransfer(
+ final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq
req)
+ throws TException;
+
+ public abstract void clearEventsReferenceCount();
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index a5f126d7123..2cac135ad0d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -38,7 +38,6 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.commons.io.FileUtils;
import org.apache.thrift.TException;
-import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,13 +54,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
-public class PipeTransferTsFileHandler implements
AsyncMethodCallback<TPipeTransferResp> {
+public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTransferTsFileHandler.class);
- // Used to transfer the file
- private final IoTDBDataRegionAsyncConnector connector;
-
// Used to rate limit the transfer
private final Map<Pair<String, Long>, Double> pipeName2WeightMap;
@@ -98,7 +94,7 @@ public class PipeTransferTsFileHandler implements
AsyncMethodCallback<TPipeTrans
final File modFile,
final boolean transferMod)
throws FileNotFoundException {
- this.connector = connector;
+ super(connector);
this.pipeName2WeightMap = pipeName2WeightMap;
@@ -168,7 +164,9 @@ public class PipeTransferTsFileHandler implements
AsyncMethodCallback<TPipeTrans
client.getEndPoint(),
(long) (req.getBody().length * weight)));
- client.pipeTransfer(req, this);
+ if (!tryTransfer(client, req)) {
+ return;
+ }
}
return;
}
@@ -197,13 +195,15 @@ public class PipeTransferTsFileHandler implements
AsyncMethodCallback<TPipeTrans
client.getEndPoint(),
(long) (req.getBody().length * weight)));
- client.pipeTransfer(req, this);
+ if (!tryTransfer(client, req)) {
+ return;
+ }
position += readLength;
}
@Override
- public void onComplete(final TPipeTransferResp response) {
+ protected boolean onCompleteInternal(final TPipeTransferResp response) {
if (isSealSignalSent.get()) {
try {
final TSStatus status = response.getStatus();
@@ -220,7 +220,7 @@ public class PipeTransferTsFileHandler implements
AsyncMethodCallback<TPipeTrans
}
} catch (final Exception e) {
onError(e);
- return;
+ return false;
}
try {
@@ -262,7 +262,7 @@ public class PipeTransferTsFileHandler implements
AsyncMethodCallback<TPipeTrans
client.returnSelf();
}
}
- return;
+ return true;
}
// If the isSealSignalSent is false, then the response must be a
PipeTransferFilePieceResp
@@ -292,11 +292,14 @@ public class PipeTransferTsFileHandler implements
AsyncMethodCallback<TPipeTrans
transfer(clientManager, client);
} catch (final Exception e) {
onError(e);
+ return false;
}
+
+ return false; // due to seal transfer not yet completed
}
@Override
- public void onError(final Exception exception) {
+ protected void onErrorInternal(final Exception exception) {
try {
if (events.size() <= 1 || LOGGER.isDebugEnabled()) {
LOGGER.warn(
@@ -347,4 +350,16 @@ public class PipeTransferTsFileHandler implements
AsyncMethodCallback<TPipeTrans
}
}
}
+
+ @Override
+ protected void doTransfer(
+ final AsyncPipeDataTransferServiceClient client, final TPipeTransferReq
req)
+ throws TException {
+ client.pipeTransfer(req, this);
+ }
+
+ @Override
+ public void clearEventsReferenceCount() {
+ events.forEach(event ->
event.clearReferenceCount(PipeTransferTsFileHandler.class.getName()));
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index c5f08634df4..3e35a369b9e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -91,10 +91,14 @@ public class PipeDataRegionAssigner implements Closeable {
((PipeHeartbeatEvent) innerEvent).onPublished();
}
- if (!disruptor.isClosed()) {
- disruptor.publish(event);
- } else {
- onAssignedHook(event);
+ // use synchronized here for completely preventing reference count leaks
under extreme thread
+ // scheduling when closing
+ synchronized (this) {
+ if (!disruptor.isClosed()) {
+ disruptor.publish(event);
+ } else {
+ onAssignedHook(event);
+ }
}
}
@@ -216,12 +220,14 @@ public class PipeDataRegionAssigner implements Closeable {
* should not be used after calling this method.
*/
@Override
- public void close() {
+ // use synchronized here for completely preventing reference count leaks
under extreme thread
+ // scheduling when closing
+ public synchronized void close() {
PipeAssignerMetrics.getInstance().deregister(dataRegionId);
- matcher.clear();
final long startTime = System.currentTimeMillis();
disruptor.shutdown();
+ matcher.clear();
LOGGER.info(
"Pipe: Assigner on data region {} shutdown internal disruptor within
{} ms",
dataRegionId,
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/ref/PipePhantomReferenceManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/ref/PipePhantomReferenceManager.java
index d95616e685d..9f7306ed9af 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/ref/PipePhantomReferenceManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/ref/PipePhantomReferenceManager.java
@@ -155,7 +155,7 @@ public abstract class PipePhantomReferenceManager {
}
if (referenceCount.get() >= 1) {
- LOGGER.error("PIPE EVENT RESOURCE LEAK DETECTED: {}", holderMessage);
+ LOGGER.error("PIPE EVENT RESOURCE LEAK DETECTED ({}): {}",
referenceCount, holderMessage);
finalizeResource();
}