leekeiabstraction commented on code in PR #476:
URL: https://github.com/apache/fluss-rust/pull/476#discussion_r3047783188


##########
bindings/cpp/CMakeLists.txt:
##########
@@ -25,6 +25,7 @@ project(fluss-cpp LANGUAGES CXX)
 
 include(FetchContent)
 set(FLUSS_GOOGLETEST_VERSION 1.15.2 CACHE STRING "version of GoogleTest")
+set(FLUSS_NLOHMANN_JSON_VERSION 3.11.3 CACHE STRING "version of nlohmann/json")

Review Comment:
   Why not use the latest version, 3.12.0?



##########
bindings/cpp/test/test_utils.h:
##########
@@ -21,266 +21,124 @@
 
 #include <gtest/gtest.h>
 
-#include <algorithm>
 #include <chrono>
 #include <cstdio>
 #include <cstdlib>
 #include <cstring>
+#include <fstream>
+#include <nlohmann/json.hpp>
 #include <string>
-#include <thread>
 #include <vector>
 
-#ifdef _WIN32
-#include <winsock2.h>
-#include <ws2tcpip.h>
-#pragma comment(lib, "ws2_32.lib")
-#else
-#include <arpa/inet.h>
-#include <netinet/in.h>
-#include <sys/socket.h>
-#include <unistd.h>
-#endif
-
 #include "fluss.hpp"
 
-// Macro to assert Result is OK and print error message on failure
 #define ASSERT_OK(result) ASSERT_TRUE((result).Ok()) << (result).error_message
 #define EXPECT_OK(result) EXPECT_TRUE((result).Ok()) << (result).error_message
 
 namespace fluss_test {
 
-static constexpr const char* kFlussImage = "apache/fluss";
-static constexpr const char* kFlussVersion = "0.9.0-incubating";
-static constexpr const char* kNetworkName = "fluss-cpp-test-network";
-static constexpr const char* kZookeeperName = "zookeeper-cpp-test";
-static constexpr const char* kCoordinatorName = "coordinator-server-cpp-test";
-static constexpr const char* kTabletServerName = "tablet-server-cpp-test";
-static constexpr int kCoordinatorPort = 9123;
-static constexpr int kTabletServerPort = 9124;
-static constexpr int kPlainClientPort = 9223;
-static constexpr int kPlainClientTabletPort = 9224;
-
-/// Execute a shell command and return its exit code.
-inline int RunCommand(const std::string& cmd) { return system(cmd.c_str()); }
-
-/// Join property lines with the escaped newline separator used by `printf` in 
docker commands.
-inline std::string JoinProps(const std::vector<std::string>& lines) {
-    std::string result;
-    for (size_t i = 0; i < lines.size(); ++i) {
-        if (i > 0) result += "\\n";
-        result += lines[i];
+inline std::string FindCliBinary() {
+    const char* env_bin = std::getenv("FLUSS_TEST_CLUSTER_BIN");
+    if (env_bin && std::strlen(env_bin) > 0) {
+        if (std::ifstream(env_bin).good()) {
+            return env_bin;
+        }
+        std::cerr << "FLUSS_TEST_CLUSTER_BIN is set to '" << env_bin
+                  << "' but that file does not exist." << std::endl;
+        std::abort();
     }
-    return result;
-}
-
-/// Build a `docker run` command with FLUSS_PROPERTIES.
-inline std::string DockerRunCmd(const std::string& name, const std::string& 
props,
-                                const std::vector<std::string>& port_mappings,
-                                const std::string& server_type) {
-    std::string cmd = "docker run -d --rm --name " + name + " --network " + 
kNetworkName;
-    for (const auto& pm : port_mappings) {
-        cmd += " -p " + pm;
+    FILE* pipe = popen("cargo locate-project --workspace --message-format 
plain", "r");
+    if (pipe) {
+        char buf[512];
+        std::string root;
+        while (fgets(buf, sizeof(buf), pipe)) root += buf;
+        if (pclose(pipe) == 0) {
+            // cargo returns path to Cargo.toml; strip filename + trailing 
whitespace.
+            while (!root.empty() && (root.back() == '\n' || root.back() == 
'\r')) root.pop_back();
+            auto slash = root.rfind('/');
+            if (slash != std::string::npos) {
+                std::string dir = root.substr(0, slash);
+                for (const char* profile : {"debug", "release"}) {
+                    std::string path = dir + "/target/" + profile + 
"/fluss-test-cluster";
+                    if (std::ifstream(path).good()) return path;
+                }
+            }
+        }
     }
-    cmd += " -e FLUSS_PROPERTIES=\"$(printf '" + props + "')\"";
-    cmd += " " + std::string(kFlussImage) + ":" + kFlussVersion + " " + 
server_type;
-    return cmd;
+    return "fluss-test-cluster";
 }
 
-/// Wait until a TCP port is accepting connections, or timeout.
-inline bool WaitForPort(const std::string& host, int port, int timeout_seconds 
= 60) {
-    auto deadline = std::chrono::steady_clock::now() + 
std::chrono::seconds(timeout_seconds);
-
-    while (std::chrono::steady_clock::now() < deadline) {
-        int sock = socket(AF_INET, SOCK_STREAM, 0);
-        if (sock < 0) {
-            std::this_thread::sleep_for(std::chrono::milliseconds(500));
-            continue;
-        }
+constexpr const char* kClusterName = "shared-test";
 
-        struct sockaddr_in addr {};
-        addr.sin_family = AF_INET;
-        addr.sin_port = htons(static_cast<uint16_t>(port));
-        inet_pton(AF_INET, host.c_str(), &addr.sin_addr);
+inline std::string CliStartCmd() {
+    return FindCliBinary() + " start --sasl --name " + kClusterName;
+}
 
-        int result = connect(sock, reinterpret_cast<struct sockaddr*>(&addr), 
sizeof(addr));
-#ifdef _WIN32
-        closesocket(sock);
-#else
-        close(sock);
-#endif
-        if (result == 0) {
-            return true;
+inline bool ParseClusterJson(const std::string& output, std::string& bootstrap,
+                             std::string& sasl_bootstrap) {
+    // Last non-empty line is the JSON (progress goes to stderr).
+    auto last_nl = output.rfind('\n', output.size() - 2);
+    std::string line = (last_nl != std::string::npos) ? output.substr(last_nl 
+ 1) : output;
+    try {
+        auto info = nlohmann::json::parse(line);
+        bootstrap = info.at("bootstrap_servers").get<std::string>();
+        if (info.contains("sasl_bootstrap_servers") && 
!info["sasl_bootstrap_servers"].is_null()) {
+            sasl_bootstrap = info["sasl_bootstrap_servers"].get<std::string>();
         }
-
-        std::this_thread::sleep_for(std::chrono::milliseconds(500));
+        return true;
+    } catch (const nlohmann::json::exception&) {
+        return false;

Review Comment:
   Can we log the exception message?



##########
bindings/cpp/test/test_utils.h:
##########
@@ -21,266 +21,124 @@
 
 #include <gtest/gtest.h>
 
-#include <algorithm>
 #include <chrono>
 #include <cstdio>
 #include <cstdlib>
 #include <cstring>
+#include <fstream>
+#include <nlohmann/json.hpp>
 #include <string>
-#include <thread>
 #include <vector>
 
-#ifdef _WIN32
-#include <winsock2.h>
-#include <ws2tcpip.h>
-#pragma comment(lib, "ws2_32.lib")
-#else
-#include <arpa/inet.h>
-#include <netinet/in.h>
-#include <sys/socket.h>
-#include <unistd.h>
-#endif
-
 #include "fluss.hpp"
 
-// Macro to assert Result is OK and print error message on failure
 #define ASSERT_OK(result) ASSERT_TRUE((result).Ok()) << (result).error_message
 #define EXPECT_OK(result) EXPECT_TRUE((result).Ok()) << (result).error_message
 
 namespace fluss_test {
 
-static constexpr const char* kFlussImage = "apache/fluss";
-static constexpr const char* kFlussVersion = "0.9.0-incubating";
-static constexpr const char* kNetworkName = "fluss-cpp-test-network";
-static constexpr const char* kZookeeperName = "zookeeper-cpp-test";
-static constexpr const char* kCoordinatorName = "coordinator-server-cpp-test";
-static constexpr const char* kTabletServerName = "tablet-server-cpp-test";
-static constexpr int kCoordinatorPort = 9123;
-static constexpr int kTabletServerPort = 9124;
-static constexpr int kPlainClientPort = 9223;
-static constexpr int kPlainClientTabletPort = 9224;
-
-/// Execute a shell command and return its exit code.
-inline int RunCommand(const std::string& cmd) { return system(cmd.c_str()); }
-
-/// Join property lines with the escaped newline separator used by `printf` in 
docker commands.
-inline std::string JoinProps(const std::vector<std::string>& lines) {
-    std::string result;
-    for (size_t i = 0; i < lines.size(); ++i) {
-        if (i > 0) result += "\\n";
-        result += lines[i];
+inline std::string FindCliBinary() {
+    const char* env_bin = std::getenv("FLUSS_TEST_CLUSTER_BIN");
+    if (env_bin && std::strlen(env_bin) > 0) {
+        if (std::ifstream(env_bin).good()) {
+            return env_bin;
+        }
+        std::cerr << "FLUSS_TEST_CLUSTER_BIN is set to '" << env_bin
+                  << "' but that file does not exist." << std::endl;
+        std::abort();
     }
-    return result;
-}
-
-/// Build a `docker run` command with FLUSS_PROPERTIES.
-inline std::string DockerRunCmd(const std::string& name, const std::string& 
props,
-                                const std::vector<std::string>& port_mappings,
-                                const std::string& server_type) {
-    std::string cmd = "docker run -d --rm --name " + name + " --network " + 
kNetworkName;
-    for (const auto& pm : port_mappings) {
-        cmd += " -p " + pm;
+    FILE* pipe = popen("cargo locate-project --workspace --message-format 
plain", "r");
+    if (pipe) {
+        char buf[512];
+        std::string root;
+        while (fgets(buf, sizeof(buf), pipe)) root += buf;
+        if (pclose(pipe) == 0) {
+            // cargo returns path to Cargo.toml; strip filename + trailing 
whitespace.
+            while (!root.empty() && (root.back() == '\n' || root.back() == 
'\r')) root.pop_back();
+            auto slash = root.rfind('/');
+            if (slash != std::string::npos) {
+                std::string dir = root.substr(0, slash);
+                for (const char* profile : {"debug", "release"}) {
+                    std::string path = dir + "/target/" + profile + 
"/fluss-test-cluster";
+                    if (std::ifstream(path).good()) return path;
+                }
+            }
+        }
     }
-    cmd += " -e FLUSS_PROPERTIES=\"$(printf '" + props + "')\"";
-    cmd += " " + std::string(kFlussImage) + ":" + kFlussVersion + " " + 
server_type;
-    return cmd;
+    return "fluss-test-cluster";
 }
 
-/// Wait until a TCP port is accepting connections, or timeout.
-inline bool WaitForPort(const std::string& host, int port, int timeout_seconds 
= 60) {
-    auto deadline = std::chrono::steady_clock::now() + 
std::chrono::seconds(timeout_seconds);
-
-    while (std::chrono::steady_clock::now() < deadline) {
-        int sock = socket(AF_INET, SOCK_STREAM, 0);
-        if (sock < 0) {
-            std::this_thread::sleep_for(std::chrono::milliseconds(500));
-            continue;
-        }
+constexpr const char* kClusterName = "shared-test";
 
-        struct sockaddr_in addr {};
-        addr.sin_family = AF_INET;
-        addr.sin_port = htons(static_cast<uint16_t>(port));
-        inet_pton(AF_INET, host.c_str(), &addr.sin_addr);
+inline std::string CliStartCmd() {
+    return FindCliBinary() + " start --sasl --name " + kClusterName;
+}
 
-        int result = connect(sock, reinterpret_cast<struct sockaddr*>(&addr), 
sizeof(addr));
-#ifdef _WIN32
-        closesocket(sock);
-#else
-        close(sock);
-#endif
-        if (result == 0) {
-            return true;
+inline bool ParseClusterJson(const std::string& output, std::string& bootstrap,
+                             std::string& sasl_bootstrap) {
+    // Last non-empty line is the JSON (progress goes to stderr).
+    auto last_nl = output.rfind('\n', output.size() - 2);

Review Comment:
   This seems a bit fragile. Can we search for a specific string token instead, 
e.g. have the command output something like `CLUSTER_JSON: {...}`?
   
   We could also use other string tokens to signal other cases, e.g. `TIMEOUT: 
timeout waiting for cluster to be ready`. This surfaces issues better than 
swallowing the error from parsing a non-JSON line when the cluster times out 
while spinning up.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to