Repository: geode Updated Branches: refs/heads/develop ab90d406b -> 56ea940d3
GEODE-3315: Replaced PreferBytes... with VMCachedDeserializable When getting a HAEventWrapper as part of a GII, make sure that we store the wrapper in a VMCachedDeserializable. This object needs to have a reference to the HAContainer. If PREFER_SERIALIZED is set to true, we we using a PreferBytesSerializable which would always create new copy of the HAEventWrapper. Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/56ea940d Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/56ea940d Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/56ea940d Branch: refs/heads/develop Commit: 56ea940d3c826e98b16d6b508fc834f7bd50220c Parents: ab90d40 Author: Barry Oglesby <[email protected]> Authored: Thu Jun 22 13:52:24 2017 -0700 Committer: Dan Smith <[email protected]> Committed: Tue Aug 1 15:30:43 2017 -0700 ---------------------------------------------------------------------- .../geode/internal/cache/AbstractRegionMap.java | 12 +- .../ha/PreferSerializedHARegionQueueTest.java | 184 +++++++++++++++++++ 2 files changed, 189 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/56ea940d/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java index f958f94..fd5a430 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java @@ -771,12 +771,11 @@ public abstract class AbstractRegionMap implements RegionMap { if (owner instanceof HARegion && newValue instanceof CachedDeserializable) { Object actualVal = null; + CachedDeserializable newValueCd = (CachedDeserializable) newValue; try { - actualVal = - BlobHelper.deserializeBlob(((CachedDeserializable) newValue).getSerializedValue(), - sender.getVersionObject(), null); - newValue = CachedDeserializableFactory.create(actualVal, - ((CachedDeserializable) newValue).getValueSizeInBytes()); + actualVal = BlobHelper.deserializeBlob(newValueCd.getSerializedValue(), + sender.getVersionObject(), null); + newValue = new VMCachedDeserializable(actualVal, newValueCd.getSizeInBytes()); } catch (IOException | ClassNotFoundException e) { throw new RuntimeException("Unable to deserialize HA event for region " + owner); } @@ -809,8 +808,7 @@ public abstract class AbstractRegionMap implements RegionMap { HARegionQueue.addClientCQsAndInterestList(oldMsg, haEventWrapper, haContainer, owner.getName()); haEventWrapper.setClientUpdateMessage(null); - newValue = CachedDeserializableFactory.create(original, - ((CachedDeserializable) newValue).getSizeInBytes()); + newValue = new VMCachedDeserializable(original, newValueCd.getSizeInBytes()); } else { original = null; } http://git-wip-us.apache.org/repos/asf/geode/blob/56ea940d/geode-core/src/test/java/org/apache/geode/internal/cache/ha/PreferSerializedHARegionQueueTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/PreferSerializedHARegionQueueTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/PreferSerializedHARegionQueueTest.java new file mode 100644 index 0000000..59aa79a --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/PreferSerializedHARegionQueueTest.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.ha; + +import static org.junit.Assert.assertEquals; + +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.client.ClientCache; +import org.apache.geode.cache.client.ClientCacheFactory; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.cache.client.PoolFactory; +import org.apache.geode.cache.server.CacheServer; +import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy; +import org.apache.geode.internal.cache.tier.sockets.HAEventWrapper; +import org.apache.geode.test.dunit.DUnitEnv; +import org.apache.geode.test.dunit.Host; +import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase; +import org.apache.geode.test.junit.categories.DistributedTest; +import org.awaitility.Awaitility; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +@Category(DistributedTest.class) +public class PreferSerializedHARegionQueueTest extends JUnit4CacheTestCase { + + private static final long serialVersionUID = 1L; + + @Test + public void copyingHARegionQueueShouldNotThrowException() throws Exception { + Host host = Host.getHost(0); + VM vm0 = host.getVM(0); + VM vm1 = host.getVM(1); + VM vm2 = host.getVM(2); + VM vm3 = host.getVM(3); + VM vm4 = host.getVM(4); + VM vm5 = host.getVM(5); + VM vm6 = host.getVM(6); + + // Set prefer serialized + vm1.invoke(() -> setPreferSerialized()); + vm2.invoke(() -> setPreferSerialized()); + vm3.invoke(() -> setPreferSerialized()); + vm4.invoke(() -> setPreferSerialized()); + + String regionName = getTestMethodName() + "_PR"; + try { + // Initialize initial cache servers + vm1.invoke(() -> initializeServer(regionName)); + vm2.invoke(() -> initializeServer(regionName)); + + // Create register interest client + vm5.invoke(() -> createClient(regionName, true, 1, Integer.MAX_VALUE)); + + // Wait for both primary and secondary servers to establish proxies + vm1.invoke(() -> waitForCacheClientProxies(1)); + vm2.invoke(() -> waitForCacheClientProxies(1)); + + // Create client loader and load entries + int numPuts = 10; + vm6.invoke( + () -> createClient(regionName, false, 0, PoolFactory.DEFAULT_SUBSCRIPTION_ACK_INTERVAL)); + vm6.invoke(() -> { + Region region = getCache().getRegion(regionName); + IntStream.range(0, numPuts).forEach(i -> region.put(i, i)); + }); + + // Verify HARegion sizes + vm1.invoke(() -> waitForHARegionSize(numPuts)); + vm2.invoke(() -> waitForHARegionSize(numPuts)); + + // Initialize next cache server + vm3.invoke(() -> initializeServer(regionName)); + + // Stop one of the original cache servers + vm1.invoke(() -> closeCache()); + + // Wait for new cache server to establish proxies + vm3.invoke(() -> waitForCacheClientProxies(1)); + + // Verify HARegion size + vm3.invoke(() -> waitForHARegionSize(numPuts)); + + // Initialize final cache server + vm4.invoke(() -> initializeServer(regionName)); + + // Stop other original cache server + vm2.invoke(() -> closeCache()); + + // Wait for new cache server to establish proxies + vm4.invoke(() -> waitForCacheClientProxies(1)); + + // Verify HARegion size + vm4.invoke(() -> waitForHARegionSize(numPuts)); + + // Stop the clients to prevent suspect strings when the servers are stopped + vm5.invoke(() -> closeCache()); + vm6.invoke(() -> closeCache()); + } finally { + // Clear prefer serialized + vm1.invoke(() -> clearPreferSerialized()); + vm2.invoke(() -> clearPreferSerialized()); + vm3.invoke(() -> clearPreferSerialized()); + vm4.invoke(() -> clearPreferSerialized()); + } + } + + public void initializeServer(String regionName) throws IOException { + getCache().createRegionFactory(RegionShortcut.PARTITION).create(regionName); + + final CacheServer cacheServer = getCache().addCacheServer(); + cacheServer.setPort(0); + cacheServer.start(); + } + + public void createClient(String regionName, boolean subscriptionEnabled, + int subscriptionRedundancy, int subscriptionAckInterval) { + + ClientCacheFactory clientCacheFactory = + new ClientCacheFactory().setPoolSubscriptionAckInterval(subscriptionAckInterval) + .setPoolSubscriptionEnabled(subscriptionEnabled) + .setPoolSubscriptionRedundancy(subscriptionRedundancy) + .addPoolLocator("localhost", DUnitEnv.get().getLocatorPort()); + + ClientCache cache = getClientCache(clientCacheFactory); + + Region region = cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName); + if (subscriptionEnabled) { + region.registerInterest("ALL_KEYS"); + } + } + + public static void setPreferSerialized() { + System.setProperty("gemfire.PREFER_SERIALIZED", "true"); + } + + public static void clearPreferSerialized() { + System.clearProperty("gemfire.PREFER_SERIALIZED"); + } + + public void waitForCacheClientProxies(final int expectedSize) { + final CacheServer cs = getCache().getCacheServers().iterator().next(); + Awaitility.await().atMost(1, TimeUnit.MINUTES) + .until(() -> assertEquals(expectedSize, cs.getAllClientSessions().size())); + } + + public void waitForHARegionSize(final int expectedSize) { + final CacheServer cs = getCache().getCacheServers().iterator().next(); + final CacheClientProxy ccp = (CacheClientProxy) cs.getAllClientSessions().iterator().next(); + Awaitility.await().atMost(1, TimeUnit.MINUTES) + .until(() -> assertEquals(expectedSize, getHAEventsCount(ccp))); + } + + private static int getHAEventsCount(CacheClientProxy ccp) { + Region haRegion = ccp.getHARegion(); + if (haRegion == null) { + return 0; + } + int count = 0; + for (Object value : haRegion.values()) { + if (value instanceof HAEventWrapper) { + count += 1; + } + } + return count; + } +}
