This is an automated email from the ASF dual-hosted git repository. mmartell pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode-native.git
The following commit(s) were added to refs/heads/develop by this push: new c5aee1b GEODE-9804: new register interest tests (#894) c5aee1b is described below commit c5aee1b68a0cfeda2480afb4b27bd2b12fae0d8f Author: Michael Martell <mmart...@pivotal.io> AuthorDate: Fri Nov 19 17:30:22 2021 -0800 GEODE-9804: new register interest tests (#894) * Fixes incorrect fetching of initial entries. * Makes InterestResultPolicy a proper enum. * Adds integration benchmark for register interest. * Fixes for MSVC errors. * Don't ask for keys if not caching locally. * Replace sleep with CacheListener events. * Use latches for syncrhonization with listener. * Change boost loglevel to warning. * Encapsulate synchronization in CountdownCacheListener. Co-authored-by: Jacob Barrett <jbarr...@pivotal.io> --- .../testThinClientIntResPolKeysInv.cpp | 1 - cppcache/integration/benchmark/CMakeLists.txt | 4 +- cppcache/integration/benchmark/RegionBM.cpp | 15 +- .../integration/benchmark/RegisterInterestBM.cpp | 156 ++++++++++ cppcache/integration/test/RegisterKeysTest.cpp | 258 ++++++++++++++++ cppcache/src/InterestResultPolicy.cpp | 31 -- cppcache/src/InterestResultPolicy.hpp | 39 +-- cppcache/src/LocalRegion.cpp | 3 +- cppcache/src/LocalRegion.hpp | 4 +- cppcache/src/TcrChunkedContext.hpp | 1 - cppcache/src/TcrMessage.cpp | 129 ++++---- cppcache/src/TcrMessage.hpp | 29 +- cppcache/src/ThinClientRegion.cpp | 342 ++++++++------------- cppcache/test/CMakeLists.txt | 1 - cppcache/test/InterestResultPolicyTest.cpp | 34 -- cppcache/test/TcrMessageTest.cpp | 123 ++++++-- 16 files changed, 754 insertions(+), 416 deletions(-) diff --git a/cppcache/integration-test/testThinClientIntResPolKeysInv.cpp b/cppcache/integration-test/testThinClientIntResPolKeysInv.cpp index d918856..210b6a3 100644 --- a/cppcache/integration-test/testThinClientIntResPolKeysInv.cpp +++ b/cppcache/integration-test/testThinClientIntResPolKeysInv.cpp @@ -360,7 +360,6 @@ END_TASK_DEFINITION DUNIT_TASK_DEFINITION(CLIENT2, StepEight) { auto regPtr1 = getHelper()->getRegion(regionNames[1]); - // regPtr1->registerRegex(regexWildcard); regPtr1->registerAllKeys(); verifyInvalid(regionNames[1], keys[0]); diff --git a/cppcache/integration/benchmark/CMakeLists.txt b/cppcache/integration/benchmark/CMakeLists.txt index 88254d2..0702ae4 100644 --- a/cppcache/integration/benchmark/CMakeLists.txt +++ b/cppcache/integration/benchmark/CMakeLists.txt @@ -16,7 +16,9 @@ add_executable(cpp-integration-benchmark main.cpp RegionBM.cpp - PdxTypeBM.cpp) + PdxTypeBM.cpp + RegisterInterestBM.cpp +) target_link_libraries(cpp-integration-benchmark PUBLIC diff --git a/cppcache/integration/benchmark/RegionBM.cpp b/cppcache/integration/benchmark/RegionBM.cpp index 7fef624..da1b910 100644 --- a/cppcache/integration/benchmark/RegionBM.cpp +++ b/cppcache/integration/benchmark/RegionBM.cpp @@ -19,21 +19,16 @@ #include <framework/Cluster.h> #include <framework/Gfsh.h> -// Disable warning for "extra qualifications" here. One of the boost log -// headers triggers this warning. Note: use of disable pragma here is -// intentional - attempts to use push/pop as you ordinarily should just -// yielded a gripe from the MS tools that "warning number '4596' is not a -// valid compiler warning". re-enabling the warning after the include -// fails in the same way, so just leave it disabled for the rest of the -// file. This is safe, since the warning can only trigger inside a class -// declaration, of which there are none in this file. -#ifdef WIN32 +#ifdef _MSC_VER +#pragma warning(push) #pragma warning(disable : 4596) #endif - #include <boost/log/core.hpp> #include <boost/log/expressions.hpp> #include <boost/log/trivial.hpp> +#ifdef _MSC_VER +#pragma warning(pop) +#endif #include <geode/Cache.hpp> #include <geode/CacheableString.hpp> diff --git a/cppcache/integration/benchmark/RegisterInterestBM.cpp b/cppcache/integration/benchmark/RegisterInterestBM.cpp new file mode 100644 index 0000000..7f6154b --- /dev/null +++ b/cppcache/integration/benchmark/RegisterInterestBM.cpp @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <benchmark/benchmark.h> +#include <framework/Cluster.h> +#include <framework/Gfsh.h> + +#ifdef _MSC_VER +#pragma warning(push) +#pragma warning(disable : 4596) +#endif +#include <boost/log/core.hpp> +#include <boost/log/expressions.hpp> +#include <boost/log/trivial.hpp> +#ifdef _MSC_VER +#pragma warning(pop) +#endif + +#include <geode/Cache.hpp> +#include <geode/CacheableString.hpp> +#include <geode/PoolManager.hpp> +#include <geode/RegionFactory.hpp> +#include <geode/RegionShortcut.hpp> + +namespace { + +using apache::geode::client::Cache; +using apache::geode::client::CacheableString; +using apache::geode::client::HashMapOfCacheable; +using apache::geode::client::Region; +using apache::geode::client::RegionShortcut; + +class RegisterInterestBM : public benchmark::Fixture { + public: + RegisterInterestBM() { + boost::log::core::get()->set_filter(boost::log::trivial::severity >= + boost::log::trivial::warning); + + BOOST_LOG_TRIVIAL(info) << "constructed"; + } + + ~RegisterInterestBM() noexcept override { + BOOST_LOG_TRIVIAL(info) << "destructed"; + } + + using benchmark::Fixture::SetUp; + void SetUp(benchmark::State& state) override { + BOOST_LOG_TRIVIAL(info) << "starting cluster"; + cluster = std::unique_ptr<Cluster>( + new Cluster(::Name{name_}, LocatorCount{1}, ServerCount{4})); + cluster->start(); + cluster->getGfsh() + .create() + .region() + .withName("region") + .withType("PARTITION") + .execute(); + + cache = std::unique_ptr<Cache>(new Cache(cluster->createCache( + {{"log-level", "finer"}}, Cluster::SubscriptionState::Enabled))); + region = cache->createRegionFactory(RegionShortcut::PROXY) + .setPoolName("default") + .setCachingEnabled(true) + .create("region"); + + BOOST_LOG_TRIVIAL(info) + << "filling region with " << state.range(0) << " keys"; + HashMapOfCacheable map; + const auto batchSize = 10000; + map.reserve(batchSize); + for (auto i = 0; i < state.range(0); ++i) { + map.emplace( + std::make_shared<CacheableString>("key" + std::to_string(i)), + std::make_shared<CacheableString>("value" + std::to_string(i))); + if (0 == i % batchSize) { + region->putAll(map); + map.clear(); + } + } + if (!map.empty()) { + region->putAll(map); + map.clear(); + } + BOOST_LOG_TRIVIAL(info) << "region ready"; + } + + using benchmark::Fixture::TearDown; + void TearDown(benchmark::State&) override { + BOOST_LOG_TRIVIAL(info) << "stopping cluster"; + region = nullptr; + cache = nullptr; + cluster = nullptr; + } + + protected: + void SetName(const char* name) { + name_ = name; + + Benchmark::SetName(name); + } + + void unregisterInterestAllKeys(benchmark::State& state) { + state.PauseTiming(); + region->unregisterAllKeys(); + state.ResumeTiming(); + } + + std::unique_ptr<Cluster> cluster; + std::unique_ptr<Cache> cache; + std::shared_ptr<Region> region; + + private: + std::string name_; +}; + +BENCHMARK_DEFINE_F(RegisterInterestBM, registerInterestAllKeys) +(benchmark::State& state) { + for (auto _ : state) { + region->registerAllKeys(); + unregisterInterestAllKeys(state); + } +} +BENCHMARK_REGISTER_F(RegisterInterestBM, registerInterestAllKeys) + ->Unit(benchmark::kMillisecond) + ->Repetitions(1) + ->Iterations(10) + ->Arg(1000000); + +BENCHMARK_DEFINE_F(RegisterInterestBM, registerInterestAllKeysInitialValues) +(benchmark::State& state) { + for (auto _ : state) { + region->registerAllKeys(false, true); + unregisterInterestAllKeys(state); + } +} +BENCHMARK_REGISTER_F(RegisterInterestBM, registerInterestAllKeysInitialValues) + ->Unit(benchmark::kMillisecond) + ->Repetitions(1) + ->Iterations(10) + ->Arg(1000000); + +} // namespace diff --git a/cppcache/integration/test/RegisterKeysTest.cpp b/cppcache/integration/test/RegisterKeysTest.cpp index f1dfaa2..42f7d7a 100644 --- a/cppcache/integration/test/RegisterKeysTest.cpp +++ b/cppcache/integration/test/RegisterKeysTest.cpp @@ -16,6 +16,9 @@ #include <gmock/gmock.h> +#include <chrono> +#include <thread> + #include <boost/thread/latch.hpp> #include <gtest/gtest.h> @@ -39,12 +42,16 @@ namespace { using apache::geode::client::binary_semaphore; using apache::geode::client::Cache; using apache::geode::client::CacheableInt16; +using apache::geode::client::CacheableInt32; using apache::geode::client::CacheableKey; using apache::geode::client::CacheableString; using apache::geode::client::CacheFactory; +using apache::geode::client::CacheListener; using apache::geode::client::CacheListenerMock; +using apache::geode::client::EntryEvent; using apache::geode::client::IllegalStateException; using apache::geode::client::Region; +using apache::geode::client::RegionEvent; using apache::geode::client::RegionShortcut; using ::testing::_; @@ -52,6 +59,46 @@ using ::testing::DoAll; using ::testing::InvokeWithoutArgs; using ::testing::Return; +constexpr size_t kNumKeys = 100; + +class CountdownCacheListener : public CacheListener { + private: + size_t expectedCount_; + boost::latch allKeysInvalidateLatch_; + boost::latch allKeysUpdatedLatch_; + + public: + explicit CountdownCacheListener(size_t expectedCount) + : expectedCount_(expectedCount), + allKeysInvalidateLatch_(expectedCount), + allKeysUpdatedLatch_(expectedCount) {} + + void afterUpdate(const EntryEvent&) override { + allKeysUpdatedLatch_.count_down(); + } + + void afterInvalidate(const EntryEvent&) override { + allKeysInvalidateLatch_.count_down(); + } + + void reset() { + allKeysInvalidateLatch_.reset(expectedCount_); + allKeysUpdatedLatch_.reset(expectedCount_); + } + + template <class Rep, class Period> + boost::cv_status waitForUpdates( + const boost::chrono::duration<Rep, Period>& rel_time) { + return allKeysUpdatedLatch_.wait_for(rel_time); + } + + template <class Rep, class Period> + boost::cv_status waitForInvalidates( + const boost::chrono::duration<Rep, Period>& rel_time) { + return allKeysInvalidateLatch_.wait_for(rel_time); + } +}; + Cache createTestCache() { CacheFactory cacheFactory; return cacheFactory.set("log-level", "none") @@ -575,4 +622,215 @@ TEST(RegisterKeysTest, RegisterAnyWithProxyRegion) { cache.close(); } +apache::geode::client::Cache createCache() { + return apache::geode::client::CacheFactory() + .set("log-level", "debug") + .set("log-file", "c:/temp/RegisterKeysTest.log") + .set("statistic-sampling-enabled", "false") + .create(); +} + +std::shared_ptr<apache::geode::client::Pool> createPool( + Cluster& cluster, apache::geode::client::Cache& cache) { + auto poolFactory = cache.getPoolManager().createFactory(); + cluster.applyLocators(poolFactory); + poolFactory.setSubscriptionEnabled(true); // Per the customer. + return poolFactory.create("default"); +} + +std::shared_ptr<apache::geode::client::Region> setupRegion( + apache::geode::client::Cache& cache, + const std::shared_ptr<apache::geode::client::Pool>& pool) { + auto region = + cache + .createRegionFactory(apache::geode::client::RegionShortcut:: + CACHING_PROXY) // Per the customer. + .setPoolName(pool->getName()) + .create("region"); + + return region; +} + +TEST(RegisterKeysTest, DontReceiveValues) { + Cluster cluster{LocatorCount{1}, ServerCount{1}}; + + cluster.start(); + + cluster.getGfsh() + .create() + .region() + .withName("region") + .withType("PARTITION") + .execute(); + + auto cache1 = createCache(); + auto pool1 = createPool(cluster, cache1); + auto region1 = setupRegion(cache1, pool1); + auto attrMutator = region1->getAttributesMutator(); + + auto listener = std::make_shared<CountdownCacheListener>(kNumKeys); + + attrMutator->setCacheListener(listener); + + auto cache2 = createCache(); + auto pool2 = createPool(cluster, cache2); + auto region2 = setupRegion(cache2, pool2); + + for (auto i = 0U; i < kNumKeys; i++) { + region2->put(CacheableInt32::create(i), CacheableInt32::create(i)); + } + + region1->registerAllKeys(false, false, false); + + for (auto i = 0U; i < kNumKeys; i++) { + auto hasKey = region1->containsKey(CacheableInt32::create(i)); + EXPECT_FALSE(hasKey); + } + + for (auto i = 0U; i < kNumKeys; i++) { + auto value = region1->get(CacheableInt32::create(i)); + } + + listener->reset(); + + for (auto i = 0U; i < kNumKeys; i++) { + region2->put(CacheableInt32::create(i), CacheableInt32::create(i + 1000)); + } + + EXPECT_EQ(boost::cv_status::no_timeout, + listener->waitForInvalidates(boost::chrono::seconds(60))); + + for (auto i = 0U; i < kNumKeys; i++) { + auto hasKey = region1->containsKey(CacheableInt32::create(i)); + EXPECT_TRUE(hasKey); + + auto hasValue = region1->containsValueForKey(CacheableInt32::create(i)); + EXPECT_FALSE(hasValue); + } +} + +TEST(RegisterKeysTest, ReceiveValuesLocalInvalidate) { + Cluster cluster{LocatorCount{1}, ServerCount{1}}; + + cluster.start(); + + cluster.getGfsh() + .create() + .region() + .withName("region") + .withType("PARTITION") + .execute(); + + auto cache1 = createCache(); + auto pool1 = createPool(cluster, cache1); + auto region1 = setupRegion(cache1, pool1); + auto attrMutator = region1->getAttributesMutator(); + + auto listener = std::make_shared<CountdownCacheListener>(kNumKeys); + attrMutator->setCacheListener(listener); + + auto cache2 = createCache(); + auto pool2 = createPool(cluster, cache2); + auto region2 = setupRegion(cache2, pool2); + + for (auto i = 0U; i < kNumKeys; i++) { + region2->put(CacheableInt32::create(i), CacheableInt32::create(i)); + } + + region1->registerAllKeys(false, true, true); + + for (auto i = 0U; i < kNumKeys; i++) { + auto hasKey = region1->containsKey(CacheableInt32::create(i)); + EXPECT_TRUE(hasKey); + + auto hasValue = region1->containsValueForKey(CacheableInt32::create(i)); + EXPECT_TRUE(hasValue); + } + + for (auto i = 0U; i < kNumKeys; i++) { + region1->localInvalidate(CacheableInt32::create(i)); + } + + for (auto i = 0U; i < kNumKeys; i++) { + auto hasKey = region1->containsKey(CacheableInt32::create(i)); + EXPECT_TRUE(hasKey); + + auto hasValue = region1->containsValueForKey(CacheableInt32::create(i)); + EXPECT_FALSE(hasValue); + } + + listener->reset(); + + for (auto i = 0U; i < kNumKeys; i++) { + region2->put(CacheableInt32::create(i), CacheableInt32::create(i + 2000)); + } + + EXPECT_EQ(boost::cv_status::no_timeout, + listener->waitForUpdates(boost::chrono::minutes(1))); + + for (auto i = 0U; i < kNumKeys; i++) { + auto hasKey = region1->containsKey(CacheableInt32::create(i)); + EXPECT_TRUE(hasKey); + + auto hasValue = region1->containsValueForKey(CacheableInt32::create(i)); + EXPECT_TRUE(hasValue); + } +} + +TEST(RegisterKeysTest, ReceiveValues) { + Cluster cluster{LocatorCount{1}, ServerCount{1}}; + + cluster.start(); + + cluster.getGfsh() + .create() + .region() + .withName("region") + .withType("PARTITION") + .execute(); + + auto cache1 = createCache(); + auto pool1 = createPool(cluster, cache1); + auto region1 = setupRegion(cache1, pool1); + auto attrMutator = region1->getAttributesMutator(); + + auto listener = std::make_shared<CountdownCacheListener>(kNumKeys); + attrMutator->setCacheListener(listener); + + auto cache2 = createCache(); + auto pool2 = createPool(cluster, cache2); + auto region2 = setupRegion(cache2, pool2); + + for (auto i = 0U; i < kNumKeys; i++) { + region2->put(CacheableInt32::create(i), CacheableInt32::create(i)); + } + + region1->registerAllKeys(false, false, true); + + for (auto i = 0U; i < kNumKeys; i++) { + auto hasKey = region1->containsKey(CacheableInt32::create(i)); + EXPECT_FALSE(hasKey); + + auto hasValue = region1->containsValueForKey(CacheableInt32::create(i)); + EXPECT_FALSE(hasValue); + } + + listener->reset(); + + for (auto i = 0U; i < kNumKeys; i++) { + region2->put(CacheableInt32::create(i), CacheableInt32::create(i + 2000)); + } + + EXPECT_EQ(boost::cv_status::no_timeout, + listener->waitForUpdates(boost::chrono::seconds(60))); + + for (auto i = 0U; i < kNumKeys; i++) { + auto hasKey = region1->containsKey(CacheableInt32::create(i)); + EXPECT_TRUE(hasKey); + + auto hasValue = region1->containsValueForKey(CacheableInt32::create(i)); + EXPECT_TRUE(hasValue); + } +} + } // namespace diff --git a/cppcache/src/InterestResultPolicy.cpp b/cppcache/src/InterestResultPolicy.cpp deleted file mode 100644 index b043896..0000000 --- a/cppcache/src/InterestResultPolicy.cpp +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "InterestResultPolicy.hpp" - -namespace apache { -namespace geode { -namespace client { - -char InterestResultPolicy::nextOrdinal = 0; -InterestResultPolicy InterestResultPolicy::NONE; -InterestResultPolicy InterestResultPolicy::KEYS; -InterestResultPolicy InterestResultPolicy::KEYS_VALUES; - -} // namespace client -} // namespace geode -} // namespace apache diff --git a/cppcache/src/InterestResultPolicy.hpp b/cppcache/src/InterestResultPolicy.hpp index b8ac52a..f066a13 100644 --- a/cppcache/src/InterestResultPolicy.hpp +++ b/cppcache/src/InterestResultPolicy.hpp @@ -1,8 +1,3 @@ -#pragma once - -#ifndef GEODE_INTERESTRESULTPOLICY_H_ -#define GEODE_INTERESTRESULTPOLICY_H_ - /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -20,34 +15,28 @@ * limitations under the License. */ -/** - * @file - */ -#include <geode/internal/geode_globals.hpp> +#pragma once + +#ifndef GEODE_INTERESTRESULTPOLICY_H_ +#define GEODE_INTERESTRESULTPOLICY_H_ + +#include <cstdint> namespace apache { namespace geode { namespace client { + /** - * @class InterestResultPolicy InterestResultPolicy.hpp * Policy class for interest result. + * + * Note: Special DataSeralizableFixedId(37) type. */ -class InterestResultPolicy { - // public static methods - public: - static char nextOrdinal; - - static InterestResultPolicy NONE; - static InterestResultPolicy KEYS; - static InterestResultPolicy KEYS_VALUES; - - char ordinal; - - char getOrdinal() { return ordinal; } - - private: - InterestResultPolicy() { ordinal = nextOrdinal++; } +enum class InterestResultPolicy : int8_t { + NONE = 0, + KEYS = 1, + KEYS_VALUES = 2 }; + } // namespace client } // namespace geode } // namespace apache diff --git a/cppcache/src/LocalRegion.cpp b/cppcache/src/LocalRegion.cpp index 4ee67db..38c2096 100644 --- a/cppcache/src/LocalRegion.cpp +++ b/cppcache/src/LocalRegion.cpp @@ -3269,10 +3269,9 @@ void LocalRegion::clearKeysOfInterestRegex( return; } - static const std::string ALL_KEYS_REGEX = ".*"; for (const auto& kv : interest_list) { const auto& regex = kv.first; - if (regex == ALL_KEYS_REGEX) { + if (regex == kAllKeysRegex) { localClear(); break; } else { diff --git a/cppcache/src/LocalRegion.hpp b/cppcache/src/LocalRegion.hpp index 46e8891..f124eff 100644 --- a/cppcache/src/LocalRegion.hpp +++ b/cppcache/src/LocalRegion.hpp @@ -40,6 +40,7 @@ #include "EntriesMap.hpp" #include "EventType.hpp" +#include "InterestResultPolicy.hpp" #include "RegionInternal.hpp" #include "RegionStats.hpp" #include "TSSTXStateWrapper.hpp" @@ -75,7 +76,6 @@ namespace client { class CreateActions; class DestroyActions; class InvalidateActions; -class InterestResultPolicy; class PutActions; class PutActionsTx; class RemoveActions; @@ -85,6 +85,8 @@ typedef std::unordered_map<std::shared_ptr<CacheableKey>, std::pair<std::shared_ptr<Cacheable>, int>> MapOfOldValue; +static const std::string kAllKeysRegex = ".*"; + /** * @class LocalRegion LocalRegion.hpp * diff --git a/cppcache/src/TcrChunkedContext.hpp b/cppcache/src/TcrChunkedContext.hpp index 0b9eec8..b90e87e 100644 --- a/cppcache/src/TcrChunkedContext.hpp +++ b/cppcache/src/TcrChunkedContext.hpp @@ -25,7 +25,6 @@ #include <ace/Semaphore.h> -#include "AppDomainContext.hpp" #include "Utils.hpp" #include "util/concurrent/binary_semaphore.hpp" diff --git a/cppcache/src/TcrMessage.cpp b/cppcache/src/TcrMessage.cpp index 6fc7b93..04816fa 100644 --- a/cppcache/src/TcrMessage.cpp +++ b/cppcache/src/TcrMessage.cpp @@ -166,7 +166,7 @@ TcrMessage::TcrMessage() m_isSecurityHeaderAdded(false), m_isMetaRegion(false), m_decodeAll(false), - m_interestPolicy(0), + m_interestPolicy(InterestResultPolicy::NONE), m_isDurable(false), m_receiveValues(false), m_hasCqsPart(false), @@ -185,13 +185,7 @@ const std::vector<std::shared_ptr<CacheableKey>>* TcrMessage::getKeys() const { const std::string& TcrMessage::getRegex() const { return m_regex; } InterestResultPolicy TcrMessage::getInterestResultPolicy() const { - if (m_interestPolicy == 2) { - return InterestResultPolicy::KEYS_VALUES; - } else if (m_interestPolicy == 1) { - return InterestResultPolicy::KEYS; - } else { - return InterestResultPolicy::NONE; - } + return m_interestPolicy; } bool TcrMessage::forPrimary() const { @@ -327,7 +321,7 @@ void TcrMessage::writeInterestResultPolicyPart(InterestResultPolicy policy) { m_request->write(static_cast<int8_t>(1)); // isObject m_request->write(static_cast<int8_t>(DSCode::FixedIDByte)); m_request->write(static_cast<int8_t>(DSCode::InterestResultPolicy)); - m_request->write(static_cast<int8_t>(policy.getOrdinal())); + m_request->write(static_cast<int8_t>(policy)); } void TcrMessage::writeIntPart(int32_t intValue) { @@ -2081,9 +2075,15 @@ TcrMessageRegisterInterestList::TcrMessageRegisterInterestList( m_regionName = region == nullptr ? "INVALID_REGION_NAME" : region->getFullPath(); m_region = region; - m_timeout = DEFAULT_TIMEOUT; m_isDurable = isDurable; m_receiveValues = receiveValues; + m_interestPolicy = interestPolicy; + + if (!(interestPolicy == InterestResultPolicy::NONE || + interestPolicy == InterestResultPolicy::KEYS_VALUES)) { + throw IllegalArgumentException( + "interestPolicy must be NONE or KEYS_VALUES."); + } writeHeader(m_msgType, 6); @@ -2098,8 +2098,8 @@ TcrMessageRegisterInterestList::TcrMessageRegisterInterestList( // Part 4 auto cal = CacheableArrayList::create(); - for (auto&& key : keys) { - if (key == nullptr) { + for (const auto& key : keys) { + if (!key) { throw IllegalArgumentException( "keys in the interest list cannot be nullptr"); } @@ -2121,13 +2121,12 @@ TcrMessageRegisterInterestList::TcrMessageRegisterInterestList( writeObjectPart(byteArr); writeMessageLength(); - m_interestPolicy = interestPolicy.ordinal; } TcrMessageUnregisterInterestList::TcrMessageUnregisterInterestList( DataOutput* dataOutput, const Region* region, const std::vector<std::shared_ptr<CacheableKey>>& keys, bool isDurable, - bool receiveValues, InterestResultPolicy interestPolicy, + ThinClientBaseDM* connectionDM) { m_request.reset(dataOutput); m_msgType = TcrMessage::UNREGISTER_INTEREST_LIST; @@ -2136,24 +2135,29 @@ TcrMessageUnregisterInterestList::TcrMessageUnregisterInterestList( m_regionName = region == nullptr ? "INVALID_REGION_NAME" : region->getFullPath(); m_region = region; - m_timeout = DEFAULT_TIMEOUT; m_isDurable = isDurable; - m_receiveValues = receiveValues; - auto numInItrestList = keys.size(); - assert(numInItrestList != 0); - uint32_t numOfParts = 2 + static_cast<uint32_t>(numInItrestList); + auto numberOfKeys = static_cast<uint32_t>(keys.size()); + assert(numberOfKeys != 0); + auto numOfParts = 4 + numberOfKeys; - numOfParts += 2; writeHeader(m_msgType, numOfParts); + + // part 0 writeRegionPart(m_regionName); - writeBytePart(0); // isClosing + + // part 1 + writeBytePart(0); // isClosing + + // part 2 writeBytePart(isDurable ? 1 : 0); // keepalive - writeIntPart(static_cast<int32_t>(numInItrestList)); + // part 3 + writeIntPart(static_cast<int32_t>(numberOfKeys)); - for (uint32_t i = 0; i < numInItrestList; i++) { - if (keys[i] == nullptr) { + // part N + for (decltype(numberOfKeys) i = 0; i < numberOfKeys; i++) { + if (!keys[i]) { throw IllegalArgumentException( "keys in the interest list cannot be nullptr"); } @@ -2161,7 +2165,6 @@ TcrMessageUnregisterInterestList::TcrMessageUnregisterInterestList( } writeMessageLength(); - m_interestPolicy = interestPolicy.ordinal; } TcrMessageCreateRegion::TcrMessageCreateRegion( @@ -2181,64 +2184,82 @@ TcrMessageCreateRegion::TcrMessageCreateRegion( m_regionName = str2; } -TcrMessageRegisterInterest::TcrMessageRegisterInterest( - DataOutput* dataOutput, const std::string& str1, const std::string& str2, - InterestResultPolicy interestPolicy, bool isDurable, bool isCachingEnabled, - bool receiveValues, ThinClientBaseDM* connectionDM) { +TcrMessageRegisterInterestRegex::TcrMessageRegisterInterestRegex( + DataOutput* dataOutput, const std::string& regionName, + const std::string& regex, InterestResultPolicy interestPolicy, + bool isDurable, bool isCachingEnabled, bool receiveValues, + ThinClientBaseDM* connectionDM) { m_request.reset(dataOutput); m_msgType = TcrMessage::REGISTER_INTEREST; m_tcdm = connectionDM; m_isDurable = isDurable; m_receiveValues = receiveValues; + m_regionName = regionName; + m_regex = regex; + m_interestPolicy = interestPolicy; - uint32_t numOfParts = 7; + writeHeader(m_msgType, 7); - writeHeader(m_msgType, numOfParts); + // part 0 + writeRegionPart(regionName); + + // part 1 + writeIntPart(kREGULAR_EXPRESSION); // InterestType + + // part 2 + writeInterestResultPolicyPart(interestPolicy); - writeRegionPart(str1); // region name - writeIntPart(kREGULAR_EXPRESSION); // InterestType - writeInterestResultPolicyPart(interestPolicy); // InterestResultPolicy + // part 3 writeBytePart(isDurable ? 1 : 0); - writeRegionPart(str2); // regexp string + + // part 4 + writeRegionPart(regex); // regexp string int8_t bytes[2]; + + // part 5 std::shared_ptr<CacheableBytes> byteArr = nullptr; bytes[0] = receiveValues ? 0 : 1; byteArr = CacheableBytes::create(std::vector<int8_t>(bytes, bytes + 1)); writeObjectPart(byteArr); + + // part 6 bytes[0] = isCachingEnabled ? 1 : 0; // region data policy bytes[1] = 0; // serializevalues byteArr = CacheableBytes::create(std::vector<int8_t>(bytes, bytes + 2)); writeObjectPart(byteArr); writeMessageLength(); - m_regionName = str1; - m_regex = str2; - m_interestPolicy = interestPolicy.ordinal; } -TcrMessageUnregisterInterest::TcrMessageUnregisterInterest( - DataOutput* dataOutput, const std::string& str1, const std::string& str2, - InterestResultPolicy interestPolicy, bool isDurable, bool receiveValues, - ThinClientBaseDM* connectionDM) { +TcrMessageUnregisterInterestRegex::TcrMessageUnregisterInterestRegex( + DataOutput* dataOutput, const std::string& regionName, + const std::string& regex, bool isDurable, ThinClientBaseDM* connectionDM) { m_request.reset(dataOutput); m_msgType = TcrMessage::UNREGISTER_INTEREST; m_tcdm = connectionDM; m_isDurable = isDurable; - m_receiveValues = receiveValues; + m_regionName = regionName; + m_regex = regex; + + writeHeader(m_msgType, 5); + + // part 0 + writeRegionPart(regionName); + + // part 1 + writeIntPart(kREGULAR_EXPRESSION); + + // part 2 + writeRegionPart(regex); + + // part 3 + writeBytePart(0); // isClosing + + // part 4 + writeBytePart(isDurable ? 1 : 0); // keepalive - uint32_t numOfParts = 3; - numOfParts += 2; - writeHeader(m_msgType, numOfParts); - writeRegionPart(str1); // region name - writeIntPart(kREGULAR_EXPRESSION); // InterestType - writeRegionPart(str2); // regexp string - writeBytePart(0); // isClosing - writeBytePart(isDurable ? 1 : 0); // keepalive writeMessageLength(); - m_regionName = str1; - m_regex = str2; - m_interestPolicy = interestPolicy.ordinal; } TcrMessageTxSynchronization::TcrMessageTxSynchronization(DataOutput* dataOutput, diff --git a/cppcache/src/TcrMessage.hpp b/cppcache/src/TcrMessage.hpp index 9641444..8d9d44c 100644 --- a/cppcache/src/TcrMessage.hpp +++ b/cppcache/src/TcrMessage.hpp @@ -434,7 +434,7 @@ class TcrMessage { bool m_isMetaRegion; /** used only when decoding reply message, if false, decode header only */ bool m_decodeAll; - char m_interestPolicy; + InterestResultPolicy m_interestPolicy; bool m_isDurable; bool m_receiveValues; bool m_hasCqsPart; @@ -577,9 +577,7 @@ class TcrMessageUnregisterInterestList : public TcrMessage { TcrMessageUnregisterInterestList( DataOutput* dataOutput, const Region* region, const std::vector<std::shared_ptr<CacheableKey>>& keys, - bool isDurable = false, bool receiveValues = true, - InterestResultPolicy interestPolicy = InterestResultPolicy::NONE, - ThinClientBaseDM* connectionDM = nullptr); + bool isDurable = false, ThinClientBaseDM* connectionDM = nullptr); ~TcrMessageUnregisterInterestList() override = default; }; @@ -607,26 +605,27 @@ class TcrMessageCreateRegion : public TcrMessage { ~TcrMessageCreateRegion() override = default; }; -class TcrMessageRegisterInterest : public TcrMessage { +class TcrMessageRegisterInterestRegex : public TcrMessage { public: - TcrMessageRegisterInterest( - DataOutput* dataOutput, const std::string& str1, const std::string& str2, + TcrMessageRegisterInterestRegex( + DataOutput* dataOutput, const std::string& regionName, + const std::string& regex, InterestResultPolicy interestPolicy = InterestResultPolicy::NONE, bool isDurable = false, bool isCachingEnabled = false, bool receiveValues = true, ThinClientBaseDM* connectionDM = nullptr); - ~TcrMessageRegisterInterest() override = default; + ~TcrMessageRegisterInterestRegex() override = default; }; -class TcrMessageUnregisterInterest : public TcrMessage { +class TcrMessageUnregisterInterestRegex : public TcrMessage { public: - TcrMessageUnregisterInterest( - DataOutput* dataOutput, const std::string& str1, const std::string& str2, - InterestResultPolicy interestPolicy = InterestResultPolicy::NONE, - bool isDurable = false, bool receiveValues = true, - ThinClientBaseDM* connectionDM = nullptr); + TcrMessageUnregisterInterestRegex(DataOutput* dataOutput, + const std::string& regionName, + const std::string& regex, + bool isDurable = false, + ThinClientBaseDM* connectionDM = nullptr); - ~TcrMessageUnregisterInterest() override = default; + ~TcrMessageUnregisterInterestRegex() override = default; }; class TcrMessageTxSynchronization : public TcrMessage { diff --git a/cppcache/src/ThinClientRegion.cpp b/cppcache/src/ThinClientRegion.cpp index 562a89e..682e01a 100644 --- a/cppcache/src/ThinClientRegion.cpp +++ b/cppcache/src/ThinClientRegion.cpp @@ -339,10 +339,11 @@ void ThinClientRegion::initTCR() { } void ThinClientRegion::registerKeys( - const std::vector<std::shared_ptr<CacheableKey>>& keys, bool isDurable, - bool getInitialValues, bool receiveValues) { - auto pool = m_cacheImpl->getPoolManager().find(getAttributes().getPoolName()); - if (pool != nullptr) { + const std::vector<std::shared_ptr<CacheableKey>>& keys, + const bool isDurable, const bool getInitialValues, + const bool receiveValues) { + if (auto pool = + m_cacheImpl->getPoolManager().find(getAttributes().getPoolName())) { if (!pool->getSubscriptionEnabled()) { LOGERROR( "Registering keys is supported " @@ -353,12 +354,14 @@ void ThinClientRegion::registerKeys( "only if pool subscription-enabled attribute is true."); } } + if (keys.empty()) { LOGERROR("Register keys list is empty"); throw IllegalArgumentException( "Register keys " "keys vector is empty"); } + if (isDurable && !isDurableClient()) { LOGERROR( "Register keys durable flag is only applicable for durable clients"); @@ -366,6 +369,7 @@ void ThinClientRegion::registerKeys( "Durable flag only applicable for " "durable clients"); } + if (getInitialValues && !m_regionAttributes.getCachingEnabled()) { LOGERROR( "Register keys getInitialValues flag is only applicable for caching" @@ -374,16 +378,15 @@ void ThinClientRegion::registerKeys( "getInitialValues flag only applicable for caching clients"); } - InterestResultPolicy interestPolicy = InterestResultPolicy::NONE; - if (getInitialValues) { - interestPolicy = InterestResultPolicy::KEYS_VALUES; - } + const auto interestPolicy = getInitialValues + ? InterestResultPolicy::KEYS_VALUES + : InterestResultPolicy::NONE; - LOGDEBUG("ThinClientRegion::registerKeys : interestpolicy is %d", - interestPolicy.ordinal); + LOGDEBUG("ThinClientRegion::registerKeys : interestPolicy is %d", + interestPolicy); - GfErrType err = registerKeysNoThrow(keys, true, nullptr, isDurable, - interestPolicy, receiveValues); + const auto err = registerKeysNoThrow(keys, true, nullptr, isDurable, + interestPolicy, receiveValues); if (m_tcrdm->isFatalError(err)) { throwExceptionIfError("Region::registerKeys", err); @@ -421,111 +424,64 @@ void ThinClientRegion::unregisterKeys( "Unregister keys " "keys vector is empty"); } - GfErrType err = unregisterKeysNoThrow(keys); + const auto err = unregisterKeysNoThrow(keys); throwExceptionIfError("Region::unregisterKeys", err); } -void ThinClientRegion::registerAllKeys(bool isDurable, bool getInitialValues, - bool receiveValues) { - auto pool = m_cacheImpl->getPoolManager().find(getAttributes().getPoolName()); - if (pool != nullptr) { +void ThinClientRegion::registerAllKeys(const bool isDurable, + const bool getInitialValues, + const bool receiveValues) { + registerRegex(kAllKeysRegex, isDurable, getInitialValues, receiveValues); +} + +void ThinClientRegion::registerRegex(const std::string& regex, + const bool isDurable, + const bool getInitialValues, + const bool receiveValues) { + if (auto pool = + m_cacheImpl->getPoolManager().find(getAttributes().getPoolName())) { if (!pool->getSubscriptionEnabled()) { LOGERROR( - "Register all keys is supported only " - "if subscription-enabled attribute is true for pool " + + "Register regex is supported only if " + "subscription-enabled attribute is true for pool " + pool->getName()); throw UnsupportedOperationException( - "Register all keys is supported only " - "if pool subscription-enabled attribute is true."); + "Register regex is supported only if " + "pool subscription-enabled attribute is true."); } } + if (isDurable && !isDurableClient()) { LOGERROR( - "Register all keys durable flag is only applicable for durable " - "clients"); + "Register regex durable flag is only applicable for durable clients"); throw IllegalStateException( "Durable flag only applicable for durable clients"); } - if (getInitialValues && !m_regionAttributes.getCachingEnabled()) { + const auto caching = m_regionAttributes.getCachingEnabled(); + if (getInitialValues && !caching) { LOGERROR( - "Register all keys getInitialValues flag is only applicable for caching" + "Register regex getInitialValues flag is only applicable for caching" "clients"); throw IllegalStateException( "getInitialValues flag only applicable for caching clients"); } - InterestResultPolicy interestPolicy = InterestResultPolicy::NONE; - if (getInitialValues) { - interestPolicy = InterestResultPolicy::KEYS_VALUES; - } else { - interestPolicy = InterestResultPolicy::KEYS; - } - - LOGDEBUG("ThinClientRegion::registerAllKeys : interestpolicy is %d", - interestPolicy.ordinal); - - std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>> resultKeys; - // if we need to fetch initial data, then we get the keys in - // that call itself using the special GET_ALL message and do not need - // to get the keys in the initial register interest call - GfErrType err = - registerRegexNoThrow(".*", true, nullptr, isDurable, resultKeys, - interestPolicy, receiveValues); - - if (m_tcrdm->isFatalError(err)) { - throwExceptionIfError("Region::registerAllKeys", err); - } - - // Get the entries from the server using a special GET_ALL message - throwExceptionIfError("Region::registerAllKeys", err); -} - -void ThinClientRegion::registerRegex(const std::string& regex, bool isDurable, - bool getInitialValues, - bool receiveValues) { - auto pool = m_cacheImpl->getPoolManager().find(getAttributes().getPoolName()); - if (pool != nullptr) { - if (!pool->getSubscriptionEnabled()) { - LOGERROR( - "Register regex is supported only if " - "subscription-enabled attribute is true for pool " + - pool->getName()); - throw UnsupportedOperationException( - "Register regex is supported only if " - "pool subscription-enabled attribute is true."); - } - } - if (isDurable && !isDurableClient()) { - LOGERROR("Register regex durable flag only applicable for durable clients"); - throw IllegalStateException( - "Durable flag only applicable for durable clients"); - } - if (regex.empty()) { throw IllegalArgumentException( "Region::registerRegex: Regex string is empty"); } - auto interestPolicy = InterestResultPolicy::NONE; - if (getInitialValues) { - interestPolicy = InterestResultPolicy::KEYS_VALUES; - } else { - interestPolicy = InterestResultPolicy::KEYS; - } - - LOGDEBUG("ThinClientRegion::registerRegex : interestpolicy is %d", - interestPolicy.ordinal); + const auto interestPolicy = + getInitialValues ? InterestResultPolicy::KEYS_VALUES + : caching && kAllKeysRegex != regex ? InterestResultPolicy::KEYS + : InterestResultPolicy::NONE; - auto resultKeys2 = - std::make_shared<std::vector<std::shared_ptr<CacheableKey>>>(); + LOGDEBUG("ThinClientRegion::registerRegex : interestPolicy is %d", + interestPolicy); - // if we need to fetch initial data for "allKeys" case, then we - // get the keys in that call itself using the special GET_ALL message and - // do not need to get the keys in the initial register interest call - GfErrType err = - registerRegexNoThrow(regex, true, nullptr, isDurable, resultKeys2, - interestPolicy, receiveValues); + const auto err = registerRegexNoThrow(regex, true, nullptr, isDurable, + nullptr, interestPolicy, receiveValues); if (m_tcrdm->isFatalError(err)) { throwExceptionIfError("Region::registerRegex", err); @@ -535,8 +491,8 @@ void ThinClientRegion::registerRegex(const std::string& regex, bool isDurable, } void ThinClientRegion::unregisterRegex(const std::string& regex) { - auto pool = m_cacheImpl->getPoolManager().find(getAttributes().getPoolName()); - if (pool != nullptr) { + if (auto pool = + m_cacheImpl->getPoolManager().find(getAttributes().getPoolName())) { if (!pool->getSubscriptionEnabled()) { LOGERROR( "Unregister regex is supported only if " @@ -553,26 +509,11 @@ void ThinClientRegion::unregisterRegex(const std::string& regex) { throw IllegalArgumentException("Unregister regex string is empty"); } - GfErrType err = unregisterRegexNoThrow(regex); + const auto err = unregisterRegexNoThrow(regex); throwExceptionIfError("Region::unregisterRegex", err); } -void ThinClientRegion::unregisterAllKeys() { - auto pool = m_cacheImpl->getPoolManager().find(getAttributes().getPoolName()); - if (pool != nullptr) { - if (!pool->getSubscriptionEnabled()) { - LOGERROR( - "Unregister all keys is supported only if " - "subscription-enabled attribute is true for pool " + - pool->getName()); - throw UnsupportedOperationException( - "Unregister all keys is supported only if " - "pool subscription-enabled attribute is true."); - } - } - GfErrType err = unregisterRegexNoThrow(".*"); - throwExceptionIfError("Region::unregisterAllKeys", err); -} +void ThinClientRegion::unregisterAllKeys() { unregisterRegex(kAllKeysRegex); } std::shared_ptr<SelectResults> ThinClientRegion::query( const std::string& predicate, std::chrono::milliseconds timeout) { @@ -2208,45 +2149,40 @@ GfErrType ThinClientRegion::registerKeysNoThrow( RegionGlobalLocks acquireLocksRedundancy(this, false); RegionGlobalLocks acquireLocksFailover(this); CHECK_DESTROY_PENDING_NOTHROW(shared_lock); - GfErrType err = GF_NOERR; + auto err = GF_NOERR; std::lock_guard<decltype(m_keysLock)> keysGuard(m_keysLock); if (keys.empty()) { return err; } - TcrMessageReply replyLocal(true, m_tcrdm.get()); - bool needToCreateRC = true; - if (reply == nullptr) { - reply = &replyLocal; - } else { - needToCreateRC = false; - } - - LOGDEBUG("ThinClientRegion::registerKeysNoThrow : interestpolicy is %d", - interestPolicy.ordinal); + LOGDEBUG("ThinClientRegion::registerKeysNoThrow : interestPolicy is %d", + interestPolicy); TcrMessageRegisterInterestList request( new DataOutput(m_cacheImpl->createDataOutput()), this, keys, isDurable, getAttributes().getCachingEnabled(), receiveValues, interestPolicy, m_tcrdm.get()); + std::recursive_mutex responseLock; - TcrChunkedResult* resultCollector = nullptr; - if (interestPolicy.ordinal == InterestResultPolicy::KEYS_VALUES.ordinal) { - auto values = std::make_shared<HashMapOfCacheable>(); - auto exceptions = std::make_shared<HashMapOfException>(); - MapOfUpdateCounters trackers; - int32_t destroyTracker = 1; - if (needToCreateRC) { - resultCollector = (new ChunkedGetAllResponse( - request, this, &keys, values, exceptions, nullptr, trackers, - destroyTracker, true, responseLock)); - reply->setChunkedResultHandler(resultCollector); - } - } else { - if (needToCreateRC) { - resultCollector = (new ChunkedInterestResponse(request, nullptr, *reply)); - reply->setChunkedResultHandler(resultCollector); + std::unique_ptr<TcrChunkedResult> resultCollector; + TcrMessageReply replyLocal(true, m_tcrdm.get()); + if (!reply) { + reply = &replyLocal; + if (interestPolicy == InterestResultPolicy::KEYS_VALUES) { + auto values = std::make_shared<HashMapOfCacheable>(); + auto exceptions = std::make_shared<HashMapOfException>(); + MapOfUpdateCounters trackers; + int32_t destroyTracker = 1; + resultCollector = + std::unique_ptr<TcrChunkedResult>(new ChunkedGetAllResponse( + request, this, &keys, values, exceptions, nullptr, trackers, + destroyTracker, true, responseLock)); + reply->setChunkedResultHandler(resultCollector.get()); + } else { + resultCollector = std::unique_ptr<TcrChunkedResult>( + new ChunkedInterestResponse(request, nullptr, *reply)); + reply->setChunkedResultHandler(resultCollector.get()); } } @@ -2262,15 +2198,12 @@ GfErrType ThinClientRegion::registerKeysNoThrow( endpoint->name().c_str()); } else if (attemptFailover) { addKeys(keys, isDurable, receiveValues, interestPolicy); - if (!(interestPolicy.ordinal == - InterestResultPolicy::KEYS_VALUES.ordinal)) { + if (interestPolicy != InterestResultPolicy::KEYS_VALUES) { localInvalidateForRegisterInterest(keys); } } } - if (needToCreateRC) { - delete resultCollector; - } + return err; } @@ -2280,7 +2213,7 @@ GfErrType ThinClientRegion::unregisterKeysNoThrow( RegionGlobalLocks acquireLocksRedundancy(this, false); RegionGlobalLocks acquireLocksFailover(this); CHECK_DESTROY_PENDING_NOTHROW(shared_lock); - GfErrType err = GF_NOERR; + auto err = GF_NOERR; std::lock_guard<decltype(m_keysLock)> keysGuard(m_keysLock); TcrMessageReply reply(true, m_tcrdm.get()); if (keys.empty()) { @@ -2295,8 +2228,8 @@ GfErrType ThinClientRegion::unregisterKeysNoThrow( } TcrMessageUnregisterInterestList request( - new DataOutput(m_cacheImpl->createDataOutput()), this, keys, false, true, - InterestResultPolicy::NONE, m_tcrdm.get()); + new DataOutput(m_cacheImpl->createDataOutput()), this, keys, false, + m_tcrdm.get()); err = m_tcrdm->sendSyncRequestRegisterInterest(request, reply); if (err == GF_NOERR /*|| err == GF_CACHE_REDUNDANCY_FAILURE*/) { if (attemptFailover) { @@ -2316,7 +2249,7 @@ GfErrType ThinClientRegion::unregisterKeysNoThrowLocalDestroy( bool attemptFailover) { RegionGlobalLocks acquireLocksRedundancy(this, false); RegionGlobalLocks acquireLocksFailover(this); - GfErrType err = GF_NOERR; + auto err = GF_NOERR; std::lock_guard<decltype(m_keysLock)> keysGuard(m_keysLock); TcrMessageReply reply(true, m_tcrdm.get()); if (keys.empty()) { @@ -2331,8 +2264,8 @@ GfErrType ThinClientRegion::unregisterKeysNoThrowLocalDestroy( } TcrMessageUnregisterInterestList request( - new DataOutput(m_cacheImpl->createDataOutput()), this, keys, false, true, - InterestResultPolicy::NONE, m_tcrdm.get()); + new DataOutput(m_cacheImpl->createDataOutput()), this, keys, false, + m_tcrdm.get()); err = m_tcrdm->sendSyncRequestRegisterInterest(request, reply); if (err == GF_NOERR) { if (attemptFailover) { @@ -2350,7 +2283,7 @@ GfErrType ThinClientRegion::unregisterKeysNoThrowLocalDestroy( bool ThinClientRegion::isRegexRegistered( std::unordered_map<std::string, InterestResultPolicy>& interestListRegex, const std::string& regex, bool allKeys) { - if (interestListRegex.find(".*") != interestListRegex.end() || + if (interestListRegex.find(kAllKeysRegex) != interestListRegex.end() || (!allKeys && interestListRegex.find(regex) != interestListRegex.end())) { return true; } @@ -2366,9 +2299,9 @@ GfErrType ThinClientRegion::registerRegexNoThrow( RegionGlobalLocks acquireLocksRedundancy(this, false); RegionGlobalLocks acquireLocksFailover(this); CHECK_DESTROY_PENDING_NOTHROW(shared_lock); - GfErrType err = GF_NOERR; + auto err = GF_NOERR; - bool allKeys = (regex == ".*"); + const auto allKeys = regex == kAllKeysRegex; std::lock_guard<decltype(m_keysLock)> keysGuard(m_keysLock); if (attemptFailover) { @@ -2384,64 +2317,51 @@ GfErrType ThinClientRegion::registerRegexNoThrow( } } - ChunkedInterestResponse* resultCollector = nullptr; - ChunkedGetAllResponse* getAllResultCollector = nullptr; - if (reply != nullptr) { - // need to check - resultCollector = dynamic_cast<ChunkedInterestResponse*>( - reply->getChunkedResultHandler()); - if (resultCollector != nullptr) { - resultKeys = resultCollector->getResultKeys(); - } else { - getAllResultCollector = dynamic_cast<ChunkedGetAllResponse*>( - reply->getChunkedResultHandler()); - resultKeys = getAllResultCollector->getResultKeys(); - } - } + LOGDEBUG("ThinClientRegion::registerRegexNoThrow : interestPolicy is %d", + interestPolicy); - bool isRCCreatedLocally = false; - LOGDEBUG("ThinClientRegion::registerRegexNoThrow : interestpolicy is %d", - interestPolicy.ordinal); + TcrMessageRegisterInterestRegex request( + new DataOutput(m_cacheImpl->createDataOutput()), m_fullPath, regex, + interestPolicy, isDurable, getAttributes().getCachingEnabled(), + receiveValues, m_tcrdm.get()); - // TODO: - TcrMessageRegisterInterest request( - new DataOutput(m_cacheImpl->createDataOutput()), m_fullPath, - regex.c_str(), interestPolicy, isDurable, - getAttributes().getCachingEnabled(), receiveValues, m_tcrdm.get()); std::recursive_mutex responseLock; - if (reply == nullptr) { - TcrMessageReply replyLocal(true, m_tcrdm.get()); - auto values = std::make_shared<HashMapOfCacheable>(); - auto exceptions = std::make_shared<HashMapOfException>(); - + std::unique_ptr<TcrChunkedResult> resultCollector; + TcrMessageReply replyLocal(true, m_tcrdm.get()); + if (reply) { + if (auto chunkedInterestResponse = dynamic_cast<ChunkedInterestResponse*>( + reply->getChunkedResultHandler())) { + resultKeys = chunkedInterestResponse->getResultKeys(); + } else if (auto chunkedGetAllResponse = + dynamic_cast<ChunkedGetAllResponse*>( + reply->getChunkedResultHandler())) { + resultKeys = chunkedGetAllResponse->getResultKeys(); + } + } else { reply = &replyLocal; - if (interestPolicy.ordinal == InterestResultPolicy::KEYS_VALUES.ordinal) { + if (!resultKeys) { + resultKeys = + std::make_shared<std::vector<std::shared_ptr<CacheableKey>>>(); + } + if (interestPolicy == InterestResultPolicy::KEYS_VALUES) { + auto values = std::make_shared<HashMapOfCacheable>(); + auto exceptions = std::make_shared<HashMapOfException>(); MapOfUpdateCounters trackers; int32_t destroyTracker = 1; - if (resultKeys == nullptr) { - resultKeys = - std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>>( - new std::vector<std::shared_ptr<CacheableKey>>()); - } - // need to check - getAllResultCollector = (new ChunkedGetAllResponse( - request, this, nullptr, values, exceptions, resultKeys, trackers, - destroyTracker, true, responseLock)); - reply->setChunkedResultHandler(getAllResultCollector); - isRCCreatedLocally = true; - } else { - isRCCreatedLocally = true; - // need to check resultCollector = - new ChunkedInterestResponse(request, resultKeys, replyLocal); - reply->setChunkedResultHandler(resultCollector); + std::unique_ptr<TcrChunkedResult>(new ChunkedGetAllResponse( + request, this, nullptr, values, exceptions, resultKeys, trackers, + destroyTracker, true, responseLock)); + } else { + resultCollector = std::unique_ptr<TcrChunkedResult>( + new ChunkedInterestResponse(request, resultKeys, *reply)); } - err = m_tcrdm->sendSyncRequestRegisterInterest( - request, replyLocal, attemptFailover, this, endpoint); - } else { - err = m_tcrdm->sendSyncRequestRegisterInterest( - request, *reply, attemptFailover, this, endpoint); + reply->setChunkedResultHandler(resultCollector.get()); } + + err = m_tcrdm->sendSyncRequestRegisterInterest( + request, *reply, attemptFailover, this, endpoint); + if (err == GF_NOERR /*|| err == GF_CACHE_REDUNDANCY_FAILURE*/) { if (reply->getMessageType() == TcrMessage::RESPONSE_FROM_SECONDARY && endpoint) { @@ -2451,26 +2371,18 @@ GfErrType ThinClientRegion::registerRegexNoThrow( endpoint->name().c_str()); } else if (attemptFailover) { addRegex(regex, isDurable, receiveValues, interestPolicy); - if (interestPolicy.ordinal != InterestResultPolicy::KEYS_VALUES.ordinal) { + if (interestPolicy != InterestResultPolicy::KEYS_VALUES) { if (allKeys) { localInvalidateRegion_internal(); } else { - const std::shared_ptr<std::vector<std::shared_ptr<CacheableKey>>>& - keys = resultCollector != nullptr - ? resultCollector->getResultKeys() - : getAllResultCollector->getResultKeys(); - if (keys != nullptr) { - localInvalidateForRegisterInterest(*keys); + if (resultKeys) { + localInvalidateForRegisterInterest(*resultKeys); } } } } } - if (isRCCreatedLocally == true) { - if (resultCollector != nullptr) delete resultCollector; - if (getAllResultCollector != nullptr) delete getAllResultCollector; - } return err; } @@ -2479,15 +2391,15 @@ GfErrType ThinClientRegion::unregisterRegexNoThrow(const std::string& regex, RegionGlobalLocks acquireLocksRedundancy(this, false); RegionGlobalLocks acquireLocksFailover(this); CHECK_DESTROY_PENDING_NOTHROW(shared_lock); - GfErrType err = GF_NOERR; + auto err = GF_NOERR; err = findRegex(regex); if (err == GF_NOERR) { TcrMessageReply reply(false, m_tcrdm.get()); - TcrMessageUnregisterInterest request( + TcrMessageUnregisterInterestRegex request( new DataOutput(m_cacheImpl->createDataOutput()), m_fullPath, regex, - InterestResultPolicy::NONE, false, true, m_tcrdm.get()); + false, m_tcrdm.get()); err = m_tcrdm->sendSyncRequestRegisterInterest(request, reply); if (err == GF_NOERR /*|| err == GF_CACHE_REDUNDANCY_FAILURE*/) { if (attemptFailover) { @@ -2531,9 +2443,9 @@ GfErrType ThinClientRegion::unregisterRegexNoThrowLocalDestroy( if (err == GF_NOERR) { TcrMessageReply reply(false, m_tcrdm.get()); - TcrMessageUnregisterInterest request( + TcrMessageUnregisterInterestRegex request( new DataOutput(m_cacheImpl->createDataOutput()), m_fullPath, regex, - InterestResultPolicy::NONE, false, true, m_tcrdm.get()); + false, m_tcrdm.get()); err = m_tcrdm->sendSyncRequestRegisterInterest(request, reply); if (err == GF_NOERR) { if (attemptFailover) { @@ -2580,7 +2492,7 @@ void ThinClientRegion::addRegex(const std::string& regex, bool isDurable, : (receiveValues ? m_interestListRegex : m_interestListRegexForUpdatesAsInvalidates); - if (regex == ".*") { + if (regex == kAllKeysRegex) { interestListRegex.clear(); interestList.clear(); } diff --git a/cppcache/test/CMakeLists.txt b/cppcache/test/CMakeLists.txt index a19c393..8736b0f 100644 --- a/cppcache/test/CMakeLists.txt +++ b/cppcache/test/CMakeLists.txt @@ -44,7 +44,6 @@ add_executable(apache-geode_unittests geodeBannerTest.cpp gtest_extensions.h gmock_extensions.h - InterestResultPolicyTest.cpp LocalRegionTest.cpp LoggingTest.cpp LRUQueueTest.cpp diff --git a/cppcache/test/InterestResultPolicyTest.cpp b/cppcache/test/InterestResultPolicyTest.cpp deleted file mode 100644 index 8f2f00d..0000000 --- a/cppcache/test/InterestResultPolicyTest.cpp +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include <InterestResultPolicy.hpp> - -#include <gtest/gtest.h> - -using apache::geode::client::InterestResultPolicy; - -TEST(InterestResultPolicyTest, VerifyOrdinals) { - EXPECT_NE(InterestResultPolicy::NONE.getOrdinal(), - InterestResultPolicy::KEYS.getOrdinal()) - << "NONE and KEYS have different ordinals"; - EXPECT_NE(InterestResultPolicy::KEYS.getOrdinal(), - InterestResultPolicy::KEYS_VALUES.getOrdinal()) - << "KEYS and KEYS_VALUES have different ordinals"; - EXPECT_NE(InterestResultPolicy::KEYS_VALUES.getOrdinal(), - InterestResultPolicy::NONE.getOrdinal()) - << "KEYS_VALUES and NONE have different ordinals"; -} diff --git a/cppcache/test/TcrMessageTest.cpp b/cppcache/test/TcrMessageTest.cpp index ad74626..2654d1a 100644 --- a/cppcache/test/TcrMessageTest.cpp +++ b/cppcache/test/TcrMessageTest.cpp @@ -383,9 +383,7 @@ TEST_F(TcrMessageTest, testConstructor5WithUnregisterInteresetList) { TcrMessageUnregisterInterestList message( new DataOutputUnderTest(), static_cast<const Region *>(nullptr), keys, - false, // isDurable - false, // receiveValues - InterestResultPolicy::NONE, static_cast<ThinClientBaseDM *>(nullptr)); + false, static_cast<ThinClientBaseDM *>(nullptr)); EXPECT_EQ(TcrMessage::UNREGISTER_INTEREST_LIST, message.getMessageType()); @@ -428,27 +426,25 @@ TEST_F(TcrMessageTest, testConstructorKeySet) { TEST_F(TcrMessageTest, testConstructor6WithCreateRegion) { using apache::geode::client::TcrMessageCreateRegion; - TcrMessageCreateRegion message(new DataOutputUnderTest(), - "str1", // TODO: what does this parameter do?! - "str2", // TODO: what does this parameter do?! - false, // isDurable - false, // receiveValues + TcrMessageCreateRegion message(new DataOutputUnderTest(), "parentRegionName", + "regionName", + false, // isDurable + false, // receiveValues static_cast<ThinClientBaseDM *>(nullptr)); EXPECT_EQ(TcrMessage::CREATE_REGION, message.getMessageType()); EXPECT_MESSAGE_EQ( - "0000001D0000001200000002FFFFFFFF00000000040073747231000000040073747232", + "0000001D0000002400000002FFFFFFFF000000001000706172656E74526567696F6E4E61" + "6D650000000A00726567696F6E4E616D65", message); } TEST_F(TcrMessageTest, testConstructor6WithRegisterInterest) { - using apache::geode::client::TcrMessageRegisterInterest; + using apache::geode::client::TcrMessageRegisterInterestRegex; - TcrMessageRegisterInterest message( - new DataOutputUnderTest(), - "str1", // TODO: what does this parameter do?! - "str2", // TODO: what does this parameter do?! + TcrMessageRegisterInterestRegex message( + new DataOutputUnderTest(), "regionName", "regexString", InterestResultPolicy::NONE, false, // isDurable false, // isCacheingEnabled @@ -458,28 +454,105 @@ TEST_F(TcrMessageTest, testConstructor6WithRegisterInterest) { EXPECT_EQ(TcrMessage::REGISTER_INTEREST, message.getMessageType()); EXPECT_MESSAGE_EQ( - "000000140000003600000007FFFFFFFF0000000004007374723100000004000000000100" - "0000030101250000000001000000000004007374723200000001000100000002000000", + "000000140000004300000007FFFFFFFF000000000A00726567696F6E4E616D6500000004" + "000000000100000003010125000000000100000000000B007265676578537472696E6700" + "000001000100000002000000", message); -} -TEST_F(TcrMessageTest, testConstructor6WithUnregisterInterest) { - using apache::geode::client::TcrMessageUnregisterInterest; + TcrMessageRegisterInterestRegex message2( + new DataOutputUnderTest(), "regionName", "regexString", + InterestResultPolicy::NONE, + true, // isDurable + false, // isCacheingEnabled + false, // receiveValues + static_cast<ThinClientBaseDM *>(nullptr)); - TcrMessageUnregisterInterest message( - new DataOutputUnderTest(), - "str1", // TODO: what does this parameter do?! - "str2", // TODO: what does this parameter do?! + EXPECT_EQ(TcrMessage::REGISTER_INTEREST, message2.getMessageType()); + + EXPECT_MESSAGE_EQ( + "000000140000004300000007FFFFFFFF000000000A00726567696F6E4E616D6500000004" + "000000000100000003010125000000000100010000000B007265676578537472696E6700" + "000001000100000002000000", + message2); + + TcrMessageRegisterInterestRegex message3( + new DataOutputUnderTest(), "regionName", "regexString", InterestResultPolicy::NONE, false, // isDurable + true, // isCacheingEnabled false, // receiveValues static_cast<ThinClientBaseDM *>(nullptr)); + EXPECT_EQ(TcrMessage::REGISTER_INTEREST, message3.getMessageType()); + + EXPECT_MESSAGE_EQ( + "000000140000004300000007FFFFFFFF000000000A00726567696F6E4E616D6500000004" + "000000000100000003010125000000000100000000000B007265676578537472696E6700" + "000001000100000002000100", + message3); + + TcrMessageRegisterInterestRegex message4( + new DataOutputUnderTest(), "regionName", "regexString", + InterestResultPolicy::NONE, + false, // isDurable + false, // isCacheingEnabled + true, // receiveValues + static_cast<ThinClientBaseDM *>(nullptr)); + + EXPECT_EQ(TcrMessage::REGISTER_INTEREST, message4.getMessageType()); + + EXPECT_MESSAGE_EQ( + "000000140000004300000007FFFFFFFF000000000A00726567696F6E4E616D6500000004" + "000000000100000003010125000000000100000000000B007265676578537472696E6700" + "000001000000000002000000", + message4); + + TcrMessageRegisterInterestRegex message5( + new DataOutputUnderTest(), "regionName", "regexString", + InterestResultPolicy::KEYS, + true, // isDurable + true, // isCacheingEnabled + true, // receiveValues + static_cast<ThinClientBaseDM *>(nullptr)); + + EXPECT_EQ(TcrMessage::REGISTER_INTEREST, message5.getMessageType()); + + EXPECT_MESSAGE_EQ( + "000000140000004300000007FFFFFFFF000000000A00726567696F6E4E616D6500000004" + "000000000100000003010125010000000100010000000B007265676578537472696E6700" + "000001000000000002000100", + message5); + + TcrMessageRegisterInterestRegex message6( + new DataOutputUnderTest(), "regionName", "regexString", + InterestResultPolicy::KEYS_VALUES, + true, // isDurable + true, // isCacheingEnabled + true, // receiveValues + static_cast<ThinClientBaseDM *>(nullptr)); + + EXPECT_EQ(TcrMessage::REGISTER_INTEREST, message6.getMessageType()); + + EXPECT_MESSAGE_EQ( + "000000140000004300000007FFFFFFFF000000000A00726567696F6E4E616D6500000004" + "000000000100000003010125020000000100010000000B007265676578537472696E6700" + "000001000000000002000100", + message6); +} + +TEST_F(TcrMessageTest, testConstructor6WithUnregisterInterest) { + using apache::geode::client::TcrMessageUnregisterInterestRegex; + + TcrMessageUnregisterInterestRegex message( + new DataOutputUnderTest(), "regionName", "regexString", + false, // isDurable + static_cast<ThinClientBaseDM *>(nullptr)); + EXPECT_EQ(TcrMessage::UNREGISTER_INTEREST, message.getMessageType()); EXPECT_MESSAGE_EQ( - "000000160000002700000005FFFFFFFF0000000004007374723100000004000000000100" - "0000040073747232000000010000000000010000", + "000000160000003400000005FFFFFFFF000000000A00726567696F6E4E616D6500000004" + "00000000010000000B007265676578537472696E67000000010000000000010000", message); }