This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch ssl_between_nodes in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 808e46b586e7179bc4e2601327fdfaf8e96570bf Author: HTHou <hao...@apache.org> AuthorDate: Thu Jul 17 16:33:39 2025 +0800 recover some code --- .../iotdb/rpc/NettyTNonBlockingTransport.java | 29 ++++----- .../iotdb/rpc/TNonblockingSocketWrapper.java | 3 +- .../sync/SyncDataNodeHeartbeatClientPool.java | 70 ---------------------- .../iotdb/confignode/manager/load/LoadManager.java | 2 +- .../iot/client/AsyncIoTConsensusServiceClient.java | 1 + .../AsyncConfigNodeInternalServiceClient.java | 1 + .../async/AsyncDataNodeExternalServiceClient.java | 1 + .../async/AsyncDataNodeInternalServiceClient.java | 1 + .../AsyncDataNodeMPPDataExchangeServiceClient.java | 1 + 9 files changed, 23 insertions(+), 86 deletions(-) diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonBlockingTransport.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonBlockingTransport.java index 1f2456c4386..d6911cea74d 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonBlockingTransport.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonBlockingTransport.java @@ -96,6 +96,7 @@ public class NettyTNonBlockingTransport extends TNonblockingTransport { public NettyTNonBlockingTransport( String host, int port, + int connectTimeoutMs, String keystorePath, String keystorePassword, String truststorePath, @@ -104,8 +105,8 @@ public class NettyTNonBlockingTransport extends TNonblockingTransport { super(new TConfiguration()); this.host = host; this.port = port; - this.connectTimeoutMs = 60000; - this.sslHandshakeTimeoutMs = 30000; + this.connectTimeoutMs = connectTimeoutMs; + this.sslHandshakeTimeoutMs = connectTimeoutMs; this.eventLoopGroup = new NioEventLoopGroup(); this.bootstrap = new Bootstrap(); this.keystorePath = keystorePath; @@ -156,7 +157,7 @@ public class NettyTNonBlockingTransport extends TNonblockingTransport { .addListener( future -> { if (future.isSuccess()) { - logger.info( + logger.debug( "SSL handshake completed successfully for {}:{}", host, port); } else { logger.error( @@ -226,7 +227,7 @@ public class NettyTNonBlockingTransport extends TNonblockingTransport { try { byteBuf = readQueue.poll(); if (byteBuf == null) { - logger.info("No data available for ByteBuffer read"); + logger.debug("No data available for ByteBuffer read"); return 0; } @@ -235,7 +236,7 @@ public class NettyTNonBlockingTransport extends TNonblockingTransport { byte[] tempArray = new byte[available]; byteBuf.readBytes(tempArray); buffer.put(tempArray); - logger.info( + logger.debug( "Read {} bytes into ByteBuffer, remaining space: {}", available, buffer.remaining()); } @@ -243,7 +244,7 @@ public class NettyTNonBlockingTransport extends TNonblockingTransport { ByteBuf remaining = byteBuf.slice(); remaining.retain(); readQueue.offer(remaining); - logger.info("Put back {} remaining bytes", remaining.readableBytes()); + logger.debug("Put back {} remaining bytes", remaining.readableBytes()); } // Drain dummy channel to clear OP_READ @@ -306,7 +307,7 @@ public class NettyTNonBlockingTransport extends TNonblockingTransport { return 0; } - logger.info("Writing {} bytes from ByteBuffer", remaining); + logger.debug("Writing {} bytes from ByteBuffer", remaining); synchronized (lock) { ByteBuf byteBuf = Unpooled.buffer(remaining); @@ -361,7 +362,7 @@ public class NettyTNonBlockingTransport extends TNonblockingTransport { if (channel != null) { channel.close(); channel = null; - logger.info("Channel closed for {}:{}", host, port); + logger.debug("Channel closed for {}:{}", host, port); } try { if (dummyClient != null) { @@ -377,7 +378,7 @@ public class NettyTNonBlockingTransport extends TNonblockingTransport { logger.warn("Failed to close dummy channels", e); } eventLoopGroup.shutdownGracefully(); - logger.info("EventLoopGroup shutdown initiated"); + logger.debug("EventLoopGroup shutdown initiated"); } } @@ -393,7 +394,7 @@ public class NettyTNonBlockingTransport extends TNonblockingTransport { return false; } - logger.info("Starting connection to {}:{}", host, port); + logger.debug("Starting connection to {}:{}", host, port); try { // Initiate dummy connect, it will pend until accept @@ -406,7 +407,7 @@ public class NettyTNonBlockingTransport extends TNonblockingTransport { future1 -> { synchronized (lock) { if (future1.isSuccess()) { - logger.info("Connection established successfully to {}:{}", host, port); + logger.debug("Connection established successfully to {}:{}", host, port); channel = future1.channel(); connected.set(true); // Now accept the dummy connection to complete it @@ -483,7 +484,7 @@ public class NettyTNonBlockingTransport extends TNonblockingTransport { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { - logger.info("Channel active: {}", ctx.channel().remoteAddress()); + logger.debug("Channel active: {}", ctx.channel().remoteAddress()); super.channelActive(ctx); } @@ -491,7 +492,7 @@ public class NettyTNonBlockingTransport extends TNonblockingTransport { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { ByteBuf byteBuf = (ByteBuf) msg; - logger.info("Received {} bytes", byteBuf.readableBytes()); + logger.debug("Received {} bytes", byteBuf.readableBytes()); synchronized (lock) { readQueue.offer(byteBuf.retain()); @@ -511,7 +512,7 @@ public class NettyTNonBlockingTransport extends TNonblockingTransport { @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { - logger.info("Channel inactive: {}", ctx.channel().remoteAddress()); + logger.debug("Channel inactive: {}", ctx.channel().remoteAddress()); synchronized (lock) { connected.set(false); connecting.set(false); diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TNonblockingSocketWrapper.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TNonblockingSocketWrapper.java index 9370d4a4373..1cfcd83e6db 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TNonblockingSocketWrapper.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TNonblockingSocketWrapper.java @@ -62,13 +62,14 @@ public class TNonblockingSocketWrapper { public static TNonblockingTransport wrap( String host, int port, + int timeout, String keyStorePath, String keyStorePwd, String trustStorePath, String trustStorePwd) { try { return new NettyTNonBlockingTransport( - host, port, keyStorePath, keyStorePwd, trustStorePath, trustStorePwd); + host, port, timeout, keyStorePath, keyStorePwd, trustStorePath, trustStorePwd); } catch (TTransportException e) { // never happen return null; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeHeartbeatClientPool.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeHeartbeatClientPool.java deleted file mode 100644 index b32f023dac2..00000000000 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeHeartbeatClientPool.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.confignode.client.sync; - -import org.apache.iotdb.common.rpc.thrift.TEndPoint; -import org.apache.iotdb.commons.client.ClientPoolFactory; -import org.apache.iotdb.commons.client.IClientManager; -import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; -import org.apache.iotdb.confignode.client.async.handlers.heartbeat.DataNodeHeartbeatHandler; -import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatReq; -import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp; - -/** Synchronously send RPC requests to DataNodes. See queryengine.thrift for more details. */ -public class SyncDataNodeHeartbeatClientPool { - - private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> clientManager; - - private SyncDataNodeHeartbeatClientPool() { - clientManager = - new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>() - .createClientManager( - new ClientPoolFactory.SyncDataNodeHeartbeatServiceClientPoolFactory()); - } - - /** - * Only used in LoadManager. - * - * @param endPoint The specific DataNode - */ - public void getDataNodeHeartBeat( - TEndPoint endPoint, TDataNodeHeartbeatReq req, DataNodeHeartbeatHandler handler) { - try (SyncDataNodeInternalServiceClient client = clientManager.borrowClient(endPoint)) { - TDataNodeHeartbeatResp resp = client.getDataNodeHeartBeat(req); - handler.onComplete(resp); - } catch (Exception e) { - handler.onError(e); - } - } - - private static class SyncDataNodeHeartbeatClientPoolHolder { - - private static final SyncDataNodeHeartbeatClientPool INSTANCE = - new SyncDataNodeHeartbeatClientPool(); - - private SyncDataNodeHeartbeatClientPoolHolder() { - // Empty constructor - } - } - - public static SyncDataNodeHeartbeatClientPool getInstance() { - return SyncDataNodeHeartbeatClientPoolHolder.INSTANCE; - } -} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java index b2f8a9189b3..54dd582551d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java @@ -152,7 +152,7 @@ public class LoadManager { statisticsService.startLoadStatisticsService(); eventService.startEventService(); partitionBalancer.setupPartitionBalancer(); - // topologyService.startTopologyService(); + topologyService.startTopologyService(); } public void stopLoadServices() { diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java index 4419ab2a72d..4a20b7c2f12 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java @@ -62,6 +62,7 @@ public class AsyncIoTConsensusServiceClient extends IoTConsensusIService.AsyncCl ? TNonblockingSocketWrapper.wrap( endpoint.getIp(), endpoint.getPort(), + property.getConnectionTimeoutMs(), commonConfig.getKeyStorePath(), commonConfig.getKeyStorePwd(), commonConfig.getTrustStorePath(), diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeInternalServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeInternalServiceClient.java index 0fae68ec249..0f86f1b2a37 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeInternalServiceClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeInternalServiceClient.java @@ -64,6 +64,7 @@ public class AsyncConfigNodeInternalServiceClient extends IConfigNodeRPCService. ? TNonblockingSocketWrapper.wrap( endpoint.getIp(), endpoint.getPort(), + property.getConnectionTimeoutMs(), commonConfig.getKeyStorePath(), commonConfig.getKeyStorePwd(), commonConfig.getTrustStorePath(), diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeExternalServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeExternalServiceClient.java index dad3eb02dc4..974df8f4222 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeExternalServiceClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeExternalServiceClient.java @@ -64,6 +64,7 @@ public class AsyncDataNodeExternalServiceClient extends IDataNodeRPCService.Asyn ? TNonblockingSocketWrapper.wrap( endpoint.getIp(), endpoint.getPort(), + property.getConnectionTimeoutMs(), commonConfig.getKeyStorePath(), commonConfig.getKeyStorePwd(), commonConfig.getTrustStorePath(), diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java index b39fce3bb87..f087d18b0e4 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java @@ -66,6 +66,7 @@ public class AsyncDataNodeInternalServiceClient extends IDataNodeRPCService.Asyn ? TNonblockingSocketWrapper.wrap( endpoint.getIp(), endpoint.getPort(), + property.getConnectionTimeoutMs(), commonConfig.getKeyStorePath(), commonConfig.getKeyStorePwd(), commonConfig.getTrustStorePath(), diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java index 685105ef075..964ff7f1d94 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java @@ -62,6 +62,7 @@ public class AsyncDataNodeMPPDataExchangeServiceClient extends MPPDataExchangeSe ? TNonblockingSocketWrapper.wrap( endpoint.getIp(), endpoint.getPort(), + property.getConnectionTimeoutMs(), commonConfig.getKeyStorePath(), commonConfig.getKeyStorePwd(), commonConfig.getTrustStorePath(),