This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 66b1997b172 Feature/node supplier (#15230)
66b1997b172 is described below
commit 66b1997b172f55f1fe0461ed43f942faf02297bb
Author: Hongzhi Gao <[email protected]>
AuthorDate: Mon Apr 21 11:08:38 2025 +0800
Feature/node supplier (#15230)
* Implement node list retrieval
* fix NodeSupplier
* fix NodeSupplier
* fix NodeSupplier
* fix NodeSupplier
* fix NodeSupplier
* fix NodeSupplier
* fix NodeSupplier
* Extract INodesSuppiler as an abstract class
* implement executeQueryStatementMayRedirect
* Add redirection, reconnection and backup node support for IoTDB CPP client
* Implement client-side caching for the insertRecord method
* Removed some redundant code
---
iotdb-client/client-cpp/pom.xml | 24 ++
.../client-cpp/src/main/AbstractSessionBuilder.h | 3 +
iotdb-client/client-cpp/src/main/NodesSupplier.cpp | 221 ++++++++++++++++++
iotdb-client/client-cpp/src/main/NodesSupplier.h | 137 +++++++++++
iotdb-client/client-cpp/src/main/Session.cpp | 169 +++++++++++++-
iotdb-client/client-cpp/src/main/Session.h | 71 +++++-
.../client-cpp/src/main/SessionConnection.cpp | 256 +++++++++++++++++++++
.../client-cpp/src/main/SessionConnection.h | 81 +++++++
.../client-cpp/src/main/ThriftConnection.cpp | 158 +++++++++++++
.../client-cpp/src/main/ThriftConnection.h | 68 ++++++
10 files changed, 1186 insertions(+), 2 deletions(-)
diff --git a/iotdb-client/client-cpp/pom.xml b/iotdb-client/client-cpp/pom.xml
index 98f7c6f9d11..abde39dbae0 100644
--- a/iotdb-client/client-cpp/pom.xml
+++ b/iotdb-client/client-cpp/pom.xml
@@ -175,6 +175,30 @@
<sourceFile>${project.basedir}/src/main/TableSessionBuilder.h</sourceFile>
<destinationFile>${project.build.directory}/build/main/generated-sources-cpp/TableSessionBuilder.h</destinationFile>
</fileSet>
+ <fileSet>
+
<sourceFile>${project.basedir}/src/main/NodesSupplier.h</sourceFile>
+
<destinationFile>${project.build.directory}/build/main/generated-sources-cpp/NodesSupplier.h</destinationFile>
+ </fileSet>
+ <fileSet>
+
<sourceFile>${project.basedir}/src/main/NodesSupplier.cpp</sourceFile>
+
<destinationFile>${project.build.directory}/build/main/generated-sources-cpp/NodesSupplier.cpp</destinationFile>
+ </fileSet>
+ <fileSet>
+
<sourceFile>${project.basedir}/src/main/ThriftConnection.h</sourceFile>
+
<destinationFile>${project.build.directory}/build/main/generated-sources-cpp/ThriftConnection.h</destinationFile>
+ </fileSet>
+ <fileSet>
+
<sourceFile>${project.basedir}/src/main/ThriftConnection.cpp</sourceFile>
+
<destinationFile>${project.build.directory}/build/main/generated-sources-cpp/ThriftConnection.cpp</destinationFile>
+ </fileSet>
+ <fileSet>
+
<sourceFile>${project.basedir}/src/main/SessionConnection.h</sourceFile>
+
<destinationFile>${project.build.directory}/build/main/generated-sources-cpp/SessionConnection.h</destinationFile>
+ </fileSet>
+ <fileSet>
+
<sourceFile>${project.basedir}/src/main/SessionConnection.cpp</sourceFile>
+
<destinationFile>${project.build.directory}/build/main/generated-sources-cpp/SessionConnection.cpp</destinationFile>
+ </fileSet>
<fileSet>
<sourceFile>${project.basedir}/src/main/AbstractSessionBuilder.h</sourceFile>
<destinationFile>${project.build.directory}/build/main/generated-sources-cpp/AbstractSessionBuilder.h</destinationFile>
diff --git a/iotdb-client/client-cpp/src/main/AbstractSessionBuilder.h
b/iotdb-client/client-cpp/src/main/AbstractSessionBuilder.h
index 1b4d9a729a1..441e3a41f4f 100644
--- a/iotdb-client/client-cpp/src/main/AbstractSessionBuilder.h
+++ b/iotdb-client/client-cpp/src/main/AbstractSessionBuilder.h
@@ -32,6 +32,9 @@ public:
int fetchSize = 10000;
std::string sqlDialect = "tree";
std::string database = "";
+ bool enableAutoFetch = true;
+ bool enableRedirections = true;
+ bool enableRPCCompression = false;
};
#endif // IOTDB_ABSTRACTSESSIONBUILDER_H
\ No newline at end of file
diff --git a/iotdb-client/client-cpp/src/main/NodesSupplier.cpp
b/iotdb-client/client-cpp/src/main/NodesSupplier.cpp
new file mode 100644
index 00000000000..932ba0c17fa
--- /dev/null
+++ b/iotdb-client/client-cpp/src/main/NodesSupplier.cpp
@@ -0,0 +1,221 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "NodesSupplier.h"
+#include "Session.h"
+#include <algorithm>
+#include <iostream>
+#include <utility>
+
+const std::string NodesSupplier::SHOW_DATA_NODES_COMMAND = "SHOW DATANODES";
+const std::string NodesSupplier::STATUS_COLUMN_NAME = "Status";
+const std::string NodesSupplier::IP_COLUMN_NAME = "RpcAddress";
+const std::string NodesSupplier::PORT_COLUMN_NAME = "RpcPort";
+const std::string NodesSupplier::REMOVING_STATUS = "Removing";
+
+const int64_t NodesSupplier::TIMEOUT_IN_MS = 60000;
+const int NodesSupplier::FETCH_SIZE = 10000;
+const int NodesSupplier::THRIFT_DEFAULT_BUFFER_SIZE = 4096;
+const int NodesSupplier::THRIFT_MAX_FRAME_SIZE = 1048576;
+const int NodesSupplier::CONNECTION_TIMEOUT_IN_MS = 1000;
+
+TEndPoint RoundRobinPolicy::select(const std::vector<TEndPoint>& nodes) {
+ static std::atomic_uint index{0};
+
+ if (nodes.empty()) {
+ throw IoTDBException("No available nodes");
+ }
+
+ return nodes[index++ % nodes.size()];
+}
+
+StaticNodesSupplier::StaticNodesSupplier(const std::vector<TEndPoint>& nodes,
+ NodeSelectionPolicy policy)
+ : availableNodes_(nodes), policy_(std::move(policy)) {}
+
+boost::optional<TEndPoint> StaticNodesSupplier::getQueryEndPoint() {
+ try {
+ if (availableNodes_.empty()) {
+ return boost::none;
+ }
+ return policy_(availableNodes_);
+ } catch (const IoTDBException& e) {
+ return boost::none;
+ }
+}
+
+std::vector<TEndPoint> StaticNodesSupplier::getEndPointList() {
+ return availableNodes_;
+}
+
+StaticNodesSupplier::~StaticNodesSupplier() = default;
+
+std::shared_ptr<NodesSupplier> NodesSupplier::create(
+ std::vector<TEndPoint> endpoints,
+ std::string userName, std::string password, std::string zoneId,
+ int32_t thriftDefaultBufferSize, int32_t thriftMaxFrameSize,
+ int32_t connectionTimeoutInMs, bool useSSL, bool enableRPCCompression,
+ std::string version, std::chrono::milliseconds refreshInterval,
+ NodeSelectionPolicy policy) {
+ if (endpoints.empty()) {
+ return nullptr;
+ }
+ auto supplier = std::make_shared<NodesSupplier>(
+ userName, password, zoneId, thriftDefaultBufferSize,
+ thriftMaxFrameSize, connectionTimeoutInMs, useSSL,
+ enableRPCCompression, version, std::move(endpoints), std::move(policy)
+ );
+ supplier->startBackgroundRefresh(refreshInterval);
+ return supplier;
+}
+
+NodesSupplier::NodesSupplier(
+ std::string userName, std::string password, const std::string& zoneId,
+ int32_t thriftDefaultBufferSize, int32_t thriftMaxFrameSize,
+ int32_t connectionTimeoutInMs, bool useSSL, bool enableRPCCompression,
+ std::string version, std::vector<TEndPoint> endpoints, NodeSelectionPolicy
policy) : userName(std::move(userName)), password(std::move(password)),
zoneId(zoneId),
+ thriftDefaultBufferSize(thriftDefaultBufferSize),
thriftMaxFrameSize(thriftMaxFrameSize),
+ connectionTimeoutInMs(connectionTimeoutInMs), useSSL(useSSL),
enableRPCCompression(enableRPCCompression), version(version),
endpoints(std::move(endpoints)),
+ selectionPolicy(std::move(policy)) {
+ deduplicateEndpoints();
+}
+
+std::vector<TEndPoint> NodesSupplier::getEndPointList() {
+ std::lock_guard<std::mutex> lock(mutex);
+ return endpoints;
+}
+
+TEndPoint NodesSupplier::selectQueryEndpoint() {
+ std::lock_guard<std::mutex> lock(mutex);
+ try {
+ return selectionPolicy(endpoints);
+ } catch (const std::exception& e) {
+ log_error("NodesSupplier::selectQueryEndpoint exception: %s",
e.what());
+ throw IoTDBException("NodesSupplier::selectQueryEndpoint exception, "
+ std::string(e.what()));
+ }
+}
+
+boost::optional<TEndPoint> NodesSupplier::getQueryEndPoint() {
+ try {
+ return selectQueryEndpoint();
+ } catch (const IoTDBException& e) {
+ return boost::none;
+ }
+}
+
+NodesSupplier::~NodesSupplier() {
+ stopBackgroundRefresh();
+ client->close();
+}
+
+void NodesSupplier::deduplicateEndpoints() {
+ std::vector<TEndPoint> uniqueEndpoints;
+ uniqueEndpoints.reserve(endpoints.size());
+ for (const auto& endpoint : endpoints) {
+ if (std::find(uniqueEndpoints.begin(), uniqueEndpoints.end(),
endpoint) == uniqueEndpoints.end()) {
+ uniqueEndpoints.push_back(endpoint);
+ }
+ }
+ endpoints = std::move(uniqueEndpoints);
+}
+
+void NodesSupplier::startBackgroundRefresh(std::chrono::milliseconds interval)
{
+ isRunning = true;
+ refreshThread = std::thread([this, interval] {
+ while (isRunning) {
+ refreshEndpointList();
+ std::unique_lock<std::mutex> cvLock(this->mutex);
+ refreshCondition.wait_for(cvLock, interval, [this]() {
+ return !isRunning.load();
+ });
+ }
+ });
+}
+
+std::vector<TEndPoint> NodesSupplier::fetchLatestEndpoints() {
+ try {
+ if (client == nullptr) {
+ client =
std::make_shared<ThriftConnection>(selectionPolicy(endpoints));
+ client->init(userName, password, enableRPCCompression, zoneId,
version);
+ }
+
+ auto sessionDataSet =
client->executeQueryStatement(SHOW_DATA_NODES_COMMAND);
+
+ uint32_t columnAddrIdx = -1, columnPortIdx = -1, columnStatusIdx = -1;
+ auto columnNames = sessionDataSet->getColumnNames();
+ for (uint32_t i = 0; i < columnNames.size(); i++) {
+ if (columnNames[i] == IP_COLUMN_NAME) {
+ columnAddrIdx = i;
+ } else if (columnNames[i] == PORT_COLUMN_NAME) {
+ columnPortIdx = i;
+ } else if (columnNames[i] == STATUS_COLUMN_NAME) {
+ columnStatusIdx = i;
+ }
+ }
+
+ if (columnAddrIdx == -1 || columnPortIdx == -1 || columnStatusIdx ==
-1) {
+ throw IoTDBException("Required columns not found in query
result.");
+ }
+
+ std::vector<TEndPoint> ret;
+ while (sessionDataSet->hasNext()) {
+ RowRecord* record = sessionDataSet->next();
+ std::string ip = record->fields.at(columnAddrIdx).stringV;
+ int32_t port = record->fields.at(columnPortIdx).intV;
+ std::string status = record->fields.at(columnStatusIdx).stringV;
+
+ if (ip == "0.0.0.0" || status == REMOVING_STATUS) {
+ log_warn("Skipping invalid node: " + ip + ":" +
to_string(port));
+ continue;
+ }
+ TEndPoint endpoint;
+ endpoint.ip = ip;
+ endpoint.port = port;
+ ret.emplace_back(endpoint);
+ }
+
+ return ret;
+ } catch (const IoTDBException& e) {
+ client.reset();
+ throw IoTDBException(std::string("NodesSupplier::fetchLatestEndpoints
failed: ") + e.what());
+ }
+}
+
+void NodesSupplier::refreshEndpointList() {
+ try {
+ auto newEndpoints = fetchLatestEndpoints();
+ if (newEndpoints.empty()) {
+ return;
+ }
+
+ std::lock_guard<std::mutex> lock(mutex);
+ endpoints.swap(newEndpoints);
+ deduplicateEndpoints();
+ } catch (const IoTDBException& e) {
+ log_error(std::string("NodesSupplier::refreshEndpointList failed: ") +
e.what());
+ }
+}
+
+void NodesSupplier::stopBackgroundRefresh() noexcept {
+ if (isRunning.exchange(false)) {
+ refreshCondition.notify_all();
+ if (refreshThread.joinable()) {
+ refreshThread.join();
+ }
+ }
+}
\ No newline at end of file
diff --git a/iotdb-client/client-cpp/src/main/NodesSupplier.h
b/iotdb-client/client-cpp/src/main/NodesSupplier.h
new file mode 100644
index 00000000000..27965afa968
--- /dev/null
+++ b/iotdb-client/client-cpp/src/main/NodesSupplier.h
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef IOTDB_NODES_SUPPLIER_H
+#define IOTDB_NODES_SUPPLIER_H
+
+#include <vector>
+#include <atomic>
+#include <boost/optional.hpp>
+#include <mutex>
+#include <chrono>
+#include <thread>
+#include <functional>
+#include <condition_variable>
+#include <algorithm>
+
+#include "ThriftConnection.h"
+
+class TEndPoint;
+
+class RoundRobinPolicy {
+public:
+ static TEndPoint select(const std::vector<TEndPoint>& nodes);
+};
+
+class INodesSupplier {
+public:
+ virtual ~INodesSupplier() = default;
+ virtual boost::optional<TEndPoint> getQueryEndPoint() = 0;
+ virtual std::vector<TEndPoint> getEndPointList() = 0;
+ using NodeSelectionPolicy = std::function<TEndPoint(const
std::vector<TEndPoint>&)>;
+};
+
+class StaticNodesSupplier : public INodesSupplier {
+public:
+ explicit StaticNodesSupplier(const std::vector<TEndPoint>& nodes,
+ NodeSelectionPolicy policy =
RoundRobinPolicy::select);
+
+ boost::optional<TEndPoint> getQueryEndPoint() override;
+
+ std::vector<TEndPoint> getEndPointList() override;
+
+ ~StaticNodesSupplier() override;
+
+private:
+ const std::vector<TEndPoint> availableNodes_;
+ NodeSelectionPolicy policy_;
+};
+
+class NodesSupplier : public INodesSupplier {
+public:
+ static const std::string SHOW_DATA_NODES_COMMAND;
+ static const std::string STATUS_COLUMN_NAME;
+ static const std::string IP_COLUMN_NAME;
+ static const std::string PORT_COLUMN_NAME;
+ static const std::string REMOVING_STATUS;
+
+ static const int64_t TIMEOUT_IN_MS;
+ static const int FETCH_SIZE;
+ static const int THRIFT_DEFAULT_BUFFER_SIZE;
+ static const int THRIFT_MAX_FRAME_SIZE;
+ static const int CONNECTION_TIMEOUT_IN_MS;
+
+ static std::shared_ptr<NodesSupplier> create(
+ std::vector<TEndPoint> endpoints,
+ std::string userName, std::string password, std::string zoneId = "",
+ int32_t thriftDefaultBufferSize =
ThriftConnection::THRIFT_DEFAULT_BUFFER_SIZE,
+ int32_t thriftMaxFrameSize = ThriftConnection::THRIFT_MAX_FRAME_SIZE,
+ int32_t connectionTimeoutInMs =
ThriftConnection::CONNECTION_TIMEOUT_IN_MS,
+ bool useSSL = false, bool enableRPCCompression = false,
+ std::string version = "V_1_0",
+ std::chrono::milliseconds refreshInterval =
std::chrono::milliseconds(TIMEOUT_IN_MS),
+ NodeSelectionPolicy policy = RoundRobinPolicy::select
+ );
+
+ NodesSupplier(
+ std::string userName, std::string password, const std::string& zoneId,
+ int32_t thriftDefaultBufferSize, int32_t thriftMaxFrameSize,
+ int32_t connectionTimeoutInMs, bool useSSL, bool enableRPCCompression,
+ std::string version, std::vector<TEndPoint> endpoints,
NodeSelectionPolicy policy
+ );
+ std::vector<TEndPoint> getEndPointList() override;
+
+ boost::optional<TEndPoint> getQueryEndPoint() override;
+
+ ~NodesSupplier() override;
+
+private:
+ std::string userName;
+ std::string password;
+ int32_t thriftDefaultBufferSize;
+ int32_t thriftMaxFrameSize;
+ int32_t connectionTimeoutInMs;
+ bool useSSL;
+ bool enableRPCCompression;
+ std::string version;
+ std::string zoneId;
+
+ std::mutex mutex;
+ std::vector<TEndPoint> endpoints;
+ NodeSelectionPolicy selectionPolicy;
+
+ std::atomic<bool> isRunning{false};
+ std::thread refreshThread;
+ std::condition_variable refreshCondition;
+
+ std::shared_ptr<ThriftConnection> client;
+
+ void deduplicateEndpoints();
+
+ void startBackgroundRefresh(std::chrono::milliseconds interval);
+
+ std::vector<TEndPoint> fetchLatestEndpoints();
+
+ void refreshEndpointList();
+
+ TEndPoint selectQueryEndpoint();
+
+ void stopBackgroundRefresh() noexcept;
+};
+
+#endif
\ No newline at end of file
diff --git a/iotdb-client/client-cpp/src/main/Session.cpp
b/iotdb-client/client-cpp/src/main/Session.cpp
index 316bd1d9443..bc75effc86d 100644
--- a/iotdb-client/client-cpp/src/main/Session.cpp
+++ b/iotdb-client/client-cpp/src/main/Session.cpp
@@ -21,6 +21,7 @@
#include <algorithm>
#include <memory>
#include <time.h>
+#include "NodesSupplier.h"
using namespace std;
@@ -56,6 +57,29 @@ void RpcUtils::verifySuccess(const TSStatus &status) {
}
}
+void RpcUtils::verifySuccessWithRedirection(const TSStatus &status) {
+ verifySuccess(status);
+ if (status.__isset.redirectNode) {
+ throw RedirectException(to_string(status.code) + ": " +
status.message, status.redirectNode);
+ }
+ if (status.__isset.subStatus) {
+ auto statusSubStatus = status.subStatus;
+ vector<TEndPoint> endPointList(statusSubStatus.size());
+ int count = 0;
+ for (TSStatus subStatus : statusSubStatus) {
+ if (subStatus.__isset.redirectNode) {
+ endPointList[count++] = subStatus.redirectNode;
+ } else {
+ TEndPoint endPoint;
+ endPointList[count++] = endPoint;
+ }
+ }
+ if (!endPointList.empty()) {
+ throw RedirectException(to_string(status.code) + ": " +
status.message, endPointList);
+ }
+ }
+}
+
void RpcUtils::verifySuccess(const vector<TSStatus> &statuses) {
for (const TSStatus &status: statuses) {
if (status.code != TSStatusCode::SUCCESS_STATUS) {
@@ -587,6 +611,12 @@ Session::~Session() {
}
}
+void Session::removeBrokenSessionConnection(shared_ptr<SessionConnection>
sessionConnection) {
+ if (enableRedirection) {
+
this->endPointToSessionConnection.erase(sessionConnection->getEndPoint());
+ }
+}
+
/**
* check whether the batch has been sorted
*
@@ -795,6 +825,26 @@ void Session::initZoneId() {
zoneId = zoneStr;
}
+void Session::initNodesSupplier() {
+ std::vector<TEndPoint> endPoints;
+ TEndPoint endPoint;
+ endPoint.__set_ip(host);
+ endPoint.__set_port(rpcPort);
+ endPoints.emplace_back(endPoint);
+ if (enableAutoFetch) {
+ nodesSupplier = NodesSupplier::create(endPoints, username, password);
+ } else {
+ nodesSupplier = make_shared<StaticNodesSupplier>(endPoints);
+ }
+}
+
+void Session::initDefaultSessionConnection() {
+ defaultEndPoint.__set_ip(host);
+ defaultEndPoint.__set_port(rpcPort);
+ defaultSessionConnection = make_shared<SessionConnection>(this,
defaultEndPoint, zoneId, nodesSupplier, 60, 500,
+ sqlDialect, database);
+}
+
void Session::open() {
open(false, DEFAULT_TIMEOUT_MS);
}
@@ -875,6 +925,15 @@ void Session::open(bool enableRPCCompression, int
connectionTimeoutInMs) {
}
isClosed = false;
+ try {
+ initDefaultSessionConnection();
+ } catch (const exception &e) {
+ log_debug(e.what());
+ throw IoTDBException(e.what());
+ }
+ if (enableRedirection) {
+ endPointToSessionConnection.insert(make_pair(defaultEndPoint,
defaultSessionConnection));
+ }
}
@@ -929,8 +988,20 @@ void Session::insertRecord(const string &deviceId, int64_t
time,
req.__set_isAligned(false);
TSStatus respStatus;
try {
- client->insertStringRecord(respStatus, req);
+
getSessionConnection(deviceId)->getSessionClient()->insertStringRecord(respStatus,
req);
RpcUtils::verifySuccess(respStatus);
+ } catch (RedirectException& e) {
+ handleRedirection(deviceId, e.endPoint);
+ } catch (const IoTDBConnectionException &e) {
+ if (enableRedirection && deviceIdToEndpoint.find(deviceId) !=
deviceIdToEndpoint.end()) {
+ deviceIdToEndpoint.erase(deviceId);
+ try {
+
defaultSessionConnection->getSessionClient()->insertStringRecord(respStatus,
req);
+ } catch (RedirectException& e) {
+ }
+ } else {
+ throw IoTDBConnectionException(e.what());
+ }
} catch (const TTransportException &e) {
log_debug(e.what());
throw IoTDBConnectionException(e.what());
@@ -1836,6 +1907,38 @@ int64_t Session::getSessionId() {
return sessionId;
}
+shared_ptr<SessionConnection> Session::getQuerySessionConnection() {
+ auto endPoint = nodesSupplier->getQueryEndPoint();
+ if (!endPoint.has_value() || endPointToSessionConnection.empty()) {
+ return defaultSessionConnection;
+ }
+
+ auto it = endPointToSessionConnection.find(endPoint.value());
+ if (it != endPointToSessionConnection.end()) {
+ return it->second;
+ }
+
+ shared_ptr<SessionConnection> newConnection;
+ try {
+ newConnection = make_shared<SessionConnection>(this, endPoint.value(),
zoneId, nodesSupplier,
+ 60, 500, sqlDialect, database);
+ endPointToSessionConnection.emplace(endPoint.value(), newConnection);
+ return newConnection;
+ } catch (exception &e) {
+ log_debug("Session::getQuerySessionConnection() exception: " +
e.what());
+ return newConnection;
+ }
+}
+
+shared_ptr<SessionConnection> Session::getSessionConnection(std::string
deviceId) {
+ if (!enableRedirection ||
+ deviceIdToEndpoint.find(deviceId) == deviceIdToEndpoint.end() ||
+ endPointToSessionConnection.find(deviceIdToEndpoint[deviceId]) ==
endPointToSessionConnection.end()) {
+ return defaultSessionConnection;
+ }
+ return
endPointToSessionConnection.find(deviceIdToEndpoint[deviceId])->second;
+}
+
string Session::getTimeZone() {
if (!zoneId.empty()) {
return zoneId;
@@ -1908,6 +2011,70 @@ unique_ptr<SessionDataSet>
Session::executeQueryStatement(const string &sql, int
statementId, client, sessionId, queryDataSet));
}
+void Session::handleQueryRedirection(TEndPoint endPoint) {
+ if (!enableRedirection) return;
+ shared_ptr<SessionConnection> newConnection;
+ auto it = endPointToSessionConnection.find(endPoint);
+ if (it != endPointToSessionConnection.end()) {
+ newConnection = it->second;
+ } else {
+ try {
+ newConnection = make_shared<SessionConnection>(this, endPoint,
zoneId, nodesSupplier,
+ 60, 500, sqlDialect, database);
+
+ endPointToSessionConnection.emplace(endPoint, newConnection);
+ } catch (exception &e) {
+ throw IoTDBConnectionException(e.what());
+ }
+ }
+ defaultSessionConnection = newConnection;
+}
+
+void Session::handleRedirection(const std::string& deviceId, TEndPoint
endPoint) {
+ if (!enableRedirection) return;
+ if (endPoint.ip == "127.0.0.1") return;
+ deviceIdToEndpoint[deviceId] = endPoint;
+
+ shared_ptr<SessionConnection> newConnection;
+ auto it = endPointToSessionConnection.find(endPoint);
+ if (it != endPointToSessionConnection.end()) {
+ newConnection = it->second;
+ } else {
+ try {
+ newConnection = make_shared<SessionConnection>(this, endPoint,
zoneId, nodesSupplier,
+ 60, 500, sqlDialect, database);
+ endPointToSessionConnection.emplace(endPoint, newConnection);
+ } catch (exception &e) {
+ deviceIdToEndpoint.erase(deviceId);
+ throw IoTDBConnectionException(e.what());
+ }
+ }
+}
+
+std::unique_ptr<SessionDataSet>
Session::executeQueryStatementMayRedirect(const std::string &sql, int64_t
timeoutInMs) {
+ auto sessionConnection = getQuerySessionConnection();
+ if (!sessionConnection) {
+ log_warn("Session connection not found");
+ return nullptr;
+ }
+ try {
+ return sessionConnection->executeQueryStatement(sql, timeoutInMs);
+ } catch (RedirectException& e) {
+ log_warn("Session connection redirect exception: " + e.what());
+ handleQueryRedirection(e.endPoint);
+ try {
+ return defaultSessionConnection->executeQueryStatement(sql,
timeoutInMs);
+ } catch (exception& e) {
+ log_error("Exception while executing redirected query statement:
%s", e.what());
+ throw ExecutionException(e.what());
+ }
+ } catch (exception& e) {
+ log_error("Exception while executing query statement: %s", e.what());
+ throw e;
+ }
+
+}
+
void Session::executeNonQueryStatement(const string &sql) {
TSExecuteStatementReq req;
req.__set_sessionId(sessionId);
diff --git a/iotdb-client/client-cpp/src/main/Session.h
b/iotdb-client/client-cpp/src/main/Session.h
index 32da7839709..b795055ab8d 100644
--- a/iotdb-client/client-cpp/src/main/Session.h
+++ b/iotdb-client/client-cpp/src/main/Session.h
@@ -41,7 +41,9 @@
#include <thrift/transport/TTransportException.h>
#include <thrift/transport/TBufferTransports.h>
#include "IClientRPCService.h"
+#include "NodesSupplier.h"
#include "AbstractSessionBuilder.h"
+#include "SessionConnection.h"
//== For compatible with Windows OS ==
#ifndef LONG_LONG_MIN
@@ -73,7 +75,6 @@ extern LogLevelType LOG_LEVEL;
#define log_warn(fmt,...) do {if(LOG_LEVEL <= LEVEL_WARN) {string
s=string("[WARN] %s:%d (%s) - ") + fmt + "\n"; printf(s.c_str(), __FILE__,
__LINE__, __FUNCTION__, ##__VA_ARGS__);}} while(0)
#define log_error(fmt,...) do {if(LOG_LEVEL <= LEVEL_ERROR) {string
s=string("[ERROR] %s:%d (%s) - ") + fmt + "\n"; printf(s.c_str(), __FILE__,
__LINE__, __FUNCTION__, ##__VA_ARGS__);}} while(0)
-
class IoTDBException : public std::exception {
public:
IoTDBException() {}
@@ -126,6 +127,25 @@ public:
std::vector<TSStatus> statusList;
};
+class RedirectException : public IoTDBException {
+public:
+ RedirectException() {}
+
+ explicit RedirectException(const char *m) : IoTDBException(m) {}
+
+ explicit RedirectException(const std::string &m) : IoTDBException(m) {}
+
+ RedirectException(const std::string &m, const TEndPoint& endPoint) :
IoTDBException(m), endPoint(endPoint) {}
+
+ RedirectException(const std::string &m, const map<string, TEndPoint>&
deviceEndPointMap) : IoTDBException(m), deviceEndPointMap(deviceEndPointMap) {}
+
+ RedirectException(const std::string &m, const vector<TEndPoint>
endPointList) : IoTDBException(m), endPointList(endPointList) {}
+
+ TEndPoint endPoint;
+ map<string, TEndPoint> deviceEndPointMap;
+ vector<TEndPoint> endPointList;
+};
+
class UnSupportedDataTypeException : public IoTDBException {
public:
UnSupportedDataTypeException() {}
@@ -262,6 +282,8 @@ public:
static void verifySuccess(const TSStatus &status);
+ static void verifySuccessWithRedirection(const TSStatus &status);
+
static void verifySuccess(const std::vector<TSStatus> &statuses);
static TSStatus getStatus(TSStatusCode::TSStatusCode tsStatusCode);
@@ -685,6 +707,11 @@ public:
}
}
+ void addTimestamp(size_t rowIndex, int64_t timestamp) {
+ timestamps[rowIndex] = timestamp;
+ rowSize = max(rowSize, rowIndex + 1);
+ }
+
template<typename T>
void addValue(size_t schemaId, size_t rowIndex, const T& value) {
if (schemaId >= schemas.size()) {
@@ -1098,8 +1125,31 @@ private:
Version::Version version;
std::string sqlDialect = "tree"; // default sql dialect
std::string database;
+ bool enableAutoFetch = true;
+ bool enableRedirection = true;
+ std::shared_ptr<INodesSupplier> nodesSupplier;
+ friend class SessionConnection;
+ std::shared_ptr<SessionConnection> defaultSessionConnection;
+
+ TEndPoint defaultEndPoint;
+
+ struct TEndPointHash {
+ size_t operator()(const TEndPoint& endpoint) const {
+ return std::hash<std::string>()(endpoint.ip) ^
std::hash<int>()(endpoint.port);
+ }
+ };
+ struct TEndPointEqual {
+ bool operator()(const TEndPoint& lhs, const TEndPoint& rhs) const {
+ return lhs.ip == rhs.ip && lhs.port == rhs.port;
+ }
+ };
+ using EndPointSessionMap = std::unordered_map<TEndPoint,
shared_ptr<SessionConnection>, TEndPointHash, TEndPointEqual>;
+ EndPointSessionMap endPointToSessionConnection;
+ std::unordered_map<std::string, TEndPoint> deviceIdToEndpoint;
private:
+ void removeBrokenSessionConnection(shared_ptr<SessionConnection>
sessionConnection);
+
static bool checkSorted(const Tablet &tablet);
static bool checkSorted(const std::vector<int64_t> ×);
@@ -1127,12 +1177,15 @@ private:
std::string getVersionString(Version::Version version);
void initZoneId();
+ void initNodesSupplier();
+ void initDefaultSessionConnection();
public:
Session(const std::string &host, int rpcPort) : username("root"),
password("root"), version(Version::V_1_0) {
this->host = host;
this->rpcPort = rpcPort;
initZoneId();
+ initNodesSupplier();
}
Session(const std::string &host, int rpcPort, const std::string &username,
const std::string &password)
@@ -1143,6 +1196,7 @@ public:
this->password = password;
this->version = Version::V_1_0;
initZoneId();
+ initNodesSupplier();
}
Session(const std::string &host, int rpcPort, const std::string &username,
const std::string &password,
@@ -1155,6 +1209,7 @@ public:
this->fetchSize = fetchSize;
this->version = Version::V_1_0;
initZoneId();
+ initNodesSupplier();
}
Session(const std::string &host, const std::string &rpcPort, const
std::string &username = "user",
@@ -1167,6 +1222,7 @@ public:
this->fetchSize = fetchSize;
this->version = Version::V_1_0;
initZoneId();
+ initNodesSupplier();
}
Session(AbstractSessionBuilder* builder) {
@@ -1179,7 +1235,10 @@ public:
this->version = Version::V_1_0;
this->sqlDialect = builder->sqlDialect;
this->database = builder->database;
+ this->enableAutoFetch = builder->enableAutoFetch;
+ this->enableRedirection = builder->enableRedirections;
initZoneId();
+ initNodesSupplier();
}
~Session();
@@ -1202,6 +1261,10 @@ public:
int64_t getSessionId();
+ shared_ptr<SessionConnection> getQuerySessionConnection();
+
+ shared_ptr<SessionConnection> getSessionConnection(std::string deviceId);
+
void open();
void open(bool enableRPCCompression);
@@ -1214,6 +1277,10 @@ public:
std::string getTimeZone();
+ void handleQueryRedirection(TEndPoint endPoint);
+
+ void handleRedirection(const std::string& deviceId, TEndPoint endPoint);
+
void insertRecord(const std::string &deviceId, int64_t time, const
std::vector<std::string> &measurements,
const std::vector<std::string> &values);
@@ -1359,6 +1426,8 @@ public:
std::unique_ptr<SessionDataSet> executeQueryStatement(const std::string
&sql, int64_t timeoutInMs) ;
+ std::unique_ptr<SessionDataSet> executeQueryStatementMayRedirect(const
std::string &sql, int64_t timeoutInMs);
+
void executeNonQueryStatement(const std::string &sql);
std::unique_ptr<SessionDataSet> executeRawDataQuery(const
std::vector<std::string> &paths, int64_t startTime, int64_t endTime);
diff --git a/iotdb-client/client-cpp/src/main/SessionConnection.cpp
b/iotdb-client/client-cpp/src/main/SessionConnection.cpp
new file mode 100644
index 00000000000..f78e1ff7769
--- /dev/null
+++ b/iotdb-client/client-cpp/src/main/SessionConnection.cpp
@@ -0,0 +1,256 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "SessionConnection.h"
+#include "Session.h"
+#include "common_types.h"
+#include <thrift/protocol/TCompactProtocol.h>
+
+#include <utility>
+
+using namespace apache::thrift;
+using namespace apache::thrift::protocol;
+using namespace apache::thrift::transport;
+
+SessionConnection::SessionConnection(Session* session_ptr, const TEndPoint&
endpoint,
+ const std::string& zoneId,
+ std::shared_ptr<INodesSupplier> nodeSupplier,
+ int maxRetries,
+ int64_t retryInterval,
+ std::string dialect,
+ std::string db)
+ : session(session_ptr),
+ zoneId(zoneId),
+ endPoint(endpoint),
+ availableNodes(std::move(nodeSupplier)),
+ maxRetryCount(maxRetries),
+ retryIntervalMs(retryInterval),
+ sqlDialect(std::move(dialect)),
+ database(std::move(db)) {
+ this->zoneId = zoneId.empty() ? getSystemDefaultZoneId() : zoneId;
+ endPointList.push_back(endpoint);
+ init(endPoint);
+}
+
+void SessionConnection::close() {
+ bool needThrowException = false;
+ string errMsg;
+ session = nullptr;
+ try {
+ TSCloseSessionReq req;
+ req.__set_sessionId(sessionId);
+ TSStatus tsStatus;
+ client->closeSession(tsStatus, req);
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ } catch (const exception &e) {
+ log_debug(e.what());
+ errMsg = errMsg + "Session::close() client->closeSession() error,
maybe remote server is down. " + e.what() + "\n" ;
+ needThrowException = true;
+ }
+
+ try {
+ if (transport->isOpen()) {
+ transport->close();
+ }
+ }
+ catch (const exception &e) {
+ log_debug(e.what());
+ errMsg = errMsg + "Session::close() transport->close() error. " +
e.what() + "\n" ;
+ needThrowException = true;
+ }
+
+ if (needThrowException) {
+ throw IoTDBException(errMsg);
+ }
+}
+
+SessionConnection::~SessionConnection() {
+ try {
+ close();
+ } catch (const exception &e) {
+ log_debug(e.what());
+ }
+}
+
+void SessionConnection::init(const TEndPoint& endpoint) {
+ shared_ptr<TSocket> socket(new TSocket(endpoint.ip, endpoint.port));
+ transport = std::make_shared<TFramedTransport>(socket);
+ socket->setConnTimeout(connectionTimeoutInMs);
+ if (!transport->isOpen()) {
+ try {
+ transport->open();
+ }
+ catch (TTransportException &e) {
+ log_debug(e.what());
+ throw IoTDBConnectionException(e.what());
+ }
+ }
+ if (enableRPCCompression) {
+ shared_ptr<TCompactProtocol> protocol(new TCompactProtocol(transport));
+ client = std::make_shared<IClientRPCServiceClient>(protocol);
+ } else {
+ shared_ptr<TBinaryProtocol> protocol(new TBinaryProtocol(transport));
+ client = std::make_shared<IClientRPCServiceClient>(protocol);
+ }
+
+ std::map<std::string, std::string> configuration;
+ configuration["version"] = session->getVersionString(session->version);
+ configuration["sql_dialect"] = sqlDialect;
+ if (database != "") {
+ configuration["db"] = database;
+ }
+ TSOpenSessionReq openReq;
+ openReq.__set_username(session->username);
+ openReq.__set_password(session->password);
+ openReq.__set_zoneId(zoneId);
+ openReq.__set_configuration(configuration);
+ try {
+ TSOpenSessionResp openResp;
+ client->openSession(openResp, openReq);
+ RpcUtils::verifySuccess(openResp.status);
+ if (session->protocolVersion != openResp.serverProtocolVersion) {
+ if (openResp.serverProtocolVersion == 0) {// less than 0.10
+ throw logic_error(string("Protocol not supported, Client
version is ") +
+ to_string(session->protocolVersion) +
+ ", but Server version is " +
to_string(openResp.serverProtocolVersion));
+ }
+ }
+
+ sessionId = openResp.sessionId;
+ statementId = client->requestStatementId(sessionId);
+
+ if (!zoneId.empty()) {
+ setTimeZone(zoneId);
+ }
+ } catch (const TTransportException &e) {
+ log_debug(e.what());
+ transport->close();
+ throw IoTDBConnectionException(e.what());
+ } catch (const IoTDBException &e) {
+ log_debug(e.what());
+ transport->close();
+ throw;
+ } catch (const exception &e) {
+ log_debug(e.what());
+ transport->close();
+ throw;
+ }
+}
+
+std::unique_ptr<SessionDataSet> SessionConnection::executeQueryStatement(const
std::string& sql, int64_t timeoutInMs) {
+ TSExecuteStatementReq req;
+ req.__set_sessionId(sessionId);
+ req.__set_statementId(statementId);
+ req.__set_statement(sql);
+ req.__set_timeout(timeoutInMs);
+ req.__set_enableRedirectQuery(true);
+ TSExecuteStatementResp resp;
+ try {
+ client->executeStatement(resp, req);
+ RpcUtils::verifySuccessWithRedirection(resp.status);
+ } catch (const TException &e) {
+ log_debug(e.what());
+ if (reconnect()) {
+ try {
+ req.__set_sessionId(sessionId);
+ req.__set_statementId(statementId);
+ client->executeStatement(resp, req);
+ } catch (TException &e) {
+ throw IoTDBConnectionException(e.what());
+ }
+ } else {
+ throw IoTDBConnectionException(e.what());
+ }
+ }
+ std::shared_ptr<TSQueryDataSet> queryDataSet(new
TSQueryDataSet(resp.queryDataSet));
+ return std::unique_ptr<SessionDataSet>(new SessionDataSet(
+ sql, resp.columns, resp.dataTypeList, resp.columnNameIndexMap,
resp.ignoreTimeStamp, resp.queryId,
+ statementId, client, sessionId, queryDataSet));
+}
+
+const TEndPoint& SessionConnection::getEndPoint() {
+ return endPoint;
+}
+
+void SessionConnection::setTimeZone(const std::string& newZoneId) {
+ TSSetTimeZoneReq req;
+ req.__set_sessionId(sessionId);
+ req.__set_timeZone(newZoneId);
+
+ try {
+ TSStatus tsStatus;
+ client->setTimeZone(tsStatus, req);
+ zoneId = newZoneId;
+ } catch (const TException& e) {
+ throw IoTDBConnectionException(e.what());
+ }
+}
+
+std::string SessionConnection::getSystemDefaultZoneId() {
+ time_t ts = 0;
+ struct tm tmv{};
+#if defined(_WIN64) || defined (WIN32) || defined (_WIN32)
+ localtime_s(&tmv, &ts);
+#else
+ localtime_r(&ts, &tmv);
+#endif
+ char zoneStr[32];
+ strftime(zoneStr, sizeof(zoneStr), "%z", &tmv);
+ return zoneStr;
+}
+
+bool SessionConnection::reconnect() {
+ bool reconnect = false;
+ for (int i = 1; i <= 3; i++) {
+ if (transport != nullptr) {
+ transport->close();
+ endPointList = std::move(availableNodes->getEndPointList());
+ int currHostIndex = rand() % endPointList.size();
+ int tryHostNum = 0;
+ for (int j = currHostIndex; j < endPointList.size(); j++) {
+ if (tryHostNum == endPointList.size()) {
+ break;
+ }
+ this->endPoint = endPointList[j];
+ if (j == endPointList.size() - 1) {
+ j = -1;
+ }
+ tryHostNum++;
+ try {
+ init(this->endPoint);
+ reconnect = true;
+ } catch (const IoTDBConnectionException &e) {
+ log_warn("The current node may have been down, connection
exception: %s", e.what());
+ continue;
+ } catch (exception &e) {
+ log_warn("login in failed, because %s", e.what());
+ }
+ break;
+ }
+ }
+ if (reconnect) {
+ session->removeBrokenSessionConnection(shared_from_this());
+ session->defaultEndPoint = this->endPoint;
+ session->defaultSessionConnection = shared_from_this();
+
session->endPointToSessionConnection.insert(make_pair(this->endPoint,
shared_from_this()));
+ }
+ }
+ return reconnect;
+}
\ No newline at end of file
diff --git a/iotdb-client/client-cpp/src/main/SessionConnection.h
b/iotdb-client/client-cpp/src/main/SessionConnection.h
new file mode 100644
index 00000000000..8e8a6d17ac4
--- /dev/null
+++ b/iotdb-client/client-cpp/src/main/SessionConnection.h
@@ -0,0 +1,81 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef IOTDB_SESSIONCONNECTION_H
+#define IOTDB_SESSIONCONNECTION_H
+
+#include <memory>
+#include <vector>
+#include <string>
+#include <functional>
+#include <thrift/transport/TTransport.h>
+#include "IClientRPCService.h"
+#include "common_types.h"
+#include "NodesSupplier.h"
+
+class SessionDataSet;
+class Session;
+
+class SessionConnection : std::enable_shared_from_this<SessionConnection> {
+public:
+ SessionConnection(Session* session_ptr, const TEndPoint& endpoint,
+ const std::string& zoneId,
+ std::shared_ptr<INodesSupplier> nodeSupplier,
+ int maxRetries = 60,
+ int64_t retryInterval = 500,
+ std::string dialect = "tree",
+ std::string db = "");
+
+ ~SessionConnection();
+
+ void setTimeZone(const std::string& newZoneId);
+
+
+ const TEndPoint& getEndPoint();
+
+ void init(const TEndPoint& endpoint);
+
+ std::unique_ptr<SessionDataSet> executeQueryStatement(const std::string&
sql, int64_t timeoutInMs = -1);
+
+ std::shared_ptr<IClientRPCServiceClient> getSessionClient() {
+ return client;
+ }
+
+private:
+ void close();
+ std::string getSystemDefaultZoneId();
+ bool reconnect();
+
+ std::shared_ptr<apache::thrift::transport::TTransport> transport;
+ std::shared_ptr<IClientRPCServiceClient> client;
+ Session* session;
+ int64_t sessionId;
+ int64_t statementId;
+ int64_t connectionTimeoutInMs;
+ bool enableRPCCompression = false;
+ std::string zoneId;
+ TEndPoint endPoint;
+ std::vector<TEndPoint> endPointList;
+ std::shared_ptr<INodesSupplier> availableNodes;
+ int maxRetryCount;
+ int64_t retryIntervalMs;
+ std::string sqlDialect;
+ std::string database;
+};
+
+#endif
diff --git a/iotdb-client/client-cpp/src/main/ThriftConnection.cpp
b/iotdb-client/client-cpp/src/main/ThriftConnection.cpp
new file mode 100644
index 00000000000..23c2890c34a
--- /dev/null
+++ b/iotdb-client/client-cpp/src/main/ThriftConnection.cpp
@@ -0,0 +1,158 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "ThriftConnection.h"
+#include <ctime>
+#include <iostream>
+#include <thrift/transport/TSocket.h>
+
+#include "Session.h"
+
+const int ThriftConnection::THRIFT_DEFAULT_BUFFER_SIZE = 4096;
+const int ThriftConnection::THRIFT_MAX_FRAME_SIZE = 1048576;
+const int ThriftConnection::CONNECTION_TIMEOUT_IN_MS = 1000;
+
+ThriftConnection::ThriftConnection(const TEndPoint& endPoint,
+ int thriftDefaultBufferSize,
+ int thriftMaxFrameSize,
+ int connectionTimeoutInMs)
+ : endPoint(endPoint),
+ thriftDefaultBufferSize(thriftDefaultBufferSize),
+ thriftMaxFrameSize(thriftMaxFrameSize),
+ connectionTimeoutInMs(connectionTimeoutInMs) {}
+
+ThriftConnection::~ThriftConnection() = default;
+
+void ThriftConnection::initZoneId() {
+ if (!zoneId.empty()) {
+ return;
+ }
+
+ time_t ts = 0;
+ struct tm tmv{};
+#if defined(_WIN64) || defined (WIN32) || defined (_WIN32)
+ localtime_s(&tmv, &ts);
+#else
+ localtime_r(&ts, &tmv);
+#endif
+
+ char zoneStr[32];
+ strftime(zoneStr, sizeof(zoneStr), "%z", &tmv);
+ zoneId = zoneStr;
+}
+
+void ThriftConnection::init(const std::string& username,
+ const std::string& password,
+ bool enableRPCCompression,
+ const std::string& zoneId,
+ const std::string& version) {
+ std::shared_ptr<TSocket> socket(new TSocket(endPoint.ip, endPoint.port));
+ socket->setConnTimeout(connectionTimeoutInMs);
+ transport = std::make_shared<TFramedTransport>(socket);
+ if (!transport->isOpen()) {
+ try {
+ transport->open();
+ }
+ catch (TTransportException &e) {
+ throw IoTDBConnectionException(e.what());
+ }
+ }
+ if (zoneId.empty()) {
+ initZoneId();
+ } else {
+ this->zoneId = zoneId;
+ }
+
+ if (enableRPCCompression) {
+ std::shared_ptr<TCompactProtocol> protocol(new
TCompactProtocol(transport));
+ client = std::make_shared<IClientRPCServiceClient>(protocol);
+ } else {
+ std::shared_ptr<TBinaryProtocol> protocol(new
TBinaryProtocol(transport));
+ client = std::make_shared<IClientRPCServiceClient>(protocol);
+ }
+
+ std::map<std::string, std::string> configuration;
+ configuration["version"] = version;
+ TSOpenSessionReq openReq;
+ openReq.__set_username(username);
+ openReq.__set_password(password);
+ openReq.__set_zoneId(this->zoneId);
+ openReq.__set_configuration(configuration);
+ try {
+ TSOpenSessionResp openResp;
+ client->openSession(openResp, openReq);
+ RpcUtils::verifySuccess(openResp.status);
+ sessionId = openResp.sessionId;
+ statementId = client->requestStatementId(sessionId);
+ } catch (const TTransportException &e) {
+ transport->close();
+ throw IoTDBConnectionException(e.what());
+ } catch (const IoTDBException &e) {
+ transport->close();
+ throw IoTDBException(e.what());
+ } catch (const std::exception &e) {
+ transport->close();
+ throw IoTDBException(e.what());
+ }
+}
+
+std::unique_ptr<SessionDataSet> ThriftConnection::executeQueryStatement(const
std::string& sql, int64_t timeoutInMs) {
+ TSExecuteStatementReq req;
+ req.__set_sessionId(sessionId);
+ req.__set_statementId(statementId);
+ req.__set_statement(sql);
+ req.__set_timeout(timeoutInMs);
+ TSExecuteStatementResp resp;
+ try {
+ client->executeStatement(resp, req);
+ RpcUtils::verifySuccess(resp.status);
+ } catch (const TTransportException &e) {
+ throw IoTDBConnectionException(e.what());
+ } catch (const IoTDBException &e) {
+ throw IoTDBConnectionException(e.what());
+ } catch (const std::exception &e) {
+ throw IoTDBException(e.what());
+ }
+ std::shared_ptr<TSQueryDataSet> queryDataSet(new
TSQueryDataSet(resp.queryDataSet));
+ return std::unique_ptr<SessionDataSet>(new SessionDataSet(
+ sql, resp.columns, resp.dataTypeList, resp.columnNameIndexMap,
resp.ignoreTimeStamp, resp.queryId,
+ statementId, client, sessionId, queryDataSet));
+}
+
+void ThriftConnection::close() {
+ try {
+ if (client) {
+ TSCloseSessionReq req;
+ req.__set_sessionId(sessionId);
+ TSStatus tsStatus;
+ client->closeSession(tsStatus, req);
+ }
+ } catch (const TTransportException &e) {
+ throw IoTDBConnectionException(e.what());
+ } catch (const std::exception &e) {
+ throw IoTDBConnectionException(e.what());
+ }
+
+ try {
+ if (transport->isOpen()) {
+ transport->close();
+ }
+ } catch (const std::exception &e) {
+ throw IoTDBConnectionException(e.what());
+ }
+}
\ No newline at end of file
diff --git a/iotdb-client/client-cpp/src/main/ThriftConnection.h
b/iotdb-client/client-cpp/src/main/ThriftConnection.h
new file mode 100644
index 00000000000..2810036078c
--- /dev/null
+++ b/iotdb-client/client-cpp/src/main/ThriftConnection.h
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef IOTDB_THRIFTCONNECTION_H
+#define IOTDB_THRIFTCONNECTION_H
+
+#include <memory>
+#include <thrift/transport/TBufferTransports.h>
+#include "IClientRPCService.h"
+
+class SessionDataSet;
+
+class ThriftConnection {
+public:
+ static const int THRIFT_DEFAULT_BUFFER_SIZE;
+ static const int THRIFT_MAX_FRAME_SIZE;
+ static const int CONNECTION_TIMEOUT_IN_MS;
+
+ explicit ThriftConnection(const TEndPoint& endPoint,
+ int thriftDefaultBufferSize = THRIFT_DEFAULT_BUFFER_SIZE,
+ int thriftMaxFrameSize = THRIFT_MAX_FRAME_SIZE,
+ int connectionTimeoutInMs = CONNECTION_TIMEOUT_IN_MS);
+
+ ~ThriftConnection();
+
+ void init(const std::string& username,
+ const std::string& password,
+ bool enableRPCCompression = false,
+ const std::string& zoneId = std::string(),
+ const std::string& version = "V_1_0");
+
+ std::unique_ptr<SessionDataSet> executeQueryStatement(const std::string&
sql, int64_t timeoutInMs = -1);
+
+ void close();
+
+private:
+ TEndPoint endPoint;
+
+ int thriftDefaultBufferSize;
+ int thriftMaxFrameSize;
+ int connectionTimeoutInMs;
+
+ std::shared_ptr<apache::thrift::transport::TTransport> transport;
+ std::shared_ptr<IClientRPCServiceClient> client;
+ int64_t sessionId{};
+ int64_t statementId{};
+ std::string zoneId;
+ int timeFactor{};
+
+ void initZoneId();
+};
+
+#endif