This is an automated email from the ASF dual-hosted git repository.
isapego pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new dab1c8d3ad IGNITE-19204 Implement ODBC connection establishment (#2149)
dab1c8d3ad is described below
commit dab1c8d3ad074b329e2313db95a7d9c7c58c753e
Author: Igor Sapego <[email protected]>
AuthorDate: Wed Jun 14 11:05:45 2023 +0400
IGNITE-19204 Implement ODBC connection establishment (#2149)
---
modules/platforms/cpp/CMakeLists.txt | 4 +
.../ignite/client/detail/cluster_connection.cpp | 4 +-
.../cpp/ignite/client/detail/cluster_connection.h | 4 +-
.../ignite/client/detail/ignite_client_impl.cpp | 2 +-
.../cpp/ignite/client/detail/node_connection.cpp | 8 +-
.../ignite/client/ignite_client_configuration.h | 28 +-
.../cpp/ignite/client/network/cluster_node.h | 6 +-
modules/platforms/cpp/ignite/common/bits.h | 1 +
modules/platforms/cpp/ignite/common/end_point.h | 2 +-
modules/platforms/cpp/ignite/common/ignite_error.h | 4 +-
.../cpp/ignite/network/detail/linux/sockets.cpp | 40 ++-
.../cpp/ignite/network/detail/linux/sockets.h | 15 +-
.../network/detail/linux/tcp_socket_client.h | 248 ++++++++++++++++
.../platforms/cpp/ignite/network/detail/utils.h | 23 ++
.../cpp/ignite/network/detail/win/sockets.cpp | 50 +++-
.../cpp/ignite/network/detail/win/sockets.h | 21 +-
.../ignite/network/detail/win/tcp_socket_client.h | 245 +++++++++++++++
modules/platforms/cpp/ignite/network/network.cpp | 7 +
modules/platforms/cpp/ignite/network/network.h | 6 +
.../platforms/cpp/ignite/network/socket_client.h | 92 ++++++
modules/platforms/cpp/ignite/odbc/CMakeLists.txt | 5 +-
.../ignite/odbc/app/application_data_buffer.cpp | 1 +
.../cpp/ignite/odbc/config/config_tools.cpp | 84 +-----
.../cpp/ignite/odbc/config/config_tools.h | 13 +-
.../cpp/ignite/odbc/config/configuration.cpp | 7 +-
.../cpp/ignite/odbc/config/configuration.h | 51 +---
.../odbc/config/connection_string_parser.cpp | 4 +-
.../cpp/ignite/odbc/install/install_linux.ini | 6 +
.../cpp/ignite/odbc/install/install_win.cmd | 26 ++
modules/platforms/cpp/ignite/odbc/log.cpp | 9 +-
modules/platforms/cpp/ignite/odbc/log.h | 24 +-
modules/platforms/cpp/ignite/odbc/message.cpp | 60 ----
modules/platforms/cpp/ignite/odbc/message.h | 175 -----------
.../platforms/cpp/ignite/odbc/meta/column_meta.cpp | 2 +-
modules/platforms/cpp/ignite/odbc/odbc_error.h | 11 +-
.../platforms/cpp/ignite/odbc/sql_connection.cpp | 328 ++++++++++++++-------
modules/platforms/cpp/ignite/odbc/sql_connection.h | 132 ++-------
.../platforms/cpp/ignite/odbc/sql_statement.cpp | 31 +-
modules/platforms/cpp/ignite/odbc/utility.cpp | 2 +
.../cpp/tests/client-test/ignite_runner_suite.h | 25 +-
modules/platforms/cpp/tests/client-test/main.cpp | 119 +++-----
.../platforms/cpp/tests/odbc-test/CMakeLists.txt | 39 +++
.../cpp/tests/odbc-test/connection_test.cpp | 104 +++++++
.../cpp/tests/{client-test => odbc-test}/main.cpp | 31 +-
.../cpp/tests/test-common/ignite_runner.cpp | 8 +-
.../cpp/tests/test-common/ignite_runner.h | 39 ++-
.../platforms/cpp/tests/test-common/test_utils.cpp | 32 +-
.../platforms/cpp/tests/test-common/test_utils.h | 27 +-
48 files changed, 1459 insertions(+), 746 deletions(-)
diff --git a/modules/platforms/cpp/CMakeLists.txt
b/modules/platforms/cpp/CMakeLists.txt
index 190f01eade..15d84dd87c 100644
--- a/modules/platforms/cpp/CMakeLists.txt
+++ b/modules/platforms/cpp/CMakeLists.txt
@@ -178,6 +178,10 @@ if (${ENABLE_TESTS})
if (${ENABLE_CLIENT})
add_subdirectory(tests/client-test)
endif()
+
+ if (${ENABLE_ODBC})
+ add_subdirectory(tests/odbc-test)
+ endif()
endif()
# Source code formatting with clang-format.
diff --git a/modules/platforms/cpp/ignite/client/detail/cluster_connection.cpp
b/modules/platforms/cpp/ignite/client/detail/cluster_connection.cpp
index 330119afd4..a85af97d30 100644
--- a/modules/platforms/cpp/ignite/client/detail/cluster_connection.cpp
+++ b/modules/platforms/cpp/ignite/client/detail/cluster_connection.cpp
@@ -72,7 +72,7 @@ void cluster_connection::stop() {
pool->stop();
}
-void cluster_connection::on_connection_success(const network::end_point &addr,
uint64_t id) {
+void cluster_connection::on_connection_success(const end_point &addr, uint64_t
id) {
m_logger->log_info("Established connection with remote host " +
addr.to_string());
m_logger->log_debug("Connection ID: " + std::to_string(id));
@@ -100,7 +100,7 @@ void cluster_connection::on_connection_success(const
network::end_point &addr, u
}
}
-void cluster_connection::on_connection_error(const network::end_point &addr,
ignite_error err) {
+void cluster_connection::on_connection_error(const end_point &addr,
ignite_error err) {
m_logger->log_warning(
"Failed to establish connection with remote host " + addr.to_string()
+ ", reason: " + err.what());
diff --git a/modules/platforms/cpp/ignite/client/detail/cluster_connection.h
b/modules/platforms/cpp/ignite/client/detail/cluster_connection.h
index 4fa48f2941..aceba4a122 100644
--- a/modules/platforms/cpp/ignite/client/detail/cluster_connection.h
+++ b/modules/platforms/cpp/ignite/client/detail/cluster_connection.h
@@ -281,7 +281,7 @@ private:
* @param addr Address of the new connection.
* @param id Connection ID.
*/
- void on_connection_success(const network::end_point &addr, uint64_t id)
override;
+ void on_connection_success(const end_point &addr, uint64_t id) override;
/**
* Callback that called on error during connection establishment.
@@ -289,7 +289,7 @@ private:
* @param addr Connection address.
* @param err Error.
*/
- void on_connection_error(const network::end_point &addr, ignite_error err)
override;
+ void on_connection_error(const end_point &addr, ignite_error err) override;
/**
* Callback that called on error during connection establishment.
diff --git a/modules/platforms/cpp/ignite/client/detail/ignite_client_impl.cpp
b/modules/platforms/cpp/ignite/client/detail/ignite_client_impl.cpp
index 9e1e324e0e..22d68e6e57 100644
--- a/modules/platforms/cpp/ignite/client/detail/ignite_client_impl.cpp
+++ b/modules/platforms/cpp/ignite/client/detail/ignite_client_impl.cpp
@@ -36,7 +36,7 @@ void
ignite_client_impl::get_cluster_nodes_async(ignite_callback<std::vector<clu
auto port = protocol::unpack_object<std::int32_t>(fields.ptr[3]);
nodes.emplace_back(
- std::move(id), std::move(name),
network::end_point{std::move(host), std::uint16_t(port)});
+ std::move(id), std::move(name), end_point{std::move(host),
std::uint16_t(port)});
});
return nodes;
diff --git a/modules/platforms/cpp/ignite/client/detail/node_connection.cpp
b/modules/platforms/cpp/ignite/client/detail/node_connection.cpp
index fc232c93d3..1f232d8026 100644
--- a/modules/platforms/cpp/ignite/client/detail/node_connection.cpp
+++ b/modules/platforms/cpp/ignite/client/detail/node_connection.cpp
@@ -118,11 +118,11 @@ ignite_result<void>
node_connection::process_handshake_rsp(bytes_view msg) {
protocol::reader reader(msg);
- auto verMajor = reader.read_int16();
- auto verMinor = reader.read_int16();
- auto verPatch = reader.read_int16();
+ auto ver_major = reader.read_int16();
+ auto ver_minor = reader.read_int16();
+ auto ver_patch = reader.read_int16();
- protocol_version ver(verMajor, verMinor, verPatch);
+ protocol_version ver(ver_major, ver_minor, ver_patch);
m_logger->log_debug("Server-side protocol version: " + ver.to_string());
// We now only support a single version
diff --git a/modules/platforms/cpp/ignite/client/ignite_client_configuration.h
b/modules/platforms/cpp/ignite/client/ignite_client_configuration.h
index 0e28a30556..749fad3984 100644
--- a/modules/platforms/cpp/ignite/client/ignite_client_configuration.h
+++ b/modules/platforms/cpp/ignite/client/ignite_client_configuration.h
@@ -48,6 +48,14 @@ public:
ignite_client_configuration(std::initializer_list<std::string_view>
endpoints)
: m_endpoints(endpoints.begin(), endpoints.end()) {}
+ /**
+ * Constructor.
+ *
+ * @param endpoints Endpoints list.
+ */
+ ignite_client_configuration(std::vector<std::string> endpoints) //
NOLINT(google-explicit-constructor)
+ : m_endpoints(std::move(endpoints)) {}
+
/**
* Get endpoints.
*
@@ -63,11 +71,8 @@ public:
* Examples of supported formats:
* - 192.168.1.25 - Default port is used, see DEFAULT_PORT;
* - 192.168.1.25:780 - Custom port;
- * - 192.168.1.25:780..787 - Custom port range - ports are checked from
- * lesser to greater until an open port is found;
* - my-host.com - Default port is used, see DEFAULT_PORT;
* - my-host.com:780 - Custom port;
- * - my-host.com:780..787 - Custom port range.
*
* Default is "localhost"
*
@@ -77,6 +82,23 @@ public:
ignite_client_configuration::m_endpoints.assign(endpoints.begin(),
endpoints.end());
}
+ /**
+ * Set endpoints.
+ *
+ * Examples of supported formats:
+ * - 192.168.1.25 - Default port is used, see DEFAULT_PORT;
+ * - 192.168.1.25:780 - Custom port;
+ * - my-host.com - Default port is used, see DEFAULT_PORT;
+ * - my-host.com:780 - Custom port;
+ *
+ * Default is "localhost"
+ *
+ * @param endpoints Endpoints.
+ */
+ void set_endpoints(std::vector<std::string> endpoints) {
+ ignite_client_configuration::m_endpoints = std::move(endpoints);
+ }
+
/**
* Get logger.
*
diff --git a/modules/platforms/cpp/ignite/client/network/cluster_node.h
b/modules/platforms/cpp/ignite/client/network/cluster_node.h
index 408bf000f2..e2e23cfb29 100644
--- a/modules/platforms/cpp/ignite/client/network/cluster_node.h
+++ b/modules/platforms/cpp/ignite/client/network/cluster_node.h
@@ -41,7 +41,7 @@ public:
* @param name Name.
* @param address Address.
*/
- cluster_node(std::string id, std::string name, network::end_point address)
+ cluster_node(std::string id, std::string name, end_point address)
: m_id(std::move(id))
, m_name(std::move(name))
, m_address(std::move(address)) {}
@@ -65,7 +65,7 @@ public:
*
* @return Node address.
*/
- [[nodiscard]] const network::end_point &get_address() const { return
m_address; }
+ [[nodiscard]] const end_point &get_address() const { return m_address; }
/**
* compare to another instance.
@@ -94,7 +94,7 @@ private:
std::string m_name{};
/** Address. */
- network::end_point m_address{};
+ end_point m_address{};
};
/**
diff --git a/modules/platforms/cpp/ignite/common/bits.h
b/modules/platforms/cpp/ignite/common/bits.h
index 1b374d5eaf..0d06973fd3 100644
--- a/modules/platforms/cpp/ignite/common/bits.h
+++ b/modules/platforms/cpp/ignite/common/bits.h
@@ -22,6 +22,7 @@
#include <array>
#include <climits>
#include <cassert>
+#include <cstdint>
#include <type_traits>
/**
diff --git a/modules/platforms/cpp/ignite/common/end_point.h
b/modules/platforms/cpp/ignite/common/end_point.h
index 5592b657b0..7482b1cdd4 100644
--- a/modules/platforms/cpp/ignite/common/end_point.h
+++ b/modules/platforms/cpp/ignite/common/end_point.h
@@ -20,7 +20,7 @@
#include <cstdint>
#include <string>
-namespace ignite::network {
+namespace ignite {
/**
* Connection end point structure.
diff --git a/modules/platforms/cpp/ignite/common/ignite_error.h
b/modules/platforms/cpp/ignite/common/ignite_error.h
index cbcd1d18d6..15ce9df543 100644
--- a/modules/platforms/cpp/ignite/common/ignite_error.h
+++ b/modules/platforms/cpp/ignite/common/ignite_error.h
@@ -74,10 +74,10 @@ public:
* @param message Message.
* @param cause Error cause.
*/
- explicit ignite_error(status_code statusCode, std::string message,
std::exception_ptr cause) noexcept
+ explicit ignite_error(status_code statusCode, std::string message, const
std::exception_ptr& cause) noexcept
: m_status_code(statusCode)
, m_message(std::move(message))
- , m_cause(std::move(cause)) {} //
NOLINT(bugprone-throw-keyword-missing)
+ , m_cause(cause) {} // NOLINT(bugprone-throw-keyword-missing)
/**
* Get error message.
diff --git a/modules/platforms/cpp/ignite/network/detail/linux/sockets.cpp
b/modules/platforms/cpp/ignite/network/detail/linux/sockets.cpp
index 0ffda4f768..cc959050b2 100644
--- a/modules/platforms/cpp/ignite/network/detail/linux/sockets.cpp
+++ b/modules/platforms/cpp/ignite/network/detail/linux/sockets.cpp
@@ -15,7 +15,8 @@
* limitations under the License.
*/
-#include "sockets.h"
+#include "ignite/network/detail/linux/sockets.h"
+#include "ignite/network/socket_client.h"
#include <cerrno>
#include <cstring>
@@ -25,6 +26,7 @@
#include <netdb.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
+#include <poll.h>
namespace ignite::network::detail {
@@ -118,6 +120,42 @@ void try_set_socket_options(int socket_fd, int buf_size,
bool no_delay, bool out
socket_fd, IPPROTO_TCP, TCP_KEEPINTVL, reinterpret_cast<char
*>(&idle_retry_opt), sizeof(idle_retry_opt));
}
+int wait_on_socket(int socket, std::int32_t timeout, bool rd)
+{
+ int32_t timeout0 = timeout == 0 ? -1 : timeout;
+
+ int lastError = 0;
+ int ret;
+
+ do
+ {
+ struct pollfd fds[1];
+
+ fds[0].fd = socket;
+ fds[0].events = rd ? POLLIN : POLLOUT;
+
+ ret = poll(fds, 1, timeout0 * 1000);
+
+ if (ret == SOCKET_ERROR)
+ lastError = errno;
+
+ } while (ret == SOCKET_ERROR && lastError == EINTR);
+
+ if (ret == SOCKET_ERROR)
+ return -lastError;
+
+ socklen_t size = sizeof(lastError);
+ int res = getsockopt(socket, SOL_SOCKET, SO_ERROR,
reinterpret_cast<char*>(&lastError), &size);
+
+ if (res != SOCKET_ERROR && lastError != 0)
+ return -lastError;
+
+ if (ret == 0)
+ return socket_client::wait_result::TIMEOUT;
+
+ return socket_client::wait_result::SUCCESS;
+}
+
bool set_non_blocking_mode(int socket_fd, bool non_blocking) {
int flags = fcntl(socket_fd, F_GETFL, 0);
if (flags == -1)
diff --git a/modules/platforms/cpp/ignite/network/detail/linux/sockets.h
b/modules/platforms/cpp/ignite/network/detail/linux/sockets.h
index 64380b56b9..d128f10843 100644
--- a/modules/platforms/cpp/ignite/network/detail/linux/sockets.h
+++ b/modules/platforms/cpp/ignite/network/detail/linux/sockets.h
@@ -20,7 +20,9 @@
#include <cstdint>
#include <string>
-#define SOCKET_ERROR (-1)
+#ifndef SOCKET_ERROR
+# define SOCKET_ERROR (-1)
+#endif // SOCKET_ERROR
namespace ignite::network::detail {
@@ -48,6 +50,17 @@ std::string get_last_socket_error_message();
*/
void try_set_socket_options(int socket_fd, int buf_size, bool no_delay, bool
out_of_band, bool keep_alive);
+/**
+ * Wait on the socket for any event for specified time.
+ * This function uses poll to achieve timeout functionality for every separate
socket operation.
+ *
+ * @param socket Socket handle.
+ * @param timeout Timeout.
+ * @param rd Wait for read if @c true, or for write if @c false.
+ * @return -errno on error, wait_result::TIMEOUT on timeout and
wait_result::SUCCESS on success.
+ */
+int wait_on_socket(int socket, std::int32_t timeout, bool rd);
+
/**
* Set non blocking mode for socket.
*
diff --git
a/modules/platforms/cpp/ignite/network/detail/linux/tcp_socket_client.h
b/modules/platforms/cpp/ignite/network/detail/linux/tcp_socket_client.h
new file mode 100644
index 0000000000..36aa83be8b
--- /dev/null
+++ b/modules/platforms/cpp/ignite/network/detail/linux/tcp_socket_client.h
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "ignite/network/detail/linux/sockets.h"
+#include "ignite/network/detail/utils.h"
+#include "ignite/network/socket_client.h"
+
+#include <cstdint>
+#include <sstream>
+
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <netinet/tcp.h>
+#include <netdb.h>
+#include <unistd.h>
+
+
+namespace ignite::network
+{
+
+/**
+ * Socket client implementation.
+ */
+class tcp_socket_client : public socket_client
+{
+public:
+ // Delete
+ tcp_socket_client(tcp_socket_client &&) = delete;
+ tcp_socket_client(const tcp_socket_client &) = delete;
+ tcp_socket_client &operator=(tcp_socket_client &&) = delete;
+ tcp_socket_client &operator=(const tcp_socket_client &) = delete;
+
+ /** Buffers size */
+ enum { BUFFER_SIZE = 0x10000 };
+
+ // Default
+ tcp_socket_client() = default;
+
+ /**
+ * Destructor.
+ */
+ ~tcp_socket_client() override {
+ internal_close();
+ }
+
+ /**
+ * Establish connection with remote TCP service.
+ *
+ * @param hostname Remote host name.
+ * @param port TCP service port.
+ * @param timeout Timeout.
+ * @return True on success.
+ */
+ bool connect(const char* hostname, std::uint16_t port, std::int32_t
timeout) override
+ {
+ internal_close();
+
+ addrinfo hints{};
+
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_protocol = IPPROTO_TCP;
+
+ std::stringstream converter;
+ converter << port;
+ std::string str_port = converter.str();
+
+ // Resolve the server address and port
+ addrinfo *result{NULL}; // NOLINT(modernize-use-nullptr)
+ int res = getaddrinfo(hostname, str_port.c_str(), &hints, &result);
+
+ if (res != 0)
+ throw ignite_error(status_code::NETWORK, "Can not resolve host: "
+ std::string(hostname) + ":" + str_port
+ + ", error_code=" + std::to_string(res));
+
+ std::vector<addrinfo*> shuffled = detail::shuffle_addresses(result);
+
+ std::string last_err_msg = "Failed to resolve host";
+ bool is_timeout = false;
+
+ // Attempt to connect to an address until one succeeds
+ for (auto *addr : shuffled)
+ {
+ last_err_msg = "Failed to establish connection with the host";
+ is_timeout = false;
+
+ // Create a SOCKET for connecting to server
+ m_socket_handle = socket(addr->ai_family, addr->ai_socktype,
addr->ai_protocol);
+
+ if (m_socket_handle == SOCKET_ERROR)
+ throw ignite_error(status_code::OS, "Socket creation failed: "
+ detail::get_last_socket_error_message());
+
+ detail::try_set_socket_options(m_socket_handle, BUFFER_SIZE, true,
true, true);
+
+ m_blocking = !detail::set_non_blocking_mode(m_socket_handle, true);
+
+ // Connect to server.
+ res = ::connect(m_socket_handle, addr->ai_addr,
static_cast<int>(addr->ai_addrlen));
+ if (SOCKET_ERROR == res)
+ {
+ int last_error = errno;
+
+ if (last_error != EWOULDBLOCK && last_error != EINPROGRESS)
+ {
+ last_err_msg.append(":
").append(detail::get_socket_error_message(last_error));
+ close();
+
+ continue;
+ }
+
+ res = wait_on_socket(timeout, false);
+
+ if (res < 0 || res == wait_result::TIMEOUT)
+ {
+ is_timeout = true;
+ close();
+
+ continue;
+ }
+ }
+
+ break;
+ }
+
+ freeaddrinfo(result);
+
+ if (m_socket_handle == SOCKET_ERROR)
+ {
+ if (is_timeout)
+ return false;
+
+ throw ignite_error(status_code::NETWORK, last_err_msg);
+ }
+
+ return true;
+ }
+
+ /**
+ * Close established connection.
+ */
+ void close() override
+ {
+ internal_close();
+ }
+
+ /**
+ * Send data by established connection.
+ *
+ * @param data Pointer to data to be sent.
+ * @param size Size of the data in bytes.
+ * @param timeout Timeout.
+ * @return Number of bytes that have been sent on success,
+ * wait_result::TIMEOUT on timeout and -errno on failure.
+ */
+ int send(const std::byte* data, std::size_t size, std::int32_t timeout)
override
+ {
+ if (!m_blocking)
+ {
+ int res = wait_on_socket(timeout, false);
+
+ if (res < 0 || res == wait_result::TIMEOUT)
+ return res;
+ }
+
+ return int(::send(m_socket_handle, reinterpret_cast<const
char*>(data), static_cast<int>(size), 0));
+ }
+
+ /**
+ * Receive data from established connection.
+ *
+ * @param buffer Pointer to data buffer.
+ * @param size Size of the buffer in bytes.
+ * @param timeout Timeout.
+ * @return Number of bytes that have been received on success,
+ * wait_result::TIMEOUT on timeout and -errno on failure.
+ */
+ int receive(std::byte* buffer, std::size_t size, std::int32_t timeout)
override
+ {
+ if (!m_blocking)
+ {
+ int res = wait_on_socket(timeout, true);
+
+ if (res < 0 || res == wait_result::TIMEOUT)
+ return res;
+ }
+
+ return int(::recv(m_socket_handle, reinterpret_cast<char*>(buffer),
static_cast<int>(size), 0));
+ }
+
+ /**
+ * Check if the socket is m_blocking or not.
+ * @return @c true if the socket is m_blocking and false otherwise.
+ */
+ [[nodiscard]] bool is_blocking() const override
+ {
+ return m_blocking;
+ }
+
+private:
+ /**
+ * Close established connection.
+ */
+ void internal_close()
+ {
+ if (m_socket_handle != SOCKET_ERROR)
+ {
+ ::close(m_socket_handle);
+ m_socket_handle = SOCKET_ERROR;
+ }
+ }
+
+ /**
+ * Wait on the socket for any event for specified time.
+ * This function uses poll to achieve timeout functionality for every
separate socket operation.
+ *
+ * @param timeout Timeout.
+ * @param rd Wait for read if @c true, or for write if @c false.
+ * @return -errno on error, wait_result::TIMEOUT on timeout and
wait_result::SUCCESS on success.
+ */
+ int wait_on_socket(std::int32_t timeout, bool rd) //
NOLINT(readability-make-member-function-const)
+ {
+ return detail::wait_on_socket(m_socket_handle, timeout, rd);
+ }
+
+ /** Handle. */
+ int m_socket_handle{SOCKET_ERROR};
+
+ /** Blocking flag. */
+ bool m_blocking{true};
+};
+
+}
diff --git a/modules/platforms/cpp/ignite/network/detail/utils.h
b/modules/platforms/cpp/ignite/network/detail/utils.h
index 0909b19a7b..9cbb5e5a4b 100644
--- a/modules/platforms/cpp/ignite/network/detail/utils.h
+++ b/modules/platforms/cpp/ignite/network/detail/utils.h
@@ -19,9 +19,12 @@
#include <ignite/common/ignite_error.h>
+#include <algorithm>
#include <sstream>
#include <string>
#include <string_view>
+#include <vector>
+#include <random>
namespace ignite::network::detail {
@@ -104,4 +107,24 @@ inline void throw_last_system_error(std::string_view
description, std::string_vi
throw ignite_error(status_code::OS, get_last_system_error(description,
advice));
}
+/**
+ * Shuffle addresses randomly.
+ *
+ * @param addrsIn Addresses.
+ * @return Randomly shuffled addresses.
+ */
+template<typename Addrinfo>
+std::vector<Addrinfo*> shuffle_addresses(Addrinfo* addrsIn) {
+ std::vector<Addrinfo*> res;
+
+ for (Addrinfo *it = addrsIn; it != NULL; it = it->ai_next)
+ res.push_back(it);
+
+ std::random_device device;
+ std::mt19937 generator(device());
+ std::shuffle(res.begin(), res.end(), generator);
+
+ return res;
+}
+
} // namespace ignite::network::detail
diff --git a/modules/platforms/cpp/ignite/network/detail/win/sockets.cpp
b/modules/platforms/cpp/ignite/network/detail/win/sockets.cpp
index ca4931ed97..7d3560bb79 100644
--- a/modules/platforms/cpp/ignite/network/detail/win/sockets.cpp
+++ b/modules/platforms/cpp/ignite/network/detail/win/sockets.cpp
@@ -17,7 +17,8 @@
#include "sockets.h"
-#include "../utils.h"
+#include "ignite/network/detail/utils.h"
+#include "ignite/network/socket_client.h"
#include <mutex>
#include <sstream>
@@ -85,7 +86,7 @@ void try_set_socket_options(SOCKET socket, int buf_size, BOOL
no_delay, BOOL out
// TODO: IGNITE-17606 Disable keep-alive once heartbeats are implemented.
if (keep_alive) {
if (SOCKET_ERROR == res) {
- // There is no sense in configuring keep alive params if we failed
to set up keep alive mode.
+ // There is no sense in configuring keep alive params if we failed
to set up keep-alive mode.
return;
}
@@ -123,6 +124,12 @@ void try_set_socket_options(SOCKET socket, int buf_size,
BOOL no_delay, BOOL out
}
}
+bool set_non_blocking_mode(SOCKET socket_handle, bool non_blocking)
+{
+ ULONG opt = non_blocking ? TRUE : FALSE;
+ return ::ioctlsocket(socket_handle, FIONBIO, &opt) != SOCKET_ERROR;
+}
+
void init_wsa() {
static std::mutex init_mutex;
static bool network_inited = false;
@@ -141,4 +148,43 @@ void init_wsa() {
}
}
+int wait_on_socket(SOCKET socket, std::int32_t timeout, bool rd)
+{
+ int ready;
+ int last_error{0};
+
+ fd_set fds;
+
+ do {
+ timeval tv{};
+
+ tv.tv_sec = timeout;
+
+ FD_ZERO(&fds);
+ FD_SET(socket, &fds);
+
+ fd_set* readFds = 0;
+ fd_set* writeFds = 0;
+
+ if (rd)
+ readFds = &fds;
+ else
+ writeFds = &fds;
+
+ ready = select(static_cast<int>(socket) + 1, readFds, writeFds, NULL,
timeout == 0 ? NULL : &tv);
+
+ if (ready == SOCKET_ERROR)
+ last_error = WSAGetLastError();
+
+ } while (ready == SOCKET_ERROR && last_error == WSAEINTR);
+
+ if (ready == SOCKET_ERROR)
+ return -last_error;
+
+ if (ready == 0)
+ return socket_client::wait_result::TIMEOUT;
+
+ return socket_client::wait_result::SUCCESS;
+}
+
} // namespace ignite::network::detail
diff --git a/modules/platforms/cpp/ignite/network/detail/win/sockets.h
b/modules/platforms/cpp/ignite/network/detail/win/sockets.h
index 348a989bb4..0586cb3166 100644
--- a/modules/platforms/cpp/ignite/network/detail/win/sockets.h
+++ b/modules/platforms/cpp/ignite/network/detail/win/sockets.h
@@ -18,7 +18,7 @@
#pragma once
#define WIN32_LEAN_AND_MEAN
-#define _WINSOCKAPI_
+#define _WINSOCKAPI_ // NOLINT(bugprone-reserved-identifier)
// clang-format off
#include <windows.h>
@@ -55,6 +55,25 @@ std::string get_last_socket_error_message();
*/
void try_set_socket_options(SOCKET socket, int buf_size, BOOL no_delay, BOOL
out_of_band, BOOL keep_alive);
+/**
+ * Set non blocking mode for socket.
+ *
+ * @param socket_handle Socket file descriptor.
+ * @param non_blocking Non-blocking mode.
+ */
+bool set_non_blocking_mode(SOCKET socket_handle, bool non_blocking);
+
+/**
+ * Wait on the socket for any event for specified time.
+ * This function uses poll to achieve timeout functionality for every separate
socket operation.
+ *
+ * @param socket Socket handle.
+ * @param timeout Timeout.
+ * @param rd Wait for read if @c true, or for write if @c false.
+ * @return -errno on error, wait_result::TIMEOUT on timeout and
wait_result::SUCCESS on success.
+ */
+int wait_on_socket(SOCKET socket, std::int32_t timeout, bool rd);
+
/**
* Init windows sockets.
*
diff --git
a/modules/platforms/cpp/ignite/network/detail/win/tcp_socket_client.h
b/modules/platforms/cpp/ignite/network/detail/win/tcp_socket_client.h
new file mode 100644
index 0000000000..0ba35c07b4
--- /dev/null
+++ b/modules/platforms/cpp/ignite/network/detail/win/tcp_socket_client.h
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "ignite/network/detail/win/sockets.h"
+
+#include <cstdint>
+#include <sstream>
+
+#include "ignite/network/detail/utils.h"
+#include "ignite/network/socket_client.h"
+
+namespace ignite::network
+{
+
+/**
+ * Socket client implementation.
+ */
+class tcp_socket_client : public socket_client
+{
+public:
+ // Delete
+ tcp_socket_client(tcp_socket_client &&) = delete;
+ tcp_socket_client(const tcp_socket_client &) = delete;
+ tcp_socket_client &operator=(tcp_socket_client &&) = delete;
+ tcp_socket_client &operator=(const tcp_socket_client &) = delete;
+
+ /** Buffers size */
+ enum { BUFFER_SIZE = 0x10000 };
+
+ // Default
+ tcp_socket_client() = default;
+
+ /**
+ * Destructor.
+ */
+ ~tcp_socket_client() override {
+ internal_close();
+ }
+
+ /**
+ * Establish connection with remote TCP service.
+ *
+ * @param hostname Remote host name.
+ * @param port TCP service port.
+ * @param timeout Timeout.
+ * @return True on success.
+ */
+ bool connect(const char* hostname, std::uint16_t port, std::int32_t
timeout) override
+ {
+ detail::init_wsa();
+
+ internal_close();
+
+ addrinfo hints = { 0 };
+
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_protocol = IPPROTO_TCP;
+
+ std::stringstream converter;
+ converter << port;
+ std::string str_port = converter.str();
+
+ // Resolve the server address and port
+ addrinfo *result{NULL}; // NOLINT(modernize-use-nullptr)
+ int res = getaddrinfo(hostname, str_port.c_str(), &hints, &result);
+
+ if (res != 0)
+ throw ignite_error(status_code::NETWORK, "Can not resolve host: "
+ std::string(hostname) + ":" + str_port
+ + ", error_code=" + std::to_string(res));
+
+ std::vector<addrinfo*> shuffled = detail::shuffle_addresses(result);
+
+ std::string last_err_msg = "Failed to resolve host";
+ bool is_timeout = false;
+
+ // Attempt to connect to an address until one succeeds
+ for (auto *addr : shuffled)
+ {
+ last_err_msg = "Failed to establish connection with the host";
+ is_timeout = false;
+
+ // Create a SOCKET for connecting to server
+ m_socket_handle = socket(addr->ai_family, addr->ai_socktype,
addr->ai_protocol);
+
+ if (m_socket_handle == INVALID_SOCKET)
+ throw ignite_error(status_code::OS, "Socket creation failed: "
+ detail::get_last_socket_error_message());
+
+ detail::try_set_socket_options(m_socket_handle, BUFFER_SIZE, TRUE,
TRUE, TRUE);
+
+ m_blocking = !detail::set_non_blocking_mode(m_socket_handle, true);
+
+ // Connect to server.
+ res = ::connect(m_socket_handle, addr->ai_addr,
static_cast<int>(addr->ai_addrlen));
+ if (SOCKET_ERROR == res)
+ {
+ int last_error = WSAGetLastError();
+
+ if (last_error != WSAEWOULDBLOCK)
+ {
+ last_err_msg.append(":
").append(detail::get_socket_error_message(last_error));
+ close();
+
+ continue;
+ }
+
+ res = wait_on_socket(timeout, false);
+
+ if (res < 0 || res == wait_result::TIMEOUT)
+ {
+ is_timeout = true;
+ close();
+
+ continue;
+ }
+ }
+
+ break;
+ }
+
+ freeaddrinfo(result);
+
+ if (m_socket_handle == INVALID_SOCKET)
+ {
+ if (is_timeout)
+ return false;
+
+ throw ignite_error(status_code::NETWORK, last_err_msg);
+ }
+
+ return true;
+ }
+
+ /**
+ * Close established connection.
+ */
+ void close() override
+ {
+ internal_close();
+ }
+
+ /**
+ * Send data by established connection.
+ *
+ * @param data Pointer to data to be sent.
+ * @param size Size of the data in bytes.
+ * @param timeout Timeout.
+ * @return Number of bytes that have been sent on success,
+ * wait_result::TIMEOUT on timeout and -errno on failure.
+ */
+ int send(const std::byte* data, std::size_t size, std::int32_t timeout)
override
+ {
+ if (!m_blocking)
+ {
+ int res = wait_on_socket(timeout, false);
+
+ if (res < 0 || res == wait_result::TIMEOUT)
+ return res;
+ }
+
+ return ::send(m_socket_handle, reinterpret_cast<const char*>(data),
static_cast<int>(size), 0);
+ }
+
+ /**
+ * Receive data from established connection.
+ *
+ * @param buffer Pointer to data buffer.
+ * @param size Size of the buffer in bytes.
+ * @param timeout Timeout.
+ * @return Number of bytes that have been received on success,
+ * wait_result::TIMEOUT on timeout and -errno on failure.
+ */
+ int receive(std::byte* buffer, std::size_t size, std::int32_t timeout)
override
+ {
+ if (!m_blocking)
+ {
+ int res = wait_on_socket(timeout, true);
+
+ if (res < 0 || res == wait_result::TIMEOUT)
+ return res;
+ }
+
+ return ::recv(m_socket_handle, reinterpret_cast<char*>(buffer),
static_cast<int>(size), 0);
+ }
+
+ /**
+ * Check if the socket is m_blocking or not.
+ * @return @c true if the socket is m_blocking and false otherwise.
+ */
+ [[nodiscard]] bool is_blocking() const override
+ {
+ return m_blocking;
+ }
+
+private:
+ /**
+ * Close established connection.
+ */
+ void internal_close()
+ {
+ if (m_socket_handle != INVALID_SOCKET)
+ {
+ closesocket(m_socket_handle);
+
+ m_socket_handle = INVALID_SOCKET;
+ }
+ }
+
+ /**
+ * Wait on the socket for any event for specified time.
+ * This function uses poll to achieve timeout functionality for every
separate socket operation.
+ *
+ * @param timeout Timeout.
+ * @param rd Wait for read if @c true, or for write if @c false.
+ * @return -errno on error, wait_result::TIMEOUT on timeout and
wait_result::SUCCESS on success.
+ */
+ int wait_on_socket(std::int32_t timeout, bool rd) //
NOLINT(readability-make-member-function-const)
+ {
+ return detail::wait_on_socket(m_socket_handle, timeout, rd);
+ }
+
+ /** Handle. */
+ SOCKET m_socket_handle{INVALID_SOCKET};
+
+ /** Blocking flag. */
+ bool m_blocking{true};
+};
+
+}
diff --git a/modules/platforms/cpp/ignite/network/network.cpp
b/modules/platforms/cpp/ignite/network/network.cpp
index a466875377..530746d574 100644
--- a/modules/platforms/cpp/ignite/network/network.cpp
+++ b/modules/platforms/cpp/ignite/network/network.cpp
@@ -23,12 +23,19 @@
#ifdef _WIN32
# include "detail/win/win_async_client_pool.h"
+# include "detail/win/tcp_socket_client.h"
#else
# include "detail/linux/linux_async_client_pool.h"
+# include "detail/linux/tcp_socket_client.h"
#endif
namespace ignite::network {
+std::unique_ptr<socket_client> make_tcp_socket_client()
+{
+ return std::make_unique<tcp_socket_client>();
+}
+
std::shared_ptr<async_client_pool> make_async_client_pool(data_filters
filters) {
auto pool =
std::make_shared<IGNITE_SWITCH_WIN_OTHER(detail::win_async_client_pool,
detail::linux_async_client_pool)>();
diff --git a/modules/platforms/cpp/ignite/network/network.h
b/modules/platforms/cpp/ignite/network/network.h
index 5a92f3c41d..5722e9710e 100644
--- a/modules/platforms/cpp/ignite/network/network.h
+++ b/modules/platforms/cpp/ignite/network/network.h
@@ -19,11 +19,17 @@
#include <ignite/network/async_client_pool.h>
#include <ignite/network/data_filter.h>
+#include <ignite/network/socket_client.h>
#include <string>
namespace ignite::network {
+/**
+ * Make basic TCP socket.
+ */
+std::unique_ptr<socket_client> make_tcp_socket_client();
+
/**
* Make asynchronous client pool.
*
diff --git a/modules/platforms/cpp/ignite/network/socket_client.h
b/modules/platforms/cpp/ignite/network/socket_client.h
new file mode 100644
index 0000000000..f0f3b55c26
--- /dev/null
+++ b/modules/platforms/cpp/ignite/network/socket_client.h
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstddef>
+#include <cstdint>
+
+namespace ignite::network
+{
+
+/**
+ * Socket client implementation.
+ */
+class socket_client
+{
+public:
+ /**
+ * Non-negative timeout operation result.
+ */
+ enum wait_result
+ {
+ /** Timeout. */
+ TIMEOUT = 0,
+
+ /** Success. */
+ SUCCESS = 1
+ };
+
+ // Default
+ virtual ~socket_client() = default;
+
+ /**
+ * Establish connection with remote service.
+ *
+ * @param hostname Remote host name.
+ * @param port Service port.
+ * @param timeout Timeout.
+ * @return @c true on success and @c false on timeout.
+ */
+ virtual bool connect(const char* hostname, std::uint16_t port,
std::int32_t timeout) = 0;
+
+ /**
+ * Close established connection.
+ */
+ virtual void close() = 0;
+
+ /**
+ * Send data by established connection.
+ *
+ * @param data Pointer to data to be sent.
+ * @param size Size of the data in bytes.
+ * @param timeout Timeout.
+ * @return Number of bytes that have been sent on success,
+ * wait_result::TIMEOUT on timeout and -errno on failure.
+ */
+ virtual int send(const std::byte* data, std::size_t size, std::int32_t
timeout) = 0;
+
+ /**
+ * Receive data from established connection.
+ *
+ * @param buffer Pointer to data buffer.
+ * @param size Size of the buffer in bytes.
+ * @param timeout Timeout.
+ * @return Number of bytes that have been received on success,
+ * wait_result::TIMEOUT on timeout and -errno on failure.
+ */
+ virtual int receive(std::byte* buffer, std::size_t size, std::int32_t
timeout) = 0;
+
+ /**
+ * Check if the socket is blocking or not.
+ *
+ * @return @c true if the socket is blocking and false otherwise.
+ */
+ [[nodiscard]] virtual bool is_blocking() const = 0;
+};
+
+} // namespace ignite::network
diff --git a/modules/platforms/cpp/ignite/odbc/CMakeLists.txt
b/modules/platforms/cpp/ignite/odbc/CMakeLists.txt
index 970b36960c..3018076822 100644
--- a/modules/platforms/cpp/ignite/odbc/CMakeLists.txt
+++ b/modules/platforms/cpp/ignite/odbc/CMakeLists.txt
@@ -15,7 +15,7 @@
# limitations under the License.
#
-project(ignite-odbc)
+project(ignite3-odbc)
set(TARGET ${PROJECT_NAME})
@@ -42,7 +42,6 @@ set(SOURCES
sql_environment.cpp
sql_statement.cpp
protocol_version.cpp
- message.cpp
type_traits.cpp
utility.cpp
log.cpp
@@ -68,8 +67,6 @@ if (WIN32)
if (MSVC_VERSION GREATER_EQUAL 1900)
target_link_libraries(${TARGET} legacy_stdio_definitions)
endif()
-
- set_target_properties(${TARGET} PROPERTIES OUTPUT_NAME "ignite-odbc")
else()
target_link_libraries(${TARGET} odbcinst)
endif()
diff --git a/modules/platforms/cpp/ignite/odbc/app/application_data_buffer.cpp
b/modules/platforms/cpp/ignite/odbc/app/application_data_buffer.cpp
index c7ee729f53..2191a5d1fb 100644
--- a/modules/platforms/cpp/ignite/odbc/app/application_data_buffer.cpp
+++ b/modules/platforms/cpp/ignite/odbc/app/application_data_buffer.cpp
@@ -25,6 +25,7 @@
#include <algorithm>
#include <string>
#include <sstream>
+#include <cstring>
namespace
{
diff --git a/modules/platforms/cpp/ignite/odbc/config/config_tools.cpp
b/modules/platforms/cpp/ignite/odbc/config/config_tools.cpp
index 55aff44923..5393db9eb8 100644
--- a/modules/platforms/cpp/ignite/odbc/config/config_tools.cpp
+++ b/modules/platforms/cpp/ignite/odbc/config/config_tools.cpp
@@ -26,7 +26,7 @@
namespace ignite {
-std::string addresses_to_string(const std::vector<network::tcp_range>&
addresses)
+std::string addresses_to_string(const std::vector<end_point>& addresses)
{
std::stringstream stream;
@@ -45,12 +45,12 @@ std::string addresses_to_string(const
std::vector<network::tcp_range>& addresses
return stream.str();
}
-void parse_address(const std::string& value, std::vector<network::tcp_range>&
end_points,
- diagnostic_record_storage* diag)
+std::vector<end_point> parse_address(const std::string& value,
diagnostic_record_storage* diag)
{
std::size_t addr_num = std::count(value.begin(), value.end(), ',') + 1;
- end_points.reserve(end_points.size() + addr_num);
+ std::vector<end_point> end_points;
+ end_points.reserve(addr_num);
std::string parsed_addr(value);
@@ -70,12 +70,10 @@ void parse_address(const std::string& value,
std::vector<network::tcp_range>& en
if (!addr.empty())
{
- network::tcp_range end_point;
-
- bool success = parse_single_address(addr, end_point, diag);
-
+ end_point ep;
+ bool success = parse_single_address(addr, ep, diag);
if (success)
- end_points.push_back(end_point);
+ end_points.push_back(ep);
}
if (!addr_begin_pos)
@@ -83,16 +81,18 @@ void parse_address(const std::string& value,
std::vector<network::tcp_range>& en
parsed_addr.erase(addr_begin_pos - 1);
}
+
+ return end_points;
}
-bool parse_single_address(const std::string& value, network::tcp_range&
end_point, diagnostic_record_storage* diag)
+bool parse_single_address(const std::string& value, end_point &addr,
diagnostic_record_storage* diag)
{
std::int64_t colon_num = std::count(value.begin(), value.end(), ':');
if (colon_num == 0)
{
- end_point.host = value;
- end_point.port = configuration::default_value::port;
+ addr.host = value;
+ addr.port = configuration::default_value::port;
return true;
}
@@ -111,7 +111,7 @@ bool parse_single_address(const std::string& value,
network::tcp_range& end_poin
}
std::size_t colon_pos = value.find(':');
- end_point.host = value.substr(0, colon_pos);
+ addr.host = value.substr(0, colon_pos);
if (colon_pos == value.size() - 1)
{
@@ -124,63 +124,11 @@ bool parse_single_address(const std::string& value,
network::tcp_range& end_poin
return false;
}
- std::string port_range = value.substr(colon_pos + 1);
-
- if (!parse_port_range(port_range, end_point.port, end_point.range, diag))
- return false;
-
- return true;
-}
-
-bool parse_port_range(const std::string& value, std::uint16_t& port,
std::uint16_t& range,
- diagnostic_record_storage* diag)
-{
- std::size_t sep_pos = value.find('.');
- if (sep_pos == std::string::npos)
- {
- range = 0;
- port = parse_port(value, diag);
-
- if (!port)
- return false;
-
- return true;
- }
-
- if (sep_pos + 2 > value.size() || value[sep_pos + 1] != '.')
- {
- std::stringstream stream;
- stream << "Unexpected number of '.' characters in the following
address: '" << value << "'. Ignoring address.";
-
- if (diag)
- diag->add_status_record(sql_state::S01S02_OPTION_VALUE_CHANGED,
stream.str());
-
- return false;
- }
-
- std::uint16_t range_begin = parse_port(value.substr(0, sep_pos), diag);
- if (!range_begin)
- return false;
-
- std::uint16_t range_end = parse_port(value.substr(sep_pos + 2), diag);
- if (!range_end)
+ std::string port_str = value.substr(colon_pos + 1);
+ addr.port = parse_port(port_str, diag);
+ if (!addr.port)
return false;
- if (range_end < range_begin)
- {
- std::stringstream stream;
- stream << "Port range end is less than port range begin in the
following address: '"
- << value << "'. Ignoring address.";
-
- if (diag)
- diag->add_status_record(sql_state::S01S02_OPTION_VALUE_CHANGED,
stream.str());
-
- return false;
- }
-
- port = range_begin;
- range = range_end - range_begin;
-
return true;
}
diff --git a/modules/platforms/cpp/ignite/odbc/config/config_tools.h
b/modules/platforms/cpp/ignite/odbc/config/config_tools.h
index 98e05c4429..7be5409791 100644
--- a/modules/platforms/cpp/ignite/odbc/config/config_tools.h
+++ b/modules/platforms/cpp/ignite/odbc/config/config_tools.h
@@ -17,7 +17,7 @@
#pragma once
-#include "ignite/network/tcp_range.h"
+#include "ignite/common/end_point.h"
#include <string>
#include <vector>
@@ -34,27 +34,26 @@ class diagnostic_record_storage;
* @param addresses Addresses.
* @return Resulting string.
*/
-std::string addresses_to_string(const std::vector<network::tcp_range>&
addresses);
+std::string addresses_to_string(const std::vector<end_point>& addresses);
/**
* Parse address.
*
* @param value String value to parse.
- * @param end_points End points list.
* @param diag Diagnostics collector.
+ * @return End points list.
*/
-void parse_address(const std::string& value, std::vector<network::tcp_range>&
end_points,
- diagnostic_record_storage* diag);
+std::vector<end_point> parse_address(const std::string& value,
diagnostic_record_storage* diag);
/**
* Parse single address.
*
* @param value String value to parse.
- * @param end_point End pont.
+ * @param addr End pont.
* @param diag Diagnostics collector.
* @return @c true, if parsed successfully, and @c false otherwise.
*/
-bool parse_single_address(const std::string& value, network::tcp_range&
end_point, diagnostic_record_storage* diag);
+bool parse_single_address(const std::string& value, end_point& addr,
diagnostic_record_storage* diag);
/**
* Parse single network port.
diff --git a/modules/platforms/cpp/ignite/odbc/config/configuration.cpp
b/modules/platforms/cpp/ignite/odbc/config/configuration.cpp
index b7d4a77aab..211cca27ff 100644
--- a/modules/platforms/cpp/ignite/odbc/config/configuration.cpp
+++ b/modules/platforms/cpp/ignite/odbc/config/configuration.cpp
@@ -25,7 +25,6 @@
namespace ignite {
-const protocol_version
configuration::default_value::protocol_version{protocol_version::get_current()};
const std::string configuration::default_value::driver{"Apache Ignite"};
const std::string configuration::default_value::address{};
const std::int32_t configuration::default_value::page_size{1024};
@@ -66,12 +65,12 @@ void configuration::set_driver(const std::string& driver)
m_driver.set_value(driver);
}
-const std::vector<network::tcp_range>& configuration::get_addresses() const
+const std::vector<end_point>& configuration::get_addresses() const
{
return m_end_points.get_value();
}
-void configuration::set_addresses(const std::vector<network::tcp_range>&
end_points)
+void configuration::set_addresses(const std::vector<end_point>& end_points)
{
m_end_points.set_value(end_points);
}
@@ -119,7 +118,7 @@ void configuration::add_to_map(argument_map& map, const
std::string& key, const
template<>
void configuration::add_to_map(argument_map& map, const std::string& key,
- const settable_value< std::vector<network::tcp_range> >& value)
+ const settable_value< std::vector<end_point> >& value)
{
if (value.is_set())
map[key] = addresses_to_string(value.get_value());
diff --git a/modules/platforms/cpp/ignite/odbc/config/configuration.h
b/modules/platforms/cpp/ignite/odbc/config/configuration.h
index 84253fb8ee..e81a5f0118 100644
--- a/modules/platforms/cpp/ignite/odbc/config/configuration.h
+++ b/modules/platforms/cpp/ignite/odbc/config/configuration.h
@@ -21,7 +21,7 @@
#include "ignite/odbc/protocol_version.h"
#include "ignite/odbc/ssl_mode.h"
-#include "ignite/network/tcp_range.h"
+#include "ignite/common/end_point.h"
#include <cstdint>
#include <string>
@@ -43,9 +43,6 @@ public:
/** Default values for configuration. */
struct default_value
{
- /** Default value for Driver attribute. */
- static const protocol_version protocol_version;
-
/** Default value for Driver attribute. */
static const std::string driver;
@@ -63,8 +60,7 @@ public:
* Default constructor.
*/
configuration()
- : m_protocol_version(default_value::protocol_version)
- , m_driver(default_value::driver)
+ : m_driver(default_value::driver)
, m_page_size(default_value::page_size)
, m_end_points({}) { }
@@ -75,36 +71,6 @@ public:
*/
[[nodiscard]] std::string to_connection_string() const;
- /**
- * Get protocol version.
- *
- * @return Protocol version.
- */
- [[nodiscard]] protocol_version get_protocol_version() const
- {
- return m_protocol_version.get_value();
- }
-
- /**
- * Set protocol version.
- *
- * @param version Version to set.
- */
- void set_protocol_version(const protocol_version& version)
- {
- this->m_protocol_version.set_value(version);
- }
-
- /**
- * Check if the value set.
- *
- * @return @true if the value set.
- */
- [[nodiscard]] bool is_protocol_version_set() const
- {
- return m_protocol_version.is_set();
- }
-
/**
* Get Driver.
*
@@ -124,14 +90,14 @@ public:
*
* @return Addresses.
*/
- [[nodiscard]] const std::vector<network::tcp_range>& get_addresses() const;
+ [[nodiscard]] const std::vector<end_point>& get_addresses() const;
/**
* Set addresses to connect to.
*
* @param end_points Addresses.
*/
- void set_addresses(const std::vector<network::tcp_range>& end_points);
+ void set_addresses(const std::vector<end_point>& end_points);
/**
* Check if the value set.
@@ -179,9 +145,6 @@ private:
template<typename T>
static void add_to_map(argument_map& map, const std::string& key, const
settable_value<T>& value);
- /** Protocol version. */
- settable_value<protocol_version> m_protocol_version;
-
/** Driver name. */
settable_value<std::string> m_driver;
@@ -189,7 +152,7 @@ private:
settable_value<int32_t> m_page_size;
/** Connection end-points. */
- settable_value< std::vector<network::tcp_range> > m_end_points;
+ settable_value< std::vector<end_point> > m_end_points;
};
template<>
@@ -201,7 +164,7 @@ void configuration::add_to_map<int32_t>(argument_map& map,
const std::string& ke
const settable_value<int32_t>& value);
template<>
-void configuration::add_to_map< std::vector<network::tcp_range>
>(argument_map& map, const std::string& key,
- const settable_value< std::vector<network::tcp_range> >& value);
+void configuration::add_to_map< std::vector<end_point> >(argument_map& map,
const std::string& key,
+ const settable_value< std::vector<end_point> >& value);
} // namespace ignite
diff --git
a/modules/platforms/cpp/ignite/odbc/config/connection_string_parser.cpp
b/modules/platforms/cpp/ignite/odbc/config/connection_string_parser.cpp
index e9d0e90991..22ae515dde 100644
--- a/modules/platforms/cpp/ignite/odbc/config/connection_string_parser.cpp
+++ b/modules/platforms/cpp/ignite/odbc/config/connection_string_parser.cpp
@@ -100,9 +100,7 @@ void connection_string_parser::handle_attribute_pair(const
std::string &key, con
if (lower_key == key::address)
{
- std::vector<network::tcp_range> end_points;
-
- parse_address(value, end_points, diag);
+ std::vector<end_point> end_points = parse_address(value, diag);
m_cfg.set_addresses(end_points);
}
diff --git a/modules/platforms/cpp/ignite/odbc/install/install_linux.ini
b/modules/platforms/cpp/ignite/odbc/install/install_linux.ini
new file mode 100644
index 0000000000..d59e72c508
--- /dev/null
+++ b/modules/platforms/cpp/ignite/odbc/install/install_linux.ini
@@ -0,0 +1,6 @@
+[Apache Ignite 3]
+Description=Apache Ignite 3
+Driver=/usr/local/lib/libignite3-odbc.so
+Setup=/usr/local/lib/libignite3-odbc.so
+DriverODBCVer=03.00
+FileUsage=0
diff --git a/modules/platforms/cpp/ignite/odbc/install/install_win.cmd
b/modules/platforms/cpp/ignite/odbc/install/install_win.cmd
new file mode 100644
index 0000000000..03a2be40ee
--- /dev/null
+++ b/modules/platforms/cpp/ignite/odbc/install/install_win.cmd
@@ -0,0 +1,26 @@
+@echo off
+
+set ODBC_AMD64=%1
+
+if [%ODBC_AMD64%] == [] (
+ echo error: driver path is not specified. Call format: install_win
abs_path_to_64_bit_driver
+ pause
+ exit /b 1
+)
+
+if exist %ODBC_AMD64% (
+ for %%i IN (%ODBC_AMD64%) DO IF EXIST %%~si\NUL (
+ echo warning: The path you have specified seems to be a
directory. Note that you have to specify path to driver file itself instead.
+ )
+ echo Installing 64-bit driver: %ODBC_AMD64%
+ reg add "HKEY_LOCAL_MACHINE\SOFTWARE\ODBC\ODBCINST.INI\Apache Ignite"
/v DriverODBCVer /t REG_SZ /d "03.80" /f
+ reg add "HKEY_LOCAL_MACHINE\SOFTWARE\ODBC\ODBCINST.INI\Apache Ignite"
/v UsageCount /t REG_DWORD /d 00000001 /f
+ reg add "HKEY_LOCAL_MACHINE\SOFTWARE\ODBC\ODBCINST.INI\Apache Ignite"
/v Driver /t REG_SZ /d %ODBC_AMD64% /f
+ reg add "HKEY_LOCAL_MACHINE\SOFTWARE\ODBC\ODBCINST.INI\Apache Ignite"
/v Setup /t REG_SZ /d %ODBC_AMD64% /f
+ reg add "HKEY_LOCAL_MACHINE\SOFTWARE\ODBC\ODBCINST.INI\ODBC Drivers" /v
"Apache Ignite 3" /t REG_SZ /d "Installed" /f
+) else (
+ echo 64-bit driver can not be found: %ODBC_AMD64%
+ echo Call format: install_win abs_path_to_64_bit_driver
+ pause
+ exit /b 1
+)
diff --git a/modules/platforms/cpp/ignite/odbc/log.cpp
b/modules/platforms/cpp/ignite/odbc/log.cpp
index d9e5555506..495e4036bc 100644
--- a/modules/platforms/cpp/ignite/odbc/log.cpp
+++ b/modules/platforms/cpp/ignite/odbc/log.cpp
@@ -27,7 +27,9 @@ log_stream::~log_stream() {
m_logger->write_message(m_string_buf.str());
}
-odbc_logger::odbc_logger(const char* path) {
+odbc_logger::odbc_logger(const char* path, bool trace_enabled)
+ : m_trace_enabled(trace_enabled)
+{
if (path)
m_stream.open(path);
}
@@ -44,8 +46,9 @@ void odbc_logger::write_message(std::string const& message) {
}
odbc_logger* odbc_logger::get() {
- const char* env_var_name = "IGNITE3_ODBC_LOG_PATH";
- static odbc_logger logger(getenv(env_var_name));
+ const char* env_var_path = "IGNITE3_ODBC_LOG_PATH";
+ const char* env_var_trace = "IGNITE3_ODBC_LOG_TRACE_ENABLED";
+ static odbc_logger logger(getenv(env_var_path), getenv(env_var_trace) !=
nullptr);
return logger.is_enabled() ? &logger : nullptr;
}
diff --git a/modules/platforms/cpp/ignite/odbc/log.h
b/modules/platforms/cpp/ignite/odbc/log.h
index 8fc86bd70a..88a0ee5761 100644
--- a/modules/platforms/cpp/ignite/odbc/log.h
+++ b/modules/platforms/cpp/ignite/odbc/log.h
@@ -30,6 +30,15 @@
} \
} while (false)
+#define TRACE_MSG(param) \
+ do { \
+ ignite::odbc_logger* p = ignite::odbc_logger::get(); \
+ if (p && p->is_trace_enabled()) { \
+ ignite::log_stream lstream(p); \
+ lstream << __FUNCTION__ << ": " << param; \
+ } \
+ } while (false)
+
namespace ignite
{
@@ -108,6 +117,15 @@ public:
*/
[[nodiscard]] bool is_enabled() const;
+ /**
+ * Checks if tracing is enabled.
+ *
+ * @return True, if tracing is enabled.
+ */
+ [[nodiscard]] bool is_trace_enabled() const {
+ return m_trace_enabled;
+ }
+
/**
* Outputs the message to log file
* @param message The message to write
@@ -119,8 +137,9 @@ private:
* Constructor.
*
* @param path to log file.
+ * @param trace_enabled Tracing enabled.
*/
- explicit odbc_logger(const char* path);
+ odbc_logger(const char* path, bool trace_enabled);
/**
* Destructor.
@@ -132,6 +151,9 @@ private:
/** File stream. */
std::ofstream m_stream{};
+
+ /** Trace enabled. */
+ bool m_trace_enabled{false};
};
} // namespace ignite
diff --git a/modules/platforms/cpp/ignite/odbc/message.cpp
b/modules/platforms/cpp/ignite/odbc/message.cpp
deleted file mode 100644
index d45bea42a2..0000000000
--- a/modules/platforms/cpp/ignite/odbc/message.cpp
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "ignite/odbc/message.h"
-
-namespace ignite
-{
-
-void handshake_request::write(protocol::writer &writer, const
protocol_version&) const
-{
- writer.write(std::int8_t(request_type::HANDSHAKE));
-
- protocol_version version = m_config.get_protocol_version();
- writer.write(version.get_major());
- writer.write(version.get_minor());
- writer.write(version.get_maintenance());
-
- writer.write(std::int8_t(client_type::ODBC));
-}
-
-void response::read(protocol::reader &reader, const protocol_version& ver)
-{
- m_status = response_status(reader.read_int32());
-
- if (m_status == response_status::SUCCESS)
- read_on_success(reader, ver);
- else
- m_error = reader.read_string();
-}
-
-void handshake_response::read(protocol::reader &reader, const
protocol_version&)
-{
- m_accepted = reader.read_bool();
-
- if (!m_accepted)
- {
- auto major = reader.read_int16();
- auto minor = reader.read_int16();
- auto maintenance = reader.read_int16();
-
- m_current_ver = protocol_version(major, minor, maintenance);
- m_error = reader.read_string();
- }
-}
-
-} // namespace ignite
diff --git a/modules/platforms/cpp/ignite/odbc/message.h
b/modules/platforms/cpp/ignite/odbc/message.h
deleted file mode 100644
index dbd87bc806..0000000000
--- a/modules/platforms/cpp/ignite/odbc/message.h
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#pragma once
-
-#include "ignite/odbc/app/parameter_set.h"
-#include "ignite/odbc/config/configuration.h"
-#include "ignite/odbc/meta/column_meta.h"
-#include "ignite/odbc/meta/table_meta.h"
-#include "ignite/odbc/protocol_version.h"
-
-#include "ignite/protocol/reader.h"
-#include "ignite/protocol/writer.h"
-
-#include <cstdint>
-#include <string>
-
-namespace ignite
-{
-
-enum class client_type
-{
- ODBC = 2
-};
-
-enum class request_type
-{
- HANDSHAKE = 1
-};
-
-/**
- * Handshake request.
- */
-class handshake_request
-{
-public:
- /**
- * Constructor.
- *
- * @param config Configuration.
- */
- explicit handshake_request(const configuration& config)
- : m_config(config) {}
-
- /**
- * Write request using provided writer.
- *
- * @param writer Writer.
- */
- void write(protocol::writer &writer, const protocol_version&) const;
-
-private:
- /** Configuration. */
- const configuration& m_config;
-};
-
-/**
- * General response.
- */
-class response
-{
-public:
- // Default
- response() = default;
- virtual ~response() = default;
-
- /**
- * Read response using provided reader.
- *
- * @param reader Reader.
- * @param ver Protocol version.
- */
- void read(protocol::reader& reader, const protocol_version& ver);
-
- /**
- * Get request processing status.
- *
- * @return Status.
- */
- [[nodiscard]] response_status get_state() const
- {
- return m_status;
- }
-
- /**
- * Get resulting error.
- * @return Error.
- */
- [[nodiscard]] const std::string& get_error() const
- {
- return m_error;
- }
-
-protected:
- /**
- * Read data if response status is response_status::SUCCESS.
- */
- virtual void read_on_success(protocol::reader&, const protocol_version&) {
}
-
-private:
- /** Request processing status. */
- response_status m_status{response_status::UNKNOWN_ERROR};
-
- /** Error message. */
- std::string m_error;
-};
-
-/**
- * Handshake response.
- */
-class handshake_response
-{
-public:
- // Default
- handshake_response() = default;
-
- /**
- * Check if the handshake has been accepted.
- * @return True if the handshake has been accepted.
- */
- [[nodiscard]] bool is_accepted() const
- {
- return m_accepted;
- }
-
- /**
- * Get optional error.
- * @return Optional error message.
- */
- [[nodiscard]] const std::string& get_error() const
- {
- return m_error;
- }
-
- /**
- * Current host Apache Ignite version.
- * @return Current host Apache Ignite version.
- */
- [[nodiscard]] const protocol_version& get_current_ver() const
- {
- return m_current_ver;
- }
-
- /**
- * Read response using provided reader.
- * @param reader Reader.
- */
- void read(protocol::reader &reader, const protocol_version&);
-
-private:
- /** Handshake accepted. */
- bool m_accepted{false};
-
- /** Node's protocol version. */
- protocol_version m_current_ver;
-
- /** Optional error message. */
- std::string m_error;
-};
-
-} // namespace ignite
diff --git a/modules/platforms/cpp/ignite/odbc/meta/column_meta.cpp
b/modules/platforms/cpp/ignite/odbc/meta/column_meta.cpp
index 52177e26a8..fe4568a8bc 100644
--- a/modules/platforms/cpp/ignite/odbc/meta/column_meta.cpp
+++ b/modules/platforms/cpp/ignite/odbc/meta/column_meta.cpp
@@ -101,7 +101,7 @@ SQLLEN nullability_to_sql(nullability value)
return SQL_NULLABLE_UNKNOWN;
}
-void column_meta::read(protocol::reader &reader, const protocol_version &ver)
+void column_meta::read(protocol::reader &reader, const protocol_version &)
{
m_schema_name = reader.read_string();
m_table_name = reader.read_string();
diff --git a/modules/platforms/cpp/ignite/odbc/odbc_error.h
b/modules/platforms/cpp/ignite/odbc/odbc_error.h
index 4f89955ad3..cb6a87f847 100644
--- a/modules/platforms/cpp/ignite/odbc/odbc_error.h
+++ b/modules/platforms/cpp/ignite/odbc/odbc_error.h
@@ -19,6 +19,7 @@
#include <string>
#include <utility>
+#include <exception>
#include "common_types.h"
@@ -27,9 +28,8 @@ namespace ignite
/**
* ODBC error.
- * TODO: Derive from std::exception
*/
-class odbc_error
+class odbc_error : public std::exception
{
public:
// Default
@@ -41,7 +41,7 @@ public:
* @param state SQL state.
* @param message Error message.
*/
- odbc_error(sql_state state, std::string message)
+ odbc_error(sql_state state, std::string message) noexcept
: m_state(state)
, m_message(std::move(message)) { }
@@ -65,6 +65,11 @@ public:
return m_message;
}
+ /**
+ * Get error message.
+ */
+ [[nodiscard]] char const *what() const noexcept override { return
m_message.c_str(); }
+
private:
/** Status. */
sql_state m_state{sql_state::UNKNOWN};
diff --git a/modules/platforms/cpp/ignite/odbc/sql_connection.cpp
b/modules/platforms/cpp/ignite/odbc/sql_connection.cpp
index 7d95c5fe9e..e33f293236 100644
--- a/modules/platforms/cpp/ignite/odbc/sql_connection.cpp
+++ b/modules/platforms/cpp/ignite/odbc/sql_connection.cpp
@@ -19,33 +19,59 @@
#include "ignite/odbc/config/configuration.h"
#include "ignite/odbc/config/connection_string_parser.h"
#include "ignite/odbc/log.h"
-#include "ignite/odbc/message.h"
#include "ignite/odbc/sql_environment.h"
#include "ignite/odbc/sql_statement.h"
#include "ignite/odbc/ssl_mode.h"
#include "ignite/odbc/utility.h"
#include <ignite/network/network.h>
+#include <ignite/common/bytes.h>
-#include <sstream>
#include <algorithm>
#include <cstring>
#include <cstddef>
+#include <random>
+#include <sstream>
-// Uncomment for per-byte debug.
-// TODO: Change to m_env variable
-//#define PER_BYTE_DEBUG
+constexpr const std::size_t PROTOCOL_HEADER_SIZE = 4;
namespace
{
-#pragma pack(push, 1)
- struct odbc_protocol_header
+
+/**
+ * Get hex dump of binary data in string form.
+ * @param data Data.
+ * @param count Number of bytes.
+ * @return Hex dump string.
+ */
+std::string hex_dump(const void* data, std::size_t count)
+{
+ std::stringstream dump;
+ std::size_t cnt = 0;
+ auto bytes = (const std::uint8_t*)data;
+ for (auto *p = bytes, *e = bytes + count; p != e; ++p)
{
- std::int32_t len;
- };
-#pragma pack(pop)
+ if (cnt++ % 16 == 0)
+ {
+ dump << std::endl;
+ }
+ dump << std::hex << std::setfill('0') << std::setw(2) << (int)*p << "
";
+ }
+ return dump.str();
+}
+
}
+std::vector<ignite::end_point> collect_addresses(const ignite::configuration&
cfg)
+{
+ std::vector<ignite::end_point> end_points = cfg.get_addresses();
+
+ std::random_device device;
+ std::mt19937 generator(device());
+ std::shuffle(end_points.begin(), end_points.end(), generator);
+
+ return end_points;
+}
namespace ignite {
@@ -74,18 +100,17 @@ sql_result
sql_connection::internal_get_info(connection_info::info_type type, vo
return res;
}
-void sql_connection::establish(const std::string& connectStr, void*
parentWindow)
+void sql_connection::establish(const std::string& connect_str, void*
parent_window)
{
- IGNITE_ODBC_API_CALL(internal_establish(connectStr, parentWindow));
+ IGNITE_ODBC_API_CALL(internal_establish(connect_str, parent_window));
}
-sql_result sql_connection::internal_establish(const std::string& connectStr,
void* parentWindow)
+sql_result sql_connection::internal_establish(const std::string& connect_str,
void* parent_window)
{
- configuration config;
- connection_string_parser parser(config);
- parser.parse_connection_string(connectStr, &get_diagnostic_records());
+ connection_string_parser parser(m_config);
+ parser.parse_connection_string(connect_str, &get_diagnostic_records());
- if (parentWindow)
+ if (parent_window)
{
// TODO: IGNITE-19210 Implement UI for connection
add_status_record(sql_state::SHYC00_OPTIONAL_FEATURE_NOT_IMPLEMENTED,
"Connection using UI is not supported");
@@ -103,11 +128,10 @@ void sql_connection::establish(const configuration& cfg)
IGNITE_ODBC_API_CALL(internal_establish(cfg));
}
-sql_result sql_connection::init_socket()
+void sql_connection::init_socket()
{
- // TODO: IGNITE-19204 Implement connection establishment.
- add_status_record(sql_state::S08001_CANNOT_CONNECT, "Connection is not
implemented");
- return sql_result::AI_ERROR;
+ if (!m_socket)
+ m_socket = network::make_tcp_socket_client();
}
sql_result sql_connection::internal_establish(const configuration& cfg)
@@ -147,7 +171,14 @@ void sql_connection::deregister()
sql_result sql_connection::internal_release()
{
- // TODO: IGNITE-19204 Implement connection management.
+ if (!m_socket)
+ {
+ add_status_record(sql_state::S08003_NOT_CONNECTED, "Connection is not
open.");
+
+ // It is important to return SUCCESS_WITH_INFO and not ERROR here, as
if we return an error, Windows
+ // Driver Manager may decide that connection is not valid anymore
which results in memory leak.
+ return sql_result::AI_SUCCESS_WITH_INFO;
+ }
close();
@@ -156,7 +187,11 @@ sql_result sql_connection::internal_release()
void sql_connection::close()
{
- // TODO: IGNITE-19204 Implement connection management.
+ if (m_socket)
+ {
+ m_socket->close();
+ m_socket.reset();
+ }
}
sql_statement *sql_connection::create_statement()
@@ -182,49 +217,59 @@ sql_result
sql_connection::internal_create_statement(sql_statement *& statement)
return sql_result::AI_SUCCESS;
}
-bool sql_connection::send(const std::int8_t* data, size_t len, std::int32_t
timeout)
+bool sql_connection::send(const std::byte* data, std::size_t len, std::int32_t
timeout)
{
- // TODO: IGNITE-19204 Implement connection management.
+ if (!m_socket)
+ throw odbc_error(sql_state::S08003_NOT_CONNECTED, "Connection is not
established");
- auto new_len = static_cast<std::int32_t>(len +
sizeof(odbc_protocol_header));
- std::vector<std::int8_t> msg(new_len);
- auto *hdr = reinterpret_cast<odbc_protocol_header*>(msg.data());
+ auto new_len = len + PROTOCOL_HEADER_SIZE;
+ std::vector<std::byte> msg(new_len);
- // TODO: Re-factor
- hdr->len = static_cast<std::int32_t>(len);
-
- memcpy(msg.data() + sizeof(odbc_protocol_header), data, len);
+ bytes::store<endian::BIG, std::int32_t>(msg.data(), std::int32_t(len));
+ memcpy(msg.data() + PROTOCOL_HEADER_SIZE, data, len);
operation_result res = send_all(msg.data(), msg.size(), timeout);
-
if (res == operation_result::TIMEOUT)
return false;
if (res == operation_result::FAIL)
throw odbc_error(sql_state::S08S01_LINK_FAILURE, "Can not send message
due to connection failure");
-#ifdef PER_BYTE_DEBUG
- LOG_MSG("message sent: (" << msg.get_size() << " bytes)" <<
HexDump(msg.get_data(), msg.get_size()));
-#endif //PER_BYTE_DEBUG
+ TRACE_MSG("message sent: (" << msg.size() << " bytes)" <<
hex_dump(msg.data(), msg.size()));
return true;
}
-sql_connection::operation_result sql_connection::send_all(const std::int8_t*
data, size_t len, std::int32_t timeout)
+sql_connection::operation_result sql_connection::send_all(const std::byte*
data, std::size_t len, std::int32_t timeout)
{
- // TODO: IGNITE-19204 Implement connection operations.
- return operation_result::FAIL;
+ std::int64_t sent = 0;
+ while (sent != static_cast<std::int64_t>(len))
+ {
+ int res = m_socket->send(data + sent, len - sent, timeout);
+ LOG_MSG("Send result: " << res);
+
+ if (res < 0 || res == network::socket_client::wait_result::TIMEOUT)
+ {
+ close();
+ return res < 0 ? operation_result::FAIL :
operation_result::TIMEOUT;
+ }
+ sent += res;
+ }
+
+ assert(static_cast<std::size_t>(sent) == len);
+
+ return operation_result::SUCCESS;
}
-bool sql_connection::receive(std::vector<std::int8_t>& msg, std::int32_t
timeout)
+bool sql_connection::receive(std::vector<std::byte>& msg, std::int32_t timeout)
{
- // TODO: IGNITE-19204 Implement connection management.
+ if (!m_socket)
+ throw odbc_error(sql_state::S08003_NOT_CONNECTED, "Connection is not
established");
msg.clear();
- odbc_protocol_header hdr{};
-
- operation_result res = receive_all(reinterpret_cast<std::int8_t*>(&hdr),
sizeof(hdr), timeout);
+ std::byte len_buffer[PROTOCOL_HEADER_SIZE];
+ operation_result res = receive_all(&len_buffer, sizeof(len_buffer),
timeout);
if (res == operation_result::TIMEOUT)
return false;
@@ -232,38 +277,52 @@ bool sql_connection::receive(std::vector<std::int8_t>&
msg, std::int32_t timeout
if (res == operation_result::FAIL)
throw odbc_error(sql_state::S08S01_LINK_FAILURE, "Can not receive
message header");
- if (hdr.len < 0)
+ static_assert(sizeof(std::int32_t) == PROTOCOL_HEADER_SIZE);
+ std::int32_t len = bytes::load<endian::BIG, std::int32_t>(len_buffer);
+ if (len < 0)
{
close();
-
throw odbc_error(sql_state::SHY000_GENERAL_ERROR, "Protocol error:
Message length is negative");
}
- if (hdr.len == 0)
+ if (len == 0)
return false;
- msg.resize(hdr.len);
-
- res = receive_all(&msg[0], hdr.len, timeout);
-
+ msg.resize(len);
+ res = receive_all(&msg[0], len, timeout);
if (res == operation_result::TIMEOUT)
return false;
if (res == operation_result::FAIL)
throw odbc_error(sql_state::S08S01_LINK_FAILURE, "Can not receive
message body");
-#ifdef PER_BYTE_DEBUG
- LOG_MSG("Message received: " << HexDump(&msg[0], msg.size()));
-#endif //PER_BYTE_DEBUG
+ TRACE_MSG("Message received: " << hex_dump(&msg[0], msg.size()));
return true;
}
-sql_connection::operation_result sql_connection::receive_all(void* dst, size_t
len, std::int32_t timeout)
+sql_connection::operation_result sql_connection::receive_all(void* dst,
std::size_t len, std::int32_t timeout)
{
- // TODO: IGNITE-19204 Implement connection operations.
+ std::size_t remain = len;
+ auto* buffer = reinterpret_cast<std::byte*>(dst);
+
+ while (remain)
+ {
+ std::size_t received = len - remain;
+
+ int res = m_socket->receive(buffer + received, remain, timeout);
+ LOG_MSG("Receive res: " << res << ", remain: " << remain);
+
+ if (res < 0 || res == network::socket_client::wait_result::TIMEOUT)
+ {
+ close();
+ return res < 0 ? operation_result::FAIL :
operation_result::TIMEOUT;
+ }
- return operation_result::FAIL;
+ remain -= static_cast<std::size_t>(res);
+ }
+
+ return operation_result::SUCCESS;
}
const configuration&sql_connection::get_configuration() const
@@ -322,9 +381,7 @@ sql_result sql_connection::internal_get_attribute(int attr,
void* buf, SQLINTEGE
{
auto *val = reinterpret_cast<SQLUINTEGER*>(buf);
- // TODO: IGNITE-19204 Implement connection management.
- *val = SQL_CD_TRUE;
-
+ *val = m_socket ? SQL_CD_FALSE : SQL_CD_TRUE;
if (value_len)
*value_len = SQL_IS_INTEGER;
@@ -446,32 +503,88 @@ sql_result sql_connection::internal_set_attribute(int
attr, void* value, SQLINTE
sql_result sql_connection::make_request_handshake()
{
- protocol_version m_protocol_version = m_config.get_protocol_version();
+ static constexpr int8_t ODBC_CLIENT = 3;
+ m_protocol_version = protocol_version::get_current();
- if (!m_protocol_version.is_supported())
+ try
{
-
add_status_record(sql_state::S01S00_INVALID_CONNECTION_STRING_ATTRIBUTE,
- "Protocol version is not supported: " +
m_protocol_version.to_string());
+ std::vector<std::byte> message;
+ {
+ protocol::buffer_adapter buffer(message);
+ buffer.write_raw(bytes_view(protocol::MAGIC_BYTES));
- return sql_result::AI_ERROR;
- }
+ protocol::write_message_to_buffer(buffer, [&ver =
m_protocol_version](protocol::writer &writer) {
+ writer.write(ver.get_major());
+ writer.write(ver.get_minor());
+ writer.write(ver.get_maintenance());
- handshake_request req(m_config);
- handshake_response rsp;
+ writer.write(ODBC_CLIENT);
- try
- {
- // Workaround for some Linux systems that report connection on
non-blocking
- // sockets as successful but fail to establish real connection.
- bool sent = internal_sync_message(req, rsp, m_login_timeout);
+ // Features.
+ writer.write_binary_empty();
+
+ // Extensions.
+ writer.write_map_empty();
+ });
+ }
+
+ auto res = send_all(message.data(), message.size(), m_login_timeout);
+ if (res != operation_result::SUCCESS)
+ {
+ add_status_record(sql_state::S08001_CANNOT_CONNECT, "Failed to
send handshake request");
+ return sql_result::AI_ERROR;
+ }
- if (!sent)
+ message.clear();
+ message.resize(protocol::MAGIC_BYTES.size());
+
+ res = receive_all(message.data(), message.size(), m_login_timeout);
+ if (res != operation_result::SUCCESS)
{
add_status_record(sql_state::S08001_CANNOT_CONNECT,
"Failed to get handshake response (Did you forget to enable
SSL?).");
return sql_result::AI_ERROR;
}
+
+ if (!std::equal(message.begin(), message.end(),
protocol::MAGIC_BYTES.begin(), protocol::MAGIC_BYTES.end()))
+ {
+ add_status_record(sql_state::S08001_CANNOT_CONNECT, "Failed to
receive magic bytes in handshake response. "
+ "Possible reasons: wrong port number used, TLS is enabled on
server but not on client.");
+ return sql_result::AI_ERROR;
+ }
+
+ bool received = receive(message, m_login_timeout);
+ if (!received)
+ {
+ add_status_record(sql_state::S08001_CANNOT_CONNECT, "Failed to get
handshake response.");
+ return sql_result::AI_ERROR;
+ }
+
+ protocol::reader reader(message);
+
+ auto ver_major = reader.read_int16();
+ auto ver_minor = reader.read_int16();
+ auto ver_patch = reader.read_int16();
+
+ protocol_version ver(ver_major, ver_minor, ver_patch);
+ LOG_MSG("Server-side protocol version: " << ver.to_string());
+
+ // We now only support a single version
+ if (ver != protocol_version::get_current())
+ {
+ add_status_record(sql_state::S08004_CONNECTION_REJECTED,
+ "Unsupported server version: " + ver.to_string() + ".");
+ return sql_result::AI_ERROR;
+ }
+
+ auto err = protocol::read_error(reader);
+ if (err)
+ {
+ add_status_record(sql_state::S08004_CONNECTION_REJECTED,
+ "Server rejected handshake with error: " + err->what_str());
+ return sql_result::AI_ERROR;
+ }
}
catch (const odbc_error& err)
{
@@ -486,43 +599,60 @@ sql_result sql_connection::make_request_handshake()
return sql_result::AI_ERROR;
}
- if (!rsp.is_accepted())
- {
- LOG_MSG("Handshake message has been rejected.");
-
- std::stringstream constructor;
-
- constructor << "Node rejected handshake message. ";
-
- if (!rsp.get_error().empty())
- constructor << "Additional info: " << rsp.get_error() << " ";
-
- constructor << "Current version of the protocol, used by the server
node is "
- << rsp.get_current_ver().to_string() << ", "
- << "driver protocol version introduced in version "
- << m_protocol_version.to_string() << ".";
-
- add_status_record(sql_state::S08004_CONNECTION_REJECTED,
constructor.str());
-
- return sql_result::AI_ERROR;
- }
-
return sql_result::AI_SUCCESS;
}
void sql_connection::ensure_connected()
{
- // TODO: IGNITE-19204 Implement connection management.
- bool success = try_restore_connection();
+ if (!m_socket)
+ return;
+ bool success = try_restore_connection();
if (!success)
throw odbc_error(sql_state::S08001_CANNOT_CONNECT, "Failed to
establish connection with any provided hosts");
}
bool sql_connection::try_restore_connection()
{
- // TODO: IGNITE-19204 Implement connection management.
- return false;
+ std::vector<end_point> addrs = collect_addresses(m_config);
+
+ if (!m_socket)
+ init_socket();
+
+ bool connected = false;
+ while (!addrs.empty())
+ {
+ const end_point& addr = addrs.back();
+
+ connected = safe_connect(addr);
+ if (connected)
+ {
+ sql_result res = make_request_handshake();
+
+ connected = res != sql_result::AI_ERROR;
+ if (connected)
+ break;
+ }
+
+ addrs.pop_back();
+ }
+
+ if (!connected)
+ close();
+
+ return connected;
+}
+
+bool sql_connection::safe_connect(const end_point &addr)
+{
+ try {
+ return m_socket->connect(addr.host.c_str(), addr.port,
m_login_timeout);
+ } catch (const ignite_error& err) {
+ std::stringstream msgs;
+ msgs << "Error while trying connect to " << addr.host << ":" <<
addr.port <<", " << err.what_str();
+ add_status_record(sql_state::S08001_CANNOT_CONNECT, msgs.str());
+ return false;
+ }
}
std::int32_t sql_connection::retrieve_timeout(void* value)
diff --git a/modules/platforms/cpp/ignite/odbc/sql_connection.h
b/modules/platforms/cpp/ignite/odbc/sql_connection.h
index eaa0e52e35..241d4a961f 100644
--- a/modules/platforms/cpp/ignite/odbc/sql_connection.h
+++ b/modules/platforms/cpp/ignite/odbc/sql_connection.h
@@ -22,7 +22,10 @@
#include "ignite/odbc/diagnostic/diagnosable_adapter.h"
#include "ignite/odbc/odbc_error.h"
+#include "ignite/network/socket_client.h"
#include "ignite/network/tcp_range.h"
+#include "ignite/protocol/buffer_adapter.h"
+#include "ignite/protocol/writer.h"
#include <cstdint>
#include <vector>
@@ -123,7 +126,7 @@ public:
* @return @c true on success, @c false on timeout.
* @throw odbc_error on error.
*/
- bool send(const std::int8_t* data, std::size_t len)
+ bool send(const std::byte* data, std::size_t len)
{
return send(data, len, m_timeout);
}
@@ -137,7 +140,7 @@ public:
* @return @c true on success, @c false on timeout.
* @throw odbc_error on error.
*/
- bool send(const std::int8_t* data, std::size_t len, std::int32_t timeout);
+ bool send(const std::byte* data, std::size_t len, std::int32_t timeout);
/**
* Receive next message.
@@ -147,7 +150,7 @@ public:
* @return @c true on success, @c false on timeout.
* @throw odbc_error on error.
*/
- bool receive(std::vector<std::int8_t>& msg, std::int32_t timeout);
+ bool receive(std::vector<std::byte>& msg, std::int32_t timeout);
/**
* Get configuration.
@@ -163,74 +166,6 @@ public:
*/
[[nodiscard]] bool is_auto_commit() const;
- /**
- * Synchronously send request message and receive response.
- * Uses provided timeout.
- *
- * @param req Request message.
- * @param rsp response message.
- * @param timeout Timeout. 0 means disabled.
- * @return @c true on success, @c false on timeout.
- * @throw odbc_error on error.
- */
- template<typename ReqT, typename RspT>
- bool sync_message(const ReqT& req, RspT& rsp, std::int32_t timeout)
- {
- ensure_connected();
-
- std::vector<std::int8_t> temp_buffer;
-
- // TODO: IGNITE-19204 Implement encoding and decoding of messages.
- //parser.Encode(req, temp_buffer);
-
- bool success = send(temp_buffer.data(), temp_buffer.size(), timeout);
-
- if (!success)
- return false;
-
- success = receive(temp_buffer, timeout);
-
- if (!success)
- return false;
-
- // TODO: IGNITE-19204 Implement encoding and decoding of messages.
- //parser.Decode(rsp, temp_buffer);
-
- return true;
- }
-
- /**
- * Synchronously send request message and receive response.
- * Uses connection timeout.
- *
- * @param req Request message.
- * @param rsp response message.
- * @throw odbc_error on error.
- */
- template<typename ReqT, typename RspT>
- void sync_message(const ReqT& req, RspT& rsp)
- {
- ensure_connected();
-
- std::vector<std::int8_t> temp_buffer;
-
- // TODO: IGNITE-19204 Implement encoding and decoding of messages.
- //parser.Encode(req, temp_buffer);
-
- bool success = send(temp_buffer.data(), temp_buffer.size(), m_timeout);
-
- if (!success)
- throw odbc_error(sql_state::SHYT01_CONNECTION_TIMEOUT, "Send
operation timed out");
-
- success = receive(temp_buffer, m_timeout);
-
- if (!success)
- throw odbc_error(sql_state::SHYT01_CONNECTION_TIMEOUT, "Receive
operation timed out");
-
- // TODO: IGNITE-19204 Implement encoding and decoding of messages.
- //parser.Decode(rsp, temp_buffer);
- }
-
/**
* Perform transaction commit.
*/
@@ -267,42 +202,7 @@ private:
*
* @return Operation result.
*/
- sql_result init_socket();
-
- /**
- * Synchronously send request message and receive response.
- * Uses provided timeout. Does not try to restore connection on
- * fail.
- *
- * @param req Request message.
- * @param rsp response message.
- * @param timeout Timeout.
- * @return @c true on success, @c false on timeout.
- * @throw odbc_error on error.
- */
- template<typename ReqT, typename RspT>
- bool internal_sync_message(const ReqT& req, RspT& rsp, std::int32_t
timeout)
- {
- std::vector<std::int8_t> temp_buffer;
-
- // TODO: IGNITE-19204 Implement encoding and decoding of messages.
- //parser.Encode(req, temp_buffer);
-
- bool success = send(temp_buffer.data(), temp_buffer.size(), timeout);
-
- if (!success)
- return false;
-
- success = receive(temp_buffer, timeout);
-
- if (!success)
- return false;
-
- // TODO: IGNITE-19204 Implement encoding and decoding of messages.
- //parser.Decode(rsp, temp_buffer);
-
- return true;
- }
+ void init_socket();
/**
* Establish connection to ODBC server.
@@ -414,7 +314,7 @@ private:
* @param timeout Timeout.
* @return Operation result.
*/
- operation_result send_all(const std::int8_t* data, std::size_t len,
std::int32_t timeout);
+ operation_result send_all(const std::byte* data, std::size_t len,
std::int32_t timeout);
/**
* Perform handshake request.
@@ -446,6 +346,14 @@ private:
*/
std::int32_t retrieve_timeout(void* value);
+ /**
+ * Safe connect.
+ *
+ * @param addr Address.
+ * @return @c true if connection was successful.
+ */
+ bool safe_connect(const end_point &addr);
+
/**
* Constructor.
*/
@@ -455,7 +363,7 @@ private:
, m_info(m_config) {}
/** Parent. */
- sql_environment * m_env;
+ sql_environment *m_env;
/** Connection timeout in seconds. */
std::int32_t m_timeout{0};
@@ -471,6 +379,12 @@ private:
/** Connection info. */
connection_info m_info;
+
+ /** Socket client. */
+ std::unique_ptr<network::socket_client> m_socket;
+
+ /** Protocol version. */
+ protocol_version m_protocol_version;
};
} // namespace ignite
\ No newline at end of file
diff --git a/modules/platforms/cpp/ignite/odbc/sql_statement.cpp
b/modules/platforms/cpp/ignite/odbc/sql_statement.cpp
index 8b53c309c7..804cf493c8 100644
--- a/modules/platforms/cpp/ignite/odbc/sql_statement.cpp
+++ b/modules/platforms/cpp/ignite/odbc/sql_statement.cpp
@@ -19,7 +19,6 @@
#include "ignite/odbc/system/odbc_constants.h"
#include "ignite/odbc/log.h"
-#include "ignite/odbc/message.h"
#include "ignite/odbc/odbc_error.h"
#include "ignite/odbc/sql_connection.h"
#include "ignite/odbc/sql_statement.h"
@@ -628,6 +627,10 @@ void sql_statement::execute_get_columns_meta_query(const
std::string& schema,
sql_result sql_statement::internal_execute_get_columns_meta_query(const
std::string& schema,
const std::string& table, const std::string& column)
{
+ UNUSED_VALUE schema;
+ UNUSED_VALUE table;
+ UNUSED_VALUE column;
+
if (m_current_query)
m_current_query->close();
@@ -646,6 +649,11 @@ void sql_statement::execute_get_tables_meta_query(const
std::string& catalog,
sql_result sql_statement::internal_execute_get_tables_meta_query(const
std::string& catalog,
const std::string& schema, const std::string& table, const std::string&
table_type)
{
+ UNUSED_VALUE catalog;
+ UNUSED_VALUE schema;
+ UNUSED_VALUE table;
+ UNUSED_VALUE table_type;
+
if (m_current_query)
m_current_query->close();
@@ -666,6 +674,13 @@ sql_result
sql_statement::internal_execute_get_foreign_keys_query(const std::str
const std::string& primary_schema, const std::string& primary_table, const
std::string& foreign_catalog,
const std::string& foreign_schema, const std::string& foreign_table)
{
+ UNUSED_VALUE primary_catalog;
+ UNUSED_VALUE primary_schema;
+ UNUSED_VALUE primary_table;
+ UNUSED_VALUE foreign_catalog;
+ UNUSED_VALUE foreign_schema;
+ UNUSED_VALUE foreign_table;
+
if (m_current_query)
m_current_query->close();
@@ -683,6 +698,10 @@ void sql_statement::execute_get_primary_keys_query(const
std::string& catalog, c
sql_result sql_statement::internal_execute_get_primary_keys_query(const
std::string& catalog, const std::string& schema,
const std::string& table)
{
+ UNUSED_VALUE catalog;
+ UNUSED_VALUE schema;
+ UNUSED_VALUE table;
+
if (m_current_query)
m_current_query->close();
@@ -701,6 +720,12 @@ void sql_statement::execute_special_columns_query(uint16_t
type, const std::stri
sql_result sql_statement::internal_execute_special_columns_query(uint16_t
type, const std::string& catalog,
const std::string& schema, const std::string& table, uint16_t scope,
uint16_t nullable)
{
+ UNUSED_VALUE catalog;
+ UNUSED_VALUE schema;
+ UNUSED_VALUE table;
+ UNUSED_VALUE scope;
+ UNUSED_VALUE nullable;
+
if (type != SQL_BEST_ROWID && type != SQL_ROWVER)
{
add_status_record(sql_state::SHY097_COLUMN_TYPE_OUT_OF_RANGE, "An
invalid IdentifierType value was specified.");
@@ -1114,7 +1139,7 @@ sql_result
sql_statement::internal_describe_param(uint16_t param_num, int16_t* d
return sql_result::AI_ERROR;
}
- auto type = m_parameters.get_param_type(param_num, ignite_type::UNDEFINED);
+ auto type = m_parameters.get_param_type(std::int16_t(param_num),
ignite_type::UNDEFINED);
LOG_MSG("Type: " << type);
@@ -1125,7 +1150,7 @@ sql_result
sql_statement::internal_describe_param(uint16_t param_num, int16_t* d
if (res != sql_result::AI_SUCCESS)
return res;
- type = m_parameters.get_param_type(param_num, ignite_type::UNDEFINED);
+ type = m_parameters.get_param_type(std::int16_t(param_num),
ignite_type::UNDEFINED);
}
if (data_type)
diff --git a/modules/platforms/cpp/ignite/odbc/utility.cpp
b/modules/platforms/cpp/ignite/odbc/utility.cpp
index e1abcb3369..feaee86499 100644
--- a/modules/platforms/cpp/ignite/odbc/utility.cpp
+++ b/modules/platforms/cpp/ignite/odbc/utility.cpp
@@ -18,6 +18,8 @@
#include "ignite/odbc/system/odbc_constants.h"
#include "ignite/odbc/utility.h"
+#include <cstring>
+
namespace ignite {
size_t copy_string_to_buffer(const std::string& str, char* buf, std::size_t
buffer_len)
diff --git a/modules/platforms/cpp/tests/client-test/ignite_runner_suite.h
b/modules/platforms/cpp/tests/client-test/ignite_runner_suite.h
index d86878322c..951e35b64a 100644
--- a/modules/platforms/cpp/tests/client-test/ignite_runner_suite.h
+++ b/modules/platforms/cpp/tests/client-test/ignite_runner_suite.h
@@ -21,6 +21,7 @@
#include "ignite/client/ignite_client_configuration.h"
#include "gtest_logger.h"
+#include "ignite_runner.h"
#include "test_utils.h"
#include <gtest/gtest.h>
@@ -37,9 +38,6 @@ using namespace std::string_view_literals;
*/
class ignite_runner_suite : public ::testing::Test {
public:
- static constexpr std::initializer_list<std::string_view> SINGLE_NODE_ADDR
= {"127.0.0.1:10942"sv};
- static constexpr std::initializer_list<std::string_view> NODE_ADDRS =
{"127.0.0.1:10942"sv, "127.0.0.1:10943"sv};
-
static constexpr std::string_view TABLE_1 = "tbl1"sv;
static constexpr std::string_view TABLE_NAME_ALL_COLUMNS =
"tbl_all_columns"sv;
static constexpr std::string_view TABLE_NAME_ALL_COLUMNS_SQL =
"tbl_all_columns_sql"sv;
@@ -60,18 +58,6 @@ public:
static constexpr const char *KEY_COLUMN = "key";
static constexpr const char *VAL_COLUMN = "val";
- /**
- * Get node addresses to use for tests.
- *
- * @return Addresses.
- */
- static std::initializer_list<std::string_view> get_node_addrs() {
- if (single_node_mode())
- return SINGLE_NODE_ADDR;
-
- return NODE_ADDRS;
- }
-
/**
* Get logger.
*
@@ -106,6 +92,15 @@ public:
*/
static ignite_tuple get_tuple(std::string val) { return {{VAL_COLUMN,
std::move(val)}}; }
+ /**
+ * Get node addresses to use for tests.
+ *
+ * @return Addresses.
+ */
+ static std::vector<std::string> get_node_addrs() {
+ return ignite_runner::get_node_addrs();
+ }
+
/**
* Clear table @c TABLE_1.
*/
diff --git a/modules/platforms/cpp/tests/client-test/main.cpp
b/modules/platforms/cpp/tests/client-test/main.cpp
index 64d115eef2..e45ff4e76e 100644
--- a/modules/platforms/cpp/tests/client-test/main.cpp
+++ b/modules/platforms/cpp/tests/client-test/main.cpp
@@ -16,7 +16,6 @@
*/
#include "ignite_runner.h"
-#include "ignite_runner_suite.h"
#include "test_utils.h"
#include <ignite/common/ignite_error.h>
@@ -25,89 +24,69 @@
#include <chrono>
#include <csignal>
-#include <thread>
namespace {
+
/** Shutdown handler that cleans up resources. */
std::function<void(int)> shutdown_handler;
/**
- * Receives OS signal and handles it.
- *
- * @param signum Signal value.
- */
+* Receives OS signal and handles it.
+*
+* @param signum Signal value.
+*/
void signal_handler(int signum) {
- shutdown_handler(signum);
-
- signal(signum, SIG_DFL);
-
- raise(signum);
+ shutdown_handler(signum);
+ signal(signum, SIG_DFL);
+ raise(signum);
}
+
} // namespace
/**
- * Sets process abortion (SIGABRT, SIGINT, SIGSEGV signals) handler.
- *
- * @param handler Abortion handler.
- */
+* Sets process abortion (SIGABRT, SIGINT, SIGSEGV signals) handler.
+*
+* @param handler Abortion handler.
+*/
void set_process_abort_handler(std::function<void(int)> handler) {
- shutdown_handler = std::move(handler);
-
- // Install signal handlers to clean up resources on early exit.
- signal(SIGABRT, signal_handler);
- signal(SIGINT, signal_handler);
- signal(SIGSEGV, signal_handler);
-}
-
-bool check_test_node_connectable(std::chrono::seconds timeout) {
- using namespace ignite;
- try {
- for (auto addr : ignite_runner_suite::get_node_addrs()) {
- ignite_client_configuration cfg{addr};
- auto client = ignite_client::start(cfg, timeout);
- }
- return true;
- } catch (...) {
- return false;
- }
-}
+ shutdown_handler = std::move(handler);
-void ensure_node_connectable(std::chrono::seconds timeout) {
- using namespace ignite;
- for (auto addr : ignite_runner_suite::NODE_ADDRS) {
- ignite_client_configuration cfg{addr};
- auto client = ignite_client::start(cfg, timeout);
- }
+ // Install signal handlers to clean up resources on early exit.
+ signal(SIGABRT, signal_handler);
+ signal(SIGINT, signal_handler);
+ signal(SIGSEGV, signal_handler);
}
int main(int argc, char **argv) {
- if (ignite::single_node_mode())
- std::cout << "Tests run in a single-node mode." << std::endl;
- else
- std::cout << "Tests run in a multi-node mode." << std::endl;
-
- ignite::IgniteRunner runner;
- set_process_abort_handler([&](int signal) {
- std::cout << "Caught signal " << signal << " during tests" <<
std::endl;
-
- runner.stop();
- });
-
- if (!check_test_node_connectable(std::chrono::seconds(5))) {
- runner.start();
- ensure_node_connectable(std::chrono::seconds(60));
- }
-
- try {
- ::testing::InitGoogleTest(&argc, argv);
- [[maybe_unused]] int run_res = RUN_ALL_TESTS();
- } catch (const std::exception &err) {
- std::cout << "Uncaught error: " << err.what() << std::endl;
- return 1;
- } catch (...) {
- std::cout << "Unknown uncaught error" << std::endl;
- return 2;
- }
-
- return 0;
+ using namespace ignite;
+
+ if (ignite_runner::single_node_mode())
+ std::cout << "Tests run in a single-node mode." << std::endl;
+ else
+ std::cout << "Tests run in a multi-node mode." << std::endl;
+
+ ignite_runner runner;
+ set_process_abort_handler([&](int signal) {
+ std::cout << "Caught signal " << signal << " during tests" << std::endl;
+
+ runner.stop();
+ });
+
+ if (!check_test_node_connectable(std::chrono::seconds(5))) {
+ runner.start();
+ ensure_node_connectable(std::chrono::seconds(60));
+ }
+
+ try {
+ ::testing::InitGoogleTest(&argc, argv);
+ [[maybe_unused]] int run_res = RUN_ALL_TESTS();
+ } catch (const std::exception &err) {
+ std::cout << "Uncaught error: " << err.what() << std::endl;
+ return 1;
+ } catch (...) {
+ std::cout << "Unknown uncaught error" << std::endl;
+ return 2;
+ }
+
+ return 0;
}
diff --git a/modules/platforms/cpp/tests/odbc-test/CMakeLists.txt
b/modules/platforms/cpp/tests/odbc-test/CMakeLists.txt
new file mode 100644
index 0000000000..1b725c3d9f
--- /dev/null
+++ b/modules/platforms/cpp/tests/odbc-test/CMakeLists.txt
@@ -0,0 +1,39 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+project(ignite-odbc-test)
+
+set(TARGET ${PROJECT_NAME})
+
+find_package(ODBC REQUIRED)
+
+set(SOURCES
+ connection_test.cpp
+ main.cpp
+)
+
+add_executable(${TARGET} ${SOURCES})
+target_link_libraries(${TARGET} ignite-test-common ignite-client
${ODBC_LIBRARY} GTest::GTest)
+
+if (WIN32)
+ remove_definitions(-DUNICODE=1)
+else()
+ add_definitions(-DBOOST_TEST_DYN_LINK)
+endif()
+
+set(TEST_TARGET IgniteOdbcTest)
+add_test(NAME ${TEST_TARGET} COMMAND ${TARGET})
diff --git a/modules/platforms/cpp/tests/odbc-test/connection_test.cpp
b/modules/platforms/cpp/tests/odbc-test/connection_test.cpp
new file mode 100644
index 0000000000..3a50d5c2e2
--- /dev/null
+++ b/modules/platforms/cpp/tests/odbc-test/connection_test.cpp
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2019 GridGain Systems, Inc. and Contributors.
+ *
+ * Licensed under the GridGain Community Edition License (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
https://www.gridgain.com/products/software/community-edition/gridgain-community-edition-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifdef _WIN32
+# include <windows.h>
+#endif
+
+#include <sql.h>
+#include <sqlext.h>
+
+#include <string>
+
+#include <gtest/gtest.h>
+
+#include "test_utils.h"
+#include "ignite_runner.h"
+
+
+constexpr size_t ODBC_BUFFER_SIZE = 1024;
+
+std::string get_odbc_error_message(SQLSMALLINT handleType, SQLHANDLE handle,
SQLSMALLINT idx = 1)
+{
+ SQLCHAR sqlstate[7] = {};
+ SQLINTEGER nativeCode;
+
+ SQLCHAR message[ODBC_BUFFER_SIZE];
+ SQLSMALLINT real_len = 0;
+
+ SQLGetDiagRec(handleType, handle, idx, sqlstate, &nativeCode, message,
ODBC_BUFFER_SIZE, &real_len);
+
+ std::string res(reinterpret_cast<char*>(sqlstate));
+
+ if (!res.empty())
+ res.append(": ").append(reinterpret_cast<char*>(message), real_len);
+ else
+ res = "No results";
+
+ return res;
+}
+
+void odbc_connect(const std::string& connect_str) {
+ // Allocate an environment handle
+ SQLHENV env;
+ SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &env);
+
+ EXPECT_TRUE(env != nullptr);
+
+ // We want ODBC 3 support
+ SQLSetEnvAttr(env, SQL_ATTR_ODBC_VERSION,
reinterpret_cast<void*>(SQL_OV_ODBC3), 0);
+
+ // Allocate a connection handle
+ SQLHDBC conn;
+ SQLAllocHandle(SQL_HANDLE_DBC, env, &conn);
+
+ EXPECT_TRUE(conn != nullptr);
+
+ // Connect string
+ std::vector<SQLCHAR> connect_str0(connect_str.begin(), connect_str.end());
+
+ SQLCHAR outstr[ODBC_BUFFER_SIZE];
+ SQLSMALLINT outstrlen;
+
+ // Connecting to ODBC server.
+ SQLRETURN ret = SQLDriverConnect(conn, NULL, &connect_str0[0],
static_cast<SQLSMALLINT>(connect_str0.size()),
+ outstr, sizeof(outstr), &outstrlen, SQL_DRIVER_COMPLETE);
+
+ if (!SQL_SUCCEEDED(ret)) {
+ FAIL() << get_odbc_error_message(SQL_HANDLE_DBC, conn);
+ }
+
+ // Allocate a statement handle
+ SQLHSTMT statement;
+ SQLAllocHandle(SQL_HANDLE_STMT, conn, &statement);
+
+ EXPECT_TRUE(statement != nullptr);
+}
+
+/**
+ * Test suite.
+ */
+class connection_test : public ::testing::Test {};
+
+TEST_F(connection_test, connection_success) {
+ std::string addr_str;
+ auto addrs = ignite::ignite_runner::get_node_addrs();
+ for (auto &addr : addrs)
+ addr_str += addr + ",";
+ addr_str.pop_back();
+
+ odbc_connect("DRIVER={Apache Ignite 3};ADDRESS=" + addr_str);
+}
diff --git a/modules/platforms/cpp/tests/client-test/main.cpp
b/modules/platforms/cpp/tests/odbc-test/main.cpp
similarity index 78%
copy from modules/platforms/cpp/tests/client-test/main.cpp
copy to modules/platforms/cpp/tests/odbc-test/main.cpp
index 64d115eef2..bc34324a8f 100644
--- a/modules/platforms/cpp/tests/client-test/main.cpp
+++ b/modules/platforms/cpp/tests/odbc-test/main.cpp
@@ -16,7 +16,6 @@
*/
#include "ignite_runner.h"
-#include "ignite_runner_suite.h"
#include "test_utils.h"
#include <ignite/common/ignite_error.h>
@@ -25,9 +24,9 @@
#include <chrono>
#include <csignal>
-#include <thread>
namespace {
+
/** Shutdown handler that cleans up resources. */
std::function<void(int)> shutdown_handler;
@@ -38,11 +37,10 @@ std::function<void(int)> shutdown_handler;
*/
void signal_handler(int signum) {
shutdown_handler(signum);
-
signal(signum, SIG_DFL);
-
raise(signum);
}
+
} // namespace
/**
@@ -59,34 +57,15 @@ void set_process_abort_handler(std::function<void(int)>
handler) {
signal(SIGSEGV, signal_handler);
}
-bool check_test_node_connectable(std::chrono::seconds timeout) {
- using namespace ignite;
- try {
- for (auto addr : ignite_runner_suite::get_node_addrs()) {
- ignite_client_configuration cfg{addr};
- auto client = ignite_client::start(cfg, timeout);
- }
- return true;
- } catch (...) {
- return false;
- }
-}
-
-void ensure_node_connectable(std::chrono::seconds timeout) {
+int main(int argc, char **argv) {
using namespace ignite;
- for (auto addr : ignite_runner_suite::NODE_ADDRS) {
- ignite_client_configuration cfg{addr};
- auto client = ignite_client::start(cfg, timeout);
- }
-}
-int main(int argc, char **argv) {
- if (ignite::single_node_mode())
+ if (ignite_runner::single_node_mode())
std::cout << "Tests run in a single-node mode." << std::endl;
else
std::cout << "Tests run in a multi-node mode." << std::endl;
- ignite::IgniteRunner runner;
+ ignite_runner runner;
set_process_abort_handler([&](int signal) {
std::cout << "Caught signal " << signal << " during tests" <<
std::endl;
diff --git a/modules/platforms/cpp/tests/test-common/ignite_runner.cpp
b/modules/platforms/cpp/tests/test-common/ignite_runner.cpp
index 6bbb00be26..f52b773e18 100644
--- a/modules/platforms/cpp/tests/test-common/ignite_runner.cpp
+++ b/modules/platforms/cpp/tests/test-common/ignite_runner.cpp
@@ -38,8 +38,8 @@ constexpr std::string_view GRADLEW_SCRIPT =
IGNITE_SWITCH_WIN_OTHER("gradlew.bat
namespace ignite {
-void IgniteRunner::start() {
- std::string home = resolveIgniteHome();
+void ignite_runner::start() {
+ std::string home = resolve_ignite_home();
if (home.empty())
throw std::runtime_error("Can not resolve Ignite home directory. Try
setting IGNITE_HOME explicitly");
@@ -68,12 +68,12 @@ void IgniteRunner::start() {
}
}
-void IgniteRunner::stop() {
+void ignite_runner::stop() {
if (m_process)
m_process->kill();
}
-void IgniteRunner::join(std::chrono::milliseconds timeout) {
+void ignite_runner::join(std::chrono::milliseconds timeout) {
if (m_process)
m_process->join(timeout);
}
diff --git a/modules/platforms/cpp/tests/test-common/ignite_runner.h
b/modules/platforms/cpp/tests/test-common/ignite_runner.h
index 434dbff05a..ce2dd868dc 100644
--- a/modules/platforms/cpp/tests/test-common/ignite_runner.h
+++ b/modules/platforms/cpp/tests/test-common/ignite_runner.h
@@ -17,24 +17,29 @@
#pragma once
-#include <chrono>
-
#include "cmd_process.h"
+#include "test_utils.h"
+
+#include <chrono>
+#include <string_view>
namespace ignite {
/**
- * Represents IgniteRunner process.
+ * Represents ignite_runner process.
*
- * IgniteRunner is started from command line. It is recommended to re-use
- * the same IgniteRunner as much as possible to make tests as quick as
possible.
+ * ignite_runner is started from command line. It is recommended to re-use
+ * the same ignite_runner as much as possible to make tests as quick as
possible.
*/
-class IgniteRunner {
+class ignite_runner {
public:
+ static inline std::vector<std::string> SINGLE_NODE_ADDR =
{"127.0.0.1:10942"};
+ static inline std::vector<std::string> NODE_ADDRS = {"127.0.0.1:10942",
"127.0.0.1:10943"};
+
/**
* Destructor.
*/
- ~IgniteRunner() { stop(); }
+ ~ignite_runner() { stop(); }
/**
* Start node.
@@ -53,6 +58,26 @@ public:
*/
void join(std::chrono::milliseconds timeout);
+ /**
+ * Check whether tests run in single node mode.
+ *
+ * @return @c true if tests run in single node mode.
+ */
+ static bool single_node_mode() {
+ return ignite::get_env("IGNITE_CPP_TESTS_USE_SINGLE_NODE").has_value();
+ }
+
+ /**
+ * Get node addresses to use for tests.
+ *
+ * @return Addresses.
+ */
+ static std::vector<std::string> get_node_addrs() {
+ if (single_node_mode())
+ return SINGLE_NODE_ADDR;
+
+ return NODE_ADDRS;
+ }
private:
/** Underlying process. */
std::unique_ptr<CmdProcess> m_process;
diff --git a/modules/platforms/cpp/tests/test-common/test_utils.cpp
b/modules/platforms/cpp/tests/test-common/test_utils.cpp
index 7f70cc8479..f40cff7061 100644
--- a/modules/platforms/cpp/tests/test-common/test_utils.cpp
+++ b/modules/platforms/cpp/tests/test-common/test_utils.cpp
@@ -15,13 +15,16 @@
* limitations under the License.
*/
+#include <ignite/client/ignite_client.h>
+
+#include "ignite_runner.h"
+#include "test_utils.h"
+
#include <filesystem>
#include <functional>
#include <iostream>
#include <vector>
-#include "test_utils.h"
-
namespace ignite {
std::optional<std::string> get_env(const std::string &name) {
@@ -58,14 +61,14 @@ bool looksLikeBinaryReleaseHome(const std::filesystem::path
&path) {
* Internally checks for presence of core source directory.
* @return @c true if the path looks like binary release home directory.
*/
-bool looksLikeSourceReleaseHome(const std::filesystem::path &path) {
- std::filesystem::path coreSourcePath =
+bool looks_like_source_release_home(const std::filesystem::path &path) {
+ std::filesystem::path core_source_path =
path / "modules" / "core" / "src" / "main" / "java" / "org" / "apache"
/ "ignite";
- return std::filesystem::is_directory(coreSourcePath);
+ return std::filesystem::is_directory(core_source_path);
}
-std::string resolveIgniteHome(const std::string &path) {
+std::string resolve_ignite_home(const std::string &path) {
std::error_code error;
std::filesystem::path home = std::filesystem::canonical(path, error);
@@ -81,7 +84,7 @@ std::string resolveIgniteHome(const std::string &path) {
home = std::filesystem::current_path();
while (!home.empty() && home.has_relative_path()) {
- if (looksLikeBinaryReleaseHome(home) ||
looksLikeSourceReleaseHome(home))
+ if (looksLikeBinaryReleaseHome(home) ||
looks_like_source_release_home(home))
return home.string();
home = home.parent_path();
@@ -89,4 +92,19 @@ std::string resolveIgniteHome(const std::string &path) {
return home.string();
}
+bool check_test_node_connectable(std::chrono::seconds timeout) {
+ try {
+ ensure_node_connectable(timeout);
+ return true;
+ } catch (...) {
+ return false;
+ }
+}
+
+void ensure_node_connectable(std::chrono::seconds timeout) {
+ for (auto &addr : ignite_runner::get_node_addrs()) {
+ auto client = ignite_client::start({addr}, timeout);
+ }
+}
+
} // namespace ignite
\ No newline at end of file
diff --git a/modules/platforms/cpp/tests/test-common/test_utils.h
b/modules/platforms/cpp/tests/test-common/test_utils.h
index d0ac873a2b..8717007c08 100644
--- a/modules/platforms/cpp/tests/test-common/test_utils.h
+++ b/modules/platforms/cpp/tests/test-common/test_utils.h
@@ -35,15 +35,6 @@ namespace ignite {
*/
std::optional<std::string> get_env(const std::string &name);
-/**
- * Check whether tests run in single node mode.
- *
- * @return @c true if tests run in single node mode.
- */
-inline bool single_node_mode() {
- return ignite::get_env("IGNITE_CPP_TESTS_USE_SINGLE_NODE").has_value();
-}
-
/**
* Resolve IGNITE_HOME directory. Resolution is performed in several steps:
* 1) Check for path provided as argument.
@@ -55,7 +46,7 @@ inline bool single_node_mode() {
* @param path Optional path to check.
* @return Resolved Ignite home.
*/
-std::string resolveIgniteHome(const std::string &path = "");
+std::string resolve_ignite_home(const std::string &path = "");
/**
* Check async operation result and propagate error to the promise if there is
@@ -75,4 +66,20 @@ bool check_and_set_operation_error(std::promise<T2> &prom,
const ignite_result<T
return true;
}
+/**
+ * Check whether test cluster is connectable.
+ *
+ * @param timeout Timeout.
+ * @return @c true if cluster is connectable.
+ */
+bool check_test_node_connectable(std::chrono::seconds timeout);
+
+/**
+ * Make sure that test cluster is connectable.
+ * Throws on fail.
+ *
+ * @param timeout Timeout.
+ */
+void ensure_node_connectable(std::chrono::seconds timeout);
+
} // namespace ignite
\ No newline at end of file