This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-7702
in repository https://gitbox.apache.org/repos/asf/geode.git
commit b868cf60559b3d712018ac75eaabd13f011751f7 Author: zhouxh <[email protected]> AuthorDate: Mon Apr 27 23:55:40 2020 -0700 GEODE-7702: bulkOp from accessor or NORMAL should sync with clear --- .../cache30/DistributedAckRegionCCEDUnitTest.java | 1 - .../geode/cache30/PutAllMultiVmDUnitTest.java | 92 +++++++++++++++++++++- .../apache/geode/internal/cache/LocalRegion.java | 4 +- .../geode/internal/cache/LocalRegionDataView.java | 7 ++ .../internal/cache/tx/RemotePutAllMessage.java | 2 + .../internal/cache/tx/RemoteRemoveAllMessage.java | 2 + 6 files changed, 101 insertions(+), 7 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java index 25a98cd..bd606c9 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/cache30/DistributedAckRegionCCEDUnitTest.java @@ -264,7 +264,6 @@ public class DistributedAckRegionCCEDUnitTest extends DistributedAckRegionDUnitT } @Test - @Ignore("Enable after fix for bug GEODE-1891") public void testClearOnNonReplicateWithConcurrentEvents() { versionTestClearOnNonReplicateWithConcurrentEvents(); } diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache30/PutAllMultiVmDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache30/PutAllMultiVmDUnitTest.java index 51715a4..fc3f331 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/cache30/PutAllMultiVmDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/cache30/PutAllMultiVmDUnitTest.java @@ -20,12 +20,17 @@ */ package org.apache.geode.cache30; +import static org.apache.geode.test.awaitility.GeodeAwaitility.await; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; import static 
org.junit.Assert.fail; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Properties; +import java.util.Random; +import java.util.Set; import java.util.TreeMap; import org.junit.Test; @@ -41,6 +46,7 @@ import org.apache.geode.cache.RegionAttributes; import org.apache.geode.cache.RegionDestroyedException; import org.apache.geode.cache.Scope; import org.apache.geode.distributed.DistributedSystem; +import org.apache.geode.test.dunit.AsyncInvocation; import org.apache.geode.test.dunit.Host; import org.apache.geode.test.dunit.Invoke; import org.apache.geode.test.dunit.SerializableRunnable; @@ -63,8 +69,8 @@ public class PutAllMultiVmDUnitTest extends JUnit4DistributedTestCase { // TODO: Host host = Host.getHost(0); VM vm0 = host.getVM(0); VM vm1 = host.getVM(1); - vm0.invoke(() -> PutAllMultiVmDUnitTest.createCache()); - vm1.invoke(() -> PutAllMultiVmDUnitTest.createCache()); + vm0.invoke(() -> PutAllMultiVmDUnitTest.createCache(DataPolicy.REPLICATE)); + vm1.invoke(() -> PutAllMultiVmDUnitTest.createCache(DataPolicy.REPLICATE)); } @Override @@ -83,15 +89,15 @@ public class PutAllMultiVmDUnitTest extends JUnit4DistributedTestCase { // TODO: }); } - public static void createCache() { + public static void createCache(DataPolicy dataPolicy) { try { ds = (new PutAllMultiVmDUnitTest()).getSystem(props); cache = CacheFactory.create(ds); AttributesFactory factory = new AttributesFactory(); factory.setScope(Scope.DISTRIBUTED_ACK); + factory.setDataPolicy(dataPolicy); RegionAttributes attr = factory.create(); region = cache.createRegion("map", attr); - } catch (Exception ex) { ex.printStackTrace(); } @@ -120,8 +126,86 @@ public class PutAllMultiVmDUnitTest extends JUnit4DistributedTestCase { // TODO: } }// end of closeCache + private AsyncInvocation invokeClear(VM vm) { + AsyncInvocation async = vm.invokeAsync(() -> region.clear()); + return async; + } + + private AsyncInvocation invokeBulkOp(VM vm) { + AsyncInvocation async = 
vm.invokeAsync(() -> { + Map m = new HashMap(); + for (int i = 0; i < 20; i++) { + m.put(new Integer(i), new String("map" + i)); + } + region.putAll(m); + + HashSet m2 = new HashSet(); + for (int i = 0; i < 10; i++) { + m2.add(new Integer(i)); + } + region.removeAll(m2); + }); + return async; + } + + private void testBulkOpFromNonDataStore(final DataPolicy dataPolicy) throws InterruptedException { + Host host = Host.getHost(0); + VM vm0 = host.getVM(0); + VM vm1 = host.getVM(1); + VM vm2 = host.getVM(2); + + vm2.invoke(() -> PutAllMultiVmDUnitTest.createCache(dataPolicy)); + Random rand = new Random(); + for (int k = 0; k < 100; k++) { + int shuffle = rand.nextInt(2); + AsyncInvocation a1 = null; + AsyncInvocation a2 = null; + if (shuffle == 1) { + a1 = invokeClear(vm1); + a2 = invokeBulkOp(vm2); + } else { + a2 = invokeBulkOp(vm2); + a1 = invokeClear(vm1); + } + a1.await(); + a2.await(); + + // verify vm0 and vm1 has the same keys + await().untilAsserted(() -> { + Set vm0Contents = vm0.invoke(() -> { + final HashSet<Object> keys = new HashSet<>(); + for (Object o : region.keySet()) { + keys.add(o); + } + return keys; + }); // replicated + Set vm1Contents = vm1.invoke(() -> { + final HashSet<Object> keys = new HashSet<>(); + for (Object o : region.keySet()) { + keys.add(o); + } + return keys; + }); // replicated + assertThat(vm0Contents).isEqualTo(vm1Contents); + }); + } + } // tests methods + @Test + public void testPutAllFromAccessor() throws InterruptedException { + testBulkOpFromNonDataStore(DataPolicy.EMPTY); + } + + @Test + public void testPutAllFromNormal() throws InterruptedException { + testBulkOpFromNonDataStore(DataPolicy.NORMAL); + } + + @Test + public void testPutAllFromPreload() throws InterruptedException { + testBulkOpFromNonDataStore(DataPolicy.PRELOADED); + } @Test public void testSimplePutAll() { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java index 8559f77..b075b3b 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java @@ -9359,7 +9359,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, * putAll completes. This won't work for non-replicate regions though since they uses one-hop * during basicPutPart2 to get a valid version tag. */ - private void lockRVVForBulkOp() { + public void lockRVVForBulkOp() { ARMLockTestHook testHook = getRegionMap().getARMLockTestHook(); if (testHook != null) { testHook.beforeBulkLock(this); @@ -9374,7 +9374,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, } } - private void unlockRVVForBulkOp() { + public void unlockRVVForBulkOp() { ARMLockTestHook testHook = getRegionMap().getARMLockTestHook(); if (testHook != null) { testHook.beforeBulkRelease(this); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java index 36ab278..f7f442b 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegionDataView.java @@ -17,6 +17,7 @@ package org.apache.geode.internal.cache; import java.util.Collection; import java.util.Set; +import org.apache.geode.cache.DataPolicy; import org.apache.geode.cache.EntryNotFoundException; import org.apache.geode.cache.Region; import org.apache.geode.cache.Region.Entry; @@ -352,6 +353,9 @@ public class LocalRegionDataView implements InternalDataView { successfulPuts.clear(); putallOp.fillVersionedObjectList(successfulPuts); } + if (reg.getDataPolicy() == DataPolicy.NORMAL || reg.getDataPolicy() == DataPolicy.PRELOADED) { + return; + } // BR & DR's putAll long token = -1; try { @@ 
-374,6 +378,9 @@ public class LocalRegionDataView implements InternalDataView { successfulOps.clear(); op.fillVersionedObjectList(successfulOps); } + if (reg.getDataPolicy() == DataPolicy.NORMAL || reg.getDataPolicy() == DataPolicy.PRELOADED) { + return; + } // BR, DR's removeAll long token = -1; try { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemotePutAllMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemotePutAllMessage.java index 7399969..54b6a6d 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemotePutAllMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemotePutAllMessage.java @@ -363,6 +363,7 @@ public class RemotePutAllMessage extends RemoteOperationMessageWithDirectReply { final DistributedPutAllOperation dpao = new DistributedPutAllOperation(baseEvent, putAllDataCount, false); try { + r.lockRVVForBulkOp(); final VersionedObjectList versions = new VersionedObjectList(putAllDataCount, true, dr.getConcurrencyChecksEnabled()); dr.syncBulkOp(new Runnable() { @@ -397,6 +398,7 @@ public class RemotePutAllMessage extends RemoteOperationMessageWithDirectReply { this.putAllDataCount); return false; } finally { + r.unlockRVVForBulkOp(); dpao.freeOffHeapResources(); } } finally { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteRemoveAllMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteRemoveAllMessage.java index a2e95a3..beb202d 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteRemoveAllMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/RemoteRemoveAllMessage.java @@ -355,6 +355,7 @@ public class RemoteRemoveAllMessage extends RemoteOperationMessageWithDirectRepl final DistributedRemoveAllOperation op = new DistributedRemoveAllOperation(baseEvent, removeAllDataCount, false); try { + r.lockRVVForBulkOp(); final VersionedObjectList versions = 
new VersionedObjectList(removeAllDataCount, true, dr.getConcurrencyChecksEnabled()); dr.syncBulkOp(new Runnable() { @@ -391,6 +392,7 @@ public class RemoteRemoveAllMessage extends RemoteOperationMessageWithDirectRepl this.removeAllDataCount); return false; } finally { + r.unlockRVVForBulkOp(); op.freeOffHeapResources(); } } finally {
