This is an automated email from the ASF dual-hosted git repository.

isapego pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 10c837db69a IGNITE-27870 C++ Client: Add server proxy for testing 
(#7607)
10c837db69a is described below

commit 10c837db69a68736c4a2109ecc1cb03d05f40395
Author: Ed Rakhmankulov <[email protected]>
AuthorDate: Tue Mar 17 17:02:29 2026 +0300

    IGNITE-27870 C++ Client: Add server proxy for testing (#7607)
---
 modules/platforms/cpp/cmake/dependencies.cmake     |  40 ++-
 .../platforms/cpp/ignite/network/CMakeLists.txt    |   2 +-
 .../platforms/cpp/tests/fake_server/CMakeLists.txt |   2 +-
 .../cpp/tests/fake_server/connection_test.cpp      |  35 ++-
 .../cpp/tests/fake_server/proxy/asio_proxy.h       | 275 +++++++++++++++++++++
 .../cpp/tests/fake_server/proxy/message_listener.h |  38 +++
 6 files changed, 381 insertions(+), 11 deletions(-)

diff --git a/modules/platforms/cpp/cmake/dependencies.cmake 
b/modules/platforms/cpp/cmake/dependencies.cmake
index 1966568c55a..a66483e617c 100644
--- a/modules/platforms/cpp/cmake/dependencies.cmake
+++ b/modules/platforms/cpp/cmake/dependencies.cmake
@@ -29,17 +29,31 @@ if (CMAKE_VERSION VERSION_GREATER_EQUAL "3.30.0")
     cmake_policy(SET CMP0169 OLD)
 endif()
 
-function(fetch_dependency NAME URL MD5)
+function(fetch_dependency NAME URL SHA256 ADD_SUBDIR)
     message(STATUS "Download dependency: ${NAME}")
     FetchContent_Declare(
         ${NAME}
         URL ${URL}
-        URL_HASH MD5=${MD5}
+        URL_HASH SHA256=${SHA256}
     )
     FetchContent_GetProperties(${NAME})
     if(NOT ${NAME}_POPULATED)
         FetchContent_Populate(${NAME})
-        add_subdirectory(${${NAME}_SOURCE_DIR} ${${NAME}_BINARY_DIR} 
EXCLUDE_FROM_ALL)
+
+        if(${ADD_SUBDIR})
+            add_subdirectory(${${NAME}_SOURCE_DIR} ${${NAME}_BINARY_DIR} 
EXCLUDE_FROM_ALL)
+        endif()
+    endif()
+endfunction()
+
+function(add_asio_dependency)
+    fetch_dependency(asio 
https://github.com/chriskohlhoff/asio/archive/refs/tags/asio-1-36-0.tar.gz 
0310a76b27e1854f09f696b30de57dc490b5e1b17faed1eb8c9a2891f956e52b FALSE)
+
+    FetchContent_GetProperties(asio)
+    if (NOT TARGET asio)
+        add_library(asio INTERFACE)
+        target_include_directories(asio INTERFACE 
${asio_SOURCE_DIR}/asio/include)
+        target_compile_definitions(asio INTERFACE ASIO_STANDALONE)
     endif()
 endfunction()
 
@@ -68,14 +82,26 @@ if (${USE_LOCAL_DEPS})
             endif()
             message(STATUS "GTEST FOUND: " ${GTest_VERSION})
         endif()
+
+        find_package(asio REQUIRED)
+        # find_package() does not define asio_SOURCE_DIR (only FetchContent does), so forward to the package's own target.
+        if (NOT TARGET asio)
+            add_library(asio INTERFACE)
+            if (TARGET asio::asio) # NOTE(review): assumes the package exports asio::asio (as vcpkg/Conan do) - confirm.
+                target_link_libraries(asio INTERFACE asio::asio)
+            endif()
+            target_compile_definitions(asio INTERFACE ASIO_STANDALONE)
+        endif()
+        message(STATUS "asio FOUND: " ${asio_VERSION})
     endif()
 else()
     include(FetchContent)
-    fetch_dependency(msgpack-c 
https://github.com/msgpack/msgpack-c/releases/download/c-6.0.1/msgpack-c-6.0.1.tar.gz
 090df53a59b845767fcfc48221b30ee9)
-    fetch_dependency(tf-psa 
https://github.com/Mbed-TLS/TF-PSA-Crypto/releases/download/tf-psa-crypto-1.0.0/tf-psa-crypto-1.0.0.tar.bz2
 39037452d0314496589ab18461c1535c)
-    fetch_dependency(uni-algo 
https://github.com/uni-algo/uni-algo/archive/v1.2.0.tar.gz 
6e0cce94a6b45ebee7b904316df9f87f)
+    fetch_dependency(msgpack-c 
https://github.com/msgpack/msgpack-c/releases/download/c-6.0.1/msgpack-c-6.0.1.tar.gz
 a349cd9af28add2334c7009e331335af4a5b97d8558b2e9804d05f3b33d97925 TRUE)
+    fetch_dependency(tf-psa 
https://github.com/Mbed-TLS/TF-PSA-Crypto/releases/download/tf-psa-crypto-1.0.0/tf-psa-crypto-1.0.0.tar.bz2
 31f0df2ca17897b5db2757cb0307dcde267292ba21ade831663d972a7a5b7d40 TRUE)
+    fetch_dependency(uni-algo 
https://github.com/uni-algo/uni-algo/archive/v1.2.0.tar.gz 
f2a1539cd8635bc6088d05144a73ecfe7b4d74ee0361fabed6f87f9f19e74ca9 TRUE)
     if (${ENABLE_TESTS})
-        fetch_dependency(googletest 
https://github.com/google/googletest/archive/refs/tags/v1.14.0.tar.gz 
c8340a482851ef6a3fe618a082304cfc)
+        fetch_dependency(googletest 
https://github.com/google/googletest/archive/refs/tags/v1.14.0.tar.gz 
8ad598c73ad796e0d8280b082cebd82a630d73e73cd3c70057938a6501bba5d7 TRUE)
+        add_asio_dependency()
     endif()
 endif()
 
diff --git a/modules/platforms/cpp/ignite/network/CMakeLists.txt 
b/modules/platforms/cpp/ignite/network/CMakeLists.txt
index 202b54e37cb..2400a555320 100644
--- a/modules/platforms/cpp/ignite/network/CMakeLists.txt
+++ b/modules/platforms/cpp/ignite/network/CMakeLists.txt
@@ -83,7 +83,7 @@ elseif(APPLE)
         find_package(epoll-shim REQUIRED)
         target_link_libraries(${TARGET} PUBLIC epoll-shim::epoll-shim 
${CMAKE_DL_LIBS})
     else()
-        fetch_dependency(epoll-shim 
https://github.com/jiixyj/epoll-shim/archive/refs/tags/v0.0.20240608.tar.gz 
9751ab5cad7bff8a1388a951276247bf)
+        fetch_dependency(epoll-shim 
https://github.com/jiixyj/epoll-shim/archive/refs/tags/v0.0.20240608.tar.gz 
9751ab5cad7bff8a1388a951276247bf TRUE) # NOTE(review): this 32-hex value is the MD5 digest carried over from the removed line; assuming this call resolves to the fetch_dependency(NAME URL SHA256 ADD_SUBDIR) changed in cmake/dependencies.cmake, it is now passed as URL_HASH SHA256=... and must be replaced with the tarball's SHA-256 digest.
         target_link_libraries(${TARGET} PUBLIC epoll-shim)
     endif()
     add_compile_definitions(EPOLL_SHIM_NO_VARIADICS)
diff --git a/modules/platforms/cpp/tests/fake_server/CMakeLists.txt 
b/modules/platforms/cpp/tests/fake_server/CMakeLists.txt
index ea4abb5be51..402291e4304 100644
--- a/modules/platforms/cpp/tests/fake_server/CMakeLists.txt
+++ b/modules/platforms/cpp/tests/fake_server/CMakeLists.txt
@@ -24,4 +24,4 @@ set(SOURCES
     connection_test.cpp
 )
 
-ignite_test(${TARGET} SOURCES ${SOURCES} LIBS ignite-test-common 
ignite3-client msgpack-c ignite-protocol ignite-tuple)
\ No newline at end of file
+ignite_test(${TARGET} SOURCES ${SOURCES} LIBS asio ignite-test-common 
ignite3-client msgpack-c ignite-protocol ignite-tuple)
\ No newline at end of file
diff --git a/modules/platforms/cpp/tests/fake_server/connection_test.cpp 
b/modules/platforms/cpp/tests/fake_server/connection_test.cpp
index d1146b1ae3e..393d6f92338 100644
--- a/modules/platforms/cpp/tests/fake_server/connection_test.cpp
+++ b/modules/platforms/cpp/tests/fake_server/connection_test.cpp
@@ -15,13 +15,15 @@
  * limitations under the License.
  */
 
-#include "tests/client-test/ignite_runner_suite.h"
-#include "ignite/client/ignite_client.h"
 #include "fake_server.h"
+#include "ignite/client/ignite_client.h"
+#include "proxy/asio_proxy.h"
+#include "tests/client-test/ignite_runner_suite.h"
 
 #include <gtest/gtest.h>
 #include <thread>
 
+
 using namespace ignite;
 using namespace std::chrono_literals;
 
@@ -76,3 +78,32 @@ TEST_F(connection_test, request_timeout) {
         EXPECT_EQ(error::code::OPERATION_TIMEOUT, err.get_status_code());
     }
 }
+
+TEST_F(connection_test, using_asio) {
+    fake_server fs{50900, get_logger()};
+    fs.start();
+
+    auto in_listener = std::make_shared<proxy::message_listener>();
+    auto out_listener = std::make_shared<proxy::message_listener>();
+
+    proxy::asio_proxy proxy{
+        {
+            proxy::configuration(50800, "127.0.0.1:50900", in_listener, 
out_listener)
+        },
+        get_logger()
+    };
+
+
+    ignite_client_configuration cfg;
+    cfg.set_logger(get_logger());
+    cfg.set_endpoints(get_endpoints());
+
+    auto cl = ignite_client::start(cfg, 5s);
+
+    auto cluster_nodes = cl.get_cluster_nodes();
+
+    ASSERT_EQ(1, cluster_nodes.size());
+
+    ASSERT_GT(in_listener->get_msg_queue().size(), 1);
+    ASSERT_GT(out_listener->get_msg_queue().size(), 1);
+}
diff --git a/modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.h 
b/modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.h
new file mode 100644
index 00000000000..976f2faa8a8
--- /dev/null
+++ b/modules/platforms/cpp/tests/fake_server/proxy/asio_proxy.h
@@ -0,0 +1,275 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#pragma once
+#include "gtest_logger.h"
+
+#include <gtest/gtest.h>
+
+#include <atomic>
+#include <iostream>
+#include <map>
+#include <memory>
+#include <queue>
+#include <thread>
+#include <vector>
+
+#include <asio.hpp>
+#include <asio/ts/internet.hpp>
+
+#include "message_listener.h"
+
+#include <complex>
+#include <list>
+
+namespace ignite::proxy {
+
+using asio::ip::tcp;
+
+static constexpr size_t BUFF_SIZE = 4096;
+
+struct configuration {
+    asio::ip::port_type m_in_port;
+    std::string m_out_host_and_port;
+    std::shared_ptr<message_listener> m_in_listener;
+    std::shared_ptr<message_listener> m_out_listener;
+
+    configuration(
+        asio::ip::port_type m_in_port,
+        const std::string &m_out_host_and_port,
+        std::shared_ptr<message_listener> in_listener,
+        std::shared_ptr<message_listener> out_listener)
+        : m_in_port(m_in_port)
+        , m_out_host_and_port(m_out_host_and_port)
+        , m_in_listener(std::move(in_listener))
+        , m_out_listener(std::move(out_listener)) {}
+};
+
+/** Listening acceptor plus the target server's host/port and the traffic listeners for one proxied port. */
+struct proxy_entry {
+    tcp::acceptor m_in_acceptor;
+    std::string m_out_host;
+    std::string m_out_port;
+    std::shared_ptr<message_listener> m_in_listener;
+    std::shared_ptr<message_listener> m_out_listener;
+
+    proxy_entry(asio::io_context& io_context,const configuration& cfg)
+        : m_in_acceptor(io_context, tcp::endpoint(tcp::v4(), cfg.m_in_port))
+        , m_in_listener(cfg.m_in_listener)   // cfg is const, so the listeners are copied (shared), not moved.
+        , m_out_listener(cfg.m_out_listener)
+    {
+        auto colon_pos = cfg.m_out_host_and_port.find(':'); // Splits "hostname:port" at the first colon.
+        if (colon_pos == std::string::npos) {
+            throw std::runtime_error("Incorrect host and port format. Expected 'hostname:port' but got " + cfg.m_out_host_and_port);
+        }
+
+        m_out_host = cfg.m_out_host_and_port.substr(0, colon_pos);
+        m_out_port = cfg.m_out_host_and_port.substr(colon_pos + 1);
+    }
+};
+
+/** One direction of a proxied connection: reads chunks from m_src, records them, and forwards them to m_dst. */
+class session_part: public std::enable_shared_from_this<session_part> {
+public:
+    session_part(
+        std::shared_ptr<tcp::socket> src,
+        std::shared_ptr<tcp::socket> dst,
+        std::shared_ptr<message_listener> listener,
+        std::atomic_bool& failed,
+        std::shared_ptr<gtest_logger> logger)
+        : m_src(std::move(src))
+        , m_dst(std::move(dst))
+        , m_listener(std::move(listener))
+        , m_failed(failed)
+        , m_logger(std::move(logger)) {}
+
+    /** Starts an asynchronous read from the source socket; each chunk read is passed on to do_write(). */
+    void do_read() {
+        m_src->async_read_some(asio::buffer(m_buf, BUFF_SIZE),
+        [self = this->shared_from_this()](const asio::error_code& ec, size_t len) {
+            if (ec) {
+                if (ec != asio::error::eof) {
+                    self->m_logger->log_error("Error while reading from socket " + ec.message());
+                    self->m_failed.store(true);
+                }
+                return; // Stop relaying on any error: there is no data to forward.
+            }
+
+            message m{self->m_buf.begin(), self->m_buf.begin() + len};
+            if (self->m_listener) {
+                self->m_listener->register_message(m);
+            }
+            self->do_write(std::move(m));
+        });
+    }
+
+    /** Asynchronously writes msg to the destination socket, then resumes reading from the source. */
+    void do_write(message&& msg) {
+        // The memory handed to async_write must stay valid until the handler runs, so the handler owns the message.
+        auto data = std::make_shared<message>(std::move(msg));
+        asio::async_write(
+            *m_dst, asio::buffer(data->data(), data->size()),
+            [self = shared_from_this(), data](asio::error_code ec, size_t) {
+                if (ec) {
+                    if (ec != asio::error::eof) {
+                        self->m_logger->log_error("Error while writing to socket " + ec.message());
+                        self->m_failed.store(true);
+                    }
+                    return; // Do not keep reading once the destination is broken.
+                }
+                self->do_read();
+            });
+    }
+
+private:
+    std::shared_ptr<tcp::socket> m_src;
+    std::shared_ptr<tcp::socket> m_dst;
+    std::array<char, BUFF_SIZE> m_buf{};
+    std::shared_ptr<message_listener> m_listener{nullptr};
+    std::atomic_bool& m_failed;
+    std::shared_ptr<gtest_logger> m_logger;
+};
+
+/** A proxied client connection: owns both sockets and the two session_part relays between them. */
+class session : public std::enable_shared_from_this<session> {
+public:
+    session(
+        std::shared_ptr<tcp::socket> in_sock,
+        std::shared_ptr<tcp::socket> out_sock,
+        std::shared_ptr<message_listener> in_listener,
+        std::shared_ptr<message_listener> out_listener,
+        std::atomic_bool& failed,
+        std::shared_ptr<gtest_logger> logger)
+        : m_in_sock(std::move(in_sock))
+        , m_out_sock(std::move(out_sock))
+        , m_failed(failed)
+        , m_logger(std::move(logger)) {
+        m_forward_part = std::make_shared<session_part>(m_in_sock, m_out_sock, in_listener, failed, m_logger);
+        m_reverse_part = std::make_shared<session_part>(m_out_sock, m_in_sock, out_listener, failed, m_logger);
+    }
+    /** Connects the outgoing socket to one of the endpoints and, on success, starts relaying in both directions. */
+    void connect(const tcp::resolver::results_type& endpoints) {
+        asio::async_connect(*m_out_sock, endpoints,
+            [self=shared_from_this()](const asio::error_code &ec, const tcp::endpoint &e) {
+                if (ec) { // Handlers run on the proxy thread: an escaping exception would terminate the process.
+                    self->m_logger->log_error("Error connecting to server " + ec.message() + " port=" + std::to_string(e.port()));
+                    self->m_failed.store(true);
+                    return;
+                }
+                self->do_serve();
+            });
+    }
+private:
+    void do_serve() {
+        m_forward_part->do_read();
+        m_reverse_part->do_read();
+    }
+    std::shared_ptr<tcp::socket> m_in_sock;
+    std::shared_ptr<tcp::socket> m_out_sock;
+    std::atomic_bool& m_failed;
+    std::shared_ptr<gtest_logger> m_logger;
+    std::shared_ptr<session_part> m_forward_part;
+    std::shared_ptr<session_part> m_reverse_part;
+};
+
+/** Test TCP proxy: accepts connections on each configured port and relays them to the configured server on a background thread. */
+class asio_proxy {
+public:
+    asio_proxy(std::vector<configuration> configurations, std::shared_ptr<gtest_logger> logger)
+        : m_resolver(m_io_context)
+        , m_logger(std::move(logger))
+    {
+        for (auto &cfg : configurations) {
+            m_conn_map.emplace(
+                cfg.m_in_port,
+                proxy_entry{m_io_context, cfg}
+            );
+        }
+
+        do_serve();
+
+        m_executor = std::make_unique<std::thread>([this]() {
+            m_io_context.run();
+        });
+    }
+
+    /** Stops the I/O loop, joins the worker thread and reports any recorded proxy error as a test failure. */
+    ~asio_proxy() {
+        m_stopped.store(true);
+        m_io_context.stop();
+
+        m_executor->join();
+
+        if (m_failed.load()) {
+            ADD_FAILURE() << "Proxy error occurred during test execution";
+        }
+    }
+
+private:
+    void do_serve() {
+        for (auto& [_, entry]: m_conn_map) {
+            do_accept(entry);
+        }
+    }
+
+    /** Records an error seen on the proxy thread; throwing from a handler there would terminate the process. */
+    void report_error(const std::string &msg) {
+        m_logger->log_error(msg);
+        m_failed.store(true);
+    }
+
+    /** Accepts one incoming connection on the entry's port, starts a session for it and re-arms the accept. */
+    void do_accept(proxy_entry& entry) {
+        if (m_stopped.load()) {
+            return;
+        }
+
+        entry.m_in_acceptor.async_accept([this, &entry](asio::error_code ec, tcp::socket in_sock) {
+            if (ec) {
+                report_error("Error accepting incoming connection " + ec.message());
+                return;
+            }
+
+            auto p_in_sock = std::make_shared<tcp::socket>(std::move(in_sock));
+            auto p_out_sock = std::make_shared<tcp::socket>(m_io_context);
+            auto ses = std::make_shared<session>(
+                p_in_sock, p_out_sock, entry.m_in_listener, entry.m_out_listener, m_failed, m_logger);
+
+            m_resolver.async_resolve(entry.m_out_host, entry.m_out_port,
+                [this, ses](asio::error_code ec, tcp::resolver::results_type endpoints) { // NOLINT(*-unnecessary-value-param)
+                    if (ec) {
+                        report_error("Error resolving server's address " + ec.message());
+                        return;
+                    }
+
+                    ses->connect(endpoints);
+                });
+
+            do_accept(entry);
+        });
+    }
+
+    // The io_context is declared before every I/O object that uses it (acceptors in m_conn_map, m_resolver),
+    // so that those objects are destroyed first: members are destroyed in reverse declaration order.
+    asio::io_context m_io_context{};
+    std::map<asio::ip::port_type, proxy_entry> m_conn_map{};
+    std::unique_ptr<std::thread> m_executor{};
+    tcp::resolver m_resolver;
+    std::shared_ptr<gtest_logger> m_logger;
+    std::atomic_bool m_stopped{false};
+    std::atomic_bool m_failed{false};
+};
+} // namespace ignite::proxy
diff --git a/modules/platforms/cpp/tests/fake_server/proxy/message_listener.h 
b/modules/platforms/cpp/tests/fake_server/proxy/message_listener.h
new file mode 100644
index 00000000000..92773b60e39
--- /dev/null
+++ b/modules/platforms/cpp/tests/fake_server/proxy/message_listener.h
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#pragma once
+#include <queue>
+#include <utility>
+
+namespace ignite::proxy {
+
+using message = std::vector<char>;
+
+class message_listener {
+public:
+    void register_message(message msg) {
+        m_queue.push(std::move(msg));
+    }
+
+    const std::queue<message>& get_msg_queue() const {
+        return m_queue;
+    }
+
+private:
+    std::queue<message> m_queue{};
+};
+} // namespace ignite::proxy

Reply via email to