This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 65fad0d02 MINIFICPP-1708 - Route flowfiles to Failure on decompression error
65fad0d02 is described below

commit 65fad0d02d04463181470b21c8ea290ddd61c52a
Author: Adam Debreceni <[email protected]>
AuthorDate: Mon Apr 11 11:15:07 2022 +0200

    MINIFICPP-1708 - Route flowfiles to Failure on decompression error
    
    Signed-off-by: Arpad Boda <[email protected]>
    
    This closes #1298
---
 extensions/libarchive/CompressContent.cpp          |  17 +-
 .../test/archive-tests/CompressContentTests.cpp    | 217 ++++++++++-----------
 2 files changed, 116 insertions(+), 118 deletions(-)

diff --git a/extensions/libarchive/CompressContent.cpp b/extensions/libarchive/CompressContent.cpp
index 421bc8d83..874eb72c0 100644
--- a/extensions/libarchive/CompressContent.cpp
+++ b/extensions/libarchive/CompressContent.cpp
@@ -175,7 +175,7 @@ void CompressContent::processFlowFile(const std::shared_ptr<core::FlowFile>& flo
     fileExtension = search->second;
   }
   std::shared_ptr<core::FlowFile> result = session->create(flowFile);
-  bool success = false;
+  bool success = true;
   if (encapsulateInTar_) {
     std::function<int64_t(const std::shared_ptr<io::InputStream>&, const std::shared_ptr<io::OutputStream>&)> transformer;
 
@@ -193,9 +193,15 @@ void CompressContent::processFlowFile(const std::shared_ptr<core::FlowFile>& flo
       transformer = [&] (const std::shared_ptr<io::InputStream>& in, const std::shared_ptr<io::OutputStream>& out) -> int64_t {
         io::ReadArchiveStreamImpl decompressor(in);
         if (!decompressor.nextEntry()) {
-          return -1;
+          success = false;
+          return 0;  // prevents a session rollback
+        }
+        auto ret = internal::pipe(&decompressor, out.get());
+        if (ret < 0) {
+          success = false;
+          return 0;  // prevents a session rollback
         }
-        return internal::pipe(&decompressor, out.get());
+        return ret;
       };
     }
     session->write(result, FunctionOutputStreamCallback([&] (const auto& out) {
@@ -203,11 +209,6 @@ void CompressContent::processFlowFile(const std::shared_ptr<core::FlowFile>& flo
         return transformer(in, out);
       }));
     }));
-    // TODO(adebreceni): previous attempt to handle a malformed archive were in vain
-    //    as the session->read threw anyway rolling back the flowfile, we should correctly
-    //    forward a malformed archive to failure
-    //    https://issues.apache.org/jira/browse/MINIFICPP-1708
-    success = true;
   } else {
     CompressContent::GzipWriteCallback callback(compressMode_, compressLevel_, flowFile, session);
     session->write(result, &callback);
diff --git a/libminifi/test/archive-tests/CompressContentTests.cpp b/libminifi/test/archive-tests/CompressContentTests.cpp
index bd1957797..6804e3a45 100644
--- a/libminifi/test/archive-tests/CompressContentTests.cpp
+++ b/libminifi/test/archive-tests/CompressContentTests.cpp
@@ -123,7 +123,7 @@ class CompressDecompressionTestController : public TestController{
 
     std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
     content_repo->initialize(std::make_shared<minifi::Configure>());
-    // connection from compress processor to log attribute
+    // connection from compress processor to success
     output = std::make_shared<minifi::Connection>(repo, content_repo, "Output");
     output->addRelationship(core::Relationship("success", "compress successful output"));
     output->setSource(processor.get());
@@ -135,12 +135,47 @@ class CompressDecompressionTestController : public TestController{
     input->setDestinationUUID(processoruuid);
     processor->addConnection(input.get());
 
-    processor->setAutoTerminatedRelationships({{"failure", ""}});
+    // connection from compress processor to failure
+    failure_output = std::make_shared<minifi::Connection>(repo, content_repo, "FailureOutput");
+    failure_output->addRelationship(core::Relationship("failure", "compress failure output"));
+    failure_output->setSource(processor.get());
+    failure_output->setSourceUUID(processoruuid);
+    processor->addConnection(failure_output.get());
 
     processor->incrementActiveTasks();
     processor->setScheduledState(core::ScheduledState::RUNNING);
 
     context = std::make_shared<core::ProcessContext>(std::make_shared<core::ProcessorNode>(processor.get()), nullptr, repo, repo, content_repo);
+    helper_session = std::make_shared<core::ProcessSession>(context);
+  }
+
+  std::shared_ptr<core::FlowFile> importFlowFile(const std::string& content_path) {
+    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast<core::FlowFile>(helper_session->create());
+    helper_session->import(content_path, flow, true, 0);
+    helper_session->flushContent();
+    input->put(flow);
+    return flow;
+  }
+
+  template<typename T>
+  std::shared_ptr<core::FlowFile> importFlowFileFrom(T&& source) {
+    std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast<core::FlowFile>(helper_session->create());
+    helper_session->importFrom(std::forward<T>(source), flow);
+    helper_session->flushContent();
+    input->put(flow);
+    return flow;
+  }
+
+  void trigger() {
+    auto factory = std::make_shared<core::ProcessSessionFactory>(context);
+    processor->onSchedule(context, factory);
+    auto session = std::make_shared<core::ProcessSession>(context);
+    processor->onTrigger(context, session);
+    session->commit();
+  }
+
+  void read(const std::shared_ptr<core::FlowFile>& file, ReadCallback& reader) {
+    helper_session->read(file, &reader);
   }
 
  public:
@@ -172,11 +207,15 @@ class CompressDecompressionTestController : public TestController{
     return RawContent{std::move(contents)};
   }
 
+
+
   virtual ~CompressDecompressionTestController() = 0;
 
   std::shared_ptr<core::Processor> processor;
+  std::shared_ptr<core::ProcessSession> helper_session;
   std::shared_ptr<core::ProcessContext> context;
   std::shared_ptr<minifi::Connection> output;
+  std::shared_ptr<minifi::Connection> failure_output;
   std::shared_ptr<minifi::Connection> input;
 };
 
@@ -239,19 +278,10 @@ TEST_CASE_METHOD(CompressTestController, "CompressFileGZip", "[compressfiletest1
   context->setProperty(minifi::processors::CompressContent::CompressLevel, "9");
   context->setProperty(minifi::processors::CompressContent::UpdateFileName, "true");
 
-  core::ProcessSession sessionGenFlowFile(context);
-  std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+  auto flow = importFlowFile(rawContentPath());
   flow->setAttribute(core::SpecialFlowAttribute::FILENAME, "inputfile");
-  sessionGenFlowFile.import(rawContentPath(), flow, true, 0);
-  sessionGenFlowFile.flushContent();
-  input->put(flow);
 
-  REQUIRE(processor->getName() == "compresscontent");
-  auto factory = std::make_shared<core::ProcessSessionFactory>(context);
-  processor->onSchedule(context, factory);
-  auto session = std::make_shared<core::ProcessSession>(context);
-  processor->onTrigger(context, session);
-  session->commit();
+  trigger();
 
   // validate the compress content
   std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
@@ -265,7 +295,7 @@ TEST_CASE_METHOD(CompressTestController, "CompressFileGZip", "[compressfiletest1
     flow1->getAttribute(core::SpecialFlowAttribute::FILENAME, attribute_value);
     REQUIRE(attribute_value == "inputfile.tar.gz");
     ReadCallback callback(gsl::narrow<size_t>(flow1->getSize()));
-    sessionGenFlowFile.read(flow1, &callback);
+    read(flow1, callback);
     callback.archive_read();
     std::string content(reinterpret_cast<char *> (callback.archive_buffer_.data()), callback.archive_buffer_.size());
     REQUIRE(getRawContent() == content);
@@ -280,19 +310,10 @@ TEST_CASE_METHOD(DecompressTestController, "DecompressFileGZip", "[compressfilet
   context->setProperty(minifi::processors::CompressContent::CompressLevel, "9");
   context->setProperty(minifi::processors::CompressContent::UpdateFileName, "true");
 
-  core::ProcessSession sessionGenFlowFile(context);
-  std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
+  auto flow = importFlowFile(compressedPath());
   flow->setAttribute(core::SpecialFlowAttribute::FILENAME, "inputfile.tar.gz");
-  sessionGenFlowFile.import(compressedPath(), flow, true, 0);
-  sessionGenFlowFile.flushContent();
-  input->put(flow);
 
-  REQUIRE(processor->getName() == "compresscontent");
-  auto factory = std::make_shared<core::ProcessSessionFactory>(context);
-  processor->onSchedule(context, factory);
-  auto session = std::make_shared<core::ProcessSession>(context);
-  processor->onTrigger(context, session);
-  session->commit();
+  trigger();
 
   // validate the compress content
   std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
@@ -305,7 +326,7 @@ TEST_CASE_METHOD(DecompressTestController, "DecompressFileGZip", "[compressfilet
     flow1->getAttribute(core::SpecialFlowAttribute::FILENAME, attribute_value);
     REQUIRE(attribute_value == "inputfile");
     ReadCallback callback(gsl::narrow<size_t>(flow1->getSize()));
-    sessionGenFlowFile.read(flow1, &callback);
+    read(flow1, callback);
     std::string content(reinterpret_cast<char *> (callback.buffer_.data()), callback.read_size_);
     REQUIRE(getRawContent() == content);
   }
@@ -317,18 +338,8 @@ TEST_CASE_METHOD(CompressTestController, "CompressFileBZip", "[compressfiletest3
   context->setProperty(minifi::processors::CompressContent::CompressLevel, "9");
   context->setProperty(minifi::processors::CompressContent::UpdateFileName, "true");
 
-  core::ProcessSession sessionGenFlowFile(context);
-  std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
-  sessionGenFlowFile.import(rawContentPath(), flow, true, 0);
-  sessionGenFlowFile.flushContent();
-  input->put(flow);
-
-  REQUIRE(processor->getName() == "compresscontent");
-  auto factory = std::make_shared<core::ProcessSessionFactory>(context);
-  processor->onSchedule(context, factory);
-  auto session = std::make_shared<core::ProcessSession>(context);
-  processor->onTrigger(context, session);
-  session->commit();
+  auto flow = importFlowFile(rawContentPath());
+  trigger();
 
   // validate the compress content
   std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
@@ -340,7 +351,7 @@ TEST_CASE_METHOD(CompressTestController, "CompressFileBZip", "[compressfiletest3
     flow1->getAttribute(core::SpecialFlowAttribute::MIME_TYPE, mime);
     REQUIRE(mime == "application/bzip2");
     ReadCallback callback(gsl::narrow<size_t>(flow1->getSize()));
-    sessionGenFlowFile.read(flow1, &callback);
+    read(flow1, callback);
     callback.archive_read();
     std::string contents(reinterpret_cast<char *> (callback.archive_buffer_.data()), callback.archive_buffer_.size());
     REQUIRE(getRawContent() == contents);
@@ -356,18 +367,9 @@ TEST_CASE_METHOD(DecompressTestController, "DecompressFileBZip", "[compressfilet
   context->setProperty(minifi::processors::CompressContent::CompressLevel, "9");
   context->setProperty(minifi::processors::CompressContent::UpdateFileName, "true");
 
-  core::ProcessSession sessionGenFlowFile(context);
-  std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
-  sessionGenFlowFile.import(compressedPath(), flow, true, 0);
-  sessionGenFlowFile.flushContent();
-  input->put(flow);
+  auto flow = importFlowFile(compressedPath());
 
-  REQUIRE(processor->getName() == "compresscontent");
-  auto factory = std::make_shared<core::ProcessSessionFactory>(context);
-  processor->onSchedule(context, factory);
-  auto session = std::make_shared<core::ProcessSession>(context);
-  processor->onTrigger(context, session);
-  session->commit();
+  trigger();
 
   // validate the compress content
   std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
@@ -378,7 +380,7 @@ TEST_CASE_METHOD(DecompressTestController, "DecompressFileBZip", "[compressfilet
     std::string mime;
     REQUIRE(flow1->getAttribute(core::SpecialFlowAttribute::MIME_TYPE, mime) == false);
     ReadCallback callback(gsl::narrow<size_t>(flow1->getSize()));
-    sessionGenFlowFile.read(flow1, &callback);
+    read(flow1, callback);
     std::string contents(reinterpret_cast<char *> (callback.buffer_.data()), callback.read_size_);
     REQUIRE(getRawContent() == contents);
   }
@@ -390,18 +392,8 @@ TEST_CASE_METHOD(CompressTestController, "CompressFileLZMA", "[compressfiletest5
   context->setProperty(minifi::processors::CompressContent::CompressLevel, "9");
   context->setProperty(minifi::processors::CompressContent::UpdateFileName, "true");
 
-  core::ProcessSession sessionGenFlowFile(context);
-  std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
-  sessionGenFlowFile.import(rawContentPath(), flow, true, 0);
-  sessionGenFlowFile.flushContent();
-  input->put(flow);
-
-  REQUIRE(processor->getName() == "compresscontent");
-  auto factory = std::make_shared<core::ProcessSessionFactory>(context);
-  processor->onSchedule(context, factory);
-  auto session = std::make_shared<core::ProcessSession>(context);
-  processor->onTrigger(context, session);
-  session->commit();
+  auto flow = importFlowFile(rawContentPath());
+  trigger();
 
   if (LogTestController::getInstance().contains("compression not supported on this platform")) {
     // platform not support LZMA
@@ -419,7 +411,7 @@ TEST_CASE_METHOD(CompressTestController, "CompressFileLZMA", "[compressfiletest5
     flow1->getAttribute(core::SpecialFlowAttribute::MIME_TYPE, mime);
     REQUIRE(mime == "application/x-lzma");
     ReadCallback callback(gsl::narrow<size_t>(flow1->getSize()));
-    sessionGenFlowFile.read(flow1, &callback);
+    read(flow1, callback);
     callback.archive_read();
     std::string contents(reinterpret_cast<char *> (callback.archive_buffer_.data()), callback.archive_buffer_.size());
     REQUIRE(getRawContent() == contents);
@@ -435,19 +427,9 @@ TEST_CASE_METHOD(DecompressTestController, "DecompressFileLZMA", "[compressfilet
   context->setProperty(minifi::processors::CompressContent::CompressLevel, "9");
   context->setProperty(minifi::processors::CompressContent::UpdateFileName, "true");
 
-  core::ProcessSession sessionGenFlowFile(context);
-  std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
-  sessionGenFlowFile.import(compressedPath(), flow, true, 0);
+  auto flow = importFlowFile(compressedPath());
   flow->setAttribute(core::SpecialFlowAttribute::MIME_TYPE, "application/x-lzma");
-  sessionGenFlowFile.flushContent();
-  input->put(flow);
-
-  REQUIRE(processor->getName() == "compresscontent");
-  auto factory = std::make_shared<core::ProcessSessionFactory>(context);
-  processor->onSchedule(context, factory);
-  auto session = std::make_shared<core::ProcessSession>(context);
-  processor->onTrigger(context, session);
-  session->commit();
+  trigger();
 
   if (LogTestController::getInstance().contains("compression not supported on this platform")) {
     // platform not support LZMA
@@ -464,7 +446,7 @@ TEST_CASE_METHOD(DecompressTestController, "DecompressFileLZMA", "[compressfilet
     std::string mime;
     REQUIRE(flow1->getAttribute(core::SpecialFlowAttribute::MIME_TYPE, mime) == false);
     ReadCallback callback(gsl::narrow<size_t>(flow1->getSize()));
-    sessionGenFlowFile.read(flow1, &callback);
+    read(flow1, callback);
     std::string contents(reinterpret_cast<char *> (callback.buffer_.data()), callback.read_size_);
     REQUIRE(getRawContent() == contents);
   }
@@ -476,18 +458,8 @@ TEST_CASE_METHOD(CompressTestController, "CompressFileXYLZMA", "[compressfiletes
   context->setProperty(minifi::processors::CompressContent::CompressLevel, "9");
   context->setProperty(minifi::processors::CompressContent::UpdateFileName, "true");
 
-  core::ProcessSession sessionGenFlowFile(context);
-  std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
-  sessionGenFlowFile.import(rawContentPath(), flow, true, 0);
-  sessionGenFlowFile.flushContent();
-  input->put(flow);
-
-  REQUIRE(processor->getName() == "compresscontent");
-  auto factory = std::make_shared<core::ProcessSessionFactory>(context);
-  processor->onSchedule(context, factory);
-  auto session = std::make_shared<core::ProcessSession>(context);
-  processor->onTrigger(context, session);
-  session->commit();
+  auto flow = importFlowFile(rawContentPath());
+  trigger();
 
   if (LogTestController::getInstance().contains("compression not supported on this platform")) {
     // platform not support LZMA
@@ -505,7 +477,7 @@ TEST_CASE_METHOD(CompressTestController, "CompressFileXYLZMA", "[compressfiletes
     flow1->getAttribute(core::SpecialFlowAttribute::MIME_TYPE, mime);
     REQUIRE(mime == "application/x-xz");
     ReadCallback callback(gsl::narrow<size_t>(flow1->getSize()));
-    sessionGenFlowFile.read(flow1, &callback);
+    read(flow1, callback);
     callback.archive_read();
     std::string contents(reinterpret_cast<char *> (callback.archive_buffer_.data()), callback.archive_buffer_.size());
     REQUIRE(getRawContent() == contents);
@@ -521,19 +493,9 @@ TEST_CASE_METHOD(DecompressTestController, "DecompressFileXYLZMA", "[compressfil
   context->setProperty(minifi::processors::CompressContent::CompressLevel, "9");
   context->setProperty(minifi::processors::CompressContent::UpdateFileName, "true");
 
-  core::ProcessSession sessionGenFlowFile(context);
-  std::shared_ptr<core::FlowFile> flow = std::static_pointer_cast < core::FlowFile > (sessionGenFlowFile.create());
-  sessionGenFlowFile.import(compressedPath(), flow, true, 0);
+  auto flow = importFlowFile(compressedPath());
   flow->setAttribute(core::SpecialFlowAttribute::MIME_TYPE, "application/x-xz");
-  sessionGenFlowFile.flushContent();
-  input->put(flow);
-
-  REQUIRE(processor->getName() == "compresscontent");
-  auto factory = std::make_shared<core::ProcessSessionFactory>(context);
-  processor->onSchedule(context, factory);
-  auto session = std::make_shared<core::ProcessSession>(context);
-  processor->onTrigger(context, session);
-  session->commit();
+  trigger();
 
   if (LogTestController::getInstance().contains("compression not supported on this platform")) {
     // platform not support LZMA
@@ -550,7 +512,7 @@ TEST_CASE_METHOD(DecompressTestController, "DecompressFileXYLZMA", "[compressfil
     std::string mime;
     REQUIRE(flow1->getAttribute(core::SpecialFlowAttribute::MIME_TYPE, mime) == false);
     ReadCallback callback(gsl::narrow<size_t>(flow1->getSize()));
-    sessionGenFlowFile.read(flow1, &callback);
+    read(flow1, callback);
     std::string contents(reinterpret_cast<char *> (callback.buffer_.data()), callback.read_size_);
     REQUIRE(getRawContent() == contents);
   }
@@ -670,13 +632,8 @@ TEST_CASE_METHOD(CompressTestController, "Batch CompressFileGZip", "[compressFil
   context->setProperty(minifi::processors::CompressContent::UpdateFileName, "true");
   context->setProperty(minifi::processors::CompressContent::BatchSize, std::to_string(batchSize));
 
-
-  core::ProcessSession sessionGenFlowFile(context);
   for (const auto& content : flowFileContents) {
-    auto flow = sessionGenFlowFile.create();
-    sessionGenFlowFile.importFrom(minifi::io::BufferStream(content), flow);
-    sessionGenFlowFile.flushContent();
-    input->put(flow);
+    importFlowFileFrom(minifi::io::BufferStream(content));
   }
 
   REQUIRE(processor->getName() == "compresscontent");
@@ -716,9 +673,49 @@ TEST_CASE_METHOD(CompressTestController, "Batch CompressFileGZip", "[compressFil
     file->getAttribute(core::SpecialFlowAttribute::MIME_TYPE, mime);
     REQUIRE(mime == "application/gzip");
     ReadCallback callback(gsl::narrow<size_t>(file->getSize()));
-    sessionGenFlowFile.read(file, &callback);
+    read(file, callback);
     callback.archive_read();
     std::string content(reinterpret_cast<char *> (callback.archive_buffer_.data()), callback.archive_buffer_.size());
     REQUIRE(flowFileContents[idx] == content);
   }
 }
+
+TEST_CASE_METHOD(DecompressTestController, "Invalid archive decompression", "[compressfiletest9]") {
+  context->setProperty(minifi::processors::CompressContent::CompressMode, toString(CompressionMode::Decompress));
+  SECTION("GZIP") {
+    context->setProperty(minifi::processors::CompressContent::CompressFormat, toString(CompressionFormat::GZIP));
+  }
+  SECTION("LZMA") {
+    context->setProperty(minifi::processors::CompressContent::CompressFormat, toString(CompressionFormat::LZMA));
+  }
+  SECTION("XZ_LZMA2") {
+    context->setProperty(minifi::processors::CompressContent::CompressFormat, toString(CompressionFormat::XZ_LZMA2));
+  }
+  SECTION("BZIP2") {
+    context->setProperty(minifi::processors::CompressContent::CompressFormat, toString(CompressionFormat::BZIP2));
+  }
+  context->setProperty(minifi::processors::CompressContent::CompressLevel, "9");
+  context->setProperty(minifi::processors::CompressContent::UpdateFileName, "true");
+
+  importFlowFileFrom(minifi::io::BufferStream(std::string{"banana bread"}));
+  trigger();
+
+  if (LogTestController::getInstance().contains("compression not supported on this platform")) {
+    return;
+  }
+
+  // validate the compress content
+  std::set<std::shared_ptr<core::FlowFile>> expiredFlowRecords;
+  REQUIRE_FALSE(output->poll(expiredFlowRecords));
+  REQUIRE(expiredFlowRecords.empty());
+
+  auto invalid_flow = failure_output->poll(expiredFlowRecords);
+  REQUIRE(invalid_flow);
+  REQUIRE(expiredFlowRecords.empty());
+  {
+    ReadCallback callback(gsl::narrow<size_t>(invalid_flow->getSize()));
+    read(invalid_flow, callback);
+    std::string contents(reinterpret_cast<char *> (callback.buffer_.data()), callback.read_size_);
+    REQUIRE(contents == "banana bread");
+  }
+}

Reply via email to