This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch ssl_between_nodes in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit eaff16eecd8f56b47b1d5c22ed7c03391d09e664 Author: HTHou <hao...@apache.org> AuthorDate: Tue Jul 22 15:34:49 2025 +0800 finishing dev --- .../iotdb/rpc/NettyTNonBlockingTransport.java | 49 +++++++++++----------- iotdb-core/ainode/ainode/core/config.py | 45 +++++++++++++------- iotdb-core/ainode/ainode/core/ingress/iotdb.py | 9 +++- iotdb-core/ainode/ainode/core/rpc/client.py | 18 +++++--- iotdb-core/ainode/ainode/core/rpc/service.py | 17 +++++--- .../ainode/resources/conf/iotdb-ainode.properties | 32 +++++++++++++- .../conf/iotdb-system.properties.template | 2 +- .../iotdb/commons/conf/CommonDescriptor.java | 2 +- .../service/AbstractThriftServiceThread.java | 7 +++- .../iotdb/commons/service/ThriftServiceThread.java | 1 + 10 files changed, 126 insertions(+), 56 deletions(-) diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonBlockingTransport.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonBlockingTransport.java index d6911cea74d..a76a79fcee2 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonBlockingTransport.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonBlockingTransport.java @@ -160,7 +160,7 @@ public class NettyTNonBlockingTransport extends TNonblockingTransport { logger.debug( "SSL handshake completed successfully for {}:{}", host, port); } else { - logger.error( + logger.debug( "SSL handshake failed for {}:{}: {}", host, port, @@ -254,7 +254,7 @@ public class NettyTNonBlockingTransport extends TNonblockingTransport { discard.clear(); } } catch (IOException e) { - logger.warn("Failed to drain dummy channel", e); + logger.debug("Failed to drain dummy channel", e); } if (readingResponseSize) { // Trigger OP_READ on dummy by writing dummy byte @@ -270,7 +270,7 @@ public class NettyTNonBlockingTransport extends TNonblockingTransport { return available; } catch (Exception e) { - logger.error("ByteBuffer read failed: {}", e.getMessage()); 
+ logger.debug("ByteBuffer read failed: {}", e.getMessage()); throw new TTransportException(TTransportException.UNKNOWN, "Read failed", e); } finally { if (byteBuf != null) { @@ -290,7 +290,7 @@ public class NettyTNonBlockingTransport extends TNonblockingTransport { ByteBuffer buffer = ByteBuffer.wrap(buf, off, len); return read(buffer); } catch (Exception e) { - logger.error("Read failed: {}", e.getMessage()); + logger.debug("Read failed: {}", e.getMessage()); throw new TTransportException(TTransportException.UNKNOWN, "Read failed", e); } } @@ -320,13 +320,13 @@ public class NettyTNonBlockingTransport extends TNonblockingTransport { if (future1.isSuccess()) { logger.debug("ByteBuffer write completed successfully: {} bytes", remaining); } else { - logger.error("ByteBuffer write failed: {}", future1.cause().getMessage()); + logger.debug("ByteBuffer write failed: {}", future1.cause().getMessage()); } }); return remaining; } catch (Exception e) { byteBuf.release(); - logger.error("ByteBuffer write failed: {}", e.getMessage()); + logger.debug("ByteBuffer write failed: {}", e.getMessage()); throw new TTransportException(TTransportException.UNKNOWN, "Write failed", e); } } @@ -375,7 +375,7 @@ public class NettyTNonBlockingTransport extends TNonblockingTransport { dummyServer.close(); } } catch (IOException e) { - logger.warn("Failed to close dummy channels", e); + logger.debug("Failed to close dummy channels", e); } eventLoopGroup.shutdownGracefully(); logger.debug("EventLoopGroup shutdown initiated"); @@ -422,10 +422,10 @@ public class NettyTNonBlockingTransport extends TNonblockingTransport { } } } catch (IOException e) { - logger.warn("Failed to accept dummy connection", e); + logger.debug("Failed to accept dummy connection", e); } } else { - logger.error( + logger.debug( "Connection failed to {}:{}: {}", host, port, future1.cause().getMessage()); } connecting.set(false); @@ -435,8 +435,7 @@ public class NettyTNonBlockingTransport extends TNonblockingTransport { 
return false; // Return false to indicate pending connect for dummy } catch (Exception e) { connecting.set(false); - logger.error("Failed to start connection to {}:{}", host, port, e.getMessage()); - throw new IOException("Failed to start connection", e); + return false; } } @@ -493,20 +492,22 @@ public class NettyTNonBlockingTransport extends TNonblockingTransport { if (msg instanceof ByteBuf) { ByteBuf byteBuf = (ByteBuf) msg; logger.debug("Received {} bytes", byteBuf.readableBytes()); - - synchronized (lock) { - readQueue.offer(byteBuf.retain()); - // Trigger OP_READ on dummy by writing dummy byte - if (dummyServerAccepted != null) { - ByteBuffer dummyByte = ByteBuffer.wrap(new byte[1]); - dummyServerAccepted.write(dummyByte); - } - // Wakeup selector if needed - if (selector != null) { - selector.wakeup(); + try { + synchronized (lock) { + readQueue.offer(byteBuf.retain()); + // Trigger OP_READ on dummy by writing dummy byte + if (dummyServerAccepted != null) { + ByteBuffer dummyByte = ByteBuffer.wrap(new byte[1]); + dummyServerAccepted.write(dummyByte); + } + // Wakeup selector if needed + if (selector != null) { + selector.wakeup(); + } } + } finally { + byteBuf.release(); } - byteBuf.release(); } } @@ -522,7 +523,7 @@ public class NettyTNonBlockingTransport extends TNonblockingTransport { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - logger.error("Channel exception: {}", cause.getMessage()); + logger.debug("Channel exception: {}", cause.getMessage()); synchronized (lock) { ctx.close(); } diff --git a/iotdb-core/ainode/ainode/core/config.py b/iotdb-core/ainode/ainode/core/config.py index 832e45fed8b..c04edc83bed 100644 --- a/iotdb-core/ainode/ainode/core/config.py +++ b/iotdb-core/ainode/ainode/core/config.py @@ -75,9 +75,11 @@ class AINodeConfig(object): self._ain_thrift_compression_enabled = AINODE_THRIFT_COMPRESSION_ENABLED # use for ssl - self._ain_thrift_ssl_enabled = False + 
self._ain_cluster_ingress_ssl_enabled = False + self._ain_internal_ssl_enabled = False self._ain_thrift_ssl_ca_file = None self._ain_thrift_ssl_cert_file = None + self._ain_thrift_ssl_key_file = None # Cache number of model storage to avoid repeated loading self._ain_model_storage_cache_size = 30 @@ -181,30 +183,38 @@ class AINodeConfig(object): ) -> None: self._ain_thrift_compression_enabled = ain_thrift_compression_enabled - def get_ain_thrift_ssl_enabled(self) -> bool: - return self._ain_thrift_ssl_enabled + def get_ain_cluster_ingress_ssl_enabled(self) -> bool: + return self._ain_cluster_ingress_ssl_enabled - def set_ain_thrift_ssl_enabled( - self, ain_thrift_ssl_enabled: int + def set_ain_cluster_ingress_ssl_enabled( + self, ain_cluster_ingress_ssl_enabled: int ) -> None: - self._ain_thrift_ssl_enabled = ain_thrift_ssl_enabled + self._ain_cluster_ingress_ssl_enabled = ain_cluster_ingress_ssl_enabled + + def get_ain_internal_ssl_enabled(self) -> bool: + return self._ain_internal_ssl_enabled + + def set_ain_internal_ssl_enabled(self, ain_internal_ssl_enabled: int) -> None: + self._ain_internal_ssl_enabled = ain_internal_ssl_enabled def get_ain_thrift_ssl_ca_file(self) -> str: return self._ain_thrift_ssl_ca_file - def set_ain_thrift_ssl_ca_file( - self, ain_thrift_ssl_ca_file: str - ) -> None: + def set_ain_thrift_ssl_ca_file(self, ain_thrift_ssl_ca_file: str) -> None: self._ain_thrift_ssl_ca_file = ain_thrift_ssl_ca_file def get_ain_thrift_ssl_cert_file(self) -> str: return self._ain_thrift_ssl_cert_file - def set_ain_thrift_ssl_cert_file( - self, ain_thrift_ssl_cert_file: str - ) -> None: + def set_ain_thrift_ssl_cert_file(self, ain_thrift_ssl_cert_file: str) -> None: self._ain_thrift_ssl_cert_file = ain_thrift_ssl_cert_file + def get_ain_thrift_ssl_key_file(self) -> str: + return self._ain_thrift_ssl_key_file + + def set_ain_thrift_ssl_key_file(self, ain_thrift_ssl_key_file: str) -> None: + self._ain_thrift_ssl_key_file = ain_thrift_ssl_key_file + def 
get_ain_model_storage_cache_size(self) -> int: return self._ain_model_storage_cache_size @@ -338,9 +348,9 @@ class AINodeDescriptor(object): int(file_configs["ain_thrift_compression_enabled"]) ) - if "ain_thrift_ssl_enabled" in config_keys: - self._config.set_ain_thrift_ssl_enabled( - int(file_configs["ain_thrift_ssl_enabled"]) + if "ain_internal_ssl_enabled" in config_keys: + self._config.set_ain_internal_ssl_enabled( + int(file_configs["ain_internal_ssl_enabled"]) ) if "ain_thrift_ssl_ca_file" in config_keys: @@ -353,6 +363,11 @@ class AINodeDescriptor(object): file_configs["ain_thrift_ssl_cert_file"] ) + if "ain_thrift_ssl_key_file" in config_keys: + self._config.set_ain_thrift_ssl_key_file( + file_configs["ain_thrift_ssl_key_file"] + ) + if "ain_logs_dir" in config_keys: log_dir = file_configs["ain_logs_dir"] self._config.set_ain_logs_dir(log_dir) diff --git a/iotdb-core/ainode/ainode/core/ingress/iotdb.py b/iotdb-core/ainode/ainode/core/ingress/iotdb.py index 7bdc99f2baf..6602df75fe6 100644 --- a/iotdb-core/ainode/ainode/core/ingress/iotdb.py +++ b/iotdb-core/ainode/ainode/core/ingress/iotdb.py @@ -91,8 +91,11 @@ class IoTDBTreeModelDataset(BasicDatabaseForecastDataset): user=username, password=password, zone_id=time_zone, + use_ssl=AINodeDescriptor() + .get_config() + .get_ain_cluster_ingress_ssl_enabled(), + ca_certs=AINodeDescriptor().get_config().get_ain_thrift_ssl_ca_file(), ) - # TODO(HAONAN) self.session.open(False) self.use_rate = use_rate self.offset_rate = offset_rate @@ -270,6 +273,10 @@ class IoTDBTableModelDataset(BasicDatabaseForecastDataset): username=username, password=password, time_zone=time_zone, + use_ssl=AINodeDescriptor() + .get_config() + .get_ain_cluster_ingress_ssl_enabled(), + ca_certs=AINodeDescriptor().get_config().get_ain_thrift_ssl_ca_file(), ) self.session = TableSession(table_session_config) self.use_rate = use_rate diff --git a/iotdb-core/ainode/ainode/core/rpc/client.py b/iotdb-core/ainode/ainode/core/rpc/client.py index 
954524cae19..bbc15a4c1c1 100644 --- a/iotdb-core/ainode/ainode/core/rpc/client.py +++ b/iotdb-core/ainode/ainode/core/rpc/client.py @@ -109,8 +109,8 @@ class ConfigNodeClient(object): raise TException(self._MSG_RECONNECTION_FAIL) def _connect(self, target_config_node: TEndPoint) -> None: - if AINodeDescriptor().get_config().get_ain_thrift_ssl_enabled(): - import ssl,sys + if AINodeDescriptor().get_config().get_ain_internal_ssl_enabled(): + import ssl, sys from thrift.transport import TSSLSocket if sys.version_info >= (3, 10): @@ -118,12 +118,18 @@ class ConfigNodeClient(object): else: context = ssl.SSLContext(ssl.PROTOCOL_TLS) context.verify_mode = ssl.CERT_REQUIRED - context.check_hostname = True - context.load_verify_locations(cafile=AINodeDescriptor().get_config().get_ain_thrift_ssl_ca_file()) + context.check_hostname = False + context.load_verify_locations( + cafile=AINodeDescriptor().get_config().get_ain_thrift_ssl_ca_file() + ) context.load_cert_chain( - certfile=AINodeDescriptor().get_config().get_ain_thrift_ssl_cert_file()) + certfile=AINodeDescriptor().get_config().get_ain_thrift_ssl_cert_file(), + keyfile=AINodeDescriptor().get_config().get_ain_thrift_ssl_key_file(), + ) socket = TSSLSocket.TSSLSocket( - host=target_config_node.ip, port=target_config_node.port, ssl_context=context + host=target_config_node.ip, + port=target_config_node.port, + ssl_context=context, ) else: socket = TSocket.TSocket(target_config_node.ip, target_config_node.port) diff --git a/iotdb-core/ainode/ainode/core/rpc/service.py b/iotdb-core/ainode/ainode/core/rpc/service.py index e4558db5d5c..d2c75f39c45 100644 --- a/iotdb-core/ainode/ainode/core/rpc/service.py +++ b/iotdb-core/ainode/ainode/core/rpc/service.py @@ -71,8 +71,8 @@ class AINodeRPCService(threading.Thread): self._stop_event = threading.Event() self._handler = handler processor = IAINodeRPCService.Processor(handler=self._handler) - if AINodeDescriptor().get_config().get_ain_thrift_ssl_enabled(): - import ssl,sys + if 
AINodeDescriptor().get_config().get_ain_internal_ssl_enabled(): + import ssl, sys from thrift.transport import TSSLSocket if sys.version_info >= (3, 10): @@ -80,13 +80,18 @@ class AINodeRPCService(threading.Thread): else: context = ssl.SSLContext(ssl.PROTOCOL_TLS) context.verify_mode = ssl.CERT_REQUIRED - context.check_hostname = True - context.load_verify_locations(cafile=AINodeDescriptor().get_config().get_ain_thrift_ssl_ca_file()) - context.load_cert_chain(certfile=AINodeDescriptor().get_config().get_ain_thrift_ssl_cert_file()) + context.check_hostname = False + context.load_verify_locations( + cafile=AINodeDescriptor().get_config().get_ain_thrift_ssl_ca_file() + ) + context.load_cert_chain( + certfile=AINodeDescriptor().get_config().get_ain_thrift_ssl_cert_file(), + keyfile=AINodeDescriptor().get_config().get_ain_thrift_ssl_key_file(), + ) transport = TSSLSocket.TSSLServerSocket( host=AINodeDescriptor().get_config().get_ain_inference_rpc_address(), port=AINodeDescriptor().get_config().get_ain_inference_rpc_port(), - ssl_context=context + ssl_context=context, ) else: transport = TSocket.TServerSocket( diff --git a/iotdb-core/ainode/resources/conf/iotdb-ainode.properties b/iotdb-core/ainode/resources/conf/iotdb-ainode.properties index db1f9358480..1c3681a47f8 100644 --- a/iotdb-core/ainode/resources/conf/iotdb-ainode.properties +++ b/iotdb-core/ainode/resources/conf/iotdb-ainode.properties @@ -77,4 +77,34 @@ ain_inference_rpc_port=10810 # Whether to use compression in Thrift # Please use 0 or 1 # Datatype: Boolean -# ain_thrift_compression_enabled=0 \ No newline at end of file +# ain_thrift_compression_enabled=0 + +# Whether to use ssl for fetching IoTDB data +# Use the same value as enable_thrift_ssl in iotdb-system.properties +# Please use 0 or 1 +# Datatype: Boolean +# ain_cluster_ingress_ssl_enabled=0 + +# Whether enable SSL for IoTDB cluster internal connections +# Use the same value as enable_internal_ssl in iotdb-system.properties +# Please use 0 or 1 +# 
Datatype: Boolean +# ain_internal_ssl_enabled=0 + +# The AINode SSL CA file path. +# Relative paths are resolved against a starting directory that depends on the operating system. +# It is recommended to use an absolute path. +# Datatype: String +# ain_thrift_ssl_ca_file= + +# The AINode SSL certificate file path. +# Relative paths are resolved against a starting directory that depends on the operating system. +# It is recommended to use an absolute path. +# Datatype: String +# ain_thrift_ssl_cert_file= + +# The AINode SSL key file path. +# Relative paths are resolved against a starting directory that depends on the operating system. +# It is recommended to use an absolute path. +# Datatype: String +# ain_thrift_ssl_key_file= diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index 71c5c80e34f..457ded79b9c 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -451,7 +451,7 @@ enable_https=false # Whether enable SSL for cluster internal connections # effectiveMode: restart # Datatype: boolean -enable_internal_connection_ssl=false +enable_internal_ssl=false # SSL key store path # linux e.g.
/home/iotdb/server.keystore (absolute path) or server.keystore (relative path) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java index 7c99e3e36ca..44be9bc34e4 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java @@ -458,7 +458,7 @@ public class CommonDescriptor { config.setEnableInternalSSL( Boolean.parseBoolean( properties.getProperty( - "enable_internal_connection_ssl", Boolean.toString(config.isEnableInternalSSL())))); + "enable_internal_ssl", Boolean.toString(config.isEnableInternalSSL())))); config.setKeyStorePath(properties.getProperty("key_store_path", config.getKeyStorePath())); config.setKeyStorePwd(properties.getProperty("key_store_pwd", config.getKeyStorePwd())); config.setTrustStorePath( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java index b787cf500e1..7feb07cbc13 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java @@ -153,7 +153,12 @@ public abstract class AbstractThriftServiceThread extends Thread { } } - /** For sync ThriftServiceThread */ + /** + * Synced ThriftServiceThread with ssl enabled. When use it, some error logs like below may be + * printed. org.apache.thrift.transport.TTransportException: java.net.SocketTimeoutException: Read + * timed out This issue is fixed in Thrift 0.15 and newer versions. 
See <a + * href="https://issues.apache.org/jira/browse/THRIFT-5411">https://issues.apache.org/jira/browse/THRIFT-5411</a> + */ @SuppressWarnings("squid:S107") protected AbstractThriftServiceThread( TProcessor processor, diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java index aa53d8eee0f..caa660a7bdb 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java @@ -119,6 +119,7 @@ public class ThriftServiceThread extends AbstractThriftServiceThread { transportFactory); } + /** for synced ThriftServiceThread with ssl enabled */ public ThriftServiceThread( TProcessor processor, String serviceName,