This is an automated email from the ASF dual-hosted git repository. lordgamez pushed a commit to branch MINIFICPP-2567 in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 330963e9d035aec9688672a985b3d34bbc31f49b Author: Martin Zink <[email protected]> AuthorDate: Wed May 21 13:31:04 2025 +0200 MINIFICPP-2567 C++23 Support --- cmake/Asio.cmake | 4 +- cmake/BundledOpenCV.cmake | 6 + cmake/BundledRocksDB.cmake | 17 +- cmake/Couchbase.cmake | 4 +- cmake/CppVersion.cmake | 19 +- cmake/GoogleCloudCpp.cmake | 4 +- cmake/LlamaCpp.cmake | 4 +- cmake/RangeV3.cmake | 1 + .../controllers/JsonRecordSetWriter.cpp | 2 +- .../controllers/JsonTreeReader.cpp | 2 +- .../standard-processors/modbus/ByteConverters.h | 1 + .../standard-processors/tests/unit/PutTCPTests.cpp | 2 +- .../tests/unit/RecordSetTests.cpp | 16 +- libminifi/src/core/RecordField.cpp | 6 +- libminifi/test/libtest/unit/TestRecord.h | 12 +- libminifi/test/unit/NetUtilsTest.cpp | 8 +- minifi-api/include/minifi-cpp/core/RecordField.h | 26 +- thirdparty/couchbase/c++23_fixes.patch | 78 +++ thirdparty/google-cloud-cpp/c++23_fixes.patch | 19 + thirdparty/opencv/c++23_fixes.patch | 23 + thirdparty/rocksdb/all/patches/arm7.patch | 15 - thirdparty/rocksdb/all/patches/c++23_fixes.patch | 641 +++++++++++++++++++++ thirdparty/rocksdb/all/patches/cstdint.patch | 12 - utils/include/utils/expected.h | 29 +- utils/include/utils/net/AsioCoro.h | 4 +- utils/include/utils/net/AsioSocketUtils.h | 2 +- utils/src/utils/net/DNS.cpp | 7 +- 27 files changed, 845 insertions(+), 119 deletions(-) diff --git a/cmake/Asio.cmake b/cmake/Asio.cmake index 6f94e6afe..05f45867d 100644 --- a/cmake/Asio.cmake +++ b/cmake/Asio.cmake @@ -18,8 +18,8 @@ include(FetchContent) FetchContent_Declare(asio - URL https://github.com/chriskohlhoff/asio/archive/refs/tags/asio-1-28-1.tar.gz - URL_HASH SHA256=5ff6111ec8cbe73a168d997c547f562713aa7bd004c5c02326f0e9d579a5f2ce) + URL https://github.com/chriskohlhoff/asio/archive/refs/tags/asio-1-34-2.tar.gz + URL_HASH SHA256=f3bac015305fbb700545bd2959fbc52d75a1ec2e05f9c7f695801273ceb78cf5) FetchContent_GetProperties(asio) if(NOT asio_POPULATED) diff --git a/cmake/BundledOpenCV.cmake 
b/cmake/BundledOpenCV.cmake index c53294e51..95384208b 100644 --- a/cmake/BundledOpenCV.cmake +++ b/cmake/BundledOpenCV.cmake @@ -102,12 +102,18 @@ function(use_bundled_opencv SOURCE_DIR BINARY_DIR) append_third_party_passthrough_args(OPENCV_CMAKE_ARGS "${OPENCV_CMAKE_ARGS}") + set(PATCH_FILE_1 "${SOURCE_DIR}/thirdparty/opencv/c++23_fixes.patch") + set(PC ${Bash_EXECUTABLE} -c "set -x &&\ + (\"${Patch_EXECUTABLE}\" -p1 -R -s -f --dry-run -i \"${PATCH_FILE_1}\" || \"${Patch_EXECUTABLE}\" -p1 -N -i \"${PATCH_FILE_1}\")") + + # Build project ExternalProject_Add( opencv-external URL "https://github.com/opencv/opencv/archive/refs/tags/4.7.0.tar.gz" URL_HASH "SHA256=8df0079cdbe179748a18d44731af62a245a45ebf5085223dc03133954c662973" SOURCE_DIR "${BINARY_DIR}/thirdparty/opencv-src" + PATCH_COMMAND ${PC} CMAKE_ARGS ${OPENCV_CMAKE_ARGS} BUILD_BYPRODUCTS "${BYPRODUCTS}" EXCLUDE_FROM_ALL TRUE diff --git a/cmake/BundledRocksDB.cmake b/cmake/BundledRocksDB.cmake index 85a28304a..9d1f4644c 100644 --- a/cmake/BundledRocksDB.cmake +++ b/cmake/BundledRocksDB.cmake @@ -23,16 +23,15 @@ function(use_bundled_rocksdb SOURCE_DIR BINARY_DIR) include(LZ4) endif() - # Patch to fix build issue on ARM7 architecture: https://github.com/facebook/rocksdb/issues/8609#issuecomment-1009572506 - set(PATCH_FILE_1 "${SOURCE_DIR}/thirdparty/rocksdb/all/patches/arm7.patch") - set(PATCH_FILE_2 "${SOURCE_DIR}/thirdparty/rocksdb/all/patches/dboptions_equality_operator.patch") - set(PATCH_FILE_3 "${SOURCE_DIR}/thirdparty/rocksdb/all/patches/cstdint.patch") + set(PATCH_FILE_1 "${SOURCE_DIR}/thirdparty/rocksdb/all/patches/dboptions_equality_operator.patch") + set(PATCH_FILE_2 "${SOURCE_DIR}/thirdparty/rocksdb/all/patches/c++23_fixes.patch") + set(PC ${Bash_EXECUTABLE} -c "set -x &&\ (\"${Patch_EXECUTABLE}\" -p1 -R -s -f --dry-run -i \"${PATCH_FILE_1}\" || \"${Patch_EXECUTABLE}\" -p1 -N -i \"${PATCH_FILE_1}\") &&\ - (\"${Patch_EXECUTABLE}\" -p1 -R -s -f --dry-run -i \"${PATCH_FILE_2}\" || \"${Patch_EXECUTABLE}\" 
-p1 -N -i \"${PATCH_FILE_2}\") &&\ - (\"${Patch_EXECUTABLE}\" -p1 -R -s -f --dry-run -i \"${PATCH_FILE_3}\" || \"${Patch_EXECUTABLE}\" -p1 -N -i \"${PATCH_FILE_3}\") ") + (\"${Patch_EXECUTABLE}\" -p1 -R -s -f --dry-run -i \"${PATCH_FILE_2}\" || \"${Patch_EXECUTABLE}\" -p1 -N -i \"${PATCH_FILE_2}\") ") + - # Define byproducts +# Define byproducts if (WIN32) set(BYPRODUCT "lib/rocksdb.lib") else() @@ -72,8 +71,8 @@ function(use_bundled_rocksdb SOURCE_DIR BINARY_DIR) # Build project ExternalProject_Add( rocksdb-external - URL "https://github.com/facebook/rocksdb/archive/refs/tags/v8.10.2.tar.gz" - URL_HASH "SHA256=44b6ec2f4723a0d495762da245d4a59d38704e0d9d3d31c45af4014bee853256" + URL "https://github.com/facebook/rocksdb/archive/refs/tags/v10.2.1.tar.gz" + URL_HASH "SHA256=d1ddfd3551e649f7e2d180d5a6a006d90cfde56dcfe1e548c58d95b7f1c87049" SOURCE_DIR "${BINARY_DIR}/thirdparty/rocksdb-src" CMAKE_ARGS ${ROCKSDB_CMAKE_ARGS} PATCH_COMMAND ${PC} diff --git a/cmake/Couchbase.cmake b/cmake/Couchbase.cmake index 74a7e1326..f7a22dd77 100644 --- a/cmake/Couchbase.cmake +++ b/cmake/Couchbase.cmake @@ -32,10 +32,12 @@ set(COUCHBASE_CXX_CLIENT_INSTALL OFF CACHE BOOL "" FORCE) set(PATCH_FILE_1 "${CMAKE_SOURCE_DIR}/thirdparty/couchbase/remove-thirdparty.patch") set(PATCH_FILE_2 "${CMAKE_SOURCE_DIR}/thirdparty/couchbase/remove-debug-symbols.patch") +set(PATCH_FILE_3 "${CMAKE_SOURCE_DIR}/thirdparty/couchbase/c++23_fixes.patch") set(PC ${Bash_EXECUTABLE} -c "set -x &&\ (\\\"${Patch_EXECUTABLE}\\\" -p1 -R -s -f --dry-run -i \\\"${PATCH_FILE_1}\\\" || \\\"${Patch_EXECUTABLE}\\\" -p1 -N -i \\\"${PATCH_FILE_1}\\\") &&\ - (\\\"${Patch_EXECUTABLE}\\\" -p1 -R -s -f --dry-run -i \\\"${PATCH_FILE_2}\\\" || \\\"${Patch_EXECUTABLE}\\\" -p1 -N -i \\\"${PATCH_FILE_2}\\\")") + (\\\"${Patch_EXECUTABLE}\\\" -p1 -R -s -f --dry-run -i \\\"${PATCH_FILE_2}\\\" || \\\"${Patch_EXECUTABLE}\\\" -p1 -N -i \\\"${PATCH_FILE_2}\\\") &&\ + (\\\"${Patch_EXECUTABLE}\\\" -p1 -R -s -f --dry-run -i 
\\\"${PATCH_FILE_3}\\\" || \\\"${Patch_EXECUTABLE}\\\" -p1 -N -i \\\"${PATCH_FILE_3}\\\")") FetchContent_Declare(couchbase-cxx-client URL https://github.com/couchbase/couchbase-cxx-client/releases/download/1.0.2/couchbase-cxx-client-1.0.2.tar.gz diff --git a/cmake/CppVersion.cmake b/cmake/CppVersion.cmake index c356e6b40..8b6edee60 100644 --- a/cmake/CppVersion.cmake +++ b/cmake/CppVersion.cmake @@ -23,19 +23,20 @@ function(set_cpp_version) else() message(STATUS "The Visual Studio C++ compiler ${CMAKE_CXX_COMPILER} is not supported. Please use Visual Studio 2022 or newer.") endif() - set(CMAKE_CXX_STANDARD 20 PARENT_SCOPE) + set(CMAKE_CXX_STANDARD 23 PARENT_SCOPE) else() include(CheckCXXCompilerFlag) - CHECK_CXX_COMPILER_FLAG("-std=c++20" COMPILER_SUPPORTS_CXX20) - CHECK_CXX_COMPILER_FLAG("-std=c++2a" COMPILER_SUPPORTS_CXX2A) - if(COMPILER_SUPPORTS_CXX20) - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++20" PARENT_SCOPE) - elseif(COMPILER_SUPPORTS_CXX2A) - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++2a" PARENT_SCOPE) + CHECK_CXX_COMPILER_FLAG("-std=c++23" COMPILER_SUPPORTS_CXX23) + CHECK_CXX_COMPILER_FLAG("-std=c++2b" COMPILER_SUPPORTS_CXX2B) + if(COMPILER_SUPPORTS_CXX23) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++23" PARENT_SCOPE) + set(CMAKE_CXX_STANDARD 23 PARENT_SCOPE) + elseif(COMPILER_SUPPORTS_CXX2B) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++2b" PARENT_SCOPE) + set(CMAKE_CXX_STANDARD 23 PARENT_SCOPE) else() - message(STATUS "The compiler ${CMAKE_CXX_COMPILER} has no support for -std=c++20 or -std=c++2a. Please use a more recent C++ compiler version.") + message(STATUS "The compiler ${CMAKE_CXX_COMPILER} has no support for -std=c++23 or -std=c++2b. 
Please use a more recent C++ compiler version.") endif() - set(CMAKE_CXX_STANDARD 20 PARENT_SCOPE) endif() set(CMAKE_CXX_STANDARD_REQUIRED ON PARENT_SCOPE) diff --git a/cmake/GoogleCloudCpp.cmake b/cmake/GoogleCloudCpp.cmake index c02804455..59b6984bc 100644 --- a/cmake/GoogleCloudCpp.cmake +++ b/cmake/GoogleCloudCpp.cmake @@ -22,9 +22,11 @@ include(Crc32c) set(PATCH_FILE_1 "${CMAKE_SOURCE_DIR}/thirdparty/google-cloud-cpp/remove-find_package.patch") set(PATCH_FILE_2 "${CMAKE_SOURCE_DIR}/thirdparty/google-cloud-cpp/nlohmann_lib_as_interface.patch") +set(PATCH_FILE_3 "${CMAKE_SOURCE_DIR}/thirdparty/google-cloud-cpp/c++23_fixes.patch") set(PC ${Bash_EXECUTABLE} -c "set -x &&\ (\\\"${Patch_EXECUTABLE}\\\" -p1 -R -s -f --dry-run -i \\\"${PATCH_FILE_1}\\\" || \\\"${Patch_EXECUTABLE}\\\" -p1 -N -i \\\"${PATCH_FILE_1}\\\") &&\ - (\\\"${Patch_EXECUTABLE}\\\" -p1 -R -s -f --dry-run -i \\\"${PATCH_FILE_2}\\\" || \\\"${Patch_EXECUTABLE}\\\" -p1 -N -i \\\"${PATCH_FILE_2}\\\")") + (\\\"${Patch_EXECUTABLE}\\\" -p1 -R -s -f --dry-run -i \\\"${PATCH_FILE_2}\\\" || \\\"${Patch_EXECUTABLE}\\\" -p1 -N -i \\\"${PATCH_FILE_2}\\\") &&\ + (\\\"${Patch_EXECUTABLE}\\\" -p1 -R -s -f --dry-run -i \\\"${PATCH_FILE_3}\\\" || \\\"${Patch_EXECUTABLE}\\\" -p1 -N -i \\\"${PATCH_FILE_3}\\\")") set(GOOGLE_CLOUD_CPP_WITH_MOCKS OFF CACHE BOOL "" FORCE) if (NOT SKIP_TESTS) diff --git a/cmake/LlamaCpp.cmake b/cmake/LlamaCpp.cmake index af4e94994..f45b22bd2 100644 --- a/cmake/LlamaCpp.cmake +++ b/cmake/LlamaCpp.cmake @@ -35,8 +35,8 @@ set(PC ${Bash_EXECUTABLE} -c "set -x &&\ (\\\"${Patch_EXECUTABLE}\\\" -p1 -R -s -f --dry-run -i \\\"${PATCH_FILE_1}\\\" || \\\"${Patch_EXECUTABLE}\\\" -p1 -N -i \\\"${PATCH_FILE_1}\\\")") FetchContent_Declare(llamacpp - URL https://github.com/ggerganov/llama.cpp/archive/refs/tags/b5038.tar.gz - URL_HASH SHA256=5e81c3badc181ed3b7a6ab6bda2abedc80c52527e3c079c7afff4c09f4843564 + URL https://github.com/ggerganov/llama.cpp/archive/refs/tags/b5502.tar.gz + URL_HASH 
SHA256=9436852125dfe1b33ed47c5fb78bde614d9a8393072c5fa9689d0eaf2727dd1a PATCH_COMMAND "${PC}" ) diff --git a/cmake/RangeV3.cmake b/cmake/RangeV3.cmake index ff26be3a1..e8223985a 100644 --- a/cmake/RangeV3.cmake +++ b/cmake/RangeV3.cmake @@ -22,3 +22,4 @@ FetchContent_Declare(range-v3_src URL_HASH SHA256=015adb2300a98edfceaf0725beec3337f542af4915cec4d0b89fa0886f4ba9cb ) FetchContent_MakeAvailable(range-v3_src) +target_compile_definitions(range-v3 INTERFACE RANGES_CXX_THREAD_LOCAL=201103L) diff --git a/extensions/standard-processors/controllers/JsonRecordSetWriter.cpp b/extensions/standard-processors/controllers/JsonRecordSetWriter.cpp index 0f9e37de7..9a20c2942 100644 --- a/extensions/standard-processors/controllers/JsonRecordSetWriter.cpp +++ b/extensions/standard-processors/controllers/JsonRecordSetWriter.cpp @@ -61,7 +61,7 @@ template<> rapidjson::Value toJson(const core::RecordObject& field, rapidjson::Document::AllocatorType& alloc) { auto object_json = rapidjson::Value(rapidjson::kObjectType); for (const auto& [record_name, record_value] : field) { - auto json_value = (std::visit([&alloc](auto&& f)-> rapidjson::Value{ return toJson(f, alloc); }, record_value.field->value_)); + auto json_value = (std::visit([&alloc](auto&& f)-> rapidjson::Value{ return toJson(f, alloc); }, record_value.value_)); rapidjson::Value json_name(record_name.c_str(), gsl::narrow<rapidjson::SizeType>(record_name.length()), alloc); object_json.AddMember(json_name, json_value, alloc); } diff --git a/extensions/standard-processors/controllers/JsonTreeReader.cpp b/extensions/standard-processors/controllers/JsonTreeReader.cpp index 17b196811..1031a5a62 100644 --- a/extensions/standard-processors/controllers/JsonTreeReader.cpp +++ b/extensions/standard-processors/controllers/JsonTreeReader.cpp @@ -59,7 +59,7 @@ nonstd::expected<core::RecordField, std::error_code> parse(const rapidjson::Valu auto element_field = parse(m.value); if (!element_field) return 
nonstd::make_unexpected(element_field.error()); - record_object[element_key] = core::BoxedRecordField{std::make_unique<core::RecordField>(std::move(*element_field))}; + record_object.emplace(element_key, std::move(*element_field)); } return core::RecordField{std::move(record_object)}; } diff --git a/extensions/standard-processors/modbus/ByteConverters.h b/extensions/standard-processors/modbus/ByteConverters.h index cc5d38e17..19166c664 100644 --- a/extensions/standard-processors/modbus/ByteConverters.h +++ b/extensions/standard-processors/modbus/ByteConverters.h @@ -17,6 +17,7 @@ */ #pragma once +#include <algorithm> #include <array> #include <cstdint> #include <span> diff --git a/extensions/standard-processors/tests/unit/PutTCPTests.cpp b/extensions/standard-processors/tests/unit/PutTCPTests.cpp index ad6590203..8dc3c5287 100644 --- a/extensions/standard-processors/tests/unit/PutTCPTests.cpp +++ b/extensions/standard-processors/tests/unit/PutTCPTests.cpp @@ -50,7 +50,7 @@ class CancellableTcpServer : public utils::net::TcpServer { void cancelEverything() { for (auto& timer : cancellable_timers_) - io_context_.post([=]{timer->cancel();}); + asio::post(io_context_, [=]{timer->cancel();}); } asio::awaitable<void> doReceive() override { diff --git a/extensions/standard-processors/tests/unit/RecordSetTests.cpp b/extensions/standard-processors/tests/unit/RecordSetTests.cpp index e33822b42..7d1db9994 100644 --- a/extensions/standard-processors/tests/unit/RecordSetTests.cpp +++ b/extensions/standard-processors/tests/unit/RecordSetTests.cpp @@ -74,8 +74,8 @@ TEST_CASE("Test JSON serialization of a RecordField") { } { minifi::core::RecordObject obj; - obj.emplace("key1", std::make_unique<minifi::core::RecordField>(1)); - obj.emplace("key2", std::make_unique<minifi::core::RecordField>(std::string("hello"))); + obj.emplace("key1", minifi::core::RecordField(1)); + obj.emplace("key2", core::RecordField(std::string("hello"))); minifi::core::RecordField field{std::move(obj)}; 
rapidjson::Document doc; auto value = field.toJson(doc.GetAllocator()); @@ -114,11 +114,11 @@ TEST_CASE("Test JSON serialization of a Record") { record.emplace("key6", minifi::core::RecordField{std::move(arr)}); minifi::core::RecordObject subobj; - subobj.emplace("subkey3", std::make_unique<minifi::core::RecordField>(1)); - subobj.emplace("subkey4", std::make_unique<minifi::core::RecordField>(std::string("subhello"))); + subobj.emplace("subkey3", core::RecordField(1)); + subobj.emplace("subkey4", core::RecordField(std::string("subhello"))); minifi::core::RecordObject obj; - obj.emplace("subkey1", std::make_unique<minifi::core::RecordField>(-2)); - obj.emplace("subkey2", std::make_unique<minifi::core::RecordField>(std::move(subobj))); + obj.emplace("subkey1", core::RecordField(-2)); + obj.emplace("subkey2", core::RecordField(std::move(subobj))); record.emplace("key7", minifi::core::RecordField{std::move(obj)}); rapidjson::Document doc = record.toJson(); @@ -160,8 +160,8 @@ TEST_CASE("Test Record deserialization from JSON") { CHECK(record.at("time_point") == minifi::core::RecordField{test_time}); minifi::core::RecordObject subobj; - subobj.emplace("number2", std::make_unique<minifi::core::RecordField>(2)); - subobj.emplace("message", std::make_unique<minifi::core::RecordField>(std::string("mymessage"))); + subobj.emplace("number2", core::RecordField(2)); + subobj.emplace("message", core::RecordField(std::string("mymessage"))); minifi::core::RecordField obj_field{std::move(subobj)}; CHECK(record.at("obj") == obj_field); diff --git a/libminifi/src/core/RecordField.cpp b/libminifi/src/core/RecordField.cpp index bbc6c2a40..8c88ff8c6 100644 --- a/libminifi/src/core/RecordField.cpp +++ b/libminifi/src/core/RecordField.cpp @@ -51,11 +51,11 @@ rapidjson::Value RecordField::toJson(rapidjson::Document::AllocatorType& allocat }, [&value, &allocator](const RecordObject& obj) { value.SetObject(); - for (const auto& [key, boxed_field] : obj) { + for (const auto& [key, field] : 
obj) { rapidjson::Value keyValue; keyValue.SetString(key.c_str(), allocator); - rapidjson::Value fieldValue = boxed_field.field->toJson(allocator); + rapidjson::Value fieldValue = field.toJson(allocator); value.AddMember(keyValue, fieldValue, allocator); } } @@ -88,7 +88,7 @@ RecordField RecordField::fromJson(const rapidjson::Value& value) { } else if (value.IsObject()) { RecordObject obj; for (const auto& member : value.GetObject()) { - obj.emplace(member.name.GetString(), BoxedRecordField{std::make_unique<RecordField>(RecordField::fromJson(member.value))}); + obj.emplace(member.name.GetString(), RecordField::fromJson(member.value)); } return RecordField{std::move(obj)}; } else { diff --git a/libminifi/test/libtest/unit/TestRecord.h b/libminifi/test/libtest/unit/TestRecord.h index 9b88f347f..64dcd7de6 100644 --- a/libminifi/test/libtest/unit/TestRecord.h +++ b/libminifi/test/libtest/unit/TestRecord.h @@ -44,9 +44,9 @@ inline Record createSampleRecord2(const bool stringify = false) { qux.emplace_back('z'); RecordObject quux; - quux["Apfel"] = BoxedRecordField{std::make_unique<RecordField>(std::string{"pomme"})}; - quux["Birne"] = BoxedRecordField{std::make_unique<RecordField>(std::string{"poire"})}; - quux["Aprikose"] = BoxedRecordField{std::make_unique<RecordField>(std::string{"abricot"})}; + quux.emplace("Apfel", RecordField(std::string{"pomme"})); + quux.emplace("Birne", RecordField(std::string{"poire"})); + quux.emplace("Aprikose", RecordField(std::string{"abricot"})); RecordArray corge; corge.emplace_back(false); @@ -78,9 +78,9 @@ inline Record createSampleRecord(const bool stringify = false) { qux.emplace_back('b'); qux.emplace_back('c'); RecordObject quux; - quux["Apfel"] = BoxedRecordField{std::make_unique<RecordField>(std::string{"apple"})}; - quux["Birne"] = BoxedRecordField{std::make_unique<RecordField>(std::string{"pear"})}; - quux["Aprikose"] = BoxedRecordField{std::make_unique<RecordField>(std::string{"apricot"})}; + quux.emplace("Apfel", 
RecordField(std::string{"apple"})); + quux.emplace("Birne", RecordField(std::string{"pear"})); + quux.emplace("Aprikose", RecordField(std::string{"apricot"})); RecordArray corge; corge.emplace_back(true); diff --git a/libminifi/test/unit/NetUtilsTest.cpp b/libminifi/test/unit/NetUtilsTest.cpp index fe6fbc848..cc7a94dc2 100644 --- a/libminifi/test/unit/NetUtilsTest.cpp +++ b/libminifi/test/unit/NetUtilsTest.cpp @@ -33,13 +33,13 @@ TEST_CASE("net::reverseDnsLookup", "[net][dns][reverseDnsLookup]") { SECTION("dns.google IPv6") { if (minifi::test::utils::isIPv6Disabled()) SKIP("IPv6 is disabled"); - auto dns_google_hostname = net::reverseDnsLookup(asio::ip::address::from_string("2001:4860:4860::8888")); + auto dns_google_hostname = net::reverseDnsLookup(asio::ip::make_address("2001:4860:4860::8888")); REQUIRE(dns_google_hostname.has_value()); CHECK(dns_google_hostname == "dns.google"); } SECTION("dns.google IPv4") { - auto dns_google_hostname = net::reverseDnsLookup(asio::ip::address::from_string("8.8.8.8")); + auto dns_google_hostname = net::reverseDnsLookup(asio::ip::make_address("8.8.8.8")); REQUIRE(dns_google_hostname.has_value()); CHECK(dns_google_hostname == "dns.google"); } @@ -47,13 +47,13 @@ TEST_CASE("net::reverseDnsLookup", "[net][dns][reverseDnsLookup]") { SECTION("Unresolvable address IPv6") { if (minifi::test::utils::isIPv6Disabled()) SKIP("IPv6 is disabled"); - auto unresolvable_hostname = net::reverseDnsLookup(asio::ip::address::from_string("2001:db8::")); + auto unresolvable_hostname = net::reverseDnsLookup(asio::ip::make_address("2001:db8::")); REQUIRE(unresolvable_hostname.has_value()); CHECK(unresolvable_hostname == "2001:db8::"); } SECTION("Unresolvable address IPv4") { - auto unresolvable_hostname = net::reverseDnsLookup(asio::ip::address::from_string("192.0.2.0")); + auto unresolvable_hostname = net::reverseDnsLookup(asio::ip::make_address("192.0.2.0")); REQUIRE(unresolvable_hostname.has_value()); CHECK(unresolvable_hostname == "192.0.2.0"); } 
diff --git a/minifi-api/include/minifi-cpp/core/RecordField.h b/minifi-api/include/minifi-cpp/core/RecordField.h index ca6bf2e71..bde55b695 100644 --- a/minifi-api/include/minifi-cpp/core/RecordField.h +++ b/minifi-api/include/minifi-cpp/core/RecordField.h @@ -30,26 +30,8 @@ namespace org::apache::nifi::minifi::core { struct RecordField; -struct BoxedRecordField { - BoxedRecordField() = default; - BoxedRecordField(const BoxedRecordField&) = delete; - BoxedRecordField(BoxedRecordField&& rhs) noexcept : field(std::move(rhs.field)) {} - BoxedRecordField& operator=(const BoxedRecordField&) = delete; - BoxedRecordField& operator=(BoxedRecordField&& rhs) noexcept { - field = std::move(rhs.field); - return *this; - }; - ~BoxedRecordField() = default; - - explicit BoxedRecordField(std::unique_ptr<RecordField>&& _field) : field(std::move(_field)) {} - bool operator==(const BoxedRecordField&) const; - std::unique_ptr<RecordField> field = nullptr; -}; - - using RecordArray = std::vector<RecordField>; - -using RecordObject = std::unordered_map<std::string, BoxedRecordField>; +using RecordObject = std::unordered_map<std::string, RecordField>; template<typename T> concept Float = std::is_floating_point_v<T>; @@ -87,10 +69,4 @@ struct RecordField { std::variant<std::string, int64_t, uint64_t, double, bool, std::chrono::system_clock::time_point, RecordArray, RecordObject> value_; }; -inline bool BoxedRecordField::operator==(const BoxedRecordField& rhs) const { - if (!field || !rhs.field) - return field == rhs.field; - return *field == *rhs.field; -} - } // namespace org::apache::nifi::minifi::core diff --git a/thirdparty/couchbase/c++23_fixes.patch b/thirdparty/couchbase/c++23_fixes.patch new file mode 100644 index 000000000..b434483dc --- /dev/null +++ b/thirdparty/couchbase/c++23_fixes.patch @@ -0,0 +1,78 @@ +Subject: [PATCH] c++23 fixes +--- +Index: couchbase/numeric_range_facet_result.hxx +IDEA additional info: +Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP 
+<+>UTF-8 +=================================================================== +diff --git a/couchbase/numeric_range_facet_result.hxx b/couchbase/numeric_range_facet_result.hxx +--- a/couchbase/numeric_range_facet_result.hxx (revision 51f4775e56fb9ba975f92d7791d4d9feca336f05) ++++ b/couchbase/numeric_range_facet_result.hxx (date 1747839882124) +@@ -19,15 +19,13 @@ + + #include <couchbase/search_facet_result.hxx> + #include <couchbase/search_numeric_range.hxx> ++#include <core/impl/internal_numeric_range_facet_result.hxx> + + #include <string> + #include <vector> + + namespace couchbase + { +-#ifndef COUCHBASE_CXX_CLIENT_DOXYGEN +-class internal_numeric_range_facet_result; +-#endif + + /** + * @since 1.0.0 +Index: couchbase/term_facet_result.hxx +IDEA additional info: +Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP +<+>UTF-8 +=================================================================== +diff --git a/couchbase/term_facet_result.hxx b/couchbase/term_facet_result.hxx +--- a/couchbase/term_facet_result.hxx (revision 51f4775e56fb9ba975f92d7791d4d9feca336f05) ++++ b/couchbase/term_facet_result.hxx (date 1747839742361) +@@ -19,15 +19,13 @@ + + #include <couchbase/search_facet_result.hxx> + #include <couchbase/search_term_range.hxx> ++#include <core/impl/internal_term_facet_result.hxx> + + #include <string> + #include <vector> + + namespace couchbase + { +-#ifndef COUCHBASE_CXX_CLIENT_DOXYGEN +-class internal_term_facet_result; +-#endif + + /** + * @since 1.0.0 +Index: couchbase/date_range_facet_result.hxx +IDEA additional info: +Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP +<+>UTF-8 +=================================================================== +diff --git a/couchbase/date_range_facet_result.hxx b/couchbase/date_range_facet_result.hxx +--- a/couchbase/date_range_facet_result.hxx (revision 51f4775e56fb9ba975f92d7791d4d9feca336f05) ++++ b/couchbase/date_range_facet_result.hxx (date 1747839803545) +@@ -19,6 +19,7 @@ + + #include 
<couchbase/search_date_range.hxx> + #include <couchbase/search_facet_result.hxx> ++#include <core/impl/internal_date_range_facet_result.hxx> + + #include <memory> + #include <string> +@@ -26,9 +27,6 @@ + + namespace couchbase + { +-#ifndef COUCHBASE_CXX_CLIENT_DOXYGEN +-class internal_date_range_facet_result; +-#endif + + /** + * @since 1.0.0 diff --git a/thirdparty/google-cloud-cpp/c++23_fixes.patch b/thirdparty/google-cloud-cpp/c++23_fixes.patch new file mode 100644 index 000000000..317d6aa71 --- /dev/null +++ b/thirdparty/google-cloud-cpp/c++23_fixes.patch @@ -0,0 +1,19 @@ +Subject: [PATCH] c++23 fixes +--- +Index: google/cloud/internal/rest_parse_json_error.cc +IDEA additional info: +Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP +<+>UTF-8 +=================================================================== +diff --git a/google/cloud/internal/rest_parse_json_error.cc b/google/cloud/internal/rest_parse_json_error.cc +--- a/google/cloud/internal/rest_parse_json_error.cc (revision 0c5fa4fc7f5377b420a0ec5725f334e597d841a5) ++++ b/google/cloud/internal/rest_parse_json_error.cc (date 1747841684609) +@@ -62,7 +62,7 @@ + if (m != v.end() && m->is_object()) { + for (auto const& i : m->items()) { + if (!i.value().is_string()) continue; +- metadata[i.key()] = i.value(); ++ metadata[i.key()] = i.value().get<std::string>(); + } + } + metadata["http_status_code"] = std::to_string(http_status_code); diff --git a/thirdparty/opencv/c++23_fixes.patch b/thirdparty/opencv/c++23_fixes.patch new file mode 100644 index 000000000..93052b20f --- /dev/null +++ b/thirdparty/opencv/c++23_fixes.patch @@ -0,0 +1,23 @@ +Subject: [PATCH] c++23 fixes +--- +Index: modules/gapi/src/compiler/gislandmodel.hpp +IDEA additional info: +Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP +<+>UTF-8 +=================================================================== +diff --git a/modules/gapi/src/compiler/gislandmodel.hpp b/modules/gapi/src/compiler/gislandmodel.hpp +--- 
a/modules/gapi/src/compiler/gislandmodel.hpp (revision 725e440d278aca07d35a5e8963ef990572b07316) ++++ b/modules/gapi/src/compiler/gislandmodel.hpp (date 1747828665540) +@@ -8,8 +8,11 @@ + #ifndef OPENCV_GAPI_GISLANDMODEL_HPP + #define OPENCV_GAPI_GISLANDMODEL_HPP + +-#include <unordered_set> ++#include <unordered_set> // unordered_map + #include <memory> // shared_ptr ++#include <exception> // exception_ptr ++#include <string> // string ++#include <cstddef> // size_t + + #include <ade/graph.hpp> + #include <ade/typed_graph.hpp> diff --git a/thirdparty/rocksdb/all/patches/arm7.patch b/thirdparty/rocksdb/all/patches/arm7.patch deleted file mode 100644 index 1eb64ed5c..000000000 --- a/thirdparty/rocksdb/all/patches/arm7.patch +++ /dev/null @@ -1,15 +0,0 @@ -diff --git a/utilities/transactions/lock/range/range_tree/lib/portability/toku_time.h b/utilities/transactions/lock/range/range_tree/lib/portability/toku_time.h -index 225e3fa72..cd5f935f1 100644 ---- a/utilities/transactions/lock/range/range_tree/lib/portability/toku_time.h -+++ b/utilities/transactions/lock/range/range_tree/lib/portability/toku_time.h -@@ -131,6 +131,10 @@ static inline tokutime_t toku_time_now(void) { - uint64_t result; - __asm __volatile__("mrs %[rt], cntvct_el0" : [ rt ] "=r"(result)); - return result; -+#elif defined(__arm__) -+ uint32_t lo, hi; -+ __asm __volatile__("mrrc p15, 1, %[lo], %[hi], c14" : [ lo ] "=r" (lo), [hi] "=r" (hi)); -+ return (uint64_t)hi << 32 | lo; - #elif defined(__powerpc__) - return __ppc_get_timebase(); - #elif defined(__s390x__) diff --git a/thirdparty/rocksdb/all/patches/c++23_fixes.patch b/thirdparty/rocksdb/all/patches/c++23_fixes.patch new file mode 100644 index 000000000..a3349c8fc --- /dev/null +++ b/thirdparty/rocksdb/all/patches/c++23_fixes.patch @@ -0,0 +1,641 @@ +Subject: [PATCH] C++23 fixes +--- +Index: table/block_based/block_based_table_builder.cc +IDEA additional info: +Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP +<+>UTF-8 
+=================================================================== +diff --git a/table/block_based/block_based_table_builder.cc b/table/block_based/block_based_table_builder.cc +--- a/table/block_based/block_based_table_builder.cc (revision 4b2122578e475cb88aef4dcf152cccd5dbf51060) ++++ b/table/block_based/block_based_table_builder.cc (date 1747822597282) +@@ -268,6 +268,315 @@ + bool decoupled_partitioned_filters_; + }; + ++struct BlockBasedTableBuilder::ParallelCompressionRep { ++ // TODO: consider replacing with autovector or similar ++ // Keys is a wrapper of vector of strings avoiding ++ // releasing string memories during vector clear() ++ // in order to save memory allocation overhead ++ class Keys { ++ public: ++ Keys() : keys_(kKeysInitSize), size_(0) {} ++ void PushBack(const Slice& key) { ++ if (size_ == keys_.size()) { ++ keys_.emplace_back(key.data(), key.size()); ++ } else { ++ keys_[size_].assign(key.data(), key.size()); ++ } ++ size_++; ++ } ++ void SwapAssign(std::vector<std::string>& keys) { ++ size_ = keys.size(); ++ std::swap(keys_, keys); ++ } ++ void Clear() { size_ = 0; } ++ size_t Size() { return size_; } ++ std::string& Back() { return keys_[size_ - 1]; } ++ std::string& operator[](size_t idx) { ++ assert(idx < size_); ++ return keys_[idx]; ++ } ++ ++ private: ++ const size_t kKeysInitSize = 32; ++ std::vector<std::string> keys_; ++ size_t size_; ++ }; ++ std::unique_ptr<Keys> curr_block_keys; ++ ++ class BlockRepSlot; ++ ++ // BlockRep instances are fetched from and recycled to ++ // block_rep_pool during parallel compression. 
++ struct BlockRep { ++ Slice contents; ++ Slice compressed_contents; ++ std::unique_ptr<std::string> data; ++ std::unique_ptr<std::string> compressed_data; ++ CompressionType compression_type; ++ std::unique_ptr<std::string> first_key_in_next_block; ++ std::unique_ptr<Keys> keys; ++ std::unique_ptr<BlockRepSlot> slot; ++ Status status; ++ }; ++ // Use a vector of BlockRep as a buffer for a determined number ++ // of BlockRep structures. All data referenced by pointers in ++ // BlockRep will be freed when this vector is destructed. ++ using BlockRepBuffer = std::vector<BlockRep>; ++ BlockRepBuffer block_rep_buf; ++ // Use a thread-safe queue for concurrent access from block ++ // building thread and writer thread. ++ using BlockRepPool = WorkQueue<BlockRep*>; ++ BlockRepPool block_rep_pool; ++ ++ // Use BlockRepSlot to keep block order in write thread. ++ // slot_ will pass references to BlockRep ++ class BlockRepSlot { ++ public: ++ BlockRepSlot() : slot_(1) {} ++ template <typename T> ++ void Fill(T&& rep) { ++ slot_.push(std::forward<T>(rep)); ++ } ++ void Take(BlockRep*& rep) { slot_.pop(rep); } ++ ++ private: ++ // slot_ will pass references to BlockRep in block_rep_buf, ++ // and those references are always valid before the destruction of ++ // block_rep_buf. ++ WorkQueue<BlockRep*> slot_; ++ }; ++ ++ // Compression queue will pass references to BlockRep in block_rep_buf, ++ // and those references are always valid before the destruction of ++ // block_rep_buf. ++ using CompressQueue = WorkQueue<BlockRep*>; ++ CompressQueue compress_queue; ++ std::vector<port::Thread> compress_thread_pool; ++ ++ // Write queue will pass references to BlockRep::slot in block_rep_buf, ++ // and those references are always valid before the corresponding ++ // BlockRep::slot is destructed, which is before the destruction of ++ // block_rep_buf. 
++ using WriteQueue = WorkQueue<BlockRepSlot*>; ++ WriteQueue write_queue; ++ std::unique_ptr<port::Thread> write_thread; ++ ++ // Estimate output file size when parallel compression is enabled. This is ++ // necessary because compression & flush are no longer synchronized, ++ // and BlockBasedTableBuilder::FileSize() is no longer accurate. ++ // memory_order_relaxed suffices because accurate statistics is not required. ++ class FileSizeEstimator { ++ public: ++ explicit FileSizeEstimator() ++ : uncomp_bytes_compressed(0), ++ uncomp_bytes_curr_block(0), ++ uncomp_bytes_curr_block_set(false), ++ uncomp_bytes_inflight(0), ++ blocks_inflight(0), ++ curr_compression_ratio(0), ++ estimated_file_size(0) {} ++ ++ // Estimate file size when a block is about to be emitted to ++ // compression thread ++ void EmitBlock(uint64_t uncomp_block_size, uint64_t curr_file_size) { ++ uint64_t new_uncomp_bytes_inflight = ++ uncomp_bytes_inflight.fetch_add(uncomp_block_size, ++ std::memory_order_relaxed) + ++ uncomp_block_size; ++ ++ uint64_t new_blocks_inflight = ++ blocks_inflight.fetch_add(1, std::memory_order_relaxed) + 1; ++ ++ estimated_file_size.store( ++ curr_file_size + ++ static_cast<uint64_t>( ++ static_cast<double>(new_uncomp_bytes_inflight) * ++ curr_compression_ratio.load(std::memory_order_relaxed)) + ++ new_blocks_inflight * kBlockTrailerSize, ++ std::memory_order_relaxed); ++ } ++ ++ // Estimate file size when a block is already reaped from ++ // compression thread ++ void ReapBlock(uint64_t compressed_block_size, uint64_t curr_file_size) { ++ assert(uncomp_bytes_curr_block_set); ++ ++ uint64_t new_uncomp_bytes_compressed = ++ uncomp_bytes_compressed + uncomp_bytes_curr_block; ++ assert(new_uncomp_bytes_compressed > 0); ++ ++ curr_compression_ratio.store( ++ (curr_compression_ratio.load(std::memory_order_relaxed) * ++ uncomp_bytes_compressed + ++ compressed_block_size) / ++ static_cast<double>(new_uncomp_bytes_compressed), ++ std::memory_order_relaxed); ++ 
uncomp_bytes_compressed = new_uncomp_bytes_compressed; ++ ++ uint64_t new_uncomp_bytes_inflight = ++ uncomp_bytes_inflight.fetch_sub(uncomp_bytes_curr_block, ++ std::memory_order_relaxed) - ++ uncomp_bytes_curr_block; ++ ++ uint64_t new_blocks_inflight = ++ blocks_inflight.fetch_sub(1, std::memory_order_relaxed) - 1; ++ ++ estimated_file_size.store( ++ curr_file_size + ++ static_cast<uint64_t>( ++ static_cast<double>(new_uncomp_bytes_inflight) * ++ curr_compression_ratio.load(std::memory_order_relaxed)) + ++ new_blocks_inflight * kBlockTrailerSize, ++ std::memory_order_relaxed); ++ ++ uncomp_bytes_curr_block_set = false; ++ } ++ ++ void SetEstimatedFileSize(uint64_t size) { ++ estimated_file_size.store(size, std::memory_order_relaxed); ++ } ++ ++ uint64_t GetEstimatedFileSize() { ++ return estimated_file_size.load(std::memory_order_relaxed); ++ } ++ ++ void SetCurrBlockUncompSize(uint64_t size) { ++ uncomp_bytes_curr_block = size; ++ uncomp_bytes_curr_block_set = true; ++ } ++ ++ private: ++ // Input bytes compressed so far. ++ uint64_t uncomp_bytes_compressed; ++ // Size of current block being appended. ++ uint64_t uncomp_bytes_curr_block; ++ // Whether uncomp_bytes_curr_block has been set for next ++ // ReapBlock call. ++ bool uncomp_bytes_curr_block_set; ++ // Input bytes under compression and not appended yet. ++ std::atomic<uint64_t> uncomp_bytes_inflight; ++ // Number of blocks under compression and not appended yet. ++ std::atomic<uint64_t> blocks_inflight; ++ // Current compression ratio, maintained by BGWorkWriteMaybeCompressedBlock. ++ std::atomic<double> curr_compression_ratio; ++ // Estimated SST file size. ++ std::atomic<uint64_t> estimated_file_size; ++ }; ++ FileSizeEstimator file_size_estimator; ++ ++ // Facilities used for waiting first block completion. Need to Wait for ++ // the completion of first block compression and flush to get a non-zero ++ // compression ratio. 
++ std::atomic<bool> first_block_processed; ++ std::condition_variable first_block_cond; ++ std::mutex first_block_mutex; ++ ++ explicit ParallelCompressionRep(uint32_t parallel_threads) ++ : curr_block_keys(new Keys()), ++ block_rep_buf(parallel_threads), ++ block_rep_pool(parallel_threads), ++ compress_queue(parallel_threads), ++ write_queue(parallel_threads), ++ first_block_processed(false) { ++ for (uint32_t i = 0; i < parallel_threads; i++) { ++ block_rep_buf[i].contents = Slice(); ++ block_rep_buf[i].compressed_contents = Slice(); ++ block_rep_buf[i].data.reset(new std::string()); ++ block_rep_buf[i].compressed_data.reset(new std::string()); ++ block_rep_buf[i].compression_type = CompressionType(); ++ block_rep_buf[i].first_key_in_next_block.reset(new std::string()); ++ block_rep_buf[i].keys.reset(new Keys()); ++ block_rep_buf[i].slot.reset(new BlockRepSlot()); ++ block_rep_buf[i].status = Status::OK(); ++ block_rep_pool.push(&block_rep_buf[i]); ++ } ++ } ++ ++ ~ParallelCompressionRep() { block_rep_pool.finish(); } ++ ++ // Make a block prepared to be emitted to compression thread ++ // Used in non-buffered mode ++ BlockRep* PrepareBlock(CompressionType compression_type, ++ const Slice* first_key_in_next_block, ++ BlockBuilder* data_block) { ++ BlockRep* block_rep = ++ PrepareBlockInternal(compression_type, first_key_in_next_block); ++ assert(block_rep != nullptr); ++ data_block->SwapAndReset(*(block_rep->data)); ++ block_rep->contents = *(block_rep->data); ++ std::swap(block_rep->keys, curr_block_keys); ++ curr_block_keys->Clear(); ++ return block_rep; ++ } ++ ++ // Used in EnterUnbuffered ++ BlockRep* PrepareBlock(CompressionType compression_type, ++ const Slice* first_key_in_next_block, ++ std::string* data_block, ++ std::vector<std::string>* keys) { ++ BlockRep* block_rep = ++ PrepareBlockInternal(compression_type, first_key_in_next_block); ++ assert(block_rep != nullptr); ++ std::swap(*(block_rep->data), *data_block); ++ block_rep->contents = 
*(block_rep->data); ++ block_rep->keys->SwapAssign(*keys); ++ return block_rep; ++ } ++ ++ // Emit a block to compression thread ++ void EmitBlock(BlockRep* block_rep) { ++ assert(block_rep != nullptr); ++ assert(block_rep->status.ok()); ++ if (!write_queue.push(block_rep->slot.get())) { ++ return; ++ } ++ if (!compress_queue.push(block_rep)) { ++ return; ++ } ++ ++ if (!first_block_processed.load(std::memory_order_relaxed)) { ++ std::unique_lock<std::mutex> lock(first_block_mutex); ++ first_block_cond.wait(lock, [this] { ++ return first_block_processed.load(std::memory_order_relaxed); ++ }); ++ } ++ } ++ ++ // Reap a block from compression thread ++ void ReapBlock(BlockRep* block_rep) { ++ assert(block_rep != nullptr); ++ block_rep->compressed_data->clear(); ++ block_rep_pool.push(block_rep); ++ ++ if (!first_block_processed.load(std::memory_order_relaxed)) { ++ std::lock_guard<std::mutex> lock(first_block_mutex); ++ first_block_processed.store(true, std::memory_order_relaxed); ++ first_block_cond.notify_one(); ++ } ++ } ++ ++ private: ++ BlockRep* PrepareBlockInternal(CompressionType compression_type, ++ const Slice* first_key_in_next_block) { ++ BlockRep* block_rep = nullptr; ++ block_rep_pool.pop(block_rep); ++ assert(block_rep != nullptr); ++ ++ assert(block_rep->data); ++ ++ block_rep->compression_type = compression_type; ++ ++ if (first_key_in_next_block == nullptr) { ++ block_rep->first_key_in_next_block.reset(nullptr); ++ } else { ++ block_rep->first_key_in_next_block->assign( ++ first_key_in_next_block->data(), first_key_in_next_block->size()); ++ } ++ ++ return block_rep; ++ } ++}; ++ + struct BlockBasedTableBuilder::Rep { + const ImmutableOptions ioptions; + // BEGIN from MutableCFOptions +@@ -667,314 +976,6 @@ + IOStatus io_status; + }; + +-struct BlockBasedTableBuilder::ParallelCompressionRep { +- // TODO: consider replacing with autovector or similar +- // Keys is a wrapper of vector of strings avoiding +- // releasing string memories during vector 
clear() +- // in order to save memory allocation overhead +- class Keys { +- public: +- Keys() : keys_(kKeysInitSize), size_(0) {} +- void PushBack(const Slice& key) { +- if (size_ == keys_.size()) { +- keys_.emplace_back(key.data(), key.size()); +- } else { +- keys_[size_].assign(key.data(), key.size()); +- } +- size_++; +- } +- void SwapAssign(std::vector<std::string>& keys) { +- size_ = keys.size(); +- std::swap(keys_, keys); +- } +- void Clear() { size_ = 0; } +- size_t Size() { return size_; } +- std::string& Back() { return keys_[size_ - 1]; } +- std::string& operator[](size_t idx) { +- assert(idx < size_); +- return keys_[idx]; +- } +- +- private: +- const size_t kKeysInitSize = 32; +- std::vector<std::string> keys_; +- size_t size_; +- }; +- std::unique_ptr<Keys> curr_block_keys; +- +- class BlockRepSlot; +- +- // BlockRep instances are fetched from and recycled to +- // block_rep_pool during parallel compression. +- struct BlockRep { +- Slice contents; +- Slice compressed_contents; +- std::unique_ptr<std::string> data; +- std::unique_ptr<std::string> compressed_data; +- CompressionType compression_type; +- std::unique_ptr<std::string> first_key_in_next_block; +- std::unique_ptr<Keys> keys; +- std::unique_ptr<BlockRepSlot> slot; +- Status status; +- }; +- // Use a vector of BlockRep as a buffer for a determined number +- // of BlockRep structures. All data referenced by pointers in +- // BlockRep will be freed when this vector is destructed. +- using BlockRepBuffer = std::vector<BlockRep>; +- BlockRepBuffer block_rep_buf; +- // Use a thread-safe queue for concurrent access from block +- // building thread and writer thread. +- using BlockRepPool = WorkQueue<BlockRep*>; +- BlockRepPool block_rep_pool; +- +- // Use BlockRepSlot to keep block order in write thread. 
+- // slot_ will pass references to BlockRep +- class BlockRepSlot { +- public: +- BlockRepSlot() : slot_(1) {} +- template <typename T> +- void Fill(T&& rep) { +- slot_.push(std::forward<T>(rep)); +- } +- void Take(BlockRep*& rep) { slot_.pop(rep); } +- +- private: +- // slot_ will pass references to BlockRep in block_rep_buf, +- // and those references are always valid before the destruction of +- // block_rep_buf. +- WorkQueue<BlockRep*> slot_; +- }; +- +- // Compression queue will pass references to BlockRep in block_rep_buf, +- // and those references are always valid before the destruction of +- // block_rep_buf. +- using CompressQueue = WorkQueue<BlockRep*>; +- CompressQueue compress_queue; +- std::vector<port::Thread> compress_thread_pool; +- +- // Write queue will pass references to BlockRep::slot in block_rep_buf, +- // and those references are always valid before the corresponding +- // BlockRep::slot is destructed, which is before the destruction of +- // block_rep_buf. +- using WriteQueue = WorkQueue<BlockRepSlot*>; +- WriteQueue write_queue; +- std::unique_ptr<port::Thread> write_thread; +- +- // Estimate output file size when parallel compression is enabled. This is +- // necessary because compression & flush are no longer synchronized, +- // and BlockBasedTableBuilder::FileSize() is no longer accurate. +- // memory_order_relaxed suffices because accurate statistics is not required. 
+- class FileSizeEstimator { +- public: +- explicit FileSizeEstimator() +- : uncomp_bytes_compressed(0), +- uncomp_bytes_curr_block(0), +- uncomp_bytes_curr_block_set(false), +- uncomp_bytes_inflight(0), +- blocks_inflight(0), +- curr_compression_ratio(0), +- estimated_file_size(0) {} +- +- // Estimate file size when a block is about to be emitted to +- // compression thread +- void EmitBlock(uint64_t uncomp_block_size, uint64_t curr_file_size) { +- uint64_t new_uncomp_bytes_inflight = +- uncomp_bytes_inflight.fetch_add(uncomp_block_size, +- std::memory_order_relaxed) + +- uncomp_block_size; +- +- uint64_t new_blocks_inflight = +- blocks_inflight.fetch_add(1, std::memory_order_relaxed) + 1; +- +- estimated_file_size.store( +- curr_file_size + +- static_cast<uint64_t>( +- static_cast<double>(new_uncomp_bytes_inflight) * +- curr_compression_ratio.load(std::memory_order_relaxed)) + +- new_blocks_inflight * kBlockTrailerSize, +- std::memory_order_relaxed); +- } +- +- // Estimate file size when a block is already reaped from +- // compression thread +- void ReapBlock(uint64_t compressed_block_size, uint64_t curr_file_size) { +- assert(uncomp_bytes_curr_block_set); +- +- uint64_t new_uncomp_bytes_compressed = +- uncomp_bytes_compressed + uncomp_bytes_curr_block; +- assert(new_uncomp_bytes_compressed > 0); +- +- curr_compression_ratio.store( +- (curr_compression_ratio.load(std::memory_order_relaxed) * +- uncomp_bytes_compressed + +- compressed_block_size) / +- static_cast<double>(new_uncomp_bytes_compressed), +- std::memory_order_relaxed); +- uncomp_bytes_compressed = new_uncomp_bytes_compressed; +- +- uint64_t new_uncomp_bytes_inflight = +- uncomp_bytes_inflight.fetch_sub(uncomp_bytes_curr_block, +- std::memory_order_relaxed) - +- uncomp_bytes_curr_block; +- +- uint64_t new_blocks_inflight = +- blocks_inflight.fetch_sub(1, std::memory_order_relaxed) - 1; +- +- estimated_file_size.store( +- curr_file_size + +- static_cast<uint64_t>( +- 
static_cast<double>(new_uncomp_bytes_inflight) * +- curr_compression_ratio.load(std::memory_order_relaxed)) + +- new_blocks_inflight * kBlockTrailerSize, +- std::memory_order_relaxed); +- +- uncomp_bytes_curr_block_set = false; +- } +- +- void SetEstimatedFileSize(uint64_t size) { +- estimated_file_size.store(size, std::memory_order_relaxed); +- } +- +- uint64_t GetEstimatedFileSize() { +- return estimated_file_size.load(std::memory_order_relaxed); +- } +- +- void SetCurrBlockUncompSize(uint64_t size) { +- uncomp_bytes_curr_block = size; +- uncomp_bytes_curr_block_set = true; +- } +- +- private: +- // Input bytes compressed so far. +- uint64_t uncomp_bytes_compressed; +- // Size of current block being appended. +- uint64_t uncomp_bytes_curr_block; +- // Whether uncomp_bytes_curr_block has been set for next +- // ReapBlock call. +- bool uncomp_bytes_curr_block_set; +- // Input bytes under compression and not appended yet. +- std::atomic<uint64_t> uncomp_bytes_inflight; +- // Number of blocks under compression and not appended yet. +- std::atomic<uint64_t> blocks_inflight; +- // Current compression ratio, maintained by BGWorkWriteMaybeCompressedBlock. +- std::atomic<double> curr_compression_ratio; +- // Estimated SST file size. +- std::atomic<uint64_t> estimated_file_size; +- }; +- FileSizeEstimator file_size_estimator; +- +- // Facilities used for waiting first block completion. Need to Wait for +- // the completion of first block compression and flush to get a non-zero +- // compression ratio. 
+- std::atomic<bool> first_block_processed; +- std::condition_variable first_block_cond; +- std::mutex first_block_mutex; +- +- explicit ParallelCompressionRep(uint32_t parallel_threads) +- : curr_block_keys(new Keys()), +- block_rep_buf(parallel_threads), +- block_rep_pool(parallel_threads), +- compress_queue(parallel_threads), +- write_queue(parallel_threads), +- first_block_processed(false) { +- for (uint32_t i = 0; i < parallel_threads; i++) { +- block_rep_buf[i].contents = Slice(); +- block_rep_buf[i].compressed_contents = Slice(); +- block_rep_buf[i].data.reset(new std::string()); +- block_rep_buf[i].compressed_data.reset(new std::string()); +- block_rep_buf[i].compression_type = CompressionType(); +- block_rep_buf[i].first_key_in_next_block.reset(new std::string()); +- block_rep_buf[i].keys.reset(new Keys()); +- block_rep_buf[i].slot.reset(new BlockRepSlot()); +- block_rep_buf[i].status = Status::OK(); +- block_rep_pool.push(&block_rep_buf[i]); +- } +- } +- +- ~ParallelCompressionRep() { block_rep_pool.finish(); } +- +- // Make a block prepared to be emitted to compression thread +- // Used in non-buffered mode +- BlockRep* PrepareBlock(CompressionType compression_type, +- const Slice* first_key_in_next_block, +- BlockBuilder* data_block) { +- BlockRep* block_rep = +- PrepareBlockInternal(compression_type, first_key_in_next_block); +- assert(block_rep != nullptr); +- data_block->SwapAndReset(*(block_rep->data)); +- block_rep->contents = *(block_rep->data); +- std::swap(block_rep->keys, curr_block_keys); +- curr_block_keys->Clear(); +- return block_rep; +- } +- +- // Used in EnterUnbuffered +- BlockRep* PrepareBlock(CompressionType compression_type, +- const Slice* first_key_in_next_block, +- std::string* data_block, +- std::vector<std::string>* keys) { +- BlockRep* block_rep = +- PrepareBlockInternal(compression_type, first_key_in_next_block); +- assert(block_rep != nullptr); +- std::swap(*(block_rep->data), *data_block); +- block_rep->contents = 
*(block_rep->data); +- block_rep->keys->SwapAssign(*keys); +- return block_rep; +- } +- +- // Emit a block to compression thread +- void EmitBlock(BlockRep* block_rep) { +- assert(block_rep != nullptr); +- assert(block_rep->status.ok()); +- if (!write_queue.push(block_rep->slot.get())) { +- return; +- } +- if (!compress_queue.push(block_rep)) { +- return; +- } +- +- if (!first_block_processed.load(std::memory_order_relaxed)) { +- std::unique_lock<std::mutex> lock(first_block_mutex); +- first_block_cond.wait(lock, [this] { +- return first_block_processed.load(std::memory_order_relaxed); +- }); +- } +- } +- +- // Reap a block from compression thread +- void ReapBlock(BlockRep* block_rep) { +- assert(block_rep != nullptr); +- block_rep->compressed_data->clear(); +- block_rep_pool.push(block_rep); +- +- if (!first_block_processed.load(std::memory_order_relaxed)) { +- std::lock_guard<std::mutex> lock(first_block_mutex); +- first_block_processed.store(true, std::memory_order_relaxed); +- first_block_cond.notify_one(); +- } +- } +- +- private: +- BlockRep* PrepareBlockInternal(CompressionType compression_type, +- const Slice* first_key_in_next_block) { +- BlockRep* block_rep = nullptr; +- block_rep_pool.pop(block_rep); +- assert(block_rep != nullptr); +- +- assert(block_rep->data); +- +- block_rep->compression_type = compression_type; +- +- if (first_key_in_next_block == nullptr) { +- block_rep->first_key_in_next_block.reset(nullptr); +- } else { +- block_rep->first_key_in_next_block->assign( +- first_key_in_next_block->data(), first_key_in_next_block->size()); +- } +- +- return block_rep; +- } +-}; + + BlockBasedTableBuilder::BlockBasedTableBuilder( + const BlockBasedTableOptions& table_options, const TableBuilderOptions& tbo, diff --git a/thirdparty/rocksdb/all/patches/cstdint.patch b/thirdparty/rocksdb/all/patches/cstdint.patch deleted file mode 100644 index 2435905c9..000000000 --- a/thirdparty/rocksdb/all/patches/cstdint.patch +++ /dev/null @@ -1,12 +0,0 @@ -diff 
--git a/options/offpeak_time_info.h b/options/offpeak_time_info.h -index 75d61abb4..f42ef6dc2 100644 ---- a/options/offpeak_time_info.h -+++ b/options/offpeak_time_info.h -@@ -5,6 +5,7 @@ - - #pragma once - -+#include <cstdint> - #include <string> - - #include "rocksdb/rocksdb_namespace.h" diff --git a/utils/include/utils/expected.h b/utils/include/utils/expected.h index aff2c0a05..fb0f3457e 100644 --- a/utils/include/utils/expected.h +++ b/utils/include/utils/expected.h @@ -239,26 +239,28 @@ auto try_expression(F&& action, Args&&... args) noexcept { } // namespace org::apache::nifi::minifi::utils -#ifndef WIN32 // on windows this conflicts because nonstd::expected === std::expected +template <typename T, typename E> +concept HasStdExpected = requires { typename std::expected<T, E>; }; + +template <typename T, typename E> +concept ExpectedTypesDoNotConflict = + (!HasStdExpected<T, E> || + !std::same_as<nonstd::expected<T, E>, std::expected<T, E>>); + // based on fmt::formatter<std::expected<T, E>, Char> template <typename T, typename E, typename Char> -struct fmt::formatter<nonstd::expected<T, E>, Char, - std::enable_if_t<(std::is_void<T>::value || - fmt::is_formattable<T, Char>::value) && - fmt::is_formattable<E, Char>::value>> { - template <typename ParseContext> - constexpr auto parse(ParseContext& ctx) -> const Char* { - return ctx.begin(); - } +requires ExpectedTypesDoNotConflict<T, E> && + (std::is_void_v<T> || fmt::is_formattable<T, Char>::value) && + fmt::is_formattable<E, Char>::value +struct fmt::formatter<nonstd::expected<T, E>, Char> { + constexpr auto parse(auto& ctx) { return ctx.begin(); } - template <typename FormatContext> - auto format(const nonstd::expected<T, E>& value, FormatContext& ctx) const - -> decltype(ctx.out()) { + auto format(const nonstd::expected<T, E>& value, auto& ctx) const { auto out = ctx.out(); if (value.has_value()) { out = fmt::detail::write<Char>(out, "nonstd::expected("); - if constexpr (!std::is_void<T>::value) + if 
constexpr (!std::is_void_v<T>) out = fmt::detail::write_escaped_alternative<Char>(out, *value); } else { out = fmt::detail::write<Char>(out, "nonstd::unexpected("); @@ -268,4 +270,3 @@ struct fmt::formatter<nonstd::expected<T, E>, Char, return out; } }; -#endif diff --git a/utils/include/utils/net/AsioCoro.h b/utils/include/utils/net/AsioCoro.h index 55a3a4cbc..b18087268 100644 --- a/utils/include/utils/net/AsioCoro.h +++ b/utils/include/utils/net/AsioCoro.h @@ -29,11 +29,11 @@ #include "asio/this_coro.hpp" #include "asio/use_awaitable.hpp" #include "asio/experimental/awaitable_operators.hpp" -#include "asio/experimental/as_tuple.hpp" +#include "asio/as_tuple.hpp" namespace org::apache::nifi::minifi::utils::net { -constexpr auto use_nothrow_awaitable = asio::experimental::as_tuple(asio::use_awaitable); +constexpr auto use_nothrow_awaitable = asio::as_tuple(asio::use_awaitable); #if defined(__GNUC__) && __GNUC__ < 11 // [coroutines] unexpected 'warning: statement has no effect [-Wunused-value]' diff --git a/utils/include/utils/net/AsioSocketUtils.h b/utils/include/utils/net/AsioSocketUtils.h index f8df7e959..990361163 100644 --- a/utils/include/utils/net/AsioSocketUtils.h +++ b/utils/include/utils/net/AsioSocketUtils.h @@ -142,7 +142,7 @@ class AsioSocketConnection : public io::BaseStreamImpl { return; } - asio::ip::tcp::endpoint local_endpoint(asio::ip::address::from_string(address), 0); + asio::ip::tcp::endpoint local_endpoint(asio::ip::make_address(address), 0); asio::error_code err; socket.open(local_endpoint.protocol(), err); if (err) { diff --git a/utils/src/utils/net/DNS.cpp b/utils/src/utils/net/DNS.cpp index 7af353124..6e0ebde23 100644 --- a/utils/src/utils/net/DNS.cpp +++ b/utils/src/utils/net/DNS.cpp @@ -27,7 +27,7 @@ namespace org::apache::nifi::minifi::utils::net { nonstd::expected<asio::ip::address, std::error_code> addressFromString(const std::string_view ip_address_str) { std::error_code ip_address_from_string_error; - auto ip_address = 
asio::ip::address::from_string(ip_address_str.data(), ip_address_from_string_error); + auto ip_address = asio::ip::make_address(ip_address_str, ip_address_from_string_error); /* string_view overload: the view need not be NUL-terminated, so do not pass .data() */ if (ip_address_from_string_error) return nonstd::make_unexpected(ip_address_from_string_error); return ip_address; @@ -55,7 +55,10 @@ nonstd::expected<std::string, std::error_code> reverseDnsLookup(const asio::ip:: if (resolve_error) return nonstd::make_unexpected(resolve_error); - return results->host_name(); + if (!results.empty()) { + return results.begin()->host_name(); + } + return nonstd::make_unexpected(std::make_error_code(std::errc::host_unreachable)); } std::string getMyHostName() {
