pitrou commented on code in PR #43766: URL: https://github.com/apache/arrow/pull/43766#discussion_r1722904328
########## cpp/src/arrow/testing/process.cc: ########## @@ -0,0 +1,218 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/testing/process.h" + +// This boost/asio/io_context.hpp include is needless for no MinGW +// build. +// +// This is for including boost/asio/detail/socket_types.hpp before any +// "#include <windows.h>". boost/asio/detail/socket_types.hpp doesn't +// work if windows.h is already included. 
+#include <boost/asio/io_context.hpp> + +#ifdef BOOST_PROCESS_NEED_SOURCE +// Workaround for https://github.com/boostorg/process/issues/312 +#define BOOST_PROCESS_V2_SEPARATE_COMPILATION +#include <boost/process/v2.hpp> +#include <boost/process/v2/src.hpp> +#else +#include <boost/process/v2.hpp> +#endif + +#ifdef __APPLE__ +#include <limits.h> +#include <mach-o/dyld.h> +#endif + +#include <chrono> +#include <iostream> +#include <sstream> +#include <thread> + +namespace asio = BOOST_PROCESS_V2_ASIO_NAMESPACE; +namespace process = BOOST_PROCESS_V2_NAMESPACE; + +namespace arrow::util { + +class Process::Impl { + private: + process::filesystem::path executable_; + std::vector<std::string> args_; + std::unordered_map<process::environment::key, process::environment::value> env_; + std::string marker_; + std::unique_ptr<process::process> process_; + asio::io_context ctx_; + + Status ResolveCurrentExecutable(process::filesystem::path* out) { Review Comment: Why not return a `Result<path>`? ########## cpp/src/arrow/filesystem/gcsfs_test.cc: ########## @@ -99,60 +77,55 @@ class GcsTestbench : public ::testing::Environment { "Could not start GCS emulator." 
" Used the following list of python interpreter names:"); for (const auto& interpreter : names) { - auto exe_path = bp::search_path(interpreter); + auto server_process = std::make_unique<util::Process>(); error += " " + interpreter; - if (exe_path.empty()) { + auto status = server_process->SetExecutable(interpreter); + if (!status.ok()) { error += " (exe not found)"; continue; } - bp::ipstream output; - server_process_ = bp::child(exe_path, "-m", "testbench", "--port", port_, group_, - bp::std_err > output); - // Wait for message: "* Restarting with" - auto testbench_is_running = [&output, this](bp::child& process) { - std::string line; - std::chrono::time_point<std::chrono::steady_clock> end = - std::chrono::steady_clock::now() + std::chrono::seconds(10); - while (server_process_.valid() && server_process_.running() && - std::chrono::steady_clock::now() < end) { - if (output.peek() && std::getline(output, line)) { - std::cerr << line << std::endl; - if (line.find("* Restarting with") != std::string::npos) return true; - } else { - std::this_thread::sleep_for(std::chrono::milliseconds(20)); - } - } - return false; - }; - - if (testbench_is_running(server_process_)) break; - error += " (failed to start)"; - server_process_.terminate(); - server_process_.wait(); + status = server_process->SetArgs({"-m", "testbench", "--port", port_}); + if (!status.ok()) { + error += " (failed to set args: "; + error += status.ToString(); + error += ")"; + continue; + } + + status = server_process->SetReadyErrorMessage("* Restarting with"); + if (!status.ok()) { + error += " (failed to set ready error message: "; + error += status.ToString(); + error += ")"; + continue; + } + + status = server_process->Execute(); + if (!status.ok()) { + error += " (failed to launch: "; + error += status.ToString(); + error += ")"; + continue; + } + + server_process_ = std::move(server_process); + break; } - if (server_process_.valid() && server_process_.valid()) return; + if (server_process_) return; 
error_ = std::move(error); } - bool running() { return server_process_.running(); } + bool running() { return server_process_ && server_process_->IsRunning(); } - ~GcsTestbench() override { - // Brutal shutdown, kill the full process group because the GCS testbench may launch - // additional children. - group_.terminate(); - if (server_process_.valid()) { - server_process_.wait(); - } - } + ~GcsTestbench() override { server_process_ = nullptr; } Review Comment: Is it possible to keep the "kill process group" behavior, or is that impossible with bp v2? ########## cpp/src/arrow/filesystem/s3_test_util.cc: ########## @@ -105,44 +83,26 @@ Status MinioTestServer::Start() { ARROW_ASSIGN_OR_RAISE(impl_->temp_dir_, TemporaryDir::Make("s3fs-test-")); - // Get a copy of the current environment. - // (NOTE: using "auto" would return a native_environment that mutates - // the current environment) - bp::environment env = boost::this_process::environment(); - env["MINIO_ACCESS_KEY"] = kMinioAccessKey; - env["MINIO_SECRET_KEY"] = kMinioSecretKey; - // Disable the embedded console (one less listening address to care about) - env["MINIO_BROWSER"] = "off"; - + impl_->server_process_ = std::make_unique<util::Process>(); + ARROW_RETURN_NOT_OK( + impl_->server_process_->SetEnv("MINIO_ACCESS_KEY", kMinioAccessKey)); + ARROW_RETURN_NOT_OK( + impl_->server_process_->SetEnv("MINIO_ACCESS_KEY", kMinioAccessKey)); + ARROW_RETURN_NOT_OK( + impl_->server_process_->SetEnv("MINIO_SECRET_KEY", kMinioSecretKey)); + ARROW_RETURN_NOT_OK(impl_->server_process_->SetEnv("MINIO_BROWSER", "off")); Review Comment: Can you keep the comment from above? ########## cpp/src/arrow/filesystem/azurefs_test.cc: ########## @@ -65,7 +48,7 @@ namespace arrow { using internal::TemporaryDir; namespace fs { using internal::ConcatAbstractPath; -namespace bp = boost::process; +// namespace bp = BOOST_PROCESS_V2_NAMESPACE; Review Comment: Remove this? 
########## cpp/src/arrow/filesystem/s3_test_util.cc: ########## @@ -105,44 +83,26 @@ Status MinioTestServer::Start() { ARROW_ASSIGN_OR_RAISE(impl_->temp_dir_, TemporaryDir::Make("s3fs-test-")); - // Get a copy of the current environment. - // (NOTE: using "auto" would return a native_environment that mutates - // the current environment) - bp::environment env = boost::this_process::environment(); - env["MINIO_ACCESS_KEY"] = kMinioAccessKey; - env["MINIO_SECRET_KEY"] = kMinioSecretKey; - // Disable the embedded console (one less listening address to care about) - env["MINIO_BROWSER"] = "off"; - + impl_->server_process_ = std::make_unique<util::Process>(); + ARROW_RETURN_NOT_OK( + impl_->server_process_->SetEnv("MINIO_ACCESS_KEY", kMinioAccessKey)); + ARROW_RETURN_NOT_OK( + impl_->server_process_->SetEnv("MINIO_ACCESS_KEY", kMinioAccessKey)); Review Comment: This line is duplicated? ########## cpp/src/arrow/testing/process.cc: ########## @@ -0,0 +1,218 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/testing/process.h" + +// This boost/asio/io_context.hpp include is needless for no MinGW +// build. 
+// +// This is for including boost/asio/detail/socket_types.hpp before any +// "#include <windows.h>". boost/asio/detail/socket_types.hpp doesn't +// work if windows.h is already included. +#include <boost/asio/io_context.hpp> + +#ifdef BOOST_PROCESS_NEED_SOURCE +// Workaround for https://github.com/boostorg/process/issues/312 +#define BOOST_PROCESS_V2_SEPARATE_COMPILATION +#include <boost/process/v2.hpp> +#include <boost/process/v2/src.hpp> +#else +#include <boost/process/v2.hpp> +#endif + +#ifdef __APPLE__ +#include <limits.h> +#include <mach-o/dyld.h> +#endif + +#include <chrono> +#include <iostream> +#include <sstream> +#include <thread> + +namespace asio = BOOST_PROCESS_V2_ASIO_NAMESPACE; +namespace process = BOOST_PROCESS_V2_NAMESPACE; + +namespace arrow::util { + +class Process::Impl { + private: + process::filesystem::path executable_; + std::vector<std::string> args_; + std::unordered_map<process::environment::key, process::environment::value> env_; + std::string marker_; + std::unique_ptr<process::process> process_; + asio::io_context ctx_; + + Status ResolveCurrentExecutable(process::filesystem::path* out) { + // See https://stackoverflow.com/a/1024937/10194 for various + // platform-specific recipes. + + boost::system::error_code ec; + +#if defined(__linux__) + *out = process::filesystem::canonical("/proc/self/exe", ec); +#elif defined(__APPLE__) + char buf[PATH_MAX + 1]; + uint32_t bufsize = sizeof(buf); + if (_NSGetExecutablePath(buf, &bufsize) < 0) { + return Status::Invalid("Can't resolve current exe: path too large"); + } + *out = process::filesystem::canonical(buf, ec); +#elif defined(_WIN32) + char buf[MAX_PATH + 1]; + if (!GetModuleFileNameA(NULL, buf, sizeof(buf))) { + return Status::Invalid("Can't get executable file path"); + } + *out = process::filesystem::canonical(buf, ec); +#else + ARROW_UNUSED(ec); + return Status::NotImplemented("Not available on this system"); +#endif + if (ec) { + // XXX fold this into the Status class? 
+      return Status::IOError("Can't resolve current exe: ", ec.message()); +    } else { +      return Status::OK(); +    } +  } + + public: +  Impl() { +    // Get a copy of the current environment. +    for (const auto& kv : process::environment::current()) { +      env_[kv.key()] = kv.value().string(); +    } +  } + +  ~Impl() { process_ = nullptr; } + +  Status SetExecutable(const std::string& name) { +    executable_ = process::environment::find_executable(name); +    if (executable_.empty()) { +      // Search the current executable directory as fallback. +      boost::filesystem::path current_exe; +      auto status = ResolveCurrentExecutable(&current_exe); +      if (status.ok()) { +        std::unordered_map<process::environment::key, process::environment::value> env = { +          {"PATH", current_exe.parent_path().string()}, +        }; +        executable_ = process::environment::find_executable(name, env); +      } +    } +    if (executable_.empty()) { +      return Status::IOError("Failed to find '", name, "' in PATH"); +    } +    return Status::OK(); +  } + +  Status SetArgs(const std::vector<std::string>& args) { +    args_ = args; +    return Status::OK(); +  } + +  Status SetEnv(const std::string& name, const std::string& value) { +    // Workaround for https://github.com/boostorg/process/issues/365 +    env_[name] = std::string(value); +    return Status::OK(); +  } + +  Status SetReadyErrorMessage(const std::string& marker) { +    marker_ = marker; +    return Status::OK(); +  } + +  Status Execute() { +    try { +      process::process_environment env(env_); +      if (marker_.empty()) { +        // We can't use std::make_unique<process::process>. +        process_ = std::unique_ptr<process::process>( +            new process::process(ctx_, executable_, args_, env)); +        return Status::OK(); +      } + +      asio::readable_pipe stderr(ctx_); +      // We can't use std::make_unique<process::process>.
+ process_ = std::unique_ptr<process::process>(new process::process( + ctx_, executable_, args_, env, process::process_stdio{{}, {}, stderr})); + std::stringstream buffered_output; + std::array<char, 1024> buffer; + std::string line; + auto timeout = std::chrono::seconds(10); + std::chrono::time_point<std::chrono::steady_clock> end = + std::chrono::steady_clock::now() + timeout; + while (process_->running() && std::chrono::steady_clock::now() < end) { + auto read_bytes = stderr.read_some(asio::buffer(buffer.data(), buffer.size())); + if (buffered_output.eof()) { + buffered_output.clear(); + auto last = buffered_output.str().size(); + buffered_output.seekg(last); + buffered_output.seekp(last); + } Review Comment: What does this do? Can you add an explanatory comment? ########## cpp/src/arrow/filesystem/gcsfs_test.cc: ########## @@ -99,60 +77,55 @@ class GcsTestbench : public ::testing::Environment { "Could not start GCS emulator." " Used the following list of python interpreter names:"); for (const auto& interpreter : names) { - auto exe_path = bp::search_path(interpreter); + auto server_process = std::make_unique<util::Process>(); error += " " + interpreter; - if (exe_path.empty()) { + auto status = server_process->SetExecutable(interpreter); + if (!status.ok()) { error += " (exe not found)"; continue; } - bp::ipstream output; - server_process_ = bp::child(exe_path, "-m", "testbench", "--port", port_, group_, - bp::std_err > output); - // Wait for message: "* Restarting with" - auto testbench_is_running = [&output, this](bp::child& process) { - std::string line; - std::chrono::time_point<std::chrono::steady_clock> end = - std::chrono::steady_clock::now() + std::chrono::seconds(10); - while (server_process_.valid() && server_process_.running() && - std::chrono::steady_clock::now() < end) { - if (output.peek() && std::getline(output, line)) { - std::cerr << line << std::endl; - if (line.find("* Restarting with") != std::string::npos) return true; - } else { - 
std::this_thread::sleep_for(std::chrono::milliseconds(20)); - } - } - return false; - }; - - if (testbench_is_running(server_process_)) break; - error += " (failed to start)"; - server_process_.terminate(); - server_process_.wait(); + status = server_process->SetArgs({"-m", "testbench", "--port", port_}); + if (!status.ok()) { Review Comment: Is it useful to add this information, especially as `SetArgs` cannot actually fail? (same question below) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
