This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch check-shit
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/check-shit by this push:
new 879ef8064b0 opt logs
879ef8064b0 is described below
commit 879ef8064b0d8d6e6e05d0a78e255eecbfbcd01c
Author: Steve Yurong Su <[email protected]>
AuthorDate: Fri Jul 11 10:26:06 2025 +0800
opt logs
---
.../client/IoTDBDataNodeAsyncClientManager.java | 8 +++--
.../async/IoTDBDataRegionAsyncConnector.java | 38 +++++++++++++++++-----
.../async/handler/PipeTransferTsFileHandler.java | 28 +++++++++-------
.../async/AsyncPipeDataTransferServiceClient.java | 15 ---------
.../connector/protocol/IoTDBSslSyncConnector.java | 2 +-
5 files changed, 53 insertions(+), 38 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index d8f886f496e..8b23e52710f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -299,15 +299,19 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
client.resetMethodStateIfStopped();
throw e;
} finally {
- client.setShouldReturnSelf(true);
if (isClosed) {
try {
client.close();
client.invalidateAll();
} catch (final Exception e) {
- LOGGER.warn("111");
+ LOGGER.warn(
+ "Failed to close client {}:{} after handshake failure when the manager is closed.",
+ targetNodeUrl.getIp(),
+ targetNodeUrl.getPort(),
+ e);
}
}
+ client.setShouldReturnSelf(true);
client.returnSelf();
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 7d88103f815..176bcd94cfb 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -74,6 +74,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -431,15 +432,34 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector {
private void transfer(final PipeTransferTsFileHandler pipeTransferTsFileHandler)
throws Exception {
transferTsFileCounter.incrementAndGet();
- AsyncPipeDataTransferServiceClient client = null;
- try {
- client = transferTsFileClientManager.borrowClient();
- pipeTransferTsFileHandler.transfer(transferTsFileClientManager, client);
- } catch (final Exception ex) {
- logOnClientException(client, ex);
- pipeTransferTsFileHandler.onError(ex);
- } finally {
- transferTsFileCounter.decrementAndGet();
+ CompletableFuture<Void> completableFuture =
+ CompletableFuture.supplyAsync(
+ () -> {
+ AsyncPipeDataTransferServiceClient client = null;
+ try {
+ client = transferTsFileClientManager.borrowClient();
+ pipeTransferTsFileHandler.transfer(transferTsFileClientManager, client);
+ } catch (final Exception ex) {
+ logOnClientException(client, ex);
+ pipeTransferTsFileHandler.onError(ex);
+ } finally {
+ transferTsFileCounter.decrementAndGet();
+ }
+ return null;
+ },
+ executor);
+
+ if (PipeConfig.getInstance().isTransferTsFileSync()) {
+ try {
+ completableFuture.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.error("Transfer tsfile event asynchronously was interrupted.", e);
+ throw new PipeException("Transfer tsfile event asynchronously was interrupted.", e);
+ } catch (Exception e) {
+ LOGGER.error("Failed to transfer tsfile event asynchronously.", e);
+ throw e;
+ }
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index 8dc74dbf4c3..f56ceeed0d2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -431,19 +431,25 @@ public class PipeTransferTsFileHandler extends PipeTransferTrackableHandler {
}
private void returnClientIfNecessary() {
- if (client != null) {
- client.setShouldReturnSelf(true);
- if (connector.isClosed()) {
- try {
- client.close();
- client.invalidateAll();
- } catch (final Exception e) {
- LOGGER.warn("111");
- }
+ if (client == null) {
+ return;
+ }
+
+ if (connector.isClosed()) {
+ try {
+ client.close();
+ client.invalidateAll();
+ } catch (final Exception e) {
+ LOGGER.warn(
+ "Failed to close or invalidate client when connector is closed. Client: {}, Exception: {}",
+ client,
+ e.getMessage(),
+ e);
}
- client.returnSelf();
- client = null;
}
+ client.setShouldReturnSelf(true);
+ client.returnSelf();
+ client = null;
}
@Override
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
index eadbb4435ab..c7b1aee0d1c 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java
@@ -72,14 +72,12 @@ public class AsyncPipeDataTransferServiceClient extends IClientRPCService.AsyncC
this.printLogWhenEncounterException = property.isPrintLogWhenEncounterException();
this.endpoint = endpoint;
this.clientManager = clientManager;
- LOGGER.warn("AsyncPipeDataTransferServiceClient id = {}, constructor", id);
}
@Override
public void onComplete() {
super.onComplete();
returnSelf();
- LOGGER.warn("AsyncPipeDataTransferServiceClient id = {}, onComplete", id);
}
@Override
@@ -87,22 +85,18 @@ public class AsyncPipeDataTransferServiceClient extends IClientRPCService.AsyncC
super.onError(e);
ThriftClient.resolveException(e, this);
returnSelf();
- LOGGER.warn("AsyncPipeDataTransferServiceClient id = {}, onError", id, e);
}
@Override
public void invalidate() {
if (!hasError()) {
super.onError(new Exception(String.format("This client %d has been invalidated", id)));
- LOGGER.warn("AsyncPipeDataTransferServiceClient id = {}, invalidate 1", id);
}
- LOGGER.warn("AsyncPipeDataTransferServiceClient id = {}, invalidate 2", id, new Exception());
}
@Override
public void invalidateAll() {
clientManager.clear(endpoint);
- LOGGER.warn("AsyncPipeDataTransferServiceClient id = {}, invalidateAll", id, new Exception());
}
@Override
@@ -116,14 +110,8 @@ public class AsyncPipeDataTransferServiceClient extends IClientRPCService.AsyncC
*/
public void returnSelf() {
if (shouldReturnSelf.get()) {
- if (clientManager.isClosed()) {
- this.close();
- this.invalidateAll();
- }
clientManager.returnClient(endpoint, this);
- LOGGER.warn("AsyncPipeDataTransferServiceClient id = {}, returnSelf 1", id);
}
- LOGGER.warn("AsyncPipeDataTransferServiceClient id = {}, returnSelf 2", id);
}
public void setShouldReturnSelf(final boolean shouldReturnSelf) {
@@ -179,13 +167,10 @@ public class AsyncPipeDataTransferServiceClient extends IClientRPCService.AsyncC
if (___transport != null && ___transport.isOpen()) {
___transport.close();
LOGGER.warn("Manually closing transport to prevent resource leakage.");
- LOGGER.warn("AsyncPipeDataTransferServiceClient id = {}, resetMethodState 1", id);
}
___currentMethod = null;
LOGGER.info("Method state has been reset due to manager not running.");
- LOGGER.warn("AsyncPipeDataTransferServiceClient id = {}, resetMethodState 2", id);
}
- LOGGER.warn("AsyncPipeDataTransferServiceClient id = {}, resetMethodState 3", id);
}
public String getIp() {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
index 82886e11ffe..44e91874641 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBSslSyncConnector.java
@@ -66,7 +66,7 @@ public abstract class IoTDBSslSyncConnector extends IoTDBConnector {
private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBSslSyncConnector.class);
- protected volatile IoTDBSyncClientManager clientManager;
+ private volatile IoTDBSyncClientManager clientManager;
protected IoTDBSyncClientManager getClientManager() {
if (clientManager == null) {