This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 379993ec38a Add SSL support to Python client (#14789)
379993ec38a is described below
commit 379993ec38a5038d787c1d836a4af88c42b311f3
Author: CloudWise-Lukemiao <[email protected]>
AuthorDate: Mon Feb 17 15:34:33 2025 +0800
Add SSL support to Python client (#14789)
Co-authored-by: Haonan <[email protected]>
---
iotdb-client/client-py/iotdb/Session.py | 44 ++++++++---
iotdb-client/client-py/iotdb/SessionPool.py | 8 ++
iotdb-client/client-py/iotdb/table_session.py | 6 ++
iotdb-client/client-py/iotdb/table_session_pool.py | 4 +
iotdb-client/client-py/session_ssl_example.py | 85 ++++++++++++++++++++++
5 files changed, 138 insertions(+), 9 deletions(-)
diff --git a/iotdb-client/client-py/iotdb/Session.py
b/iotdb-client/client-py/iotdb/Session.py
index 23d1f65ccc7..b5e60a7503b 100644
--- a/iotdb-client/client-py/iotdb/Session.py
+++ b/iotdb-client/client-py/iotdb/Session.py
@@ -18,11 +18,13 @@
import logging
import random
+import sys
+import ssl
import struct
import time
import warnings
from thrift.protocol import TBinaryProtocol, TCompactProtocol
-from thrift.transport import TSocket, TTransport
+from thrift.transport import TSocket, TTransport, TSSLSocket
from iotdb.utils.SessionDataSet import SessionDataSet
from .template.Template import Template
@@ -85,6 +87,8 @@ class Session(object):
fetch_size=DEFAULT_FETCH_SIZE,
zone_id=DEFAULT_ZONE_ID,
enable_redirection=True,
+ use_ssl=False,
+ ca_certs=None,
):
self.__host = host
self.__port = port
@@ -107,6 +111,8 @@ class Session(object):
self.__endpoint_to_connection = None
self.sql_dialect = self.SQL_DIALECT
self.database = None
+ self.__use_ssl = use_ssl
+ self.__ca_certs = ca_certs
@classmethod
def init_from_node_urls(
@@ -117,6 +123,8 @@ class Session(object):
fetch_size=DEFAULT_FETCH_SIZE,
zone_id=DEFAULT_ZONE_ID,
enable_redirection=True,
+ use_ssl=False,
+ ca_certs=None,
):
if node_urls is None:
raise RuntimeError("node urls is empty")
@@ -128,6 +136,8 @@ class Session(object):
fetch_size,
zone_id,
enable_redirection,
+ use_ssl=use_ssl,
+ ca_certs=ca_certs,
)
session.__hosts = []
session.__ports = []
@@ -175,16 +185,32 @@ class Session(object):
}
def init_connection(self, endpoint):
- transport = TTransport.TFramedTransport(
- TSocket.TSocket(endpoint.ip, endpoint.port)
- )
+ try:
+ if self.__use_ssl:
+ if sys.version_info >= (3, 10):
+ context =
ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
+ else:
+ context = ssl.SSLContext(ssl.PROTOCOL_TLS)
+ context.verify_mode = ssl.CERT_REQUIRED
+ context.check_hostname = True
+ context.load_verify_locations(cafile=self.__ca_certs)
+ socket = TSSLSocket.TSSLSocket(
+ host=endpoint.ip, port=endpoint.port, ssl_context=context
+ )
+ else:
+ socket = TSocket.TSocket(endpoint.ip, endpoint.port)
+ transport = TTransport.TFramedTransport(socket)
- if not transport.isOpen():
- try:
- transport.open()
- except TTransport.TTransportException as e:
- raise IoTDBConnectionException(e) from None
+ if not transport.isOpen():
+ try:
+ transport.open()
+ except TTransport.TTransportException as e:
+ raise IoTDBConnectionException(e) from None
+ except ssl.SSLError as e:
+ print(f"SSL error occurred: {e}")
+ except Exception as e:
+ print(f"An unexpected error occurred: {e}")
if self.__enable_rpc_compression:
client =
Client(TCompactProtocol.TCompactProtocolAccelerated(transport))
else:
diff --git a/iotdb-client/client-py/iotdb/SessionPool.py
b/iotdb-client/client-py/iotdb/SessionPool.py
index 9fd6e5d0542..de132a62a46 100644
--- a/iotdb-client/client-py/iotdb/SessionPool.py
+++ b/iotdb-client/client-py/iotdb/SessionPool.py
@@ -44,6 +44,8 @@ class PoolConfig(object):
max_retry: int = DEFAULT_MAX_RETRY,
enable_compression: bool = False,
enable_redirection: bool = True,
+ use_ssl: bool = False,
+ ca_certs: str = None,
):
self.host = host
self.port = port
@@ -61,6 +63,8 @@ class PoolConfig(object):
self.max_retry = max_retry
self.enable_compression = enable_compression
self.enable_redirection = enable_redirection
+ self.use_ssl = use_ssl
+ self.ca_certs = ca_certs
class SessionPool(object):
@@ -86,6 +90,8 @@ class SessionPool(object):
self.__pool_config.fetch_size,
self.__pool_config.time_zone,
enable_redirection=self.__pool_config.enable_redirection,
+ use_ssl=self.__pool_config.use_ssl,
+ ca_certs=self.__pool_config.ca_certs,
)
session.sql_dialect = self.sql_dialect
session.database = self.database
@@ -99,6 +105,8 @@ class SessionPool(object):
self.__pool_config.fetch_size,
self.__pool_config.time_zone,
enable_redirection=self.__pool_config.enable_redirection,
+ use_ssl=self.__pool_config.use_ssl,
+ ca_certs=self.__pool_config.ca_certs,
)
session.sql_dialect = self.sql_dialect
session.database = self.database
diff --git a/iotdb-client/client-py/iotdb/table_session.py
b/iotdb-client/client-py/iotdb/table_session.py
index 5008f342e56..12885ddbaad 100644
--- a/iotdb-client/client-py/iotdb/table_session.py
+++ b/iotdb-client/client-py/iotdb/table_session.py
@@ -35,6 +35,8 @@ class TableSessionConfig(object):
time_zone: str = Session.DEFAULT_ZONE_ID,
enable_redirection: bool = True,
enable_compression: bool = False,
+ use_ssl: bool = False,
+ ca_certs: str = None,
):
"""
Initialize a TableSessionConfig object with the provided parameters.
@@ -66,6 +68,8 @@ class TableSessionConfig(object):
self.time_zone = time_zone
self.enable_redirection = enable_redirection
self.enable_compression = enable_compression
+ self.use_ssl = use_ssl
+ self.ca_certs = ca_certs
class TableSession(object):
@@ -82,6 +86,8 @@ class TableSession(object):
table_session_config.fetch_size,
table_session_config.time_zone,
table_session_config.enable_redirection,
+ table_session_config.use_ssl,
+ table_session_config.ca_certs,
)
self.__session.sql_dialect = "table"
self.__session.database = table_session_config.database
diff --git a/iotdb-client/client-py/iotdb/table_session_pool.py
b/iotdb-client/client-py/iotdb/table_session_pool.py
index 8e3275a86e5..13ccf62ca6d 100644
--- a/iotdb-client/client-py/iotdb/table_session_pool.py
+++ b/iotdb-client/client-py/iotdb/table_session_pool.py
@@ -34,6 +34,8 @@ class TableSessionPoolConfig(object):
enable_compression: bool = False,
wait_timeout_in_ms: int = 10000,
max_retry: int = 3,
+ use_ssl: bool = False,
+ ca_certs: str = None,
):
"""
Initialize a TableSessionPoolConfig object with the provided
parameters.
@@ -71,6 +73,8 @@ class TableSessionPoolConfig(object):
max_retry=max_retry,
enable_redirection=enable_redirection,
enable_compression=enable_compression,
+ use_ssl=use_ssl,
+ ca_certs=ca_certs,
)
self.max_pool_size = max_pool_size
self.wait_timeout_in_ms = wait_timeout_in_ms
diff --git a/iotdb-client/client-py/session_ssl_example.py
b/iotdb-client/client-py/session_ssl_example.py
new file mode 100644
index 00000000000..ea367e00f3a
--- /dev/null
+++ b/iotdb-client/client-py/session_ssl_example.py
@@ -0,0 +1,85 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from iotdb.SessionPool import PoolConfig, SessionPool
+from iotdb.Session import Session
+from iotdb.table_session import TableSession, TableSessionConfig
+
+ip = "127.0.0.1"
+port_ = "6667"
+username_ = "root"
+password_ = "root"
+# Configure SSL enabled
+use_ssl = True
+# Configure certificate path
+ca_certs = "/path/server.crt"
+
+
+def get_data():
+ session = Session(
+ ip, port_, username_, password_, use_ssl=use_ssl, ca_certs=ca_certs
+ )
+ session.open(False)
+ result = session.execute_query_statement("select * from root.eg.etth")
+ df = result.todf()
+ df.rename(columns={"Time": "date"}, inplace=True)
+ session.close()
+ return df
+
+
+def get_data2():
+ pool_config = PoolConfig(
+ host=ip,
+ port=port_,
+ user_name=username_,
+ password=password_,
+ fetch_size=1024,
+ time_zone="UTC+8",
+ max_retry=3,
+ use_ssl=use_ssl,
+ ca_certs=ca_certs,
+ )
+ max_pool_size = 5
+ wait_timeout_in_ms = 3000
+ session_pool = SessionPool(pool_config, max_pool_size, wait_timeout_in_ms)
+ session = session_pool.get_session()
+ result = session.execute_query_statement("select * from root.eg.etth")
+ df = result.todf()
+ df.rename(columns={"Time": "date"}, inplace=True)
+ session_pool.put_back(session)
+ session_pool.close()
+
+
+def get_table_data():
+ pool_config = TableSessionConfig(
+ node_urls=["127.0.0.1:6667"],
+ username=username_,
+ password=password_,
+ fetch_size=1024,
+ time_zone="UTC+8",
+ use_ssl=use_ssl,
+ ca_certs=ca_certs,
+ )
+ session = TableSession(pool_config)
+ result = session.execute_query_statement("select * from test")
+ df = result.todf()
+ session.close()
+
+
+if __name__ == "__main__":
+ df = get_data()