This is an automated email from the ASF dual-hosted git repository.
isapego pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 4eb7521fb36 IGNITE-26358 DB API Driver: Heartbeats (#7528)
4eb7521fb36 is described below
commit 4eb7521fb36c7b6908fc3c58ca6e8696b774b676
Author: Igor Sapego <[email protected]>
AuthorDate: Tue Feb 17 15:47:45 2026 +0100
IGNITE-26358 DB API Driver: Heartbeats (#7528)
---
modules/platforms/python/cpp_module/module.cpp | 22 ++-
.../platforms/python/cpp_module/node_connection.h | 194 +++++++++++++--------
.../platforms/python/cpp_module/py_connection.cpp | 47 +++--
.../platforms/python/cpp_module/py_connection.h | 5 +-
modules/platforms/python/cpp_module/statement.cpp | 32 ++--
modules/platforms/python/cpp_module/statement.h | 6 +-
modules/platforms/python/cpp_module/utils.cpp | 31 +++-
modules/platforms/python/cpp_module/utils.h | 2 +-
.../platforms/python/pyignite_dbapi/__init__.py | 2 +
modules/platforms/python/tests/conftest.py | 21 ++-
modules/platforms/python/tests/test_connect.py | 42 +++++
11 files changed, 276 insertions(+), 128 deletions(-)
diff --git a/modules/platforms/python/cpp_module/module.cpp
b/modules/platforms/python/cpp_module/module.cpp
index fd8fb96959c..e4b143a89a3 100644
--- a/modules/platforms/python/cpp_module/module.cpp
+++ b/modules/platforms/python/cpp_module/module.cpp
@@ -58,8 +58,10 @@ PyObject* make_connection()
}
PyObject* make_connection(std::vector<ignite::end_point> addresses, const
char* schema, const char* identity, const char* secret,
- int page_size, int timeout, bool autocommit, ssl_config &&ssl_cfg) {
- auto py_conn = make_py_connection(std::move(addresses), schema, identity,
secret, page_size, timeout, autocommit, std::move(ssl_cfg));
+ int page_size, int timeout, double heartbeat_interval, bool autocommit,
ssl_config &&ssl_cfg) {
+ auto py_conn = make_py_connection(std::move(addresses), schema, identity,
secret, page_size, timeout, heartbeat_interval,
+ autocommit, std::move(ssl_cfg));
+
if (!py_conn)
return nullptr;
@@ -81,12 +83,13 @@ PyObject* pyignite_dbapi_connect(PyObject*, PyObject* args,
PyObject* kwargs) {
const_cast<char*>("schema"),
const_cast<char*>("timezone"),
const_cast<char*>("timeout"),
+ const_cast<char*>("heartbeat_interval"),
const_cast<char*>("page_size"),
const_cast<char*>("autocommit"),
- "use_ssl",
- "ssl_keyfile",
- "ssl_certfile",
- "ssl_ca_certfile",
+ const_cast<char*>("use_ssl"),
+ const_cast<char*>("ssl_keyfile"),
+ const_cast<char*>("ssl_certfile"),
+ const_cast<char*>("ssl_ca_certfile"),
nullptr
};
@@ -96,6 +99,7 @@ PyObject* pyignite_dbapi_connect(PyObject*, PyObject* args,
PyObject* kwargs) {
const char *schema = nullptr;
const char *timezone = nullptr;
int timeout = 0;
+ double heartbeat_interval = 30.0;
int page_size = 0;
int autocommit = 1;
int use_ssl = 0;
@@ -103,8 +107,8 @@ PyObject* pyignite_dbapi_connect(PyObject*, PyObject* args,
PyObject* kwargs) {
const char *ssl_certfile = nullptr;
const char *ssl_ca_certfile = nullptr;
- int parsed = PyArg_ParseTupleAndKeywords(args, kwargs, "O|$ssssiippsss",
kwlist, &address, &identity, &secret,
- &schema, &timezone, &timeout, &page_size, &autocommit, &use_ssl,
&ssl_keyfile, &ssl_certfile, &ssl_ca_certfile);
+ int parsed = PyArg_ParseTupleAndKeywords(args, kwargs, "O|$ssssidippsss",
kwlist, &address, &identity, &secret, &schema,
+ &timezone, &timeout, &heartbeat_interval, &page_size, &autocommit,
&use_ssl, &ssl_keyfile, &ssl_certfile, &ssl_ca_certfile);
if (!parsed)
return nullptr;
@@ -157,7 +161,7 @@ PyObject* pyignite_dbapi_connect(PyObject*, PyObject* args,
PyObject* kwargs) {
ssl_config ssl_cfg(use_ssl != 0, ssl_keyfile, ssl_certfile,
ssl_ca_certfile);
- return make_connection(std::move(addresses), schema, identity, secret,
page_size, timeout, autocommit != 0, std::move(ssl_cfg));
+ return make_connection(std::move(addresses), schema, identity, secret,
page_size, timeout, heartbeat_interval, autocommit != 0, std::move(ssl_cfg));
}
PyMethodDef methods[] = {
diff --git a/modules/platforms/python/cpp_module/node_connection.h
b/modules/platforms/python/cpp_module/node_connection.h
index c372027b922..a97c0508f68 100644
--- a/modules/platforms/python/cpp_module/node_connection.h
+++ b/modules/platforms/python/cpp_module/node_connection.h
@@ -19,6 +19,7 @@
#include "ignite/common/end_point.h"
#include "ignite/common/detail/bytes.h"
+#include "ignite/common/detail/thread_timer.h"
#include "ignite/common/detail/utils.h"
#include "ignite/network/socket_client.h"
#include "ignite/network/network.h"
@@ -32,20 +33,50 @@
#include <cstdint>
#include <cassert>
#include <optional>
+#include <memory>
#include <random>
#include <string>
+#include <mutex>
#include "ssl_config.h"
#include "type_conversion.h"
+#include "ignite/protocol/heartbeat_timeout.h"
+
/**
* A single node connection.
* TODO: https://issues.apache.org/jira/browse/IGNITE-25744 Move connection
logic to the protocol library.
*/
-class node_connection final {
+class node_connection final : public
std::enable_shared_from_this<node_connection> {
public:
static constexpr std::int32_t DEFAULT_TIMEOUT_SECONDS = 30;
+ static constexpr std::chrono::milliseconds DEFAULT_HEARTBEAT_INTERVAL =
std::chrono::seconds(30);
+ static constexpr std::int32_t DEFAULT_PAGE_SIZE = 1024;
+ static constexpr bool DEFAULT_AUTO_COMMIT = true;
+ static constexpr std::string_view DEFAULT_SCHEMA = "PUBLIC";
+
+ struct auth_configuration final {
+ std::string m_identity{};
+ std::string m_secret{};
+ };
+
+ struct configuration final {
+ configuration(std::vector<ignite::end_point> addresses, bool
autocommit, ssl_config ssl_config, std::chrono::milliseconds heartbeat_interval)
+ : m_addresses(std::move(addresses))
+ , m_auto_commit(autocommit)
+ , m_ssl_configuration(std::move(ssl_config))
+ , m_heartbeat_interval(heartbeat_interval) {}
+
+ std::vector<ignite::end_point> m_addresses;
+ std::string m_schema{DEFAULT_SCHEMA};
+ auth_configuration m_auth_configuration{};
+ std::int32_t m_page_size{DEFAULT_PAGE_SIZE};
+ std::int32_t m_timeout{DEFAULT_TIMEOUT_SECONDS};
+ std::chrono::milliseconds
m_heartbeat_interval{DEFAULT_HEARTBEAT_INTERVAL};
+ bool m_auto_commit{DEFAULT_AUTO_COMMIT};
+ ssl_config m_ssl_configuration;
+ };
/**
* Destructor.
@@ -59,48 +90,37 @@ public:
*
* @return Schema.
*/
- [[nodiscard]] const std::string &get_schema() const { return m_schema; }
+ [[nodiscard]] const std::string &get_schema() const { return
m_configuration.m_schema; }
/**
* Get page size.
*
* @return Page size.
*/
- [[nodiscard]] std::int32_t get_page_size() const { return m_page_size; }
+ [[nodiscard]] std::int32_t get_page_size() const { return
m_configuration.m_page_size; }
/**
* Get timeout.
*
* @return Timeout.
*/
- [[nodiscard]] std::int32_t get_timeout() const { return m_timeout; }
+ [[nodiscard]] std::int32_t get_timeout() const { return
m_configuration.m_timeout; }
/**
* Constructor.
*
- * @param addresses Addresses.
- * @param schema Schema. Can be empty.
- * @param auth_identity Auth identity. Can be empty.
- * @param auth_secret Auth secret. Can be empty.
- * @param page_size Page size.
- * @param timeout Timeout.
- * @param auto_commit Auto commit flag.
- * @param ssl_cfg SSL Configuration.
- */
- node_connection(std::vector<ignite::end_point> addresses, std::string
schema, std::string auth_identity,
- std::string auth_secret, std::int32_t page_size, std::int32_t
timeout, bool auto_commit, ssl_config ssl_cfg)
- : m_addresses(std::move(addresses))
- , m_schema(schema.empty() ? "PUBLIC" : std::move(schema))
- , m_auth_identity(std::move(auth_identity))
- , m_auth_secret(std::move(auth_secret))
- , m_page_size(page_size > 0 ? page_size : 1024)
- , m_timeout(timeout > 0 ? timeout : DEFAULT_TIMEOUT_SECONDS)
- , m_auto_commit(auto_commit)
- , m_ssl_config(std::move(ssl_cfg))
+ * @param cfg Configuration.
+ */
+ node_connection(configuration cfg)
+ : m_configuration(std::move(cfg))
+ , m_auto_commit(m_configuration.m_auto_commit)
+ , m_timer_thread(ignite::detail::thread_timer::start([] (auto&&) { /*
Ignore */ }))
{
+ assert(!m_configuration.m_addresses.empty());
+
std::random_device device;
std::mt19937 generator(device());
- std::uniform_int_distribution<std::uint32_t> distribution(0,
m_addresses.size());
+ std::uniform_int_distribution<std::uint32_t> distribution(0,
m_configuration.m_addresses.size() - 1);
m_current_address_idx = distribution(generator);
}
@@ -117,13 +137,6 @@ public:
}
}
- /**
- * Get autocommit flag.
- *
- * @return Autocommit flag.
- */
- bool is_autocommit() const noexcept { return m_auto_commit; }
-
/**
* Set autocommit flag.
*
@@ -208,7 +221,7 @@ public:
*
* @return @c true if the auto commit is enabled.
*/
- [[nodiscard]] bool is_auto_commit() const { return m_auto_commit; }
+ [[nodiscard]] bool is_auto_commit() const noexcept { return m_auto_commit;
}
/**
* Get transaction ID.
@@ -229,8 +242,9 @@ public:
auto req_id = generate_next_req_id();
auto request = make_request(req_id, op, wr);
- send_message(request, m_timeout);
- return receive_message_nothrow(req_id, m_timeout);
+ std::lock_guard lock(m_socket_mutex);
+ send_message(request, m_configuration.m_timeout);
+ return receive_message_nothrow(req_id, m_configuration.m_timeout);
}
private:
@@ -256,6 +270,8 @@ private:
sent += res;
}
+ m_last_message_ts = std::chrono::steady_clock::now();
+
assert(static_cast<std::size_t>(sent) == size);
}
@@ -338,6 +354,7 @@ private:
if (res.second) {
throw std::move(*res.second);
}
+
return std::move(res.first);
}
@@ -389,7 +406,7 @@ private:
* @param op Operation.
* @param func Function.
*/
- std::vector<std::byte> make_request(std::int64_t id,
ignite::protocol::client_operation op,
+ static std::vector<std::byte> make_request(std::int64_t id,
ignite::protocol::client_operation op,
const std::function<void(ignite::protocol::writer &)> &func) {
std::vector<std::byte> req;
ignite::protocol::buffer_adapter buffer(req);
@@ -417,8 +434,9 @@ private:
auto req_id = generate_next_req_id();
auto request = make_request(req_id, op, wr);
- send_message(request, m_timeout);
- return receive_message(req_id, m_timeout);
+ std::lock_guard lock(m_socket_mutex);
+ send_message(request, m_configuration.m_timeout);
+ return receive_message(req_id, m_configuration.m_timeout);
}
/**
@@ -445,7 +463,7 @@ private:
*/
void try_restore_connection() {
if (!m_socket) {
- if (m_ssl_config.m_enabled) {
+ if (m_configuration.m_ssl_configuration.m_enabled) {
try
{
ignite::network::ensure_ssl_loaded();
@@ -461,13 +479,13 @@ private:
}
throw
ignite::ignite_error(ignite::error::code::CLIENT_SSL_CONFIGURATION,
- "Can not load OpenSSL library. [" + openssl_home_str +
"]");
+ "Can not load OpenSSL library. [path=" +
openssl_home_str + ", error=" + err.what_str() + "]");
}
ignite::network::secure_configuration cfg;
- cfg.key_path = m_ssl_config.m_ssl_keyfile;
- cfg.cert_path = m_ssl_config.m_ssl_certfile;
- cfg.ca_path = m_ssl_config.m_ssl_ca_certfile;
+ cfg.key_path =
m_configuration.m_ssl_configuration.m_ssl_keyfile;
+ cfg.cert_path =
m_configuration.m_ssl_configuration.m_ssl_certfile;
+ cfg.ca_path =
m_configuration.m_ssl_configuration.m_ssl_ca_certfile;
m_socket =
ignite::network::make_secure_socket_client(std::move(cfg));
} else {
@@ -477,12 +495,12 @@ private:
std::stringstream msgs;
bool connected = false;
- for (std::int32_t i = 0; i < m_addresses.size(); ++i) {
- uint32_t idx = (m_current_address_idx + i) % m_addresses.size();
- const ignite::end_point &address = m_addresses[idx];
+ for (std::int32_t i = 0; i < m_configuration.m_addresses.size(); ++i) {
+ uint32_t idx = (m_current_address_idx + i) %
m_configuration.m_addresses.size();
+ const ignite::end_point &address =
m_configuration.m_addresses[idx];
try {
- bool success = m_socket->connect(address.host.c_str(),
address.port, m_timeout);
+ bool success = m_socket->connect(address.host.c_str(),
address.port, m_configuration.m_timeout);
if (!success) {
continue;
}
@@ -514,20 +532,22 @@ private:
static constexpr std::int8_t CLIENT_CODE = 4;
m_protocol_version = ignite::protocol::protocol_version::get_current();
+ std::lock_guard lock(m_socket_mutex);
+
std::map<std::string, std::string> extensions;
- if (!m_auth_identity.empty()) {
+ if (!m_configuration.m_auth_configuration.m_identity.empty()) {
static const std::string AUTH_TYPE{"basic"};
extensions.emplace("authn-type", AUTH_TYPE);
- extensions.emplace("authn-identity", m_auth_identity);
- extensions.emplace("authn-secret", m_auth_secret);
+ extensions.emplace("authn-identity",
m_configuration.m_auth_configuration.m_identity);
+ extensions.emplace("authn-secret",
m_configuration.m_auth_configuration.m_secret);
}
std::vector<std::byte> message = make_handshake_request(CLIENT_CODE,
m_protocol_version, extensions);
- send_all(message.data(), message.size(), m_timeout);
- receive_and_check_magic(message, m_timeout);
- receive_message(message, m_timeout);
+ send_all(message.data(), message.size(), m_configuration.m_timeout);
+ receive_and_check_magic(message, m_configuration.m_timeout);
+ receive_message(message, m_configuration.m_timeout);
auto response = ignite::protocol::parse_handshake_response(message);
auto const &ver = response.context.get_version();
@@ -540,6 +560,13 @@ private:
if (response.error) {
throw ignite::ignite_error(ignite::error::code::HANDSHAKE_HEADER,
"Server rejected handshake with error: " + response.error->what_str());
}
+
+ m_heartbeat_interval = ignite::calculate_heartbeat_interval(
+ m_configuration.m_heartbeat_interval,
std::chrono::milliseconds(response.idle_timeout_ms));
+
+ if (m_heartbeat_interval.count()) {
+ plan_heartbeat(m_heartbeat_interval);
+ }
}
/**
@@ -603,36 +630,50 @@ private:
void on_observable_timestamp(std::int64_t timestamp) {
auto expected = m_observable_timestamp.load();
while (expected < timestamp) {
- auto success =
m_observable_timestamp.compare_exchange_weak(expected, timestamp);
- if (success)
+ if (m_observable_timestamp.compare_exchange_weak(expected,
timestamp))
return;
expected = m_observable_timestamp.load();
}
}
- /** Addresses. */
- const std::vector<ignite::end_point> m_addresses;
+ void send_heartbeat() {
+ auto [data, err] =
sync_request_nothrow(ignite::protocol::client_operation::HEARTBEAT,
[](auto&){});
+ if (!err) {
+ plan_heartbeat(m_heartbeat_interval);
+ }
- /** Schema. */
- const std::string m_schema;
+ // There is no useful payload for us in the heartbeat response.
+ UNUSED_VALUE(data);
+ }
- /** Identity. */
- const std::string m_auth_identity;
+ void on_heartbeat_timeout() {
+ auto idle_for = std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now() - m_last_message_ts);
- /** Secret. */
- const std::string m_auth_secret;
+ if (idle_for > m_heartbeat_interval) {
+ send_heartbeat();
+ } else {
+ auto sleep_for = m_heartbeat_interval - idle_for;
+ plan_heartbeat(sleep_for);
+ }
+ }
- /** Page size. */
- const std::int32_t m_page_size;
+ void plan_heartbeat(std::chrono::milliseconds timeout) {
+ m_timer_thread->add(timeout, [self_weak = weak_from_this()] {
+ if (auto self = self_weak.lock()) {
+ self->on_heartbeat_timeout();
+ }
+ });
+ }
- /** Current address index. */
- std::uint32_t m_current_address_idx{0};
+ /** Configuration. */
+ const configuration m_configuration;
- /** Operation timeout in seconds. */
- std::int32_t m_timeout{DEFAULT_TIMEOUT_SECONDS};
+ /** Auto-commit. */
+ bool m_auto_commit;
- /** Autocommit flag. */
- bool m_auto_commit{true};
+ /** Current address index. */
+ std::uint32_t m_current_address_idx{0};
/** Current transaction ID. */
std::optional<std::int64_t> m_transaction_id;
@@ -652,6 +693,15 @@ private:
/** Observable timestamp. */
std::atomic_int64_t m_observable_timestamp{0};
- /** SSL Configuration. */
- const ssl_config m_ssl_config;
+ /** Heartbeat interval. */
+ std::chrono::milliseconds m_heartbeat_interval{0};
+
+ /** Last message timestamp. */
+ std::chrono::steady_clock::time_point m_last_message_ts{};
+
+ /** Timer thread. */
+ std::shared_ptr<ignite::detail::thread_timer> m_timer_thread;
+
+ /** Socket mutex. */
+ std::recursive_mutex m_socket_mutex;
};
diff --git a/modules/platforms/python/cpp_module/py_connection.cpp
b/modules/platforms/python/cpp_module/py_connection.cpp
index 68e468d760d..3c1ff9abb55 100644
--- a/modules/platforms/python/cpp_module/py_connection.cpp
+++ b/modules/platforms/python/cpp_module/py_connection.cpp
@@ -34,7 +34,7 @@ namespace {
struct py_connection {
PyObject_HEAD
- node_connection *m_connection;
+ std::shared_ptr<node_connection> *m_connection;
};
/**
@@ -54,7 +54,7 @@ bool py_connection_expect_open(const py_connection* self) {
PyObject* py_connection_close(py_connection* self, PyObject*)
{
if (self->m_connection) {
- self->m_connection->close();
+ (*self->m_connection)->close();
delete self->m_connection;
self->m_connection = nullptr;
@@ -85,7 +85,7 @@ PyObject* py_connection_autocommit(py_connection* self,
PyObject*)
if (!py_connection_expect_open(self))
return nullptr;
- if (self->m_connection->is_autocommit()) {
+ if ((*self->m_connection)->is_auto_commit()) {
Py_RETURN_TRUE;
}
Py_RETURN_FALSE;
@@ -101,7 +101,7 @@ PyObject* py_connection_set_autocommit(py_connection* self,
PyObject* value)
return nullptr;
}
- self->m_connection->set_autocommit(value == Py_True);
+ (*self->m_connection)->set_autocommit(value == Py_True);
Py_RETURN_NONE;
}
@@ -112,7 +112,7 @@ PyObject* py_connection_commit(py_connection* self,
PyObject*)
return nullptr;
try {
- self->m_connection->transaction_commit();
+ (*self->m_connection)->transaction_commit();
} catch (const ignite::ignite_error& err) {
set_error(err);
return nullptr;
@@ -126,7 +126,7 @@ PyObject* py_connection_rollback(py_connection* self,
PyObject*)
return nullptr;
try {
- self->m_connection->transaction_rollback();
+ (*self->m_connection)->transaction_rollback();
} catch (const ignite::ignite_error& err) {
set_error(err);
return nullptr;
@@ -194,21 +194,34 @@ int register_py_connection_type(PyObject* mod) {
}
PyObject *make_py_connection(std::vector<ignite::end_point> addresses, const
char* schema, const char* identity,
- const char* secret, int page_size, int timeout, bool autocommit,
ssl_config &&ssl_cfg) {
+ const char* secret, int page_size, int timeout, double heartbeat_interval,
bool autocommit, ssl_config &&ssl_cfg) {
if (addresses.empty()) {
PyErr_SetString(py_get_module_interface_error_class(), "No addresses
provided to connect");
return nullptr;
}
- auto node_connection = std::make_unique<class node_connection>(
- addresses,
- schema ? schema : "",
- identity ? identity : "",
- secret ? secret : "",
- page_size ? page_size : 1024,
- timeout,
- autocommit,
- std::move(ssl_cfg));
+ if (heartbeat_interval < 0.0)
+ heartbeat_interval = 0.0;
+
+ std::chrono::milliseconds heartbeat_interval_chrono =
std::chrono::milliseconds(static_cast<int>(std::ceil(heartbeat_interval *
1000)));
+ node_connection::configuration cfg{addresses, autocommit, ssl_cfg,
heartbeat_interval_chrono};
+
+ if (schema)
+ cfg.m_schema = schema;
+
+ if (identity)
+ cfg.m_auth_configuration.m_identity = identity;
+
+ if (secret)
+ cfg.m_auth_configuration.m_secret = secret;
+
+ if (page_size)
+ cfg.m_page_size = page_size;
+
+ if (timeout)
+ cfg.m_timeout = timeout;
+
+ auto node_connection = std::make_shared<class node_connection>(cfg);
try {
node_connection->establish();
@@ -221,7 +234,7 @@ PyObject *make_py_connection(std::vector<ignite::end_point>
addresses, const cha
if (!py_conn_obj)
return nullptr;
- py_conn_obj->m_connection = node_connection.release();
+ py_conn_obj->m_connection = new std::shared_ptr<class
node_connection>(std::move(node_connection));
return reinterpret_cast<PyObject *>(py_conn_obj);
}
diff --git a/modules/platforms/python/cpp_module/py_connection.h
b/modules/platforms/python/cpp_module/py_connection.h
index bcd885d1231..a1522c35cea 100644
--- a/modules/platforms/python/cpp_module/py_connection.h
+++ b/modules/platforms/python/cpp_module/py_connection.h
@@ -34,13 +34,14 @@
* @param identity Identity.
* @param secret Secret.
* @param page_size Page size.
- * @param timeout Timeout.
+ * @param timeout Timeout in seconds.
+ * @param heartbeat_interval Heartbeat interval in seconds.
* @param autocommit Autocommit flag.
* @param ssl_cfg SSL Config.
* @return A new connection class instance.
*/
PyObject* make_py_connection(std::vector<ignite::end_point> addresses, const
char* schema, const char* identity,
- const char* secret, int page_size, int timeout, bool autocommit,
ssl_config &&ssl_cfg);
+ const char* secret, int page_size, int timeout, double heartbeat_interval,
bool autocommit, ssl_config &&ssl_cfg);
/**
* Prepare PyConnection type for registration.
diff --git a/modules/platforms/python/cpp_module/statement.cpp
b/modules/platforms/python/cpp_module/statement.cpp
index 00840e1e71e..6076ee0ba68 100644
--- a/modules/platforms/python/cpp_module/statement.cpp
+++ b/modules/platforms/python/cpp_module/statement.cpp
@@ -132,7 +132,7 @@ void statement::close() noexcept {
if (!m_query_id)
return;
- auto res =
m_connection.sync_request_nothrow(ignite::protocol::client_operation::SQL_CURSOR_CLOSE,
+ auto res =
m_connection->sync_request_nothrow(ignite::protocol::client_operation::SQL_CURSOR_CLOSE,
[&](ignite::protocol::writer &writer) { writer.write(*m_query_id);
});
UNUSED_VALUE res;
@@ -156,16 +156,16 @@ void statement::execute(const char *query,
py_parameter_set ¶ms) {
close();
m_query = query;
- auto &schema = m_connection.get_schema();
+ auto &schema = m_connection->get_schema();
bool single = !params.is_batch_query();
- auto tx = m_connection.get_transaction_id();
- if (!tx && !m_connection.is_auto_commit()) {
+ auto tx = m_connection->get_transaction_id();
+ if (!tx && !m_connection->is_auto_commit()) {
// Starting transaction if it's not started already.
- m_connection.transaction_start();
+ m_connection->transaction_start();
- tx = m_connection.get_transaction_id();
+ tx = m_connection->get_transaction_id();
assert(tx);
}
@@ -173,15 +173,15 @@ void statement::execute(const char *query,
py_parameter_set ¶ms) {
? ignite::protocol::client_operation::SQL_EXEC
: ignite::protocol::client_operation::SQL_EXEC_BATCH;
- auto [resp, err] = m_connection.sync_request_nothrow(client_op,
[&](ignite::protocol::writer &writer) {
+ auto [resp, err] = m_connection->sync_request_nothrow(client_op,
[&](ignite::protocol::writer &writer) {
if (tx)
writer.write(*tx);
else
writer.write_nil();
writer.write(schema);
- writer.write(m_connection.get_page_size());
- writer.write(std::int64_t(m_connection.get_timeout()) * 1000);
+ writer.write(m_connection->get_page_size());
+ writer.write(std::int64_t(m_connection->get_timeout()) * 1000);
writer.write_nil(); // Session timeout (unused, session is closed by
the server immediately).
writer.write_nil(); // Timezone
@@ -195,7 +195,7 @@ void statement::execute(const char *query, py_parameter_set
¶ms) {
writer.write(m_query);
params.write(writer);
- writer.write(m_connection.get_observable_timestamp());
+ writer.write(m_connection->get_observable_timestamp());
});
// Check error
@@ -211,7 +211,7 @@ void statement::execute(const char *query, py_parameter_set
¶ms) {
throw std::move(*err);
}
- m_connection.mark_transaction_non_empty();
+ m_connection->mark_transaction_non_empty();
auto &response = resp;
ignite::protocol::reader reader(response.get_bytes_view());
@@ -279,7 +279,7 @@ bool statement::fetch_next_row() {
ignite::error::code::CURSOR_ALREADY_CLOSED, "Cursor already
closed.");
}
- auto [response, err] = m_connection.sync_request_nothrow(
+ auto [response, err] = m_connection->sync_request_nothrow(
ignite::protocol::client_operation::SQL_CURSOR_NEXT_PAGE,
[&](ignite::protocol::writer &writer) { writer.write(*m_query_id);
});
@@ -301,10 +301,10 @@ bool statement::fetch_next_row() {
}
void statement::update_meta() {
- auto &schema = m_connection.get_schema();
+ auto &schema = m_connection->get_schema();
- auto tx = m_connection.get_transaction_id();
- auto [response, err] =
m_connection.sync_request_nothrow(ignite::protocol::client_operation::SQL_QUERY_META,
+ auto tx = m_connection->get_transaction_id();
+ auto [response, err] =
m_connection->sync_request_nothrow(ignite::protocol::client_operation::SQL_QUERY_META,
[&](ignite::protocol::writer &writer) {
if (tx)
writer.write(*tx);
@@ -316,7 +316,7 @@ void statement::update_meta() {
});
if (tx) {
- m_connection.mark_transaction_non_empty();
+ m_connection->mark_transaction_non_empty();
}
if (err) {
diff --git a/modules/platforms/python/cpp_module/statement.h
b/modules/platforms/python/cpp_module/statement.h
index 9634fa36309..87fc00ba089 100644
--- a/modules/platforms/python/cpp_module/statement.h
+++ b/modules/platforms/python/cpp_module/statement.h
@@ -111,8 +111,8 @@ public:
*
* @param connection Connection.
*/
- explicit statement(node_connection &connection)
- : m_connection(connection) { }
+ explicit statement(std::shared_ptr<node_connection> connection)
+ : m_connection(std::move(connection)) { }
/**
* Destructor.
@@ -215,7 +215,7 @@ private:
void set_params_meta(std::vector<sql_parameter> value);
/** Connection associated with the statement. */
- node_connection &m_connection;
+ std::shared_ptr<node_connection> m_connection;
/** SQL query. */
std::string m_query;
diff --git a/modules/platforms/python/cpp_module/utils.cpp
b/modules/platforms/python/cpp_module/utils.cpp
index 7b7f08c8dac..a9248ab8fc1 100644
--- a/modules/platforms/python/cpp_module/utils.cpp
+++ b/modules/platforms/python/cpp_module/utils.cpp
@@ -156,7 +156,7 @@ void set_error(const ignite::ignite_error &error) {
}
case ignite::error::code::UNRESOLVABLE_CONSISTENT_ID:
- case ignite::error::code::PORT_IN_USE:
+ case ignite::error::code::BIND:
case ignite::error::code::FILE_TRANSFER:
case ignite::error::code::FILE_VALIDATION:
case ignite::error::code::RECIPIENT_LEFT:
@@ -210,6 +210,7 @@ void set_error(const ignite::ignite_error &error) {
case ignite::error::code::NODE_NOT_FOUND:
case ignite::error::code::MARSHALLING_TYPE_MISMATCH:
case ignite::error::code::COMPUTE_JOB_CANCELLED:
+ case ignite::error::code::RESOURCE_NOT_FOUND:
case ignite::error::code::COMPUTE_PLATFORM_EXECUTOR: {
error_class = py_get_module_database_error_class();
break;
@@ -259,9 +260,31 @@ void set_error(const ignite::ignite_error &error) {
error_class = py_get_module_database_error_class();
break;
}
- }
- PyErr_SetString(error_class, error.what());
+ case ignite::error::code::UNSUPPORTED_TABLE_BASED_REPLICATION:
+ case ignite::error::code::OPERATION_TIMEOUT:
+ case ignite::error::code::TX_DELAYED_ACK:
+ case ignite::error::code::GROUP_OVERLOADED:
+ case ignite::error::code::GROUP_UNAVAILABLE:
+ case ignite::error::code::EMPTY_DATA_NODES:
+ case ignite::error::code::JOIN_DENIED:
+ case ignite::error::code::EMPTY_ASSIGNMENTS:
+ case ignite::error::code::NOT_ENOUGH_ALIVE_NODES:
+ case ignite::error::code::ILLEGAL_NODES_SET:
+ case ignite::error::code::REQUEST_FORWARD:
+ case ignite::error::code::REMOTE_NODE:
+ case ignite::error::code::CONFIGURATION_APPLY:
+ case ignite::error::code::CONFIGURATION_PARSE:
+ case ignite::error::code::CONFIGURATION_VALIDATION: {
+ error_class = py_get_module_not_supported_error_class();
+ break;
+ }
+ }
+ std::string error_str{error.what_str()};
+ if (error.get_java_stack_trace()) {
+ error_str += "\n" + *error.get_java_stack_trace();
+ }
+ PyErr_SetString(error_class, error_str.c_str());
}
std::string get_current_exception_as_string() {
@@ -278,7 +301,7 @@ std::string get_current_exception_as_string() {
return {data, std::size_t(len)};
}
-const char* py_object_get_typename(PyObject* obj) {
+const char* py_object_get_typename(const PyObject* obj) {
if (!obj || !obj->ob_type || !obj->ob_type->tp_name) {
return "Unknown";
}
diff --git a/modules/platforms/python/cpp_module/utils.h
b/modules/platforms/python/cpp_module/utils.h
index 56320c89af3..76a00f08a1c 100644
--- a/modules/platforms/python/cpp_module/utils.h
+++ b/modules/platforms/python/cpp_module/utils.h
@@ -61,7 +61,7 @@ std::string get_current_exception_as_string();
* @param obj Object.
* @return Typename if available, and "Unknown" otherwise.
*/
-const char* py_object_get_typename(PyObject* obj);
+const char* py_object_get_typename(const PyObject* obj);
/**
* Get the module instance.
diff --git a/modules/platforms/python/pyignite_dbapi/__init__.py
b/modules/platforms/python/pyignite_dbapi/__init__.py
index 0d7894db738..7d6b70afde4 100644
--- a/modules/platforms/python/pyignite_dbapi/__init__.py
+++ b/modules/platforms/python/pyignite_dbapi/__init__.py
@@ -693,6 +693,8 @@ def connect(address: [str], **kwargs) -> Connection:
Maximum number of rows that can be received or sent in a single
request. Default value: 1024.
timeout: int, optional
A timeout for network operations, in seconds. Default value: 30.
+ heartbeat_interval: float, optional
+ An interval between heartbeat probes, in seconds. Zero or negative
means heartbeats disabled. Default value: 30.
autocommit: bool, optional
Connection autocommit mode. Default value: True (enabled).
use_ssl: bool, optional
diff --git a/modules/platforms/python/tests/conftest.py
b/modules/platforms/python/tests/conftest.py
index 20b9dc94288..16a4cc908cb 100644
--- a/modules/platforms/python/tests/conftest.py
+++ b/modules/platforms/python/tests/conftest.py
@@ -31,7 +31,13 @@ def table_name(request):
@pytest.fixture()
def connection():
- conn = pyignite_dbapi.connect(address=server_addresses_basic,
page_size=TEST_PAGE_SIZE)
+ conn = pyignite_dbapi.connect(address=server_addresses_basic,
page_size=TEST_PAGE_SIZE, heartbeat_interval=2)
+ yield conn
+ conn.close()
+
[email protected]()
+def service_connection():
+ conn = pyignite_dbapi.connect(address=server_addresses_basic,
page_size=TEST_PAGE_SIZE, heartbeat_interval=2)
yield conn
conn.close()
@@ -44,10 +50,17 @@ def cursor(connection):
@pytest.fixture()
-def drop_table_cleanup(cursor, table_name):
+def service_cursor(service_connection):
+ cursor = service_connection.cursor()
+ yield cursor
+ cursor.close()
+
+
[email protected]()
+def drop_table_cleanup(service_cursor, table_name):
+ service_cursor.execute(f'drop table if exists {table_name}')
yield None
- cursor.connection.setautocommit(True)
- cursor.execute(f'drop table if exists {table_name}')
+ service_cursor.execute(f'drop table if exists {table_name}')
@pytest.fixture(autouse=True, scope="session")
diff --git a/modules/platforms/python/tests/test_connect.py
b/modules/platforms/python/tests/test_connect.py
index 916fff72563..c6853ae2739 100644
--- a/modules/platforms/python/tests/test_connect.py
+++ b/modules/platforms/python/tests/test_connect.py
@@ -12,6 +12,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import time
+
import pytest
import pyignite_dbapi
@@ -57,3 +59,43 @@ def test_connection_wrong_arg(address, err_msg):
with pytest.raises(pyignite_dbapi.InterfaceError) as err:
pyignite_dbapi.connect(address=address, timeout=1)
assert err.match(err_msg)
+
+
[email protected]("interval", [2.0, 20.0, 0.0001])
+def test_heartbeat_enabled(table_name, drop_table_cleanup, interval):
+ row_count = 10
+ with pyignite_dbapi.connect(address=server_addresses_basic[0],
heartbeat_interval=interval) as conn:
+ with conn.cursor() as cursor:
+ cursor.execute(f"create table {table_name}(id int primary key,
data varchar)")
+ for key in range(row_count):
+ cursor.execute(f"insert into {table_name} values({key},
'data-{key*2}')")
+ assert cursor.rowcount == 1
+
+ data_out = {}
+ for key in range(row_count):
+ cursor.execute(f"select id, data from {table_name} WHERE id =
?", [key])
+ data_out[key] = cursor.fetchone()
+ if len(data_out) == 5:
+ time.sleep(7)
+
+ assert len(data_out) == row_count
+
+
+def test_heartbeat_disabled(table_name, drop_table_cleanup):
+ row_count = 10
+ with pyignite_dbapi.connect(address=server_addresses_basic[0],
heartbeat_interval=0) as conn:
+ with conn.cursor() as cursor:
+ cursor.execute(f"create table {table_name}(id int primary key,
data varchar)")
+ for key in range(row_count):
+ cursor.execute(f"insert into {table_name} values({key},
'data-{key*2}')")
+ assert cursor.rowcount == 1
+
+ data_out = {}
+ with pytest.raises(pyignite_dbapi.OperationalError) as err:
+ for key in range(row_count):
+ cursor.execute(f"select id, data from {table_name} where
id = ?", [key])
+ data_out[key] = cursor.fetchone()
+ if len(data_out) == 5:
+ time.sleep(7)
+
+ assert err.match("(Connection closed by the server|Can not send a
message to the server due to connection error)")