szaszm commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r453579663
##########
File path: libminifi/include/core/Repository.h
##########
@@ -228,6 +232,8 @@ class Repository : public virtual
core::SerializableComponent, public core::Trac
Repository &operator=(const Repository &parent) = delete;
protected:
+ std::map<std::string, std::shared_ptr<core::Connectable>> containers;
Review comment:
This could use a comment; "containers" is not clear IMO. What can be a
container, and what can not? Does it include flow files, connections, processors,
the S2S client, etc.? All of these are derived from `Connectable`.
##########
File path: libminifi/test/persistence-tests/PersistenceTests.cpp
##########
@@ -0,0 +1,218 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <chrono>
+#include <map>
+#include <memory>
+#include <string>
+#include <thread>
+
+#include "core/Core.h"
+#include "core/repository/AtomicRepoEntries.h"
+#include "core/RepositoryFactory.h"
+#include "FlowFileRecord.h"
+#include "FlowFileRepository.h"
+#include "properties/Configure.h"
+#include "../unit/ProvenanceTestHelper.h"
+#include "../TestBase.h"
+#include "../../extensions/libarchive/MergeContent.h"
+#include "../test/BufferReader.h"
+
+using Connection = minifi::Connection;
+using MergeContent = minifi::processors::MergeContent;
+
+struct TestFlow{
+ TestFlow(const std::shared_ptr<core::repository::FlowFileRepository>&
ff_repository, const std::shared_ptr<core::ContentRepository>& content_repo,
const std::shared_ptr<core::Repository>& prov_repo)
+ : ff_repository(ff_repository), content_repo(content_repo),
prov_repo(prov_repo) {
+ std::shared_ptr<core::controller::ControllerServiceProvider>
controller_services_provider = nullptr;
+
+ // setup MERGE processor
+ {
+ merge = std::make_shared<MergeContent>("MergeContent", mergeProcUUID());
+ merge->initialize();
+ merge->setAutoTerminatedRelationships({{"original", "d"}});
+
+ merge->setProperty(MergeContent::MergeFormat, MERGE_FORMAT_CONCAT_VALUE);
+ merge->setProperty(MergeContent::MergeStrategy, MERGE_STRATEGY_BIN_PACK);
+ merge->setProperty(MergeContent::DelimiterStrategy,
DELIMITER_STRATEGY_TEXT);
+ merge->setProperty(MergeContent::MinEntries, "3");
+ merge->setProperty(MergeContent::Header, "_Header_");
+ merge->setProperty(MergeContent::Footer, "_Footer_");
+ merge->setProperty(MergeContent::Demarcator, "_Demarcator_");
+ merge->setProperty(MergeContent::MaxBinAge, "1 h");
+
+ std::shared_ptr<core::ProcessorNode> node =
std::make_shared<core::ProcessorNode>(merge);
+ mergeContext = std::make_shared<core::ProcessContext>(node,
controller_services_provider, prov_repo, ff_repository, content_repo);
+ }
+
+ // setup INPUT processor
+ {
+ inputProcessor = std::make_shared<core::Processor>("source",
inputProcUUID());
+ std::shared_ptr<core::ProcessorNode> node =
std::make_shared<core::ProcessorNode>(inputProcessor);
+ inputContext = std::make_shared<core::ProcessContext>(node,
controller_services_provider, prov_repo,
+ ff_repository,
content_repo);
+ }
+
+ // setup Input Connection
+ {
+ input = std::make_shared<Connection>(ff_repository, content_repo,
"Input", inputConnUUID());
+ input->setRelationship({"input", "d"});
+ input->setDestinationUUID(mergeProcUUID());
+ input->setSourceUUID(inputProcUUID());
+ inputProcessor->addConnection(input);
+ }
+
+ // setup Output Connection
+ {
+ output = std::make_shared<Connection>(ff_repository, content_repo,
"Output", outputConnUUID());
+ output->setRelationship(MergeContent::Merge);
+ output->setSourceUUID(mergeProcUUID());
+ }
+
+ // setup ProcessGroup
+ {
+ root =
std::make_shared<core::ProcessGroup>(core::ProcessGroupType::ROOT_PROCESS_GROUP,
"root");
+ root->addProcessor(merge);
+ root->addConnection(input);
+ root->addConnection(output);
+ }
+
+ // prepare Merge Processor for execution
+ merge->setScheduledState(core::ScheduledState::RUNNING);
+ merge->onSchedule(mergeContext.get(), new
core::ProcessSessionFactory(mergeContext));
+ }
+ void write(const std::string& data) {
+ minifi::io::DataStream stream(reinterpret_cast<const
uint8_t*>(data.c_str()), data.length());
+ core::ProcessSession sessionGenFlowFile(inputContext);
+ std::shared_ptr<core::FlowFile> flow =
std::static_pointer_cast<core::FlowFile>(sessionGenFlowFile.create());
+ sessionGenFlowFile.importFrom(stream, flow);
+ sessionGenFlowFile.transfer(flow, {"input", "d"});
+ sessionGenFlowFile.commit();
Review comment:
I think assertions on the `ResourceClaim` refcount would serve as useful
documentation here.
##########
File path: extensions/rocksdb-repos/FlowFileRepository.cpp
##########
@@ -148,22 +148,27 @@ void FlowFileRepository::prune_stored_flowfiles() {
std::string key = it->key().ToString();
if (eventRead->DeSerialize(reinterpret_cast<const uint8_t
*>(it->value().data()), it->value().size())) {
logger_->log_debug("Found connection for %s, path %s ",
eventRead->getConnectionUuid(), eventRead->getContentFullPath());
- auto search = connectionMap.find(eventRead->getConnectionUuid());
- if (!corrupt_checkpoint && search != connectionMap.end()) {
+ bool found = false;
+ auto search = containers.find(eventRead->getConnectionUuid());
+ found = (search != containers.end());
+ if (!found) {
+ // for backward compatibility
+ search = connectionMap.find(eventRead->getConnectionUuid());
+ found = (search != connectionMap.end());
+ }
Review comment:
We should document if `connectionMap` is now deprecated, for example
near its declaration.
##########
File path: libminifi/src/FlowFileRecord.cpp
##########
@@ -118,28 +110,27 @@ FlowFileRecord::~FlowFileRecord() {
logger_->log_debug("Delete FlowFile UUID %s", uuidStr_);
else
logger_->log_debug("Delete SnapShot FlowFile UUID %s", uuidStr_);
- if (claim_) {
- releaseClaim(claim_);
- } else {
+
+ if (!claim_) {
logger_->log_debug("Claim is null ptr for %s", uuidStr_);
}
+ claim_.set(*this, nullptr);
+
// Disown stash claims
- for (const auto &stashPair : stashedContent_) {
- releaseClaim(stashPair.second);
+ for (auto &stashPair : stashedContent_) {
+ auto& stashClaim = stashPair.second;
+ stashClaim.set(*this, nullptr);
}
}
void FlowFileRecord::releaseClaim(std::shared_ptr<ResourceClaim> claim) {
// Decrease the flow file record owned count for the resource claim
- claim_->decreaseFlowFileRecordOwnedCount();
- std::string value;
- logger_->log_debug("Delete Resource Claim %s, %s, attempt %llu",
getUUIDStr(), claim_->getContentFullPath(),
claim_->getFlowFileRecordOwnedCount());
- if (claim_->getFlowFileRecordOwnedCount() <= 0) {
- // we cannot rely on the stored variable here since we aren't guaranteed
atomicity
- if (flow_repository_ != nullptr && !flow_repository_->Get(uuidStr_,
value)) {
- logger_->log_debug("Delete Resource Claim %s",
claim_->getContentFullPath());
- content_repo_->remove(claim_);
+ claim->decreaseFlowFileRecordOwnedCount();
+ logger_->log_debug("Detaching Resource Claim %s, %s, attempt %llu",
getUUIDStr(), claim->getContentFullPath(),
claim->getFlowFileRecordOwnedCount());
Review comment:
Would you mind changing `"%llu"` to `"%" PRIu64`, since
`claim->getFlowFileRecordOwnedCount()` returns `uint64_t`?
##########
File path: libminifi/include/core/FlowFile.h
##########
@@ -35,9 +35,56 @@ namespace minifi {
namespace core {
class FlowFile : public core::Connectable, public ReferenceContainer {
+ private:
+ class FlowFileOwnedResourceClaimPtr{
+ public:
+ FlowFileOwnedResourceClaimPtr() = default;
+ explicit FlowFileOwnedResourceClaimPtr(const
std::shared_ptr<ResourceClaim>& claim) : claim_(claim) {
+ if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+ }
+ explicit FlowFileOwnedResourceClaimPtr(std::shared_ptr<ResourceClaim>&&
claim) : claim_(std::move(claim)) {
+ if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+ }
+ FlowFileOwnedResourceClaimPtr(const FlowFileOwnedResourceClaimPtr& ref) :
claim_(ref.claim_) {
+ if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+ }
+ FlowFileOwnedResourceClaimPtr(FlowFileOwnedResourceClaimPtr&& ref) :
claim_(std::move(ref.claim_)) {
+ // taking ownership of claim, no need to increment/decrement
+ }
+ FlowFileOwnedResourceClaimPtr& operator=(const
FlowFileOwnedResourceClaimPtr& ref) = delete;
+ FlowFileOwnedResourceClaimPtr& operator=(FlowFileOwnedResourceClaimPtr&&
ref) = delete;
+
+ FlowFileOwnedResourceClaimPtr& set(FlowFile& owner, const
FlowFileOwnedResourceClaimPtr& ref) {
+ return set(owner, ref.claim_);
+ }
+ FlowFileOwnedResourceClaimPtr& set(FlowFile& owner, const
std::shared_ptr<ResourceClaim>& newClaim) {
+ auto oldClaim = claim_;
+ claim_ = newClaim;
+ // the order of increase/release is important
+ if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+ if (oldClaim) owner.releaseClaim(oldClaim);
Review comment:
Why is it important? Could you add the explanation to the comment?
##########
File path: libminifi/src/core/ProcessSession.cpp
##########
@@ -248,35 +237,27 @@ void ProcessSession::penalize(const
std::shared_ptr<core::FlowFile> &flow) {
void ProcessSession::transfer(const std::shared_ptr<core::FlowFile> &flow,
Relationship relationship) {
logging::LOG_INFO(logger_) << "Transferring " << flow->getUUIDStr() << "
from " << process_context_->getProcessorNode()->getName() << " to relationship
" << relationship.getName();
_transferRelationship[flow->getUUIDStr()] = relationship;
+ flow->setDeleted(false);
Review comment:
Shouldn't we also remove the flow file from `_deletedFlowFiles` if it is
present? The same applies to `add`, although I think it is less likely that we
would want to add a deleted flow file again.
##########
File path: libminifi/src/core/ProcessSession.cpp
##########
@@ -211,15 +206,6 @@ std::shared_ptr<core::FlowFile>
ProcessSession::clone(const std::shared_ptr<core
void ProcessSession::remove(const std::shared_ptr<core::FlowFile> &flow) {
flow->setDeleted(true);
- if (flow->getResourceClaim() != nullptr) {
- flow->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
- logger_->log_debug("Auto terminated %s %" PRIu64 " %s",
flow->getResourceClaim()->getContentFullPath(),
flow->getResourceClaim()->getFlowFileRecordOwnedCount(), flow->getUUIDStr());
- } else {
- logger_->log_debug("Flow does not contain content. no resource claim to
decrement.");
Review comment:
I suggest keeping the debug logs.
##########
File path: libminifi/src/core/ProcessSession.cpp
##########
@@ -540,111 +481,97 @@ void ProcessSession::import(const std::string& source,
std::vector<std::shared_p
std::vector<uint8_t> buffer(getpagesize());
try {
- try {
- std::ifstream input{source, std::ios::in | std::ios::binary};
- logger_->log_debug("Opening %s", source);
- if (!input.is_open() || !input.good()) {
- throw Exception(FILE_OPERATION_EXCEPTION,
utils::StringUtils::join_pack("File Import Error: failed to open file \'",
source, "\'"));
+ std::ifstream input{source, std::ios::in | std::ios::binary};
+ logger_->log_debug("Opening %s", source);
+ if (!input.is_open() || !input.good()) {
+ throw Exception(FILE_OPERATION_EXCEPTION,
utils::StringUtils::join_pack("File Import Error: failed to open file \'",
source, "\'"));
+ }
+ if (offset != 0U) {
+ input.seekg(offset, std::ifstream::beg);
+ if (!input.good()) {
+ logger_->log_error("Seeking to %lu failed for file %s (does
file/filesystem support seeking?)", offset, source);
+ throw Exception(FILE_OPERATION_EXCEPTION,
utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ",
std::to_string(offset)));
}
- if (offset != 0U) {
- input.seekg(offset, std::ifstream::beg);
- if (!input.good()) {
- logger_->log_error("Seeking to %lu failed for file %s (does
file/filesystem support seeking?)", offset, source);
- throw Exception(FILE_OPERATION_EXCEPTION,
utils::StringUtils::join_pack("File Import Error: Couldn't seek to offset ",
std::to_string(offset)));
- }
+ }
+ uint64_t startTime = 0U;
+ while (input.good()) {
+ input.read(reinterpret_cast<char*>(buffer.data()), buffer.size());
+ std::streamsize read = input.gcount();
+ if (read < 0) {
+ throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount
returned negative value");
}
- uint64_t startTime = 0U;
- while (input.good()) {
- input.read(reinterpret_cast<char*>(buffer.data()), buffer.size());
- std::streamsize read = input.gcount();
- if (read < 0) {
- throw Exception(FILE_OPERATION_EXCEPTION, "std::ifstream::gcount
returned negative value");
- }
- if (read == 0) {
- logger_->log_trace("Finished reading input %s", source);
+ if (read == 0) {
+ logger_->log_trace("Finished reading input %s", source);
+ break;
+ } else {
+ logging::LOG_TRACE(logger_) << "Read input of " << read;
+ }
+ uint8_t* begin = buffer.data();
+ uint8_t* end = begin + read;
+ while (true) {
+ startTime = getTimeMillis();
+ uint8_t* delimiterPos = std::find(begin, end,
static_cast<uint8_t>(inputDelimiter));
+ const auto len = gsl::narrow<int>(delimiterPos - begin);
+
+ logging::LOG_TRACE(logger_) << "Read input of " << read << " length is
" << len << " is at end?" << (delimiterPos == end);
+ /*
+ * We do not want to process the rest of the buffer after the last
delimiter if
+ * - we have reached EOF in the file (we would discard it anyway)
+ * - there is nothing to process (the last character in the buffer is
a delimiter)
+ */
+ if (delimiterPos == end && (input.eof() || len == 0)) {
break;
- } else {
- logging::LOG_TRACE(logger_) << "Read input of " << read;
}
- uint8_t* begin = buffer.data();
- uint8_t* end = begin + read;
- while (true) {
- startTime = getTimeMillis();
- uint8_t* delimiterPos = std::find(begin, end,
static_cast<uint8_t>(inputDelimiter));
- const auto len = gsl::narrow<int>(delimiterPos - begin);
-
- logging::LOG_TRACE(logger_) << "Read input of " << read << " length
is " << len << " is at end?" << (delimiterPos == end);
- /*
- * We do not want to process the rest of the buffer after the last
delimiter if
- * - we have reached EOF in the file (we would discard it anyway)
- * - there is nothing to process (the last character in the buffer
is a delimiter)
- */
- if (delimiterPos == end && (input.eof() || len == 0)) {
- break;
- }
-
- /* Create claim and stream if needed and append data */
- if (claim == nullptr) {
- startTime = getTimeMillis();
- claim =
std::make_shared<ResourceClaim>(process_context_->getContentRepository());
- }
- if (stream == nullptr) {
- stream = process_context_->getContentRepository()->write(claim);
- }
- if (stream == nullptr) {
- logger_->log_error("Stream is null");
- rollback();
- return;
- }
- if (stream->write(begin, len) != len) {
- logger_->log_error("Error while writing");
- stream->closeStream();
- throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error
creating Flowfile");
- }
- /* Create a FlowFile if we reached a delimiter */
- if (delimiterPos == end) {
- break;
- }
- flowFile = std::static_pointer_cast<FlowFileRecord>(create());
- flowFile->setSize(stream->getSize());
- flowFile->setOffset(0);
- if (flowFile->getResourceClaim() != nullptr) {
- /* Remove the old claim */
- flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
- flowFile->clearResourceClaim();
- }
- flowFile->setResourceClaim(claim);
- claim->increaseFlowFileRecordOwnedCount();
- logging::LOG_DEBUG(logger_) << "Import offset " <<
flowFile->getOffset() << " length " << flowFile->getSize() << " content " <<
flowFile->getResourceClaim()->getContentFullPath()
- << ", FlowFile UUID " <<
flowFile->getUUIDStr();
+ /* Create claim and stream if needed and append data */
+ if (claim == nullptr) {
+ startTime = getTimeMillis();
+ claim =
std::make_shared<ResourceClaim>(process_context_->getContentRepository());
+ }
+ if (stream == nullptr) {
+ stream = process_context_->getContentRepository()->write(claim);
+ }
+ if (stream == nullptr) {
+ logger_->log_error("Stream is null");
+ rollback();
+ return;
+ }
+ if (stream->write(begin, len) != len) {
+ logger_->log_error("Error while writing");
stream->closeStream();
- std::string details =
process_context_->getProcessorNode()->getName() + " modify flow record content
" + flowFile->getUUIDStr();
- uint64_t endTime = getTimeMillis();
- provenance_report_->modifyContent(flowFile, details, endTime -
startTime);
- flows.push_back(flowFile);
-
- /* Reset these to start processing the next FlowFile with a clean
slate */
- flowFile.reset();
- stream.reset();
- claim.reset();
-
- /* Skip delimiter */
- begin = delimiterPos + 1;
+ throw Exception(FILE_OPERATION_EXCEPTION, "File Export Error
creating Flowfile");
+ }
+
+ /* Create a FlowFile if we reached a delimiter */
+ if (delimiterPos == end) {
+ break;
}
+ flowFile = std::static_pointer_cast<FlowFileRecord>(create());
+ flowFile->setSize(stream->getSize());
+ flowFile->setOffset(0);
+ flowFile->setResourceClaim(claim);
+ logging::LOG_DEBUG(logger_) << "Import offset " <<
flowFile->getOffset() << " length " << flowFile->getSize() << " content " <<
flowFile->getResourceClaim()->getContentFullPath()
+ << ", FlowFile UUID " <<
flowFile->getUUIDStr();
+ stream->closeStream();
+ std::string details = process_context_->getProcessorNode()->getName()
+ " modify flow record content " + flowFile->getUUIDStr();
+ uint64_t endTime = getTimeMillis();
+ provenance_report_->modifyContent(flowFile, details, endTime -
startTime);
+ flows.push_back(flowFile);
+
+ /* Reset these to start processing the next FlowFile with a clean
slate */
+ flowFile.reset();
+ stream.reset();
+ claim.reset();
+
+ /* Skip delimiter */
+ begin = delimiterPos + 1;
}
- } catch (std::exception &exception) {
- logger_->log_debug("Caught Exception %s", exception.what());
- throw;
- } catch (...) {
- logger_->log_debug("Caught Exception during process session write");
- throw;
}
+ } catch (std::exception &exception) {
+ logger_->log_debug("Caught Exception %s", exception.what());
+ throw;
} catch (...) {
- if (flowFile != nullptr && claim != nullptr &&
flowFile->getResourceClaim() == claim) {
- flowFile->getResourceClaim()->decreaseFlowFileRecordOwnedCount();
- flowFile->clearResourceClaim();
- }
Review comment:
I think we still need to clean up the resource claim
(`clearResourceClaim()`) of the flow file before proceeding, because the flow
file destructor will not do so. Is this done somewhere else?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]