This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new e8cbc76edd5 [IOTDB-6013] Pipe: pipe-related threads (pools) should not 
be initialized unless necessary (#10229)
e8cbc76edd5 is described below

commit e8cbc76edd5c80b12682120dc8c4b8a2514debf3
Author: Steve Yurong Su <[email protected]>
AuthorDate: Tue Jun 20 13:53:13 2023 +0800

    [IOTDB-6013] Pipe: pipe-related threads (pools) should not be initialized 
unless necessary (#10229)
    
    The number of resident threads has increased, which is not expected.
    
    An asynchronous client thread pool was introduced to raise the performance 
limit of syncing data from Pipe to another IoTDB instance, similar to the 
asynchronous client thread pool of IoTConsensus. It can be seen in the compute 
thread monitoring item. However, the pool is initialized even when it is not 
needed; initialization should be deferred until it is actually necessary.
---
 .../manager/pipe/runtime/PipeHeartbeatParser.java  | 47 ++++++++++++----------
 .../pipe/runtime/PipeLeaderChangeHandler.java      | 14 ++++---
 .../pipe/runtime/PipeRuntimeCoordinator.java       | 22 ++++++++--
 .../pipe/connector/v2/IoTDBThriftConnectorV2.java  | 47 ++++++++++++++--------
 4 files changed, 84 insertions(+), 46 deletions(-)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatParser.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatParser.java
index 537a5e7d1cc..5e5564758dc 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatParser.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeHeartbeatParser.java
@@ -90,24 +90,29 @@ public class PipeHeartbeatParser {
       return;
     }
 
-    PipeRuntimeCoordinator.PROCEDURE_SUBMITTER.submit(
-        () -> {
-          if (!pipeMetaByteBufferListFromDataNode.isEmpty()) {
-            parseHeartbeatAndSaveMetaChangeLocally(dataNodeId, 
pipeMetaByteBufferListFromDataNode);
-          }
-
-          if (canSubmitHandleMetaChangeProcedure.get()
-              && (needWriteConsensusOnConfigNodes.get() || 
needPushPipeMetaToDataNodes.get())) {
-            configManager
-                .getProcedureManager()
-                .pipeHandleMetaChange(
-                    needWriteConsensusOnConfigNodes.get(), 
needPushPipeMetaToDataNodes.get());
-
-            // reset flags after procedure is submitted
-            needWriteConsensusOnConfigNodes.set(false);
-            needPushPipeMetaToDataNodes.set(false);
-          }
-        });
+    configManager
+        .getPipeManager()
+        .getPipeRuntimeCoordinator()
+        .getProcedureSubmitter()
+        .submit(
+            () -> {
+              if (!pipeMetaByteBufferListFromDataNode.isEmpty()) {
+                parseHeartbeatAndSaveMetaChangeLocally(
+                    dataNodeId, pipeMetaByteBufferListFromDataNode);
+              }
+
+              if (canSubmitHandleMetaChangeProcedure.get()
+                  && (needWriteConsensusOnConfigNodes.get() || 
needPushPipeMetaToDataNodes.get())) {
+                configManager
+                    .getProcedureManager()
+                    .pipeHandleMetaChange(
+                        needWriteConsensusOnConfigNodes.get(), 
needPushPipeMetaToDataNodes.get());
+
+                // reset flags after procedure is submitted
+                needWriteConsensusOnConfigNodes.set(false);
+                needPushPipeMetaToDataNodes.set(false);
+              }
+            });
   }
 
   private void parseHeartbeatAndSaveMetaChangeLocally(
@@ -197,9 +202,9 @@ public class PipeHeartbeatParser {
               needPushPipeMetaToDataNodes.set(true);
 
               LOGGER.warn(
-                  String.format(
-                      "Detect PipeRuntimeCriticalException %s from DataNode, 
stop pipe %s.",
-                      exception, pipeName));
+                  "Detect PipeRuntimeCriticalException {} from DataNode, stop 
pipe {}.",
+                  exception,
+                  pipeName);
             }
 
             if (exception instanceof PipeRuntimeConnectorCriticalException) {
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeLeaderChangeHandler.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeLeaderChangeHandler.java
index 3f93fb022b9..1e91bd55cc4 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeLeaderChangeHandler.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeLeaderChangeHandler.java
@@ -78,10 +78,14 @@ public class PipeLeaderChangeHandler implements 
IClusterStatusSubscriber {
     }
 
     // submit procedure in an async way to avoid blocking the caller
-    PipeRuntimeCoordinator.PROCEDURE_SUBMITTER.submit(
-        () ->
-            configManager
-                .getProcedureManager()
-                
.pipeHandleLeaderChange(dataRegionGroupToOldAndNewLeaderPairMap));
+    configManager
+        .getPipeManager()
+        .getPipeRuntimeCoordinator()
+        .getProcedureSubmitter()
+        .submit(
+            () ->
+                configManager
+                    .getProcedureManager()
+                    
.pipeHandleLeaderChange(dataRegionGroupToOldAndNewLeaderPairMap));
   }
 }
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
index 23398038b55..59ed8e5056c 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
@@ -31,24 +31,40 @@ import org.jetbrains.annotations.NotNull;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class PipeRuntimeCoordinator implements IClusterStatusSubscriber {
 
   // shared thread pool in the runtime package
-  static final ExecutorService PROCEDURE_SUBMITTER =
-      IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
-          ThreadName.PIPE_RUNTIME_PROCEDURE_SUBMITTER.getName());
+  private static final AtomicReference<ExecutorService> 
procedureSubmitterHolder =
+      new AtomicReference<>();
+  private final ExecutorService procedureSubmitter;
 
   private final PipeLeaderChangeHandler pipeLeaderChangeHandler;
   private final PipeHeartbeatParser pipeHeartbeatParser;
   private final PipeMetaSyncer pipeMetaSyncer;
 
   public PipeRuntimeCoordinator(ConfigManager configManager) {
+    if (procedureSubmitterHolder.get() == null) {
+      synchronized (PipeRuntimeCoordinator.class) {
+        if (procedureSubmitterHolder.get() == null) {
+          procedureSubmitterHolder.set(
+              IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+                  ThreadName.PIPE_RUNTIME_PROCEDURE_SUBMITTER.getName()));
+        }
+      }
+    }
+    procedureSubmitter = procedureSubmitterHolder.get();
+
     pipeLeaderChangeHandler = new PipeLeaderChangeHandler(configManager);
     pipeHeartbeatParser = new PipeHeartbeatParser(configManager);
     pipeMetaSyncer = new PipeMetaSyncer(configManager);
   }
 
+  public ExecutorService getProcedureSubmitter() {
+    return procedureSubmitter;
+  }
+
   @Override
   public void onClusterStatisticsChanged(StatisticsChangeEvent event) {
     pipeLeaderChangeHandler.onClusterStatisticsChanged(event);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.java
index 2773e24b9c2..a6b203077a6 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/connector/v2/IoTDBThriftConnectorV2.java
@@ -71,14 +71,15 @@ import static 
org.apache.iotdb.db.pipe.config.constant.PipeConnectorConstant.CON
 public class IoTDBThriftConnectorV2 implements PipeConnector {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBThriftConnectorV2.class);
+  private static final String FAILED_TO_BORROW_CLIENT_FORMATTER =
+      "Failed to borrow client from client pool for receiver %s:%s.";
 
   private static final CommonConfig COMMON_CONFIG = 
CommonDescriptor.getInstance().getConfig();
 
-  private static final IClientManager<TEndPoint, 
AsyncPipeDataTransferServiceClient>
-      ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER =
-          new IClientManager.Factory<TEndPoint, 
AsyncPipeDataTransferServiceClient>()
-              .createClientManager(
-                  new 
ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory());
+  private static volatile IClientManager<TEndPoint, 
AsyncPipeDataTransferServiceClient>
+      asyncPipeDataTransferClientManagerHolder;
+  private final IClientManager<TEndPoint, AsyncPipeDataTransferServiceClient>
+      asyncPipeDataTransferClientManager;
 
   private final AtomicBoolean isClosed = new AtomicBoolean(false);
 
@@ -89,6 +90,21 @@ public class IoTDBThriftConnectorV2 implements PipeConnector 
{
 
   private List<TEndPoint> nodeUrls;
 
+  public IoTDBThriftConnectorV2() {
+    if (asyncPipeDataTransferClientManagerHolder == null) {
+      synchronized (IoTDBThriftConnectorV2.class) {
+        if (asyncPipeDataTransferClientManagerHolder == null) {
+          asyncPipeDataTransferClientManagerHolder =
+              new IClientManager.Factory<TEndPoint, 
AsyncPipeDataTransferServiceClient>()
+                  .createClientManager(
+                      new 
ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory());
+        }
+      }
+    }
+
+    asyncPipeDataTransferClientManager = 
asyncPipeDataTransferClientManagerHolder;
+  }
+
   public synchronized void commit(long requestCommitId, @Nullable 
EnrichedEvent enrichedEvent) {
     commitQueue.offer(
         new Pair<>(
@@ -148,11 +164,11 @@ public class IoTDBThriftConnectorV2 implements 
PipeConnector {
         throw new PipeException(String.format("Handshake error, result status 
%s.", resp.status));
       }
     } catch (TException e) {
-      LOGGER.warn(
+      throw new PipeConnectionException(
           String.format(
-              "Connect to receiver %s:%s error.", firstNodeUrl.getIp(), 
firstNodeUrl.getPort()),
+              "Connect to receiver %s:%s error: %s",
+              e.getMessage(), firstNodeUrl.getIp(), firstNodeUrl.getPort()),
           e);
-      throw new PipeConnectionException(e.getMessage(), e);
     }
   }
 
@@ -201,7 +217,7 @@ public class IoTDBThriftConnectorV2 implements 
PipeConnector {
 
     try {
       final AsyncPipeDataTransferServiceClient client =
-          ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER.borrowClient(targetNodeUrl);
+          asyncPipeDataTransferClientManager.borrowClient(targetNodeUrl);
 
       try {
         pipeTransferInsertNodeReqHandler.transfer(client);
@@ -216,8 +232,7 @@ public class IoTDBThriftConnectorV2 implements 
PipeConnector {
       pipeTransferInsertNodeReqHandler.onError(ex);
       LOGGER.warn(
           String.format(
-              "Failed to borrow client from client pool for receiver %s:%s.",
-              targetNodeUrl.getIp(), targetNodeUrl.getPort()),
+              FAILED_TO_BORROW_CLIENT_FORMATTER, targetNodeUrl.getIp(), 
targetNodeUrl.getPort()),
           ex);
     }
   }
@@ -229,7 +244,7 @@ public class IoTDBThriftConnectorV2 implements 
PipeConnector {
 
     try {
       final AsyncPipeDataTransferServiceClient client =
-          ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER.borrowClient(targetNodeUrl);
+          asyncPipeDataTransferClientManager.borrowClient(targetNodeUrl);
 
       try {
         pipeTransferTabletReqHandler.transfer(client);
@@ -244,8 +259,7 @@ public class IoTDBThriftConnectorV2 implements 
PipeConnector {
       pipeTransferTabletReqHandler.onError(ex);
       LOGGER.warn(
           String.format(
-              "Failed to borrow client from client pool for receiver %s:%s.",
-              targetNodeUrl.getIp(), targetNodeUrl.getPort()),
+              FAILED_TO_BORROW_CLIENT_FORMATTER, targetNodeUrl.getIp(), 
targetNodeUrl.getPort()),
           ex);
     }
   }
@@ -275,7 +289,7 @@ public class IoTDBThriftConnectorV2 implements 
PipeConnector {
 
     try {
       final AsyncPipeDataTransferServiceClient client =
-          ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER.borrowClient(targetNodeUrl);
+          asyncPipeDataTransferClientManager.borrowClient(targetNodeUrl);
 
       try {
         pipeTransferTsFileInsertionEventHandler.transfer(client);
@@ -290,8 +304,7 @@ public class IoTDBThriftConnectorV2 implements 
PipeConnector {
       pipeTransferTsFileInsertionEventHandler.onError(ex);
       LOGGER.warn(
           String.format(
-              "Failed to borrow client from client pool for receiver %s:%s.",
-              targetNodeUrl.getIp(), targetNodeUrl.getPort()),
+              FAILED_TO_BORROW_CLIENT_FORMATTER, targetNodeUrl.getIp(), 
targetNodeUrl.getPort()),
           ex);
     }
   }

Reply via email to