This is an automated email from the ASF dual-hosted git repository.
lordgamez pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new be51e246c MINIFICPP-2230 MergeContent should yield if no ready bins
and no incoming flowfiles
be51e246c is described below
commit be51e246cded86ced5a125bd240aa5ef74aca9fb
Author: Martin Zink <[email protected]>
AuthorDate: Wed Mar 27 14:24:25 2024 +0100
MINIFICPP-2230 MergeContent should yield if no ready bins and no incoming
flowfiles
Co-authored-by: Ferenc Gerlits <[email protected]>
Co-authored-by: Gabor Gyimesi <[email protected]>
Signed-off-by: Gabor Gyimesi <[email protected]>
This closes #1748
---
extensions/libarchive/BinFiles.cpp | 16 +++++++++++++---
extensions/libarchive/BinFiles.h | 2 +-
extensions/libarchive/tests/MergeFileTests.cpp | 24 ++++++++++++++++++++++++
3 files changed, 38 insertions(+), 4 deletions(-)
diff --git a/extensions/libarchive/BinFiles.cpp
b/extensions/libarchive/BinFiles.cpp
index e2982973d..b0687d46e 100644
--- a/extensions/libarchive/BinFiles.cpp
+++ b/extensions/libarchive/BinFiles.cpp
@@ -223,11 +223,14 @@ bool BinFiles::resurrectFlowFiles(core::ProcessSession
&session) {
return had_failure;
}
-void BinFiles::assumeOwnershipOfNextBatch(core::ProcessSession &session) {
+bool BinFiles::assumeOwnershipOfNextBatch(core::ProcessSession &session) {
for (size_t i = 0; i < batchSize_; ++i) {
auto flow = session.get();
if (flow == nullptr) {
+ if (i == 0) { // Batch didn't contain a single flowfile, we should
yield if there are no ready bins either
+ return false;
+ }
break;
}
@@ -242,6 +245,7 @@ void
BinFiles::assumeOwnershipOfNextBatch(core::ProcessSession &session) {
session.transfer(flow, Self);
}
session.commit();
+ return true;
}
void BinFiles::processReadyBins(std::deque<std::unique_ptr<Bin>> ready_bins,
core::ProcessSession &session) {
@@ -283,8 +287,14 @@ void BinFiles::onTrigger(core::ProcessContext& context,
core::ProcessSession& se
return;
}
- assumeOwnershipOfNextBatch(session);
- processReadyBins(gatherReadyBins(context), session);
+ const bool valid_batch = assumeOwnershipOfNextBatch(session);
+ if (auto ready_bins = gatherReadyBins(context); ready_bins.empty()) {
+ if (!valid_batch) {
+ yield();
+ }
+ } else {
+ processReadyBins(std::move(ready_bins), session);
+ }
}
void BinFiles::transferFlowsToFail(core::ProcessSession &session,
std::unique_ptr<Bin> &bin) {
diff --git a/extensions/libarchive/BinFiles.h b/extensions/libarchive/BinFiles.h
index ab33e2c64..3dbb8fe86 100644
--- a/extensions/libarchive/BinFiles.h
+++ b/extensions/libarchive/BinFiles.h
@@ -279,7 +279,7 @@ class BinFiles : public core::Processor {
// Sort flow files retrieved from the flow file repository after restart to
their respective bins
bool resurrectFlowFiles(core::ProcessSession &session);
- void assumeOwnershipOfNextBatch(core::ProcessSession &session);
+ bool assumeOwnershipOfNextBatch(core::ProcessSession &session);
std::deque<std::unique_ptr<Bin>> gatherReadyBins(core::ProcessContext
&context);
void processReadyBins(std::deque<std::unique_ptr<Bin>> ready_bins,
core::ProcessSession &session);
diff --git a/extensions/libarchive/tests/MergeFileTests.cpp
b/extensions/libarchive/tests/MergeFileTests.cpp
index a98a18cf2..8c3015712 100644
--- a/extensions/libarchive/tests/MergeFileTests.cpp
+++ b/extensions/libarchive/tests/MergeFileTests.cpp
@@ -36,6 +36,7 @@
#include "MergeContent.h"
#include "processors/LogAttribute.h"
#include "TestBase.h"
+#include "SingleProcessorTestController.h"
#include "Catch.h"
#include "unit/ProvenanceTestHelper.h"
#include "serialization/FlowFileV3Serializer.h"
@@ -899,3 +900,26 @@ TEST_CASE_METHOD(MergeTestController, "Maximum Group Size
is respected", "[testM
REQUIRE(expiredFlowRecords.empty());
REQUIRE_FALSE(flow3);
}
+
+TEST_CASE("Empty MergeContent yields") {
+ const auto merge_content =
std::make_shared<minifi::processors::MergeContent>("mergeContent");
+
+ minifi::test::SingleProcessorTestController controller{merge_content};
+ controller.trigger();
+
+ CHECK(merge_content->isYield());
+}
+
+TEST_CASE("Empty MergeContent doesnt yield when processing readybins") {
+ const auto merge_content =
std::make_shared<minifi::processors::MergeContent>("mergeContent");
+
+ minifi::test::SingleProcessorTestController controller{merge_content};
+ controller.plan->setProperty(merge_content,
minifi::processors::MergeContent::MaxBinAge, "100ms");
+ controller.plan->setProperty(merge_content,
minifi::processors::MergeContent::MinEntries, "2");
+
+ auto first_trigger_results = controller.trigger("foo");
+ CHECK_FALSE(merge_content->isYield());
+ std::this_thread::sleep_for(100ms);
+ auto second_trigger_results = controller.trigger();
+ CHECK_FALSE(merge_content->isYield());
+}