This is an automated email from the ASF dual-hosted git repository.
aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 65fad0d02 MINIFICPP-1708 - Route flowfiles to Failure on decompression error
65fad0d02 is described below
commit 65fad0d02d04463181470b21c8ea290ddd61c52a
Author: Adam Debreceni <[email protected]>
AuthorDate: Mon Apr 11 11:15:07 2022 +0200
MINIFICPP-1708 - Route flowfiles to Failure on decompression error
Signed-off-by: Arpad Boda <[email protected]>
This closes #1298
---
extensions/libarchive/CompressContent.cpp | 17 +-
.../test/archive-tests/CompressContentTests.cpp | 217 ++++++++++-----------
2 files changed, 116 insertions(+), 118 deletions(-)
diff --git a/extensions/libarchive/CompressContent.cpp b/extensions/libarchive/CompressContent.cpp
index 421bc8d83..874eb72c0 100644
--- a/extensions/libarchive/CompressContent.cpp
+++ b/extensions/libarchive/CompressContent.cpp
@@ -175,7 +175,7 @@ void CompressContent::processFlowFile(const
std::shared_ptr<core::FlowFile>& flo
fileExtension = search->second;
}
std::shared_ptr<core::FlowFile> result = session->create(flowFile);
- bool success = false;
+ bool success = true;
if (encapsulateInTar_) {
std::function<int64_t(const std::shared_ptr<io::InputStream>&, const
std::shared_ptr<io::OutputStream>&)> transformer;
@@ -193,9 +193,15 @@ void CompressContent::processFlowFile(const
std::shared_ptr<core::FlowFile>& flo
transformer = [&] (const std::shared_ptr<io::InputStream>& in, const
std::shared_ptr<io::OutputStream>& out) -> int64_t {
io::ReadArchiveStreamImpl decompressor(in);
if (!decompressor.nextEntry()) {
- return -1;
+ success = false;
+ return 0; // prevents a session rollback
+ }
+ auto ret = internal::pipe(&decompressor, out.get());
+ if (ret < 0) {
+ success = false;
+ return 0; // prevents a session rollback
}
- return internal::pipe(&decompressor, out.get());
+ return ret;
};
}
session->write(result, FunctionOutputStreamCallback([&] (const auto& out) {
@@ -203,11 +209,6 @@ void CompressContent::processFlowFile(const
std::shared_ptr<core::FlowFile>& flo
return transformer(in, out);
}));
}));
- // TODO(adebreceni): previous attempt to handle a malformed archive were
in vain
- // as the session->read threw anyway rolling back the flowfile, we
should correctly
- // forward a malformed archive to failure
- // https://issues.apache.org/jira/browse/MINIFICPP-1708
- success = true;
} else {
CompressContent::GzipWriteCallback callback(compressMode_, compressLevel_,
flowFile, session);
session->write(result, &callback);
diff --git a/libminifi/test/archive-tests/CompressContentTests.cpp b/libminifi/test/archive-tests/CompressContentTests.cpp
index bd1957797..6804e3a45 100644
--- a/libminifi/test/archive-tests/CompressContentTests.cpp
+++ b/libminifi/test/archive-tests/CompressContentTests.cpp
@@ -123,7 +123,7 @@ class CompressDecompressionTestController : public
TestController{
std::shared_ptr<core::ContentRepository> content_repo =
std::make_shared<core::repository::VolatileContentRepository>();
content_repo->initialize(std::make_shared<minifi::Configure>());
- // connection from compress processor to log attribute
+ // connection from compress processor to success
output = std::make_shared<minifi::Connection>(repo, content_repo,
"Output");
output->addRelationship(core::Relationship("success", "compress successful
output"));
output->setSource(processor.get());
@@ -135,12 +135,47 @@ class CompressDecompressionTestController : public
TestController{
input->setDestinationUUID(processoruuid);
processor->addConnection(input.get());
- processor->setAutoTerminatedRelationships({{"failure", ""}});
+ // connection from compress processor to failure
+ failure_output = std::make_shared<minifi::Connection>(repo, content_repo,
"FailureOutput");
+ failure_output->addRelationship(core::Relationship("failure", "compress
failure output"));
+ failure_output->setSource(processor.get());
+ failure_output->setSourceUUID(processoruuid);
+ processor->addConnection(failure_output.get());
processor->incrementActiveTasks();
processor->setScheduledState(core::ScheduledState::RUNNING);
context =
std::make_shared<core::ProcessContext>(std::make_shared<core::ProcessorNode>(processor.get()),
nullptr, repo, repo, content_repo);
+ helper_session = std::make_shared<core::ProcessSession>(context);
+ }
+
+ std::shared_ptr<core::FlowFile> importFlowFile(const std::string&
content_path) {
+ std::shared_ptr<core::FlowFile> flow =
std::static_pointer_cast<core::FlowFile>(helper_session->create());
+ helper_session->import(content_path, flow, true, 0);
+ helper_session->flushContent();
+ input->put(flow);
+ return flow;
+ }
+
+ template<typename T>
+ std::shared_ptr<core::FlowFile> importFlowFileFrom(T&& source) {
+ std::shared_ptr<core::FlowFile> flow =
std::static_pointer_cast<core::FlowFile>(helper_session->create());
+ helper_session->importFrom(std::forward<T>(source), flow);
+ helper_session->flushContent();
+ input->put(flow);
+ return flow;
+ }
+
+ void trigger() {
+ auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+ processor->onSchedule(context, factory);
+ auto session = std::make_shared<core::ProcessSession>(context);
+ processor->onTrigger(context, session);
+ session->commit();
+ }
+
+ void read(const std::shared_ptr<core::FlowFile>& file, ReadCallback& reader)
{
+ helper_session->read(file, &reader);
}
public:
@@ -172,11 +207,15 @@ class CompressDecompressionTestController : public
TestController{
return RawContent{std::move(contents)};
}
+
+
virtual ~CompressDecompressionTestController() = 0;
std::shared_ptr<core::Processor> processor;
+ std::shared_ptr<core::ProcessSession> helper_session;
std::shared_ptr<core::ProcessContext> context;
std::shared_ptr<minifi::Connection> output;
+ std::shared_ptr<minifi::Connection> failure_output;
std::shared_ptr<minifi::Connection> input;
};
@@ -239,19 +278,10 @@ TEST_CASE_METHOD(CompressTestController,
"CompressFileGZip", "[compressfiletest1
context->setProperty(minifi::processors::CompressContent::CompressLevel,
"9");
context->setProperty(minifi::processors::CompressContent::UpdateFileName,
"true");
- core::ProcessSession sessionGenFlowFile(context);
- std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast <
core::FlowFile > (sessionGenFlowFile.create());
+ auto flow = importFlowFile(rawContentPath());
flow->setAttribute(core::SpecialFlowAttribute::FILENAME, "inputfile");
- sessionGenFlowFile.import(rawContentPath(), flow, true, 0);
- sessionGenFlowFile.flushContent();
- input->put(flow);
- REQUIRE(processor->getName() == "compresscontent");
- auto factory = std::make_shared<core::ProcessSessionFactory>(context);
- processor->onSchedule(context, factory);
- auto session = std::make_shared<core::ProcessSession>(context);
- processor->onTrigger(context, session);
- session->commit();
+ trigger();
// validate the compress content
std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
@@ -265,7 +295,7 @@ TEST_CASE_METHOD(CompressTestController,
"CompressFileGZip", "[compressfiletest1
flow1->getAttribute(core::SpecialFlowAttribute::FILENAME, attribute_value);
REQUIRE(attribute_value == "inputfile.tar.gz");
ReadCallback callback(gsl::narrow<size_t>(flow1->getSize()));
- sessionGenFlowFile.read(flow1, &callback);
+ read(flow1, callback);
callback.archive_read();
std::string content(reinterpret_cast<char *>
(callback.archive_buffer_.data()), callback.archive_buffer_.size());
REQUIRE(getRawContent() == content);
@@ -280,19 +310,10 @@ TEST_CASE_METHOD(DecompressTestController,
"DecompressFileGZip", "[compressfilet
context->setProperty(minifi::processors::CompressContent::CompressLevel,
"9");
context->setProperty(minifi::processors::CompressContent::UpdateFileName,
"true");
- core::ProcessSession sessionGenFlowFile(context);
- std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast <
core::FlowFile > (sessionGenFlowFile.create());
+ auto flow = importFlowFile(compressedPath());
flow->setAttribute(core::SpecialFlowAttribute::FILENAME, "inputfile.tar.gz");
- sessionGenFlowFile.import(compressedPath(), flow, true, 0);
- sessionGenFlowFile.flushContent();
- input->put(flow);
- REQUIRE(processor->getName() == "compresscontent");
- auto factory = std::make_shared<core::ProcessSessionFactory>(context);
- processor->onSchedule(context, factory);
- auto session = std::make_shared<core::ProcessSession>(context);
- processor->onTrigger(context, session);
- session->commit();
+ trigger();
// validate the compress content
std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
@@ -305,7 +326,7 @@ TEST_CASE_METHOD(DecompressTestController,
"DecompressFileGZip", "[compressfilet
flow1->getAttribute(core::SpecialFlowAttribute::FILENAME, attribute_value);
REQUIRE(attribute_value == "inputfile");
ReadCallback callback(gsl::narrow<size_t>(flow1->getSize()));
- sessionGenFlowFile.read(flow1, &callback);
+ read(flow1, callback);
std::string content(reinterpret_cast<char *> (callback.buffer_.data()),
callback.read_size_);
REQUIRE(getRawContent() == content);
}
@@ -317,18 +338,8 @@ TEST_CASE_METHOD(CompressTestController,
"CompressFileBZip", "[compressfiletest3
context->setProperty(minifi::processors::CompressContent::CompressLevel,
"9");
context->setProperty(minifi::processors::CompressContent::UpdateFileName,
"true");
- core::ProcessSession sessionGenFlowFile(context);
- std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast <
core::FlowFile > (sessionGenFlowFile.create());
- sessionGenFlowFile.import(rawContentPath(), flow, true, 0);
- sessionGenFlowFile.flushContent();
- input->put(flow);
-
- REQUIRE(processor->getName() == "compresscontent");
- auto factory = std::make_shared<core::ProcessSessionFactory>(context);
- processor->onSchedule(context, factory);
- auto session = std::make_shared<core::ProcessSession>(context);
- processor->onTrigger(context, session);
- session->commit();
+ auto flow = importFlowFile(rawContentPath());
+ trigger();
// validate the compress content
std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
@@ -340,7 +351,7 @@ TEST_CASE_METHOD(CompressTestController,
"CompressFileBZip", "[compressfiletest3
flow1->getAttribute(core::SpecialFlowAttribute::MIME_TYPE, mime);
REQUIRE(mime == "application/bzip2");
ReadCallback callback(gsl::narrow<size_t>(flow1->getSize()));
- sessionGenFlowFile.read(flow1, &callback);
+ read(flow1, callback);
callback.archive_read();
std::string contents(reinterpret_cast<char *>
(callback.archive_buffer_.data()), callback.archive_buffer_.size());
REQUIRE(getRawContent() == contents);
@@ -356,18 +367,9 @@ TEST_CASE_METHOD(DecompressTestController,
"DecompressFileBZip", "[compressfilet
context->setProperty(minifi::processors::CompressContent::CompressLevel,
"9");
context->setProperty(minifi::processors::CompressContent::UpdateFileName,
"true");
- core::ProcessSession sessionGenFlowFile(context);
- std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast <
core::FlowFile > (sessionGenFlowFile.create());
- sessionGenFlowFile.import(compressedPath(), flow, true, 0);
- sessionGenFlowFile.flushContent();
- input->put(flow);
+ auto flow = importFlowFile(compressedPath());
- REQUIRE(processor->getName() == "compresscontent");
- auto factory = std::make_shared<core::ProcessSessionFactory>(context);
- processor->onSchedule(context, factory);
- auto session = std::make_shared<core::ProcessSession>(context);
- processor->onTrigger(context, session);
- session->commit();
+ trigger();
// validate the compress content
std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
@@ -378,7 +380,7 @@ TEST_CASE_METHOD(DecompressTestController,
"DecompressFileBZip", "[compressfilet
std::string mime;
REQUIRE(flow1->getAttribute(core::SpecialFlowAttribute::MIME_TYPE, mime)
== false);
ReadCallback callback(gsl::narrow<size_t>(flow1->getSize()));
- sessionGenFlowFile.read(flow1, &callback);
+ read(flow1, callback);
std::string contents(reinterpret_cast<char *> (callback.buffer_.data()),
callback.read_size_);
REQUIRE(getRawContent() == contents);
}
@@ -390,18 +392,8 @@ TEST_CASE_METHOD(CompressTestController,
"CompressFileLZMA", "[compressfiletest5
context->setProperty(minifi::processors::CompressContent::CompressLevel,
"9");
context->setProperty(minifi::processors::CompressContent::UpdateFileName,
"true");
- core::ProcessSession sessionGenFlowFile(context);
- std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast <
core::FlowFile > (sessionGenFlowFile.create());
- sessionGenFlowFile.import(rawContentPath(), flow, true, 0);
- sessionGenFlowFile.flushContent();
- input->put(flow);
-
- REQUIRE(processor->getName() == "compresscontent");
- auto factory = std::make_shared<core::ProcessSessionFactory>(context);
- processor->onSchedule(context, factory);
- auto session = std::make_shared<core::ProcessSession>(context);
- processor->onTrigger(context, session);
- session->commit();
+ auto flow = importFlowFile(rawContentPath());
+ trigger();
if (LogTestController::getInstance().contains("compression not supported on
this platform")) {
// platform not support LZMA
@@ -419,7 +411,7 @@ TEST_CASE_METHOD(CompressTestController,
"CompressFileLZMA", "[compressfiletest5
flow1->getAttribute(core::SpecialFlowAttribute::MIME_TYPE, mime);
REQUIRE(mime == "application/x-lzma");
ReadCallback callback(gsl::narrow<size_t>(flow1->getSize()));
- sessionGenFlowFile.read(flow1, &callback);
+ read(flow1, callback);
callback.archive_read();
std::string contents(reinterpret_cast<char *>
(callback.archive_buffer_.data()), callback.archive_buffer_.size());
REQUIRE(getRawContent() == contents);
@@ -435,19 +427,9 @@ TEST_CASE_METHOD(DecompressTestController,
"DecompressFileLZMA", "[compressfilet
context->setProperty(minifi::processors::CompressContent::CompressLevel,
"9");
context->setProperty(minifi::processors::CompressContent::UpdateFileName,
"true");
- core::ProcessSession sessionGenFlowFile(context);
- std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast <
core::FlowFile > (sessionGenFlowFile.create());
- sessionGenFlowFile.import(compressedPath(), flow, true, 0);
+ auto flow = importFlowFile(compressedPath());
flow->setAttribute(core::SpecialFlowAttribute::MIME_TYPE,
"application/x-lzma");
- sessionGenFlowFile.flushContent();
- input->put(flow);
-
- REQUIRE(processor->getName() == "compresscontent");
- auto factory = std::make_shared<core::ProcessSessionFactory>(context);
- processor->onSchedule(context, factory);
- auto session = std::make_shared<core::ProcessSession>(context);
- processor->onTrigger(context, session);
- session->commit();
+ trigger();
if (LogTestController::getInstance().contains("compression not supported on
this platform")) {
// platform not support LZMA
@@ -464,7 +446,7 @@ TEST_CASE_METHOD(DecompressTestController,
"DecompressFileLZMA", "[compressfilet
std::string mime;
REQUIRE(flow1->getAttribute(core::SpecialFlowAttribute::MIME_TYPE, mime)
== false);
ReadCallback callback(gsl::narrow<size_t>(flow1->getSize()));
- sessionGenFlowFile.read(flow1, &callback);
+ read(flow1, callback);
std::string contents(reinterpret_cast<char *> (callback.buffer_.data()),
callback.read_size_);
REQUIRE(getRawContent() == contents);
}
@@ -476,18 +458,8 @@ TEST_CASE_METHOD(CompressTestController,
"CompressFileXYLZMA", "[compressfiletes
context->setProperty(minifi::processors::CompressContent::CompressLevel,
"9");
context->setProperty(minifi::processors::CompressContent::UpdateFileName,
"true");
- core::ProcessSession sessionGenFlowFile(context);
- std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast <
core::FlowFile > (sessionGenFlowFile.create());
- sessionGenFlowFile.import(rawContentPath(), flow, true, 0);
- sessionGenFlowFile.flushContent();
- input->put(flow);
-
- REQUIRE(processor->getName() == "compresscontent");
- auto factory = std::make_shared<core::ProcessSessionFactory>(context);
- processor->onSchedule(context, factory);
- auto session = std::make_shared<core::ProcessSession>(context);
- processor->onTrigger(context, session);
- session->commit();
+ auto flow = importFlowFile(rawContentPath());
+ trigger();
if (LogTestController::getInstance().contains("compression not supported on
this platform")) {
// platform not support LZMA
@@ -505,7 +477,7 @@ TEST_CASE_METHOD(CompressTestController,
"CompressFileXYLZMA", "[compressfiletes
flow1->getAttribute(core::SpecialFlowAttribute::MIME_TYPE, mime);
REQUIRE(mime == "application/x-xz");
ReadCallback callback(gsl::narrow<size_t>(flow1->getSize()));
- sessionGenFlowFile.read(flow1, &callback);
+ read(flow1, callback);
callback.archive_read();
std::string contents(reinterpret_cast<char *>
(callback.archive_buffer_.data()), callback.archive_buffer_.size());
REQUIRE(getRawContent() == contents);
@@ -521,19 +493,9 @@ TEST_CASE_METHOD(DecompressTestController,
"DecompressFileXYLZMA", "[compressfil
context->setProperty(minifi::processors::CompressContent::CompressLevel,
"9");
context->setProperty(minifi::processors::CompressContent::UpdateFileName,
"true");
- core::ProcessSession sessionGenFlowFile(context);
- std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast <
core::FlowFile > (sessionGenFlowFile.create());
- sessionGenFlowFile.import(compressedPath(), flow, true, 0);
+ auto flow = importFlowFile(compressedPath());
flow->setAttribute(core::SpecialFlowAttribute::MIME_TYPE,
"application/x-xz");
- sessionGenFlowFile.flushContent();
- input->put(flow);
-
- REQUIRE(processor->getName() == "compresscontent");
- auto factory = std::make_shared<core::ProcessSessionFactory>(context);
- processor->onSchedule(context, factory);
- auto session = std::make_shared<core::ProcessSession>(context);
- processor->onTrigger(context, session);
- session->commit();
+ trigger();
if (LogTestController::getInstance().contains("compression not supported on
this platform")) {
// platform not support LZMA
@@ -550,7 +512,7 @@ TEST_CASE_METHOD(DecompressTestController,
"DecompressFileXYLZMA", "[compressfil
std::string mime;
REQUIRE(flow1->getAttribute(core::SpecialFlowAttribute::MIME_TYPE, mime)
== false);
ReadCallback callback(gsl::narrow<size_t>(flow1->getSize()));
- sessionGenFlowFile.read(flow1, &callback);
+ read(flow1, callback);
std::string contents(reinterpret_cast<char *> (callback.buffer_.data()),
callback.read_size_);
REQUIRE(getRawContent() == contents);
}
@@ -670,13 +632,8 @@ TEST_CASE_METHOD(CompressTestController, "Batch
CompressFileGZip", "[compressFil
context->setProperty(minifi::processors::CompressContent::UpdateFileName,
"true");
context->setProperty(minifi::processors::CompressContent::BatchSize,
std::to_string(batchSize));
-
- core::ProcessSession sessionGenFlowFile(context);
for (const auto& content : flowFileContents) {
- auto flow = sessionGenFlowFile.create();
- sessionGenFlowFile.importFrom(minifi::io::BufferStream(content), flow);
- sessionGenFlowFile.flushContent();
- input->put(flow);
+ importFlowFileFrom(minifi::io::BufferStream(content));
}
REQUIRE(processor->getName() == "compresscontent");
@@ -716,9 +673,49 @@ TEST_CASE_METHOD(CompressTestController, "Batch
CompressFileGZip", "[compressFil
file->getAttribute(core::SpecialFlowAttribute::MIME_TYPE, mime);
REQUIRE(mime == "application/gzip");
ReadCallback callback(gsl::narrow<size_t>(file->getSize()));
- sessionGenFlowFile.read(file, &callback);
+ read(file, callback);
callback.archive_read();
std::string content(reinterpret_cast<char *>
(callback.archive_buffer_.data()), callback.archive_buffer_.size());
REQUIRE(flowFileContents[idx] == content);
}
}
+
+TEST_CASE_METHOD(DecompressTestController, "Invalid archive decompression",
"[compressfiletest9]") {
+ context->setProperty(minifi::processors::CompressContent::CompressMode,
toString(CompressionMode::Decompress));
+ SECTION("GZIP") {
+ context->setProperty(minifi::processors::CompressContent::CompressFormat,
toString(CompressionFormat::GZIP));
+ }
+ SECTION("LZMA") {
+ context->setProperty(minifi::processors::CompressContent::CompressFormat,
toString(CompressionFormat::LZMA));
+ }
+ SECTION("XZ_LZMA2") {
+ context->setProperty(minifi::processors::CompressContent::CompressFormat,
toString(CompressionFormat::XZ_LZMA2));
+ }
+ SECTION("BZIP2") {
+ context->setProperty(minifi::processors::CompressContent::CompressFormat,
toString(CompressionFormat::BZIP2));
+ }
+ context->setProperty(minifi::processors::CompressContent::CompressLevel,
"9");
+ context->setProperty(minifi::processors::CompressContent::UpdateFileName,
"true");
+
+ importFlowFileFrom(minifi::io::BufferStream(std::string{"banana bread"}));
+ trigger();
+
+ if (LogTestController::getInstance().contains("compression not supported on
this platform")) {
+ return;
+ }
+
+ // validate the compress content
+ std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+ REQUIRE_FALSE(output->poll(expiredFlowRecords));
+ REQUIRE(expiredFlowRecords.empty());
+
+ auto invalid_flow = failure_output->poll(expiredFlowRecords);
+ REQUIRE(invalid_flow);
+ REQUIRE(expiredFlowRecords.empty());
+ {
+ ReadCallback callback(gsl::narrow<size_t>(invalid_flow->getSize()));
+ read(invalid_flow, callback);
+ std::string contents(reinterpret_cast<char *> (callback.buffer_.data()),
callback.read_size_);
+ REQUIRE(contents == "banana bread");
+ }
+}