szaszm commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r451670403
##########
File path: libminifi/src/core/ProcessGroup.cpp
##########
@@ -353,6 +353,10 @@ void ProcessGroup::getConnections(std::map<std::string,
std::shared_ptr<Connecta
connectionMap[connection->getUUIDStr()] = connection;
connectionMap[connection->getName()] = connection;
}
+ for (auto processor : processors_) {
+ // processors can also own FlowFiles
+ connectionMap[processor->getUUIDStr()] = processor;
Review comment:
I think it's confusing to place processors in containers like
`connectionMap` (i.e., to treat them like connections) just because they can own
flow files.
##########
File path: libminifi/src/Connection.cpp
##########
@@ -268,10 +229,12 @@ void Connection::drain(bool delete_permanently) {
while (!queue_.empty()) {
std::shared_ptr<core::FlowFile> item = queue_.front();
queue_.pop();
- logger_->log_debug("Delete flow file UUID %s from connection %s",
item->getUUIDStr(), name_);
+ logger_->log_debug("Delete flow file UUID %s from connection %s, because
it expired", item->getUUIDStr(), name_);
if (delete_permanently) {
- if (flow_repository_->Delete(item->getUUIDStr())) {
+ if (item->isStored() && flow_repository_->Delete(item->getUUIDStr())) {
item->setStoredToRepository(false);
+ auto claim = item->getResourceClaim();
+ if (claim) claim->decreaseFlowFileRecordOwnedCount();
Review comment:
Shouldn't this call `item->releaseClaim(claim)` instead?
##########
File path: libminifi/include/core/FlowFile.h
##########
@@ -35,9 +36,58 @@ namespace minifi {
namespace core {
class FlowFile : public core::Connectable, public ReferenceContainer {
+ private:
+ class FlowFileOwnedResourceClaimPtr{
+ public:
+ FlowFileOwnedResourceClaimPtr() = default;
+ explicit FlowFileOwnedResourceClaimPtr(const
std::shared_ptr<ResourceClaim>& claim) : claim_(claim) {
+ if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+ }
+ explicit FlowFileOwnedResourceClaimPtr(std::shared_ptr<ResourceClaim>&&
claim) : claim_(std::move(claim)) {
+ if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+ }
+ FlowFileOwnedResourceClaimPtr(const FlowFileOwnedResourceClaimPtr& ref) :
claim_(ref.claim_) {
+ if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+ }
Review comment:
I suggest making `claim_` a `gsl::not_null` unless there is a reason why
it cannot be done.
##########
File path: libminifi/include/core/FlowFile.h
##########
@@ -35,9 +36,58 @@ namespace minifi {
namespace core {
class FlowFile : public core::Connectable, public ReferenceContainer {
+ private:
+ class FlowFileOwnedResourceClaimPtr{
+ public:
+ FlowFileOwnedResourceClaimPtr() = default;
+ explicit FlowFileOwnedResourceClaimPtr(const
std::shared_ptr<ResourceClaim>& claim) : claim_(claim) {
+ if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+ }
+ explicit FlowFileOwnedResourceClaimPtr(std::shared_ptr<ResourceClaim>&&
claim) : claim_(std::move(claim)) {
+ if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+ }
+ FlowFileOwnedResourceClaimPtr(const FlowFileOwnedResourceClaimPtr& ref) :
claim_(ref.claim_) {
+ if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+ }
+ FlowFileOwnedResourceClaimPtr(FlowFileOwnedResourceClaimPtr&& ref) :
claim_(std::move(ref.claim_)) {
+ // taking ownership of claim, no need to increment/decrement
+ }
+ FlowFileOwnedResourceClaimPtr& operator=(const
FlowFileOwnedResourceClaimPtr& ref) = delete;
+ FlowFileOwnedResourceClaimPtr& operator=(FlowFileOwnedResourceClaimPtr&&
ref) = delete;
+
+ FlowFileOwnedResourceClaimPtr& set(FlowFile& owner, const
FlowFileOwnedResourceClaimPtr& ref) {
+ return set(owner, ref.claim_);
+ }
+ FlowFileOwnedResourceClaimPtr& set(FlowFile& owner, const
std::shared_ptr<ResourceClaim>& newClaim) {
+ auto oldClaim = claim_;
+ claim_ = newClaim;
+ // the order of increase/release is important
+ if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+ if (oldClaim) owner.releaseClaim(oldClaim);
+ return *this;
+ }
+ const std::shared_ptr<ResourceClaim>& get() const {
+ return claim_;
+ }
+ const std::shared_ptr<ResourceClaim>& operator->() const {
+ return claim_;
+ }
+ operator bool() const noexcept {
+ return static_cast<bool>(claim_);
+ }
+ ~FlowFileOwnedResourceClaimPtr() {
+ // allow the owner FlowFile to manually release the claim
+ // while logging stuff and removing it from repositories
+ assert(!claim_);
Review comment:
I think it would be cleaner to let this object store a `not_null` observer
pointer to its owning flow file and let it call `owner->releaseClaim()` whenever
needed.
##########
File path: extensions/rocksdb-repos/FlowFileRepository.cpp
##########
@@ -156,14 +156,12 @@ void FlowFileRepository::prune_stored_flowfiles() {
search->second->put(eventRead);
} else {
logger_->log_warn("Could not find connection for %s, path %s ",
eventRead->getConnectionUuid(), eventRead->getContentFullPath());
- if (eventRead->getContentFullPath().length() > 0) {
- if (nullptr != eventRead->getResourceClaim()) {
- content_repo_->remove(eventRead->getResourceClaim());
- }
- }
+ auto claim = eventRead->getResourceClaim();
+ if (claim) claim->decreaseFlowFileRecordOwnedCount();
Review comment:
Isn't this going to leave content repo entries behind?
##########
File path: libminifi/src/FlowFileRecord.cpp
##########
@@ -118,28 +110,27 @@ FlowFileRecord::~FlowFileRecord() {
logger_->log_debug("Delete FlowFile UUID %s", uuidStr_);
else
logger_->log_debug("Delete SnapShot FlowFile UUID %s", uuidStr_);
- if (claim_) {
- releaseClaim(claim_);
- } else {
+
+ if (!claim_) {
logger_->log_debug("Claim is null ptr for %s", uuidStr_);
}
+ claim_.set(*this, nullptr);
+
// Disown stash claims
- for (const auto &stashPair : stashedContent_) {
- releaseClaim(stashPair.second);
+ for (auto &stashPair : stashedContent_) {
+ auto& stashClaim = stashPair.second;
+ stashClaim.set(*this, nullptr);
}
}
void FlowFileRecord::releaseClaim(std::shared_ptr<ResourceClaim> claim) {
// Decrease the flow file record owned count for the resource claim
- claim_->decreaseFlowFileRecordOwnedCount();
- std::string value;
- logger_->log_debug("Delete Resource Claim %s, %s, attempt %llu",
getUUIDStr(), claim_->getContentFullPath(),
claim_->getFlowFileRecordOwnedCount());
- if (claim_->getFlowFileRecordOwnedCount() <= 0) {
- // we cannot rely on the stored variable here since we aren't guaranteed
atomicity
- if (flow_repository_ != nullptr && !flow_repository_->Get(uuidStr_,
value)) {
- logger_->log_debug("Delete Resource Claim %s",
claim_->getContentFullPath());
- content_repo_->remove(claim_);
+ claim->decreaseFlowFileRecordOwnedCount();
+ logger_->log_debug("Detaching Resource Claim %s, %s, attempt %llu",
getUUIDStr(), claim->getContentFullPath(),
claim->getFlowFileRecordOwnedCount());
+ if (content_repo_) {
+ if (content_repo_->removeIfOrphaned(claim)) {
Review comment:
Could use `operator&&` to merge the two nested `if`s into a single condition.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]