szaszm commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r456470888
##########
File path: libminifi/src/core/ProcessSession.cpp
##########
@@ -540,111 +481,97 @@ void ProcessSession::import(const std::string& source,
std::vector<std::shared_p
std::vector<uint8_t> buffer(getpagesize());
try {
- try {
- std::ifstream input{source, std::ios::in | std::ios::binary};
- logger_->log_debug("Opening %s", source);
- if (!input.is_open() || !input.good()) {
- throw Exception(FILE_OPERATION_EXCEPTION,
utils::StringUtils::join_pack("File Import Error: failed to open file \'",
source, "\'"));
+ std::ifstream input{source, std::ios::in | std::ios::binary};
+ logger_->log_debug("Opening %s", source);
+ if (!input.is_open() || !input.good()) {
+ throw Exception(FILE_OPERATION_EXCEPTION,
utils::StringUtils::join_pack("File Import Error: failed to open file \'",
source, "\'"));
+ }
+ if (offset != 0U) {
+ input.seekg(offset, std::ifstream::beg);
+ if (!input.good()) {
+ logger_->log_error("Seeking to %lu failed for file %s (does
file/filesystem support seeking?)", offset, source);
+ throw Exception(FILE_OPERATION_EXCEPTION,
utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ",
std::to_string(offset)));
}
- if (offset != 0U) {
- input.seekg(offset, std::ifstream::beg);
- if (!input.good()) {
- logger_->log_error("Seeking to %lu failed for file %s (does
file/filesystem support seeking?)", offset, source);
- throw Exception(FILE_OPERATION_EXCEPTION,
utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ",
std::to_string(offset)));
- }
+ }
+ uint64_t startTime = 0U;
+ while (input.good()) {
+ input.read(reinterpret_cast<char*>(buffer.data()), buffer.size());
+ std::streamsize read = input.gcount();
+ if (read < 0) {
+ throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount
returned negative value");
}
- uint64_t startTime = 0U;
- while (input.good()) {
- input.read(reinterpret_cast<char*>(buffer.data()), buffer.size());
- std::streamsize read = input.gcount();
- if (read < 0) {
- throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount
returned negative value");
- }
- if (read == 0) {
- logger_->log_trace("Finished reading input %s", source);
+ if (read == 0) {
+ logger_->log_trace("Finished reading input %s", source);
+ break;
+ } else {
+ logging::LOG_TRACE(logger_) << "Read input of " << read;
+ }
+ uint8_t* begin = buffer.data();
+ uint8_t* end = begin + read;
+ while (true) {
+ startTime = getTimeMillis();
+ uint8_t* delimiterPos = std::find(begin, end,
static_cast<uint8_t>(inputDelimiter));
+ const auto len = gsl::narrow<int>(delimiterPos - begin);
+
+ logging::LOG_TRACE(logger_) << "Read input of " << read << " length is
" << len << " is at end?" << (delimiterPos == end);
+ /*
+ * We do not want to process the rest of the buffer after the last
delimiter if
+ * - we have reached EOF in the file (we would discard it anyway)
+ * - there is nothing to process (the last character in the buffer is
a delimiter)
+ */
+ if (delimiterPos == end && (input.eof() || len == 0)) {
break;
- } else {
- logging::LOG_TRACE(logger_) << "Read input of " << read;
}
- uint8_t* begin = buffer.data();
- uint8_t* end = begin + read;
- while (true) {
- startTime = getTimeMillis();
- uint8_t* delimiterPos = std::find(begin, end,
static_cast<uint8_t>(inputDelimiter));
- const auto len = gsl::narrow<int>(delimiterPos - begin);
-
- logging::LOG_TRACE(logger_) << "Read input of " << read << " length
is " << len << " is at end?" << (delimiterPos == end);
- /*
- * We do not want to process the rest of the buffer after the last
delimiter if
- * - we have reached EOF in the file (we would discard it anyway)
- * - there is nothing to process (the last character in the buffer
is a delimiter)
- */
- if (delimiterPos == end && (input.eof() || len == 0)) {
- break;
- }
-
- /* Create claim and stream if needed and append data */
- if (claim == nullptr) {
- startTime = getTimeMillis();
- claim =
std::make_shared<ResourceClaim>(process_context_->getContentRepository());
- }
- if (stream == nullptr) {
- stream = process_context_->getContentRepository()->write(claim);
- }
- if (stream == nullptr) {
- logger_->log_error("Stream is null");
- rollback();
- return;
- }
- if (stream->write(begin, len) != len) {
- logger_->log_error("Error while writing");
- stream->closeStream();
- throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error
creating Flowfile");
- }
- /* Create a FlowFile if we reached a delimiter */
- if (delimiterPos == end) {
- break;
- }
- flowFile = std::static_pointer_cast<FlowFileRecord>(create());
- flowFile->setSize(stream->getSize());
- flowFile->setOffset(0);
- if (flowFile->getResourceClaim() != nullptr) {
- /* Remove the old claim */
- flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
- flowFile->clearResourceClaim();
- }
- flowFile->setResourceClaim(claim);
- claim->increaseFlowFileRecordOwnedCount();
- logging::LOG_DEBUG(logger_) << "Import offset " <<
flowFile->getOffset() << " length " << flowFile->getSize() << " content " <<
flowFile->getResourceClaim()->getContentFullPath()
- << ", FlowFile UUID " <<
flowFile->getUUIDStr();
+ /* Create claim and stream if needed and append data */
+ if (claim == nullptr) {
+ startTime = getTimeMillis();
+ claim =
std::make_shared<ResourceClaim>(process_context_->getContentRepository());
+ }
+ if (stream == nullptr) {
+ stream = process_context_->getContentRepository()->write(claim);
+ }
+ if (stream == nullptr) {
+ logger_->log_error("Stream is null");
+ rollback();
+ return;
+ }
+ if (stream->write(begin, len) != len) {
+ logger_->log_error("Error while writing");
stream->closeStream();
- std::string details =
process_context_->getProcessorNode()->getName() + " modify flow record content
" + flowFile->getUUIDStr();
- uint64_t endTime = getTimeMillis();
- provenance_report_->modifyContent(flowFile, details, endTime -
startTime);
- flows.push_back(flowFile);
-
- /* Reset these to start processing the next FlowFile with a clean
slate */
- flowFile.reset();
- stream.reset();
- claim.reset();
-
- /* Skip delimiter */
- begin = delimiterPos + 1;
+ throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error
creating Flowfile");
+ }
+
+ /* Create a FlowFile if we reached a delimiter */
+ if (delimiterPos == end) {
+ break;
}
+ flowFile = std::static_pointer_cast<FlowFileRecord>(create());
+ flowFile->setSize(stream->getSize());
+ flowFile->setOffset(0);
+ flowFile->setResourceClaim(claim);
+ logging::LOG_DEBUG(logger_) << "Import offset " <<
flowFile->getOffset() << " length " << flowFile->getSize() << " content " <<
flowFile->getResourceClaim()->getContentFullPath()
+ << ", FlowFile UUID " <<
flowFile->getUUIDStr();
+ stream->closeStream();
+ std::string details = process_context_->getProcessorNode()->getName()
+ " modify flow record content " + flowFile->getUUIDStr();
+ uint64_t endTime = getTimeMillis();
+ provenance_report_->modifyContent(flowFile, details, endTime -
startTime);
+ flows.push_back(flowFile);
+
+ /* Reset these to start processing the next FlowFile with a clean
slate */
+ flowFile.reset();
+ stream.reset();
+ claim.reset();
+
+ /* Skip delimiter */
+ begin = delimiterPos + 1;
}
- } catch (std::exception &exception) {
- logger_->log_debug("Caught Exception %s", exception.what());
- throw;
- } catch (...) {
- logger_->log_debug("Caught Exception during process session write");
- throw;
}
+ } catch (std::exception &exception) {
+ logger_->log_debug("Caught Exception %s", exception.what());
+ throw;
} catch (...) {
- if (flowFile != nullptr && claim != nullptr &&
flowFile->getResourceClaim() == claim) {
- flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
- flowFile->clearResourceClaim();
- }
Review comment:
good point
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org