Repository: geode Updated Branches: refs/heads/release/1.2.0 2974dab32 -> 2cf335c11
GEODE-3172: Fix serialization errors copying queue between 1.0 and 1.2 Deserialize a HAEventWrapper using the version of the sender when receiving a GII. Serialize entries using the version of the remote member when sending data as part of GII. This works for the client queues because client queues always have deserialized values. If there is an internal region that has serialized values in memory, those values would still be copied on the wire directly without being translated to the old member's version. Adding a test that demonstrates the serialization issues we were seeing with this issue. The test starts a 1.0 server, puts some data in the queue and starts a 1.2 server. This closes #620 Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/2cf335c1 Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/2cf335c1 Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/2cf335c1 Branch: refs/heads/release/1.2.0 Commit: 2cf335c117639781d4ba58786c51f1a778a4c404 Parents: 2974dab Author: Dan Smith <[email protected]> Authored: Thu Jul 6 15:08:04 2017 -0700 Committer: Dan Smith <[email protected]> Committed: Mon Jul 10 15:25:19 2017 -0700 ---------------------------------------------------------------------- .../cache/AbstractOplogDiskRegionEntry.java | 8 +- .../internal/cache/AbstractRegionEntry.java | 9 +- .../geode/internal/cache/AbstractRegionMap.java | 13 +- .../apache/geode/internal/cache/DiskEntry.java | 8 +- .../internal/cache/InitialImageOperation.java | 6 +- .../internal/cache/NonLocalRegionEntry.java | 6 +- .../org/apache/geode/internal/cache/Oplog.java | 5 +- .../geode/internal/cache/ProxyRegionMap.java | 6 +- .../geode/internal/cache/RegionEntry.java | 6 +- .../internal/cache/ValidatingDiskRegion.java | 4 +- .../sockets/ClientServerMiscBCDUnitTest.java | 257 -------------- geode-cq/build.gradle | 1 + .../sockets/ClientServerMiscBCDUnitTest.java | 350 +++++++++++++++++++ 13 files changed, 398 
insertions(+), 281 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/2cf335c1/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractOplogDiskRegionEntry.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractOplogDiskRegionEntry.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractOplogDiskRegionEntry.java index bfeb941..45d1b14 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractOplogDiskRegionEntry.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractOplogDiskRegionEntry.java @@ -18,6 +18,8 @@ import org.apache.geode.cache.EntryEvent; import org.apache.geode.cache.EntryNotFoundException; import org.apache.geode.distributed.internal.DM; import org.apache.geode.internal.ByteArrayDataInput; +import org.apache.geode.internal.Version; +import org.apache.geode.internal.cache.InitialImageOperation.Entry; import org.apache.geode.internal.cache.versions.VersionTag; import org.apache.geode.internal.offheap.annotations.Retained; @@ -54,9 +56,9 @@ public abstract class AbstractOplogDiskRegionEntry extends AbstractDiskRegionEnt } @Override - public boolean fillInValue(LocalRegion r, InitialImageOperation.Entry entry, - ByteArrayDataInput in, DM mgr) { - return Helper.fillInValue(this, entry, r.getDiskRegion(), mgr, in, r); + public boolean fillInValue(LocalRegion r, Entry entry, ByteArrayDataInput in, DM mgr, + final Version version) { + return Helper.fillInValue(this, entry, r.getDiskRegion(), mgr, in, r, version); } @Override http://git-wip-us.apache.org/repos/asf/geode/blob/2cf335c1/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionEntry.java ---------------------------------------------------------------------- diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionEntry.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionEntry.java index 1bc31b3..4b420b8 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionEntry.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionEntry.java @@ -19,6 +19,7 @@ import static org.apache.geode.internal.offheap.annotations.OffHeapIdentifier.*; import java.io.IOException; import java.util.Arrays; +import org.apache.geode.internal.cache.InitialImageOperation.Entry; import org.apache.logging.log4j.Logger; import org.apache.geode.CancelException; @@ -326,8 +327,8 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje @Override public boolean fillInValue(LocalRegion region, - @Retained(ABSTRACT_REGION_ENTRY_FILL_IN_VALUE) InitialImageOperation.Entry entry, - ByteArrayDataInput in, DM mgr) { + @Retained(ABSTRACT_REGION_ENTRY_FILL_IN_VALUE) Entry entry, ByteArrayDataInput in, DM mgr, + final Version version) { // starting default value entry.setSerialized(false); @@ -362,7 +363,7 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje entry.value = tmp; } else { try { - HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT); + HeapDataOutputStream hdos = new HeapDataOutputStream(version); BlobHelper.serializeTo(tmp, hdos); hdos.trim(); entry.value = hdos; @@ -386,7 +387,7 @@ public abstract class AbstractRegionEntry implements RegionEntry, HashEntry<Obje } } try { - HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT); + HeapDataOutputStream hdos = new HeapDataOutputStream(version); BlobHelper.serializeTo(preparedValue, hdos); hdos.trim(); entry.value = hdos; http://git-wip-us.apache.org/repos/asf/geode/blob/2cf335c1/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java 
---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java index ece3de1..7f12eab 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java @@ -14,6 +14,7 @@ */ package org.apache.geode.internal.cache; +import java.io.IOException; import java.util.Collection; import java.util.HashSet; import java.util.Iterator; @@ -22,6 +23,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.geode.internal.util.BlobHelper; import org.apache.logging.log4j.Logger; import org.apache.geode.GemFireIOException; @@ -768,7 +770,16 @@ public abstract class AbstractRegionMap implements RegionMap { } if (owner instanceof HARegion && newValue instanceof CachedDeserializable) { - Object actualVal = ((CachedDeserializable) newValue).getDeserializedValue(null, null); + Object actualVal = null; + try { + actualVal = + BlobHelper.deserializeBlob(((CachedDeserializable) newValue).getSerializedValue(), + sender.getVersionObject(), null); + newValue = CachedDeserializableFactory.create(actualVal, + ((CachedDeserializable) newValue).getValueSizeInBytes()); + } catch (IOException | ClassNotFoundException e) { + throw new RuntimeException("Unable to deserialize HA event for region " + owner); + } if (actualVal instanceof HAEventWrapper) { HAEventWrapper haEventWrapper = (HAEventWrapper) actualVal; // Key was removed at sender side so not putting it into the HARegion http://git-wip-us.apache.org/repos/asf/geode/blob/2cf335c1/geode-core/src/main/java/org/apache/geode/internal/cache/DiskEntry.java ---------------------------------------------------------------------- diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskEntry.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskEntry.java index f78a6c1..6b0871e 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskEntry.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskEntry.java @@ -276,7 +276,7 @@ public interface DiskEntry extends RegionEntry { * @since GemFire 3.2.1 */ static boolean fillInValue(DiskEntry de, InitialImageOperation.Entry entry, DiskRegion dr, - DM mgr, ByteArrayDataInput in, RegionEntryContext context) { + DM mgr, ByteArrayDataInput in, RegionEntryContext context, Version version) { @Retained @Released Object v = null; @@ -310,7 +310,7 @@ public interface DiskEntry extends RegionEntry { } assert did != null; // do recursive call to get readLock on did - return fillInValue(de, entry, dr, mgr, in, context); + return fillInValue(de, entry, dr, mgr, in, context, version); } if (logger.isDebugEnabled()) { logger.debug( @@ -360,7 +360,7 @@ public interface DiskEntry extends RegionEntry { entry.setSerialized(true); } else { try { - HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT); + HeapDataOutputStream hdos = new HeapDataOutputStream(version); BlobHelper.serializeTo(tmp, hdos); hdos.trim(); entry.value = hdos; @@ -401,7 +401,7 @@ public interface DiskEntry extends RegionEntry { } { try { - HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT); + HeapDataOutputStream hdos = new HeapDataOutputStream(version); BlobHelper.serializeTo(preparedValue, hdos); hdos.trim(); entry.value = hdos; http://git-wip-us.apache.org/repos/asf/geode/blob/2cf335c1/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java index f8e9d0f..b48fdc5 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java @@ -1910,7 +1910,8 @@ public class InitialImageOperation { entry = new InitialImageOperation.Entry(); entry.key = key; entry.setVersionTag(stamp.asVersionTag()); - fillRes = mapEntry.fillInValue(rgn, entry, in, rgn.getDistributionManager()); + fillRes = mapEntry.fillInValue(rgn, entry, in, rgn.getDistributionManager(), + sender.getVersionObject()); if (versionVector != null) { if (logger.isTraceEnabled(LogMarker.GII)) { logger.trace(LogMarker.GII, "chunkEntries:entry={},stamp={}", entry, stamp); @@ -1920,7 +1921,8 @@ public class InitialImageOperation { } else { entry = new InitialImageOperation.Entry(); entry.key = key; - fillRes = mapEntry.fillInValue(rgn, entry, in, rgn.getDistributionManager()); + fillRes = mapEntry.fillInValue(rgn, entry, in, rgn.getDistributionManager(), + sender.getVersionObject()); } } catch (DiskAccessException dae) { rgn.handleDiskAccessException(dae); http://git-wip-us.apache.org/repos/asf/geode/blob/2cf335c1/geode-core/src/main/java/org/apache/geode/internal/cache/NonLocalRegionEntry.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/NonLocalRegionEntry.java b/geode-core/src/main/java/org/apache/geode/internal/cache/NonLocalRegionEntry.java index a6bb959..4709b7b 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/NonLocalRegionEntry.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/NonLocalRegionEntry.java @@ -30,6 +30,8 @@ import org.apache.geode.distributed.internal.DM; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.Assert; import 
org.apache.geode.internal.ByteArrayDataInput; +import org.apache.geode.internal.Version; +import org.apache.geode.internal.cache.InitialImageOperation.Entry; import org.apache.geode.internal.cache.lru.NewLRUClockHand; import org.apache.geode.internal.cache.versions.VersionSource; import org.apache.geode.internal.cache.versions.VersionStamp; @@ -184,8 +186,8 @@ public class NonLocalRegionEntry implements RegionEntry, VersionStamp { return false; } - public boolean fillInValue(LocalRegion r, InitialImageOperation.Entry entry, - ByteArrayDataInput in, DM mgr) { + public boolean fillInValue(LocalRegion r, Entry entry, ByteArrayDataInput in, DM mgr, + final Version version) { throw new UnsupportedOperationException( LocalizedStrings.PartitionedRegion_NOT_APPROPRIATE_FOR_PARTITIONEDREGIONNONLOCALREGIONENTRY .toLocalizedString()); http://git-wip-us.apache.org/repos/asf/geode/blob/2cf335c1/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java b/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java index 7bf1a9d..5399d5a 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java @@ -7126,9 +7126,8 @@ public class Oplog implements CompactableOplog, Flushable { } @Override - public boolean fillInValue(LocalRegion r, - org.apache.geode.internal.cache.InitialImageOperation.Entry entry, ByteArrayDataInput in, - DM mgr) { + public boolean fillInValue(LocalRegion r, InitialImageOperation.Entry entry, + ByteArrayDataInput in, DM mgr, final Version version) { // TODO Auto-generated method stub return false; } http://git-wip-us.apache.org/repos/asf/geode/blob/2cf335c1/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyRegionMap.java ---------------------------------------------------------------------- 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyRegionMap.java index fedafc8..70f2f7c 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyRegionMap.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyRegionMap.java @@ -32,7 +32,9 @@ import org.apache.geode.distributed.internal.DM; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.ByteArrayDataInput; import org.apache.geode.internal.InternalStatisticsDisabledException; +import org.apache.geode.internal.Version; import org.apache.geode.internal.cache.AbstractRegionMap.ARMLockTestHook; +import org.apache.geode.internal.cache.InitialImageOperation.Entry; import org.apache.geode.internal.cache.lru.LRUEntry; import org.apache.geode.internal.cache.lru.NewLRUClockHand; import org.apache.geode.internal.cache.persistence.DiskRegionView; @@ -485,8 +487,8 @@ class ProxyRegionMap implements RegionMap { .toLocalizedString(DataPolicy.EMPTY)); } - public boolean fillInValue(LocalRegion r, InitialImageOperation.Entry entry, - ByteArrayDataInput in, DM mgr) { + public boolean fillInValue(LocalRegion r, Entry entry, ByteArrayDataInput in, DM mgr, + final Version version) { throw new UnsupportedOperationException( LocalizedStrings.ProxyRegionMap_NO_ENTRY_SUPPORT_ON_REGIONS_WITH_DATAPOLICY_0 .toLocalizedString(DataPolicy.EMPTY)); http://git-wip-us.apache.org/repos/asf/geode/blob/2cf335c1/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEntry.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEntry.java b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEntry.java index fe0c190..2650626 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEntry.java +++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEntry.java @@ -25,6 +25,8 @@ import org.apache.geode.cache.TimeoutException; import org.apache.geode.distributed.internal.DM; import org.apache.geode.internal.ByteArrayDataInput; import org.apache.geode.internal.InternalStatisticsDisabledException; +import org.apache.geode.internal.Version; +import org.apache.geode.internal.cache.InitialImageOperation.Entry; import org.apache.geode.internal.cache.lru.NewLRUClockHand; import org.apache.geode.internal.cache.versions.VersionSource; import org.apache.geode.internal.cache.versions.VersionStamp; @@ -179,8 +181,8 @@ public interface RegionEntry { * @since GemFire 3.2.1 */ public boolean fillInValue(LocalRegion r, - @Retained(ABSTRACT_REGION_ENTRY_FILL_IN_VALUE) InitialImageOperation.Entry entry, - ByteArrayDataInput in, DM mgr); + @Retained(ABSTRACT_REGION_ENTRY_FILL_IN_VALUE) Entry entry, ByteArrayDataInput in, DM mgr, + final Version version); /** * Returns true if this entry has overflowed to disk. 
http://git-wip-us.apache.org/repos/asf/geode/blob/2cf335c1/geode-core/src/main/java/org/apache/geode/internal/cache/ValidatingDiskRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ValidatingDiskRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ValidatingDiskRegion.java index fe11912..5b36b52 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/ValidatingDiskRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ValidatingDiskRegion.java @@ -27,6 +27,7 @@ import org.apache.geode.cache.TimeoutException; import org.apache.geode.distributed.internal.DM; import org.apache.geode.internal.ByteArrayDataInput; import org.apache.geode.internal.InternalStatisticsDisabledException; +import org.apache.geode.internal.Version; import org.apache.geode.internal.cache.DistributedRegion.DiskPosition; import org.apache.geode.internal.cache.InitialImageOperation.Entry; import org.apache.geode.internal.cache.lru.EnableLRU; @@ -339,7 +340,8 @@ public class ValidatingDiskRegion extends DiskRegion implements DiskRecoveryStor } @Override - public boolean fillInValue(LocalRegion r, Entry entry, ByteArrayDataInput in, DM mgr) { + public boolean fillInValue(LocalRegion r, Entry entry, ByteArrayDataInput in, DM mgr, + final Version version) { // TODO Auto-generated method stub return false; } http://git-wip-us.apache.org/repos/asf/geode/blob/2cf335c1/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscBCDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscBCDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscBCDUnitTest.java deleted file mode 100755 index 46896c4..0000000 --- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscBCDUnitTest.java +++ /dev/null @@ -1,257 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.internal.cache.tier.sockets; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -import org.awaitility.Awaitility; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import org.apache.geode.cache.Cache; -import org.apache.geode.cache.Region; -import org.apache.geode.cache.client.Pool; -import org.apache.geode.cache.client.PoolManager; -import org.apache.geode.cache.client.internal.PoolImpl; -import org.apache.geode.distributed.DistributedSystem; -import org.apache.geode.internal.cache.EventID; -import org.apache.geode.internal.cache.LocalRegion; -import org.apache.geode.test.dunit.Host; -import org.apache.geode.test.dunit.NetworkUtils; -import 
org.apache.geode.test.dunit.VM; -import org.apache.geode.test.dunit.standalone.VersionManager; -import org.apache.geode.test.junit.categories.BackwardCompatibilityTest; -import org.apache.geode.test.junit.categories.ClientServerTest; -import org.apache.geode.test.junit.categories.DistributedTest; -import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory; - -@Category({DistributedTest.class, ClientServerTest.class, BackwardCompatibilityTest.class}) -@RunWith(Parameterized.class) [email protected](CategoryWithParameterizedRunnerFactory.class) -public class ClientServerMiscBCDUnitTest extends ClientServerMiscDUnitTest { - @Parameterized.Parameters - public static Collection<String> data() { - List<String> result = VersionManager.getInstance().getVersionsWithoutCurrent(); - if (result.size() < 1) { - throw new RuntimeException("No older versions of Geode were found to test against"); - } else { - System.out.println("running against these versions: " + result); - } - return result; - } - - public ClientServerMiscBCDUnitTest(String version) { - super(); - testVersion = version; - } - - @Test - public void testSubscriptionWithCurrentServerAndOldClients() throws Exception { - // start server first - int serverPort = initServerCache(true); - VM client1 = Host.getHost(0).getVM(testVersion, 1); - VM client2 = Host.getHost(0).getVM(testVersion, 3); - String hostname = NetworkUtils.getServerHostName(Host.getHost(0)); - client1.invoke("create client1 cache", () -> { - createClientCache(hostname, serverPort); - populateCache(); - registerInterest(); - }); - client2.invoke("create client2 cache", () -> { - Pool ignore = createClientCache(hostname, serverPort); - }); - - client2.invoke("putting data in client2", () -> putForClient()); - - // client1 will receive client2's updates asynchronously - client1.invoke(() -> { - Region r2 = getCache().getRegion(REGION_NAME2); - MemberIDVerifier verifier = (MemberIDVerifier) ((LocalRegion) r2).getCacheListener(); - 
Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> verifier.eventReceived); - }); - - // client2's update should have included a memberID - GEODE-2954 - client1.invoke(() -> { - Region r2 = getCache().getRegion(REGION_NAME2); - MemberIDVerifier verifier = (MemberIDVerifier) ((LocalRegion) r2).getCacheListener(); - assertFalse(verifier.memberIDNotReceived); - }); - } - - @Test - public void testSubscriptionWithMixedServersAndNewPeerFeed() throws Exception { - doTestSubscriptionWithMixedServersAndPeerFeed(VersionManager.CURRENT_VERSION, true); - } - - @Test - public void testSubscriptionWithMixedServersAndOldPeerFeed() throws Exception { - doTestSubscriptionWithMixedServersAndPeerFeed(testVersion, true); - } - - @Test - public void testSubscriptionWithMixedServersAndOldClientFeed() throws Exception { - doTestSubscriptionWithMixedServersAndPeerFeed(testVersion, false); - } - - private void doTestSubscriptionWithMixedServersAndPeerFeed(String version, - boolean usePeerForFeed) { - server1 = Host.getHost(0).getVM(testVersion, 2); - server2 = Host.getHost(0).getVM(3); - VM server3 = Host.getHost(0).getVM(4); - VM interestClient = Host.getHost(0).getVM(testVersion, 0); - VM feeder = Host.getHost(0).getVM(version, 1); - - // start servers first - int server1Port = initServerCache(true); - - int server2Port = initServerCache2(true); - - int server3Port = server3.invoke(() -> createServerCache(true, getMaxThreads(), false)); - - System.out.println("old server is vm 2 and new server is vm 3"); - System.out - .println("old server port is " + server1Port + " and new server port is " + server2Port); - - String hostname = NetworkUtils.getServerHostName(Host.getHost(0)); - interestClient.invoke("create interestClient cache", () -> { - createClientCache(hostname, 300000, false, server1Port, server2Port, server3Port); - populateCache(); - registerInterest(); - }); - - if (!usePeerForFeed) { - feeder.invoke("create client cache for feed", () -> { - Pool ignore = 
createClientCache(hostname, server1Port); - }); - } - feeder.invoke("putting data in feeder", () -> putForClient()); - - // interestClient will receive feeder's updates asynchronously - interestClient.invoke("verification 1", () -> { - Region r2 = getCache().getRegion(REGION_NAME2); - MemberIDVerifier verifier = (MemberIDVerifier) ((LocalRegion) r2).getCacheListener(); - Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> verifier.eventReceived); - verifier.reset(); - }); - - server1.invoke("shutdown old server", () -> { - getCache().getDistributedSystem().disconnect(); - }); - - server2.invoke("wait for failover queue to drain", () -> { - CacheClientProxy proxy = - CacheClientNotifier.getInstance().getClientProxies().iterator().next(); - Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> { - proxy.getHARegionQueue().isEmpty(); - }); - }); - - // the client should now get duplicate events from the current-version server - interestClient.invoke("verification 2", () -> { - Cache cache = getCache(); - Region r2 = cache.getRegion(REGION_NAME2); - MemberIDVerifier verifier = (MemberIDVerifier) ((LocalRegion) r2).getCacheListener(); - assertFalse(verifier.eventReceived); // no duplicate events should have arrived - PoolImpl pool = (PoolImpl) PoolManager.find("ClientServerMiscDUnitTestPool"); - - Map seqMap = pool.getThreadIdToSequenceIdMap(); - assertEquals(3, seqMap.size()); // one for each server and one for the feed - verifier.reset(); - }); - - server2.invoke("shutdown new server", () -> { - getCache().getDistributedSystem().disconnect(); - }); - - server3.invoke("wait for failover queue to drain", () -> { - CacheClientProxy proxy = - CacheClientNotifier.getInstance().getClientProxies().iterator().next(); - Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> { - proxy.getHARegionQueue().isEmpty(); - }); - }); - - // the client should now get duplicate events from the current-version server - interestClient.invoke("verification 3", () -> { - 
Cache cache = getCache(); - Region r2 = cache.getRegion(REGION_NAME2); - MemberIDVerifier verifier = (MemberIDVerifier) ((LocalRegion) r2).getCacheListener(); - assertFalse(verifier.eventReceived); // no duplicate events should have arrived - PoolImpl pool = (PoolImpl) PoolManager.find("ClientServerMiscDUnitTestPool"); - - Map seqMap = pool.getThreadIdToSequenceIdMap(); - assertEquals(4, seqMap.size()); // one for each server and one for the feed - }); - } - - @Test - public void testDistributedMemberBytesWithCurrentServerAndOldClient() throws Exception { - // Start current version server - int serverPort = initServerCache(true); - - // Start old version client and do puts - VM client = Host.getHost(0).getVM(testVersion, 1); - String hostname = NetworkUtils.getServerHostName(Host.getHost(0)); - client.invoke("create client cache", () -> { - createClientCache(hostname, serverPort); - populateCache(); - }); - - // Get client member id byte array on client - byte[] clientMembershipIdBytesOnClient = - client.invoke(() -> getClientMembershipIdBytesOnClient()); - - // Get client member id byte array on server - byte[] clientMembershipIdBytesOnServer = - server1.invoke(() -> getClientMembershipIdBytesOnServer()); - - // Verify member id bytes on client and server are equal - String complaint = "size on client=" + clientMembershipIdBytesOnClient.length - + "; size on server=" + clientMembershipIdBytesOnServer.length + "\nclient bytes=" - + Arrays.toString(clientMembershipIdBytesOnClient) + "\nserver bytes=" - + Arrays.toString(clientMembershipIdBytesOnServer); - assertTrue(complaint, - Arrays.equals(clientMembershipIdBytesOnClient, clientMembershipIdBytesOnServer)); - } - - private byte[] getClientMembershipIdBytesOnClient() { - DistributedSystem system = getCache().getDistributedSystem(); - byte[] result = - EventID.getMembershipId(new ClientProxyMembershipID(system.getDistributedMember())); - System.out.println("client ID bytes are " + Arrays.toString(result)); - return 
result; - } - - private byte[] getClientMembershipIdBytesOnServer() { - Set cpmIds = ClientHealthMonitor.getInstance().getClientHeartbeats().keySet(); - assertEquals(1, cpmIds.size()); - ClientProxyMembershipID cpmId = (ClientProxyMembershipID) cpmIds.iterator().next(); - System.out.println("client ID on server is " + cpmId.getDistributedMember()); - byte[] result = EventID.getMembershipId(cpmId); - System.out.println("client ID bytes are " + Arrays.toString(result)); - return result; - } -} http://git-wip-us.apache.org/repos/asf/geode/blob/2cf335c1/geode-cq/build.gradle ---------------------------------------------------------------------- diff --git a/geode-cq/build.gradle b/geode-cq/build.gradle index b3a8449..7412dad 100644 --- a/geode-cq/build.gradle +++ b/geode-cq/build.gradle @@ -19,5 +19,6 @@ dependencies { provided project(':geode-core') testCompile files(project(':geode-core').sourceSets.test.output) + testCompile project(':geode-old-versions') testCompile project(':geode-junit') } http://git-wip-us.apache.org/repos/asf/geode/blob/2cf335c1/geode-cq/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscBCDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-cq/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscBCDUnitTest.java b/geode-cq/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscBCDUnitTest.java new file mode 100755 index 0000000..37aeaa4 --- /dev/null +++ b/geode-cq/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscBCDUnitTest.java @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. 
The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.tier.sockets; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import io.codearte.catchexception.shade.mockito.Mockito; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.geode.cache.query.CqAttributesFactory; +import org.apache.geode.cache.query.CqListener; +import org.apache.geode.cache.query.CqQuery; +import org.apache.geode.cache.server.CacheServer; +import org.awaitility.Awaitility; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.client.Pool; +import org.apache.geode.cache.client.PoolManager; +import org.apache.geode.cache.client.internal.PoolImpl; +import org.apache.geode.distributed.DistributedSystem; +import org.apache.geode.internal.cache.EventID; +import org.apache.geode.internal.cache.LocalRegion; +import org.apache.geode.test.dunit.Host; +import org.apache.geode.test.dunit.NetworkUtils; +import org.apache.geode.test.dunit.VM; +import 
org.apache.geode.test.dunit.standalone.VersionManager;
+import org.apache.geode.test.junit.categories.BackwardCompatibilityTest;
+import org.apache.geode.test.junit.categories.ClientServerTest;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
+
+/**
+ * Backward-compatibility variant of {@link ClientServerMiscDUnitTest}. The test is parameterized
+ * over every older Geode version reported by {@link VersionManager} and launches some of its VMs
+ * with that version while others run the current version.
+ */
+@Category({DistributedTest.class, ClientServerTest.class, BackwardCompatibilityTest.class})
+@RunWith(Parameterized.class)
+@Parameterized.UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class)
+public class ClientServerMiscBCDUnitTest extends ClientServerMiscDUnitTest {
+
+  /**
+   * Supplies the versions to run against.
+   *
+   * @return every version known to the {@link VersionManager} except the current one
+   * @throws RuntimeException if no older version is available
+   */
+  @Parameterized.Parameters
+  public static Collection<String> data() {
+    List<String> result = VersionManager.getInstance().getVersionsWithoutCurrent();
+    if (result.isEmpty()) {
+      throw new RuntimeException("No older versions of Geode were found to test against");
+    }
+    System.out.println("running against these versions: " + result);
+    return result;
+  }
+
+  /**
+   * @param version the older Geode version used for the old-version VMs of this run
+   */
+  public ClientServerMiscBCDUnitTest(String version) {
+    super();
+    testVersion = version;
+  }
+
+  /**
+   * Two old-version clients talk to one server: client1 registers interest, client2 does puts,
+   * and client1 must receive the resulting events together with a member ID.
+   */
+  @Test
+  public void testSubscriptionWithCurrentServerAndOldClients() throws Exception {
+    // start server first
+    int serverPort = initServerCache(true);
+    VM client1 = Host.getHost(0).getVM(testVersion, 1);
+    VM client2 = Host.getHost(0).getVM(testVersion, 3);
+    String hostname = NetworkUtils.getServerHostName(Host.getHost(0));
+    client1.invoke("create client1 cache", () -> {
+      createClientCache(hostname, serverPort);
+      populateCache();
+      registerInterest();
+    });
+    client2.invoke("create client2 cache", () -> {
+      createClientCache(hostname, serverPort);
+    });
+
+    client2.invoke("putting data in client2", () -> putForClient());
+
+    // client1 will receive client2's updates asynchronously
+    client1.invoke(() -> {
+      Region<?, ?> r2 = getCache().getRegion(REGION_NAME2);
+      MemberIDVerifier verifier = (MemberIDVerifier) ((LocalRegion) r2).getCacheListener();
+      Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> verifier.eventReceived);
+    });
+
+    // client2's update should have included a memberID - GEODE-2954
+    client1.invoke(() -> {
+      Region<?, ?> r2 = getCache().getRegion(REGION_NAME2);
+      MemberIDVerifier verifier = (MemberIDVerifier) ((LocalRegion) r2).getCacheListener();
+      assertFalse(verifier.memberIDNotReceived);
+    });
+  }
+
+  @Test
+  public void testSubscriptionWithMixedServersAndNewPeerFeed() throws Exception {
+    doTestSubscriptionWithMixedServersAndPeerFeed(VersionManager.CURRENT_VERSION, true);
+  }
+
+  @Test
+  public void testSubscriptionWithMixedServersAndOldPeerFeed() throws Exception {
+    doTestSubscriptionWithMixedServersAndPeerFeed(testVersion, true);
+  }
+
+  @Test
+  public void testSubscriptionWithMixedServersAndOldClientFeed() throws Exception {
+    doTestSubscriptionWithMixedServersAndPeerFeed(testVersion, false);
+  }
+
+  /**
+   * Starts three servers (server1 in an old-version VM), attaches an old-version interest client
+   * to all of them, feeds data, and then shuts the servers down one after another, checking after
+   * each failover that the client saw no duplicate events.
+   *
+   * @param version the Geode version of the VM that feeds the data
+   * @param usePeerForFeed when {@code false} the feeder first creates a client cache connected to
+   *        server1; when {@code true} no client cache is created in the feeder before it puts
+   */
+  private void doTestSubscriptionWithMixedServersAndPeerFeed(String version,
+      boolean usePeerForFeed) {
+    server1 = Host.getHost(0).getVM(testVersion, 2);
+    server2 = Host.getHost(0).getVM(3);
+    VM server3 = Host.getHost(0).getVM(4);
+    VM interestClient = Host.getHost(0).getVM(testVersion, 0);
+    VM feeder = Host.getHost(0).getVM(version, 1);
+
+    // start servers first
+    int server1Port = initServerCache(true);
+
+    int server2Port = initServerCache2(true);
+
+    int server3Port = server3.invoke(() -> createServerCache(true, getMaxThreads(), false));
+
+    System.out.println("old server is vm 2 and new server is vm 3");
+    System.out
+        .println("old server port is " + server1Port + " and new server port is " + server2Port);
+
+    String hostname = NetworkUtils.getServerHostName(Host.getHost(0));
+    interestClient.invoke("create interestClient cache", () -> {
+      createClientCache(hostname, 300000, false, server1Port, server2Port, server3Port);
+      populateCache();
+      registerInterest();
+    });
+
+    if (!usePeerForFeed) {
+      feeder.invoke("create client cache for feed", () -> {
+        createClientCache(hostname, server1Port);
+      });
+    }
+    feeder.invoke("putting data in feeder", () -> putForClient());
+
+    // interestClient will receive feeder's updates asynchronously
+    interestClient.invoke("verification 1", () -> {
+      Region<?, ?> r2 = getCache().getRegion(REGION_NAME2);
+      MemberIDVerifier verifier = (MemberIDVerifier) ((LocalRegion) r2).getCacheListener();
+      Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> verifier.eventReceived);
+      verifier.reset();
+    });
+
+    server1.invoke("shutdown old server", () -> {
+      getCache().getDistributedSystem().disconnect();
+    });
+
+    server2.invoke("wait for failover queue to drain", () -> {
+      CacheClientProxy proxy =
+          CacheClientNotifier.getInstance().getClientProxies().iterator().next();
+      // The lambda must return the boolean: a block lambda that discards it is treated by
+      // Awaitility as an assertion that passes as soon as it runs without throwing.
+      Awaitility.await().atMost(30, TimeUnit.SECONDS)
+          .until(() -> proxy.getHARegionQueue().isEmpty());
+    });
+
+    // the client must not have received duplicate events after failing over to server2
+    interestClient.invoke("verification 2", () -> {
+      Cache cache = getCache();
+      Region<?, ?> r2 = cache.getRegion(REGION_NAME2);
+      MemberIDVerifier verifier = (MemberIDVerifier) ((LocalRegion) r2).getCacheListener();
+      assertFalse(verifier.eventReceived); // no duplicate events should have arrived
+      PoolImpl pool = (PoolImpl) PoolManager.find("ClientServerMiscDUnitTestPool");
+
+      Map<?, ?> seqMap = pool.getThreadIdToSequenceIdMap();
+      assertEquals(3, seqMap.size()); // one for each server and one for the feed
+      verifier.reset();
+    });
+
+    server2.invoke("shutdown new server", () -> {
+      getCache().getDistributedSystem().disconnect();
+    });
+
+    server3.invoke("wait for failover queue to drain", () -> {
+      CacheClientProxy proxy =
+          CacheClientNotifier.getInstance().getClientProxies().iterator().next();
+      // Return the boolean so Awaitility really polls until the queue is empty.
+      Awaitility.await().atMost(30, TimeUnit.SECONDS)
+          .until(() -> proxy.getHARegionQueue().isEmpty());
+    });
+
+    // the client must not have received duplicate events after failing over to server3
+    interestClient.invoke("verification 3", () -> {
+      Cache cache = getCache();
+      Region<?, ?> r2 = cache.getRegion(REGION_NAME2);
+      MemberIDVerifier verifier = (MemberIDVerifier) ((LocalRegion) r2).getCacheListener();
+      assertFalse(verifier.eventReceived); // no duplicate events should have arrived
+      PoolImpl pool = (PoolImpl) PoolManager.find("ClientServerMiscDUnitTestPool");
+
+      Map<?, ?> seqMap = pool.getThreadIdToSequenceIdMap();
+      assertEquals(4, seqMap.size()); // one for each server and one for the feed
+    });
+  }
+
+  @Test
+  public void giiEventQueueFromOldToCurrentMemberShouldSucceed() {
+    giiEventQueueShouldSucceedWithMixedVersions(testVersion, VersionManager.CURRENT_VERSION);
+  }
+
+  @Test
+  public void giiEventQueueFromCurrentToOldMemberShouldSucceed() {
+    giiEventQueueShouldSucceedWithMixedVersions(VersionManager.CURRENT_VERSION, testVersion);
+  }
+
+  /**
+   * Verifies that a client event queue can be copied between two servers of different versions.
+   * Server2's cache servers are stopped while the client registers interest and a CQ and data is
+   * fed, then restarted; server2 must end up with a non-empty queue for the client, and after
+   * server1 is shut down that queue must drain.
+   *
+   * @param server1Version the Geode version of the VM hosting server1
+   * @param server2Version the Geode version of the VM hosting server2
+   */
+  public void giiEventQueueShouldSucceedWithMixedVersions(String server1Version,
+      String server2Version) {
+    VM interestClient = Host.getHost(0).getVM(testVersion, 0);
+    VM feeder = Host.getHost(0).getVM(1);
+    server1 = Host.getHost(0).getVM(server1Version, 2);
+    server2 = Host.getHost(0).getVM(server2Version, 3);
+
+    // start servers first
+    int server1Port = initServerCache(true, server1, true);
+    int server2Port = initServerCache(true, server2, true);
+    server2.invoke(() -> {
+      getCache().getCacheServers().forEach(CacheServer::stop);
+    });
+
+    String hostname = NetworkUtils.getServerHostName(Host.getHost(0));
+    interestClient.invoke("create interestClient cache", () -> {
+      createClientCache(hostname, 300000, false, server1Port, server2Port);
+      registerInterest();
+      registerCQ();
+    });
+
+    feeder.invoke("putting data in feeder", () -> putForClient());
+
+    // Start server 2
+    server2.invoke(() -> {
+      for (CacheServer server : getCache().getCacheServers()) {
+        server.start();
+      }
+    });
+
+    // Make sure server 2 copies the queue. The block lambda is intentional here: it is an
+    // assertion that Awaitility retries until it stops throwing.
+    server2.invoke(() -> {
+      Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> {
+        final Collection<CacheClientProxy> clientProxies =
+            CacheClientNotifier.getInstance().getClientProxies();
+        assertFalse(clientProxies.isEmpty());
+        CacheClientProxy proxy = clientProxies.iterator().next();
+        assertFalse(proxy.getHARegionQueue().isEmpty());
+      });
+    });
+
+    // interestClient will receive feeder's updates asynchronously
+    interestClient.invoke("verification 1", () -> {
+      Region<?, ?> r2 = getCache().getRegion(REGION_NAME2);
+      MemberIDVerifier verifier = (MemberIDVerifier) ((LocalRegion) r2).getCacheListener();
+      Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> verifier.eventReceived);
+      verifier.reset();
+    });
+
+    server1.invoke("shutdown old server", () -> {
+      getCache().getDistributedSystem().disconnect();
+    });
+
+    server2.invoke("wait for failover queue to drain", () -> {
+      CacheClientProxy proxy =
+          CacheClientNotifier.getInstance().getClientProxies().iterator().next();
+      // Return the boolean so Awaitility really polls until the queue is empty.
+      Awaitility.await().atMost(60, TimeUnit.SECONDS)
+          .until(() -> proxy.getHARegionQueue().isEmpty());
+    });
+  }
+
+  /**
+   * Registers and executes a continuous query named "testCQ" that selects everything from
+   * {@code REGION_NAME2}, using a mock {@link CqListener}.
+   *
+   * @throws Exception if the CQ cannot be created or executed
+   */
+  public static void registerCQ() throws Exception {
+    Cache cache = new ClientServerMiscDUnitTest().getCache();
+    Region<?, ?> r = cache.getRegion(Region.SEPARATOR + REGION_NAME2);
+    assertNotNull(r);
+    CqAttributesFactory cqAttributesFactory = new CqAttributesFactory();
+    cqAttributesFactory.addCqListener(Mockito.mock(CqListener.class));
+    final CqQuery cq = cache.getQueryService().newCq("testCQ", "select * from " + r.getFullPath(),
+        cqAttributesFactory.create());
+    cq.execute();
+  }
+
+  /**
+   * Checks that the membership-ID bytes an old-version client computes for itself equal the bytes
+   * the server computes for that client.
+   */
+  @Test
+  public void testDistributedMemberBytesWithCurrentServerAndOldClient() throws Exception {
+    // Start current version server
+    int serverPort = initServerCache(true);
+
+    // Start old version client and do puts
+    VM client = Host.getHost(0).getVM(testVersion, 1);
+    String hostname = NetworkUtils.getServerHostName(Host.getHost(0));
+    client.invoke("create client cache", () -> {
+      createClientCache(hostname, serverPort);
+      populateCache();
+    });
+
+    // Get client member id byte array on client
+    byte[] clientMembershipIdBytesOnClient =
+        client.invoke(() -> getClientMembershipIdBytesOnClient());
+
+    // Get client member id byte array on server
+    byte[] clientMembershipIdBytesOnServer =
+        server1.invoke(() -> getClientMembershipIdBytesOnServer());
+
+    // Verify member id bytes on client and server are equal
+    String complaint = "size on client=" + clientMembershipIdBytesOnClient.length
+        + "; size on server=" + clientMembershipIdBytesOnServer.length + "\nclient bytes="
+        + Arrays.toString(clientMembershipIdBytesOnClient) + "\nserver bytes="
+        + Arrays.toString(clientMembershipIdBytesOnServer);
+    assertTrue(complaint,
+        Arrays.equals(clientMembershipIdBytesOnClient, clientMembershipIdBytesOnServer));
+  }
+
+  /**
+   * Runs in the client VM.
+   *
+   * @return the membership-ID bytes derived from this VM's own distributed member
+   */
+  private byte[] getClientMembershipIdBytesOnClient() {
+    DistributedSystem system = getCache().getDistributedSystem();
+    byte[] result =
+        EventID.getMembershipId(new ClientProxyMembershipID(system.getDistributedMember()));
+    System.out.println("client ID bytes are " + Arrays.toString(result));
+    return result;
+  }
+
+  /**
+   * Runs in the server VM; expects exactly one client to be known to the
+   * {@code ClientHealthMonitor}.
+   *
+   * @return the membership-ID bytes derived from that client's proxy membership ID
+   */
+  private byte[] getClientMembershipIdBytesOnServer() {
+    Set<?> cpmIds = ClientHealthMonitor.getInstance().getClientHeartbeats().keySet();
+    assertEquals(1, cpmIds.size());
+    ClientProxyMembershipID cpmId = (ClientProxyMembershipID) cpmIds.iterator().next();
+    System.out.println("client ID on server is " + cpmId.getDistributedMember());
+    byte[] result = EventID.getMembershipId(cpmId);
+    System.out.println("client ID bytes are " + Arrays.toString(result));
+    return result;
+  }
+}
