This is an automated email from the ASF dual-hosted git repository.
adebreceni pushed a commit to branch minifi-api
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/minifi-api by this push:
new c86e24435 Rebase fixes
c86e24435 is described below
commit c86e244358d49e2f069ff930792cb3037277d331
Author: Adam Debreceni <[email protected]>
AuthorDate: Mon Jan 6 14:20:33 2025 +0100
Rebase fixes
---
extensions/civetweb/processors/ListenHTTP.cpp | 6 +-
.../couchbase/tests/GetCouchbaseKeyTests.cpp | 3 +-
.../couchbase/tests/PutCouchbaseKeyTests.cpp | 3 +-
extensions/lua/LuaScriptProcessContext.h | 1 +
libminifi/src/utils/TimeUtil.cpp | 90 ----------------------
.../integration/OnScheduleErrorHandlingTests.cpp | 2 +-
utils/src/utils/TimeUtil.cpp | 56 ++++++++++++++
7 files changed, 65 insertions(+), 96 deletions(-)
diff --git a/extensions/civetweb/processors/ListenHTTP.cpp
b/extensions/civetweb/processors/ListenHTTP.cpp
index b50e78b57..1e12f72b1 100644
--- a/extensions/civetweb/processors/ListenHTTP.cpp
+++ b/extensions/civetweb/processors/ListenHTTP.cpp
@@ -254,7 +254,7 @@ bool ListenHTTP::processRequestBuffer(core::ProcessSession&
session) {
namespace {
-class MgConnectionInputStream : public io::InputStream {
+class MgConnectionInputStream : public io::InputStreamImpl {
public:
MgConnectionInputStream(struct mg_connection* conn, std::optional<size_t>
size): conn_(conn), netstream_size_limit_(size) {}
@@ -275,7 +275,7 @@ class MgConnectionInputStream : public io::InputStream {
std::optional<size_t> netstream_size_limit_; // how much can we read from
conn_
};
-class MgConnectionOutputStream : public io::OutputStream {
+class MgConnectionOutputStream : public io::StreamImpl, public virtual
io::OutputStreamImpl {
public:
explicit MgConnectionOutputStream(struct mg_connection* conn): conn_(conn) {}
@@ -548,7 +548,7 @@ void ListenHTTP::notifyStop() {
}
std::set<core::Connectable*> ListenHTTP::getOutGoingConnections(const
std::string &relationship) {
- auto result = core::Processor::getOutGoingConnections(relationship);
+ auto result = core::ProcessorImpl::getOutGoingConnections(relationship);
if (relationship == Self.name) {
result.insert(this);
}
diff --git a/extensions/couchbase/tests/GetCouchbaseKeyTests.cpp
b/extensions/couchbase/tests/GetCouchbaseKeyTests.cpp
index 0d2e5b910..185d99ac0 100644
--- a/extensions/couchbase/tests/GetCouchbaseKeyTests.cpp
+++ b/extensions/couchbase/tests/GetCouchbaseKeyTests.cpp
@@ -46,7 +46,8 @@ class GetCouchbaseKeyTestController : public TestController {
LogTestController::getInstance().setDebug<controllers::CouchbaseClusterService>();
LogTestController::getInstance().setDebug<processors::GetCouchbaseKey>();
auto controller_service_node =
controller_.plan->addController("MockCouchbaseClusterService",
"MockCouchbaseClusterService");
- mock_couchbase_cluster_service_ =
std::static_pointer_cast<MockCouchbaseClusterService>(controller_service_node->getControllerServiceImplementation());
+ mock_couchbase_cluster_service_ =
std::dynamic_pointer_cast<MockCouchbaseClusterService>(controller_service_node->getControllerServiceImplementation());
+ gsl_Assert(mock_couchbase_cluster_service_);
proc_->setProperty(processors::GetCouchbaseKey::CouchbaseClusterControllerService,
"MockCouchbaseClusterService");
}
diff --git a/extensions/couchbase/tests/PutCouchbaseKeyTests.cpp
b/extensions/couchbase/tests/PutCouchbaseKeyTests.cpp
index 84afe362b..56970289f 100644
--- a/extensions/couchbase/tests/PutCouchbaseKeyTests.cpp
+++ b/extensions/couchbase/tests/PutCouchbaseKeyTests.cpp
@@ -50,7 +50,8 @@ class PutCouchbaseKeyTestController : public TestController {
LogTestController::getInstance().setDebug<controllers::CouchbaseClusterService>();
LogTestController::getInstance().setDebug<processors::PutCouchbaseKey>();
auto controller_service_node =
controller_.plan->addController("MockCouchbaseClusterService",
"MockCouchbaseClusterService");
- mock_couchbase_cluster_service_ =
std::static_pointer_cast<MockCouchbaseClusterService>(controller_service_node->getControllerServiceImplementation());
+ mock_couchbase_cluster_service_ =
std::dynamic_pointer_cast<MockCouchbaseClusterService>(controller_service_node->getControllerServiceImplementation());
+ gsl_Assert(mock_couchbase_cluster_service_);
proc_->setProperty(processors::PutCouchbaseKey::CouchbaseClusterControllerService,
"MockCouchbaseClusterService");
}
diff --git a/extensions/lua/LuaScriptProcessContext.h
b/extensions/lua/LuaScriptProcessContext.h
index 98fa3c949..49dd86747 100644
--- a/extensions/lua/LuaScriptProcessContext.h
+++ b/extensions/lua/LuaScriptProcessContext.h
@@ -23,6 +23,7 @@
#include "core/ProcessSession.h"
#include "LuaScriptStateManager.h"
+#include "minifi-cpp/core/ProcessContext.h"
namespace org::apache::nifi::minifi::extensions::lua {
diff --git a/libminifi/src/utils/TimeUtil.cpp b/libminifi/src/utils/TimeUtil.cpp
index a512fe2de..66fd07d8f 100644
--- a/libminifi/src/utils/TimeUtil.cpp
+++ b/libminifi/src/utils/TimeUtil.cpp
@@ -17,10 +17,6 @@
#include "utils/TimeUtil.h"
-#include "fmt/format.h"
-#include "fmt/chrono.h"
-#include "range/v3/algorithm/contains.hpp"
-
#ifdef WIN32
#include "date/tz.h"
#endif
@@ -41,92 +37,6 @@ void setClock(std::shared_ptr<SteadyClock> clock) {
global_clock = std::move(clock);
}
-namespace {
-template<class... Durations>
-std::tuple<Durations...>
breakDownDurations(std::chrono::system_clock::duration input_duration) {
- std::tuple<Durations...> result;
-
- ([&]<typename T>(T& duration) {
- duration = std::chrono::duration_cast<std::decay_t<T>>(input_duration);
- input_duration -= duration;
- } (std::get<Durations>(result)), ...);
-
- return result;
-}
-
-std::string
formatAsDaysHoursMinutesSeconds(std::chrono::system_clock::duration
input_duration) {
- const auto durs = breakDownDurations<std::chrono::days, std::chrono::hours,
std::chrono::minutes, std::chrono::seconds>(input_duration);
- const auto& days = std::get<std::chrono::days>(durs);
- std::string day_string;
- if (days.count() > 0) {
- day_string = fmt::format("{} {}", days.count(), days.count() == 1 ? "day,
" : "days, ");
- }
- return fmt::format("{}{:02}:{:02}:{:02}",
- day_string, std::get<std::chrono::hours>(durs).count(),
- std::get<std::chrono::minutes>(durs).count(),
- std::get<std::chrono::seconds>(durs).count());
-}
-
-template<class... Durations>
-std::string formatAsRoundedLargestUnit(std::chrono::system_clock::duration
input_duration) {
- std::optional<std::string> rounded_value_str;
- using std::chrono::duration_cast;
- using std::chrono::duration;
-
- ((rounded_value_str = input_duration >= Durations(1)
- ? std::optional<std::string>{fmt::format("{:.2}",
duration_cast<duration<double, typename Durations::period>>(input_duration))}
- : std::nullopt) || ...);
-
-
- if (!rounded_value_str) {
- return fmt::format("{}", input_duration);
- }
-
- return *rounded_value_str;
-}
-
-} // namespace
-
-std::string humanReadableDuration(std::chrono::system_clock::duration
input_duration) {
- if (input_duration > 5s) {
- return formatAsDaysHoursMinutesSeconds(input_duration);
- }
-
- return formatAsRoundedLargestUnit<std::chrono::seconds,
std::chrono::milliseconds, std::chrono::microseconds>(input_duration);
-}
-
-std::optional<std::chrono::system_clock::time_point> parseRfc3339(const
std::string& str) {
- std::istringstream stream(str);
- date::year_month_day date_part{};
- date::from_stream(stream, "%F", date_part);
-
- if (stream.fail())
- return std::nullopt;
-
- constexpr std::string_view accepted_delimiters = "tT_ ";
- char delimiter_char = 0;
- stream.get(delimiter_char);
-
- if (stream.fail() || !ranges::contains(accepted_delimiters, delimiter_char))
- return std::nullopt;
-
- std::chrono::system_clock::duration time_part;
- std::chrono::minutes offset = 0min;
- if (str.ends_with('Z') || str.ends_with('z')) {
- date::from_stream(stream, "%T", time_part);
- if (stream.fail())
- return std::nullopt;
- stream.get();
- } else {
- date::from_stream(stream, "%T%Ez", time_part, {}, &offset);
- }
-
- if (stream.fail() || (stream.peek() && !stream.eof()))
- return std::nullopt;
-
- return date::sys_days(date_part) + time_part - offset;
-}
-
#ifdef WIN32
void dateSetGlobalInstall(const std::string& install) {
date::set_install(install);
diff --git a/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp
b/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp
index dc10474c4..4d80a4652 100644
--- a/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp
+++ b/libminifi/test/integration/OnScheduleErrorHandlingTests.cpp
@@ -17,7 +17,7 @@
*/
#include "integration/IntegrationBase.h"
#include "core/logging/Logger.h"
-#include "core/Scheduling.h"
+#include "minifi-cpp/core/Scheduling.h"
#include "core/state/ProcessorController.h"
#include "unit/TestBase.h"
#include "../../../extensions/test-processors/KamikazeProcessor.h"
diff --git a/utils/src/utils/TimeUtil.cpp b/utils/src/utils/TimeUtil.cpp
index f79575bc4..67b5ecc6e 100644
--- a/utils/src/utils/TimeUtil.cpp
+++ b/utils/src/utils/TimeUtil.cpp
@@ -17,6 +17,8 @@
#include "utils/TimeUtil.h"
#include "range/v3/algorithm/contains.hpp"
+#include "fmt/format.h"
+#include "fmt/chrono.h"
#ifdef WIN32
#include "date/tz.h"
@@ -26,6 +28,60 @@ namespace org::apache::nifi::minifi::utils::timeutils {
using namespace std::literals::chrono_literals;
+namespace {
+template<class... Durations>
+std::tuple<Durations...>
breakDownDurations(std::chrono::system_clock::duration input_duration) {
+ std::tuple<Durations...> result;
+
+ ([&]<typename T>(T& duration) {
+ duration = std::chrono::duration_cast<std::decay_t<T>>(input_duration);
+ input_duration -= duration;
+ } (std::get<Durations>(result)), ...);
+
+ return result;
+}
+
+std::string
formatAsDaysHoursMinutesSeconds(std::chrono::system_clock::duration
input_duration) {
+ const auto durs = breakDownDurations<std::chrono::days, std::chrono::hours,
std::chrono::minutes, std::chrono::seconds>(input_duration);
+ const auto& days = std::get<std::chrono::days>(durs);
+ std::string day_string;
+ if (days.count() > 0) {
+ day_string = fmt::format("{} {}", days.count(), days.count() == 1 ? "day,
" : "days, ");
+ }
+ return fmt::format("{}{:02}:{:02}:{:02}",
+ day_string, std::get<std::chrono::hours>(durs).count(),
+ std::get<std::chrono::minutes>(durs).count(),
+ std::get<std::chrono::seconds>(durs).count());
+}
+
+template<class... Durations>
+std::string formatAsRoundedLargestUnit(std::chrono::system_clock::duration
input_duration) {
+ std::optional<std::string> rounded_value_str;
+ using std::chrono::duration_cast;
+ using std::chrono::duration;
+
+ ((rounded_value_str = input_duration >= Durations(1)
+ ? std::optional<std::string>{fmt::format("{:.2}",
duration_cast<duration<double, typename Durations::period>>(input_duration))}
+ : std::nullopt) || ...);
+
+
+ if (!rounded_value_str) {
+ return fmt::format("{}", input_duration);
+ }
+
+ return *rounded_value_str;
+}
+
+} // namespace
+
+std::string humanReadableDuration(std::chrono::system_clock::duration
input_duration) {
+ if (input_duration > 5s) {
+ return formatAsDaysHoursMinutesSeconds(input_duration);
+ }
+
+ return formatAsRoundedLargestUnit<std::chrono::seconds,
std::chrono::milliseconds, std::chrono::microseconds>(input_duration);
+}
+
std::optional<std::chrono::system_clock::time_point> parseRfc3339(const
std::string& str) {
std::istringstream stream(str);
date::year_month_day date_part{};