This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d9fdf0c6655 C++ client: add thread-safe SessionPool, enable RPC
compression, and harden buffers (#17800)
d9fdf0c6655 is described below
commit d9fdf0c66557c782c74e6e796e010416b38c9087
Author: ZhangHongYin <[email protected]>
AuthorDate: Tue Jun 2 15:20:05 2026 +0800
C++ client: add thread-safe SessionPool, enable RPC compression, and harden
buffers (#17800)
* Wire RPC compression flag through Session to its connections
The enableRPCCompression option set via Session::open(bool) or the session
builder was never propagated to SessionConnection, whose flag was hardcoded
to false, so the compact Thrift protocol never took effect. Thread the flag
from the builder/open() into both the data SessionConnection and the
node-discovery NodesSupplier client so compression actually applies.
* Use snprintf for Tablet bounds-check error messages
Tablet::addValue and the OBJECT-value overload formatted out-of-range
diagnostics with sprintf into a fixed 100-byte stack buffer, risking an
overflow. Switch to snprintf bounded by sizeof(buffer) and cast the size_t
arguments to long to match the %ld format.
* Append big-endian bytes in MyStringBuffer instead of overwriting
On big-endian hosts MyStringBuffer::putOrderedByte used str.assign, which
replaced the whole buffer with each numeric write and corrupted previously
serialized content. Use str.append so bytes accumulate, matching the
little-endian path.
* Add thread-safe SessionPool to the C++ client
Introduce SessionPool and SessionPoolBuilder so multiple threads can share a
bounded set of connections without external locking. A single Session is not
safe to use concurrently, so the pool lends each Session to one borrower at
a
time via an RAII PooledSession handle and reclaims it on scope exit.
Sessions
are created outside the lock to avoid blocking other borrowers during the
handshake, and getSession() blocks up to a configurable timeout when the
pool
is exhausted.
Query results are returned as a PooledSessionDataSet that keeps the Session
leased until the result set is fully read, since SessionDataSet lazily
fetches
further blocks over the same connection. Connections that raise
IoTDBConnectionException are evicted rather than recycled. Add integration
tests covering basic borrow/insert/query, concurrent writers, and
exhaustion-timeout behavior.
* Reject zero maxSize in SessionPool instead of clamping to 1
Address review feedback: maxSize is size_t, so a non-positive check reduces
to == 0 (and "<= 0" would be a tautological-comparison warning under -Wall).
Rather than silently clamping an invalid 0 to 1, fail fast by throwing
IoTDBException so the misuse surfaces at construction time.
* Tolerate missing timeseries in SessionPool test cleanup
The pre-test cleanup deleted root.test.pool.* timeseries unconditionally,
which threw 508 (does not exist) on a fresh database and failed the new
[sessionPool] cases. Ignore that error since the cleanup is best-effort.
* Revert "Wire RPC compression flag through Session to its connections"
This reverts commit 2f35cc51f7.
Honoring the compression flag makes the client negotiate the compact Thrift
protocol, which the binary-only IoTDB server used by the C++ integration
tests cannot speak, breaking the pre-existing
ts_session_open_with_compression
smoke test (it had only passed because the flag was a no-op). Compression
needs a compact-protocol-enabled test server, so it will be reintroduced in
a
dedicated PR with the matching server-side test support. SessionPool keeps
its
compression option for forward compatibility; it is currently a no-op, as
the
rest of the client has always been.
* fix format
* Discard SessionPool session if pool closed during construction
Address review feedback: acquire() releases the lock while building a new
connection, so a concurrent close() could set closed_ after the slot was
reserved, and the freshly opened session would still be handed out from a
closed pool. Re-check closed_ under the lock after construction; if the pool
was closed meanwhile, release the slot, tear the session down outside the
lock, and throw instead of returning it.
---
iotdb-client/client-cpp/src/main/Common.cpp | 2 +-
iotdb-client/client-cpp/src/main/Session.h | 26 +-
iotdb-client/client-cpp/src/main/SessionPool.cpp | 281 +++++++++++++++
iotdb-client/client-cpp/src/main/SessionPool.h | 392 +++++++++++++++++++++
iotdb-client/client-cpp/src/test/cpp/sessionIT.cpp | 137 ++++++-
5 files changed, 824 insertions(+), 14 deletions(-)
diff --git a/iotdb-client/client-cpp/src/main/Common.cpp
b/iotdb-client/client-cpp/src/main/Common.cpp
index 38b913c2270..842150241da 100644
--- a/iotdb-client/client-cpp/src/main/Common.cpp
+++ b/iotdb-client/client-cpp/src/main/Common.cpp
@@ -418,7 +418,7 @@ const char* MyStringBuffer::getOrderedByte(size_t len) {
void MyStringBuffer::putOrderedByte(char* buf, int len) {
if (isBigEndian) {
- str.assign(buf, len);
+ str.append(buf, len);
} else {
for (int i = len - 1; i > -1; i--) {
str += buf[i];
diff --git a/iotdb-client/client-cpp/src/main/Session.h
b/iotdb-client/client-cpp/src/main/Session.h
index cedfaeba196..58bb0a7d300 100644
--- a/iotdb-client/client-cpp/src/main/Session.h
+++ b/iotdb-client/client-cpp/src/main/Session.h
@@ -31,6 +31,7 @@
#include <thread>
#include <stdexcept>
#include <cstdlib>
+#include <cstdio>
#include <future>
#include <boost/date_time/gregorian/gregorian.hpp>
#include <thrift/protocol/TBinaryProtocol.h>
@@ -258,16 +259,17 @@ public:
template <typename T> void addValue(size_t schemaId, size_t rowIndex, const
T& value) {
if (schemaId >= schemas.size()) {
char tmpStr[100];
- sprintf(tmpStr,
- "Tablet::addValue(), schemaId >= schemas.size(). schemaId=%ld,
schemas.size()=%ld.",
- schemaId, schemas.size());
+ snprintf(tmpStr, sizeof(tmpStr),
+ "Tablet::addValue(), schemaId >= schemas.size(). schemaId=%ld,
schemas.size()=%ld.",
+ (long)schemaId, (long)schemas.size());
throw std::out_of_range(tmpStr);
}
if (rowIndex >= rowSize) {
char tmpStr[100];
- sprintf(tmpStr, "Tablet::addValue(), rowIndex >= rowSize. rowIndex=%ld,
rowSize.size()=%ld.",
- rowIndex, rowSize);
+ snprintf(tmpStr, sizeof(tmpStr),
+ "Tablet::addValue(), rowIndex >= rowSize. rowIndex=%ld,
rowSize.size()=%ld.",
+ (long)rowIndex, (long)rowSize);
throw std::out_of_range(tmpStr);
}
@@ -317,19 +319,19 @@ public:
// Check schemaId bounds
if (schemaId >= schemas.size()) {
char tmpStr[100];
- sprintf(tmpStr,
- "Tablet::addBinaryValueWithMeta(), schemaId >= schemas.size().
schemaId=%ld, "
- "schemas.size()=%ld.",
- schemaId, schemas.size());
+ snprintf(tmpStr, sizeof(tmpStr),
+ "Tablet::addBinaryValueWithMeta(), schemaId >= schemas.size().
schemaId=%ld, "
+ "schemas.size()=%ld.",
+ (long)schemaId, (long)schemas.size());
throw std::out_of_range(tmpStr);
}
// Check rowIndex bounds
if (rowIndex >= rowSize) {
char tmpStr[100];
- sprintf(tmpStr,
- "Tablet::addBinaryValueWithMeta(), rowIndex >= rowSize.
rowIndex=%ld, rowSize=%ld.",
- rowIndex, rowSize);
+ snprintf(tmpStr, sizeof(tmpStr),
+ "Tablet::addBinaryValueWithMeta(), rowIndex >= rowSize.
rowIndex=%ld, rowSize=%ld.",
+ (long)rowIndex, (long)rowSize);
throw std::out_of_range(tmpStr);
}
diff --git a/iotdb-client/client-cpp/src/main/SessionPool.cpp
b/iotdb-client/client-cpp/src/main/SessionPool.cpp
new file mode 100644
index 00000000000..a828f0ac2c6
--- /dev/null
+++ b/iotdb-client/client-cpp/src/main/SessionPool.cpp
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "SessionPool.h"
+
+void PooledSession::reset() {
+ if (session_ && pool_ != nullptr) {
+ pool_->putBack(session_, broken_);
+ }
+ pool_ = nullptr;
+ session_ = nullptr;
+ broken_ = false;
+}
+
+SessionPool::SessionPool(std::string host, int rpcPort, std::string username,
std::string password,
+ size_t maxSize)
+ : host_(std::move(host)), rpcPort_(rpcPort),
username_(std::move(username)),
+ password_(std::move(password)), maxSize_(maxSize) {
+ if (maxSize_ == 0) {
+ throw IoTDBException("SessionPool maxSize must be greater than 0.");
+ }
+}
+
+SessionPool::SessionPool(std::vector<std::string> nodeUrls, std::string
username,
+ std::string password, size_t maxSize)
+ : rpcPort_(AbstractSessionBuilder::DEFAULT_RPC_PORT),
nodeUrls_(std::move(nodeUrls)),
+ username_(std::move(username)), password_(std::move(password)),
maxSize_(maxSize) {
+ if (maxSize_ == 0) {
+ throw IoTDBException("SessionPool maxSize must be greater than 0.");
+ }
+}
+
+SessionPool::~SessionPool() {
+ try {
+ close();
+ } catch (const std::exception& e) {
+ log_debug(std::string("SessionPool::~SessionPool(), ") + e.what());
+ }
+}
+
+SessionPool& SessionPool::setFetchSize(int fetchSize) {
+ fetchSize_ = fetchSize;
+ return *this;
+}
+
+SessionPool& SessionPool::setZoneId(std::string zoneId) {
+ zoneId_ = std::move(zoneId);
+ return *this;
+}
+
+SessionPool& SessionPool::setSqlDialect(std::string sqlDialect) {
+ sqlDialect_ = std::move(sqlDialect);
+ return *this;
+}
+
+SessionPool& SessionPool::setDatabase(std::string database) {
+ database_ = std::move(database);
+ return *this;
+}
+
+SessionPool& SessionPool::setEnableRedirection(bool enable) {
+ enableRedirection_ = enable;
+ return *this;
+}
+
+SessionPool& SessionPool::setEnableAutoFetch(bool enable) {
+ enableAutoFetch_ = enable;
+ return *this;
+}
+
+SessionPool& SessionPool::setEnableRPCCompression(bool enable) {
+ enableRPCCompression_ = enable;
+ return *this;
+}
+
+SessionPool& SessionPool::setConnectTimeoutMs(int connectTimeoutMs) {
+ connectTimeoutMs_ = connectTimeoutMs;
+ return *this;
+}
+
+SessionPool& SessionPool::setWaitToGetSessionTimeoutMs(int64_t timeoutMs) {
+ waitTimeoutMs_ = timeoutMs;
+ return *this;
+}
+
+SessionPool& SessionPool::setUseSSL(bool useSSL) {
+ useSSL_ = useSSL;
+ return *this;
+}
+
+SessionPool& SessionPool::setTrustCertFilePath(std::string path) {
+ trustCertFilePath_ = std::move(path);
+ return *this;
+}
+
+std::shared_ptr<Session> SessionPool::constructNewSession() {
+ AbstractSessionBuilder builder;
+ builder.host = host_;
+ builder.rpcPort = rpcPort_;
+ builder.nodeUrls = nodeUrls_;
+ builder.username = username_;
+ builder.password = password_;
+ builder.zoneId = zoneId_;
+ builder.fetchSize = fetchSize_;
+ builder.sqlDialect = sqlDialect_;
+ builder.database = database_;
+ builder.enableAutoFetch = enableAutoFetch_;
+ builder.enableRedirections = enableRedirection_;
+ builder.enableRPCCompression = enableRPCCompression_;
+ builder.connectTimeoutMs = connectTimeoutMs_;
+ builder.useSSL = useSSL_;
+ builder.trustCertFilePath = trustCertFilePath_;
+
+ auto session = std::make_shared<Session>(&builder);
+ session->open(enableRPCCompression_, connectTimeoutMs_);
+ return session;
+}
+
+std::shared_ptr<Session> SessionPool::acquire(int64_t timeoutMs) {
+ const int64_t effectiveTimeout = timeoutMs > 0 ? timeoutMs : waitTimeoutMs_;
+ std::unique_lock<std::mutex> lock(mutex_);
+ const auto deadline =
+ std::chrono::steady_clock::now() +
std::chrono::milliseconds(effectiveTimeout);
+
+ while (true) {
+ if (closed_) {
+ throw IoTDBException("SessionPool is closed.");
+ }
+ if (!idleQueue_.empty()) {
+ auto session = idleQueue_.front();
+ idleQueue_.pop_front();
+ return session;
+ }
+ if (size_ < maxSize_) {
+ // Reserve a slot, then build the connection outside the lock so other
+ // borrowers are not blocked by network/handshake latency.
+ ++size_;
+ lock.unlock();
+ std::shared_ptr<Session> session;
+ try {
+ session = constructNewSession();
+ } catch (...) {
+ lock.lock();
+ --size_;
+ cv_.notify_one();
+ throw;
+ }
+ lock.lock();
+ if (closed_) {
+ // The pool was closed while this session was being built; do not hand
it
+ // out. Release its slot and let it be torn down outside the lock.
+ --size_;
+ lock.unlock();
+ throw IoTDBException("SessionPool is closed.");
+ }
+ return session;
+ }
+
+ // Pool exhausted: wait for a Session to be returned.
+ if (effectiveTimeout <= 0) {
+ cv_.wait(lock);
+ } else {
+ if (cv_.wait_until(lock, deadline) == std::cv_status::timeout &&
idleQueue_.empty() &&
+ size_ >= maxSize_ && !closed_) {
+ throw IoTDBException(
+ "Wait to get session timeout in SessionPool, maxSize=" +
std::to_string(maxSize_) +
+ ", waitTimeoutMs=" + std::to_string(effectiveTimeout) + ".");
+ }
+ }
+ }
+}
+
+void SessionPool::putBack(const std::shared_ptr<Session>& session, bool
broken) {
+ std::lock_guard<std::mutex> lock(mutex_);
+ if (broken || closed_) {
+ // Drop the Session and free its slot so a healthy replacement can be
created
+ // on demand. The caller (PooledSession::reset) still holds the last
reference
+ // and tears the connection down after we return, i.e. outside this lock.
+ --size_;
+ } else {
+ idleQueue_.push_back(session);
+ }
+ cv_.notify_one();
+}
+
+PooledSession SessionPool::getSession() {
+ return getSession(waitTimeoutMs_);
+}
+
+PooledSession SessionPool::getSession(int64_t timeoutMs) {
+ return PooledSession(this, acquire(timeoutMs));
+}
+
+void SessionPool::insertTablet(Tablet& tablet, bool sorted) {
+ execute([&](Session& s) { s.insertTablet(tablet, sorted); });
+}
+
+void SessionPool::insertAlignedTablet(Tablet& tablet, bool sorted) {
+ execute([&](Session& s) { s.insertAlignedTablet(tablet, sorted); });
+}
+
+void SessionPool::insertTablets(std::unordered_map<std::string, Tablet*>&
tablets, bool sorted) {
+ execute([&](Session& s) { s.insertTablets(tablets, sorted); });
+}
+
+void SessionPool::insertRecord(const std::string& deviceId, int64_t time,
+ const std::vector<std::string>& measurements,
+ const std::vector<std::string>& values) {
+ execute([&](Session& s) { s.insertRecord(deviceId, time, measurements,
values); });
+}
+
+void SessionPool::insertRecords(const std::vector<std::string>& deviceIds,
+ const std::vector<int64_t>& times,
+ const std::vector<std::vector<std::string>>&
measurementsList,
+ const std::vector<std::vector<std::string>>&
valuesList) {
+ execute([&](Session& s) { s.insertRecords(deviceIds, times,
measurementsList, valuesList); });
+}
+
+void SessionPool::executeNonQueryStatement(const std::string& sql) {
+ execute([&](Session& s) { s.executeNonQueryStatement(sql); });
+}
+
+PooledSessionDataSet SessionPool::executeQueryStatement(const std::string&
sql) {
+ PooledSession lease = getSession();
+ try {
+ auto dataSet = lease->executeQueryStatement(sql);
+ return PooledSessionDataSet(std::move(lease), std::move(dataSet));
+ } catch (const IoTDBConnectionException&) {
+ lease.markBroken();
+ throw;
+ }
+}
+
+PooledSessionDataSet SessionPool::executeQueryStatement(const std::string& sql,
+ int64_t timeoutInMs) {
+ PooledSession lease = getSession();
+ try {
+ auto dataSet = lease->executeQueryStatement(sql, timeoutInMs);
+ return PooledSessionDataSet(std::move(lease), std::move(dataSet));
+ } catch (const IoTDBConnectionException&) {
+ lease.markBroken();
+ throw;
+ }
+}
+
+void SessionPool::close() {
+ std::deque<std::shared_ptr<Session>> toClose;
+ {
+ std::lock_guard<std::mutex> lock(mutex_);
+ if (closed_) {
+ return;
+ }
+ closed_ = true;
+ toClose.swap(idleQueue_);
+ size_ -= toClose.size();
+ }
+ cv_.notify_all();
+ // Sessions destructed here (outside the lock) close their connections.
+ toClose.clear();
+}
+
+size_t SessionPool::activeCount() {
+ std::lock_guard<std::mutex> lock(mutex_);
+ return size_ - idleQueue_.size();
+}
diff --git a/iotdb-client/client-cpp/src/main/SessionPool.h
b/iotdb-client/client-cpp/src/main/SessionPool.h
new file mode 100644
index 00000000000..4483dab0c51
--- /dev/null
+++ b/iotdb-client/client-cpp/src/main/SessionPool.h
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef IOTDB_SESSIONPOOL_H
+#define IOTDB_SESSIONPOOL_H
+
+#include <chrono>
+#include <condition_variable>
+#include <cstdint>
+#include <deque>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "Session.h"
+
+/*
+ * A thread-safe pool of opened Session objects.
+ *
+ * A Session is NOT safe to use from multiple threads concurrently. SessionPool
+ * solves this by lending each Session to exactly one borrower at a time and
+ * reclaiming it afterwards, so many application threads can share a bounded
set
+ * of physical connections without external locking.
+ *
+ * Two usage styles are supported:
+ *
+ * 1. RAII lease (recommended for arbitrary calls):
+ * {
+ * PooledSession s = pool.getSession(); // blocks up to the timeout
+ * s->insertTablet(tablet); // call any Session method
+ * } // automatically returned
here
+ *
+ * 2. Convenience wrappers / generic execute() (recommended for hot paths):
+ * pool.insertTablet(tablet);
+ * pool.execute([&](Session& s) { s.insertRecord(...); });
+ *
+ * Both styles evict a Session from the pool (instead of recycling it) when the
+ * operation throws IoTDBConnectionException, so a dead connection is never
+ * handed to the next borrower; a fresh one is created lazily on demand.
+ *
+ * Lifetime: a PooledSession returns its Session to the owning SessionPool when
+ * destroyed, so every PooledSession must not outlive the pool it came from.
+ */
+class SessionPool;
+
+class PooledSession {
+public:
+ PooledSession() noexcept : pool_(nullptr), session_(nullptr), broken_(false)
{}
+
+ PooledSession(SessionPool* pool, std::shared_ptr<Session> session)
+ : pool_(pool), session_(std::move(session)), broken_(false) {}
+
+ // Non-copyable: a leased Session is owned by exactly one borrower.
+ PooledSession(const PooledSession&) = delete;
+ PooledSession& operator=(const PooledSession&) = delete;
+
+ PooledSession(PooledSession&& other) noexcept
+ : pool_(other.pool_), session_(std::move(other.session_)),
broken_(other.broken_) {
+ other.pool_ = nullptr;
+ other.session_ = nullptr;
+ other.broken_ = false;
+ }
+
+ PooledSession& operator=(PooledSession&& other) noexcept {
+ if (this != &other) {
+ reset();
+ pool_ = other.pool_;
+ session_ = std::move(other.session_);
+ broken_ = other.broken_;
+ other.pool_ = nullptr;
+ other.session_ = nullptr;
+ other.broken_ = false;
+ }
+ return *this;
+ }
+
+ ~PooledSession() {
+ reset();
+ }
+
+ Session* operator->() const {
+ return session_.get();
+ }
+
+ Session& operator*() const {
+ return *session_;
+ }
+
+ explicit operator bool() const {
+ return static_cast<bool>(session_);
+ }
+
+ // Mark the underlying connection as unusable so it is discarded (not
recycled)
+ // when this lease is returned. Call this if you caught a connection error.
+ void markBroken() {
+ broken_ = true;
+ }
+
+ // Eagerly return the Session to the pool before scope exit.
+ void release() {
+ reset();
+ }
+
+private:
+ void reset();
+
+ SessionPool* pool_;
+ std::shared_ptr<Session> session_;
+ bool broken_;
+};
+
+/*
+ * Couples a query result set with the pooled Session that produced it.
+ *
+ * A SessionDataSet lazily fetches further result blocks over its Session's
+ * connection, so that Session must stay exclusively leased until iteration is
+ * finished. This wrapper holds the lease for exactly that long and returns the
+ * Session to the pool when destroyed.
+ */
+class PooledSessionDataSet {
+public:
+ PooledSessionDataSet(PooledSession session, std::unique_ptr<SessionDataSet>
dataSet)
+ : session_(std::move(session)), dataSet_(std::move(dataSet)) {}
+
+ PooledSessionDataSet(const PooledSessionDataSet&) = delete;
+ PooledSessionDataSet& operator=(const PooledSessionDataSet&) = delete;
+ PooledSessionDataSet(PooledSessionDataSet&&) noexcept = default;
+ PooledSessionDataSet& operator=(PooledSessionDataSet&&) noexcept = default;
+
+ SessionDataSet* operator->() const {
+ return dataSet_.get();
+ }
+ SessionDataSet& operator*() const {
+ return *dataSet_;
+ }
+
+private:
+ PooledSession session_;
+ std::unique_ptr<SessionDataSet> dataSet_;
+};
+
+class SessionPool {
+public:
+ static constexpr size_t DEFAULT_MAX_SIZE = 5;
+ static constexpr int64_t DEFAULT_WAIT_TIMEOUT_MS = 60 * 1000;
+
+ // Single-host constructor.
+ SessionPool(std::string host, int rpcPort, std::string username, std::string
password,
+ size_t maxSize = DEFAULT_MAX_SIZE);
+
+ // Multi-node constructor.
+ SessionPool(std::vector<std::string> nodeUrls, std::string username,
std::string password,
+ size_t maxSize = DEFAULT_MAX_SIZE);
+
+ ~SessionPool();
+
+ // Non-copyable, non-movable: the pool owns mutex/condition state.
+ SessionPool(const SessionPool&) = delete;
+ SessionPool& operator=(const SessionPool&) = delete;
+
+ // ---- configuration (apply before the first getSession()) ----
+ SessionPool& setFetchSize(int fetchSize);
+ SessionPool& setZoneId(std::string zoneId);
+ SessionPool& setSqlDialect(std::string sqlDialect);
+ SessionPool& setDatabase(std::string database);
+ SessionPool& setEnableRedirection(bool enable);
+ SessionPool& setEnableAutoFetch(bool enable);
+ SessionPool& setEnableRPCCompression(bool enable);
+ SessionPool& setConnectTimeoutMs(int connectTimeoutMs);
+ SessionPool& setWaitToGetSessionTimeoutMs(int64_t timeoutMs);
+ SessionPool& setUseSSL(bool useSSL);
+ SessionPool& setTrustCertFilePath(std::string path);
+
+ // Borrow a Session. Blocks until one is free or a new one can be created,
+ // up to timeoutMs (<= 0 means use the pool default). Throws IoTDBException
on
+ // timeout or when the pool is closed.
+ PooledSession getSession();
+ PooledSession getSession(int64_t timeoutMs);
+
+ // Generic helper: borrow a Session, run func(Session&), return/evict it, and
+ // forward the result. Evicts the Session on IoTDBConnectionException.
+ template <typename Func> auto execute(Func&& func) ->
decltype(func(std::declval<Session&>()));
+
+ // ---- convenience wrappers for common operations (with eviction on
failure) ----
+ void insertTablet(Tablet& tablet, bool sorted = false);
+ void insertAlignedTablet(Tablet& tablet, bool sorted = false);
+ void insertTablets(std::unordered_map<std::string, Tablet*>& tablets, bool
sorted = false);
+ void insertRecord(const std::string& deviceId, int64_t time,
+ const std::vector<std::string>& measurements,
+ const std::vector<std::string>& values);
+ void insertRecords(const std::vector<std::string>& deviceIds, const
std::vector<int64_t>& times,
+ const std::vector<std::vector<std::string>>&
measurementsList,
+ const std::vector<std::vector<std::string>>& valuesList);
+ void executeNonQueryStatement(const std::string& sql);
+ // The returned wrapper keeps the underlying Session leased until it is
+ // destroyed, so it is safe to iterate the result set across multiple
fetches.
+ PooledSessionDataSet executeQueryStatement(const std::string& sql);
+ PooledSessionDataSet executeQueryStatement(const std::string& sql, int64_t
timeoutInMs);
+
+ // Close the pool: idle Sessions are closed immediately, in-use Sessions are
+ // closed when they are returned. Idempotent.
+ void close();
+
+ // ---- observability ----
+ size_t getMaxSize() const {
+ return maxSize_;
+ }
+ // Number of Sessions currently borrowed.
+ size_t activeCount();
+
+private:
+ friend class PooledSession;
+
+ std::shared_ptr<Session> constructNewSession();
+ std::shared_ptr<Session> acquire(int64_t timeoutMs);
+ void putBack(const std::shared_ptr<Session>& session, bool broken);
+
+ // connection parameters
+ std::string host_;
+ int rpcPort_;
+ std::vector<std::string> nodeUrls_;
+ std::string username_;
+ std::string password_;
+ std::string zoneId_;
+ int fetchSize_ = AbstractSessionBuilder::DEFAULT_FETCH_SIZE;
+ std::string sqlDialect_ = AbstractSessionBuilder::DEFAULT_SQL_DIALECT;
+ std::string database_;
+ bool enableRedirection_ =
AbstractSessionBuilder::DEFAULT_ENABLE_REDIRECTIONS;
+ bool enableAutoFetch_ = AbstractSessionBuilder::DEFAULT_ENABLE_AUTO_FETCH;
+ bool enableRPCCompression_ =
AbstractSessionBuilder::DEFAULT_ENABLE_RPC_COMPRESSION;
+ int connectTimeoutMs_ = AbstractSessionBuilder::DEFAULT_CONNECT_TIMEOUT_MS;
+ bool useSSL_ = false;
+ std::string trustCertFilePath_;
+
+ // pool sizing / waiting policy
+ size_t maxSize_;
+ int64_t waitTimeoutMs_ = DEFAULT_WAIT_TIMEOUT_MS;
+
+ // pool state, guarded by mutex_
+ std::mutex mutex_;
+ std::condition_variable cv_;
+ std::deque<std::shared_ptr<Session>> idleQueue_;
+ size_t size_ = 0; // total live Sessions (idle + borrowed)
+ bool closed_ = false;
+};
+
+template <typename Func>
+auto SessionPool::execute(Func&& func) ->
decltype(func(std::declval<Session&>())) {
+ PooledSession lease = getSession();
+ try {
+ return func(*lease);
+ } catch (const IoTDBConnectionException&) {
+ lease.markBroken();
+ throw;
+ }
+}
+
+/*
+ * Fluent builder for SessionPool, mirroring SessionBuilder /
TableSessionBuilder.
+ *
+ * auto pool = SessionPoolBuilder()
+ * .host("127.0.0.1")->rpcPort(6667)
+ * ->username("root")->password("root")
+ * ->maxSize(10)->build();
+ */
+class SessionPoolBuilder : public AbstractSessionBuilder {
+public:
+ SessionPoolBuilder* host(const std::string& v) {
+ AbstractSessionBuilder::host = v;
+ return this;
+ }
+ SessionPoolBuilder* rpcPort(int v) {
+ AbstractSessionBuilder::rpcPort = v;
+ return this;
+ }
+ SessionPoolBuilder* nodeUrls(const std::vector<std::string>& v) {
+ AbstractSessionBuilder::nodeUrls = v;
+ return this;
+ }
+ SessionPoolBuilder* username(const std::string& v) {
+ AbstractSessionBuilder::username = v;
+ return this;
+ }
+ SessionPoolBuilder* password(const std::string& v) {
+ AbstractSessionBuilder::password = v;
+ return this;
+ }
+ SessionPoolBuilder* zoneId(const std::string& v) {
+ AbstractSessionBuilder::zoneId = v;
+ return this;
+ }
+ SessionPoolBuilder* fetchSize(int v) {
+ AbstractSessionBuilder::fetchSize = v;
+ return this;
+ }
+ SessionPoolBuilder* database(const std::string& v) {
+ AbstractSessionBuilder::database = v;
+ return this;
+ }
+ SessionPoolBuilder* enableAutoFetch(bool v) {
+ AbstractSessionBuilder::enableAutoFetch = v;
+ return this;
+ }
+ SessionPoolBuilder* enableRedirections(bool v) {
+ AbstractSessionBuilder::enableRedirections = v;
+ return this;
+ }
+ SessionPoolBuilder* enableRPCCompression(bool v) {
+ AbstractSessionBuilder::enableRPCCompression = v;
+ return this;
+ }
+ SessionPoolBuilder* connectTimeoutMs(int v) {
+ AbstractSessionBuilder::connectTimeoutMs = v;
+ return this;
+ }
+ SessionPoolBuilder* useSSL(bool v) {
+ AbstractSessionBuilder::useSSL = v;
+ return this;
+ }
+ SessionPoolBuilder* trustCertFilePath(const std::string& v) {
+ AbstractSessionBuilder::trustCertFilePath = v;
+ return this;
+ }
+ SessionPoolBuilder* maxSize(size_t v) {
+ maxSize_ = v;
+ return this;
+ }
+ SessionPoolBuilder* waitToGetSessionTimeoutMs(int64_t v) {
+ waitTimeoutMs_ = v;
+ return this;
+ }
+ SessionPoolBuilder* sqlDialect(const std::string& v) {
+ AbstractSessionBuilder::sqlDialect = v;
+ return this;
+ }
+
+ std::shared_ptr<SessionPool> build() {
+ if (!AbstractSessionBuilder::nodeUrls.empty() &&
+ (AbstractSessionBuilder::host != DEFAULT_HOST ||
+ AbstractSessionBuilder::rpcPort != DEFAULT_RPC_PORT)) {
+ throw IoTDBException(
+ "SessionPool builder does not support setting node urls and
host/rpcPort at the same "
+ "time.");
+ }
+ std::shared_ptr<SessionPool> pool;
+ if (!AbstractSessionBuilder::nodeUrls.empty()) {
+ pool = std::make_shared<SessionPool>(AbstractSessionBuilder::nodeUrls,
+ AbstractSessionBuilder::username,
+ AbstractSessionBuilder::password,
maxSize_);
+ } else {
+ pool = std::make_shared<SessionPool>(
+ AbstractSessionBuilder::host, AbstractSessionBuilder::rpcPort,
+ AbstractSessionBuilder::username, AbstractSessionBuilder::password,
maxSize_);
+ }
+ pool->setFetchSize(AbstractSessionBuilder::fetchSize)
+ .setZoneId(AbstractSessionBuilder::zoneId)
+ .setSqlDialect(AbstractSessionBuilder::sqlDialect)
+ .setDatabase(AbstractSessionBuilder::database)
+ .setEnableRedirection(AbstractSessionBuilder::enableRedirections)
+ .setEnableAutoFetch(AbstractSessionBuilder::enableAutoFetch)
+ .setEnableRPCCompression(AbstractSessionBuilder::enableRPCCompression)
+ .setConnectTimeoutMs(AbstractSessionBuilder::connectTimeoutMs)
+ .setWaitToGetSessionTimeoutMs(waitTimeoutMs_)
+ .setUseSSL(AbstractSessionBuilder::useSSL)
+ .setTrustCertFilePath(AbstractSessionBuilder::trustCertFilePath);
+ return pool;
+ }
+
+private:
+ size_t maxSize_ = SessionPool::DEFAULT_MAX_SIZE;
+ int64_t waitTimeoutMs_ = SessionPool::DEFAULT_WAIT_TIMEOUT_MS;
+};
+
+#endif // IOTDB_SESSIONPOOL_H
diff --git a/iotdb-client/client-cpp/src/test/cpp/sessionIT.cpp
b/iotdb-client/client-cpp/src/test/cpp/sessionIT.cpp
index 0bf30bbbf1b..ff850d943f1 100644
--- a/iotdb-client/client-cpp/src/test/cpp/sessionIT.cpp
+++ b/iotdb-client/client-cpp/src/test/cpp/sessionIT.cpp
@@ -21,8 +21,12 @@
#include "Column.h"
#include "Session.h"
#include "SessionBuilder.h"
+#include "SessionPool.h"
#include "TsBlock.h"
+#include <atomic>
+#include <thread>
+
using namespace std;
extern std::shared_ptr<Session> session;
@@ -898,4 +902,135 @@ TEST_CASE("Numeric column widening getters align with
Java TsFile", "[column]")
std::vector<int64_t> longValues = {1000};
auto longColumn = std::make_shared<LongColumn>(0, 1, valueIsNull,
longValues);
REQUIRE(longColumn->getDouble(0) == Approx(1000.0));
-}
\ No newline at end of file
+}
+TEST_CASE("SessionPool basic borrow/insert/query via RAII lease",
"[sessionPool]") {
+ CaseReporter cr("SessionPool basic");
+ auto pool = SessionPoolBuilder()
+ .host("127.0.0.1")
+ ->rpcPort(6667)
+ ->username("root")
+ ->password("root")
+ ->maxSize(3)
+ ->build();
+
+ {
+ PooledSession s = pool->getSession();
+ try {
+ s->executeNonQueryStatement("delete timeseries root.test.pool.d1.*");
+ } catch (const std::exception&) {
+ // Ignore: the timeseries may not exist yet on a fresh database.
+ }
+ }
+
+ const int rows = 50;
+ for (int i = 0; i < rows; i++) {
+ PooledSession s = pool->getSession();
+ s->insertRecord("root.test.pool.d1", i, {"s1"}, {to_string(i)});
+ }
+
+ int count = 0;
+ {
+ PooledSessionDataSet ds = pool->executeQueryStatement("select s1 from
root.test.pool.d1");
+ while (ds->hasNext()) {
+ ds->next();
+ count++;
+ }
+ }
+ REQUIRE(count == rows);
+
+ {
+ PooledSession s = pool->getSession();
+ s->executeNonQueryStatement("delete timeseries root.test.pool.d1.*");
+ }
+ pool->close();
+}
+
+TEST_CASE("SessionPool is safe under concurrent writers", "[sessionPool]") {
+ CaseReporter cr("SessionPool concurrency");
+ auto pool = SessionPoolBuilder()
+ .host("127.0.0.1")
+ ->rpcPort(6667)
+ ->username("root")
+ ->password("root")
+ ->maxSize(4)
+ ->build();
+
+ {
+ PooledSession s = pool->getSession();
+ try {
+ s->executeNonQueryStatement("delete timeseries root.test.pool.d2.*");
+ } catch (const std::exception&) {
+ // Ignore: the timeseries may not exist yet on a fresh database.
+ }
+ }
+
+ const int threadCount = 8;
+ const int rowsPerThread = 100;
+ std::atomic<int> failures(0);
+ std::vector<std::thread> threads;
+ for (int t = 0; t < threadCount; t++) {
+ threads.emplace_back([&pool, t, rowsPerThread, &failures]() {
+ try {
+ for (int i = 0; i < rowsPerThread; i++) {
+ int64_t ts = static_cast<int64_t>(t) * rowsPerThread + i;
+ // Mix RAII and convenience APIs to exercise both borrow paths.
+ if (i % 2 == 0) {
+ pool->insertRecord("root.test.pool.d2", ts, {"s1"},
{to_string(ts)});
+ } else {
+ PooledSession s = pool->getSession();
+ s->insertRecord("root.test.pool.d2", ts, {"s1"}, {to_string(ts)});
+ }
+ }
+ } catch (const std::exception& e) {
+ std::cerr << "writer thread failed: " << e.what() << std::endl;
+ failures++;
+ }
+ });
+ }
+ for (auto& th : threads) {
+ th.join();
+ }
+ REQUIRE(failures.load() == 0);
+ REQUIRE(pool->getMaxSize() == 4);
+
+ int count = 0;
+ {
+ PooledSessionDataSet ds = pool->executeQueryStatement("select s1 from
root.test.pool.d2");
+ while (ds->hasNext()) {
+ ds->next();
+ count++;
+ }
+ }
+ REQUIRE(count == threadCount * rowsPerThread);
+
+ {
+ PooledSession s = pool->getSession();
+ s->executeNonQueryStatement("delete timeseries root.test.pool.d2.*");
+ }
+ pool->close();
+}
+
+TEST_CASE("SessionPool getSession times out when exhausted", "[sessionPool]") {
+ CaseReporter cr("SessionPool exhaustion timeout");
+ auto pool = SessionPoolBuilder()
+ .host("127.0.0.1")
+ ->rpcPort(6667)
+ ->username("root")
+ ->password("root")
+ ->maxSize(1)
+ ->waitToGetSessionTimeoutMs(200)
+ ->build();
+
+ PooledSession held = pool->getSession();
+ REQUIRE(static_cast<bool>(held));
+ REQUIRE(pool->activeCount() == 1);
+ // The only Session is checked out, so a second borrow must time out.
+ REQUIRE_THROWS_AS(pool->getSession(), IoTDBException);
+
+ held.release();
+ // After returning it, a borrow succeeds again.
+ PooledSession reused = pool->getSession();
+ REQUIRE(static_cast<bool>(reused));
+ reused.release();
+ pool->close();
+}