This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 66b1997b172 Feature/node supplier (#15230)
66b1997b172 is described below

commit 66b1997b172f55f1fe0461ed43f942faf02297bb
Author: Hongzhi Gao <[email protected]>
AuthorDate: Mon Apr 21 11:08:38 2025 +0800

    Feature/node supplier (#15230)
    
    * Implement node list retrieval
    
    * fix NodeSupplier
    
    * fix NodeSupplier
    
    * fix NodeSupplier
    
    * fix NodeSupplier
    
    * fix NodeSupplier
    
    * fix NodeSupplier
    
    * fix NodeSupplier
    
    * Extract INodesSuppiler as an abstract class
    
    * implement executeQueryStatementMayRedirect
    
    * Add redirection, reconnection and backup node support for IoTDB CPP client
    
    * Implement client-side caching for the insertRecord method
    
    * Removed some redundant code
---
 iotdb-client/client-cpp/pom.xml                    |  24 ++
 .../client-cpp/src/main/AbstractSessionBuilder.h   |   3 +
 iotdb-client/client-cpp/src/main/NodesSupplier.cpp | 221 ++++++++++++++++++
 iotdb-client/client-cpp/src/main/NodesSupplier.h   | 137 +++++++++++
 iotdb-client/client-cpp/src/main/Session.cpp       | 169 +++++++++++++-
 iotdb-client/client-cpp/src/main/Session.h         |  71 +++++-
 .../client-cpp/src/main/SessionConnection.cpp      | 256 +++++++++++++++++++++
 .../client-cpp/src/main/SessionConnection.h        |  81 +++++++
 .../client-cpp/src/main/ThriftConnection.cpp       | 158 +++++++++++++
 .../client-cpp/src/main/ThriftConnection.h         |  68 ++++++
 10 files changed, 1186 insertions(+), 2 deletions(-)

diff --git a/iotdb-client/client-cpp/pom.xml b/iotdb-client/client-cpp/pom.xml
index 98f7c6f9d11..abde39dbae0 100644
--- a/iotdb-client/client-cpp/pom.xml
+++ b/iotdb-client/client-cpp/pom.xml
@@ -175,6 +175,30 @@
                                     
<sourceFile>${project.basedir}/src/main/TableSessionBuilder.h</sourceFile>
                                     
<destinationFile>${project.build.directory}/build/main/generated-sources-cpp/TableSessionBuilder.h</destinationFile>
                                 </fileSet>
+                                <fileSet>
+                                    
<sourceFile>${project.basedir}/src/main/NodesSupplier.h</sourceFile>
+                                    
<destinationFile>${project.build.directory}/build/main/generated-sources-cpp/NodesSupplier.h</destinationFile>
+                                </fileSet>
+                                <fileSet>
+                                    
<sourceFile>${project.basedir}/src/main/NodesSupplier.cpp</sourceFile>
+                                    
<destinationFile>${project.build.directory}/build/main/generated-sources-cpp/NodesSupplier.cpp</destinationFile>
+                                </fileSet>
+                                <fileSet>
+                                    
<sourceFile>${project.basedir}/src/main/ThriftConnection.h</sourceFile>
+                                    
<destinationFile>${project.build.directory}/build/main/generated-sources-cpp/ThriftConnection.h</destinationFile>
+                                </fileSet>
+                                <fileSet>
+                                    
<sourceFile>${project.basedir}/src/main/ThriftConnection.cpp</sourceFile>
+                                    
<destinationFile>${project.build.directory}/build/main/generated-sources-cpp/ThriftConnection.cpp</destinationFile>
+                                </fileSet>
+                                <fileSet>
+                                    
<sourceFile>${project.basedir}/src/main/SessionConnection.h</sourceFile>
+                                    
<destinationFile>${project.build.directory}/build/main/generated-sources-cpp/SessionConnection.h</destinationFile>
+                                </fileSet>
+                                <fileSet>
+                                    
<sourceFile>${project.basedir}/src/main/SessionConnection.cpp</sourceFile>
+                                    
<destinationFile>${project.build.directory}/build/main/generated-sources-cpp/SessionConnection.cpp</destinationFile>
+                                </fileSet>
                                 <fileSet>
                                     
<sourceFile>${project.basedir}/src/main/AbstractSessionBuilder.h</sourceFile>
                                     
<destinationFile>${project.build.directory}/build/main/generated-sources-cpp/AbstractSessionBuilder.h</destinationFile>
diff --git a/iotdb-client/client-cpp/src/main/AbstractSessionBuilder.h 
b/iotdb-client/client-cpp/src/main/AbstractSessionBuilder.h
index 1b4d9a729a1..441e3a41f4f 100644
--- a/iotdb-client/client-cpp/src/main/AbstractSessionBuilder.h
+++ b/iotdb-client/client-cpp/src/main/AbstractSessionBuilder.h
@@ -32,6 +32,9 @@ public:
     int fetchSize = 10000;
     std::string sqlDialect = "tree";
     std::string database = "";
+    bool enableAutoFetch = true;
+    bool enableRedirections = true;
+    bool enableRPCCompression = false;
 };
 
 #endif // IOTDB_ABSTRACTSESSIONBUILDER_H
\ No newline at end of file
diff --git a/iotdb-client/client-cpp/src/main/NodesSupplier.cpp 
b/iotdb-client/client-cpp/src/main/NodesSupplier.cpp
new file mode 100644
index 00000000000..932ba0c17fa
--- /dev/null
+++ b/iotdb-client/client-cpp/src/main/NodesSupplier.cpp
@@ -0,0 +1,221 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "NodesSupplier.h"
+#include "Session.h"
+#include <algorithm>
+#include <iostream>
+#include <utility>
+
+const std::string NodesSupplier::SHOW_DATA_NODES_COMMAND = "SHOW DATANODES";
+const std::string NodesSupplier::STATUS_COLUMN_NAME = "Status";
+const std::string NodesSupplier::IP_COLUMN_NAME = "RpcAddress";
+const std::string NodesSupplier::PORT_COLUMN_NAME = "RpcPort";
+const std::string NodesSupplier::REMOVING_STATUS = "Removing";
+
+const int64_t NodesSupplier::TIMEOUT_IN_MS = 60000;
+const int NodesSupplier::FETCH_SIZE = 10000;
+const int NodesSupplier::THRIFT_DEFAULT_BUFFER_SIZE = 4096;
+const int NodesSupplier::THRIFT_MAX_FRAME_SIZE = 1048576;
+const int NodesSupplier::CONNECTION_TIMEOUT_IN_MS = 1000;
+
+TEndPoint RoundRobinPolicy::select(const std::vector<TEndPoint>& nodes) {
+    static std::atomic_uint index{0};
+
+    if (nodes.empty()) {
+        throw IoTDBException("No available nodes");
+    }
+
+    return nodes[index++ % nodes.size()];
+}
+
+StaticNodesSupplier::StaticNodesSupplier(const std::vector<TEndPoint>& nodes,
+                                       NodeSelectionPolicy policy)
+    : availableNodes_(nodes), policy_(std::move(policy)) {}
+
+boost::optional<TEndPoint> StaticNodesSupplier::getQueryEndPoint() {
+    try {
+        if (availableNodes_.empty()) {
+            return boost::none;
+        }
+        return policy_(availableNodes_);
+    } catch (const IoTDBException& e) {
+        return boost::none;
+    }
+}
+
+std::vector<TEndPoint> StaticNodesSupplier::getEndPointList() {
+    return availableNodes_;
+}
+
+StaticNodesSupplier::~StaticNodesSupplier() = default;
+
+std::shared_ptr<NodesSupplier> NodesSupplier::create(
+    std::vector<TEndPoint> endpoints,
+    std::string userName, std::string password, std::string zoneId,
+    int32_t thriftDefaultBufferSize, int32_t thriftMaxFrameSize,
+    int32_t connectionTimeoutInMs, bool useSSL, bool enableRPCCompression,
+    std::string version, std::chrono::milliseconds refreshInterval,
+    NodeSelectionPolicy policy) {
+    if (endpoints.empty()) {
+        return nullptr;
+    }
+    auto supplier = std::make_shared<NodesSupplier>(
+        userName, password, zoneId, thriftDefaultBufferSize,
+        thriftMaxFrameSize, connectionTimeoutInMs, useSSL,
+        enableRPCCompression, version, std::move(endpoints), std::move(policy)
+    );
+    supplier->startBackgroundRefresh(refreshInterval);
+    return supplier;
+}
+
+NodesSupplier::NodesSupplier(
+    std::string userName, std::string password, const std::string& zoneId,
+    int32_t thriftDefaultBufferSize, int32_t thriftMaxFrameSize,
+    int32_t connectionTimeoutInMs, bool useSSL, bool enableRPCCompression,
+    std::string version, std::vector<TEndPoint> endpoints, NodeSelectionPolicy 
policy) : userName(std::move(userName)), password(std::move(password)), 
zoneId(zoneId),
+    thriftDefaultBufferSize(thriftDefaultBufferSize), 
thriftMaxFrameSize(thriftMaxFrameSize),
+    connectionTimeoutInMs(connectionTimeoutInMs), useSSL(useSSL), 
enableRPCCompression(enableRPCCompression), version(version), 
endpoints(std::move(endpoints)),
+    selectionPolicy(std::move(policy)) {
+    deduplicateEndpoints();
+}
+
+std::vector<TEndPoint> NodesSupplier::getEndPointList() {
+    std::lock_guard<std::mutex> lock(mutex);
+    return endpoints;
+}
+
+TEndPoint NodesSupplier::selectQueryEndpoint() {
+    std::lock_guard<std::mutex> lock(mutex);
+    try {
+        return selectionPolicy(endpoints);
+    } catch (const std::exception& e) {
+        log_error("NodesSupplier::selectQueryEndpoint exception: %s", 
e.what());
+        throw IoTDBException("NodesSupplier::selectQueryEndpoint exception, " 
+ std::string(e.what()));
+    }
+}
+
+boost::optional<TEndPoint> NodesSupplier::getQueryEndPoint() {
+    try {
+        return selectQueryEndpoint();
+    } catch (const IoTDBException& e) {
+        return boost::none;
+    }
+}
+
+NodesSupplier::~NodesSupplier() {
+    stopBackgroundRefresh();
+    client->close();
+}
+
+void NodesSupplier::deduplicateEndpoints() {
+    std::vector<TEndPoint> uniqueEndpoints;
+    uniqueEndpoints.reserve(endpoints.size());
+    for (const auto& endpoint : endpoints) {
+        if (std::find(uniqueEndpoints.begin(), uniqueEndpoints.end(), 
endpoint) == uniqueEndpoints.end()) {
+            uniqueEndpoints.push_back(endpoint);
+        }
+    }
+    endpoints = std::move(uniqueEndpoints);
+}
+
+void NodesSupplier::startBackgroundRefresh(std::chrono::milliseconds interval) 
{
+    isRunning = true;
+    refreshThread = std::thread([this, interval] {
+        while (isRunning) {
+            refreshEndpointList();
+            std::unique_lock<std::mutex> cvLock(this->mutex);
+            refreshCondition.wait_for(cvLock, interval, [this]() {
+                return !isRunning.load();
+            });
+        }
+    });
+}
+
+std::vector<TEndPoint> NodesSupplier::fetchLatestEndpoints() {
+    try {
+        if (client == nullptr) {
+            client = 
std::make_shared<ThriftConnection>(selectionPolicy(endpoints));
+            client->init(userName, password, enableRPCCompression, zoneId, 
version);
+        }
+
+        auto sessionDataSet = 
client->executeQueryStatement(SHOW_DATA_NODES_COMMAND);
+
+        uint32_t columnAddrIdx = -1, columnPortIdx = -1, columnStatusIdx = -1;
+        auto columnNames = sessionDataSet->getColumnNames();
+        for (uint32_t i = 0; i < columnNames.size(); i++) {
+            if (columnNames[i] == IP_COLUMN_NAME) {
+                columnAddrIdx = i;
+            } else if (columnNames[i] == PORT_COLUMN_NAME) {
+                columnPortIdx = i;
+            } else if (columnNames[i] == STATUS_COLUMN_NAME) {
+                columnStatusIdx = i;
+            }
+        }
+
+        if (columnAddrIdx == -1 || columnPortIdx == -1 || columnStatusIdx == 
-1) {
+            throw IoTDBException("Required columns not found in query 
result.");
+        }
+
+        std::vector<TEndPoint> ret;
+        while (sessionDataSet->hasNext()) {
+            RowRecord* record = sessionDataSet->next();
+            std::string ip = record->fields.at(columnAddrIdx).stringV;
+            int32_t port = record->fields.at(columnPortIdx).intV;
+            std::string status = record->fields.at(columnStatusIdx).stringV;
+
+            if (ip == "0.0.0.0" || status == REMOVING_STATUS) {
+                log_warn("Skipping invalid node: " + ip + ":" + 
to_string(port));
+                continue;
+            }
+            TEndPoint endpoint;
+            endpoint.ip = ip;
+            endpoint.port = port;
+            ret.emplace_back(endpoint);
+        }
+
+        return ret;
+    } catch (const IoTDBException& e) {
+        client.reset();
+        throw IoTDBException(std::string("NodesSupplier::fetchLatestEndpoints 
failed: ") + e.what());
+    }
+}
+
+void NodesSupplier::refreshEndpointList() {
+    try {
+        auto newEndpoints = fetchLatestEndpoints();
+        if (newEndpoints.empty()) {
+            return;
+        }
+
+        std::lock_guard<std::mutex> lock(mutex);
+        endpoints.swap(newEndpoints);
+        deduplicateEndpoints();
+    } catch (const IoTDBException& e) {
+        log_error(std::string("NodesSupplier::refreshEndpointList failed: ") + 
e.what());
+    }
+}
+
+void NodesSupplier::stopBackgroundRefresh() noexcept {
+    if (isRunning.exchange(false)) {
+        refreshCondition.notify_all();
+        if (refreshThread.joinable()) {
+            refreshThread.join();
+        }
+    }
+}
\ No newline at end of file
diff --git a/iotdb-client/client-cpp/src/main/NodesSupplier.h 
b/iotdb-client/client-cpp/src/main/NodesSupplier.h
new file mode 100644
index 00000000000..27965afa968
--- /dev/null
+++ b/iotdb-client/client-cpp/src/main/NodesSupplier.h
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef IOTDB_NODES_SUPPLIER_H
+#define IOTDB_NODES_SUPPLIER_H
+
+#include <vector>
+#include <atomic>
+#include <boost/optional.hpp>
+#include <mutex>
+#include <chrono>
+#include <thread>
+#include <functional>
+#include <condition_variable>
+#include <algorithm>
+
+#include "ThriftConnection.h"
+
+class TEndPoint;
+
+class RoundRobinPolicy {
+public:
+    static TEndPoint select(const std::vector<TEndPoint>& nodes);
+};
+
+class INodesSupplier {
+public:
+    virtual ~INodesSupplier() = default;
+    virtual boost::optional<TEndPoint> getQueryEndPoint() = 0;
+    virtual std::vector<TEndPoint> getEndPointList() = 0;
+    using NodeSelectionPolicy = std::function<TEndPoint(const 
std::vector<TEndPoint>&)>;
+};
+
+class StaticNodesSupplier : public INodesSupplier {
+public:
+    explicit StaticNodesSupplier(const std::vector<TEndPoint>& nodes, 
+                                NodeSelectionPolicy policy = 
RoundRobinPolicy::select);
+
+    boost::optional<TEndPoint> getQueryEndPoint() override;
+
+    std::vector<TEndPoint> getEndPointList() override;
+
+    ~StaticNodesSupplier() override;
+
+private:
+    const std::vector<TEndPoint> availableNodes_;
+    NodeSelectionPolicy policy_;
+};
+
+class NodesSupplier : public INodesSupplier {
+public:
+    static const std::string SHOW_DATA_NODES_COMMAND;
+    static const std::string STATUS_COLUMN_NAME;
+    static const std::string IP_COLUMN_NAME;
+    static const std::string PORT_COLUMN_NAME;
+    static const std::string REMOVING_STATUS;
+
+    static const int64_t TIMEOUT_IN_MS;
+    static const int FETCH_SIZE;
+    static const int THRIFT_DEFAULT_BUFFER_SIZE;
+    static const int THRIFT_MAX_FRAME_SIZE;
+    static const int CONNECTION_TIMEOUT_IN_MS;
+
+    static std::shared_ptr<NodesSupplier> create(
+        std::vector<TEndPoint> endpoints,
+        std::string userName, std::string password, std::string zoneId = "",
+        int32_t thriftDefaultBufferSize = 
ThriftConnection::THRIFT_DEFAULT_BUFFER_SIZE,
+        int32_t thriftMaxFrameSize = ThriftConnection::THRIFT_MAX_FRAME_SIZE,
+        int32_t connectionTimeoutInMs = 
ThriftConnection::CONNECTION_TIMEOUT_IN_MS,
+        bool useSSL = false, bool enableRPCCompression = false,
+        std::string version = "V_1_0",
+        std::chrono::milliseconds refreshInterval = 
std::chrono::milliseconds(TIMEOUT_IN_MS),
+        NodeSelectionPolicy policy = RoundRobinPolicy::select
+    );
+
+    NodesSupplier(
+        std::string userName, std::string password, const std::string& zoneId,
+        int32_t thriftDefaultBufferSize, int32_t thriftMaxFrameSize,
+        int32_t connectionTimeoutInMs, bool useSSL, bool enableRPCCompression,
+        std::string version, std::vector<TEndPoint> endpoints, 
NodeSelectionPolicy policy
+    );
+    std::vector<TEndPoint> getEndPointList() override;
+
+    boost::optional<TEndPoint> getQueryEndPoint() override;
+
+    ~NodesSupplier() override;
+
+private:
+    std::string userName;
+    std::string password;
+    int32_t thriftDefaultBufferSize;
+    int32_t thriftMaxFrameSize;
+    int32_t connectionTimeoutInMs;
+    bool useSSL;
+    bool enableRPCCompression;
+    std::string version;
+    std::string zoneId;
+
+    std::mutex mutex;
+    std::vector<TEndPoint> endpoints;
+    NodeSelectionPolicy selectionPolicy;
+
+    std::atomic<bool> isRunning{false};
+    std::thread refreshThread;
+    std::condition_variable refreshCondition;
+
+    std::shared_ptr<ThriftConnection> client;
+
+    void deduplicateEndpoints();
+
+    void startBackgroundRefresh(std::chrono::milliseconds interval);
+
+    std::vector<TEndPoint> fetchLatestEndpoints();
+
+    void refreshEndpointList();
+
+    TEndPoint selectQueryEndpoint();
+
+    void stopBackgroundRefresh() noexcept;
+};
+
+#endif
\ No newline at end of file
diff --git a/iotdb-client/client-cpp/src/main/Session.cpp 
b/iotdb-client/client-cpp/src/main/Session.cpp
index 316bd1d9443..bc75effc86d 100644
--- a/iotdb-client/client-cpp/src/main/Session.cpp
+++ b/iotdb-client/client-cpp/src/main/Session.cpp
@@ -21,6 +21,7 @@
 #include <algorithm>
 #include <memory>
 #include <time.h>
+#include "NodesSupplier.h"
 
 using namespace std;
 
@@ -56,6 +57,29 @@ void RpcUtils::verifySuccess(const TSStatus &status) {
     }
 }
 
+void RpcUtils::verifySuccessWithRedirection(const TSStatus &status) {
+    verifySuccess(status);
+    if (status.__isset.redirectNode) {
+        throw RedirectException(to_string(status.code) + ": " + 
status.message, status.redirectNode);
+    }
+    if (status.__isset.subStatus) {
+        auto statusSubStatus = status.subStatus;
+        vector<TEndPoint> endPointList(statusSubStatus.size());
+        int count = 0;
+        for (TSStatus subStatus : statusSubStatus) {
+            if (subStatus.__isset.redirectNode) {
+                endPointList[count++] = subStatus.redirectNode;
+            } else {
+                TEndPoint endPoint;
+                endPointList[count++] = endPoint;
+            }
+        }
+        if (!endPointList.empty()) {
+            throw RedirectException(to_string(status.code) + ": " + 
status.message, endPointList);
+        }
+    }
+}
+
 void RpcUtils::verifySuccess(const vector<TSStatus> &statuses) {
     for (const TSStatus &status: statuses) {
         if (status.code != TSStatusCode::SUCCESS_STATUS) {
@@ -587,6 +611,12 @@ Session::~Session() {
     }
 }
 
+void Session::removeBrokenSessionConnection(shared_ptr<SessionConnection> 
sessionConnection) {
+    if (enableRedirection) {
+        
this->endPointToSessionConnection.erase(sessionConnection->getEndPoint());
+    }
+}
+
 /**
    * check whether the batch has been sorted
    *
@@ -795,6 +825,26 @@ void Session::initZoneId() {
     zoneId = zoneStr;
 }
 
+void Session::initNodesSupplier() {
+    std::vector<TEndPoint> endPoints;
+    TEndPoint endPoint;
+    endPoint.__set_ip(host);
+    endPoint.__set_port(rpcPort);
+    endPoints.emplace_back(endPoint);
+    if (enableAutoFetch) {
+        nodesSupplier = NodesSupplier::create(endPoints, username, password);
+    } else {
+        nodesSupplier = make_shared<StaticNodesSupplier>(endPoints);
+    }
+}
+
+void Session::initDefaultSessionConnection() {
+    defaultEndPoint.__set_ip(host);
+    defaultEndPoint.__set_port(rpcPort);
+    defaultSessionConnection = make_shared<SessionConnection>(this, 
defaultEndPoint, zoneId, nodesSupplier, 60, 500,
+            sqlDialect, database);
+}
+
 void Session::open() {
     open(false, DEFAULT_TIMEOUT_MS);
 }
@@ -875,6 +925,15 @@ void Session::open(bool enableRPCCompression, int 
connectionTimeoutInMs) {
     }
 
     isClosed = false;
+    try {
+        initDefaultSessionConnection();
+    } catch (const exception &e) {
+        log_debug(e.what());
+        throw IoTDBException(e.what());
+    }
+    if (enableRedirection) {
+        endPointToSessionConnection.insert(make_pair(defaultEndPoint, 
defaultSessionConnection));
+    }
 }
 
 
@@ -929,8 +988,20 @@ void Session::insertRecord(const string &deviceId, int64_t 
time,
     req.__set_isAligned(false);
     TSStatus respStatus;
     try {
-        client->insertStringRecord(respStatus, req);
+        
getSessionConnection(deviceId)->getSessionClient()->insertStringRecord(respStatus,
 req);
         RpcUtils::verifySuccess(respStatus);
+    } catch (RedirectException& e) {
+        handleRedirection(deviceId, e.endPoint);
+    } catch (const IoTDBConnectionException &e) {
+        if (enableRedirection && deviceIdToEndpoint.find(deviceId) != 
deviceIdToEndpoint.end()) {
+            deviceIdToEndpoint.erase(deviceId);
+            try {
+                
defaultSessionConnection->getSessionClient()->insertStringRecord(respStatus, 
req);
+            } catch (RedirectException& e) {
+            }
+        } else {
+            throw IoTDBConnectionException(e.what());
+        }
     } catch (const TTransportException &e) {
         log_debug(e.what());
         throw IoTDBConnectionException(e.what());
@@ -1836,6 +1907,38 @@ int64_t Session::getSessionId() {
     return sessionId;
 }
 
+shared_ptr<SessionConnection> Session::getQuerySessionConnection() {
+    auto endPoint = nodesSupplier->getQueryEndPoint();
+    if (!endPoint.has_value() || endPointToSessionConnection.empty()) {
+        return defaultSessionConnection;
+    }
+
+    auto it = endPointToSessionConnection.find(endPoint.value());
+    if (it != endPointToSessionConnection.end()) {
+        return it->second;
+    }
+
+    shared_ptr<SessionConnection> newConnection;
+    try {
+        newConnection = make_shared<SessionConnection>(this, endPoint.value(), 
zoneId, nodesSupplier,
+        60, 500, sqlDialect, database);
+        endPointToSessionConnection.emplace(endPoint.value(), newConnection);
+        return newConnection;
+    } catch (exception &e) {
+        log_debug("Session::getQuerySessionConnection() exception: " + 
e.what());
+        return newConnection;
+    }
+}
+
+shared_ptr<SessionConnection> Session::getSessionConnection(std::string 
deviceId) {
+    if (!enableRedirection ||
+            deviceIdToEndpoint.find(deviceId) == deviceIdToEndpoint.end() ||
+            endPointToSessionConnection.find(deviceIdToEndpoint[deviceId]) == 
endPointToSessionConnection.end()) {
+        return defaultSessionConnection;
+    }
+    return 
endPointToSessionConnection.find(deviceIdToEndpoint[deviceId])->second;
+}
+
 string Session::getTimeZone() {
     if (!zoneId.empty()) {
         return zoneId;
@@ -1908,6 +2011,70 @@ unique_ptr<SessionDataSet> 
Session::executeQueryStatement(const string &sql, int
             statementId, client, sessionId, queryDataSet));
 }
 
+void Session::handleQueryRedirection(TEndPoint endPoint) {
+    if (!enableRedirection) return;
+    shared_ptr<SessionConnection> newConnection;
+    auto it = endPointToSessionConnection.find(endPoint);
+    if (it != endPointToSessionConnection.end()) {
+        newConnection = it->second;
+    } else {
+        try {
+            newConnection = make_shared<SessionConnection>(this, endPoint, 
zoneId, nodesSupplier,
+                    60, 500, sqlDialect, database);
+
+            endPointToSessionConnection.emplace(endPoint, newConnection);
+        } catch (exception &e) {
+            throw IoTDBConnectionException(e.what());
+        }
+    }
+    defaultSessionConnection = newConnection;
+}
+
+void Session::handleRedirection(const std::string& deviceId, TEndPoint 
endPoint) {
+    if (!enableRedirection) return;
+    if (endPoint.ip == "127.0.0.1") return;
+    deviceIdToEndpoint[deviceId] = endPoint;
+
+    shared_ptr<SessionConnection> newConnection;
+    auto it = endPointToSessionConnection.find(endPoint);
+    if (it != endPointToSessionConnection.end()) {
+        newConnection = it->second;
+    } else {
+        try {
+            newConnection = make_shared<SessionConnection>(this, endPoint, 
zoneId, nodesSupplier,
+                    60, 500, sqlDialect, database);
+            endPointToSessionConnection.emplace(endPoint, newConnection);
+        } catch (exception &e) {
+            deviceIdToEndpoint.erase(deviceId);
+            throw IoTDBConnectionException(e.what());
+        }
+    }
+}
+
+std::unique_ptr<SessionDataSet> 
Session::executeQueryStatementMayRedirect(const std::string &sql, int64_t 
timeoutInMs) {
+    auto sessionConnection = getQuerySessionConnection();
+    if (!sessionConnection) {
+        log_warn("Session connection not found");
+        return nullptr;
+    }
+    try {
+        return sessionConnection->executeQueryStatement(sql, timeoutInMs);
+    } catch (RedirectException& e) {
+        log_warn("Session connection redirect exception: " + e.what());
+        handleQueryRedirection(e.endPoint);
+        try {
+            return defaultSessionConnection->executeQueryStatement(sql, 
timeoutInMs);
+        } catch (exception& e) {
+            log_error("Exception while executing redirected query statement: 
%s", e.what());
+            throw ExecutionException(e.what());
+        }
+    } catch (exception& e) {
+        log_error("Exception while executing query statement: %s", e.what());
+        throw e;
+    }
+
+}
+
 void Session::executeNonQueryStatement(const string &sql) {
     TSExecuteStatementReq req;
     req.__set_sessionId(sessionId);
diff --git a/iotdb-client/client-cpp/src/main/Session.h 
b/iotdb-client/client-cpp/src/main/Session.h
index 32da7839709..b795055ab8d 100644
--- a/iotdb-client/client-cpp/src/main/Session.h
+++ b/iotdb-client/client-cpp/src/main/Session.h
@@ -41,7 +41,9 @@
 #include <thrift/transport/TTransportException.h>
 #include <thrift/transport/TBufferTransports.h>
 #include "IClientRPCService.h"
+#include "NodesSupplier.h"
 #include "AbstractSessionBuilder.h"
+#include "SessionConnection.h"
 
 //== For compatible with Windows OS ==
 #ifndef LONG_LONG_MIN
@@ -73,7 +75,6 @@ extern LogLevelType LOG_LEVEL;
 #define log_warn(fmt,...)  do {if(LOG_LEVEL <= LEVEL_WARN)  {string 
s=string("[WARN]  %s:%d (%s) - ") + fmt + "\n"; printf(s.c_str(), __FILE__, 
__LINE__, __FUNCTION__, ##__VA_ARGS__);}} while(0)
 #define log_error(fmt,...) do {if(LOG_LEVEL <= LEVEL_ERROR) {string 
s=string("[ERROR] %s:%d (%s) - ") + fmt + "\n"; printf(s.c_str(), __FILE__, 
__LINE__, __FUNCTION__, ##__VA_ARGS__);}} while(0)
 
-
 class IoTDBException : public std::exception {
 public:
     IoTDBException() {}
@@ -126,6 +127,25 @@ public:
     std::vector<TSStatus> statusList;
 };
 
+class RedirectException : public IoTDBException {
+public:
+    RedirectException() {}
+
+    explicit RedirectException(const char *m) : IoTDBException(m) {}
+
+    explicit RedirectException(const std::string &m) : IoTDBException(m) {}
+
+    RedirectException(const std::string &m, const TEndPoint& endPoint) : 
IoTDBException(m), endPoint(endPoint) {}
+
+    RedirectException(const std::string &m, const map<string, TEndPoint>& 
deviceEndPointMap) : IoTDBException(m), deviceEndPointMap(deviceEndPointMap) {}
+
+    RedirectException(const std::string &m, const vector<TEndPoint> 
endPointList) : IoTDBException(m), endPointList(endPointList) {}
+
+    TEndPoint endPoint;
+    map<string, TEndPoint> deviceEndPointMap;
+    vector<TEndPoint> endPointList;
+};
+
 class UnSupportedDataTypeException : public IoTDBException {
 public:
     UnSupportedDataTypeException() {}
@@ -262,6 +282,8 @@ public:
 
     static void verifySuccess(const TSStatus &status);
 
+    static void verifySuccessWithRedirection(const TSStatus &status);
+
     static void verifySuccess(const std::vector<TSStatus> &statuses);
 
     static TSStatus getStatus(TSStatusCode::TSStatusCode tsStatusCode);
@@ -685,6 +707,11 @@ public:
         }
     }
 
+    void addTimestamp(size_t rowIndex, int64_t timestamp) {
+        timestamps[rowIndex] = timestamp;
+        rowSize = max(rowSize, rowIndex + 1);
+    }
+
     template<typename T>
     void addValue(size_t schemaId, size_t rowIndex, const T& value) {
         if (schemaId >= schemas.size()) {
@@ -1098,8 +1125,31 @@ private:
     Version::Version version;
     std::string sqlDialect = "tree"; // default sql dialect
     std::string database;
+    bool enableAutoFetch = true;
+    bool enableRedirection = true;
+    std::shared_ptr<INodesSupplier> nodesSupplier;
+    friend class SessionConnection;
+    std::shared_ptr<SessionConnection> defaultSessionConnection;
+
+    TEndPoint defaultEndPoint;
+
+    struct TEndPointHash {
+        size_t operator()(const TEndPoint& endpoint) const {
+            return std::hash<std::string>()(endpoint.ip) ^ 
std::hash<int>()(endpoint.port);
+        }
+    };
+    struct TEndPointEqual {
+        bool operator()(const TEndPoint& lhs, const TEndPoint& rhs) const {
+            return lhs.ip == rhs.ip && lhs.port == rhs.port;
+        }
+    };
+    using EndPointSessionMap = std::unordered_map<TEndPoint, 
shared_ptr<SessionConnection>, TEndPointHash, TEndPointEqual>;
+    EndPointSessionMap endPointToSessionConnection;
+    std::unordered_map<std::string, TEndPoint> deviceIdToEndpoint;
 
 private:
+    void removeBrokenSessionConnection(shared_ptr<SessionConnection> 
sessionConnection);
+
     static bool checkSorted(const Tablet &tablet);
 
     static bool checkSorted(const std::vector<int64_t> &times);
@@ -1127,12 +1177,15 @@ private:
     std::string getVersionString(Version::Version version);
 
     void initZoneId();
+    void initNodesSupplier();
+    void initDefaultSessionConnection();
 
 public:
     Session(const std::string &host, int rpcPort) : username("root"), 
password("root"), version(Version::V_1_0) {
         this->host = host;
         this->rpcPort = rpcPort;
         initZoneId();
+        initNodesSupplier();
     }
 
     Session(const std::string &host, int rpcPort, const std::string &username, 
const std::string &password)
@@ -1143,6 +1196,7 @@ public:
         this->password = password;
         this->version = Version::V_1_0;
         initZoneId();
+        initNodesSupplier();
     }
 
     Session(const std::string &host, int rpcPort, const std::string &username, 
const std::string &password,
@@ -1155,6 +1209,7 @@ public:
         this->fetchSize = fetchSize;
         this->version = Version::V_1_0;
         initZoneId();
+        initNodesSupplier();
     }
 
     Session(const std::string &host, const std::string &rpcPort, const 
std::string &username = "user",
@@ -1167,6 +1222,7 @@ public:
         this->fetchSize = fetchSize;
         this->version = Version::V_1_0;
         initZoneId();
+        initNodesSupplier();
     }
 
     Session(AbstractSessionBuilder* builder) {
@@ -1179,7 +1235,10 @@ public:
         this->version = Version::V_1_0;
         this->sqlDialect = builder->sqlDialect;
         this->database = builder->database;
+        this->enableAutoFetch = builder->enableAutoFetch;
+        this->enableRedirection = builder->enableRedirections;
         initZoneId();
+        initNodesSupplier();
     }
 
     ~Session();
@@ -1202,6 +1261,10 @@ public:
 
     int64_t getSessionId();
 
+    shared_ptr<SessionConnection> getQuerySessionConnection();
+
+    shared_ptr<SessionConnection> getSessionConnection(std::string deviceId);
+
     void open();
 
     void open(bool enableRPCCompression);
@@ -1214,6 +1277,10 @@ public:
 
     std::string getTimeZone();
 
+    void handleQueryRedirection(TEndPoint endPoint);
+
+    void handleRedirection(const std::string& deviceId, TEndPoint endPoint);
+
     void insertRecord(const std::string &deviceId, int64_t time, const 
std::vector<std::string> &measurements,
                       const std::vector<std::string> &values);
 
@@ -1359,6 +1426,8 @@ public:
 
     std::unique_ptr<SessionDataSet> executeQueryStatement(const std::string 
&sql, int64_t timeoutInMs) ;
 
+    std::unique_ptr<SessionDataSet> executeQueryStatementMayRedirect(const 
std::string &sql, int64_t timeoutInMs);
+
     void executeNonQueryStatement(const std::string &sql);
 
     std::unique_ptr<SessionDataSet> executeRawDataQuery(const 
std::vector<std::string> &paths, int64_t startTime, int64_t endTime);
diff --git a/iotdb-client/client-cpp/src/main/SessionConnection.cpp 
b/iotdb-client/client-cpp/src/main/SessionConnection.cpp
new file mode 100644
index 00000000000..f78e1ff7769
--- /dev/null
+++ b/iotdb-client/client-cpp/src/main/SessionConnection.cpp
@@ -0,0 +1,256 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "SessionConnection.h"
+#include "Session.h"
+#include "common_types.h"
+#include <thrift/protocol/TCompactProtocol.h>
+
+#include <utility>
+
+using namespace apache::thrift;
+using namespace apache::thrift::protocol;
+using namespace apache::thrift::transport;
+
+SessionConnection::SessionConnection(Session* session_ptr, const TEndPoint& 
endpoint,
+                     const std::string& zoneId,
+                     std::shared_ptr<INodesSupplier> nodeSupplier,
+                     int maxRetries,
+                     int64_t retryInterval,
+                     std::string dialect,
+                     std::string db)
+    : session(session_ptr),
+      zoneId(zoneId),
+      endPoint(endpoint),
+      availableNodes(std::move(nodeSupplier)),
+      maxRetryCount(maxRetries),
+      retryIntervalMs(retryInterval),
+      sqlDialect(std::move(dialect)),
+      database(std::move(db)) {
+    this->zoneId = zoneId.empty() ? getSystemDefaultZoneId() : zoneId;
+    endPointList.push_back(endpoint);
+    init(endPoint);
+}
+
+void SessionConnection::close() {
+    bool needThrowException = false;
+    string errMsg;
+    session = nullptr;
+    try {
+        TSCloseSessionReq req;
+        req.__set_sessionId(sessionId);
+        TSStatus tsStatus;
+        client->closeSession(tsStatus, req);
+    } catch (const TTransportException &e) {
+        log_debug(e.what());
+        throw IoTDBConnectionException(e.what());
+    } catch (const exception &e) {
+        log_debug(e.what());
+        errMsg = errMsg + "Session::close() client->closeSession() error, 
maybe remote server is down. " + e.what() + "\n" ;
+        needThrowException = true;
+    }
+
+    try {
+        if (transport->isOpen()) {
+            transport->close();
+        }
+    }
+    catch (const exception &e) {
+        log_debug(e.what());
+        errMsg = errMsg + "Session::close() transport->close() error. " + 
e.what() + "\n" ;
+        needThrowException = true;
+    }
+
+    if (needThrowException) {
+        throw IoTDBException(errMsg);
+    }
+}
+
+SessionConnection::~SessionConnection() {
+    try {
+        close();
+    } catch (const exception &e) {
+        log_debug(e.what());
+    }
+}
+
+void SessionConnection::init(const TEndPoint& endpoint) {
+    shared_ptr<TSocket> socket(new TSocket(endpoint.ip, endpoint.port));
+    transport = std::make_shared<TFramedTransport>(socket);
+    socket->setConnTimeout(connectionTimeoutInMs);
+    if (!transport->isOpen()) {
+        try {
+            transport->open();
+        }
+        catch (TTransportException &e) {
+            log_debug(e.what());
+            throw IoTDBConnectionException(e.what());
+        }
+    }
+    if (enableRPCCompression) {
+        shared_ptr<TCompactProtocol> protocol(new TCompactProtocol(transport));
+        client = std::make_shared<IClientRPCServiceClient>(protocol);
+    } else {
+        shared_ptr<TBinaryProtocol> protocol(new TBinaryProtocol(transport));
+        client = std::make_shared<IClientRPCServiceClient>(protocol);
+    }
+
+    std::map<std::string, std::string> configuration;
+    configuration["version"] = session->getVersionString(session->version);
+    configuration["sql_dialect"] = sqlDialect;
+    if (database != "") {
+        configuration["db"] = database;
+    }
+    TSOpenSessionReq openReq;
+    openReq.__set_username(session->username);
+    openReq.__set_password(session->password);
+    openReq.__set_zoneId(zoneId);
+    openReq.__set_configuration(configuration);
+    try {
+        TSOpenSessionResp openResp;
+        client->openSession(openResp, openReq);
+        RpcUtils::verifySuccess(openResp.status);
+        if (session->protocolVersion != openResp.serverProtocolVersion) {
+            if (openResp.serverProtocolVersion == 0) {// less than 0.10
+                throw logic_error(string("Protocol not supported, Client 
version is ") +
+                    to_string(session->protocolVersion) +
+                    ", but Server version is " + 
to_string(openResp.serverProtocolVersion));
+            }
+        }
+
+        sessionId = openResp.sessionId;
+        statementId = client->requestStatementId(sessionId);
+
+        if (!zoneId.empty()) {
+            setTimeZone(zoneId);
+        }
+    } catch (const TTransportException &e) {
+        log_debug(e.what());
+        transport->close();
+        throw IoTDBConnectionException(e.what());
+    } catch (const IoTDBException &e) {
+        log_debug(e.what());
+        transport->close();
+        throw;
+    } catch (const exception &e) {
+        log_debug(e.what());
+        transport->close();
+        throw;
+    }
+}
+
+std::unique_ptr<SessionDataSet> SessionConnection::executeQueryStatement(const 
std::string& sql, int64_t timeoutInMs) {
+    TSExecuteStatementReq req;
+    req.__set_sessionId(sessionId);
+    req.__set_statementId(statementId);
+    req.__set_statement(sql);
+    req.__set_timeout(timeoutInMs);
+    req.__set_enableRedirectQuery(true);
+    TSExecuteStatementResp resp;
+    try {
+        client->executeStatement(resp, req);
+        RpcUtils::verifySuccessWithRedirection(resp.status);
+    } catch (const TException &e) {
+        log_debug(e.what());
+        if (reconnect()) {
+            try {
+                req.__set_sessionId(sessionId);
+                req.__set_statementId(statementId);
+                client->executeStatement(resp, req);
+            } catch (TException &e) {
+                throw IoTDBConnectionException(e.what());
+            }
+        } else {
+            throw IoTDBConnectionException(e.what());
+        }
+    }
+    std::shared_ptr<TSQueryDataSet> queryDataSet(new 
TSQueryDataSet(resp.queryDataSet));
+    return std::unique_ptr<SessionDataSet>(new SessionDataSet(
+            sql, resp.columns, resp.dataTypeList, resp.columnNameIndexMap, 
resp.ignoreTimeStamp, resp.queryId,
+            statementId, client, sessionId, queryDataSet));
+}
+
+const TEndPoint& SessionConnection::getEndPoint() {
+    return endPoint;
+}
+
+void SessionConnection::setTimeZone(const std::string& newZoneId) {
+    TSSetTimeZoneReq req;
+    req.__set_sessionId(sessionId);
+    req.__set_timeZone(newZoneId);
+
+    try {
+        TSStatus tsStatus;
+        client->setTimeZone(tsStatus, req);
+        zoneId = newZoneId;
+    } catch (const TException& e) {
+        throw IoTDBConnectionException(e.what());
+    }
+}
+
+std::string SessionConnection::getSystemDefaultZoneId() {
+    time_t ts = 0;
+    struct tm tmv{};
+#if defined(_WIN64) || defined (WIN32) || defined (_WIN32)
+    localtime_s(&tmv, &ts);
+#else
+    localtime_r(&ts, &tmv);
+#endif
+    char zoneStr[32];
+    strftime(zoneStr, sizeof(zoneStr), "%z", &tmv);
+    return zoneStr;
+}
+
+bool SessionConnection::reconnect() {
+    bool reconnect = false;
+    for (int i = 1; i <= 3; i++) {
+        if (transport != nullptr) {
+            transport->close();
+            endPointList = std::move(availableNodes->getEndPointList());
+            int currHostIndex = rand() % endPointList.size();
+            int tryHostNum = 0;
+            for (int j = currHostIndex; j < endPointList.size(); j++) {
+                if (tryHostNum == endPointList.size()) {
+                    break;
+                }
+                this->endPoint = endPointList[j];
+                if (j == endPointList.size() - 1) {
+                    j = -1;
+                }
+                tryHostNum++;
+                try {
+                    init(this->endPoint);
+                    reconnect = true;
+                } catch (const IoTDBConnectionException &e) {
+                    log_warn("The current node may have been down, connection 
exception: %s", e.what());
+                    continue;
+                } catch (exception &e) {
+                    log_warn("login in failed, because  %s", e.what());
+                }
+                break;
+            }
+        }
+        if (reconnect) {
+            session->removeBrokenSessionConnection(shared_from_this());
+            session->defaultEndPoint = this->endPoint;
+            session->defaultSessionConnection = shared_from_this();
+            
session->endPointToSessionConnection.insert(make_pair(this->endPoint, 
shared_from_this()));
+        }
+    }
+    return reconnect;
+}
\ No newline at end of file
diff --git a/iotdb-client/client-cpp/src/main/SessionConnection.h 
b/iotdb-client/client-cpp/src/main/SessionConnection.h
new file mode 100644
index 00000000000..8e8a6d17ac4
--- /dev/null
+++ b/iotdb-client/client-cpp/src/main/SessionConnection.h
@@ -0,0 +1,81 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef IOTDB_SESSIONCONNECTION_H
+#define IOTDB_SESSIONCONNECTION_H
+
+#include <memory>
+#include <vector>
+#include <string>
+#include <functional>
+#include <thrift/transport/TTransport.h>
+#include "IClientRPCService.h"
+#include "common_types.h"
+#include "NodesSupplier.h"
+
+class SessionDataSet;
+class Session;
+
+class SessionConnection : std::enable_shared_from_this<SessionConnection> {
+public:
+    SessionConnection(Session* session_ptr, const TEndPoint& endpoint,
+                     const std::string& zoneId,
+                     std::shared_ptr<INodesSupplier> nodeSupplier,
+                     int maxRetries = 60,
+                     int64_t retryInterval = 500,
+                     std::string dialect = "tree",
+                     std::string db = "");
+
+    ~SessionConnection();
+
+    void setTimeZone(const std::string& newZoneId);
+
+
+    const TEndPoint& getEndPoint();
+
+    void init(const TEndPoint& endpoint);
+
+    std::unique_ptr<SessionDataSet> executeQueryStatement(const std::string& 
sql, int64_t timeoutInMs = -1);
+
+    std::shared_ptr<IClientRPCServiceClient> getSessionClient() {
+        return client;
+    }
+
+private:
+    void close();
+    std::string getSystemDefaultZoneId();
+    bool reconnect();
+
+    std::shared_ptr<apache::thrift::transport::TTransport> transport;
+    std::shared_ptr<IClientRPCServiceClient> client;
+    Session* session;
+    int64_t sessionId;
+    int64_t statementId;
+    int64_t connectionTimeoutInMs;
+    bool enableRPCCompression = false;
+    std::string zoneId;
+    TEndPoint endPoint;
+    std::vector<TEndPoint> endPointList;
+    std::shared_ptr<INodesSupplier> availableNodes;
+    int maxRetryCount;
+    int64_t retryIntervalMs;
+    std::string sqlDialect;
+    std::string database;
+};
+
+#endif
diff --git a/iotdb-client/client-cpp/src/main/ThriftConnection.cpp 
b/iotdb-client/client-cpp/src/main/ThriftConnection.cpp
new file mode 100644
index 00000000000..23c2890c34a
--- /dev/null
+++ b/iotdb-client/client-cpp/src/main/ThriftConnection.cpp
@@ -0,0 +1,158 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "ThriftConnection.h"
+#include <ctime>
+#include <iostream>
+#include <thrift/transport/TSocket.h>
+
+#include "Session.h"
+
+const int ThriftConnection::THRIFT_DEFAULT_BUFFER_SIZE = 4096;
+const int ThriftConnection::THRIFT_MAX_FRAME_SIZE = 1048576;
+const int ThriftConnection::CONNECTION_TIMEOUT_IN_MS = 1000;
+
+ThriftConnection::ThriftConnection(const TEndPoint& endPoint,
+                                   int thriftDefaultBufferSize,
+                                   int thriftMaxFrameSize,
+                                   int connectionTimeoutInMs)
+    : endPoint(endPoint),
+      thriftDefaultBufferSize(thriftDefaultBufferSize),
+      thriftMaxFrameSize(thriftMaxFrameSize),
+      connectionTimeoutInMs(connectionTimeoutInMs) {}
+
+ThriftConnection::~ThriftConnection() = default;
+
+void ThriftConnection::initZoneId() {
+    if (!zoneId.empty()) {
+        return;
+    }
+
+    time_t ts = 0;
+    struct tm tmv{};
+#if defined(_WIN64) || defined (WIN32) || defined (_WIN32)
+    localtime_s(&tmv, &ts);
+#else
+    localtime_r(&ts, &tmv);
+#endif
+
+    char zoneStr[32];
+    strftime(zoneStr, sizeof(zoneStr), "%z", &tmv);
+    zoneId = zoneStr;
+}
+
+void ThriftConnection::init(const std::string& username,
+                            const std::string& password,
+                            bool enableRPCCompression,
+                            const std::string& zoneId,
+                            const std::string& version) {
+    std::shared_ptr<TSocket> socket(new TSocket(endPoint.ip, endPoint.port));
+    socket->setConnTimeout(connectionTimeoutInMs);
+    transport = std::make_shared<TFramedTransport>(socket);
+    if (!transport->isOpen()) {
+        try {
+            transport->open();
+        }
+        catch (TTransportException &e) {
+            throw IoTDBConnectionException(e.what());
+        }
+    }
+    if (zoneId.empty()) {
+        initZoneId();
+    } else {
+        this->zoneId = zoneId;
+    }
+
+    if (enableRPCCompression) {
+        std::shared_ptr<TCompactProtocol> protocol(new 
TCompactProtocol(transport));
+        client = std::make_shared<IClientRPCServiceClient>(protocol);
+    } else {
+        std::shared_ptr<TBinaryProtocol> protocol(new 
TBinaryProtocol(transport));
+        client = std::make_shared<IClientRPCServiceClient>(protocol);
+    }
+
+    std::map<std::string, std::string> configuration;
+    configuration["version"] = version;
+    TSOpenSessionReq openReq;
+    openReq.__set_username(username);
+    openReq.__set_password(password);
+    openReq.__set_zoneId(this->zoneId);
+    openReq.__set_configuration(configuration);
+    try {
+        TSOpenSessionResp openResp;
+        client->openSession(openResp, openReq);
+        RpcUtils::verifySuccess(openResp.status);
+        sessionId = openResp.sessionId;
+        statementId = client->requestStatementId(sessionId);
+    } catch (const TTransportException &e) {
+        transport->close();
+        throw IoTDBConnectionException(e.what());
+    } catch (const IoTDBException &e) {
+        transport->close();
+        throw IoTDBException(e.what());
+    } catch (const std::exception &e) {
+        transport->close();
+        throw IoTDBException(e.what());
+    }
+}
+
+std::unique_ptr<SessionDataSet> ThriftConnection::executeQueryStatement(const 
std::string& sql, int64_t timeoutInMs) {
+    TSExecuteStatementReq req;
+    req.__set_sessionId(sessionId);
+    req.__set_statementId(statementId);
+    req.__set_statement(sql);
+    req.__set_timeout(timeoutInMs);
+    TSExecuteStatementResp resp;
+    try {
+        client->executeStatement(resp, req);
+        RpcUtils::verifySuccess(resp.status);
+    } catch (const TTransportException &e) {
+        throw IoTDBConnectionException(e.what());
+    } catch (const IoTDBException &e) {
+        throw IoTDBConnectionException(e.what());
+    } catch (const std::exception &e) {
+        throw IoTDBException(e.what());
+    }
+    std::shared_ptr<TSQueryDataSet> queryDataSet(new 
TSQueryDataSet(resp.queryDataSet));
+    return std::unique_ptr<SessionDataSet>(new SessionDataSet(
+            sql, resp.columns, resp.dataTypeList, resp.columnNameIndexMap, 
resp.ignoreTimeStamp, resp.queryId,
+            statementId, client, sessionId, queryDataSet));
+}
+
+void ThriftConnection::close() {
+    try {
+        if (client) {
+            TSCloseSessionReq req;
+            req.__set_sessionId(sessionId);
+            TSStatus tsStatus;
+            client->closeSession(tsStatus, req);
+        }
+    } catch (const TTransportException &e) {
+        throw IoTDBConnectionException(e.what());
+    } catch (const std::exception &e) {
+        throw IoTDBConnectionException(e.what());
+    }
+
+    try {
+        if (transport->isOpen()) {
+            transport->close();
+        }
+    } catch (const std::exception &e) {
+        throw IoTDBConnectionException(e.what());
+    }
+}
\ No newline at end of file
diff --git a/iotdb-client/client-cpp/src/main/ThriftConnection.h 
b/iotdb-client/client-cpp/src/main/ThriftConnection.h
new file mode 100644
index 00000000000..2810036078c
--- /dev/null
+++ b/iotdb-client/client-cpp/src/main/ThriftConnection.h
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef IOTDB_THRIFTCONNECTION_H
+#define IOTDB_THRIFTCONNECTION_H
+
+#include <memory>
+#include <thrift/transport/TBufferTransports.h>
+#include "IClientRPCService.h"
+
+class SessionDataSet;
+
+class ThriftConnection {
+public:
+    static const int THRIFT_DEFAULT_BUFFER_SIZE;
+    static const int THRIFT_MAX_FRAME_SIZE;
+    static const int CONNECTION_TIMEOUT_IN_MS;
+
+    explicit ThriftConnection(const TEndPoint& endPoint,
+                     int thriftDefaultBufferSize = THRIFT_DEFAULT_BUFFER_SIZE,
+                     int thriftMaxFrameSize = THRIFT_MAX_FRAME_SIZE,
+                     int connectionTimeoutInMs = CONNECTION_TIMEOUT_IN_MS);
+
+    ~ThriftConnection();
+
+    void init(const std::string& username,
+              const std::string& password,
+              bool enableRPCCompression = false,
+              const std::string& zoneId = std::string(),
+              const std::string& version = "V_1_0");
+
+    std::unique_ptr<SessionDataSet> executeQueryStatement(const std::string& 
sql, int64_t timeoutInMs = -1);
+
+    void close();
+
+private:
+    TEndPoint endPoint;
+
+    int thriftDefaultBufferSize;
+    int thriftMaxFrameSize;
+    int connectionTimeoutInMs;
+
+    std::shared_ptr<apache::thrift::transport::TTransport> transport;
+    std::shared_ptr<IClientRPCServiceClient> client;
+    int64_t sessionId{};
+    int64_t statementId{};
+    std::string zoneId;
+    int timeFactor{};
+
+    void initZoneId();
+};
+
+#endif

Reply via email to