szaszm commented on a change in pull request #807:
URL: https://github.com/apache/nifi-minifi-cpp/pull/807#discussion_r451670403



##########
File path: libminifi/src/core/ProcessGroup.cpp
##########
@@ -353,6 +353,10 @@ void ProcessGroup::getConnections(std::map<std::string, 
std::shared_ptr<Connecta
     connectionMap[connection->getUUIDStr()] = connection;
     connectionMap[connection->getName()] = connection;
   }
+  for (auto processor : processors_) {
+    // processors can also own FlowFiles
+    connectionMap[processor->getUUIDStr()] = processor;

Review comment:
       I think it's confusing to place processors in things like 
`connectionMap` (i.e. treat them like connections) just because they can own 
flow files.

##########
File path: libminifi/src/Connection.cpp
##########
@@ -268,10 +229,12 @@ void Connection::drain(bool delete_permanently) {
   while (!queue_.empty()) {
     std::shared_ptr<core::FlowFile> item = queue_.front();
     queue_.pop();
-    logger_->log_debug("Delete flow file UUID %s from connection %s", 
item->getUUIDStr(), name_);
+    logger_->log_debug("Delete flow file UUID %s from connection %s, because 
it expired", item->getUUIDStr(), name_);
     if (delete_permanently) {
-      if (flow_repository_->Delete(item->getUUIDStr())) {
+      if (item->isStored() && flow_repository_->Delete(item->getUUIDStr())) {
         item->setStoredToRepository(false);
+        auto claim = item->getResourceClaim();
+        if (claim) claim->decreaseFlowFileRecordOwnedCount();

Review comment:
       Shouldn't this call `item->releaseClaim(claim)` instead?

##########
File path: libminifi/include/core/FlowFile.h
##########
@@ -35,9 +36,58 @@ namespace minifi {
 namespace core {
 
 class FlowFile : public core::Connectable, public ReferenceContainer {
+ private:
+  class FlowFileOwnedResourceClaimPtr{
+   public:
+    FlowFileOwnedResourceClaimPtr() = default;
+    explicit FlowFileOwnedResourceClaimPtr(const 
std::shared_ptr<ResourceClaim>& claim) : claim_(claim) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    explicit FlowFileOwnedResourceClaimPtr(std::shared_ptr<ResourceClaim>&& 
claim) : claim_(std::move(claim)) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    FlowFileOwnedResourceClaimPtr(const FlowFileOwnedResourceClaimPtr& ref) : 
claim_(ref.claim_) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }

Review comment:
       I suggest making `claim_` `gsl::not_null` unless there is a reason why 
it can not be done.

##########
File path: libminifi/include/core/FlowFile.h
##########
@@ -35,9 +36,58 @@ namespace minifi {
 namespace core {
 
 class FlowFile : public core::Connectable, public ReferenceContainer {
+ private:
+  class FlowFileOwnedResourceClaimPtr{
+   public:
+    FlowFileOwnedResourceClaimPtr() = default;
+    explicit FlowFileOwnedResourceClaimPtr(const 
std::shared_ptr<ResourceClaim>& claim) : claim_(claim) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    explicit FlowFileOwnedResourceClaimPtr(std::shared_ptr<ResourceClaim>&& 
claim) : claim_(std::move(claim)) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    FlowFileOwnedResourceClaimPtr(const FlowFileOwnedResourceClaimPtr& ref) : 
claim_(ref.claim_) {
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+    }
+    FlowFileOwnedResourceClaimPtr(FlowFileOwnedResourceClaimPtr&& ref) : 
claim_(std::move(ref.claim_)) {
+      // taking ownership of claim, no need to increment/decrement
+    }
+    FlowFileOwnedResourceClaimPtr& operator=(const 
FlowFileOwnedResourceClaimPtr& ref) = delete;
+    FlowFileOwnedResourceClaimPtr& operator=(FlowFileOwnedResourceClaimPtr&& 
ref) = delete;
+
+    FlowFileOwnedResourceClaimPtr& set(FlowFile& owner, const 
FlowFileOwnedResourceClaimPtr& ref) {
+      return set(owner, ref.claim_);
+    }
+    FlowFileOwnedResourceClaimPtr& set(FlowFile& owner, const 
std::shared_ptr<ResourceClaim>& newClaim) {
+      auto oldClaim = claim_;
+      claim_ = newClaim;
+      // the order of increase/release is important
+      if (claim_) claim_->increaseFlowFileRecordOwnedCount();
+      if (oldClaim) owner.releaseClaim(oldClaim);
+      return *this;
+    }
+    const std::shared_ptr<ResourceClaim>& get() const {
+      return claim_;
+    }
+    const std::shared_ptr<ResourceClaim>& operator->() const {
+      return claim_;
+    }
+    operator bool() const noexcept {
+      return static_cast<bool>(claim_);
+    }
+    ~FlowFileOwnedResourceClaimPtr() {
+      // allow the owner FlowFile to manually release the claim
+      // while logging stuff and removing it from repositories
+      assert(!claim_);

Review comment:
       I think it would be cleaner to let this object store a not_null observer 
pointer to its owning flow file and let it call owner->releaseClaim whenever 
needed.

##########
File path: extensions/rocksdb-repos/FlowFileRepository.cpp
##########
@@ -156,14 +156,12 @@ void FlowFileRepository::prune_stored_flowfiles() {
         search->second->put(eventRead);
       } else {
         logger_->log_warn("Could not find connection for %s, path %s ", 
eventRead->getConnectionUuid(), eventRead->getContentFullPath());
-        if (eventRead->getContentFullPath().length() > 0) {
-          if (nullptr != eventRead->getResourceClaim()) {
-            content_repo_->remove(eventRead->getResourceClaim());
-          }
-        }
+        auto claim = eventRead->getResourceClaim();
+        if (claim) claim->decreaseFlowFileRecordOwnedCount();

Review comment:
       Isn't this going to leave content repo entries behind?

##########
File path: libminifi/src/FlowFileRecord.cpp
##########
@@ -118,28 +110,27 @@ FlowFileRecord::~FlowFileRecord() {
     logger_->log_debug("Delete FlowFile UUID %s", uuidStr_);
   else
     logger_->log_debug("Delete SnapShot FlowFile UUID %s", uuidStr_);
-  if (claim_) {
-    releaseClaim(claim_);
-  } else {
+
+  if (!claim_) {
     logger_->log_debug("Claim is null ptr for %s", uuidStr_);
   }
 
+  claim_.set(*this, nullptr);
+
   // Disown stash claims
-  for (const auto &stashPair : stashedContent_) {
-    releaseClaim(stashPair.second);
+  for (auto &stashPair : stashedContent_) {
+    auto& stashClaim = stashPair.second;
+    stashClaim.set(*this, nullptr);
   }
 }
 
 void FlowFileRecord::releaseClaim(std::shared_ptr<ResourceClaim> claim) {
   // Decrease the flow file record owned count for the resource claim
-  claim_->decreaseFlowFileRecordOwnedCount();
-  std::string value;
-  logger_->log_debug("Delete Resource Claim %s, %s, attempt %llu", 
getUUIDStr(), claim_->getContentFullPath(), 
claim_->getFlowFileRecordOwnedCount());
-  if (claim_->getFlowFileRecordOwnedCount() <= 0) {
-    // we cannot rely on the stored variable here since we aren't guaranteed 
atomicity
-    if (flow_repository_ != nullptr && !flow_repository_->Get(uuidStr_, 
value)) {
-      logger_->log_debug("Delete Resource Claim %s", 
claim_->getContentFullPath());
-      content_repo_->remove(claim_);
+  claim->decreaseFlowFileRecordOwnedCount();
+  logger_->log_debug("Detaching Resource Claim %s, %s, attempt %llu", 
getUUIDStr(), claim->getContentFullPath(), 
claim->getFlowFileRecordOwnedCount());
+  if (content_repo_) {
+    if (content_repo_->removeIfOrphaned(claim)) {

Review comment:
       Could use `operator&&`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to