This is an automated email from the ASF dual-hosted git repository.
isapego pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 10c837db69a IGNITE-27870 C++ Client: Add server proxy for testing
(#7607)
10c837db69a is described below
commit 10c837db69a68736c4a2109ecc1cb03d05f40395
Author: Ed Rakhmankulov <[email protected]>
AuthorDate: Tue Mar 17 17:02:29 2026 +0300
IGNITE-27870 C++ Client: Add server proxy for testing (#7607)
---
modules/platforms/cpp/cmake/dependencies.cmake | 40 ++-
.../platforms/cpp/ignite/network/CMakeLists.txt | 2 +-
.../platforms/cpp/tests/fake_server/CMakeLists.txt | 2 +-
.../cpp/tests/fake_server/connection_test.cpp | 35 ++-
.../cpp/tests/fake_server/proxy/asio_proxy.h | 275 +++++++++++++++++++++
.../cpp/tests/fake_server/proxy/message_listener.h | 38 +++
6 files changed, 381 insertions(+), 11 deletions(-)
diff --git a/modules/platforms/cpp/cmake/dependencies.cmake
b/modules/platforms/cpp/cmake/dependencies.cmake
index 1966568c55a..a66483e617c 100644
--- a/modules/platforms/cpp/cmake/dependencies.cmake
+++ b/modules/platforms/cpp/cmake/dependencies.cmake
@@ -29,17 +29,31 @@ if (CMAKE_VERSION VERSION_GREATER_EQUAL "3.30.0")
cmake_policy(SET CMP0169 OLD)
endif()
-function(fetch_dependency NAME URL MD5)
+# Downloads and unpacks a third-party archive through FetchContent.
+#   NAME       - FetchContent name; also the prefix of the <NAME>_SOURCE_DIR / <NAME>_BINARY_DIR variables.
+#   URL        - archive location.
+#   SHA256     - expected SHA-256 digest of the archive, passed as URL_HASH SHA256=<value>.
+#   ADD_SUBDIR - when true, the unpacked tree is also added with add_subdirectory(... EXCLUDE_FROM_ALL);
+#                when false, the archive is only populated and the caller wires up targets itself.
+# Population (and the optional add_subdirectory) is skipped when <NAME>_POPULATED is already set.
+function(fetch_dependency NAME URL SHA256 ADD_SUBDIR)
message(STATUS "Download dependency: ${NAME}")
FetchContent_Declare(
${NAME}
URL ${URL}
- URL_HASH MD5=${MD5}
+ URL_HASH SHA256=${SHA256}
)
FetchContent_GetProperties(${NAME})
if(NOT ${NAME}_POPULATED)
FetchContent_Populate(${NAME})
- add_subdirectory(${${NAME}_SOURCE_DIR} ${${NAME}_BINARY_DIR}
EXCLUDE_FROM_ALL)
+
+ if(${ADD_SUBDIR})
+ add_subdirectory(${${NAME}_SOURCE_DIR} ${${NAME}_BINARY_DIR}
EXCLUDE_FROM_ALL)
+ endif()
+ endif()
+endfunction()
+
+# Fetches the Asio 1.36.0 archive (populate only, no add_subdirectory) and, unless a target
+# named `asio` already exists, defines an INTERFACE target `asio` that exposes the unpacked
+# asio/include directory and the ASIO_STANDALONE compile definition to consumers.
+function(add_asio_dependency)
+ fetch_dependency(asio
https://github.com/chriskohlhoff/asio/archive/refs/tags/asio-1-36-0.tar.gz
0310a76b27e1854f09f696b30de57dc490b5e1b17faed1eb8c9a2891f956e52b FALSE)
+
+ # fetch_dependency() is a function, so the variables FetchContent set there are not visible
+ # here; query the properties again to obtain asio_SOURCE_DIR in this scope.
+ FetchContent_GetProperties(asio)
+ if (NOT TARGET asio)
+ add_library(asio INTERFACE)
+ target_include_directories(asio INTERFACE
${asio_SOURCE_DIR}/asio/include)
+ target_compile_definitions(asio INTERFACE ASIO_STANDALONE)
endif()
endfunction()
@@ -68,14 +82,26 @@ if (${USE_LOCAL_DEPS})
endif()
message(STATUS "GTEST FOUND: " ${GTest_VERSION})
endif()
+
+        # Locally installed Asio. REQUIRED already aborts configuration when the package is
+        # missing, so no separate asio_FOUND check is needed.
+        find_package(asio REQUIRED)
+        message(STATUS "asio FOUND: " ${asio_VERSION})
+
+        # The tests link against a target named `asio`. Provide it as a thin INTERFACE wrapper
+        # when the package did not define one. asio_SOURCE_DIR is only set by FetchContent, so
+        # it cannot be used on this code path.
+        if (NOT TARGET asio)
+            add_library(asio INTERFACE)
+            if (TARGET asio::asio)
+                # Namespaced imported target, as exported by common package-manager configs.
+                target_link_libraries(asio INTERFACE asio::asio)
+            elseif (asio_INCLUDE_DIRS)
+                # NOTE(review): assumes a find module that exports asio_INCLUDE_DIRS - confirm.
+                target_include_directories(asio INTERFACE ${asio_INCLUDE_DIRS})
+            else()
+                message(WARNING "asio package defines neither an asio/asio::asio target nor "
+                    "asio_INCLUDE_DIRS; relying on the default compiler include path")
+            endif()
+            target_compile_definitions(asio INTERFACE ASIO_STANDALONE)
+        endif()
endif()
else()
include(FetchContent)
- fetch_dependency(msgpack-c
https://github.com/msgpack/msgpack-c/releases/download/c-6.0.1/msgpack-c-6.0.1.tar.gz
090df53a59b845767fcfc48221b30ee9)
- fetch_dependency(tf-psa
https://github.com/Mbed-TLS/TF-PSA-Crypto/releases/download/tf-psa-crypto-1.0.0/tf-psa-crypto-1.0.0.tar.bz2
39037452d0314496589ab18461c1535c)
- fetch_dependency(uni-algo
https://github.com/uni-algo/uni-algo/archive/v1.2.0.tar.gz
6e0cce94a6b45ebee7b904316df9f87f)
+ fetch_dependency(msgpack-c
https://github.com/msgpack/msgpack-c/releases/download/c-6.0.1/msgpack-c-6.0.1.tar.gz
a349cd9af28add2334c7009e331335af4a5b97d8558b2e9804d05f3b33d97925 TRUE)
+ fetch_dependency(tf-psa
https://github.com/Mbed-TLS/TF-PSA-Crypto/releases/download/tf-psa-crypto-1.0.0/tf-psa-crypto-1.0.0.tar.bz2
31f0df2ca17897b5db2757cb0307dcde267292ba21ade831663d972a7a5b7d40 TRUE)
+ fetch_dependency(uni-algo
https://github.com/uni-algo/uni-algo/archive/v1.2.0.tar.gz
f2a1539cd8635bc6088d05144a73ecfe7b4d74ee0361fabed6f87f9f19e74ca9 TRUE)
if (${ENABLE_TESTS})
- fetch_dependency(googletest
https://github.com/google/googletest/archive/refs/tags/v1.14.0.tar.gz
c8340a482851ef6a3fe618a082304cfc)
+ fetch_dependency(googletest
https://github.com/google/googletest/archive/refs/tags/v1.14.0.tar.gz
8ad598c73ad796e0d8280b082cebd82a630d73e73cd3c70057938a6501bba5d7 TRUE)
+ add_asio_dependency()
endif()
endif()
diff --git a/modules/platforms/cpp/ignite/network/CMakeLists.txt
b/modules/platforms/cpp/ignite/network/CMakeLists.txt
index 202b54e37cb..2400a555320 100644
--- a/modules/platforms/cpp/ignite/network/CMakeLists.txt
+++ b/modules/platforms/cpp/ignite/network/CMakeLists.txt
@@ -83,7 +83,7 @@ elseif(APPLE)
find_package(epoll-shim REQUIRED)
target_link_libraries(${TARGET} PUBLIC epoll-shim::epoll-shim
${CMAKE_DL_LIBS})
else()
- fetch_dependency(epoll-shim
https://github.com/jiixyj/epoll-shim/archive/refs/tags/v0.0.20240608.tar.gz
9751ab5cad7bff8a1388a951276247bf)
+ # NOTE(review): fetch_dependency() now forwards its third argument as URL_HASH SHA256=<value>,
+ # but this value is unchanged from the removed MD5 line and is 32 hex digits long, whereas a
+ # SHA-256 digest has 64. Looks like the digest still needs to be replaced - confirm.
+ fetch_dependency(epoll-shim
https://github.com/jiixyj/epoll-shim/archive/refs/tags/v0.0.20240608.tar.gz
9751ab5cad7bff8a1388a951276247bf TRUE)
target_link_libraries(${TARGET} PUBLIC epoll-shim)
endif()
add_compile_definitions(EPOLL_SHIM_NO_VARIADICS)
diff --git a/modules/platforms/cpp/tests/fake_server/CMakeLists.txt
b/modules/platforms/cpp/tests/fake_server/CMakeLists.txt
index ea4abb5be51..402291e4304 100644
--- a/modules/platforms/cpp/tests/fake_server/CMakeLists.txt
+++ b/modules/platforms/cpp/tests/fake_server/CMakeLists.txt
@@ -24,4 +24,4 @@ set(SOURCES
connection_test.cpp
)
-ignite_test(${TARGET} SOURCES ${SOURCES} LIBS ignite-test-common
ignite3-client msgpack-c ignite-protocol ignite-tuple)
\ No newline at end of file
+ignite_test(${TARGET} SOURCES ${SOURCES} LIBS asio ignite-test-common
ignite3-client msgpack-c ignite-protocol ignite-tuple)
\ No newline at end of file
diff --git a/modules/platforms/cpp/tests/fake_server/connection_test.cpp
b/modules/platforms/cpp/tests/fake_server/connection_test.cpp
index d1146b1ae3e..393d6f92338 100644
--- a/modules/platforms/cpp/tests/fake_server/connection_test.cpp
+++ b/modules/platforms/cpp/tests/fake_server/connection_test.cpp
@@ -15,13 +15,15 @@
* limitations under the License.
*/
-#include "tests/client-test/ignite_runner_suite.h"
-#include "ignite/client/ignite_client.h"
#include "fake_server.h"
+#include "ignite/client/ignite_client.h"
+#include "proxy/asio_proxy.h"
+#include "tests/client-test/ignite_runner_suite.h"
#include <gtest/gtest.h>
#include <thread>
+
using namespace ignite;
using namespace std::chrono_literals;
@@ -76,3 +78,32 @@ TEST_F(connection_test, request_timeout) {
EXPECT_EQ(error::code::OPERATION_TIMEOUT, err.get_status_code());
}
}
+
+// Starts a fake server on port 50900 and a proxy that listens on port 50800 and forwards to
+// it, then connects a client to get_endpoints() and expects both proxy listeners to have
+// recorded more than one message.
+TEST_F(connection_test, using_asio) {
+    fake_server server{50900, get_logger()};
+    server.start();
+
+    auto in_listener = std::make_shared<proxy::message_listener>();
+    auto out_listener = std::make_shared<proxy::message_listener>();
+
+    // Single route: local port 50800 -> fake server at 127.0.0.1:50900.
+    proxy::configuration route(50800, "127.0.0.1:50900", in_listener, out_listener);
+    proxy::asio_proxy tcp_proxy{{route}, get_logger()};
+
+    ignite_client_configuration client_cfg;
+    client_cfg.set_logger(get_logger());
+    client_cfg.set_endpoints(get_endpoints());
+
+    auto client = ignite_client::start(client_cfg, 5s);
+    auto nodes = client.get_cluster_nodes();
+
+    ASSERT_EQ(1, nodes.size());
+
+    // Each direction must have carried more than one chunk of data.
+    ASSERT_GT(in_listener->get_msg_queue().size(), 1);
+    ASSERT_GT(out_listener->get_msg_queue().size(), 1);
+}
diff --git a/modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.h
b/modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.h
new file mode 100644
index 00000000000..976f2faa8a8
--- /dev/null
+++ b/modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.h
@@ -0,0 +1,275 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#pragma once
+#include "gtest_logger.h"
+
+#include <gtest/gtest.h>
+
+#include <atomic>
+#include <iostream>
+#include <map>
+#include <memory>
+#include <queue>
+#include <thread>
+#include <vector>
+
+#include <asio.hpp>
+#include <asio/ts/internet.hpp>
+
+#include "message_listener.h"
+
+#include <complex>
+#include <list>
+
+namespace ignite::proxy {
+
+using asio::ip::tcp;
+
+static constexpr size_t BUFF_SIZE = 4096;
+
+/**
+ * Describes one proxied route: the local port to listen on, the "host:port" string of the
+ * upstream endpoint, and one listener per direction (either may be null).
+ */
+struct configuration {
+    asio::ip::port_type m_in_port;
+    std::string m_out_host_and_port;
+    std::shared_ptr<message_listener> m_in_listener;
+    std::shared_ptr<message_listener> m_out_listener;
+
+    /**
+     * @param in_port Local port the proxy accepts connections on.
+     * @param out_host_and_port Upstream endpoint in "host:port" form.
+     * @param in_listener Listener stored in m_in_listener.
+     * @param out_listener Listener stored in m_out_listener.
+     */
+    configuration(asio::ip::port_type in_port, const std::string &out_host_and_port,
+        std::shared_ptr<message_listener> in_listener, std::shared_ptr<message_listener> out_listener)
+        : m_in_port(in_port)
+        , m_out_host_and_port(out_host_and_port)
+        , m_in_listener(std::move(in_listener))
+        , m_out_listener(std::move(out_listener)) {}
+};
+
+/**
+ * Runtime state of one proxied route: the acceptor bound to the local listening port, the
+ * upstream host and port parsed from the configuration, and the per-direction listeners.
+ */
+struct proxy_entry {
+    tcp::acceptor m_in_acceptor;
+    std::string m_out_host;
+    std::string m_out_port;
+    std::shared_ptr<message_listener> m_in_listener;
+    std::shared_ptr<message_listener> m_out_listener;
+
+    /**
+     * Binds an IPv4 acceptor to cfg.m_in_port and splits cfg.m_out_host_and_port at the
+     * first ':' into host and port.
+     *
+     * @param io_context Context the acceptor is created on.
+     * @param cfg Route description.
+     * @throws std::runtime_error If cfg.m_out_host_and_port contains no ':'.
+     */
+    proxy_entry(asio::io_context &io_context, const configuration &cfg)
+        : m_in_acceptor(io_context, tcp::endpoint(tcp::v4(), cfg.m_in_port))
+        // cfg is const, so the listeners can only be copied; std::move on them was a no-op.
+        , m_in_listener(cfg.m_in_listener)
+        , m_out_listener(cfg.m_out_listener)
+    {
+        auto colon_pos = cfg.m_out_host_and_port.find(':');
+
+        if (colon_pos == std::string::npos) {
+            throw std::runtime_error(
+                "Incorrect host and port format. Expected 'hostname:port' but got " + cfg.m_out_host_and_port);
+        }
+
+        m_out_host = cfg.m_out_host_and_port.substr(0, colon_pos);
+        m_out_port = cfg.m_out_host_and_port.substr(colon_pos + 1);
+    }
+};
+
+/**
+ * One direction of a proxied connection: repeatedly reads a chunk from the source socket,
+ * reports it to the optional listener and writes it to the destination socket.
+ *
+ * The pending asynchronous operation keeps the object alive through shared_from_this().
+ */
+class session_part : public std::enable_shared_from_this<session_part> {
+public:
+    /**
+     * @param src Socket to read from.
+     * @param dst Socket to write to.
+     * @param listener Receives a copy of every forwarded chunk; may be null.
+     * @param failed Flag set to true when a read or write fails with anything but EOF.
+     * @param logger Logger used to report such failures.
+     */
+    session_part(
+        std::shared_ptr<tcp::socket> src,
+        std::shared_ptr<tcp::socket> dst,
+        std::shared_ptr<message_listener> listener,
+        std::atomic_bool &failed,
+        std::shared_ptr<gtest_logger> logger)
+        : m_src(std::move(src))
+        , m_dst(std::move(dst))
+        , m_listener(std::move(listener))
+        , m_failed(failed)
+        , m_logger(std::move(logger)) {}
+
+    /**
+     * Starts an asynchronous read of up to BUFF_SIZE bytes. On success the chunk is handed
+     * to the listener and forwarded with do_write(); on EOF or error the chain stops.
+     */
+    void do_read() {
+        m_src->async_read_some(asio::buffer(m_buf, BUFF_SIZE),
+            [self = this->shared_from_this()](const asio::error_code &ec, size_t len) {
+                if (ec) {
+                    if (ec != asio::error::eof) {
+                        self->m_logger->log_error("Error while reading from socket " + ec.message());
+
+                        self->m_failed.store(true);
+                    }
+
+                    // Stop on any error: no valid data was read, so there is nothing to forward.
+                    return;
+                }
+
+                message m{self->m_buf.begin(), self->m_buf.begin() + len};
+
+                if (self->m_listener) {
+                    self->m_listener->register_message(m);
+                }
+
+                self->do_write(std::move(m));
+            });
+    }
+
+    /**
+     * Asynchronously writes the whole message to the destination socket and, once it has
+     * been written, schedules the next read.
+     *
+     * @param msg Chunk to forward; its contents are moved from.
+     */
+    void do_write(message &&msg) {
+        // The buffer handed to async_write must stay valid until the completion handler runs,
+        // so take ownership of the bytes and keep them alive inside the handler.
+        auto payload = std::make_shared<message>(std::move(msg));
+
+        asio::async_write(
+            *m_dst, asio::buffer(payload->data(), payload->size()),
+            [self = shared_from_this(), payload](asio::error_code ec, size_t) {
+                if (ec) {
+                    if (ec != asio::error::eof) {
+                        self->m_logger->log_error("Error while writing to socket " + ec.message());
+
+                        self->m_failed.store(true);
+                    }
+
+                    // The destination is unusable; do not keep reading for it.
+                    return;
+                }
+
+                self->do_read();
+            });
+    }
+
+private:
+    std::shared_ptr<tcp::socket> m_src;
+    std::shared_ptr<tcp::socket> m_dst;
+    std::array<char, BUFF_SIZE> m_buf{};
+    std::shared_ptr<message_listener> m_listener{nullptr};
+    std::atomic_bool &m_failed;
+    std::shared_ptr<gtest_logger> m_logger;
+};
+
+/**
+ * A proxied connection: owns the accepted (in) and upstream (out) sockets and one
+ * session_part per direction.
+ */
+class session : public std::enable_shared_from_this<session> {
+public:
+    /**
+     * @param in_sock Socket accepted from the client.
+     * @param out_sock Not yet connected socket towards the upstream server.
+     * @param in_listener Listener for the in -> out direction; may be null.
+     * @param out_listener Listener for the out -> in direction; may be null.
+     * @param failed Flag set to true when the session hits an error.
+     * @param logger Logger used to report errors.
+     */
+    session(
+        std::shared_ptr<tcp::socket> in_sock,
+        std::shared_ptr<tcp::socket> out_sock,
+        std::shared_ptr<message_listener> in_listener,
+        std::shared_ptr<message_listener> out_listener,
+        std::atomic_bool &failed,
+        std::shared_ptr<gtest_logger> logger)
+        : m_in_sock(std::move(in_sock))
+        , m_out_sock(std::move(out_sock))
+        , m_failed(failed)
+        , m_logger(std::move(logger))
+    {
+        m_forward_part = std::make_shared<session_part>(m_in_sock, m_out_sock, in_listener, failed, m_logger);
+        m_reverse_part = std::make_shared<session_part>(m_out_sock, m_in_sock, out_listener, failed, m_logger);
+    }
+
+    /**
+     * Asynchronously connects the upstream socket to one of the endpoints and starts
+     * forwarding in both directions once connected.
+     *
+     * A connection failure is logged and recorded in the failure flag. It is not thrown,
+     * because the handler runs inside io_context::run() on the proxy's worker thread,
+     * where an escaping exception would terminate the whole test process.
+     *
+     * @param endpoints Resolved upstream endpoints.
+     */
+    void connect(const tcp::resolver::results_type &endpoints) {
+        asio::async_connect(*m_out_sock, endpoints,
+            [self = shared_from_this()](const asio::error_code &ec, const tcp::endpoint &e) {
+                if (ec) {
+                    self->m_logger->log_error(
+                        "Error connecting to server " + ec.message() + " port=" + std::to_string(e.port()));
+
+                    self->m_failed.store(true);
+                    return;
+                }
+
+                self->do_serve();
+            });
+    }
+
+private:
+    /** Starts the read loop of both directions. */
+    void do_serve() {
+        m_forward_part->do_read();
+        m_reverse_part->do_read();
+    }
+
+    std::shared_ptr<tcp::socket> m_in_sock;
+    std::shared_ptr<tcp::socket> m_out_sock;
+
+    std::atomic_bool &m_failed;
+    std::shared_ptr<gtest_logger> m_logger;
+
+    std::shared_ptr<session_part> m_forward_part;
+    std::shared_ptr<session_part> m_reverse_part;
+};
+
+/**
+ * TCP proxy for tests. For every configuration it listens on the configured local port and
+ * forwards each accepted connection to the configured upstream host, reporting traffic to
+ * the configured listeners. All I/O runs on one background thread started by the constructor.
+ *
+ * Errors seen by the asynchronous handlers are logged and remembered; the destructor turns
+ * them into a GoogleTest failure.
+ */
+class asio_proxy {
+public:
+    /**
+     * Binds all acceptors, starts accepting and launches the I/O thread.
+     *
+     * @param configurations Routes to serve, keyed internally by their local port.
+     * @param logger Logger used to report errors.
+     */
+    asio_proxy(std::vector<configuration> configurations, std::shared_ptr<gtest_logger> logger)
+        : m_resolver(m_io_context)
+        , m_logger(std::move(logger))
+    {
+        for (auto &cfg : configurations) {
+            m_conn_map.emplace(cfg.m_in_port, proxy_entry{m_io_context, cfg});
+        }
+
+        do_serve();
+
+        m_executor = std::make_unique<std::thread>([this]() { m_io_context.run(); });
+    }
+
+    /** Stops the I/O context, joins the worker thread and reports any recorded proxy error. */
+    ~asio_proxy() {
+        m_stopped.store(true);
+        m_io_context.stop();
+
+        m_executor->join();
+
+        if (m_failed.load()) {
+            ADD_FAILURE() << "Proxy error occurred during test execution";
+        }
+    }
+
+private:
+    /** Arms an accept operation on every configured route. */
+    void do_serve() {
+        for (auto &[_, entry] : m_conn_map) {
+            do_accept(entry);
+        }
+    }
+
+    /**
+     * Accepts one connection on the entry's acceptor, creates a session for it, resolves the
+     * upstream address and re-arms itself for the next connection.
+     *
+     * Handlers run on the worker thread inside io_context::run(), so failures are logged and
+     * recorded in m_failed instead of being thrown (an escaping exception would terminate
+     * the process).
+     */
+    void do_accept(proxy_entry &entry) {
+        if (m_stopped.load()) {
+            return;
+        }
+
+        entry.m_in_acceptor.async_accept([this, &entry](asio::error_code ec, tcp::socket in_sock) {
+            if (ec) {
+                m_logger->log_error("Error accepting incoming connection " + ec.message());
+
+                m_failed.store(true);
+                return;
+            }
+
+            auto p_in_sock = std::make_shared<tcp::socket>(std::move(in_sock));
+            auto p_out_sock = std::make_shared<tcp::socket>(m_io_context);
+            auto ses = std::make_shared<session>(
+                p_in_sock, p_out_sock, entry.m_in_listener, entry.m_out_listener, m_failed, m_logger);
+
+            m_resolver.async_resolve(entry.m_out_host, entry.m_out_port,
+                [this, ses](asio::error_code ec,
+                    tcp::resolver::results_type endpoints) { // NOLINT(*-unnecessary-value-param)
+                    if (ec) {
+                        m_logger->log_error("Error resolving server's address " + ec.message());
+
+                        m_failed.store(true);
+                        return;
+                    }
+
+                    ses->connect(endpoints);
+                });
+
+            do_accept(entry);
+        });
+    }
+
+    // Declared first so it is destroyed last: the acceptors in m_conn_map and m_resolver are
+    // bound to this context and must not outlive it.
+    asio::io_context m_io_context{};
+
+    std::map<asio::ip::port_type, proxy_entry> m_conn_map{};
+
+    std::unique_ptr<std::thread> m_executor{};
+
+    tcp::resolver m_resolver;
+
+    std::shared_ptr<gtest_logger> m_logger;
+
+    std::atomic_bool m_stopped{false};
+
+    std::atomic_bool m_failed{false};
+};
+} // namespace ignite::proxy
diff --git a/modules/platforms/cpp/tests/fake_server/proxy/message_listener.h
b/modules/platforms/cpp/tests/fake_server/proxy/message_listener.h
new file mode 100644
index 00000000000..92773b60e39
--- /dev/null
+++ b/modules/platforms/cpp/tests/fake_server/proxy/message_listener.h
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#pragma once
+#include <queue>
+#include <utility>
+
+namespace ignite::proxy {
+
+/** Raw bytes of one forwarded chunk. */
+using message = std::vector<char>;
+
+/**
+ * Collects, in arrival order, every message handed to it.
+ *
+ * NOTE(review): the queue is not synchronized; looks like it is written from the proxy's I/O
+ * thread and read from the test thread - confirm that callers only read once traffic has stopped.
+ */
+class message_listener {
+public:
+    /** Appends the message to the end of the queue. */
+    void register_message(message msg) { m_queue.emplace(std::move(msg)); }
+
+    /** Returns a read-only view of all messages recorded so far. */
+    const std::queue<message> &get_msg_queue() const { return m_queue; }
+
+private:
+    std::queue<message> m_queue{};
+};
+} // namespace ignite::proxy