Repository: nifi-minifi-cpp Updated Branches: refs/heads/master 2707409fd -> e4ec7337d
MINIFICPP-253: prune claim map when a removal occurs MINIFICPP-253: use find for the stream count to avoid arbitrarily inflating the map Signed-off-by: Bin Qiu <[email protected]> This closes #175. Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/commit/e4ec7337 Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/tree/e4ec7337 Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/diff/e4ec7337 Branch: refs/heads/master Commit: e4ec7337d27d1264ab7792ca37b7c81bd3c9a14a Parents: 2707409 Author: Marc Parisi <[email protected]> Authored: Fri Nov 3 14:45:48 2017 -0400 Committer: Bin Qiu <[email protected]> Committed: Fri Nov 3 15:46:39 2017 -0700 ---------------------------------------------------------------------- libminifi/include/core/ContentRepository.h | 8 +++- .../rocksdb-tests/DBContentRepositoryTests.cpp | 49 ++++++++++++++++++++ 2 files changed, 56 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e4ec7337/libminifi/include/core/ContentRepository.h ---------------------------------------------------------------------- diff --git a/libminifi/include/core/ContentRepository.h b/libminifi/include/core/ContentRepository.h index 54ec8d3..1e7ac62 100644 --- a/libminifi/include/core/ContentRepository.h +++ b/libminifi/include/core/ContentRepository.h @@ -61,6 +61,7 @@ class ContentRepository : public StreamManager<minifi::ResourceClaim> { if (count != count_map_.end()) { if (count_map_[str] == 0) { remove(streamId); + count_map_.erase(str); return true; } } @@ -70,7 +71,12 @@ class ContentRepository : public StreamManager<minifi::ResourceClaim> { virtual uint32_t getStreamCount(const std::shared_ptr<minifi::ResourceClaim> &streamId) { std::lock_guard<std::mutex> lock(count_map_mutex_); - return count_map_[streamId->getContentFullPath()]; + auto cnt = count_map_.find(streamId->getContentFullPath()); + if (cnt != count_map_.end()) { + return cnt->second; + } else { + return 0; + } } virtual void incrementStreamCount(const std::shared_ptr<minifi::ResourceClaim> &streamId) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/e4ec7337/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp ---------------------------------------------------------------------- diff --git a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp index c45a037..0dc2a3d 100644 --- a/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp +++ b/libminifi/test/rocksdb-tests/DBContentRepositoryTests.cpp @@ -243,3 +243,52 @@ TEST_CASE("Delete NonExistent Claim", "[TestDBCR5]") { REQUIRE(readstr == "well hello there"); } + +TEST_CASE("Delete Remove Count Claim", "[TestDBCR6]") { + TestController testController; + char format[] = "/tmp/testRepo.XXXXXX"; + char *dir = testController.createTempDirectory(format); + auto content_repo = std::make_shared<core::repository::DatabaseContentRepository>(); + + auto configuration = std::make_shared<org::apache::nifi::minifi::Configure>(); + configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir); + REQUIRE(true == content_repo->initialize(configuration)); + + auto claim = std::make_shared<minifi::ResourceClaim>(content_repo, dir); + auto claim2 = std::make_shared<minifi::ResourceClaim>(content_repo, dir); + auto stream = content_repo->write(claim); + + stream->writeUTF("well hello there"); + + stream->closeStream(); + + content_repo->stop(); + + // reclaim the memory + content_repo = nullptr; + + content_repo = std::make_shared<core::repository::DatabaseContentRepository>(); + + configuration = std::make_shared<org::apache::nifi::minifi::Configure>(); + configuration->set(minifi::Configure::nifi_dbcontent_repository_directory_default, dir); + REQUIRE(true == content_repo->initialize(configuration)); + + // increment twice. verify we have 2 for the stream count + // and then test the removal and verify that the claim was removed by virtue of obtaining + // its count. + content_repo->incrementStreamCount(claim2); + content_repo->incrementStreamCount(claim2); + REQUIRE(content_repo->getStreamCount(claim2) == 2); + content_repo->decrementStreamCount(claim2); + content_repo->decrementStreamCount(claim2); + REQUIRE(true == content_repo->removeIfOrphaned(claim2)); + REQUIRE(content_repo->getStreamCount(claim2) == 0); + auto read_stream = content_repo->read(claim); + + std::string readstr; + + // -1 tell us we have an invalid stream + read_stream->readUTF(readstr); + + REQUIRE(readstr == "well hello there"); +}
