adamdebreceni commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r454836324



##########
File path: libminifi/src/core/ProcessSession.cpp
##########
@@ -540,111 +481,97 @@ void ProcessSession::import(const std::string& source, 
std::vector<std::shared_p
 
   std::vector<uint8_t> buffer(getpagesize());
   try {
-    try {
-      std::ifstream input{source, std::ios::in | std::ios::binary};
-      logger_->log_debug("Opening %s", source);
-      if (!input.is_open() || !input.good()) {
-        throw Exception(FILE_OPERATION_EXCEPTION, 
utils::StringUtils::join_pack("File Import Error: failed to open file \'", 
source, "\'"));
+    std::ifstream input{source, std::ios::in | std::ios::binary};
+    logger_->log_debug("Opening %s", source);
+    if (!input.is_open() || !input.good()) {
+      throw Exception(FILE_OPERATION_EXCEPTION, 
utils::StringUtils::join_pack("File Import Error: failed to open file \'", 
source, "\'"));
+    }
+    if (offset != 0U) {
+      input.seekg(offset, std::ifstream::beg);
+      if (!input.good()) {
+        logger_->log_error("Seeking to %lu failed for file %s (does 
file/filesystem support seeking?)", offset, source);
+        throw Exception(FILE_OPERATION_EXCEPTION, 
utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ", 
std::to_string(offset)));
       }
-      if (offset != 0U) {
-        input.seekg(offset, std::ifstream::beg);
-        if (!input.good()) {
-          logger_->log_error("Seeking to %lu failed for file %s (does 
file/filesystem support seeking?)", offset, source);
-          throw Exception(FILE_OPERATION_EXCEPTION, 
utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ", 
std::to_string(offset)));
-        }
+    }
+    uint64_t startTime = 0U;
+    while (input.good()) {
+      input.read(reinterpret_cast<char*>(buffer.data()), buffer.size());
+      std::streamsize read = input.gcount();
+      if (read < 0) {
+        throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount 
returned negative value");
       }
-      uint64_t startTime = 0U;
-      while (input.good()) {
-        input.read(reinterpret_cast<char*>(buffer.data()), buffer.size());
-        std::streamsize read = input.gcount();
-        if (read < 0) {
-          throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount 
returned negative value");
-        }
-        if (read == 0) {
-          logger_->log_trace("Finished reading input %s", source);
+      if (read == 0) {
+        logger_->log_trace("Finished reading input %s", source);
+        break;
+      } else {
+        logging::LOG_TRACE(logger_) << "Read input of " << read;
+      }
+      uint8_t* begin = buffer.data();
+      uint8_t* end = begin + read;
+      while (true) {
+        startTime = getTimeMillis();
+        uint8_t* delimiterPos = std::find(begin, end, 
static_cast<uint8_t>(inputDelimiter));
+        const auto len = gsl::narrow<int>(delimiterPos - begin);
+
+        logging::LOG_TRACE(logger_) << "Read input of " << read << " length is 
" << len << " is at end?" << (delimiterPos == end);
+        /*
+         * We do not want to process the rest of the buffer after the last 
delimiter if
+         *  - we have reached EOF in the file (we would discard it anyway)
+         *  - there is nothing to process (the last character in the buffer is 
a delimiter)
+         */
+        if (delimiterPos == end && (input.eof() || len == 0)) {
           break;
-        } else {
-          logging::LOG_TRACE(logger_) << "Read input of " << read;
         }
-        uint8_t* begin = buffer.data();
-        uint8_t* end = begin + read;
-        while (true) {
-          startTime = getTimeMillis();
-          uint8_t* delimiterPos = std::find(begin, end, 
static_cast<uint8_t>(inputDelimiter));
-          const auto len = gsl::narrow<int>(delimiterPos - begin);
-
-          logging::LOG_TRACE(logger_) << "Read input of " << read << " length 
is " << len << " is at end?" << (delimiterPos == end);
-          /*
-           * We do not want to process the rest of the buffer after the last 
delimiter if
-           *  - we have reached EOF in the file (we would discard it anyway)
-           *  - there is nothing to process (the last character in the buffer 
is a delimiter)
-           */
-          if (delimiterPos == end && (input.eof() || len == 0)) {
-            break;
-          }
-
-          /* Create claim and stream if needed and append data */
-          if (claim == nullptr) {
-            startTime = getTimeMillis();
-            claim = 
std::make_shared<ResourceClaim>(process_context_->getContentRepository());
-          }
-          if (stream == nullptr) {
-            stream = process_context_->getContentRepository()->write(claim);
-          }
-          if (stream == nullptr) {
-            logger_->log_error("Stream is null");
-            rollback();
-            return;
-          }
-          if (stream->write(begin, len) != len) {
-            logger_->log_error("Error while writing");
-            stream->closeStream();
-            throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error 
creating Flowfile");
-          }
 
-          /* Create a FlowFile if we reached a delimiter */
-          if (delimiterPos == end) {
-            break;
-          }
-          flowFile = std::static_pointer_cast<FlowFileRecord>(create());
-          flowFile->setSize(stream->getSize());
-          flowFile->setOffset(0);
-          if (flowFile->getResourceClaim() != nullptr) {
-            /* Remove the old claim */
-            flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
-            flowFile->clearResourceClaim();
-          }
-          flowFile->setResourceClaim(claim);
-          claim->increaseFlowFileRecordOwnedCount();
-          logging::LOG_DEBUG(logger_) << "Import offset " << 
flowFile->getOffset() << " length " << flowFile->getSize() << " content " << 
flowFile->getResourceClaim()->getContentFullPath()
-                                      << ", FlowFile UUID " << 
flowFile->getUUIDStr();
+        /* Create claim and stream if needed and append data */
+        if (claim == nullptr) {
+          startTime = getTimeMillis();
+          claim = 
std::make_shared<ResourceClaim>(process_context_->getContentRepository());
+        }
+        if (stream == nullptr) {
+          stream = process_context_->getContentRepository()->write(claim);
+        }
+        if (stream == nullptr) {
+          logger_->log_error("Stream is null");
+          rollback();
+          return;
+        }
+        if (stream->write(begin, len) != len) {
+          logger_->log_error("Error while writing");
           stream->closeStream();
-          std::string details = 
process_context_->getProcessorNode()->getName() + " modify flow record content 
" + flowFile->getUUIDStr();
-          uint64_t endTime = getTimeMillis();
-          provenance_report_->modifyContent(flowFile, details, endTime - 
startTime);
-          flows.push_back(flowFile);
-
-          /* Reset these to start processing the next FlowFile with a clean 
slate */
-          flowFile.reset();
-          stream.reset();
-          claim.reset();
-
-          /* Skip delimiter */
-          begin = delimiterPos + 1;
+          throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error 
creating Flowfile");
+        }
+
+        /* Create a FlowFile if we reached a delimiter */
+        if (delimiterPos == end) {
+          break;
         }
+        flowFile = std::static_pointer_cast<FlowFileRecord>(create());
+        flowFile->setSize(stream->getSize());
+        flowFile->setOffset(0);
+        flowFile->setResourceClaim(claim);
+        logging::LOG_DEBUG(logger_) << "Import offset " << 
flowFile->getOffset() << " length " << flowFile->getSize() << " content " << 
flowFile->getResourceClaim()->getContentFullPath()
+                                    << ", FlowFile UUID " << 
flowFile->getUUIDStr();
+        stream->closeStream();
+        std::string details = process_context_->getProcessorNode()->getName() 
+ " modify flow record content " + flowFile->getUUIDStr();
+        uint64_t endTime = getTimeMillis();
+        provenance_report_->modifyContent(flowFile, details, endTime - 
startTime);
+        flows.push_back(flowFile);
+
+        /* Reset these to start processing the next FlowFile with a clean 
slate */
+        flowFile.reset();
+        stream.reset();
+        claim.reset();
+
+        /* Skip delimiter */
+        begin = delimiterPos + 1;
       }
-    } catch (std::exception &exception) {
-      logger_->log_debug("Caught Exception %s", exception.what());
-      throw;
-    } catch (...) {
-      logger_->log_debug("Caught Exception during process session write");
-      throw;
     }
+  } catch (std::exception &exception) {
+    logger_->log_debug("Caught Exception %s", exception.what());
+    throw;
   } catch (...) {
-    if (flowFile != nullptr && claim != nullptr && 
flowFile->getResourceClaim() == claim) {
-      flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
-      flowFile->clearResourceClaim();
-    }

Review comment:
       The whole point of this story is that once the FlowFile assumes 
ownership over a ResourceClaim, it should clean up after itself. Currently, 
this cleanup is in `FlowFileRecord::~FlowFileRecord`, which is arguably not 
the best place — but only a `FlowFileRecord` can notify the content repository.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to