This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch ssl_between_nodes
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit eaff16eecd8f56b47b1d5c22ed7c03391d09e664
Author: HTHou <hao...@apache.org>
AuthorDate: Tue Jul 22 15:34:49 2025 +0800

    finishing dev
---
 .../iotdb/rpc/NettyTNonBlockingTransport.java      | 49 +++++++++++-----------
 iotdb-core/ainode/ainode/core/config.py            | 45 +++++++++++++-------
 iotdb-core/ainode/ainode/core/ingress/iotdb.py     |  9 +++-
 iotdb-core/ainode/ainode/core/rpc/client.py        | 18 +++++---
 iotdb-core/ainode/ainode/core/rpc/service.py       | 17 +++++---
 .../ainode/resources/conf/iotdb-ainode.properties  | 32 +++++++++++++-
 .../conf/iotdb-system.properties.template          |  2 +-
 .../iotdb/commons/conf/CommonDescriptor.java       |  2 +-
 .../service/AbstractThriftServiceThread.java       |  7 +++-
 .../iotdb/commons/service/ThriftServiceThread.java |  1 +
 10 files changed, 126 insertions(+), 56 deletions(-)

diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonBlockingTransport.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonBlockingTransport.java
index d6911cea74d..a76a79fcee2 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonBlockingTransport.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/NettyTNonBlockingTransport.java
@@ -160,7 +160,7 @@ public class NettyTNonBlockingTransport extends 
TNonblockingTransport {
                             logger.debug(
                                 "SSL handshake completed successfully for 
{}:{}", host, port);
                           } else {
-                            logger.error(
+                            logger.debug(
                                 "SSL handshake failed for {}:{}: {}",
                                 host,
                                 port,
@@ -254,7 +254,7 @@ public class NettyTNonBlockingTransport extends 
TNonblockingTransport {
           discard.clear();
         }
       } catch (IOException e) {
-        logger.warn("Failed to drain dummy channel", e);
+        logger.debug("Failed to drain dummy channel", e);
       }
       if (readingResponseSize) {
         // Trigger OP_READ on dummy by writing dummy byte
@@ -270,7 +270,7 @@ public class NettyTNonBlockingTransport extends 
TNonblockingTransport {
 
       return available;
     } catch (Exception e) {
-      logger.error("ByteBuffer read failed: {}", e.getMessage());
+      logger.debug("ByteBuffer read failed: {}", e.getMessage());
       throw new TTransportException(TTransportException.UNKNOWN, "Read 
failed", e);
     } finally {
       if (byteBuf != null) {
@@ -290,7 +290,7 @@ public class NettyTNonBlockingTransport extends 
TNonblockingTransport {
       ByteBuffer buffer = ByteBuffer.wrap(buf, off, len);
       return read(buffer);
     } catch (Exception e) {
-      logger.error("Read failed: {}", e.getMessage());
+      logger.debug("Read failed: {}", e.getMessage());
       throw new TTransportException(TTransportException.UNKNOWN, "Read 
failed", e);
     }
   }
@@ -320,13 +320,13 @@ public class NettyTNonBlockingTransport extends 
TNonblockingTransport {
                   if (future1.isSuccess()) {
                     logger.debug("ByteBuffer write completed successfully: {} 
bytes", remaining);
                   } else {
-                    logger.error("ByteBuffer write failed: {}", 
future1.cause().getMessage());
+                    logger.debug("ByteBuffer write failed: {}", 
future1.cause().getMessage());
                   }
                 });
         return remaining;
       } catch (Exception e) {
         byteBuf.release();
-        logger.error("ByteBuffer write failed: {}", e.getMessage());
+        logger.debug("ByteBuffer write failed: {}", e.getMessage());
         throw new TTransportException(TTransportException.UNKNOWN, "Write 
failed", e);
       }
     }
@@ -375,7 +375,7 @@ public class NettyTNonBlockingTransport extends 
TNonblockingTransport {
           dummyServer.close();
         }
       } catch (IOException e) {
-        logger.warn("Failed to close dummy channels", e);
+        logger.debug("Failed to close dummy channels", e);
       }
       eventLoopGroup.shutdownGracefully();
       logger.debug("EventLoopGroup shutdown initiated");
@@ -422,10 +422,10 @@ public class NettyTNonBlockingTransport extends 
TNonblockingTransport {
                         }
                       }
                     } catch (IOException e) {
-                      logger.warn("Failed to accept dummy connection", e);
+                      logger.debug("Failed to accept dummy connection", e);
                     }
                   } else {
-                    logger.error(
+                    logger.debug(
                         "Connection failed to {}:{}: {}", host, port, 
future1.cause().getMessage());
                   }
                   connecting.set(false);
@@ -435,8 +435,7 @@ public class NettyTNonBlockingTransport extends 
TNonblockingTransport {
       return false; // Return false to indicate pending connect for dummy
     } catch (Exception e) {
       connecting.set(false);
-      logger.error("Failed to start connection to {}:{}", host, port, 
e.getMessage());
-      throw new IOException("Failed to start connection", e);
+      return false;
     }
   }
 
@@ -493,20 +492,22 @@ public class NettyTNonBlockingTransport extends 
TNonblockingTransport {
       if (msg instanceof ByteBuf) {
         ByteBuf byteBuf = (ByteBuf) msg;
         logger.debug("Received {} bytes", byteBuf.readableBytes());
-
-        synchronized (lock) {
-          readQueue.offer(byteBuf.retain());
-          // Trigger OP_READ on dummy by writing dummy byte
-          if (dummyServerAccepted != null) {
-            ByteBuffer dummyByte = ByteBuffer.wrap(new byte[1]);
-            dummyServerAccepted.write(dummyByte);
-          }
-          // Wakeup selector if needed
-          if (selector != null) {
-            selector.wakeup();
+        try {
+          synchronized (lock) {
+            readQueue.offer(byteBuf.retain());
+            // Trigger OP_READ on dummy by writing dummy byte
+            if (dummyServerAccepted != null) {
+              ByteBuffer dummyByte = ByteBuffer.wrap(new byte[1]);
+              dummyServerAccepted.write(dummyByte);
+            }
+            // Wakeup selector if needed
+            if (selector != null) {
+              selector.wakeup();
+            }
           }
+        } finally {
+          byteBuf.release();
         }
-        byteBuf.release();
       }
     }
 
@@ -522,7 +523,7 @@ public class NettyTNonBlockingTransport extends 
TNonblockingTransport {
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
throws Exception {
-      logger.error("Channel exception: {}", cause.getMessage());
+      logger.debug("Channel exception: {}", cause.getMessage());
       synchronized (lock) {
         ctx.close();
       }
diff --git a/iotdb-core/ainode/ainode/core/config.py 
b/iotdb-core/ainode/ainode/core/config.py
index 832e45fed8b..c04edc83bed 100644
--- a/iotdb-core/ainode/ainode/core/config.py
+++ b/iotdb-core/ainode/ainode/core/config.py
@@ -75,9 +75,11 @@ class AINodeConfig(object):
         self._ain_thrift_compression_enabled = 
AINODE_THRIFT_COMPRESSION_ENABLED
 
         # use for ssl
-        self._ain_thrift_ssl_enabled = False
+        self._ain_cluster_ingress_ssl_enabled = False
+        self._ain_internal_ssl_enabled = False
         self._ain_thrift_ssl_ca_file = None
         self._ain_thrift_ssl_cert_file = None
+        self._ain_thrift_ssl_key_file = None
 
         # Cache number of model storage to avoid repeated loading
         self._ain_model_storage_cache_size = 30
@@ -181,30 +183,38 @@ class AINodeConfig(object):
     ) -> None:
         self._ain_thrift_compression_enabled = ain_thrift_compression_enabled
 
-    def get_ain_thrift_ssl_enabled(self) -> bool:
-        return self._ain_thrift_ssl_enabled
+    def get_ain_cluster_ingress_ssl_enabled(self) -> bool:
+        return self._ain_cluster_ingress_ssl_enabled
 
-    def set_ain_thrift_ssl_enabled(
-        self, ain_thrift_ssl_enabled: int
+    def set_ain_cluster_ingress_ssl_enabled(
+        self, ain_cluster_ingress_ssl_enabled: int
     ) -> None:
-        self._ain_thrift_ssl_enabled = ain_thrift_ssl_enabled
+        self._ain_cluster_ingress_ssl_enabled = ain_cluster_ingress_ssl_enabled
+
+    def get_ain_internal_ssl_enabled(self) -> bool:
+        return self._ain_internal_ssl_enabled
+
+    def set_ain_internal_ssl_enabled(self, ain_internal_ssl_enabled: int) -> 
None:
+        self._ain_internal_ssl_enabled = ain_internal_ssl_enabled
 
     def get_ain_thrift_ssl_ca_file(self) -> str:
         return self._ain_thrift_ssl_ca_file
 
-    def set_ain_thrift_ssl_ca_file(
-        self, ain_thrift_ssl_ca_file: str
-    ) -> None:
+    def set_ain_thrift_ssl_ca_file(self, ain_thrift_ssl_ca_file: str) -> None:
         self._ain_thrift_ssl_ca_file = ain_thrift_ssl_ca_file
 
     def get_ain_thrift_ssl_cert_file(self) -> str:
         return self._ain_thrift_ssl_cert_file
 
-    def set_ain_thrift_ssl_cert_file(
-        self, ain_thrift_ssl_cert_file: str
-    ) -> None:
+    def set_ain_thrift_ssl_cert_file(self, ain_thrift_ssl_cert_file: str) -> 
None:
         self._ain_thrift_ssl_cert_file = ain_thrift_ssl_cert_file
 
+    def get_ain_thrift_ssl_key_file(self) -> str:
+        return self._ain_thrift_ssl_key_file
+
+    def set_ain_thrift_ssl_key_file(self, ain_thrift_ssl_key_file: str) -> 
None:
+        self._ain_thrift_ssl_key_file = ain_thrift_ssl_key_file
+
     def get_ain_model_storage_cache_size(self) -> int:
         return self._ain_model_storage_cache_size
 
@@ -338,9 +348,9 @@ class AINodeDescriptor(object):
                     int(file_configs["ain_thrift_compression_enabled"])
                 )
 
-            if "ain_thrift_ssl_enabled" in config_keys:
-                self._config.set_ain_thrift_ssl_enabled(
-                    int(file_configs["ain_thrift_ssl_enabled"])
+            if "ain_internal_ssl_enabled" in config_keys:
+                self._config.set_ain_internal_ssl_enabled(
+                    int(file_configs["ain_internal_ssl_enabled"])
                 )
 
             if "ain_thrift_ssl_ca_file" in config_keys:
@@ -353,6 +363,11 @@ class AINodeDescriptor(object):
                     file_configs["ain_thrift_ssl_cert_file"]
                 )
 
+            if "ain_thrift_ssl_key_file" in config_keys:
+                self._config.set_ain_thrift_ssl_key_file(
+                    file_configs["ain_thrift_ssl_key_file"]
+                )
+
             if "ain_logs_dir" in config_keys:
                 log_dir = file_configs["ain_logs_dir"]
                 self._config.set_ain_logs_dir(log_dir)
diff --git a/iotdb-core/ainode/ainode/core/ingress/iotdb.py 
b/iotdb-core/ainode/ainode/core/ingress/iotdb.py
index 7bdc99f2baf..6602df75fe6 100644
--- a/iotdb-core/ainode/ainode/core/ingress/iotdb.py
+++ b/iotdb-core/ainode/ainode/core/ingress/iotdb.py
@@ -91,8 +91,11 @@ class IoTDBTreeModelDataset(BasicDatabaseForecastDataset):
             user=username,
             password=password,
             zone_id=time_zone,
+            use_ssl=AINodeDescriptor()
+            .get_config()
+            .get_ain_cluster_ingress_ssl_enabled(),
+            
ca_certs=AINodeDescriptor().get_config().get_ain_thrift_ssl_ca_file(),
         )
-        # TODO(HAONAN)
         self.session.open(False)
         self.use_rate = use_rate
         self.offset_rate = offset_rate
@@ -270,6 +273,10 @@ class IoTDBTableModelDataset(BasicDatabaseForecastDataset):
             username=username,
             password=password,
             time_zone=time_zone,
+            use_ssl=AINodeDescriptor()
+            .get_config()
+            .get_ain_cluster_ingress_ssl_enabled(),
+            
ca_certs=AINodeDescriptor().get_config().get_ain_thrift_ssl_ca_file(),
         )
         self.session = TableSession(table_session_config)
         self.use_rate = use_rate
diff --git a/iotdb-core/ainode/ainode/core/rpc/client.py 
b/iotdb-core/ainode/ainode/core/rpc/client.py
index 954524cae19..bbc15a4c1c1 100644
--- a/iotdb-core/ainode/ainode/core/rpc/client.py
+++ b/iotdb-core/ainode/ainode/core/rpc/client.py
@@ -109,8 +109,8 @@ class ConfigNodeClient(object):
         raise TException(self._MSG_RECONNECTION_FAIL)
 
     def _connect(self, target_config_node: TEndPoint) -> None:
-        if AINodeDescriptor().get_config().get_ain_thrift_ssl_enabled():
-            import ssl,sys
+        if AINodeDescriptor().get_config().get_ain_internal_ssl_enabled():
+            import ssl, sys
             from thrift.transport import TSSLSocket
 
             if sys.version_info >= (3, 10):
@@ -118,12 +118,18 @@ class ConfigNodeClient(object):
             else:
                 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
                 context.verify_mode = ssl.CERT_REQUIRED
-                context.check_hostname = True
-            
context.load_verify_locations(cafile=AINodeDescriptor().get_config().get_ain_thrift_ssl_ca_file())
+            context.check_hostname = False
+            context.load_verify_locations(
+                
cafile=AINodeDescriptor().get_config().get_ain_thrift_ssl_ca_file()
+            )
             context.load_cert_chain(
-                
certfile=AINodeDescriptor().get_config().get_ain_thrift_ssl_cert_file())
+                
certfile=AINodeDescriptor().get_config().get_ain_thrift_ssl_cert_file(),
+                
keyfile=AINodeDescriptor().get_config().get_ain_thrift_ssl_key_file(),
+            )
             socket = TSSLSocket.TSSLSocket(
-                host=target_config_node.ip, port=target_config_node.port, 
ssl_context=context
+                host=target_config_node.ip,
+                port=target_config_node.port,
+                ssl_context=context,
             )
         else:
             socket = TSocket.TSocket(target_config_node.ip, 
target_config_node.port)
diff --git a/iotdb-core/ainode/ainode/core/rpc/service.py 
b/iotdb-core/ainode/ainode/core/rpc/service.py
index e4558db5d5c..d2c75f39c45 100644
--- a/iotdb-core/ainode/ainode/core/rpc/service.py
+++ b/iotdb-core/ainode/ainode/core/rpc/service.py
@@ -71,8 +71,8 @@ class AINodeRPCService(threading.Thread):
         self._stop_event = threading.Event()
         self._handler = handler
         processor = IAINodeRPCService.Processor(handler=self._handler)
-        if AINodeDescriptor().get_config().get_ain_thrift_ssl_enabled():
-            import ssl,sys
+        if AINodeDescriptor().get_config().get_ain_internal_ssl_enabled():
+            import ssl, sys
             from thrift.transport import TSSLSocket
 
             if sys.version_info >= (3, 10):
@@ -80,13 +80,18 @@ class AINodeRPCService(threading.Thread):
             else:
                 context = ssl.SSLContext(ssl.PROTOCOL_TLS)
                 context.verify_mode = ssl.CERT_REQUIRED
-                context.check_hostname = True
-            
context.load_verify_locations(cafile=AINodeDescriptor().get_config().get_ain_thrift_ssl_ca_file())
-            
context.load_cert_chain(certfile=AINodeDescriptor().get_config().get_ain_thrift_ssl_cert_file())
+            context.check_hostname = False
+            context.load_verify_locations(
+                
cafile=AINodeDescriptor().get_config().get_ain_thrift_ssl_ca_file()
+            )
+            context.load_cert_chain(
+                
certfile=AINodeDescriptor().get_config().get_ain_thrift_ssl_cert_file(),
+                
keyfile=AINodeDescriptor().get_config().get_ain_thrift_ssl_key_file(),
+            )
             transport = TSSLSocket.TSSLServerSocket(
                 
host=AINodeDescriptor().get_config().get_ain_inference_rpc_address(),
                 
port=AINodeDescriptor().get_config().get_ain_inference_rpc_port(),
-                ssl_context=context
+                ssl_context=context,
             )
         else:
             transport = TSocket.TServerSocket(
diff --git a/iotdb-core/ainode/resources/conf/iotdb-ainode.properties 
b/iotdb-core/ainode/resources/conf/iotdb-ainode.properties
index db1f9358480..1c3681a47f8 100644
--- a/iotdb-core/ainode/resources/conf/iotdb-ainode.properties
+++ b/iotdb-core/ainode/resources/conf/iotdb-ainode.properties
@@ -77,4 +77,34 @@ ain_inference_rpc_port=10810
 # Whether to use compression in Thrift
 # Please use 0 or 1
 # Datatype: Boolean
-# ain_thrift_compression_enabled=0
\ No newline at end of file
+# ain_thrift_compression_enabled=0
+
+# Whether to use ssl for fetching IoTDB data
+# Use the same value as enable_thrift_ssl in iotdb-system.properties
+# Please use 0 or 1
+# Datatype: Boolean
+# ain_cluster_ingress_ssl_enabled=0
+
+# Whether enable SSL for IoTDB cluster internal connections
+# Use the same value as enable_internal_ssl in iotdb-system.properties
+# Please use 0 or 1
+# Datatype: Boolean
+# ain_internal_ssl_enabled=0
+
+# The AINode SSL ca file path.
+# The starting directory of the relative path is related to the operating 
system.
+# It is recommended to use an absolute path.
+# Datatype: String
+# ain_thrift_ssl_ca_file=
+
+# The AINode SSL cert file path.
+# The starting directory of the relative path is related to the operating 
system.
+# It is recommended to use an absolute path.
+# Datatype: String
+# ain_thrift_ssl_cert_file=
+
+# The AINode SSL key file path.
+# The starting directory of the relative path is related to the operating 
system.
+# It is recommended to use an absolute path.
+# Datatype: String
+# ain_thrift_ssl_key_file=
diff --git 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index 71c5c80e34f..457ded79b9c 100644
--- 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++ 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -451,7 +451,7 @@ enable_https=false
 # Whether enable SSL for cluster internal connections
 # effectiveMode: restart
 # Datatype: boolean
-enable_internal_connection_ssl=false
+enable_internal_ssl=false
 
 # SSL key store path
 # linux e.g. /home/iotdb/server.keystore (absolute path) or server.keystore 
(relative path)
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 7c99e3e36ca..44be9bc34e4 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -458,7 +458,7 @@ public class CommonDescriptor {
     config.setEnableInternalSSL(
         Boolean.parseBoolean(
             properties.getProperty(
-                "enable_internal_connection_ssl", 
Boolean.toString(config.isEnableInternalSSL()))));
+                "enable_internal_ssl", 
Boolean.toString(config.isEnableInternalSSL()))));
     config.setKeyStorePath(properties.getProperty("key_store_path", 
config.getKeyStorePath()));
     config.setKeyStorePwd(properties.getProperty("key_store_pwd", 
config.getKeyStorePwd()));
     config.setTrustStorePath(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java
index b787cf500e1..7feb07cbc13 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/AbstractThriftServiceThread.java
@@ -153,7 +153,12 @@ public abstract class AbstractThriftServiceThread extends 
Thread {
     }
   }
 
-  /** For sync ThriftServiceThread */
+  /**
+   * Synced ThriftServiceThread with ssl enabled. When use it, some error logs like below may be
+   * printed. org.apache.thrift.transport.TTransportException: java.net.SocketTimeoutException: Read
+   * timed out This issue is fixed in Thrift 0.15 and newer versions. See <a
+   * href="https://issues.apache.org/jira/browse/THRIFT-5411">https://issues.apache.org/jira/browse/THRIFT-5411</a>
+   */
   @SuppressWarnings("squid:S107")
   protected AbstractThriftServiceThread(
       TProcessor processor,
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java
index aa53d8eee0f..caa660a7bdb 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java
@@ -119,6 +119,7 @@ public class ThriftServiceThread extends 
AbstractThriftServiceThread {
         transportFactory);
   }
 
+  /** for synced ThriftServiceThread with ssl enabled */
   public ThriftServiceThread(
       TProcessor processor,
       String serviceName,

Reply via email to