This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch ssl_between_nodes
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 808e46b586e7179bc4e2601327fdfaf8e96570bf
Author: HTHou <hao...@apache.org>
AuthorDate: Thu Jul 17 16:33:39 2025 +0800

    recover some code
---
 .../iotdb/rpc/NettyTNonBlockingTransport.java      | 29 ++++-----
 .../iotdb/rpc/TNonblockingSocketWrapper.java       |  3 +-
 .../sync/SyncDataNodeHeartbeatClientPool.java      | 70 ----------------------
 .../iotdb/confignode/manager/load/LoadManager.java |  2 +-
 .../iot/client/AsyncIoTConsensusServiceClient.java |  1 +
 .../AsyncConfigNodeInternalServiceClient.java      |  1 +
 .../async/AsyncDataNodeExternalServiceClient.java  |  1 +
 .../async/AsyncDataNodeInternalServiceClient.java  |  1 +
 .../AsyncDataNodeMPPDataExchangeServiceClient.java |  1 +
 9 files changed, 23 insertions(+), 86 deletions(-)

diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonBlockingTransport.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonBlockingTransport.java
index 1f2456c4386..d6911cea74d 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonBlockingTransport.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonBlockingTransport.java
@@ -96,6 +96,7 @@ public class NettyTNonBlockingTransport extends 
TNonblockingTransport {
   public NettyTNonBlockingTransport(
       String host,
       int port,
+      int connectTimeoutMs,
       String keystorePath,
       String keystorePassword,
       String truststorePath,
@@ -104,8 +105,8 @@ public class NettyTNonBlockingTransport extends 
TNonblockingTransport {
     super(new TConfiguration());
     this.host = host;
     this.port = port;
-    this.connectTimeoutMs = 60000;
-    this.sslHandshakeTimeoutMs = 30000;
+    this.connectTimeoutMs = connectTimeoutMs;
+    this.sslHandshakeTimeoutMs = connectTimeoutMs;
     this.eventLoopGroup = new NioEventLoopGroup();
     this.bootstrap = new Bootstrap();
     this.keystorePath = keystorePath;
@@ -156,7 +157,7 @@ public class NettyTNonBlockingTransport extends 
TNonblockingTransport {
                     .addListener(
                         future -> {
                           if (future.isSuccess()) {
-                            logger.info(
+                            logger.debug(
                                 "SSL handshake completed successfully for 
{}:{}", host, port);
                           } else {
                             logger.error(
@@ -226,7 +227,7 @@ public class NettyTNonBlockingTransport extends 
TNonblockingTransport {
     try {
       byteBuf = readQueue.poll();
       if (byteBuf == null) {
-        logger.info("No data available for ByteBuffer read");
+        logger.debug("No data available for ByteBuffer read");
         return 0;
       }
 
@@ -235,7 +236,7 @@ public class NettyTNonBlockingTransport extends 
TNonblockingTransport {
         byte[] tempArray = new byte[available];
         byteBuf.readBytes(tempArray);
         buffer.put(tempArray);
-        logger.info(
+        logger.debug(
             "Read {} bytes into ByteBuffer, remaining space: {}", available, 
buffer.remaining());
       }
 
@@ -243,7 +244,7 @@ public class NettyTNonBlockingTransport extends 
TNonblockingTransport {
         ByteBuf remaining = byteBuf.slice();
         remaining.retain();
         readQueue.offer(remaining);
-        logger.info("Put back {} remaining bytes", remaining.readableBytes());
+        logger.debug("Put back {} remaining bytes", remaining.readableBytes());
       }
 
       // Drain dummy channel to clear OP_READ
@@ -306,7 +307,7 @@ public class NettyTNonBlockingTransport extends 
TNonblockingTransport {
       return 0;
     }
 
-    logger.info("Writing {} bytes from ByteBuffer", remaining);
+    logger.debug("Writing {} bytes from ByteBuffer", remaining);
 
     synchronized (lock) {
       ByteBuf byteBuf = Unpooled.buffer(remaining);
@@ -361,7 +362,7 @@ public class NettyTNonBlockingTransport extends 
TNonblockingTransport {
       if (channel != null) {
         channel.close();
         channel = null;
-        logger.info("Channel closed for {}:{}", host, port);
+        logger.debug("Channel closed for {}:{}", host, port);
       }
       try {
         if (dummyClient != null) {
@@ -377,7 +378,7 @@ public class NettyTNonBlockingTransport extends 
TNonblockingTransport {
         logger.warn("Failed to close dummy channels", e);
       }
       eventLoopGroup.shutdownGracefully();
-      logger.info("EventLoopGroup shutdown initiated");
+      logger.debug("EventLoopGroup shutdown initiated");
     }
   }
 
@@ -393,7 +394,7 @@ public class NettyTNonBlockingTransport extends 
TNonblockingTransport {
       return false;
     }
 
-    logger.info("Starting connection to {}:{}", host, port);
+    logger.debug("Starting connection to {}:{}", host, port);
 
     try {
       // Initiate dummy connect, it will pend until accept
@@ -406,7 +407,7 @@ public class NettyTNonBlockingTransport extends 
TNonblockingTransport {
               future1 -> {
                 synchronized (lock) {
                   if (future1.isSuccess()) {
-                    logger.info("Connection established successfully to 
{}:{}", host, port);
+                    logger.debug("Connection established successfully to 
{}:{}", host, port);
                     channel = future1.channel();
                     connected.set(true);
                     // Now accept the dummy connection to complete it
@@ -483,7 +484,7 @@ public class NettyTNonBlockingTransport extends 
TNonblockingTransport {
 
     @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
-      logger.info("Channel active: {}", ctx.channel().remoteAddress());
+      logger.debug("Channel active: {}", ctx.channel().remoteAddress());
       super.channelActive(ctx);
     }
 
@@ -491,7 +492,7 @@ public class NettyTNonBlockingTransport extends 
TNonblockingTransport {
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
       if (msg instanceof ByteBuf) {
         ByteBuf byteBuf = (ByteBuf) msg;
-        logger.info("Received {} bytes", byteBuf.readableBytes());
+        logger.debug("Received {} bytes", byteBuf.readableBytes());
 
         synchronized (lock) {
           readQueue.offer(byteBuf.retain());
@@ -511,7 +512,7 @@ public class NettyTNonBlockingTransport extends 
TNonblockingTransport {
 
     @Override
     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-      logger.info("Channel inactive: {}", ctx.channel().remoteAddress());
+      logger.debug("Channel inactive: {}", ctx.channel().remoteAddress());
       synchronized (lock) {
         connected.set(false);
         connecting.set(false);
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TNonblockingSocketWrapper.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TNonblockingSocketWrapper.java
index 9370d4a4373..1cfcd83e6db 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TNonblockingSocketWrapper.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TNonblockingSocketWrapper.java
@@ -62,13 +62,14 @@ public class TNonblockingSocketWrapper {
   public static TNonblockingTransport wrap(
       String host,
       int port,
+      int timeout,
       String keyStorePath,
       String keyStorePwd,
       String trustStorePath,
       String trustStorePwd) {
     try {
       return new NettyTNonBlockingTransport(
-          host, port, keyStorePath, keyStorePwd, trustStorePath, 
trustStorePwd);
+          host, port, timeout, keyStorePath, keyStorePwd, trustStorePath, 
trustStorePwd);
     } catch (TTransportException e) {
       // never happen
       return null;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeHeartbeatClientPool.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeHeartbeatClientPool.java
deleted file mode 100644
index b32f023dac2..00000000000
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeHeartbeatClientPool.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.client.sync;
-
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.ClientPoolFactory;
-import org.apache.iotdb.commons.client.IClientManager;
-import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
-import 
org.apache.iotdb.confignode.client.async.handlers.heartbeat.DataNodeHeartbeatHandler;
-import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatReq;
-import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp;
-
-/** Synchronously send RPC requests to DataNodes. See queryengine.thrift for 
more details. */
-public class SyncDataNodeHeartbeatClientPool {
-
-  private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> 
clientManager;
-
-  private SyncDataNodeHeartbeatClientPool() {
-    clientManager =
-        new IClientManager.Factory<TEndPoint, 
SyncDataNodeInternalServiceClient>()
-            .createClientManager(
-                new 
ClientPoolFactory.SyncDataNodeHeartbeatServiceClientPoolFactory());
-  }
-
-  /**
-   * Only used in LoadManager.
-   *
-   * @param endPoint The specific DataNode
-   */
-  public void getDataNodeHeartBeat(
-      TEndPoint endPoint, TDataNodeHeartbeatReq req, DataNodeHeartbeatHandler 
handler) {
-    try (SyncDataNodeInternalServiceClient client = 
clientManager.borrowClient(endPoint)) {
-      TDataNodeHeartbeatResp resp = client.getDataNodeHeartBeat(req);
-      handler.onComplete(resp);
-    } catch (Exception e) {
-      handler.onError(e);
-    }
-  }
-
-  private static class SyncDataNodeHeartbeatClientPoolHolder {
-
-    private static final SyncDataNodeHeartbeatClientPool INSTANCE =
-        new SyncDataNodeHeartbeatClientPool();
-
-    private SyncDataNodeHeartbeatClientPoolHolder() {
-      // Empty constructor
-    }
-  }
-
-  public static SyncDataNodeHeartbeatClientPool getInstance() {
-    return SyncDataNodeHeartbeatClientPoolHolder.INSTANCE;
-  }
-}
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index b2f8a9189b3..54dd582551d 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -152,7 +152,7 @@ public class LoadManager {
     statisticsService.startLoadStatisticsService();
     eventService.startEventService();
     partitionBalancer.setupPartitionBalancer();
-    //    topologyService.startTopologyService();
+    topologyService.startTopologyService();
   }
 
   public void stopLoadServices() {
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
index 4419ab2a72d..4a20b7c2f12 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
@@ -62,6 +62,7 @@ public class AsyncIoTConsensusServiceClient extends 
IoTConsensusIService.AsyncCl
             ? TNonblockingSocketWrapper.wrap(
                 endpoint.getIp(),
                 endpoint.getPort(),
+                property.getConnectionTimeoutMs(),
                 commonConfig.getKeyStorePath(),
                 commonConfig.getKeyStorePwd(),
                 commonConfig.getTrustStorePath(),
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeInternalServiceClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeInternalServiceClient.java
index 0fae68ec249..0f86f1b2a37 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeInternalServiceClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeInternalServiceClient.java
@@ -64,6 +64,7 @@ public class AsyncConfigNodeInternalServiceClient extends 
IConfigNodeRPCService.
             ? TNonblockingSocketWrapper.wrap(
                 endpoint.getIp(),
                 endpoint.getPort(),
+                property.getConnectionTimeoutMs(),
                 commonConfig.getKeyStorePath(),
                 commonConfig.getKeyStorePwd(),
                 commonConfig.getTrustStorePath(),
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeExternalServiceClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeExternalServiceClient.java
index dad3eb02dc4..974df8f4222 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeExternalServiceClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeExternalServiceClient.java
@@ -64,6 +64,7 @@ public class AsyncDataNodeExternalServiceClient extends 
IDataNodeRPCService.Asyn
             ? TNonblockingSocketWrapper.wrap(
                 endpoint.getIp(),
                 endpoint.getPort(),
+                property.getConnectionTimeoutMs(),
                 commonConfig.getKeyStorePath(),
                 commonConfig.getKeyStorePwd(),
                 commonConfig.getTrustStorePath(),
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
index b39fce3bb87..f087d18b0e4 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
@@ -66,6 +66,7 @@ public class AsyncDataNodeInternalServiceClient extends 
IDataNodeRPCService.Asyn
             ? TNonblockingSocketWrapper.wrap(
                 endpoint.getIp(),
                 endpoint.getPort(),
+                property.getConnectionTimeoutMs(),
                 commonConfig.getKeyStorePath(),
                 commonConfig.getKeyStorePwd(),
                 commonConfig.getTrustStorePath(),
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
index 685105ef075..964ff7f1d94 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
@@ -62,6 +62,7 @@ public class AsyncDataNodeMPPDataExchangeServiceClient 
extends MPPDataExchangeSe
             ? TNonblockingSocketWrapper.wrap(
                 endpoint.getIp(),
                 endpoint.getPort(),
+                property.getConnectionTimeoutMs(),
                 commonConfig.getKeyStorePath(),
                 commonConfig.getKeyStorePwd(),
                 commonConfig.getTrustStorePath(),

Reply via email to