This is an automated email from the ASF dual-hosted git repository.
isapego pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new f1efc25ce7 IGNITE-19399 Support ODBC transactions (#2324)
f1efc25ce7 is described below
commit f1efc25ce7145fd3912953f32b353054501358eb
Author: Igor Sapego <[email protected]>
AuthorDate: Mon Jul 17 18:16:03 2023 +0400
IGNITE-19399 Support ODBC transactions (#2324)
---
.../platforms/cpp/ignite/odbc/query/data_query.cpp | 61 ++-
.../platforms/cpp/ignite/odbc/sql_connection.cpp | 103 +++-
modules/platforms/cpp/ignite/odbc/sql_connection.h | 41 ++
.../platforms/cpp/tests/odbc-test/CMakeLists.txt | 1 +
.../cpp/tests/odbc-test/odbc_connection.h | 16 +
.../cpp/tests/odbc-test/odbc_test_utils.h | 26 +
.../platforms/cpp/tests/odbc-test/timeout_test.cpp | 3 -
.../cpp/tests/odbc-test/transaction_test.cpp | 600 +++++++++++++++++++++
8 files changed, 812 insertions(+), 39 deletions(-)
diff --git a/modules/platforms/cpp/ignite/odbc/query/data_query.cpp
b/modules/platforms/cpp/ignite/odbc/query/data_query.cpp
index 270a54201f..f59b116eba 100644
--- a/modules/platforms/cpp/ignite/odbc/query/data_query.cpp
+++ b/modules/platforms/cpp/ignite/odbc/query/data_query.cpp
@@ -314,9 +314,19 @@ sql_result data_query::make_request_execute() {
network::data_buffer_owning response;
auto success = m_diag.catch_errors([&] {
+ auto tx = m_connection.get_transaction_id();
+ if (!tx && !m_connection.is_auto_commit()) {
+ // Starting transaction if it's not started already.
+ m_connection.transaction_start();
+
+ tx = m_connection.get_transaction_id();
+ assert(tx);
+ }
response =
m_connection.sync_request(detail::client_operation::SQL_EXEC,
[&](protocol::writer &writer) {
- // TODO: IGNITE-19399 Implement transactions support.
- writer.write_nil();
+ if (tx)
+ writer.write(*tx);
+ else
+ writer.write_nil();
writer.write(schema);
writer.write(m_connection.get_configuration().get_page_size().get_value());
@@ -335,37 +345,35 @@ sql_result data_query::make_request_execute() {
m_params.write(writer);
});
- });
-
- if (!success)
- return sql_result::AI_ERROR;
- auto reader =
std::make_unique<protocol::reader>(response.get_bytes_view());
+ m_connection.mark_transaction_non_empty();
- m_query_id = reader->read_object_nullable<std::int64_t>();
+ auto reader =
std::make_unique<protocol::reader>(response.get_bytes_view());
+ m_query_id = reader->read_object_nullable<std::int64_t>();
- m_has_rowset = reader->read_bool();
- m_has_more_pages = reader->read_bool();
- m_was_applied = reader->read_bool();
- m_rows_affected = reader->read_int64();
+ m_has_rowset = reader->read_bool();
+ m_has_more_pages = reader->read_bool();
+ m_was_applied = reader->read_bool();
+ m_rows_affected = reader->read_int64();
- if (m_has_rowset) {
- auto columns = read_meta(*reader);
- set_resultset_meta(columns);
- auto page = std::make_unique<result_page>(std::move(response),
std::move(reader));
- m_cursor = std::make_unique<cursor>(std::move(page));
- }
+ if (m_has_rowset) {
+ auto columns = read_meta(*reader);
+ set_resultset_meta(columns);
+ auto page = std::make_unique<result_page>(std::move(response),
std::move(reader));
+ m_cursor = std::make_unique<cursor>(std::move(page));
+ }
- m_executed = true;
+ m_executed = true;
+ });
- return sql_result::AI_SUCCESS;
+ return success ? sql_result::AI_SUCCESS : sql_result::AI_ERROR;
}
sql_result data_query::make_request_close() {
if (!m_query_id)
return sql_result::AI_SUCCESS;
- LOG_MSG("Closing m_cursor: " << *m_query_id);
+ LOG_MSG("Closing cursor: " << *m_query_id);
auto success = m_diag.catch_errors([&] {
UNUSED_VALUE m_connection.sync_request(
@@ -385,15 +393,12 @@ sql_result
data_query::make_request_fetch(std::unique_ptr<result_page> &page) {
auto success = m_diag.catch_errors([&] {
response =
m_connection.sync_request(detail::client_operation::SQL_CURSOR_NEXT_PAGE,
[&](protocol::writer &writer) { writer.write(*m_query_id); });
- });
-
- if (!success)
- return sql_result::AI_ERROR;
- auto reader =
std::make_unique<protocol::reader>(response.get_bytes_view());
+ auto reader =
std::make_unique<protocol::reader>(response.get_bytes_view());
+ page = std::make_unique<result_page>(std::move(response),
std::move(reader));
+ });
- page = std::make_unique<result_page>(std::move(response),
std::move(reader));
- return sql_result::AI_SUCCESS;
+ return success ? sql_result::AI_SUCCESS : sql_result::AI_ERROR;
}
sql_result data_query::make_request_resultset_meta() {
diff --git a/modules/platforms/cpp/ignite/odbc/sql_connection.cpp
b/modules/platforms/cpp/ignite/odbc/sql_connection.cpp
index cdcc5e8b05..81b14139d9 100644
--- a/modules/platforms/cpp/ignite/odbc/sql_connection.cpp
+++ b/modules/platforms/cpp/ignite/odbc/sql_connection.cpp
@@ -172,6 +172,9 @@ void sql_connection::close() {
if (m_socket) {
m_socket->close();
m_socket.reset();
+
+ m_transaction_id = std::nullopt;
+ m_transaction_empty = true;
}
}
@@ -344,10 +347,26 @@ void sql_connection::transaction_commit() {
}
sql_result sql_connection::internal_transaction_commit() {
- // TODO: IGNITE-19399: Implement transaction support
+ // Committing is only meaningful while a transaction is open.
+ if (!m_transaction_id) {
+ add_status_record(sql_state::S25000_INVALID_TRANSACTION_STATE, "No transaction to commit");
+ return sql_result::AI_ERROR;
+ }
- add_status_record(sql_state::SHYC00_OPTIONAL_FEATURE_NOT_IMPLEMENTED,
"Transactions are not supported.");
- return sql_result::AI_ERROR;
+ LOG_MSG("Committing transaction: " << *m_transaction_id);
+
+ // The response payload is not read here; only success or failure of the request matters.
+ auto success = catch_errors([&] {
+ static_cast<void>(sync_request(
+ detail::client_operation::TX_COMMIT, [&](protocol::writer &writer) { writer.write(*m_transaction_id); }));
+ });
+
+ if (!success)
+ return sql_result::AI_ERROR;
+
+ // The transaction is finished: forget its ID and reset the emptiness flag.
+ m_transaction_id = std::nullopt;
+ m_transaction_empty = true;
+
+ return sql_result::AI_SUCCESS;
}
void sql_connection::transaction_rollback() {
@@ -355,10 +374,75 @@ void sql_connection::transaction_rollback() {
}
sql_result sql_connection::internal_transaction_rollback() {
- // TODO: IGNITE-19399: Implement transaction support
+ // Rolling back is only meaningful while a transaction is open.
+ if (!m_transaction_id) {
+ add_status_record(sql_state::S25000_INVALID_TRANSACTION_STATE, "No transaction to rollback");
+ return sql_result::AI_ERROR;
+ }
+
+ LOG_MSG("Rolling back transaction: " << *m_transaction_id);
+
+ // The response payload is not read here; only success or failure of the request matters.
+ auto success = catch_errors([&] {
+ static_cast<void>(sync_request(
+ detail::client_operation::TX_ROLLBACK, [&](protocol::writer &writer) { writer.write(*m_transaction_id); }));
+ });
+
+ if (!success)
+ return sql_result::AI_ERROR;
+
+ // The transaction is finished: forget its ID and reset the emptiness flag.
+ m_transaction_id = std::nullopt;
+ m_transaction_empty = true;
- add_status_record(sql_state::SHYC00_OPTIONAL_FEATURE_NOT_IMPLEMENTED,
"Transactions are not supported.");
- return sql_result::AI_ERROR;
+ return sql_result::AI_SUCCESS;
+}
+
+void sql_connection::transaction_start() {
+ LOG_MSG("Starting transaction");
+
+ // The TX_BEGIN request carries a single flag: whether the transaction is read-only.
+ auto fill_request = [](protocol::writer &writer) {
+ writer.write_bool(false); // read_only.
+ };
+
+ network::data_buffer_owning response = sync_request(detail::client_operation::TX_BEGIN, fill_request);
+
+ // The response starts with the ID of the newly started transaction; keep it as the current one.
+ protocol::reader reader(response.get_bytes_view());
+ m_transaction_id = reader.read_int64();
+
+ LOG_MSG("Transaction ID: " << *m_transaction_id);
+}
+
+sql_result sql_connection::enable_autocommit() {
+ assert(!m_auto_commit);
+
+ if (m_transaction_id) {
+ // A transaction with no operations is rolled back; one that has been used is committed.
+ auto finish_result = m_transaction_empty ? internal_transaction_rollback() : internal_transaction_commit();
+
+ // Stay in manual-commit mode if the open transaction could not be finished.
+ if (finish_result != sql_result::AI_SUCCESS)
+ return finish_result;
+ }
+
+ m_transaction_id = std::nullopt;
+ m_transaction_empty = true;
+ m_auto_commit = true;
+
+ return sql_result::AI_SUCCESS;
+}
+
+sql_result sql_connection::disable_autocommit() {
+ assert(m_auto_commit);
+ assert(!m_transaction_id);
+
+ // Manual-commit mode begins with an open transaction; report an error if it cannot be started.
+ auto started = catch_errors([&] { transaction_start(); });
+ if (started) {
+ m_transaction_empty = true;
+ m_auto_commit = false;
+
+ return sql_result::AI_SUCCESS;
+ }
+
+ return sql_result::AI_ERROR;
}
void sql_connection::get_attribute(int attr, void *buf, SQLINTEGER buf_len,
SQLINTEGER *value_len) {
@@ -467,9 +551,12 @@ sql_result sql_connection::internal_set_attribute(int
attr, void *value, SQLINTE
return sql_result::AI_ERROR;
}
- m_auto_commit = mode == SQL_AUTOCOMMIT_ON;
+ auto autocommit_now = mode == SQL_AUTOCOMMIT_ON;
- break;
+ // Switch modes only on an actual change. Both helpers assert that the connection is currently in the
+ // opposite mode, so re-applying the mode that is already in effect must be a no-op.
+ if (autocommit_now != m_auto_commit)
+ return autocommit_now ? enable_autocommit() : disable_autocommit();
+
+ break;
}
default: {
diff --git a/modules/platforms/cpp/ignite/odbc/sql_connection.h
b/modules/platforms/cpp/ignite/odbc/sql_connection.h
index 9705e61ec7..c06746a29e 100644
--- a/modules/platforms/cpp/ignite/odbc/sql_connection.h
+++ b/modules/platforms/cpp/ignite/odbc/sql_connection.h
@@ -202,6 +202,13 @@ public:
*/
void transaction_rollback();
+ /**
+ * Start a new transaction and store its ID as the current transaction of this connection.
+ *
+ * Returns nothing; the new ID is available through get_transaction_id() afterwards.
+ */
+ void transaction_start();
+
/**
* Get connection attribute.
*
@@ -261,6 +268,20 @@ public:
return receive_message(req_id);
}
+ /**
+ * Get the ID of the current transaction.
+ *
+ * @return Transaction ID, or an empty optional when no transaction is open.
+ */
+ [[nodiscard]] std::optional<std::int64_t> get_transaction_id() const {
return m_transaction_id; }
+
+ /**
+ * Mark the current transaction as non-empty.
+ *
+ * After this call the connection assumes that at least one operation has been
+ * performed within the current transaction.
+ */
+ void mark_transaction_non_empty() { m_transaction_empty = false; }
+
private:
/**
* Generate and get next request ID.
@@ -345,6 +366,20 @@ private:
*/
sql_result internal_transaction_rollback();
+ /**
+ * Enable autocommit.
+ *
+ * If a transaction is open it is finished first: rolled back when it is still marked empty,
+ * committed otherwise.
+ *
+ * @return Operation result. If finishing the open transaction fails, that result is returned
+ * and the autocommit flag is left unchanged.
+ */
+ sql_result enable_autocommit();
+
+ /**
+ * Disable autocommit.
+ *
+ * Starts a new transaction and, on success, clears the autocommit flag.
+ *
+ * @return Operation result; an error result when the transaction could not be started.
+ */
+ sql_result disable_autocommit();
+
/**
* Get connection attribute.
* Internal call.
@@ -446,6 +481,12 @@ private:
/** Autocommit flag. */
bool m_auto_commit{true};
+ /** ID of the current transaction; empty when no transaction is open. */
+ std::optional<std::int64_t> m_transaction_id;
+
+ /** Whether the current transaction is still empty, i.e. has not been marked as used by any operation. */
+ bool m_transaction_empty{true};
+
/** Configuration. */
configuration m_config;
diff --git a/modules/platforms/cpp/tests/odbc-test/CMakeLists.txt
b/modules/platforms/cpp/tests/odbc-test/CMakeLists.txt
index 3892434df3..ed374831f8 100644
--- a/modules/platforms/cpp/tests/odbc-test/CMakeLists.txt
+++ b/modules/platforms/cpp/tests/odbc-test/CMakeLists.txt
@@ -30,6 +30,7 @@ set(SOURCES
meta_queries_test.cpp
queries_test.cpp
timeout_test.cpp
+ transaction_test.cpp
)
add_executable(${TARGET} ${SOURCES})
diff --git a/modules/platforms/cpp/tests/odbc-test/odbc_connection.h
b/modules/platforms/cpp/tests/odbc-test/odbc_connection.h
index 388739bdb7..0e325290b4 100644
--- a/modules/platforms/cpp/tests/odbc-test/odbc_connection.h
+++ b/modules/platforms/cpp/tests/odbc-test/odbc_connection.h
@@ -80,6 +80,22 @@ public:
return SQLExecDirect(m_statement, sql.data(),
static_cast<SQLINTEGER>(sql.size()));
}
+ /**
+ * Invoke an operation repeatedly until it succeeds or the attempts run out.
+ *
+ * @param func Operation to invoke; its return code is checked with SQL_SUCCEEDED.
+ * @param attempts Maximum number of invocations.
+ * @return @c true as soon as one invocation succeeds, @c false if none did.
+ */
+ bool retry_on_fail(std::function<SQLRETURN()> func, int attempts = 5) {
+ int remaining = attempts;
+ while (remaining > 0) {
+ auto rc = func();
+ if (SQL_SUCCEEDED(rc))
+ return true;
+
+ --remaining;
+ }
+
+ return false;
+ }
+
/**
* Get statement error state.
*
diff --git a/modules/platforms/cpp/tests/odbc-test/odbc_test_utils.h
b/modules/platforms/cpp/tests/odbc-test/odbc_test_utils.h
index f91d0d4ac9..231774df96 100644
--- a/modules/platforms/cpp/tests/odbc-test/odbc_test_utils.h
+++ b/modules/platforms/cpp/tests/odbc-test/odbc_test_utils.h
@@ -35,8 +35,34 @@
if (!SQL_SUCCEEDED(ret))
\
FAIL() << get_odbc_error_message(type, handle)
+/**
+ * Throw odbc_exception, built from the diagnostics of the given handle, when @c ret is not a success code.
+ *
+ * Wrapped in do/while so the macro expands to a single statement and can be used safely under an
+ * unbraced if/else.
+ */
+#define ODBC_THROW_ON_ERROR(ret, type, handle) \
+ do { \
+ if (!SQL_SUCCEEDED(ret)) \
+ throw odbc_exception{get_odbc_error_message(type, handle), get_odbc_error_state(type, handle)}; \
+ } while (false)
+
namespace ignite {
+/**
+ * Exception type used by the tests; carries an ODBC error message together with its SQL state.
+ */
+struct odbc_exception : public std::exception {
+ /** Message. */
+ std::string message;
+
+ /** SQL state. */
+ std::string sql_state;
+
+ /**
+ * Constructor.
+ *
+ * @param message Error message; also returned by what().
+ * @param sql_state SQL state code.
+ */
+ odbc_exception(std::string message, std::string sql_state)
+ : message{std::move(message)}
+ , sql_state{std::move(sql_state)} {}
+
+ /**
+ * @return Stored message as a C string.
+ */
+ [[nodiscard]] char const *what() const noexcept override { return message.c_str(); }
+};
+
constexpr size_t ODBC_BUFFER_SIZE = 1024;
/**
diff --git a/modules/platforms/cpp/tests/odbc-test/timeout_test.cpp
b/modules/platforms/cpp/tests/odbc-test/timeout_test.cpp
index afb5054dd5..3b99619919 100644
--- a/modules/platforms/cpp/tests/odbc-test/timeout_test.cpp
+++ b/modules/platforms/cpp/tests/odbc-test/timeout_test.cpp
@@ -15,13 +15,10 @@
* limitations under the License.
*/
-#include "ignite/common/config.h"
#include "odbc_suite.h"
-#include "test_utils.h"
#include <gtest/gtest.h>
-#include <algorithm>
#include <string>
#include <vector>
diff --git a/modules/platforms/cpp/tests/odbc-test/transaction_test.cpp
b/modules/platforms/cpp/tests/odbc-test/transaction_test.cpp
new file mode 100644
index 0000000000..a40264d5ef
--- /dev/null
+++ b/modules/platforms/cpp/tests/odbc-test/transaction_test.cpp
@@ -0,0 +1,600 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "odbc_suite.h"
+
+#include <gmock/gmock-matchers.h>
+#include <gtest/gtest.h>
+
+#include <string>
+
+using namespace ignite;
+
+/**
+ * Test setup fixture.
+ */
+struct transaction_test : public odbc_suite {
+ // Per-test preparation: connect, clear the test table, then release the ODBC handles again.
+ void SetUp() override {
+ odbc_connect(get_basic_connection_string());
+ // The DELETE is retried on failure; the final outcome of the retries is not checked.
+ retry_on_fail([&] { return exec_query("DELETE FROM " +
TABLE_NAME_ALL_COLUMNS_SQL); });
+ odbc_clean_up();
+ }
+
+ /**
+ * Insert a test string value using the fixture's statement and make all the necessary checks.
+ *
+ * @param key Key.
+ * @param value Value.
+ */
+ void insert_test_value(std::int64_t key, const std::string &value) { insert_test_value(m_statement, key, value); }
+
+ /**
+ * Insert a test string value and make all the necessary checks.
+ *
+ * Throws odbc_exception when any ODBC call fails and expects exactly one affected row.
+ *
+ * @param statement Statement.
+ * @param key Key.
+ * @param value Value; at most 1023 characters are sent.
+ */
+ static void insert_test_value(SQLHSTMT statement, std::int64_t key, const std::string &value) {
+ SQLCHAR insert_req[] = "INSERT INTO TBL_ALL_COLUMNS_SQL(key, str) VALUES(?, ?)";
+
+ SQLRETURN ret = SQLPrepare(statement, insert_req, SQL_NTS);
+
+ ODBC_THROW_ON_ERROR(ret, SQL_HANDLE_STMT, statement);
+
+ char str_field[1024] = {0};
+ SQLLEN str_field_len = 0;
+
+ // The key is a 64-bit integer, so it must be bound as SQL_C_SBIGINT; SQL_C_SLONG describes a 32-bit buffer.
+ ret = SQLBindParameter(statement, 1, SQL_PARAM_INPUT, SQL_C_SBIGINT, SQL_BIGINT, 0, 0, &key, 0, nullptr);
+
+ ODBC_THROW_ON_ERROR(ret, SQL_HANDLE_STMT, statement);
+
+ ret = SQLBindParameter(statement, 2, SQL_PARAM_INPUT, SQL_C_CHAR, SQL_VARCHAR, sizeof(str_field),
+ sizeof(str_field), &str_field, sizeof(str_field), &str_field_len);
+
+ ODBC_THROW_ON_ERROR(ret, SQL_HANDLE_STMT, statement);
+
+ // The buffer is zero-initialized and the last byte is never overwritten, so it stays NUL-terminated.
+ strncpy(str_field, value.c_str(), sizeof(str_field) - 1);
+ str_field_len = SQL_NTS;
+
+ ret = SQLExecute(statement);
+
+ ODBC_THROW_ON_ERROR(ret, SQL_HANDLE_STMT, statement);
+
+ SQLLEN affected = 0;
+ ret = SQLRowCount(statement, &affected);
+
+ ODBC_THROW_ON_ERROR(ret, SQL_HANDLE_STMT, statement);
+
+ EXPECT_EQ(affected, 1);
+
+ ret = SQLMoreResults(statement);
+
+ if (ret != SQL_NO_DATA)
+ ODBC_THROW_ON_ERROR(ret, SQL_HANDLE_STMT, statement);
+
+ reset_statement(statement);
+ }
+
+ /**
+ * Update a test string value using the fixture's statement and make all the necessary checks.
+ *
+ * @param key Key.
+ * @param value Value.
+ */
+ void update_test_value(std::int64_t key, const std::string &value) { update_test_value(m_statement, key, value); }
+
+ /**
+ * Update a test string value and make all the necessary checks.
+ *
+ * Throws odbc_exception when any ODBC call fails and expects exactly one affected row.
+ *
+ * @param statement Statement.
+ * @param key Key.
+ * @param value Value; at most 1023 characters are sent.
+ */
+ static void update_test_value(SQLHSTMT statement, std::int64_t key, const std::string &value) {
+ SQLCHAR update_req[] = "UPDATE TBL_ALL_COLUMNS_SQL SET str=? WHERE key=?";
+
+ SQLRETURN ret = SQLPrepare(statement, update_req, SQL_NTS);
+
+ ODBC_THROW_ON_ERROR(ret, SQL_HANDLE_STMT, statement);
+
+ char str_field[1024] = {0};
+ SQLLEN str_field_len = 0;
+
+ ret = SQLBindParameter(statement, 1, SQL_PARAM_INPUT, SQL_C_CHAR, SQL_VARCHAR, sizeof(str_field),
+ sizeof(str_field), &str_field, sizeof(str_field), &str_field_len);
+
+ ODBC_THROW_ON_ERROR(ret, SQL_HANDLE_STMT, statement);
+
+ // The key is a 64-bit integer, so it must be bound as SQL_C_SBIGINT; SQL_C_SLONG describes a 32-bit buffer.
+ ret = SQLBindParameter(statement, 2, SQL_PARAM_INPUT, SQL_C_SBIGINT, SQL_BIGINT, 0, 0, &key, 0, nullptr);
+
+ ODBC_THROW_ON_ERROR(ret, SQL_HANDLE_STMT, statement);
+
+ // The buffer is zero-initialized and the last byte is never overwritten, so it stays NUL-terminated.
+ strncpy(str_field, value.c_str(), sizeof(str_field) - 1);
+ str_field_len = SQL_NTS;
+
+ ret = SQLExecute(statement);
+
+ ODBC_THROW_ON_ERROR(ret, SQL_HANDLE_STMT, statement);
+
+ SQLLEN affected = 0;
+ ret = SQLRowCount(statement, &affected);
+
+ ODBC_THROW_ON_ERROR(ret, SQL_HANDLE_STMT, statement);
+
+ EXPECT_EQ(affected, 1);
+
+ ret = SQLMoreResults(statement);
+
+ if (ret != SQL_NO_DATA)
+ ODBC_THROW_ON_ERROR(ret, SQL_HANDLE_STMT, statement);
+
+ reset_statement(statement);
+ }
+
+ /**
+ * Delete a test string value using the fixture's statement.
+ *
+ * @param key Key.
+ */
+ void delete_test_value(std::int64_t key) { delete_test_value(m_statement, key); }
+
+ /**
+ * Delete a test string value.
+ *
+ * Throws odbc_exception when any ODBC call fails and expects exactly one affected row.
+ *
+ * @param statement Statement.
+ * @param key Key.
+ */
+ static void delete_test_value(SQLHSTMT statement, std::int64_t key) {
+ SQLCHAR delete_req[] = "DELETE FROM TBL_ALL_COLUMNS_SQL WHERE key=?";
+
+ SQLRETURN ret = SQLPrepare(statement, delete_req, SQL_NTS);
+
+ ODBC_THROW_ON_ERROR(ret, SQL_HANDLE_STMT, statement);
+
+ // The key is a 64-bit integer, so it must be bound as SQL_C_SBIGINT; SQL_C_SLONG describes a 32-bit buffer.
+ ret = SQLBindParameter(statement, 1, SQL_PARAM_INPUT, SQL_C_SBIGINT, SQL_BIGINT, 0, 0, &key, 0, nullptr);
+
+ ODBC_THROW_ON_ERROR(ret, SQL_HANDLE_STMT, statement);
+
+ ret = SQLExecute(statement);
+
+ ODBC_THROW_ON_ERROR(ret, SQL_HANDLE_STMT, statement);
+
+ SQLLEN affected = 0;
+ ret = SQLRowCount(statement, &affected);
+
+ ODBC_THROW_ON_ERROR(ret, SQL_HANDLE_STMT, statement);
+
+ EXPECT_EQ(affected, 1);
+
+ ret = SQLMoreResults(statement);
+
+ if (ret != SQL_NO_DATA)
+ ODBC_THROW_ON_ERROR(ret, SQL_HANDLE_STMT, statement);
+
+ reset_statement(statement);
+ }
+
+ /**
+ * Selects and checks the value using the fixture's statement.
+ *
+ * @param key Key.
+ * @param expect Expected value.
+ */
+ void check_test_value(std::int64_t key, const std::string &expect) { check_test_value(m_statement, key, expect); }
+
+ /**
+ * Selects and checks the value.
+ *
+ * Throws odbc_exception when any ODBC call fails. Reports a test failure when the fetched value differs
+ * from the expected one, or when a row is returned that was not expected.
+ *
+ * @param statement Statement.
+ * @param key Key.
+ * @param expect Expected value; an empty optional means that no row must exist for the key.
+ */
+ static void check_test_value(SQLHSTMT statement, std::int64_t key, std::optional<std::string_view> expect) {
+ SQLCHAR selectReq[] = "SELECT str FROM TBL_ALL_COLUMNS_SQL WHERE key = ?";
+
+ char str_field[1024] = {0};
+ SQLLEN str_field_len = 0;
+
+ SQLRETURN ret = SQLBindCol(statement, 1, SQL_C_CHAR, &str_field, sizeof(str_field), &str_field_len);
+
+ ODBC_THROW_ON_ERROR(ret, SQL_HANDLE_STMT, statement);
+
+ // The key is a 64-bit integer, so it must be bound as SQL_C_SBIGINT; SQL_C_SLONG describes a 32-bit buffer.
+ ret = SQLBindParameter(statement, 1, SQL_PARAM_INPUT, SQL_C_SBIGINT, SQL_BIGINT, 0, 0, &key, 0, nullptr);
+
+ ODBC_THROW_ON_ERROR(ret, SQL_HANDLE_STMT, statement);
+
+ // SQL_NTS rather than sizeof(selectReq): the latter would count the terminating NUL as statement text.
+ ret = SQLExecDirect(statement, selectReq, SQL_NTS);
+
+ ODBC_THROW_ON_ERROR(ret, SQL_HANDLE_STMT, statement);
+
+ ret = SQLFetch(statement);
+
+ if (expect) {
+ ODBC_THROW_ON_ERROR(ret, SQL_HANDLE_STMT, statement);
+ EXPECT_EQ(std::string(str_field, str_field_len), expect);
+
+ ret = SQLFetch(statement);
+ }
+
+ if (ret != SQL_NO_DATA)
+ ODBC_THROW_ON_ERROR(ret, SQL_HANDLE_STMT, statement);
+
+ // A successful fetch at this point means there is a row that should not exist.
+ EXPECT_EQ(ret, SQL_NO_DATA) << "Unexpected row fetched for key " << key;
+
+ ret = SQLMoreResults(statement);
+
+ if (ret != SQL_NO_DATA)
+ ODBC_THROW_ON_ERROR(ret, SQL_HANDLE_STMT, statement);
+
+ reset_statement(statement);
+ }
+
+ /**
+ * Checks that there is no value stored for the key.
+ *
+ * Delegates to check_test_value() with an empty expectation.
+ *
+ * @param statement Statement.
+ * @param key Key.
+ */
+ static void check_no_test_value(SQLHSTMT statement, std::int64_t key) {
+ check_test_value(statement, key, std::nullopt);
+ }
+
+ /**
+ * Checks that there is no value stored for the key, using the fixture's statement.
+ *
+ * @param key Key.
+ */
+ void check_no_test_value(std::int64_t key) {
+ check_no_test_value(m_statement, key);
+ }
+
+ /**
+ * Reset the state of the fixture's statement.
+ */
+ void reset_statement() {
+ reset_statement(m_statement);
+ }
+
+ /**
+ * Reset statement state: release parameter bindings first, then column bindings.
+ *
+ * Throws odbc_exception when either call fails.
+ *
+ * @param statement Statement.
+ */
+ static void reset_statement(SQLHSTMT statement) {
+ const SQLUSMALLINT options[] = {SQL_RESET_PARAMS, SQL_UNBIND};
+
+ for (SQLUSMALLINT option : options) {
+ SQLRETURN ret = SQLFreeStmt(statement, option);
+
+ ODBC_THROW_ON_ERROR(ret, SQL_HANDLE_STMT, statement);
+ }
+ }
+};
+
+// A row inserted with autocommit off stays visible after a connection-level commit.
+TEST_F(transaction_test, transaction_connection_commit) {
+ odbc_connect(get_basic_connection_string());
+
+ // Switch to manual-commit mode.
+ SQLRETURN ret = SQLSetConnectAttr(m_conn, SQL_ATTR_AUTOCOMMIT,
SQL_AUTOCOMMIT_OFF, 0);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, m_conn);
+
+ insert_test_value(42, "Some");
+
+ // The value is visible on the same connection before the commit.
+ check_test_value(42, "Some");
+
+ ret = SQLEndTran(SQL_HANDLE_DBC, m_conn, SQL_COMMIT);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, m_conn);
+
+ // ...and is still there after it.
+ check_test_value(42, "Some");
+}
+
+// A row inserted with autocommit off disappears after a connection-level rollback.
+TEST_F(transaction_test, transaction_connection_rollback_insert) {
+ odbc_connect(get_basic_connection_string());
+
+ // Switch to manual-commit mode.
+ SQLRETURN ret = SQLSetConnectAttr(m_conn, SQL_ATTR_AUTOCOMMIT,
SQL_AUTOCOMMIT_OFF, 0);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, m_conn);
+
+ insert_test_value(42, "Some");
+
+ // The value is visible on the same connection before the rollback.
+ check_test_value(42, "Some");
+
+ ret = SQLEndTran(SQL_HANDLE_DBC, m_conn, SQL_ROLLBACK);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, m_conn);
+
+ // The insert must have been undone.
+ check_no_test_value(42);
+}
+
+// An update made with autocommit off is undone by a connection-level rollback.
+// The row itself is inserted beforehand, before autocommit is switched off.
+TEST_F(transaction_test, transaction_connection_rollback_update_1) {
+ odbc_connect(get_basic_connection_string());
+
+ insert_test_value(42, "Some");
+
+ check_test_value(42, "Some");
+
+ // Switch to manual-commit mode.
+ SQLRETURN ret = SQLSetConnectAttr(m_conn, SQL_ATTR_AUTOCOMMIT,
SQL_AUTOCOMMIT_OFF, 0);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, m_conn);
+
+ update_test_value(42, "Other");
+
+ check_test_value(42, "Other");
+
+ ret = SQLEndTran(SQL_HANDLE_DBC, m_conn, SQL_ROLLBACK);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, m_conn);
+
+ // The original value must be back.
+ check_test_value(42, "Some");
+}
+
+// An update is undone by a connection-level rollback when the row was inserted and committed
+// with autocommit already off.
+TEST_F(transaction_test, transaction_connection_rollback_update_2) {
+ odbc_connect(get_basic_connection_string());
+
+ // Switch to manual-commit mode.
+ SQLRETURN ret = SQLSetConnectAttr(m_conn, SQL_ATTR_AUTOCOMMIT,
SQL_AUTOCOMMIT_OFF, 0);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, m_conn);
+
+ insert_test_value(42, "Some");
+
+ check_test_value(42, "Some");
+
+ // Commit the insert.
+ ret = SQLEndTran(SQL_HANDLE_DBC, m_conn, SQL_COMMIT);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, m_conn);
+
+ check_test_value(42, "Some");
+
+ update_test_value(42, "Other");
+
+ check_test_value(42, "Other");
+
+ ret = SQLEndTran(SQL_HANDLE_DBC, m_conn, SQL_ROLLBACK);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, m_conn);
+
+ // Only the update is undone; the committed insert remains.
+ check_test_value(42, "Some");
+}
+
+// A delete made with autocommit off is undone by a connection-level rollback.
+// The row itself is inserted beforehand, before autocommit is switched off.
+TEST_F(transaction_test, transaction_connection_rollback_delete_1) {
+ odbc_connect(get_basic_connection_string());
+
+ insert_test_value(42, "Some");
+
+ check_test_value(42, "Some");
+
+ // Switch to manual-commit mode.
+ SQLRETURN ret = SQLSetConnectAttr(m_conn, SQL_ATTR_AUTOCOMMIT,
SQL_AUTOCOMMIT_OFF, 0);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, m_conn);
+
+ delete_test_value(42);
+
+ check_no_test_value(42);
+
+ ret = SQLEndTran(SQL_HANDLE_DBC, m_conn, SQL_ROLLBACK);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, m_conn);
+
+ // The deleted row must be back.
+ check_test_value(42, "Some");
+}
+
+// A delete is undone by a connection-level rollback when the row was inserted and committed
+// with autocommit already off.
+TEST_F(transaction_test, transaction_connection_rollback_delete_2) {
+ odbc_connect(get_basic_connection_string());
+
+ // Switch to manual-commit mode.
+ SQLRETURN ret = SQLSetConnectAttr(m_conn, SQL_ATTR_AUTOCOMMIT,
SQL_AUTOCOMMIT_OFF, 0);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, m_conn);
+
+ insert_test_value(42, "Some");
+
+ check_test_value(42, "Some");
+
+ // Commit the insert.
+ ret = SQLEndTran(SQL_HANDLE_DBC, m_conn, SQL_COMMIT);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, m_conn);
+
+ check_test_value(42, "Some");
+
+ delete_test_value(42);
+
+ check_no_test_value(42);
+
+ ret = SQLEndTran(SQL_HANDLE_DBC, m_conn, SQL_ROLLBACK);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, m_conn);
+
+ // Only the delete is undone; the committed insert remains.
+ check_test_value(42, "Some");
+}
+
+// A row inserted with autocommit off stays visible after an environment-level commit.
+TEST_F(transaction_test, transaction_environment_commit) {
+ odbc_connect(get_basic_connection_string());
+
+ // Switch to manual-commit mode.
+ SQLRETURN ret = SQLSetConnectAttr(m_conn, SQL_ATTR_AUTOCOMMIT, SQL_AUTOCOMMIT_OFF, 0);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, m_conn);
+
+ insert_test_value(42, "Some");
+
+ check_test_value(42, "Some");
+
+ // Commit through the environment handle, so that the handle used for the call matches the one
+ // the diagnostics are read from.
+ ret = SQLEndTran(SQL_HANDLE_ENV, m_env, SQL_COMMIT);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_ENV, m_env);
+
+ check_test_value(42, "Some");
+}
+
+// A row inserted with autocommit off disappears after an environment-level rollback.
+TEST_F(transaction_test, transaction_environment_rollback_insert) {
+ odbc_connect(get_basic_connection_string());
+
+ // Switch to manual-commit mode.
+ SQLRETURN ret = SQLSetConnectAttr(m_conn, SQL_ATTR_AUTOCOMMIT,
SQL_AUTOCOMMIT_OFF, 0);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, m_conn);
+
+ insert_test_value(42, "Some");
+
+ check_test_value(42, "Some");
+
+ // Roll back through the environment handle rather than the connection handle.
+ ret = SQLEndTran(SQL_HANDLE_ENV, m_env, SQL_ROLLBACK);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_ENV, m_env);
+
+ // The insert must have been undone.
+ check_no_test_value(42);
+}
+
+// An update made with autocommit off is undone by an environment-level rollback.
+// The row itself is inserted beforehand, before autocommit is switched off.
+TEST_F(transaction_test, transaction_environment_rollback_update_1) {
+ odbc_connect(get_basic_connection_string());
+
+ insert_test_value(42, "Some");
+
+ check_test_value(42, "Some");
+
+ // Switch to manual-commit mode.
+ SQLRETURN ret = SQLSetConnectAttr(m_conn, SQL_ATTR_AUTOCOMMIT,
SQL_AUTOCOMMIT_OFF, 0);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, m_conn);
+
+ update_test_value(42, "Other");
+
+ check_test_value(42, "Other");
+
+ // Roll back through the environment handle rather than the connection handle.
+ ret = SQLEndTran(SQL_HANDLE_ENV, m_env, SQL_ROLLBACK);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_ENV, m_env);
+
+ // The original value must be back.
+ check_test_value(42, "Some");
+}
+
+// An update is undone by an environment-level rollback when the row was inserted and committed
+// (also through the environment handle) with autocommit already off.
+TEST_F(transaction_test, transaction_environment_rollback_update_2) {
+ odbc_connect(get_basic_connection_string());
+
+ // Switch to manual-commit mode.
+ SQLRETURN ret = SQLSetConnectAttr(m_conn, SQL_ATTR_AUTOCOMMIT, SQL_AUTOCOMMIT_OFF, 0);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, m_conn);
+
+ insert_test_value(42, "Some");
+
+ check_test_value(42, "Some");
+
+ // Commit through the environment handle, so that the handle used for the call matches the one
+ // the diagnostics are read from.
+ ret = SQLEndTran(SQL_HANDLE_ENV, m_env, SQL_COMMIT);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_ENV, m_env);
+
+ check_test_value(42, "Some");
+
+ update_test_value(42, "Other");
+
+ check_test_value(42, "Other");
+
+ ret = SQLEndTran(SQL_HANDLE_ENV, m_env, SQL_ROLLBACK);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_ENV, m_env);
+
+ // Only the update is undone; the committed insert remains.
+ check_test_value(42, "Some");
+}
+
+// A delete made with autocommit off is undone by an environment-level rollback.
+// The row itself is inserted beforehand, before autocommit is switched off.
+TEST_F(transaction_test, transaction_environment_rollback_delete_1) {
+ odbc_connect(get_basic_connection_string());
+
+ insert_test_value(42, "Some");
+
+ check_test_value(42, "Some");
+
+ // Switch to manual-commit mode.
+ SQLRETURN ret = SQLSetConnectAttr(m_conn, SQL_ATTR_AUTOCOMMIT,
SQL_AUTOCOMMIT_OFF, 0);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, m_conn);
+
+ delete_test_value(42);
+
+ check_no_test_value(42);
+
+ // Roll back through the environment handle rather than the connection handle.
+ ret = SQLEndTran(SQL_HANDLE_ENV, m_env, SQL_ROLLBACK);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_ENV, m_env);
+
+ // The deleted row must be back.
+ check_test_value(42, "Some");
+}
+
+// A delete is undone by an environment-level rollback when the row was inserted and committed
+// (also through the environment handle) with autocommit already off.
+TEST_F(transaction_test, transaction_environment_rollback_delete_2) {
+ odbc_connect(get_basic_connection_string());
+
+ // Switch to manual-commit mode.
+ SQLRETURN ret = SQLSetConnectAttr(m_conn, SQL_ATTR_AUTOCOMMIT, SQL_AUTOCOMMIT_OFF, 0);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, m_conn);
+
+ insert_test_value(42, "Some");
+
+ check_test_value(42, "Some");
+
+ // Commit through the environment handle, so that the handle used for the call matches the one
+ // the diagnostics are read from.
+ ret = SQLEndTran(SQL_HANDLE_ENV, m_env, SQL_COMMIT);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_ENV, m_env);
+
+ check_test_value(42, "Some");
+
+ delete_test_value(42);
+
+ check_no_test_value(42);
+
+ ret = SQLEndTran(SQL_HANDLE_ENV, m_env, SQL_ROLLBACK);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_ENV, m_env);
+
+ // Only the delete is undone; the committed insert remains.
+ check_test_value(42, "Some");
+}
+
+// Two connections in manual-commit mode: an insert on the second one is expected to fail with a lock
+// conflict while the first connection's transaction is open, and to succeed once it is committed.
+TEST_F(transaction_test, transaction_error) {
+ odbc_connect(get_basic_connection_string());
+
+ insert_test_value(1, "test_1");
+
+ SQLRETURN ret = SQLSetConnectAttr(m_conn, SQL_ATTR_AUTOCOMMIT, SQL_AUTOCOMMIT_OFF, 0);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, m_conn);
+
+ // These reads run inside the first connection's transaction.
+ check_test_value(1, "test_1");
+ check_no_test_value(2);
+
+ odbc_connection conn2;
+ conn2.odbc_connect(get_basic_connection_string());
+
+ ret = SQLSetConnectAttr(conn2.m_conn, SQL_ATTR_AUTOCOMMIT, SQL_AUTOCOMMIT_OFF, 0);
+
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, conn2.m_conn);
+
+ // The inner try/catch inspects the exception and rethrows it, so EXPECT_THROW still observes it.
+ EXPECT_THROW(
+ {
+ try {
+ insert_test_value(conn2.m_statement, 2, "test_2");
+ } catch (const odbc_exception &err) {
+ EXPECT_THAT(err.message, testing::HasSubstr("Failed to acquire a lock due to a conflict"));
+ // TODO: IGNITE-19944 Propagate SQL errors from engine to driver
+ EXPECT_EQ(err.sql_state, "HY000");
+ throw;
+ }
+ },
+ odbc_exception);
+
+ // Diagnostics are read from the same connection the call was made on.
+ ret = SQLEndTran(SQL_HANDLE_DBC, m_conn, SQL_COMMIT);
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, m_conn);
+
+ reset_statement();
+
+ // With the first transaction committed, the same insert is expected to go through.
+ insert_test_value(conn2.m_statement, 2, "test_2");
+
+ reset_statement(conn2.m_statement);
+
+ // Diagnostics are read from the same connection the call was made on.
+ ret = SQLEndTran(SQL_HANDLE_DBC, conn2.m_conn, SQL_ROLLBACK);
+ ODBC_FAIL_ON_ERROR(ret, SQL_HANDLE_DBC, conn2.m_conn);
+
+ // The second connection rolled its insert back, so neither connection sees the row.
+ check_no_test_value(2);
+ check_no_test_value(conn2.m_statement, 2);
+}