Repository: nifi-minifi-cpp
Updated Branches:
  refs/heads/master 2da613875 -> 6220b37c1


MINIFICPP-430 Fixed improper size reporting/failed read for importing files or 
filesystems which do not support seek operations

This closes #282.

Signed-off-by: Marc Parisi <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/6220b37c
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/6220b37c
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/6220b37c

Branch: refs/heads/master
Commit: 6220b37c1076fbe22f8a99f32cbe285385c25e5d
Parents: 2da6138
Author: Andrew I. Christianson <[email protected]>
Authored: Fri Mar 16 12:18:28 2018 -0400
Committer: Marc Parisi <[email protected]>
Committed: Sun Mar 18 21:16:15 2018 -0400

----------------------------------------------------------------------
 libminifi/src/core/ProcessSession.cpp | 26 ++++++++--
 libminifi/test/unit/GetFileTests.cpp  | 82 ++++++++++++++++++++++++++++++
 2 files changed, 103 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6220b37c/libminifi/src/core/ProcessSession.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessSession.cpp 
b/libminifi/src/core/ProcessSession.cpp
index bd98db6..fde2bc6 100644
--- a/libminifi/src/core/ProcessSession.cpp
+++ b/libminifi/src/core/ProcessSession.cpp
@@ -417,10 +417,18 @@ void ProcessSession::import(std::string source, const 
std::shared_ptr<core::Flow
       rollback();
       return;
     }
-    if (input.is_open()) {
-      // Open the source file and stream to the flow file
-      input.seekg(offset);
+    if (input.is_open() && input.good()) {
       bool invalidWrite = false;
+      // Open the source file and stream to the flow file
+      if (offset != 0) {
+        input.seekg(offset);
+        if (!input.good()) {
+          logger_->log_error("Seeking to %d failed for file %s (does 
file/filesystem support seeking?)",
+                             offset,
+                             source);
+          invalidWrite = true;
+        }
+      }
       while (input.good()) {
         input.read(reinterpret_cast<char*>(charBuffer.data()), size);
         if (input) {
@@ -494,8 +502,16 @@ void ProcessSession::import(std::string source, 
std::vector<std::shared_ptr<Flow
     std::ifstream input;
     logger_->log_debug("Opening %s", source);
     input.open(source.c_str(), std::fstream::in | std::fstream::binary);
-    if (input.is_open()) {
-      input.seekg(offset, input.beg);
+    if (input.is_open() && input.good()) {
+      if (offset != 0) {
+        input.seekg(offset, input.beg);
+        if (!input.good()) {
+          logger_->log_error("Seeking to %d failed for file %s (does 
file/filesystem support seeking?)",
+                             offset,
+                             source);
+          throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
+        }
+      }
       while (input.good()) {
         bool invalidWrite = false;
         flowFile = std::static_pointer_cast<FlowFileRecord>(create());

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/6220b37c/libminifi/test/unit/GetFileTests.cpp
----------------------------------------------------------------------
diff --git a/libminifi/test/unit/GetFileTests.cpp 
b/libminifi/test/unit/GetFileTests.cpp
new file mode 100644
index 0000000..6e8aa25
--- /dev/null
+++ b/libminifi/test/unit/GetFileTests.cpp
@@ -0,0 +1,82 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <utility>
+#include <memory>
+#include <string>
+#include <vector>
+#include <set>
+#include <fstream>
+
+
+#include "../TestBase.h"
+#include "processors/LogAttribute.h"
+#include "processors/GetFile.h"
+
+TEST_CASE("GetFile: FIFO", "[getFileFifo]") { // NOLINT
+  TestController testController;
+
+  LogTestController::getInstance().setTrace<TestPlan>();
+  LogTestController::getInstance().setTrace<processors::GetFile>();
+  LogTestController::getInstance().setTrace<processors::LogAttribute>();
+
+  auto plan = testController.createPlan();
+  auto repo = std::make_shared<TestRepository>();
+
+  // Define directory for input
+  std::string in_dir("/tmp/gt.XXXXXX");
+  REQUIRE(testController.createTempDirectory(&in_dir[0]) != nullptr);
+
+  // Define test input file
+  std::string in_file(in_dir);
+  in_file.append("/testfifo");
+
+  // Build MiNiFi processing graph
+  auto get_file = plan->addProcessor(
+      "GetFile",
+      "Get");
+  plan->setProperty(
+      get_file,
+      processors::GetFile::Directory.getName(), in_dir);
+  plan->setProperty(
+      get_file,
+      processors::GetFile::KeepSourceFile.getName(),
+      "true");
+  plan->addProcessor(
+      "LogAttribute",
+      "Log",
+      core::Relationship("success", "description"),
+      true);
+
+  // Write test input
+  REQUIRE(0 == mkfifo(in_file.c_str(), 0777));
+
+  // Run test flow
+  std::thread write_thread([&] {
+    std::ofstream in_file_stream(in_file);
+    in_file_stream << "The quick brown fox jumps over the lazy dog" << 
std::endl;
+  });
+
+  plan->runNextProcessor();  // Get
+  plan->runNextProcessor();  // Log
+
+  write_thread.join();
+
+  // Check log output
+  REQUIRE(LogTestController::getInstance().contains("Size:44 Offset:0"));
+}
+

Reply via email to