This is an automated email from the ASF dual-hosted git repository. adebreceni pushed a commit to branch minifi-api in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 7949b987d0914d22db515f2d679e6636a15196a9 Author: Adam Debreceni <[email protected]> AuthorDate: Mon Jan 6 14:20:33 2025 +0100 Rebase fixes --- extensions/civetweb/processors/ListenHTTP.cpp | 6 +- .../couchbase/tests/GetCouchbaseKeyTests.cpp | 3 +- .../couchbase/tests/PutCouchbaseKeyTests.cpp | 3 +- extensions/lua/LuaScriptProcessContext.h | 1 + libminifi/src/utils/TimeUtil.cpp | 90 ---------------------- .../integration/OnScheduleErrorHandlingTests.cpp | 2 +- utils/src/utils/TimeUtil.cpp | 56 ++++++++++++++ 7 files changed, 65 insertions(+), 96 deletions(-) diff --git a/extensions/civetweb/processors/ListenHTTP.cpp b/extensions/civetweb/processors/ListenHTTP.cpp index b50e78b57..1e12f72b1 100644 --- a/extensions/civetweb/processors/ListenHTTP.cpp +++ b/extensions/civetweb/processors/ListenHTTP.cpp @@ -254,7 +254,7 @@ bool ListenHTTP::processRequestBuffer(core::ProcessSession& session) { namespace { -class MgConnectionInputStream : public io::InputStream { +class MgConnectionInputStream : public io::InputStreamImpl { public: MgConnectionInputStream(struct mg_connection* conn, std::optional<size_t> size): conn_(conn), netstream_size_limit_(size) {} @@ -275,7 +275,7 @@ class MgConnectionInputStream : public io::InputStream { std::optional<size_t> netstream_size_limit_; // how much can we read from conn_ }; -class MgConnectionOutputStream : public io::OutputStream { +class MgConnectionOutputStream : public io::StreamImpl, public virtual io::OutputStreamImpl { public: explicit MgConnectionOutputStream(struct mg_connection* conn): conn_(conn) {} @@ -548,7 +548,7 @@ void ListenHTTP::notifyStop() { } std::set<core::Connectable*> ListenHTTP::getOutGoingConnections(const std::string &relationship) { - auto result = core::Processor::getOutGoingConnections(relationship); + auto result = core::ProcessorImpl::getOutGoingConnections(relationship); if (relationship == Self.name) { result.insert(this); } diff --git a/extensions/couchbase/tests/GetCouchbaseKeyTests.cpp b/extensions/couchbase/tests/GetCouchbaseKeyTests.cpp index 0d2e5b910..185d99ac0 100644 --- a/extensions/couchbase/tests/GetCouchbaseKeyTests.cpp +++ b/extensions/couchbase/tests/GetCouchbaseKeyTests.cpp @@ -46,7 +46,8 @@ class GetCouchbaseKeyTestController : public TestController { LogTestController::getInstance().setDebug<controllers::CouchbaseClusterService>(); LogTestController::getInstance().setDebug<processors::GetCouchbaseKey>(); auto controller_service_node = controller_.plan->addController("MockCouchbaseClusterService", "MockCouchbaseClusterService"); - mock_couchbase_cluster_service_ = std::static_pointer_cast<MockCouchbaseClusterService>(controller_service_node->getControllerServiceImplementation()); + mock_couchbase_cluster_service_ = std::dynamic_pointer_cast<MockCouchbaseClusterService>(controller_service_node->getControllerServiceImplementation()); + gsl_Assert(mock_couchbase_cluster_service_); proc_->setProperty(processors::GetCouchbaseKey::CouchbaseClusterControllerService, "MockCouchbaseClusterService"); } diff --git a/extensions/couchbase/tests/PutCouchbaseKeyTests.cpp b/extensions/couchbase/tests/PutCouchbaseKeyTests.cpp index 84afe362b..56970289f 100644 --- a/extensions/couchbase/tests/PutCouchbaseKeyTests.cpp +++ b/extensions/couchbase/tests/PutCouchbaseKeyTests.cpp @@ -50,7 +50,8 @@ class PutCouchbaseKeyTestController : public TestController { LogTestController::getInstance().setDebug<controllers::CouchbaseClusterService>(); LogTestController::getInstance().setDebug<processors::PutCouchbaseKey>(); auto controller_service_node = controller_.plan->addController("MockCouchbaseClusterService", "MockCouchbaseClusterService"); - mock_couchbase_cluster_service_ = std::static_pointer_cast<MockCouchbaseClusterService>(controller_service_node->getControllerServiceImplementation()); + mock_couchbase_cluster_service_ = std::dynamic_pointer_cast<MockCouchbaseClusterService>(controller_service_node->getControllerServiceImplementation()); + gsl_Assert(mock_couchbase_cluster_service_); proc_->setProperty(processors::PutCouchbaseKey::CouchbaseClusterControllerService, "MockCouchbaseClusterService"); } diff --git a/extensions/lua/LuaScriptProcessContext.h b/extensions/lua/LuaScriptProcessContext.h index 98fa3c949..49dd86747 100644 --- a/extensions/lua/LuaScriptProcessContext.h +++ b/extensions/lua/LuaScriptProcessContext.h @@ -23,6 +23,7 @@ #include "core/ProcessSession.h" #include "LuaScriptStateManager.h" +#include "minifi-cpp/core/ProcessContext.h" namespace org::apache::nifi::minifi::extensions::lua { diff --git a/libminifi/src/utils/TimeUtil.cpp b/libminifi/src/utils/TimeUtil.cpp index a512fe2de..66fd07d8f 100644 --- a/libminifi/src/utils/TimeUtil.cpp +++ b/libminifi/src/utils/TimeUtil.cpp @@ -17,10 +17,6 @@ #include "utils/TimeUtil.h" -#include "fmt/format.h" -#include "fmt/chrono.h" -#include "range/v3/algorithm/contains.hpp" - #ifdef WIN32 #include "date/tz.h" #endif @@ -41,92 +37,6 @@ void setClock(std::shared_ptr<SteadyClock> clock) { global_clock = std::move(clock); } -namespace { -template<class... Durations> -std::tuple<Durations...> breakDownDurations(std::chrono::system_clock::duration input_duration) { - std::tuple<Durations...> result; - - ([&]<typename T>(T& duration) { - duration = std::chrono::duration_cast<std::decay_t<T>>(input_duration); - input_duration -= duration; - } (std::get<Durations>(result)), ...); - - return result; -} - -std::string formatAsDaysHoursMinutesSeconds(std::chrono::system_clock::duration input_duration) { - const auto durs = breakDownDurations<std::chrono::days, std::chrono::hours, std::chrono::minutes, std::chrono::seconds>(input_duration); - const auto& days = std::get<std::chrono::days>(durs); - std::string day_string; - if (days.count() > 0) { - day_string = fmt::format("{} {}", days.count(), days.count() == 1 ? "day, " : "days, "); - } - return fmt::format("{}{:02}:{:02}:{:02}", - day_string, std::get<std::chrono::hours>(durs).count(), - std::get<std::chrono::minutes>(durs).count(), - std::get<std::chrono::seconds>(durs).count()); -} - -template<class... Durations> -std::string formatAsRoundedLargestUnit(std::chrono::system_clock::duration input_duration) { - std::optional<std::string> rounded_value_str; - using std::chrono::duration_cast; - using std::chrono::duration; - - ((rounded_value_str = input_duration >= Durations(1) - ? std::optional<std::string>{fmt::format("{:.2}", duration_cast<duration<double, typename Durations::period>>(input_duration))} - : std::nullopt) || ...); - - - if (!rounded_value_str) { - return fmt::format("{}", input_duration); - } - - return *rounded_value_str; -} - -} // namespace - -std::string humanReadableDuration(std::chrono::system_clock::duration input_duration) { - if (input_duration > 5s) { - return formatAsDaysHoursMinutesSeconds(input_duration); - } - - return formatAsRoundedLargestUnit<std::chrono::seconds, std::chrono::milliseconds, std::chrono::microseconds>(input_duration); -} - -std::optional<std::chrono::system_clock::time_point> parseRfc3339(const std::string& str) { - std::istringstream stream(str); - date::year_month_day date_part{}; - date::from_stream(stream, "%F", date_part); - - if (stream.fail()) - return std::nullopt; - - constexpr std::string_view accepted_delimiters = "tT_ "; - char delimiter_char = 0; - stream.get(delimiter_char); - - if (stream.fail() || !ranges::contains(accepted_delimiters, delimiter_char)) - return std::nullopt; - - std::chrono::system_clock::duration time_part; - std::chrono::minutes offset = 0min; - if (str.ends_with('Z') || str.ends_with('z')) { - date::from_stream(stream, "%T", time_part); - if (stream.fail()) - return std::nullopt; - stream.get(); - } else { - date::from_stream(stream, "%T%Ez", time_part, {}, &offset); - } - - if (stream.fail() || (stream.peek() && !stream.eof())) - return std::nullopt; - - return date::sys_days(date_part) + time_part - offset; -} - #ifdef WIN32 void dateSetGlobalInstall(const std::string& install) { date::set_install(install); diff --git a/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp b/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp index dc10474c4..4d80a4652 100644 --- a/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp +++ b/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp @@ -17,7 +17,7 @@ */ #include "integration/IntegrationBase.h" #include "core/logging/Logger.h" -#include "core/Scheduling.h" +#include "minifi-cpp/core/Scheduling.h" #include "core/state/ProcessorController.h" #include "unit/TestBase.h" #include "../../../extensions/test-processors/KamikazeProcessor.h" diff --git a/utils/src/utils/TimeUtil.cpp b/utils/src/utils/TimeUtil.cpp index f79575bc4..67b5ecc6e 100644 --- a/utils/src/utils/TimeUtil.cpp +++ b/utils/src/utils/TimeUtil.cpp @@ -17,6 +17,8 @@ #include "utils/TimeUtil.h" #include "range/v3/algorithm/contains.hpp" +#include "fmt/format.h" +#include "fmt/chrono.h" #ifdef WIN32 #include "date/tz.h" @@ -26,6 +28,60 @@ namespace org::apache::nifi::minifi::utils::timeutils { using namespace std::literals::chrono_literals; +namespace { +template<class... Durations> +std::tuple<Durations...> breakDownDurations(std::chrono::system_clock::duration input_duration) { + std::tuple<Durations...> result; + + ([&]<typename T>(T& duration) { + duration = std::chrono::duration_cast<std::decay_t<T>>(input_duration); + input_duration -= duration; + } (std::get<Durations>(result)), ...); + + return result; +} + +std::string formatAsDaysHoursMinutesSeconds(std::chrono::system_clock::duration input_duration) { + const auto durs = breakDownDurations<std::chrono::days, std::chrono::hours, std::chrono::minutes, std::chrono::seconds>(input_duration); + const auto& days = std::get<std::chrono::days>(durs); + std::string day_string; + if (days.count() > 0) { + day_string = fmt::format("{} {}", days.count(), days.count() == 1 ? "day, " : "days, "); + } + return fmt::format("{}{:02}:{:02}:{:02}", + day_string, std::get<std::chrono::hours>(durs).count(), + std::get<std::chrono::minutes>(durs).count(), + std::get<std::chrono::seconds>(durs).count()); +} + +template<class... Durations> +std::string formatAsRoundedLargestUnit(std::chrono::system_clock::duration input_duration) { + std::optional<std::string> rounded_value_str; + using std::chrono::duration_cast; + using std::chrono::duration; + + ((rounded_value_str = input_duration >= Durations(1) + ? std::optional<std::string>{fmt::format("{:.2}", duration_cast<duration<double, typename Durations::period>>(input_duration))} + : std::nullopt) || ...); + + + if (!rounded_value_str) { + return fmt::format("{}", input_duration); + } + + return *rounded_value_str; +} + +} // namespace + +std::string humanReadableDuration(std::chrono::system_clock::duration input_duration) { + if (input_duration > 5s) { + return formatAsDaysHoursMinutesSeconds(input_duration); + } + + return formatAsRoundedLargestUnit<std::chrono::seconds, std::chrono::milliseconds, std::chrono::microseconds>(input_duration); +} + std::optional<std::chrono::system_clock::time_point> parseRfc3339(const std::string& str) { std::istringstream stream(str); date::year_month_day date_part{};
