This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch feature/GEODE-6061
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-6061 by this 
push:
     new fa28a1f  GEODE-6061: Add more test coverage on function with 
transaction.
fa28a1f is described below

commit fa28a1f2df3def7223ca2aa9ca9547456e24ae63
Author: eshu <e...@pivotal.io>
AuthorDate: Thu Nov 15 14:44:41 2018 -0800

    GEODE-6061: Add more test coverage on function with transaction.
---
 ...ionExecutionWithTransactionDistributedTest.java | 764 +++++++++++++++++++++
 1 file changed, 764 insertions(+)

diff --git 
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/FunctionExecutionWithTransactionDistributedTest.java
 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/FunctionExecutionWithTransactionDistributedTest.java
new file mode 100644
index 0000000..cfac74e
--- /dev/null
+++ 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/FunctionExecutionWithTransactionDistributedTest.java
@@ -0,0 +1,764 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.execute;
+
+import static org.apache.geode.test.dunit.VM.getHostName;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.catchThrowable;
+import static org.assertj.core.api.Java6Assertions.assertThat;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.DataSerializable;
+import org.apache.geode.cache.CacheTransactionManager;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.TransactionDataRebalancedException;
+import org.apache.geode.cache.TransactionException;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.client.PoolFactory;
+import org.apache.geode.cache.client.PoolManager;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import org.apache.geode.cache.execute.Execution;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.execute.FunctionException;
+import org.apache.geode.cache.execute.FunctionService;
+import org.apache.geode.cache.execute.RegionFunctionContext;
+import org.apache.geode.cache.execute.ResultCollector;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.ServerLocation;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.ClientCacheRule;
+import org.apache.geode.test.dunit.rules.DistributedDiskDirRule;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+public class FunctionExecutionWithTransactionDistributedTest implements
+    Serializable {
+
+  private String hostName;
+  private String uniqueName;
+  private String regionName;
+  private String replicateRegionName;
+  private VM server1;
+  private VM server2;
+  private VM server3;
+  private VM accessor;
+  private int port1;
+  private int port2;
+  private int port3;
+  private DistributedMember distributedMember;
+  private DistributedMember distributedMember2;
+  private transient PoolImpl pool;
+
+  private static final int KEY1 = 1;
+  private static final int KEY2 = 2;
+  private static final int KEY3 = 3;
+  private static final int KEY4 = 4;
+  private final String value = "value";
+
+  /**
+   * Selects which FunctionService entry point doFunction/doFunction2 use:
+   * ON_REGION builds the execution with FunctionService.onRegion, ON_MEMBER
+   * with FunctionService.onMember or FunctionService.onMembers.
+   */
+  private enum Type {
+    ON_REGION, ON_MEMBER
+  }
+
+  @Rule
+  public DistributedRule distributedRule = new DistributedRule();
+
+  @Rule
+  public DistributedDiskDirRule distributedDiskDir = new 
DistributedDiskDirRule();
+
+  @Rule
+  public CacheRule cacheRule = new CacheRule();
+
+  @Rule
+  public ClientCacheRule clientCacheRule = new ClientCacheRule();
+
+  @Rule
+  public SerializableTestName testName = new SerializableTestName();
+
+  @Before
+  public void setup() throws Exception {
+    accessor = getVM(0);
+    server1 = getVM(1);
+    server2 = getVM(2);
+    server3 = getVM(3);
+
+    hostName = getHostName();
+    uniqueName = getClass().getSimpleName() + "_" + testName.getMethodName();
+    regionName = uniqueName + "_region";
+    replicateRegionName = uniqueName + "_replicateRegion";
+  }
+
+  /**
+   * Three servers host both regions and the client pool lists all of them. The
+   * client runs a partitioned-region and a replicate-region function inside one
+   * transaction, rolls it back, and every server must still hold the entries
+   * that were put before the transaction.
+   */
+  @Test
+  public void clientCanRollbackTransactionWithMultipleFunctions() {
+    final int redundantCopies = 1;
+    final int totalBuckets = 3;
+    port1 = server1.invoke(() -> createServerRegions(redundantCopies, totalBuckets, false));
+    port2 = server2.invoke(() -> createServerRegions(redundantCopies, totalBuckets, false));
+    port3 = server3.invoke(() -> createServerRegions(redundantCopies, totalBuckets, false));
+    createClientRegions(true, port1, port2, port3);
+
+    doMultipleFunctionsOnClient();
+
+    verifyDataOnServers();
+  }
+
+  /**
+   * From the client cache: seeds both regions, then inside a single transaction
+   * executes the partitioned-region function (with a key filter) followed by the
+   * replicate-region function, and finally rolls the transaction back.
+   */
+  private void doMultipleFunctionsOnClient() {
+    Region partitionedData = clientCacheRule.getClientCache().getRegion(regionName);
+    Region replicatedData = clientCacheRule.getClientCache().getRegion(replicateRegionName);
+    doPuts(partitionedData);
+    doPuts(replicatedData);
+
+    CacheTransactionManager transactionManager =
+        clientCacheRule.getClientCache().getCacheTransactionManager();
+    transactionManager.begin();
+    doFunction(partitionedData, new MyPartitionRegionFunction(), Type.ON_REGION, true);
+    doFunction(replicatedData, new MyReplicateRegionFunction(), Type.ON_REGION, false);
+    transactionManager.rollback();
+  }
+
+  /**
+   * The client executes only the replicate-region function inside a transaction
+   * and rolls it back; the replicate region on every server must be unchanged.
+   */
+  @Test
+  public void clientCanRollbackTransactionWithOnReplicateRegionFunction() {
+    final int redundantCopies = 1;
+    final int totalBuckets = 3;
+    port1 = server1.invoke(() -> createServerRegions(redundantCopies, totalBuckets, false));
+    port2 = server2.invoke(() -> createServerRegions(redundantCopies, totalBuckets, false));
+    port3 = server3.invoke(() -> createServerRegions(redundantCopies, totalBuckets, false));
+    createClientRegions(true, port1, port2, port3);
+
+    Region replicatedData = clientCacheRule.getClientCache().getRegion(replicateRegionName);
+    doPuts(replicatedData);
+
+    CacheTransactionManager transactionManager =
+        clientCacheRule.getClientCache().getCacheTransactionManager();
+    transactionManager.begin();
+    doFunction(replicatedData, new MyReplicateRegionFunction(), Type.ON_REGION, false);
+    transactionManager.rollback();
+
+    verifyReplicateRegionDataOnServers();
+  }
+
+  /** Checks, in server1, server2 and server3, that the replicate region holds all four entries. */
+  private void verifyReplicateRegionDataOnServers() {
+    for (VM server : new VM[] {server1, server2, server3}) {
+      server.invoke(() -> verifyData(cacheRule.getCache().getRegion(replicateRegionName)));
+    }
+  }
+
+  /**
+   * Same scenario as the multi-function client rollback, but the client pool
+   * only knows the member created with isAccessor=true.
+   */
+  @Test
+  public void clientConnectToAccessorCanRollbackTransactionWithMultipleFunctions() {
+    final boolean isAccessor = true;
+    port1 = accessor.invoke(() -> createServerRegions(1, 3, isAccessor));
+    setupServerRegions();
+    createClientRegions(true, port1);
+
+    doMultipleFunctionsOnClient();
+
+    verifyDataOnServers();
+  }
+
+  /**
+   * A client-side onRegion execution without a key filter inside a transaction
+   * is expected to fail with a FunctionException caused by a
+   * TransactionException; after rollback the partitioned region is unchanged.
+   */
+  @Test
+  public void clientFunctionOnPartitionedRegionWithTransactionFailsIfWithoutFilter() {
+    accessor.invoke(() -> createServerRegions(1, 3, true));
+    port1 = server1.invoke(() -> createServerRegions(1, 3, false));
+    server2.invoke(() -> createServerRegions(1, 3, false));
+    server3.invoke(() -> createServerRegions(1, 3, false));
+    createClientRegions(true, port1);
+
+    Region partitionedData = clientCacheRule.getClientCache().getRegion(regionName);
+    doPuts(partitionedData);
+
+    CacheTransactionManager transactionManager =
+        clientCacheRule.getClientCache().getCacheTransactionManager();
+    transactionManager.begin();
+    Throwable thrown = catchThrowable(
+        () -> doFunction(partitionedData, new MyPartitionRegionFunction(), Type.ON_REGION, false));
+    assertThat(thrown).isInstanceOf(FunctionException.class);
+    Throwable cause = thrown.getCause();
+    assertThat(cause).isInstanceOf(TransactionException.class)
+        .hasMessage("Function inside a transaction cannot execute on more than one node");
+    transactionManager.rollback();
+
+    verifyPartitionedRegionOnServers();
+  }
+
+  /** Checks, in server1, server2 and server3, that the partitioned region holds all four entries. */
+  private void verifyPartitionedRegionOnServers() {
+    for (VM server : new VM[] {server1, server2, server3}) {
+      server.invoke(() -> verifyData(cacheRule.getCache().getRegion(regionName)));
+    }
+  }
+
+  /**
+   * The client runs the partitioned-region function with a two-key filter
+   * (the pair doFunction selects when areColocatedKeys is true) plus the
+   * replicate-region function in one transaction, then rolls back.
+   */
+  @Test
+  public void clientCanRollbackTransactionsWithMultipleColocatedKeysFilter() {
+    final int redundantCopies = 1;
+    final int totalBuckets = 3;
+    port1 = server1.invoke(() -> createServerRegions(redundantCopies, totalBuckets, false));
+    port2 = server2.invoke(() -> createServerRegions(redundantCopies, totalBuckets, false));
+    port3 = server3.invoke(() -> createServerRegions(redundantCopies, totalBuckets, false));
+    createClientRegions(true, port1, port2, port3);
+
+    Region partitionedData = clientCacheRule.getClientCache().getRegion(regionName);
+    Region replicatedData = clientCacheRule.getClientCache().getRegion(replicateRegionName);
+    doPuts(partitionedData);
+    doPuts(replicatedData);
+
+    CacheTransactionManager transactionManager =
+        clientCacheRule.getClientCache().getCacheTransactionManager();
+    transactionManager.begin();
+    doFunction(partitionedData, new MyPartitionRegionFunction(), Type.ON_REGION, true, true,
+        true);
+    doFunction(replicatedData, new MyReplicateRegionFunction(), Type.ON_REGION, false);
+    transactionManager.rollback();
+
+    verifyDataOnServers();
+  }
+
+  /**
+   * With a two-key filter that doFunction builds for the non-colocated case,
+   * the transactional client execution is expected to fail with a
+   * FunctionException caused by a TransactionException.
+   */
+  @Test
+  public void clientFunctionWithTransactionsFailsIfWithMultipleNonColocatedKeysFilter() {
+    port1 = accessor.invoke(() -> createServerRegions(1, 3, true));
+    setupServerRegions();
+    createClientRegions(true, port1);
+
+    Region partitionedData = clientCacheRule.getClientCache().getRegion(regionName);
+    doPuts(partitionedData);
+
+    CacheTransactionManager transactionManager =
+        clientCacheRule.getClientCache().getCacheTransactionManager();
+    transactionManager.begin();
+    Throwable thrown = catchThrowable(() -> doFunction(partitionedData,
+        new MyPartitionRegionFunction(), Type.ON_REGION, true, true, false));
+    assertThat(thrown).isInstanceOf(FunctionException.class);
+    Throwable cause = thrown.getCause();
+    assertThat(cause).isInstanceOf(TransactionException.class)
+        .hasMessage("Function inside a transaction cannot execute on more than one node");
+
+    transactionManager.rollback();
+
+    verifyPartitionedRegionOnServers();
+  }
+
+  /**
+   * After two successful functions in a client transaction, a further
+   * execution filtered on KEY2 (doFunction2) is expected to fail with a
+   * FunctionException caused by a TransactionDataRebalancedException; the
+   * transaction is then rolled back and the data verified on every server.
+   */
+  @Test
+  public void clientCanRollbackTransactionWithNonColocatedFunctions() {
+    port1 = server1.invoke(() -> createServerRegions(1, 3, false));
+    server2.invoke(() -> createServerRegions(1, 3, false));
+    server3.invoke(() -> createServerRegions(1, 3, false));
+    createClientRegions(true, port1);
+
+    Region partitionedData = clientCacheRule.getClientCache().getRegion(regionName);
+    Region replicatedData = clientCacheRule.getClientCache().getRegion(replicateRegionName);
+    doPuts(partitionedData);
+    doPuts(replicatedData);
+
+    CacheTransactionManager transactionManager =
+        clientCacheRule.getClientCache().getCacheTransactionManager();
+    transactionManager.begin();
+    doFunction(partitionedData, new MyPartitionRegionFunction(), Type.ON_REGION, true);
+    doFunction(replicatedData, new MyReplicateRegionFunction(), Type.ON_REGION, false);
+
+    Throwable thrown = catchThrowable(
+        () -> doFunction2(partitionedData, new MyPartitionRegionFunction(), Type.ON_REGION));
+    assertThat(thrown).isInstanceOf(FunctionException.class);
+    Throwable cause = thrown.getCause();
+    assertThat(cause).isInstanceOf(TransactionDataRebalancedException.class)
+        .hasMessageContaining("Function execution is not colocated with transaction.");
+
+    transactionManager.rollback();
+
+    verifyDataOnServers();
+  }
+
+  /**
+   * server2 seeds both regions; server1 then runs both onRegion functions in a
+   * transaction and rolls it back. All servers must still see the seeded data.
+   */
+  @Test
+  public void memberCanRollbackTransactionWithMultipleOnRegionFunctions() {
+    setupServerRegions();
+    server2.invoke(() -> doPuts());
+
+    server1.invoke(() -> {
+      Region partitionedData = cacheRule.getCache().getRegion(regionName);
+      Region replicatedData = cacheRule.getCache().getRegion(replicateRegionName);
+      CacheTransactionManager transactionManager =
+          cacheRule.getCache().getCacheTransactionManager();
+
+      transactionManager.begin();
+      doFunction(partitionedData, new MyPartitionRegionFunction(), Type.ON_REGION, true);
+      doFunction(replicatedData, new MyReplicateRegionFunction(), Type.ON_REGION, false);
+      transactionManager.rollback();
+    });
+
+    verifyDataOnServers();
+  }
+
+  /** Runs verifyData() (both regions) in server1, server2 and server3, in that order. */
+  private void verifyDataOnServers() {
+    for (VM server : new VM[] {server1, server2, server3}) {
+      server.invoke(() -> verifyData());
+    }
+  }
+
+  private void setupServerRegions() {
+    setupServerRegions(false);
+  }
+
+  /**
+   * Creates both regions on server1, server2 and server3 (one redundant copy,
+   * three buckets). When {@code doPut} is true each server additionally puts one
+   * entry into the partitioned region: KEY1, KEY2 and KEY3 respectively.
+   */
+  private void setupServerRegions(boolean doPut) {
+    int redundantCopies = 1;
+    setupServerRegion(server1, redundantCopies, doPut, KEY1);
+    setupServerRegion(server2, redundantCopies, doPut, KEY2);
+    setupServerRegion(server3, redundantCopies, doPut, KEY3);
+  }
+
+  /** Creates the regions inside {@code vm} and, if requested, puts {@code key} there. */
+  private void setupServerRegion(VM vm, int redundantCopies, boolean doPut, int key) {
+    vm.invoke(() -> {
+      createServerRegions(redundantCopies, 3, false);
+      if (doPut) {
+        cacheRule.getCache().getRegion(regionName).put(key, value + key);
+      }
+    });
+  }
+
+  /**
+   * server1 executes the replicate-region function in a transaction and rolls
+   * it back; the replicate region must be intact on all servers.
+   */
+  @Test
+  public void memberCanRollbackTransactionWithOnReplicateRegionFunction() {
+    setupServerRegions();
+    server2.invoke(() -> doPuts());
+
+    server1.invoke(() -> {
+      Region replicatedData = cacheRule.getCache().getRegion(replicateRegionName);
+      doPuts(replicatedData);
+
+      CacheTransactionManager transactionManager =
+          cacheRule.getCache().getCacheTransactionManager();
+      transactionManager.begin();
+      doFunction(replicatedData, new MyReplicateRegionFunction(), Type.ON_REGION, false);
+      transactionManager.rollback();
+    });
+
+    verifyReplicateRegionDataOnServers();
+  }
+
+  /**
+   * The member created with isAccessor=true seeds the data, runs both onRegion
+   * functions in a transaction and rolls back; servers must see the seeded data.
+   */
+  @Test
+  public void accessorCanRollbackTransactionWithMultipleFunctions() {
+    accessor.invoke(() -> createServerRegions(1, 3, true));
+    setupServerRegions();
+    accessor.invoke(() -> doPuts());
+
+    accessor.invoke(() -> {
+      Region partitionedData = cacheRule.getCache().getRegion(regionName);
+      Region replicatedData = cacheRule.getCache().getRegion(replicateRegionName);
+      CacheTransactionManager transactionManager =
+          cacheRule.getCache().getCacheTransactionManager();
+
+      transactionManager.begin();
+      doFunction(partitionedData, new MyPartitionRegionFunction(), Type.ON_REGION, true);
+      doFunction(replicatedData, new MyReplicateRegionFunction(), Type.ON_REGION, false);
+      transactionManager.rollback();
+    });
+
+    verifyDataOnServers();
+  }
+
+  /**
+   * On a member, an unfiltered onRegion execution inside a transaction is
+   * expected to throw a TransactionException directly.
+   */
+  @Test
+  public void memberFunctionOnPartitionedRegionWithTransactionFailsIfWithoutFilter() {
+    setupServerRegions();
+    server3.invoke(() -> doPuts());
+
+    server1.invoke(() -> {
+      Region partitionedData = cacheRule.getCache().getRegion(regionName);
+      doPuts(partitionedData);
+
+      CacheTransactionManager transactionManager =
+          cacheRule.getCache().getCacheTransactionManager();
+      transactionManager.begin();
+      Throwable thrown = catchThrowable(() -> doFunction(partitionedData,
+          new MyPartitionRegionFunction(), Type.ON_REGION, false));
+      assertThat(thrown).isInstanceOf(TransactionException.class)
+          .hasMessage("Function inside a transaction cannot execute on more than one node");
+      transactionManager.rollback();
+    });
+
+    verifyPartitionedRegionOnServers();
+  }
+
+  /**
+   * server1 runs the partitioned-region function with the two-key filter that
+   * doFunction builds when areColocatedKeys is true, then the replicate-region
+   * function, and rolls the transaction back.
+   */
+  @Test
+  public void memberCanRollbackTransactionsWithMultipleColocatedKeysFilter() {
+    setupServerRegions();
+    server2.invoke(() -> doPuts());
+
+    server1.invoke(() -> {
+      Region partitionedData = cacheRule.getCache().getRegion(regionName);
+      Region replicatedData = cacheRule.getCache().getRegion(replicateRegionName);
+      CacheTransactionManager transactionManager =
+          cacheRule.getCache().getCacheTransactionManager();
+
+      transactionManager.begin();
+      doFunction(partitionedData, new MyPartitionRegionFunction(), Type.ON_REGION, true, true,
+          true);
+      doFunction(replicatedData, new MyReplicateRegionFunction(), Type.ON_REGION, false);
+      transactionManager.rollback();
+    });
+
+    verifyDataOnServers();
+  }
+
+  /**
+   * On a member, the two-key filter that doFunction builds for the
+   * non-colocated case is expected to make the transactional execution throw a
+   * TransactionException.
+   */
+  @Test
+  public void memberFunctionWithTransactionsFailsIfWithMultipleNonColocatedKeysFilter() {
+    setupServerRegions();
+    server1.invoke(() -> doPuts());
+
+    server1.invoke(() -> {
+      Region partitionedData = cacheRule.getCache().getRegion(regionName);
+      CacheTransactionManager transactionManager =
+          cacheRule.getCache().getCacheTransactionManager();
+
+      transactionManager.begin();
+      Throwable thrown = catchThrowable(() -> doFunction(partitionedData,
+          new MyPartitionRegionFunction(), Type.ON_REGION, true, true, false));
+      assertThat(thrown).isInstanceOf(TransactionException.class)
+          .hasMessage("Function inside a transaction cannot execute on more than one node");
+
+      transactionManager.rollback();
+    });
+
+    verifyPartitionedRegionOnServers();
+  }
+
+  /**
+   * After two successful onRegion functions in a member transaction, a further
+   * execution filtered on KEY2 (doFunction2) is expected to throw a
+   * TransactionDataRebalancedException; the transaction is then rolled back.
+   */
+  @Test
+  public void memberTransactionFailsWithNonColocatedFunctions() {
+    setupServerRegions();
+    server1.invoke(() -> doPuts());
+
+    server1.invoke(() -> {
+      Region partitionedData = cacheRule.getCache().getRegion(regionName);
+      Region replicatedData = cacheRule.getCache().getRegion(replicateRegionName);
+      CacheTransactionManager transactionManager =
+          cacheRule.getCache().getCacheTransactionManager();
+
+      transactionManager.begin();
+      doFunction(partitionedData, new MyPartitionRegionFunction(), Type.ON_REGION, true);
+      doFunction(replicatedData, new MyReplicateRegionFunction(), Type.ON_REGION, false);
+
+      Throwable thrown = catchThrowable(
+          () -> doFunction2(partitionedData, new MyPartitionRegionFunction(), Type.ON_REGION));
+      assertThat(thrown).isInstanceOf(TransactionDataRebalancedException.class)
+          .hasMessageContaining("Function execution is not colocated with transaction.");
+
+      transactionManager.rollback();
+    });
+
+    verifyDataOnServers();
+  }
+
+  /**
+   * With no target member recorded, the ON_MEMBER path uses
+   * FunctionService.onMembers(); inside a transaction this is expected to throw
+   * a TransactionException.
+   */
+  @Test
+  public void transactionFailsIfIsOnMembersFunction() {
+    setupServerRegions();
+    server1.invoke(() -> doPuts());
+
+    server1.invoke(() -> {
+      Region partitionedData = cacheRule.getCache().getRegion(regionName);
+      CacheTransactionManager transactionManager =
+          cacheRule.getCache().getCacheTransactionManager();
+
+      transactionManager.begin();
+      Throwable thrown = catchThrowable(() -> doFunction(partitionedData,
+          new MyPartitionRegionFunction(), Type.ON_MEMBER, false));
+      assertThat(thrown).isInstanceOf(TransactionException.class)
+          .hasMessage("Function inside a transaction cannot execute on more than one node");
+
+      transactionManager.rollback();
+    });
+
+    verifyPartitionedRegionOnServers();
+  }
+
+  /**
+   * Records the distributed members of server1 and server2 so the ON_MEMBER
+   * path targets both through FunctionService.onMembers(Set); inside a
+   * transaction this is expected to throw a TransactionException.
+   */
+  @Test
+  public void transactionFailsIfIsOnMultipleMembersFunction() {
+    setupServerRegions();
+    distributedMember = server1
+        .invoke(() -> cacheRule.getCache().getDistributedSystem().getDistributedMember());
+    distributedMember2 = server2
+        .invoke(() -> cacheRule.getCache().getDistributedSystem().getDistributedMember());
+    server1.invoke(() -> doPuts());
+
+    server1.invoke(() -> {
+      Region partitionedData = cacheRule.getCache().getRegion(regionName);
+      CacheTransactionManager transactionManager =
+          cacheRule.getCache().getCacheTransactionManager();
+
+      transactionManager.begin();
+      Throwable thrown = catchThrowable(() -> doFunction(partitionedData,
+          new MyPartitionRegionFunction(), Type.ON_MEMBER, false));
+      assertThat(thrown).isInstanceOf(TransactionException.class)
+          .hasMessage("Function inside a transaction cannot execute on more than one node");
+
+      transactionManager.rollback();
+    });
+
+    verifyPartitionedRegionOnServers();
+  }
+
+  /**
+   * The accessor runs the function on a single member (server1) inside a
+   * transaction and rolls back; the partitioned region must be unchanged.
+   */
+  @Test
+  public void accessorCanRollbackTransactionWithOnMemberFunctions() {
+    accessor.invoke(() -> createServerRegions(1, 3, true));
+    setupServerRegions(true);
+    distributedMember = server1
+        .invoke(() -> cacheRule.getCache().getDistributedSystem().getDistributedMember());
+    server2.invoke(() -> doPuts());
+
+    accessor.invoke(() -> {
+      Region partitionedData = cacheRule.getCache().getRegion(regionName);
+      CacheTransactionManager transactionManager =
+          cacheRule.getCache().getCacheTransactionManager();
+
+      transactionManager.begin();
+      doFunction(partitionedData, new MyPartitionRegionFunction(), Type.ON_MEMBER, false);
+      transactionManager.rollback();
+    });
+
+    verifyPartitionedRegionOnServers();
+  }
+
+  /**
+   * server1 mixes onRegion and onMember (targeting itself) executions in one
+   * transaction and rolls back; all servers must still hold the seeded data.
+   */
+  @Test
+  public void memberCanRollbackTransactionWithOnRegionAndOnMemberFunctions() {
+    setupServerRegions(true);
+    distributedMember = server1
+        .invoke(() -> cacheRule.getCache().getDistributedSystem().getDistributedMember());
+    server2.invoke(() -> doPuts());
+
+    server1.invoke(() -> doOnRegionAndOnMemberFunction());
+
+    verifyDataOnServers();
+  }
+
+  /**
+   * The accessor mixes onRegion and onMember (targeting server1) executions in
+   * one transaction and rolls back; all servers must still hold the seeded data.
+   */
+  @Test
+  public void accessorCanRollbackTransactionWithOnRegionAndOnMemberFunctions() {
+    accessor.invoke(() -> createServerRegions(1, 3, true));
+    setupServerRegions(true);
+    distributedMember = server1
+        .invoke(() -> cacheRule.getCache().getDistributedSystem().getDistributedMember());
+    server2.invoke(() -> doPuts());
+
+    accessor.invoke(() -> doOnRegionAndOnMemberFunction());
+
+    verifyDataOnServers();
+  }
+
+  /**
+   * In the local cache: begins a transaction, executes the partitioned-region
+   * function on the region (filtered), the replicate-region function, and the
+   * partitioned-region function again through the ON_MEMBER path, then rolls
+   * the transaction back.
+   */
+  private void doOnRegionAndOnMemberFunction() {
+    Region partitionedData = cacheRule.getCache().getRegion(regionName);
+    Region replicatedData = cacheRule.getCache().getRegion(replicateRegionName);
+    CacheTransactionManager transactionManager =
+        cacheRule.getCache().getCacheTransactionManager();
+
+    transactionManager.begin();
+    doFunction(partitionedData, new MyPartitionRegionFunction(), Type.ON_REGION, true);
+    doFunction(replicatedData, new MyReplicateRegionFunction(), Type.ON_REGION, false);
+    doFunction(partitionedData, new MyPartitionRegionFunction(), Type.ON_MEMBER, false);
+    transactionManager.rollback();
+  }
+
+  /**
+   * Creates the partitioned and the replicate region in this VM's cache and
+   * starts a cache server on an ephemeral port.
+   *
+   * @param redundantCopies redundant copies for the partitioned region
+   * @param totalNumOfBuckets total bucket count for the partitioned region
+   * @param isAccessor when true the partitioned region gets local max memory 0
+   * @return the port the started cache server reports
+   */
+  private int createServerRegions(int redundantCopies, int totalNumOfBuckets,
+      boolean isAccessor) throws Exception {
+    PartitionAttributesFactory partitionAttributes = new PartitionAttributesFactory();
+    partitionAttributes.setRedundantCopies(redundantCopies);
+    partitionAttributes.setTotalNumBuckets(totalNumOfBuckets);
+    if (isAccessor) {
+      partitionAttributes.setLocalMaxMemory(0);
+    }
+
+    cacheRule.getOrCreateCache().createRegionFactory(RegionShortcut.PARTITION)
+        .setPartitionAttributes(partitionAttributes.create()).create(regionName);
+    cacheRule.getCache().createRegionFactory(RegionShortcut.REPLICATE)
+        .create(replicateRegionName);
+
+    CacheServer cacheServer = cacheRule.getCache().addCacheServer();
+    cacheServer.setPort(0);
+    cacheServer.start();
+    return cacheServer.getPort();
+  }
+
+  /**
+   * Creates the client cache, a pool over the given server ports (built while
+   * endpoint shuffling is disabled) and LOCAL client regions bound to that pool.
+   * When more than one port is given, a connection to the first one is acquired.
+   */
+  private void createClientRegions(boolean singleHopEnabled, int... ports) {
+    clientCacheRule.createClientCache();
+
+    CacheServerTestUtil.disableShufflingOfEndpoints();
+    try {
+      pool = getPool(singleHopEnabled, ports);
+    } finally {
+      CacheServerTestUtil.enableShufflingOfEndpoints();
+    }
+
+    ClientRegionFactory regionFactory =
+        clientCacheRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.LOCAL);
+    regionFactory.setPoolName(pool.getName());
+    regionFactory.create(regionName);
+    regionFactory.create(replicateRegionName);
+
+    boolean hasSeveralServers = ports.length > 1;
+    if (hasSeveralServers) {
+      ServerLocation firstServer = new ServerLocation(hostName, ports[0]);
+      pool.acquireConnection(firstServer);
+    }
+  }
+
+  /**
+   * Builds a pool named after the test that lists every given port on
+   * {@code hostName}, with PR single-hop set as requested.
+   */
+  private PoolImpl getPool(boolean singleHopEnabled, int... ports) {
+    PoolFactory poolFactory = PoolManager.createFactory();
+    for (int i = 0; i < ports.length; i++) {
+      poolFactory.addServer(hostName, ports[i]);
+    }
+    poolFactory.setPRSingleHopEnabled(singleHopEnabled);
+    return (PoolImpl) poolFactory.create(uniqueName);
+  }
+
+  /** Puts the four test entries into the partitioned region, then the replicate region. */
+  private void doPuts() {
+    for (String name : new String[] {regionName, replicateRegionName}) {
+      doPuts(cacheRule.getCache().getRegion(name));
+    }
+  }
+
+  /** Puts KEY1..KEY4 into {@code region}, each mapped to {@code value} followed by the key. */
+  private void doPuts(Region region) {
+    for (int key : new int[] {KEY1, KEY2, KEY3, KEY4}) {
+      region.put(key, value + key);
+    }
+  }
+
+  /** Verifies the four test entries in the partitioned region, then the replicate region. */
+  private void verifyData() {
+    for (String name : new String[] {regionName, replicateRegionName}) {
+      verifyData(cacheRule.getCache().getRegion(name));
+    }
+  }
+
+  /** Asserts that KEY1..KEY4 in {@code region} still map to {@code value} followed by the key. */
+  private void verifyData(Region region) {
+    for (int key : new int[] {KEY1, KEY2, KEY3, KEY4}) {
+      assertThat(region.get(key)).isEqualTo(value + key);
+    }
+  }
+
+  private void doFunction(Region region, Function function, Type type, boolean 
withFilter) {
+    doFunction(region, function, type, withFilter, false);
+  }
+
+  private void doFunction(Region region, Function function, Type type, boolean 
withFilter,
+      boolean withMultipleKeys) {
+    doFunction(region, function, type, withFilter, withMultipleKeys, false);
+  }
+
+  /**
+   * Builds an Execution for {@code function} and waits for its result.
+   *
+   * <p>
+   * ON_REGION executes on {@code region}, filtered when {@code withFilter} is
+   * true. The filter always holds KEY1; with {@code withMultipleKeys} it also
+   * holds KEY4 if {@code areColocatedKeys}, otherwise KEY2. ON_MEMBER targets
+   * all members when no member was recorded, the single recorded member (passing
+   * the region name as argument) when only {@code distributedMember} is set, or
+   * both recorded members otherwise.
+   */
+  private void doFunction(Region region, Function function, Type type, boolean withFilter,
+      boolean withMultipleKeys, boolean areColocatedKeys) {
+    Set filterKeys = new HashSet();
+    filterKeys.add(KEY1);
+    if (withMultipleKeys) {
+      filterKeys.add(areColocatedKeys ? KEY4 : KEY2);
+    }
+
+    Execution execution;
+    switch (type) {
+      case ON_MEMBER:
+        if (distributedMember == null) {
+          execution = FunctionService.onMembers();
+        } else if (distributedMember2 == null) {
+          execution = FunctionService.onMember(distributedMember).setArguments(regionName);
+        } else {
+          Set targetMembers = new HashSet();
+          targetMembers.add(distributedMember);
+          targetMembers.add(distributedMember2);
+          execution = FunctionService.onMembers(targetMembers);
+        }
+        break;
+      case ON_REGION:
+        Execution onRegion = FunctionService.onRegion(region);
+        execution = withFilter ? onRegion.withFilter(filterKeys) : onRegion;
+        break;
+      default:
+        throw new RuntimeException("unexpected type");
+    }
+
+    // getResult() blocks until the function has delivered its last result.
+    execution.execute(function).getResult();
+  }
+
+  /**
+   * Variant of doFunction whose ON_REGION execution is always filtered on KEY2
+   * only; ON_MEMBER always targets all members.
+   */
+  private void doFunction2(Region region, Function function, Type type) {
+    Set filterKeys = new HashSet();
+    filterKeys.add(KEY2);
+
+    Execution execution;
+    switch (type) {
+      case ON_MEMBER:
+        execution = FunctionService.onMembers();
+        break;
+      case ON_REGION:
+        execution = FunctionService.onRegion(region).withFilter(filterKeys);
+        break;
+      default:
+        throw new RuntimeException("unexpected type");
+    }
+
+    execution.execute(function).getResult();
+  }
+
+  /**
+   * Test function for the partitioned region. On a region-scoped execution it
+   * destroys every key of the filter in the data set; on a FunctionContextImpl
+   * it looks up the region named by the arguments and destroys KEY4. In every
+   * case it sends {@code Boolean.TRUE} as the last result.
+   */
+  public static class MyPartitionRegionFunction implements Function, DataSerializable {
+    @Override
+    public void execute(FunctionContext context) {
+      if (context instanceof RegionFunctionContext) {
+        RegionFunctionContext regionContext = (RegionFunctionContext) context;
+        PartitionedRegion partitionedRegion = (PartitionedRegion) regionContext.getDataSet();
+        Set filterKeys = regionContext.getFilter();
+        for (Object key : filterKeys) {
+          partitionedRegion.destroy(key);
+        }
+      } else if (context instanceof FunctionContextImpl) {
+        String targetRegionName = context.getArguments().toString();
+        Region targetRegion = context.getCache().getRegion(targetRegionName);
+        targetRegion.destroy(KEY4);
+      }
+      context.getResultSender().lastResult(Boolean.TRUE);
+    }
+
+    @Override
+    public void toData(DataOutput out) throws IOException {
+      // no fields to write
+    }
+
+    @Override
+    public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+      // no fields to read
+    }
+
+    @Override
+    public boolean optimizeForWrite() {
+      return true;
+    }
+  }
+
+  /**
+   * Test function for the replicate region. On a region-scoped execution it
+   * destroys KEY2 and KEY3 in the data set. It always sends
+   * {@code Boolean.TRUE} as the last result.
+   */
+  public static class MyReplicateRegionFunction implements Function, DataSerializable {
+    @Override
+    public void execute(FunctionContext context) {
+      if (context instanceof RegionFunctionContext) {
+        Region region = ((RegionFunctionContext) context).getDataSet();
+        region.destroy(KEY2);
+        region.destroy(KEY3);
+      }
+      // Send the last result on every path, as MyPartitionRegionFunction does:
+      // hasResult() is not overridden to false here, so a context that is not
+      // region-scoped previously left execute() without ending the result stream.
+      context.getResultSender().lastResult(Boolean.TRUE);
+    }
+
+    @Override
+    public void toData(DataOutput out) throws IOException {
+      // no fields to write
+    }
+
+    @Override
+    public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+      // no fields to read
+    }
+  }
+}

Reply via email to