This is an automated email from the ASF dual-hosted git repository.
alberto pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 7ccdc8297b GEODE-10155: Avoid threads hanging when function execution
times-out (#7493)
7ccdc8297b is described below
commit 7ccdc8297b1ded06175f41543884d945d5995c60
Author: Alberto Gomez <[email protected]>
AuthorDate: Mon Jun 6 20:18:46 2022 +0200
GEODE-10155: Avoid threads hanging when function execution times-out (#7493)
* GEODE-10155: Avoid threads hanging when function execution times-out
* GEODE-10155: Updated after review
* GEODE-10155: More changes after review
* GEODE-10155: Changes after more reviews
* GEODE-10155: Some more changes after review
* GEODE-10155: More changes after review
* GEODE-10155: More clean-up after review
---
...nctionExecutionNoSingleHopDistributedTest.java} | 895 ++++++++++-----------
.../cache/execute/PRClientServerTestBase.java | 351 ++++----
.../PartitionedRegionFunctionResultSender.java | 77 +-
.../PartitionedRegionFunctionResultSenderTest.java | 174 ++++
.../internal/cache/functions/TestFunction.java | 23 +-
.../geode/test/junit/rules/ServerStarterRule.java | 9 +
.../sanctioned-geode-dunit-serializables.txt | 2 +-
7 files changed, 864 insertions(+), 667 deletions(-)
diff --git
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest.java
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest.java
similarity index 53%
rename from
geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest.java
rename to
geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest.java
index a76a7157b8..1c66b3d2e0 100644
---
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest.java
+++
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest.java
@@ -14,10 +14,9 @@
*/
package org.apache.geode.internal.cache.execute;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.io.Serializable;
import java.util.ArrayList;
@@ -27,6 +26,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
import org.apache.logging.log4j.Logger;
import org.junit.Test;
@@ -37,6 +38,7 @@ import
org.junit.runners.Parameterized.UseParametersRunnerFactory;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.ServerConnectivityException;
import org.apache.geode.cache.execute.Execution;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionAdapter;
@@ -50,11 +52,10 @@ import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.functions.TestFunction;
import org.apache.geode.logging.internal.log4j.api.LogService;
-import org.apache.geode.test.dunit.Assert;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.SerializableRunnableIF;
-import org.apache.geode.test.dunit.ThreadUtils;
import org.apache.geode.test.dunit.Wait;
import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.junit.categories.ClientServerTest;
@@ -64,7 +65,7 @@ import
org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactor
@Category({ClientServerTest.class, FunctionServiceTest.class})
@RunWith(Parameterized.class)
@UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class)
-public class PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
+public class PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest
extends PRClientServerTestBase {
private static final Logger logger = LogService.getLogger();
@@ -77,7 +78,7 @@ public class
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
private static final int retryCount = 0;
- public PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest() {
+ public PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest() {
super();
}
@@ -87,10 +88,10 @@ public class
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
@Test
public void testServerAllKeyExecution_byInstance() {
createScenario();
- Function function = new TestFunction(true, TEST_FUNCTION2);
+ Function<Object> function = new TestFunction<>(true, TEST_FUNCTION2);
registerFunctionAtServer(function);
isByName = Boolean.FALSE;
- client.invoke(() ->
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
+ client.invoke(() ->
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest
.serverAllKeyExecution(isByName));
}
@@ -100,7 +101,7 @@ public class
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
@Test
public void testServerGetAllFunction() {
createScenario();
-
client.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::getAll);
+
client.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::getAll);
}
/*
@@ -109,7 +110,7 @@ public class
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
@Test
public void testServerPutAllFunction() {
createScenario();
-
client.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::putAll);
+
client.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::putAll);
}
/*
@@ -119,10 +120,10 @@ public class
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
@Test
public void testServerSingleKeyExecution_byName() {
createScenario();
- Function function = new TestFunction(true, TEST_FUNCTION2);
+ Function<Object> function = new TestFunction<>(true, TEST_FUNCTION2);
registerFunctionAtServer(function);
isByName = Boolean.TRUE;
- client.invoke(() ->
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
+ client.invoke(() ->
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest
.serverSingleKeyExecution(isByName));
}
@@ -135,7 +136,7 @@ public class
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
public void testserverSingleKeyExecution_FunctionInvocationTargetException()
{
createScenario();
client.invoke(
-
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::serverSingleKeyExecution_FunctionInvocationTargetException);
+
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::serverSingleKeyExecution_FunctionInvocationTargetException);
}
/*
@@ -144,7 +145,7 @@ public class
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
@Test
public void testBucketFilter() {
createScenarioForBucketFilter();
- Function function = new TestFunction(true,
TestFunction.TEST_FUNCTION_BUCKET_FILTER);
+ Function<Object> function = new TestFunction<>(true,
TestFunction.TEST_FUNCTION_BUCKET_FILTER);
registerFunctionAtServer(function);
Set<Integer> bucketFilterSet = new HashSet<>();
@@ -161,7 +162,7 @@ public class
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
@Test
public void testBucketFilterOverride() {
createScenarioForBucketFilter();
- Function function = new TestFunction(true,
TestFunction.TEST_FUNCTION_BUCKET_FILTER);
+ Function<Object> function = new TestFunction<>(true,
TestFunction.TEST_FUNCTION_BUCKET_FILTER);
registerFunctionAtServer(function);
// test multi key filter
Set<Integer> bucketFilterSet = new HashSet<>();
@@ -181,10 +182,10 @@ public class
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
@Test
public void testServerSingleKeyExecution_SocketTimeOut() {
createScenario();
- Function function = new TestFunction(true,
TestFunction.TEST_FUNCTION_SOCKET_TIMEOUT);
+ Function<Object> function = new TestFunction<>(true,
TestFunction.TEST_FUNCTION_SOCKET_TIMEOUT);
registerFunctionAtServer(function);
isByName = Boolean.TRUE;
- client.invoke(() ->
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
+ client.invoke(() ->
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest
.serverSingleKeyExecutionSocketTimeOut(isByName));
}
@@ -195,10 +196,10 @@ public class
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
@Test
public void testServerSingleKeyExecution_byInstance() {
createScenario();
- Function function = new TestFunction(true, TEST_FUNCTION2);
+ Function<Object> function = new TestFunction<>(true, TEST_FUNCTION2);
registerFunctionAtServer(function);
isByName = Boolean.FALSE;
- client.invoke(() ->
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
+ client.invoke(() ->
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest
.serverSingleKeyExecution(isByName));
}
@@ -209,7 +210,7 @@ public class
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
public void testServerSingleKeyExecution_byInlineFunction() {
createScenario();
client.invoke(
-
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::serverSingleKeyExecution_Inline);
+
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::serverSingleKeyExecution_Inline);
}
/*
@@ -219,26 +220,26 @@ public class
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
@Test
public void testserverMultiKeyExecution_byName() {
createScenario();
- Function function = new TestFunction(true, TEST_FUNCTION2);
+ Function<Object> function = new TestFunction<>(true, TEST_FUNCTION2);
registerFunctionAtServer(function);
isByName = Boolean.TRUE;
- client.invoke(() ->
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
+ client.invoke(() ->
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest
.serverMultiKeyExecution(isByName));
server1.invoke(
-
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::checkBucketsOnServer);
+
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::checkBucketsOnServer);
server2.invoke(
-
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::checkBucketsOnServer);
+
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::checkBucketsOnServer);
server3.invoke(
-
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::checkBucketsOnServer);
+
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::checkBucketsOnServer);
}
@Test
- public void testserverMultiKeyExecution_SocektTimeOut() {
+ public void testserverMultiKeyExecution_SocketTimeOut() {
createScenario();
- Function function = new TestFunction(true,
TestFunction.TEST_FUNCTION_SOCKET_TIMEOUT);
+ Function<Object> function = new TestFunction<>(true,
TestFunction.TEST_FUNCTION_SOCKET_TIMEOUT);
registerFunctionAtServer(function);
isByName = Boolean.TRUE;
- client.invoke(() ->
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
+ client.invoke(() ->
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest
.serverMultiKeyExecutionSocketTimeOut(isByName));
}
@@ -249,7 +250,7 @@ public class
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
public void testserverMultiKeyExecution_byInlineFunction() {
createScenario();
client.invoke(
-
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::serverMultiKeyExecution_Inline);
+
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::serverMultiKeyExecution_Inline);
}
/*
@@ -262,7 +263,7 @@ public class
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
public void testserverMultiKeyExecution_FunctionInvocationTargetException() {
createScenario();
client.invoke(
-
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::serverMultiKeyExecution_FunctionInvocationTargetException);
+
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::serverMultiKeyExecution_FunctionInvocationTargetException);
}
/*
@@ -272,10 +273,10 @@ public class
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
@Test
public void testserverMultiKeyExecutionNoResult_byName() {
createScenario();
- Function function = new TestFunction(false, TEST_FUNCTION7);
+ Function<Object> function = new TestFunction<>(false, TEST_FUNCTION7);
registerFunctionAtServer(function);
isByName = Boolean.TRUE;
- client.invoke(() ->
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
+ client.invoke(() ->
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest
.serverMultiKeyExecutionNoResult(isByName));
}
@@ -286,10 +287,10 @@ public class
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
@Test
public void testserverMultiKeyExecution_byInstance() {
createScenario();
- Function function = new TestFunction(true, TEST_FUNCTION2);
+ Function<Object> function = new TestFunction<>(true, TEST_FUNCTION2);
registerFunctionAtServer(function);
isByName = Boolean.FALSE;
- client.invoke(() ->
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
+ client.invoke(() ->
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest
.serverMultiKeyExecution(isByName));
}
@@ -300,10 +301,10 @@ public class
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
@Test
public void testserverMultiKeyExecutionOnASingleBucket_byName() {
createScenario();
- Function function = new TestFunction(true, TEST_FUNCTION2);
+ Function<Object> function = new TestFunction<>(true, TEST_FUNCTION2);
registerFunctionAtServer(function);
isByName = Boolean.TRUE;
- client.invoke(() ->
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
+ client.invoke(() ->
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest
.serverMultiKeyExecutionOnASingleBucket(isByName));
}
@@ -314,10 +315,10 @@ public class
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
@Test
public void testserverMultiKeyExecutionOnASingleBucket_byInstance() {
createScenario();
- Function function = new TestFunction(true, TEST_FUNCTION2);
+ Function<Object> function = new TestFunction<>(true, TEST_FUNCTION2);
registerFunctionAtServer(function);
isByName = Boolean.FALSE;
- client.invoke(() ->
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
+ client.invoke(() ->
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest
.serverMultiKeyExecutionOnASingleBucket(isByName));
}
@@ -326,33 +327,30 @@ public class
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
* failover to other available server
*/
@Test
- public void testServerFailoverWithTwoServerAliveHA() {
+ public void testServerFailoverWithTwoServerAliveHA() throws
InterruptedException {
IgnoredException.addIgnoredException("FunctionInvocationTargetException");
- ArrayList commonAttributes =
+ ArrayList<Object> commonAttributes =
createCommonServerAttributes("TestPartitionedRegion", null, 1, null);
createClientServerScenarion(commonAttributes, 20, 20, 20);
- Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_HA);
+ Function<Object> function = new TestFunction<>(true,
TestFunction.TEST_FUNCTION_HA);
registerFunctionAtServer(function);
-
server2.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::stopServerHA);
-
server3.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::stopServerHA);
-
client.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::putOperation);
+
server2.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::stopServerHA);
+
server3.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::stopServerHA);
+
client.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::putOperation);
int AsyncInvocationArrSize = 1;
- AsyncInvocation[] async = new AsyncInvocation[AsyncInvocationArrSize];
+ AsyncInvocation<?>[] async = new
AsyncInvocation<?>[AsyncInvocationArrSize];
async[0] = client.invokeAsync(
-
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::executeFunctionHA);
-
server2.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::startServerHA);
-
server3.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::startServerHA);
-
server1.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::stopServerHA);
+
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::executeFunctionHA);
+
server2.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::startServerHA);
+
server3.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::startServerHA);
+
server1.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::stopServerHA);
client.invoke(() -> PRClientServerRegionFunctionExecutionDUnitTest
.verifyDeadAndLiveServers(2));
- ThreadUtils.join(async[0], 6 * 60 * 1000);
- if (async[0].getException() != null) {
- Assert.fail("UnExpected Exception Occurred : ", async[0].getException());
- }
- List l = (List) async[0].getReturnValue();
- assertEquals(2, l.size());
+ List<?> l = (List<?>) async[0].get();
+
+ assertThat(l).hasSize(2);
}
/*
@@ -360,31 +358,28 @@ public class
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
* failover to other available server
*/
@Test
- public void testServerCacheClosedFailoverWithTwoServerAliveHA() {
+ public void testServerCacheClosedFailoverWithTwoServerAliveHA() throws
InterruptedException {
IgnoredException.addIgnoredException("FunctionInvocationTargetException");
- ArrayList commonAttributes =
+ ArrayList<Object> commonAttributes =
createCommonServerAttributes("TestPartitionedRegion", null, 1, null);
createClientServerScenarion(commonAttributes, 20, 20, 20);
- Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_HA);
+ Function<Object> function = new TestFunction<>(true,
TestFunction.TEST_FUNCTION_HA);
registerFunctionAtServer(function);
-
server2.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::stopServerHA);
-
server3.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::stopServerHA);
-
client.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::putOperation);
+
server2.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::stopServerHA);
+
server3.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::stopServerHA);
+
client.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::putOperation);
int AsyncInvocationArrSize = 1;
- AsyncInvocation[] async = new AsyncInvocation[AsyncInvocationArrSize];
+ AsyncInvocation<?>[] async = new
AsyncInvocation<?>[AsyncInvocationArrSize];
async[0] = client.invokeAsync(
-
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::executeFunctionHA);
-
server2.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::startServerHA);
-
server3.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::startServerHA);
-
server1.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::closeCacheHA);
- client.invoke(() ->
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
+
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::executeFunctionHA);
+
server2.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::startServerHA);
+
server3.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::startServerHA);
+
server1.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::closeCacheHA);
+ client.invoke(() ->
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest
.verifyDeadAndLiveServers(2));
- ThreadUtils.join(async[0], 5 * 60 * 1000);
- if (async[0].getException() != null) {
- Assert.fail("UnExpected Exception Occurred : ", async[0].getException());
- }
- List l = (List) async[0].getReturnValue();
- assertEquals(2, l.size());
+
+ List<?> l = (List<?>) async[0].get();
+ assertThat(l).hasSize(2);
}
@Test
@@ -392,20 +387,90 @@ public class
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
createScenario();
server1
.invoke(
- (SerializableRunnableIF)
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::registerFunction);
+ (SerializableRunnableIF)
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::registerFunction);
server1
.invoke(
- (SerializableRunnableIF)
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::registerFunction);
+ (SerializableRunnableIF)
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::registerFunction);
server1
.invoke(
- (SerializableRunnableIF)
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::registerFunction);
+ (SerializableRunnableIF)
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::registerFunction);
client
.invoke(
- (SerializableRunnableIF)
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::registerFunction);
+ (SerializableRunnableIF)
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::registerFunction);
client.invoke(
PRClientServerRegionFunctionExecutionDUnitTest::FunctionExecution_Inline_Bug40714);
}
+ /**
+ * This test case verifies that if the execution of a function handled
+ * by a Function Execution thread times out at the client, the
ServerConnection
+ * thread will eventually be released.
+ * In order to test this, a slow function will be executed by a client
+ * with a small time-out a number of times equal to the number of servers
+ * in the cluster * the max number of threads configured.
+ * After the function executions have timed-out, another request will be
+ * sent by the client to any server and it should be served timely.
+ * If the ServerConnection threads had not been released, this new
+ * request will never be served because there would be not ServerConnection
+ * threads available and the test case will time-out.
+ */
+ @Test
+ public void
testClientFunctionExecutionTimingOutDoesNotLeaveServerConnectionThreadsHanged()
{
+ // Set client connect-timeout to a very high value so that if there are no
+ // ServerConnection threads available the test will time-out before the
client times-out.
+ int connectTimeout = (int) (GeodeAwaitility.getTimeout().toMillis() * 2);
+ int maxThreads = 2;
+ createScenarioWithClientConnectTimeout(connectTimeout, maxThreads);
+
+ // The function must be executed a number of times equal
+ // to the number of servers * the max-threads, to check if all the
+ // threads are hanged.
+ int executions = (3 * maxThreads);
+
+ // functionTimeoutSecs should be lower than the
+ // time taken by the slow function to return all
+ // the results
+ int functionTimeoutSecs = 2;
+
+ Function<?> function = new TestFunction<>(true,
TestFunction.TEST_FUNCTION_SLOW);
+ registerFunctionAtServer(function);
+
+ // Run the function that will time-out at the client
+ // the number of specified times.
+ IntStream.range(0, executions)
+ .forEach(i -> assertThatThrownBy(() -> client
+ .invoke(() -> executeSlowFunctionOnRegionNoFilter(function,
PartitionedRegionName,
+ functionTimeoutSecs)))
+
.getCause().getCause().isInstanceOf(ServerConnectivityException.class));
+
+ // Make sure that the get returns timely. If it hangs, it means
+ // that there are no threads available in the servers to handle the
+ // request because they were hanged due to the previous function
+ // executions.
+ await().until(() -> {
+ client.invoke(() -> executeGet(PartitionedRegionName, "key"));
+ return true;
+ });
+ }
+
+ private Object executeGet(String regionName, Object key) {
+ Region<?, ?> region = cache.getRegion(regionName);
+ return region.get(key);
+ }
+
+ private Object executeSlowFunctionOnRegionNoFilter(Function<?> function,
String regionName,
+ int functionTimeoutSecs) {
+ FunctionService.registerFunction(function);
+ Region<?, ?> region = cache.getRegion(regionName);
+
+ Execution execution = FunctionService.onRegion(region);
+
+ Object[] args = {Boolean.TRUE};
+ return execution.setArguments(args).execute(function.getId(),
functionTimeoutSecs,
+ TimeUnit.SECONDS).getResult();
+ }
+
+
public static void registerFunction() {
FunctionService.registerFunction(new FunctionAdapter() {
@Override
@@ -457,57 +522,56 @@ public class
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
public static void executeFunction() {
- Region region = cache.getRegion(PartitionedRegionName);
- assertNotNull(region);
+ Region<Object, Object> region = cache.getRegion(PartitionedRegionName);
+ assertThat(region).isNotNull();
final HashSet<String> testKeysSet = new HashSet<>();
for (int i = (totalNumBuckets * 10); i > 0; i--) {
testKeysSet.add("execKey-" + i);
}
DistributedSystem.setThreadsSocketPolicy(false);
- Function function = new TestFunction(true, TEST_FUNCTION2);
+ Function<Object> function = new TestFunction<>(true, TEST_FUNCTION2);
FunctionService.registerFunction(function);
Execution dataSet = FunctionService.onRegion(region);
try {
- ResultCollector rc1 =
+ ResultCollector<?, ?> rc1 =
dataSet.withFilter(testKeysSet).setArguments(Boolean.TRUE).execute(function.getId());
- HashMap resultMap = ((HashMap) rc1.getResult());
- assertEquals(3, resultMap.size());
+ HashMap<?, ?> resultMap = ((HashMap<?, ?>) rc1.getResult());
+ assertThat(resultMap).hasSize(3);
- for (Object o : resultMap.entrySet()) {
- Map.Entry entry = (Map.Entry) o;
- ArrayList resultListForMember = (ArrayList) entry.getValue();
+ for (Map.Entry<?, ?> o : resultMap.entrySet()) {
+ ArrayList<?> resultListForMember = (ArrayList<?>) o.getValue();
for (Object result : resultListForMember) {
- assertEquals(Boolean.TRUE, result);
+ assertThat(result).isEqualTo(true);
}
}
} catch (Exception e) {
logger.info("Got an exception : " + e.getMessage());
- assertTrue(e instanceof CacheClosedException);
+ assertThat(e).isInstanceOf(CacheClosedException.class);
}
}
private static Object executeFunctionHA() {
- Region region = cache.getRegion(PartitionedRegionName);
+ Region<Object, Object> region = cache.getRegion(PartitionedRegionName);
final HashSet<String> testKeysSet = new HashSet<>();
for (int i = (totalNumBuckets * 10); i > 0; i--) {
testKeysSet.add("execKey-" + i);
}
DistributedSystem.setThreadsSocketPolicy(false);
- Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_HA);
+ Function<Object> function = new TestFunction<>(true,
TestFunction.TEST_FUNCTION_HA);
FunctionService.registerFunction(function);
Execution dataSet = FunctionService.onRegion(region);
- ResultCollector rc1 =
+ ResultCollector<?, ?> rc1 =
dataSet.withFilter(testKeysSet).setArguments(Boolean.TRUE).execute(function.getId());
- List l = ((List) rc1.getResult());
+ List<?> l = ((List<?>) rc1.getResult());
logger.info("Result size : " + l.size());
return l;
}
private static void putOperation() {
Region<String, Integer> region = cache.getRegion(PartitionedRegionName);
- assertNotNull(region);
+ assertThat(region).isNotNull();
final HashSet<String> testKeysSet = new HashSet<>();
for (int i = (totalNumBuckets * 10); i > 0; i--) {
testKeysSet.add("execKey-" + i);
@@ -520,132 +584,119 @@ public class
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
}
private void createScenario() {
- ArrayList commonAttributes =
+ ArrayList<Object> commonAttributes =
createCommonServerAttributes("TestPartitionedRegion", null, 0, null);
createClientServerScenarioNoSingleHop(commonAttributes, 20, 20, 20);
}
+ private void createScenarioWithClientConnectTimeout(int connectTimeout, int
maxThreads) {
+ ArrayList<Object> commonAttributes =
+ createCommonServerAttributes("TestPartitionedRegion", null, 0, null);
+ createClientServerScenarioNoSingleHop(commonAttributes, 20, 20, 20,
maxThreads, connectTimeout);
+ }
+
+
private void createScenarioForBucketFilter() {
- ArrayList commonAttributes =
createCommonServerAttributes("TestPartitionedRegion",
+ ArrayList<Object> commonAttributes =
createCommonServerAttributes("TestPartitionedRegion",
new BucketFilterPRResolver(), 0, null);
createClientServerScenarioNoSingleHop(commonAttributes, 20, 20, 20);
}
private static void checkBucketsOnServer() {
PartitionedRegion region = (PartitionedRegion)
cache.getRegion(PartitionedRegionName);
- HashMap localBucket2RegionMap = (HashMap)
region.getDataStore().getSizeLocally();
+ HashMap<Integer, Integer> localBucket2RegionMap =
+ (HashMap<Integer, Integer>) region.getDataStore().getSizeLocally();
logger.info(
"Size of the " + PartitionedRegionName + " in this VM :- " +
localBucket2RegionMap.size());
- Set entrySet = localBucket2RegionMap.entrySet();
- assertNotNull(entrySet);
+ Set<Map.Entry<Integer, Integer>> entrySet =
localBucket2RegionMap.entrySet();
+ assertThat(entrySet).isNotNull();
}
private static void serverAllKeyExecution(Boolean isByName) {
Region<String, Integer> region = cache.getRegion(PartitionedRegionName);
- assertNotNull(region);
+ assertThat(region).isNotNull();
final HashSet<String> testKeysSet = new HashSet<>();
for (int i = (totalNumBuckets / 2); i > 0; i--) {
testKeysSet.add("execKey-" + i);
}
DistributedSystem.setThreadsSocketPolicy(false);
- Function function = new TestFunction(true, TEST_FUNCTION2);
+ Function<Object> function = new TestFunction<>(true, TEST_FUNCTION2);
FunctionService.registerFunction(function);
Execution dataSet = FunctionService.onRegion(region);
- try {
- int j = 0;
- HashSet<Integer> origVals = new HashSet<>();
- for (String item : testKeysSet) {
- Integer val = j++;
- origVals.add(val);
- region.put(item, val);
- }
- ResultCollector rc1 = executeOnAll(dataSet, Boolean.TRUE, function,
isByName);
- List resultList = (List) rc1.getResult();
- logger.info("Result size : " + resultList.size());
- logger.info("Result are SSSS : " + resultList);
- assertEquals(3, resultList.size());
-
- for (Object result : resultList) {
- assertEquals(Boolean.TRUE, result);
- }
- ResultCollector rc2 = executeOnAll(dataSet, testKeysSet, function,
isByName);
- List l2 = ((List) rc2.getResult());
- assertEquals(3, l2.size());
- HashSet<Integer> foundVals = new HashSet<>();
- for (Object value : l2) {
- ArrayList subL = (ArrayList) (value);
- assertTrue(subL.size() > 0);
- for (Object o : subL) {
- assertTrue(foundVals.add((Integer) o));
- }
- }
- assertEquals(origVals, foundVals);
- } catch (Exception e) {
- Assert.fail("Test failed after the put operation", e);
+ int j = 0;
+ HashSet<Integer> origVals = new HashSet<>();
+ for (String item : testKeysSet) {
+ Integer val = j++;
+ origVals.add(val);
+ region.put(item, val);
+ }
+ ResultCollector<?, ?> rc1 = executeOnAll(dataSet, Boolean.TRUE, function,
isByName);
+ List<?> resultList = (List<?>) rc1.getResult();
+ assertThat(resultList).hasSize(3);
+ for (Object result : resultList) {
+ assertThat(result).isEqualTo(true);
}
+ ResultCollector<?, ?> rc2 = executeOnAll(dataSet, testKeysSet, function,
isByName);
+ List<?> l2 = (List<?>) rc2.getResult();
+ assertThat(l2).hasSize(3);
+ HashSet<Integer> foundVals = new HashSet<>();
+ for (Object value : l2) {
+ List<?> subL = (List<?>) value;
+ assertThat(subL).hasSizeGreaterThan(0);
+ for (Object o : subL) {
+ assertThat(foundVals.add((Integer) o)).isTrue();
+ }
+ }
+ assertThat(foundVals).containsExactlyInAnyOrderElementsOf(origVals);
}
public static void getAll() {
Region<String, Integer> region = cache.getRegion(PartitionedRegionName);
- assertNotNull(region);
+ assertThat(region).isNotNull();
final List<String> testKeysList = new ArrayList<>();
for (int i = (totalNumBuckets * 3); i > 0; i--) {
testKeysList.add("execKey-" + i);
}
DistributedSystem.setThreadsSocketPolicy(false);
- try {
- int j = 0;
- Map<String, Integer> origVals = new HashMap<>();
- for (String key : testKeysList) {
- Integer val = j++;
- origVals.put(key, val);
- region.put(key, val);
- }
- Map resultMap = region.getAll(testKeysList);
- assertEquals(resultMap, origVals);
- Wait.pause(2000);
- Map secondResultMap = region.getAll(testKeysList);
- assertEquals(secondResultMap, origVals);
-
- } catch (Exception e) {
- Assert.fail("Test failed after the put operation", e);
-
+ int j = 0;
+ Map<String, Integer> origVals = new HashMap<>();
+ for (String key : testKeysList) {
+ Integer val = j++;
+ origVals.put(key, val);
+ region.put(key, val);
}
+ Map<String, Integer> resultMap = region.getAll(testKeysList);
+ assertThat(resultMap).containsExactlyInAnyOrderEntriesOf(origVals);
+ await().untilAsserted(
+ () ->
assertThat(region.getAll(testKeysList)).containsExactlyInAnyOrderEntriesOf(origVals));
}
public static void putAll() {
Region<String, Integer> region = cache.getRegion(PartitionedRegionName);
- assertNotNull(region);
+ assertThat(region).isNotNull();
final List<String> testKeysList = new ArrayList<>();
for (int i = (totalNumBuckets * 3); i > 0; i--) {
testKeysList.add("execKey-" + i);
}
DistributedSystem.setThreadsSocketPolicy(false);
- try {
- int j = 0;
- Map<String, Integer> origVals = new HashMap<>();
- for (String key : testKeysList) {
- Integer val = j++;
- origVals.put(key, val);
- region.put(key, val);
- }
- Map resultMap = region.getAll(testKeysList);
- assertEquals(resultMap, origVals);
- Wait.pause(2000);
- Map secondResultMap = region.getAll(testKeysList);
- assertEquals(secondResultMap, origVals);
-
- } catch (Exception e) {
- Assert.fail("Test failed after the put operation", e);
-
+ int j = 0;
+ Map<String, Integer> origVals = new HashMap<>();
+ for (String key : testKeysList) {
+ Integer val = j++;
+ origVals.put(key, val);
+ region.put(key, val);
}
+ Map<String, Integer> resultMap = region.getAll(testKeysList);
+ assertThat(resultMap).containsExactlyInAnyOrderEntriesOf(origVals);
+ await().untilAsserted(
+ () ->
assertThat(region.getAll(testKeysList)).containsExactlyInAnyOrderEntriesOf(origVals));
}
private static void serverMultiKeyExecutionOnASingleBucket(Boolean isByName)
{
Region<String, Integer> region = cache.getRegion(PartitionedRegionName);
- assertNotNull(region);
+ assertThat(region).isNotNull();
final HashSet<String> testKeysSet = new HashSet<>();
for (int i = (totalNumBuckets * 2); i > 0; i--) {
testKeysSet.add("execKey-" + i);
@@ -656,192 +707,163 @@ public class
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
region.put(value, val);
}
DistributedSystem.setThreadsSocketPolicy(false);
- for (String o : testKeysSet) {
- try {
- Set<String> singleKeySet = Collections.singleton(o);
- Function function = new TestFunction(true, TEST_FUNCTION2);
- FunctionService.registerFunction(function);
- Execution dataSet = FunctionService.onRegion(region);
- ResultCollector rc1 = execute(dataSet, singleKeySet, Boolean.TRUE,
function, isByName);
- List l = ((List) rc1.getResult());
- assertEquals(1, l.size());
-
- ResultCollector rc2 =
- execute(dataSet, singleKeySet, new HashSet<>(singleKeySet),
function, isByName);
- List l2 = ((List) rc2.getResult());
-
- assertEquals(1, l2.size());
- List subList = (List) l2.iterator().next();
- assertEquals(1, subList.size());
- assertEquals(region.get(singleKeySet.iterator().next()),
subList.iterator().next());
- } catch (Exception expected) {
- logger.info("Exception : " + expected.getMessage());
- expected.printStackTrace();
- fail("Test failed after the put operation");
- }
+ for (String key : testKeysSet) {
+ Set<String> singleKeySet = Collections.singleton(key);
+ Function<Object> function = new TestFunction<>(true, TEST_FUNCTION2);
+ FunctionService.registerFunction(function);
+ Execution dataSet = FunctionService.onRegion(region);
+ ResultCollector<?, ?> rc1 = execute(dataSet, singleKeySet, Boolean.TRUE,
function, isByName);
+ List<?> list1 = (List<?>) rc1.getResult();
+ assertThat(list1).hasSize(1);
+
+ ResultCollector<?, ?> rc2 =
+ execute(dataSet, singleKeySet, new HashSet<>(singleKeySet),
function, isByName);
+ List<?> list2 = (List<?>) rc2.getResult();
+
+ assertThat(list2).hasSize(1);
+ List<Integer> subList = (List<Integer>) list2.iterator().next();
+ assertThat(subList).hasSize(1);
+
assertThat(subList).containsOnly(region.get(singleKeySet.iterator().next()));
}
}
private static void serverMultiKeyExecution(Boolean isByName) {
Region<String, Integer> region = cache.getRegion(PartitionedRegionName);
- assertNotNull(region);
+ assertThat(region).isNotNull();
final HashSet<String> testKeysSet = new HashSet<>();
for (int i = (totalNumBuckets * 2); i > 0; i--) {
testKeysSet.add("execKey-" + i);
}
DistributedSystem.setThreadsSocketPolicy(false);
- Function function = new TestFunction(true, TEST_FUNCTION2);
+ Function<Object> function = new TestFunction<>(true, TEST_FUNCTION2);
FunctionService.registerFunction(function);
Execution dataSet = FunctionService.onRegion(region);
- try {
- int j = 0;
- HashSet<Integer> origVals = new HashSet<>();
- for (String element : testKeysSet) {
- Integer val = j++;
- origVals.add(val);
- region.put(element, val);
- }
- ResultCollector rc1 = execute(dataSet, testKeysSet, Boolean.TRUE,
function, isByName);
- List l = ((List) rc1.getResult());
- logger.info("Result size : " + l.size());
- assertEquals(3, l.size());
- for (Object item : l) {
- assertEquals(Boolean.TRUE, item);
- }
-
- ResultCollector rc2 = execute(dataSet, testKeysSet, testKeysSet,
function, isByName);
- List l2 = ((List) rc2.getResult());
- assertEquals(3, l2.size());
- HashSet<Integer> foundVals = new HashSet<>();
- for (Object value : l2) {
- ArrayList subL = (ArrayList) value;
- assertTrue(subL.size() > 0);
- for (Object o : subL) {
- assertTrue(foundVals.add((Integer) o));
- }
- }
- assertEquals(origVals, foundVals);
- } catch (Exception e) {
- Assert.fail("Test failed after the put operation", e);
+ int j = 0;
+ HashSet<Integer> origVals = new HashSet<>();
+ for (String element : testKeysSet) {
+ Integer val = j++;
+ origVals.add(val);
+ region.put(element, val);
+ }
+ ResultCollector<?, ?> rc1 = execute(dataSet, testKeysSet, Boolean.TRUE,
function, isByName);
+ List<?> l = (List<?>) rc1.getResult();
+ assertThat(l).hasSize(3);
+ for (Object item : l) {
+ assertThat(item).isEqualTo(true);
+ }
+ ResultCollector<?, ?> rc2 = execute(dataSet, testKeysSet, testKeysSet,
function, isByName);
+ List<?> l2 = (List<?>) rc2.getResult();
+ assertThat(l2).hasSize(3);
+ HashSet<Integer> foundVals = new HashSet<>();
+ for (Object value : l2) {
+ List<?> subL = (List<?>) value;
+ assertThat(subL).hasSizeGreaterThan(0);
+ for (Object o : subL) {
+ assertThat(foundVals.add((Integer) o)).isTrue();
+ }
}
+ assertThat(foundVals).containsExactlyInAnyOrderElementsOf(origVals);
}
private static void serverMultiKeyExecutionSocketTimeOut(Boolean isByName) {
Region<String, Integer> region = cache.getRegion(PartitionedRegionName);
- assertNotNull(region);
+ assertThat(region).isNotNull();
final HashSet<String> testKeysSet = new HashSet<>();
for (int i = (totalNumBuckets * 2); i > 0; i--) {
testKeysSet.add("execKey-" + i);
}
DistributedSystem.setThreadsSocketPolicy(false);
- Function function = new TestFunction(true,
TestFunction.TEST_FUNCTION_SOCKET_TIMEOUT);
+ Function<Object> function = new TestFunction<>(true,
TestFunction.TEST_FUNCTION_SOCKET_TIMEOUT);
FunctionService.registerFunction(function);
Execution dataSet = FunctionService.onRegion(region);
- try {
- int j = 0;
- for (String value : testKeysSet) {
- Integer val = j++;
- region.put(value, val);
- }
- ResultCollector rc1 = execute(dataSet, testKeysSet, Boolean.TRUE,
function, isByName);
- List l = ((List) rc1.getResult());
- logger.info("Result size : " + l.size());
- assertEquals(3, l.size());
- for (Object o : l) {
- assertEquals(Boolean.TRUE, o);
- }
-
- } catch (Exception e) {
- Assert.fail("Test failed after the function execution", e);
+ int j = 0;
+ for (String value : testKeysSet) {
+ Integer val = j++;
+ region.put(value, val);
+ }
+ ResultCollector<?, ?> rc1 = execute(dataSet, testKeysSet, Boolean.TRUE,
function, isByName);
+ List<?> l = (List<?>) rc1.getResult();
+ logger.info("Result size : " + l.size());
+ assertThat(l).hasSize(3);
+ for (Object o : l) {
+ assertThat(o).isEqualTo(true);
}
}
private static void serverSingleKeyExecutionSocketTimeOut(Boolean isByName) {
Region<String, Integer> region = cache.getRegion(PartitionedRegionName);
- assertNotNull(region);
+ assertThat(region).isNotNull();
final String testKey = "execKey";
final Set<String> testKeysSet = new HashSet<>();
testKeysSet.add(testKey);
DistributedSystem.setThreadsSocketPolicy(false);
- Function function = new TestFunction(true,
TestFunction.TEST_FUNCTION_SOCKET_TIMEOUT);
+ Function<Object> function = new TestFunction<>(true,
TestFunction.TEST_FUNCTION_SOCKET_TIMEOUT);
FunctionService.registerFunction(function);
Execution dataSet = FunctionService.onRegion(region);
region.put(testKey, 1);
- try {
- ResultCollector rs = execute(dataSet, testKeysSet, Boolean.TRUE,
function, isByName);
- assertEquals(Boolean.TRUE, ((List) rs.getResult()).get(0));
- ResultCollector rs2 = execute(dataSet, testKeysSet, testKey, function,
isByName);
- assertEquals(testKey, ((List) rs2.getResult()).get(0));
+ ResultCollector<?, ?> rs = execute(dataSet, testKeysSet, Boolean.TRUE,
function, isByName);
+ assertThat(((List<?>) rs.getResult()).get(0)).isEqualTo(true);
- } catch (Exception ex) {
- ex.printStackTrace();
- logger.info("Exception : ", ex);
- Assert.fail("Test failed after the put operation", ex);
- }
+ ResultCollector<?, ?> rs2 = execute(dataSet, testKeysSet, testKey,
function, isByName);
+ assertThat(((List<?>) rs2.getResult()).get(0)).isEqualTo(testKey);
}
private static void serverMultiKeyExecution_Inline() {
Region<String, Integer> region = cache.getRegion(PartitionedRegionName);
- assertNotNull(region);
+ assertThat(region).isNotNull();
final HashSet<String> testKeysSet = new HashSet<>();
for (int i = (totalNumBuckets * 2); i > 0; i--) {
testKeysSet.add("execKey-" + i);
}
DistributedSystem.setThreadsSocketPolicy(false);
Execution dataSet = FunctionService.onRegion(region);
- try {
- int j = 0;
- for (String value : testKeysSet) {
- Integer val = j++;
- region.put(value, val);
- }
- ResultCollector rc1 =
-
dataSet.withFilter(testKeysSet).setArguments(Boolean.TRUE).execute(new
FunctionAdapter() {
- @Override
- public void execute(FunctionContext context) {
- @SuppressWarnings("unchecked")
- final ResultSender<Object> resultSender =
context.getResultSender();
- if (context.getArguments() instanceof String) {
- resultSender.lastResult("Success");
- } else if (context.getArguments() instanceof Boolean) {
- resultSender.lastResult(Boolean.TRUE);
- }
- }
- @Override
- public String getId() {
- return getClass().getName();
+ int j = 0;
+ for (String value : testKeysSet) {
+ Integer val = j++;
+ region.put(value, val);
+ }
+ ResultCollector<?, ?> rc1 =
+ dataSet.withFilter(testKeysSet).setArguments(Boolean.TRUE).execute(new
FunctionAdapter() {
+ @Override
+ public void execute(FunctionContext context) {
+ @SuppressWarnings("unchecked")
+ final ResultSender<Object> resultSender =
context.getResultSender();
+ if (context.getArguments() instanceof String) {
+ resultSender.lastResult("Success");
+ } else if (context.getArguments() instanceof Boolean) {
+ resultSender.lastResult(Boolean.TRUE);
}
+ }
- @Override
- public boolean hasResult() {
- return true;
- }
- });
- List l = ((List) rc1.getResult());
- logger.info("Result size : " + l.size());
- assertEquals(3, l.size());
- for (Object o : l) {
- assertEquals(Boolean.TRUE, o);
- }
- } catch (Exception e) {
- logger.info("Exception : " + e.getMessage());
- e.printStackTrace();
- fail("Test failed after the put operation");
+ @Override
+ public String getId() {
+ return getClass().getName();
+ }
+ @Override
+ public boolean hasResult() {
+ return true;
+ }
+ });
+ List<?> list = (List<?>) rc1.getResult();
+ logger.info("Result size : " + list.size());
+ assertThat(list).hasSize(3);
+ for (Object item : list) {
+ assertThat(item).isEqualTo(true);
}
}
private static void
serverMultiKeyExecution_FunctionInvocationTargetException() {
Region<String, Integer> region = cache.getRegion(PartitionedRegionName);
- assertNotNull(region);
+ assertThat(region).isNotNull();
final HashSet<String> testKeysSet = new HashSet<>();
for (int i = (totalNumBuckets * 2); i > 0; i--) {
testKeysSet.add("execKey-" + i);
@@ -853,49 +875,43 @@ public class
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
Integer val = j++;
region.put(o, val);
}
- try {
- ResultCollector rc1 =
-
dataSet.withFilter(testKeysSet).setArguments(Boolean.TRUE).execute(new
FunctionAdapter() {
- @Override
- public void execute(FunctionContext context) {
- if (context.isPossibleDuplicate()) {
- context.getResultSender().lastResult(retryCount);
- return;
- }
- if (context.getArguments() instanceof Boolean) {
- throw new FunctionInvocationTargetException("I have been
thrown from TestFunction");
- }
+ ResultCollector<?, ?> rc1 =
+ dataSet.withFilter(testKeysSet).setArguments(Boolean.TRUE).execute(new
FunctionAdapter() {
+ @Override
+ public void execute(FunctionContext context) {
+ if (context.isPossibleDuplicate()) {
+ context.getResultSender().lastResult(retryCount);
+ return;
}
-
- @Override
- public String getId() {
- return getClass().getName();
+ if (context.getArguments() instanceof Boolean) {
+ throw new FunctionInvocationTargetException("I have been thrown
from TestFunction");
}
+ }
- @Override
- public boolean hasResult() {
- return true;
- }
- });
+ @Override
+ public String getId() {
+ return getClass().getName();
+ }
- List list = (ArrayList) rc1.getResult();
- assertEquals(list.get(0), 0);
- } catch (Throwable e) {
- e.printStackTrace();
- Assert.fail("This is not expected Exception", e);
- }
+ @Override
+ public boolean hasResult() {
+ return true;
+ }
+ });
+ List<?> list = (List<?>) rc1.getResult();
+ assertThat(list.get(0)).isEqualTo(0);
}
private static void serverMultiKeyExecutionNoResult(Boolean isByName) {
Region<String, Integer> region = cache.getRegion(PartitionedRegionName);
- assertNotNull(region);
+ assertThat(region).isNotNull();
final HashSet<String> testKeysSet = new HashSet<>();
for (int i = (totalNumBuckets * 2); i > 0; i--) {
testKeysSet.add("execKey-" + i);
}
DistributedSystem.setThreadsSocketPolicy(false);
- Function function = new TestFunction(false, TEST_FUNCTION7);
+ Function<Object> function = new TestFunction<>(false, TEST_FUNCTION7);
FunctionService.registerFunction(function);
Execution dataSet = FunctionService.onRegion(region);
try {
@@ -906,18 +922,11 @@ public class
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
Integer val = j++;
region.put(o, val);
}
- ResultCollector rc1 = execute(dataSet, testKeysSet, Boolean.TRUE,
function, isByName);
- rc1.getResult();
- Thread.sleep(20000);
- fail("Test failed after the put operation");
- } catch (FunctionException expected) {
- expected.printStackTrace();
- logger.info("Exception : " + expected.getMessage());
- assertTrue(expected.getMessage()
- .startsWith((String.format("Cannot %s result as the
Function#hasResult() is false",
- "return any"))));
- } catch (Exception notexpected) {
- Assert.fail("Test failed during execute or sleeping", notexpected);
+ ResultCollector<?, ?> rc1 = execute(dataSet, testKeysSet, Boolean.TRUE,
function, isByName);
+
assertThatThrownBy(rc1::getResult).isExactlyInstanceOf(FunctionException.class)
+ .hasMessageStartingWith(
+ String.format("Cannot %s result as the Function#hasResult() is
false",
+ "return any"));
} finally {
cache.getLogger()
.info("<ExpectedException action=remove>" + "FunctionException" +
"</ExpectedException>");
@@ -926,179 +935,149 @@ public class
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
private static void serverSingleKeyExecution(Boolean isByName) {
Region<String, Integer> region = cache.getRegion(PartitionedRegionName);
- assertNotNull(region);
+ assertThat(region).isNotNull();
final String testKey = "execKey";
final Set<String> testKeysSet = new HashSet<>();
testKeysSet.add(testKey);
DistributedSystem.setThreadsSocketPolicy(false);
- Function function = new TestFunction(true, TEST_FUNCTION2);
+ Function<Object> function = new TestFunction<>(true, TEST_FUNCTION2);
FunctionService.registerFunction(function);
Execution dataSet = FunctionService.onRegion(region);
- try {
- execute(dataSet, testKeysSet, Boolean.TRUE, function, isByName);
- } catch (Exception expected) {
- assertTrue(expected.getMessage().contains("No target node found for KEY
= " + testKey)
- || expected.getMessage().startsWith("Server could not send the
reply")
- || expected.getMessage().startsWith("Unexpected exception during"));
- }
+
+ execute(dataSet, testKeysSet, Boolean.TRUE, function, isByName);
region.put(testKey, 1);
- try {
- ResultCollector rs = execute(dataSet, testKeysSet, Boolean.TRUE,
function, isByName);
- assertEquals(Boolean.TRUE, ((List) rs.getResult()).get(0));
- ResultCollector rs2 = execute(dataSet, testKeysSet, testKey, function,
isByName);
- assertEquals(1, ((List) rs2.getResult()).get(0));
+ ResultCollector<?, ?> rs = execute(dataSet, testKeysSet, Boolean.TRUE,
function, isByName);
+ assertThat(((List<?>) rs.getResult()).get(0)).isEqualTo(true);
- HashMap<String, Integer> putData = new HashMap<>();
- putData.put(testKey + "1", 2);
- putData.put(testKey + "2", 3);
+ ResultCollector<?, ?> rs2 = execute(dataSet, testKeysSet, testKey,
function, isByName);
+ assertThat(((List<?>) rs2.getResult()).get(0)).isEqualTo(1);
- ResultCollector rs1 = execute(dataSet, testKeysSet, putData, function,
isByName);
- assertEquals(Boolean.TRUE, ((List) rs1.getResult()).get(0));
+ HashMap<String, Integer> putData = new HashMap<>();
+ putData.put(testKey + "1", 2);
+ putData.put(testKey + "2", 3);
- assertEquals((Integer) 2, region.get(testKey + "1"));
- assertEquals((Integer) 3, region.get(testKey + "2"));
+ ResultCollector<?, ?> rs1 = execute(dataSet, testKeysSet, putData,
function, isByName);
+ assertThat(((List<?>) rs1.getResult()).get(0)).isEqualTo(true);
- } catch (Exception ex) {
- ex.printStackTrace();
- logger.info("Exception : ", ex);
- Assert.fail("Test failed after the put operation", ex);
- }
+ assertThat(region.get(testKey + "1")).isEqualTo(2);
+ assertThat(region.get(testKey + "2")).isEqualTo(3);
}
private static void
serverSingleKeyExecution_FunctionInvocationTargetException() {
Region<String, Integer> region = cache.getRegion(PartitionedRegionName);
- assertNotNull(region);
+ assertThat(region).isNotNull();
final String testKey = "execKey";
final Set<String> testKeysSet = new HashSet<>();
testKeysSet.add(testKey);
DistributedSystem.setThreadsSocketPolicy(false);
- Function function = new TestFunction(true,
TestFunction.TEST_FUNCTION_REEXECUTE_EXCEPTION);
+ Function<Object> function =
+ new TestFunction<>(true,
TestFunction.TEST_FUNCTION_REEXECUTE_EXCEPTION);
FunctionService.registerFunction(function);
Execution dataSet = FunctionService.onRegion(region);
region.put(testKey, 1);
- try {
- ResultCollector rs = execute(dataSet, testKeysSet, Boolean.TRUE,
function, false);
- ArrayList list = (ArrayList) rs.getResult();
- assertTrue(((Integer) list.get(0)) >= 5);
- } catch (Exception ex) {
- ex.printStackTrace();
- Assert.fail("This is not expected Exception", ex);
- }
+
+ ResultCollector<?, ?> rs = execute(dataSet, testKeysSet, Boolean.TRUE,
function, false);
+ ArrayList<?> list = (ArrayList<?>) rs.getResult();
+ assertThat(((Integer) list.get(0))).isGreaterThanOrEqualTo(5);
}
private static void serverSingleKeyExecution_Inline() {
Region<String, Integer> region = cache.getRegion(PartitionedRegionName);
- assertNotNull(region);
+ assertThat(region).isNotNull();
final String testKey = "execKey";
final Set<String> testKeysSet = new HashSet<>();
testKeysSet.add(testKey);
DistributedSystem.setThreadsSocketPolicy(false);
Execution dataSet = FunctionService.onRegion(region);
- try {
- cache.getLogger()
- .info("<ExpectedException action=add>" + "No target node found for
KEY = "
- + "|Server could not send the reply" + "|Unexpected exception
during"
- + "</ExpectedException>");
- dataSet.withFilter(testKeysSet).setArguments(Boolean.TRUE).execute(new
FunctionAdapter() {
- @Override
- public void execute(FunctionContext context) {
- @SuppressWarnings("unchecked")
- final ResultSender<Object> resultSender = context.getResultSender();
- if (context.getArguments() instanceof String) {
- resultSender.lastResult("Success");
- }
- resultSender.lastResult("Failure");
- }
- @Override
- public String getId() {
- return getClass().getName();
+ FunctionAdapter functionAdapter = new FunctionAdapter() {
+ @Override
+ public void execute(FunctionContext context) {
+ @SuppressWarnings("unchecked")
+ final ResultSender<Object> resultSender = context.getResultSender();
+ if (context.getArguments() instanceof String) {
+ resultSender.lastResult("Success");
}
+ resultSender.lastResult("Failure");
+ }
- @Override
- public boolean hasResult() {
- return true;
- }
- });
- } catch (Exception expected) {
- logger.debug("Exception occurred : " + expected.getMessage());
- assertTrue(expected.getMessage().contains("No target node found for KEY
= " + testKey)
- || expected.getMessage().startsWith("Server could not send the
reply")
- || expected.getMessage().startsWith("Unexpected exception during"));
- } finally {
- cache.getLogger()
- .info("<ExpectedException action=remove>" + "No target node found
for KEY = "
- + "|Server could not send the reply" + "|Unexpected exception
during"
- + "</ExpectedException>");
- }
+ @Override
+ public String getId() {
+ return getClass().getName();
+ }
+
+ @Override
+ public boolean hasResult() {
+ return true;
+ }
+ };
+
+
dataSet.withFilter(testKeysSet).setArguments(Boolean.TRUE).execute(functionAdapter);
region.put(testKey, 1);
- try {
- ResultCollector rs =
-
dataSet.withFilter(testKeysSet).setArguments(Boolean.TRUE).execute(new
FunctionAdapter() {
- @Override
- public void execute(FunctionContext context) {
- @SuppressWarnings("unchecked")
- final ResultSender<Object> resultSender =
context.getResultSender();
- if (context.getArguments() instanceof String) {
- resultSender.lastResult("Success");
- } else {
- resultSender.lastResult("Failure");
- }
- }
- @Override
- public String getId() {
- return getClass().getName();
+ ResultCollector<?, ?> rs =
+ dataSet.withFilter(testKeysSet).setArguments(Boolean.TRUE).execute(new
FunctionAdapter() {
+ @Override
+ public void execute(FunctionContext context) {
+ @SuppressWarnings("unchecked")
+ final ResultSender<Object> resultSender =
context.getResultSender();
+ if (context.getArguments() instanceof String) {
+ resultSender.lastResult("Success");
+ } else {
+ resultSender.lastResult("Failure");
}
+ }
- @Override
- public boolean hasResult() {
- return true;
- }
- });
- assertEquals("Failure", ((List) rs.getResult()).get(0));
-
- ResultCollector rs2 =
- dataSet.withFilter(testKeysSet).setArguments(testKey).execute(new
FunctionAdapter() {
- @Override
- public void execute(FunctionContext context) {
- @SuppressWarnings("unchecked")
- final ResultSender<Object> resultSender =
context.getResultSender();
- if (context.getArguments() instanceof String) {
- resultSender.lastResult("Success");
- } else {
- resultSender.lastResult("Failure");
- }
- }
+ @Override
+ public String getId() {
+ return getClass().getName();
+ }
- @Override
- public String getId() {
- return getClass().getName();
+ @Override
+ public boolean hasResult() {
+ return true;
+ }
+ });
+ assertThat(((List<?>) rs.getResult()).get(0)).isEqualTo("Failure");
+
+ ResultCollector<?, ?> rs2 =
+ dataSet.withFilter(testKeysSet).setArguments(testKey).execute(new
FunctionAdapter() {
+ @Override
+ public void execute(FunctionContext context) {
+ @SuppressWarnings("unchecked")
+ final ResultSender<Object> resultSender =
context.getResultSender();
+ if (context.getArguments() instanceof String) {
+ resultSender.lastResult("Success");
+ } else {
+ resultSender.lastResult("Failure");
}
+ }
- @Override
- public boolean hasResult() {
- return true;
- }
- });
- assertEquals("Success", ((List) rs2.getResult()).get(0));
+ @Override
+ public String getId() {
+ return getClass().getName();
+ }
+
+ @Override
+ public boolean hasResult() {
+ return true;
+ }
+ });
+
+ assertThat(((List<?>) rs2.getResult()).get(0)).isEqualTo("Success");
- } catch (Exception ex) {
- ex.printStackTrace();
- logger.info("Exception : ", ex);
- Assert.fail("Test failed after the put operation", ex);
- }
}
- private static ResultCollector execute(Execution dataSet, Set testKeysSet,
Serializable args,
- Function function, Boolean isByName) {
+ private static ResultCollector<?, ?> execute(Execution dataSet, Set<?>
testKeysSet,
+ Serializable args,
+ Function<?> function, Boolean isByName) {
if (isByName) {// by name
return
dataSet.withFilter(testKeysSet).setArguments(args).execute(function.getId());
} else { // By Instance
@@ -1106,8 +1085,8 @@ public class
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
}
}
- private static ResultCollector executeOnAll(Execution dataSet, Serializable
args,
- Function function, Boolean isByName) {
+ private static ResultCollector<?, ?> executeOnAll(Execution dataSet,
Serializable args,
+ Function<?> function, Boolean isByName) {
if (isByName) {// by name
return dataSet.setArguments(args).execute(function.getId());
} else { // By Instance
diff --git
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRClientServerTestBase.java
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRClientServerTestBase.java
index 1a0c92d1aa..107b50a26e 100755
---
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRClientServerTestBase.java
+++
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRClientServerTestBase.java
@@ -17,11 +17,9 @@ package org.apache.geode.internal.cache.execute;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static
org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPort;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
-import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
@@ -46,6 +44,7 @@ import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.client.Pool;
+import org.apache.geode.cache.client.PoolFactory;
import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache.client.internal.PoolImpl;
import org.apache.geode.cache.execute.Function;
@@ -57,8 +56,6 @@ import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.internal.cache.functions.TestFunction;
import org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil;
import org.apache.geode.logging.internal.log4j.api.LogService;
-import org.apache.geode.test.dunit.Assert;
-import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.dunit.SerializableCallableIF;
import org.apache.geode.test.dunit.SerializableRunnable;
@@ -79,6 +76,8 @@ public class PRClientServerTestBase extends
JUnit4CacheTestCase {
protected static Cache cache = null;
+ protected String hostName;
+
static String PartitionedRegionName = "TestPartitionedRegion"; // default
name
protected static String regionName = "TestRegion"; // default name
@@ -93,15 +92,15 @@ public class PRClientServerTestBase extends
JUnit4CacheTestCase {
@Override
public final void postSetUp() throws Exception {
- Host host = Host.getHost(0);
- server1 = host.getVM(0);
- server2 = host.getVM(1);
- server3 = host.getVM(2);
- client = host.getVM(3);
+ server1 = VM.getVM(0);
+ server2 = VM.getVM(1);
+ server3 = VM.getVM(2);
+ client = VM.getVM(3);
+ hostName = NetworkUtils.getServerHostName();
postSetUpPRClientServerTestBase();
}
- protected void postSetUpPRClientServerTestBase() throws Exception {}
+ protected void postSetUpPRClientServerTestBase() {}
private enum ExecuteFunctionMethod {
ExecuteFunctionByObject, ExecuteFunctionById
@@ -119,7 +118,8 @@ public class PRClientServerTestBase extends
JUnit4CacheTestCase {
return ExecuteFunctionMethod.ExecuteFunctionByObject ==
functionExecutionType;
}
- ArrayList createCommonServerAttributes(String regionName, PartitionResolver
pr, int red,
+ ArrayList<Object> createCommonServerAttributes(String regionName,
PartitionResolver<?, ?> pr,
+ int red,
String colocatedWithRegion) {
ArrayList<Object> commonAttributes = new ArrayList<>();
commonAttributes.add(regionName); // 0
@@ -130,123 +130,111 @@ public class PRClientServerTestBase extends
JUnit4CacheTestCase {
return commonAttributes;
}
- public static Integer createCacheServer(ArrayList commonAttributes, Integer
localMaxMemory) {
- AttributesFactory factory = new AttributesFactory();
- PartitionAttributesFactory paf = new PartitionAttributesFactory();
+ public static Integer createCacheServer(ArrayList<Object> commonAttributes,
+ Integer localMaxMemory) {
+ return createCacheServer(commonAttributes, localMaxMemory, -1);
+ }
+
+ public static Integer createCacheServer(ArrayList<Object> commonAttributes,
+ Integer localMaxMemory,
+ int maxThreads) {
+ AttributesFactory<Object, Object> factory = new AttributesFactory<>();
+ PartitionAttributesFactory<Object, Object> paf = new
PartitionAttributesFactory<>();
- paf.setPartitionResolver((PartitionResolver) commonAttributes.get(1));
+ paf.setPartitionResolver((PartitionResolver<Object, Object>)
commonAttributes.get(1));
paf.setRedundantCopies((Integer) commonAttributes.get(2));
paf.setTotalNumBuckets((Integer) commonAttributes.get(3));
paf.setColocatedWith((String) commonAttributes.get(4));
paf.setLocalMaxMemory(localMaxMemory);
- PartitionAttributes partitionAttributes = paf.create();
+ PartitionAttributes<?, ?> partitionAttributes = paf.create();
factory.setDataPolicy(DataPolicy.PARTITION);
factory.setPartitionAttributes(partitionAttributes);
- RegionAttributes attrs = factory.create();
+ RegionAttributes<Object, Object> attrs = factory.create();
- Region region = cache.createRegion((String) commonAttributes.get(0),
attrs);
- assertNotNull(region);
+ Region<Object, Object> region = cache.createRegion((String)
commonAttributes.get(0), attrs);
+ assertThat(region).isNotNull();
CacheServer server1 = cache.addCacheServer();
- assertNotNull(server1);
+ assertThat(server1).isNotNull();
int port = getRandomAvailableTCPPort();
server1.setPort(port);
- try {
- server1.start();
- } catch (IOException e) {
- Assert.fail("Failed to start the Server", e);
+ if (maxThreads > 0) {
+ server1.setMaxThreads(maxThreads);
}
- assertTrue(server1.isRunning());
+ assertThatNoException().isThrownBy(server1::start);
+ assertThat(server1.isRunning()).isTrue();
return server1.getPort();
}
- private static Integer createSelectorCacheServer(ArrayList commonAttributes,
+ private static Integer createSelectorCacheServer(ArrayList<Object>
commonAttributes,
Integer localMaxMemory) throws Exception {
- AttributesFactory factory = new AttributesFactory();
- PartitionAttributesFactory paf = new PartitionAttributesFactory();
+ AttributesFactory<Object, Object> factory = new AttributesFactory<>();
+ PartitionAttributesFactory<Object, Object> paf = new
PartitionAttributesFactory<>();
- paf.setPartitionResolver((PartitionResolver) commonAttributes.get(1));
+ paf.setPartitionResolver((PartitionResolver<Object, Object>)
commonAttributes.get(1));
paf.setRedundantCopies((Integer) commonAttributes.get(2));
paf.setTotalNumBuckets((Integer) commonAttributes.get(3));
paf.setColocatedWith((String) commonAttributes.get(4));
paf.setLocalMaxMemory(localMaxMemory);
- PartitionAttributes partitionAttributes = paf.create();
+ PartitionAttributes<?, ?> partitionAttributes = paf.create();
factory.setDataPolicy(DataPolicy.PARTITION);
factory.setPartitionAttributes(partitionAttributes);
- RegionAttributes attrs = factory.create();
+ RegionAttributes<Object, Object> attrs = factory.create();
- Region region = cache.createRegion((String) commonAttributes.get(0),
attrs);
- assertNotNull(region);
+ Region<Object, Object> region = cache.createRegion((String)
commonAttributes.get(0), attrs);
+ assertThat(region).isNotNull();
CacheServer server1 = cache.addCacheServer();
- assertNotNull(server1);
+ assertThat(server1).isNotNull();
int port = getRandomAvailableTCPPort();
server1.setPort(port);
server1.setMaxThreads(16);
server1.start();
- assertTrue(server1.isRunning());
+ assertThat(server1.isRunning()).isTrue();
return server1.getPort();
}
- private static Integer createCacheServerWith2Regions(ArrayList
commonAttributes,
+ private static Integer createCacheServerWith2Regions(ArrayList<Object>
commonAttributes,
Integer localMaxMemory) throws Exception {
- AttributesFactory factory = new AttributesFactory();
- PartitionAttributesFactory paf = new PartitionAttributesFactory();
+ AttributesFactory<Object, Object> factory = new AttributesFactory<>();
+ PartitionAttributesFactory<Object, Object> paf = new
PartitionAttributesFactory<>();
- paf.setPartitionResolver((PartitionResolver) commonAttributes.get(1));
+ paf.setPartitionResolver((PartitionResolver<Object, Object>)
commonAttributes.get(1));
paf.setRedundantCopies((Integer) commonAttributes.get(2));
paf.setTotalNumBuckets((Integer) commonAttributes.get(3));
paf.setColocatedWith((String) commonAttributes.get(4));
paf.setLocalMaxMemory(localMaxMemory);
- PartitionAttributes partitionAttributes = paf.create();
+ PartitionAttributes<?, ?> partitionAttributes = paf.create();
factory.setDataPolicy(DataPolicy.PARTITION);
factory.setPartitionAttributes(partitionAttributes);
- RegionAttributes attrs = factory.create();
+ RegionAttributes<Object, Object> attrs = factory.create();
- Region region1 = cache.createRegion(PartitionedRegionName + "1", attrs);
- assertNotNull(region1);
- Region region2 = cache.createRegion(PartitionedRegionName + "2", attrs);
- assertNotNull(region2);
+ Region<Object, Object> region1 = cache.createRegion(PartitionedRegionName
+ "1", attrs);
+ assertThat(region1).isNotNull();
+ Region<Object, Object> region2 = cache.createRegion(PartitionedRegionName
+ "2", attrs);
+ assertThat(region2).isNotNull();
CacheServer server1 = cache.addCacheServer();
- assertNotNull(server1);
+ assertThat(server1).isNotNull();
int port = getRandomAvailableTCPPort();
server1.setPort(port);
server1.start();
- assertTrue(server1.isRunning());
+ assertThat(server1.isRunning()).isTrue();
return server1.getPort();
}
public static Integer createCacheServer() throws Exception {
CacheServer server1 = cache.addCacheServer();
- assertNotNull(server1);
+ assertThat(server1).isNotNull();
int port = getRandomAvailableTCPPort();
server1.setPort(port);
server1.start();
- assertTrue(server1.isRunning());
+ assertThat(server1.isRunning()).isTrue();
return server1.getPort();
}
- private static Integer createCacheServerWithDR() throws Exception {
- AttributesFactory factory = new AttributesFactory();
- factory.setScope(Scope.DISTRIBUTED_ACK);
- factory.setDataPolicy(DataPolicy.REPLICATE);
- assertNotNull(cache);
- Region region = cache.createRegion(regionName, factory.create());
- assertNotNull(region);
-
- CacheServer server1 = cache.addCacheServer();
- assertNotNull(server1);
- int port = getRandomAvailableTCPPort();
- server1.setPort(port);
- server1.start();
- assertTrue(server1.isRunning());
-
- return server1.getPort();
- }
-
- public static void createCacheClient(String host, Integer port1, Integer
port2, Integer port3) {
+ public static void createCacheClient(String host, int port1, int port2, int
port3) {
CacheServerTestUtil.disableShufflingOfEndpoints();
Pool p;
@@ -260,16 +248,16 @@ public class PRClientServerTestBase extends
JUnit4CacheTestCase {
CacheServerTestUtil.enableShufflingOfEndpoints();
}
pool = (PoolImpl) p;
- AttributesFactory factory = new AttributesFactory();
+ AttributesFactory<Object, Object> factory = new AttributesFactory<>();
factory.setScope(Scope.LOCAL);
factory.setDataPolicy(DataPolicy.EMPTY);
factory.setPoolName(p.getName());
- RegionAttributes attrs = factory.create();
- Region region = cache.createRegion(PartitionedRegionName, attrs);
- assertNotNull(region);
+ RegionAttributes<Object, Object> attrs = factory.create();
+ Region<Object, Object> region = cache.createRegion(PartitionedRegionName,
attrs);
+ assertThat(region).isNotNull();
}
- private static void createCacheClient_SingleConnection(String host, Integer
port1) {
+ private static void createCacheClient_SingleConnection(String host, int
port1) {
CacheServerTestUtil.disableShufflingOfEndpoints();
Pool p;
@@ -282,17 +270,17 @@ public class PRClientServerTestBase extends
JUnit4CacheTestCase {
CacheServerTestUtil.enableShufflingOfEndpoints();
}
pool = (PoolImpl) p;
- AttributesFactory factory = new AttributesFactory();
+ AttributesFactory<Object, Object> factory = new AttributesFactory<>();
factory.setScope(Scope.LOCAL);
factory.setDataPolicy(DataPolicy.EMPTY);
factory.setPoolName(p.getName());
- RegionAttributes attrs = factory.create();
- Region region = cache.createRegion(PartitionedRegionName, attrs);
- assertNotNull(region);
+ RegionAttributes<Object, Object> attrs = factory.create();
+ Region<Object, Object> region = cache.createRegion(PartitionedRegionName,
attrs);
+ assertThat(region).isNotNull();
}
- private static void createCacheClientWith2Regions(String host, Integer
port1, Integer port2,
- Integer port3) {
+ private static void createCacheClientWith2Regions(String host, int port1,
int port2,
+ int port3) {
CacheServerTestUtil.disableShufflingOfEndpoints();
Pool p;
@@ -306,22 +294,22 @@ public class PRClientServerTestBase extends
JUnit4CacheTestCase {
CacheServerTestUtil.enableShufflingOfEndpoints();
}
pool = (PoolImpl) p;
- AttributesFactory factory = new AttributesFactory();
+ AttributesFactory<Object, Object> factory = new AttributesFactory<>();
factory.setDataPolicy(DataPolicy.EMPTY);
factory.setPoolName(p.getName());
- RegionAttributes attrs = factory.create();
- Region region1 = cache.createRegion(PartitionedRegionName + "1", attrs);
- assertNotNull(region1);
+ RegionAttributes<Object, Object> attrs = factory.create();
+ Region<Object, Object> region1 = cache.createRegion(PartitionedRegionName
+ "1", attrs);
+ assertThat(region1).isNotNull();
- factory = new AttributesFactory();
+ factory = new AttributesFactory<>();
factory.setDataPolicy(DataPolicy.EMPTY);
attrs = factory.create();
- Region region2 = cache.createRegion(PartitionedRegionName + "2", attrs);
- assertNotNull(region2);
+ Region<Object, Object> region2 = cache.createRegion(PartitionedRegionName
+ "2", attrs);
+ assertThat(region2).isNotNull();
}
- private static void createSingleHopCacheClient(String host, Integer port1,
Integer port2,
- Integer port3) {
+ private static void createSingleHopCacheClient(String host, int port1, int
port2,
+ int port3) {
CacheServerTestUtil.disableShufflingOfEndpoints();
Pool p;
@@ -335,17 +323,17 @@ public class PRClientServerTestBase extends
JUnit4CacheTestCase {
CacheServerTestUtil.enableShufflingOfEndpoints();
}
pool = (PoolImpl) p;
- AttributesFactory factory = new AttributesFactory();
+ AttributesFactory<Object, Object> factory = new AttributesFactory<>();
factory.setScope(Scope.LOCAL);
factory.setDataPolicy(DataPolicy.EMPTY);
factory.setPoolName(p.getName());
- RegionAttributes attrs = factory.create();
- Region region = cache.createRegion(PartitionedRegionName, attrs);
- assertNotNull(region);
+ RegionAttributes<Object, Object> attrs = factory.create();
+ Region<Object, Object> region = cache.createRegion(PartitionedRegionName,
attrs);
+ assertThat(region).isNotNull();
}
- private static void createNoSingleHopCacheClient(String host, Integer port1,
Integer port2,
- Integer port3) {
+ private static void createNoSingleHopCacheClient(String host, int port1, int
port2,
+ int port3) {
CacheServerTestUtil.disableShufflingOfEndpoints();
Pool p;
@@ -359,36 +347,44 @@ public class PRClientServerTestBase extends
JUnit4CacheTestCase {
CacheServerTestUtil.enableShufflingOfEndpoints();
}
pool = (PoolImpl) p;
- AttributesFactory factory = new AttributesFactory();
+ AttributesFactory<Object, Object> factory = new AttributesFactory<>();
factory.setScope(Scope.LOCAL);
factory.setDataPolicy(DataPolicy.EMPTY);
factory.setPoolName(p.getName());
- RegionAttributes attrs = factory.create();
- Region region = cache.createRegion(PartitionedRegionName, attrs);
- assertNotNull(region);
+ RegionAttributes<Object, Object> attrs = factory.create();
+ Region<Object, Object> region = cache.createRegion(PartitionedRegionName,
attrs);
+ assertThat(region).isNotNull();
}
- private static void createCacheClientWithoutRegion(String host, Integer
port1, Integer port2,
- Integer port3) {
+ private static void createNoSingleHopCacheClient(String host,
+ int port1, int port2, int port3, int connectTimeout) {
CacheServerTestUtil.disableShufflingOfEndpoints();
- logger
- .info("PRClientServerTestBase#createCacheClientWithoutRegion :
creating pool");
Pool p;
try {
- p = PoolManager.createFactory().addServer(host, port1)
- .addServer(host, port2).addServer(host, port3).setPingInterval(250)
-
.setSubscriptionEnabled(true).setSubscriptionRedundancy(-1).setReadTimeout(2000)
-
.setSocketBufferSize(1000).setMinConnections(6).setMaxConnections(10).setRetryAttempts(1)
- .create("PRClientServerTestBaseWithoutRegion");
+ PoolFactory factory = PoolManager.createFactory().addServer(host, port1)
+ .addServer(host, port2).addServer(host, port3).setPingInterval(2000)
+ .setSubscriptionEnabled(true).setReadTimeout(2000)
+ .setSocketBufferSize(1000).setRetryAttempts(0)
+ .setSocketConnectTimeout(connectTimeout)
+ .setPRSingleHopEnabled(false);
+
+ p = factory.create("PRClientServerTestBase");
} finally {
CacheServerTestUtil.enableShufflingOfEndpoints();
}
pool = (PoolImpl) p;
+ AttributesFactory<Object, Object> factory = new AttributesFactory<>();
+ factory.setScope(Scope.LOCAL);
+ factory.setDataPolicy(DataPolicy.EMPTY);
+ factory.setPoolName(p.getName());
+ RegionAttributes<Object, Object> attrs = factory.create();
+ Region<Object, Object> region = cache.createRegion(PartitionedRegionName,
attrs);
+ assertThat(region).isNotNull();
}
- private static void createCacheClientWithDistributedRegion(String host,
Integer port1,
- Integer port2, Integer port3) throws Exception {
+ private static void createCacheClientWithoutRegion(String host, int port1,
int port2,
+ int port3) {
CacheServerTestUtil.disableShufflingOfEndpoints();
logger
.info("PRClientServerTestBase#createCacheClientWithoutRegion :
creating pool");
@@ -398,70 +394,64 @@ public class PRClientServerTestBase extends
JUnit4CacheTestCase {
p = PoolManager.createFactory().addServer(host, port1)
.addServer(host, port2).addServer(host, port3).setPingInterval(250)
.setSubscriptionEnabled(true).setSubscriptionRedundancy(-1).setReadTimeout(2000)
-
.setSocketBufferSize(1000).setMinConnections(6).setMaxConnections(10).setRetryAttempts(0)
+
.setSocketBufferSize(1000).setMinConnections(6).setMaxConnections(10).setRetryAttempts(1)
.create("PRClientServerTestBaseWithoutRegion");
} finally {
CacheServerTestUtil.enableShufflingOfEndpoints();
}
pool = (PoolImpl) p;
- AttributesFactory factory = new AttributesFactory();
- factory.setScope(Scope.DISTRIBUTED_ACK);
- factory.setDataPolicy(DataPolicy.REPLICATE);
- assertNotNull(cache);
- Region region = cache.createRegion(regionName, factory.create());
- assertNotNull(region);
}
- void createClientServerScenarion(ArrayList commonAttributes, int
localMaxMemoryServer1,
+ void createClientServerScenarion(ArrayList<Object> commonAttributes, int
localMaxMemoryServer1,
int localMaxMemoryServer2, int localMaxMemoryServer3) {
createCacheInClientServer();
- Integer port1 = server1.invoke(() -> PRClientServerTestBase
+ int port1 = server1.invoke(() -> PRClientServerTestBase
.createCacheServer(commonAttributes, localMaxMemoryServer1));
- Integer port2 = server2.invoke(() -> PRClientServerTestBase
+ int port2 = server2.invoke(() -> PRClientServerTestBase
.createCacheServer(commonAttributes, localMaxMemoryServer2));
- Integer port3 = server3.invoke(() -> PRClientServerTestBase
+ int port3 = server3.invoke(() -> PRClientServerTestBase
.createCacheServer(commonAttributes, localMaxMemoryServer3));
client.invoke(() -> PRClientServerTestBase
- .createCacheClient(NetworkUtils.getServerHostName(server1.getHost()),
port1, port2, port3));
+ .createCacheClient(hostName, port1, port2, port3));
}
- void createClientServerScenarion_SingleConnection(ArrayList commonAttributes,
+ void createClientServerScenarion_SingleConnection(ArrayList<Object>
commonAttributes,
int localMaxMemoryServer1,
int localMaxMemoryServer2) {
createCacheInClientServer();
- Integer port1 = server1.invoke(() -> PRClientServerTestBase
+ int port1 = server1.invoke(() -> PRClientServerTestBase
.createCacheServer(commonAttributes, localMaxMemoryServer1));
server2.invoke(() ->
PRClientServerTestBase.createCacheServer(commonAttributes,
localMaxMemoryServer2));
client.invoke(() ->
PRClientServerTestBase.createCacheClient_SingleConnection(
- NetworkUtils.getServerHostName(server1.getHost()), port1));
+ hostName, port1));
}
- void createClientServerScenarionWith2Regions(ArrayList commonAttributes,
+ void createClientServerScenarionWith2Regions(ArrayList<Object>
commonAttributes,
int localMaxMemoryServer1, int localMaxMemoryServer2,
int localMaxMemoryServer3) {
createCacheInClientServer();
- Integer port1 = server1.invoke(() -> PRClientServerTestBase
+ int port1 = server1.invoke(() -> PRClientServerTestBase
.createCacheServerWith2Regions(commonAttributes,
localMaxMemoryServer1));
- Integer port2 = server2.invoke(() -> PRClientServerTestBase
+ int port2 = server2.invoke(() -> PRClientServerTestBase
.createCacheServerWith2Regions(commonAttributes,
localMaxMemoryServer2));
- Integer port3 = server3.invoke(() -> PRClientServerTestBase
+ int port3 = server3.invoke(() -> PRClientServerTestBase
.createCacheServerWith2Regions(commonAttributes,
localMaxMemoryServer3));
client.invoke(() -> PRClientServerTestBase.createCacheClientWith2Regions(
- NetworkUtils.getServerHostName(server1.getHost()), port1, port2,
port3));
+ hostName, port1, port2, port3));
}
- void createClientServerScenarioSingleHop(ArrayList commonAttributes,
+ void createClientServerScenarioSingleHop(ArrayList<Object> commonAttributes,
int localMaxMemoryServer1, int localMaxMemoryServer2,
int localMaxMemoryServer3) {
createCacheInClientServer();
- Integer port1 = server1.invoke(() -> PRClientServerTestBase
+ int port1 = server1.invoke(() -> PRClientServerTestBase
.createCacheServer(commonAttributes, localMaxMemoryServer1));
- Integer port2 = server2.invoke(() -> PRClientServerTestBase
+ int port2 = server2.invoke(() -> PRClientServerTestBase
.createCacheServer(commonAttributes, localMaxMemoryServer2));
- Integer port3 = server3.invoke(() -> PRClientServerTestBase
+ int port3 = server3.invoke(() -> PRClientServerTestBase
.createCacheServer(commonAttributes, localMaxMemoryServer3));
// Workaround for the issue that hostnames returned by the client metadata
may
// not match those configured by the pool, leading to multiple copies
@@ -475,33 +465,49 @@ public class PRClientServerTestBase extends
JUnit4CacheTestCase {
return cache.getDistributedSystem().getDistributedMember().getHost();
}
- void createClientServerScenarioNoSingleHop(ArrayList commonAttributes,
+ void createClientServerScenarioNoSingleHop(ArrayList<Object>
commonAttributes,
int localMaxMemoryServer1, int localMaxMemoryServer2,
int localMaxMemoryServer3) {
createCacheInClientServer();
- Integer port1 = server1.invoke(() -> PRClientServerTestBase
+ int port1 = server1.invoke(() -> PRClientServerTestBase
.createCacheServer(commonAttributes, localMaxMemoryServer1));
- Integer port2 = server2.invoke(() -> PRClientServerTestBase
+ int port2 = server2.invoke(() -> PRClientServerTestBase
.createCacheServer(commonAttributes, localMaxMemoryServer2));
- Integer port3 = server3.invoke(() -> PRClientServerTestBase
+ int port3 = server3.invoke(() -> PRClientServerTestBase
.createCacheServer(commonAttributes, localMaxMemoryServer3));
client.invoke(() -> PRClientServerTestBase.createNoSingleHopCacheClient(
- NetworkUtils.getServerHostName(server1.getHost()), port1, port2,
port3));
+ hostName, port1, port2, port3));
+ }
+
+ void createClientServerScenarioNoSingleHop(ArrayList<Object>
commonAttributes,
+ int localMaxMemoryServer1, int localMaxMemoryServer2,
+ int localMaxMemoryServer3,
+ int maxThreads,
+ int connectTimeout) {
+ createCacheInClientServer();
+ int port1 = server1.invoke(() -> PRClientServerTestBase
+ .createCacheServer(commonAttributes, localMaxMemoryServer1,
maxThreads));
+ int port2 = server2.invoke(() -> PRClientServerTestBase
+ .createCacheServer(commonAttributes, localMaxMemoryServer2,
maxThreads));
+ int port3 = server3.invoke(() -> PRClientServerTestBase
+ .createCacheServer(commonAttributes, localMaxMemoryServer3,
maxThreads));
+ client.invoke(() -> PRClientServerTestBase.createNoSingleHopCacheClient(
+ hostName, port1, port2, port3, connectTimeout));
}
- void createClientServerScenarioSelectorNoSingleHop(ArrayList
commonAttributes,
+ void createClientServerScenarioSelectorNoSingleHop(ArrayList<Object>
commonAttributes,
int localMaxMemoryServer1,
int localMaxMemoryServer2,
int localMaxMemoryServer3) {
createCacheInClientServer();
- Integer port1 = server1.invoke(() -> PRClientServerTestBase
+ int port1 = server1.invoke(() -> PRClientServerTestBase
.createSelectorCacheServer(commonAttributes, localMaxMemoryServer1));
- Integer port2 = server2.invoke(() -> PRClientServerTestBase
+ int port2 = server2.invoke(() -> PRClientServerTestBase
.createSelectorCacheServer(commonAttributes, localMaxMemoryServer2));
- Integer port3 = server3.invoke(() -> PRClientServerTestBase
+ int port3 = server3.invoke(() -> PRClientServerTestBase
.createSelectorCacheServer(commonAttributes, localMaxMemoryServer3));
client.invoke(() -> PRClientServerTestBase.createNoSingleHopCacheClient(
- NetworkUtils.getServerHostName(server1.getHost()), port1, port2,
port3));
+ hostName, port1, port2, port3));
}
@@ -509,15 +515,15 @@ public class PRClientServerTestBase extends
JUnit4CacheTestCase {
logger.info(
"PRClientServerTestBase#createClientServerScenarionWithoutRegion :
creating client server");
createCacheInClientServer();
- Integer port1 = server1.invoke(
+ int port1 = server1.invoke(
(SerializableCallableIF<Integer>)
PRClientServerTestBase::createCacheServer);
- Integer port2 = server2.invoke(
+ int port2 = server2.invoke(
(SerializableCallableIF<Integer>)
PRClientServerTestBase::createCacheServer);
- Integer port3 = server3.invoke(
+ int port3 = server3.invoke(
(SerializableCallableIF<Integer>)
PRClientServerTestBase::createCacheServer);
client.invoke(() -> PRClientServerTestBase.createCacheClientWithoutRegion(
- NetworkUtils.getServerHostName(server1.getHost()), port1, port2,
port3));
+ hostName, port1, port2, port3));
}
void runOnAllServers(SerializableRunnable runnable) {
@@ -526,7 +532,7 @@ public class PRClientServerTestBase extends
JUnit4CacheTestCase {
server3.invoke(runnable);
}
- void registerFunctionAtServer(Function function) {
+ void registerFunctionAtServer(Function<?> function) {
server1.invoke(PRClientServerTestBase.class, "registerFunction", new
Object[] {function});
server2.invoke(PRClientServerTestBase.class, "registerFunction", new
Object[] {function});
@@ -534,7 +540,7 @@ public class PRClientServerTestBase extends
JUnit4CacheTestCase {
server3.invoke(PRClientServerTestBase.class, "registerFunction", new
Object[] {function});
}
- public static void registerFunction(Function function) {
+ public static void registerFunction(Function<?> function) {
FunctionService.registerFunction(function);
}
@@ -559,24 +565,20 @@ public class PRClientServerTestBase extends
JUnit4CacheTestCase {
}
private void createCache(Properties props) {
- try {
- DistributedSystem ds = getSystem(props);
- assertNotNull(ds);
- ds.disconnect();
- ds = getSystem(props);
- cache = CacheFactory.create(ds);
- assertNotNull(cache);
- } catch (Exception e) {
- Assert.fail("Failed while creating the cache", e);
- }
+ DistributedSystem ds = getSystem(props);
+ assertThat(ds).isNotNull();
+ ds.disconnect();
+ ds = getSystem(props);
+ cache = CacheFactory.create(ds);
+ assertThat(cache).isNotNull();
}
static void startServerHA() throws Exception {
Wait.pause(2000);
- Collection bridgeServers = cache.getCacheServers();
+ Collection<?> bridgeServers = cache.getCacheServers();
logger
.info("Start Server cache servers list : " + bridgeServers.size());
- Iterator bridgeIterator = bridgeServers.iterator();
+ Iterator<?> bridgeIterator = bridgeServers.iterator();
CacheServer bridgeServer = (CacheServer) bridgeIterator.next();
logger.info("start Server cache server" + bridgeServer);
bridgeServer.start();
@@ -584,7 +586,7 @@ public class PRClientServerTestBase extends
JUnit4CacheTestCase {
static void stopServerHA() {
Wait.pause(1000);
- Iterator iter = cache.getCacheServers().iterator();
+ Iterator<?> iter = cache.getCacheServers().iterator();
if (iter.hasNext()) {
CacheServer server = (CacheServer) iter.next();
server.stop();
@@ -616,13 +618,13 @@ public class PRClientServerTestBase extends
JUnit4CacheTestCase {
void serverBucketFilterExecution(Set<Integer> bucketFilterSet) {
Region<Integer, Integer> region = cache.getRegion(PartitionedRegionName);
- assertNotNull(region);
+ assertThat(region).isNotNull();
final HashSet<Integer> testKeysSet = new HashSet<>();
for (int i = 150; i > 0; i--) {
testKeysSet.add(i);
}
DistributedSystem.setThreadsSocketPolicy(false);
- Function function = new TestFunction(true,
TestFunction.TEST_FUNCTION_BUCKET_FILTER);
+ Function<?> function = new TestFunction<>(true,
TestFunction.TEST_FUNCTION_BUCKET_FILTER);
if (shouldRegisterFunctionsOnClient()) {
FunctionService.registerFunction(function);
}
@@ -636,24 +638,24 @@ public class PRClientServerTestBase extends
JUnit4CacheTestCase {
ResultCollector<Integer, List<Integer>> rc =
dataSet.withBucketFilter(bucketFilterSet).execute(function.getId());
List<Integer> results = rc.getResult();
- assertEquals(bucketFilterSet.size(), results.size());
+ assertThat(results.size()).isEqualTo(bucketFilterSet.size());
for (Integer bucket : results) {
bucketFilterSet.remove(bucket);
}
- assertTrue(bucketFilterSet.isEmpty());
+ assertThat(bucketFilterSet).isEmpty();
}
void serverBucketFilterOverrideExecution(Set<Integer> bucketFilterSet,
Set<Integer> ketFilterSet) {
Region<Integer, Integer> region = cache.getRegion(PartitionedRegionName);
- assertNotNull(region);
+ assertThat(region).isNotNull();
final HashSet<Integer> testKeysSet = new HashSet<>();
for (int i = 150; i > 0; i--) {
testKeysSet.add(i);
}
DistributedSystem.setThreadsSocketPolicy(false);
- Function function = new TestFunction(true,
TestFunction.TEST_FUNCTION_BUCKET_FILTER);
+ Function<?> function = new TestFunction<>(true,
TestFunction.TEST_FUNCTION_BUCKET_FILTER);
if (shouldRegisterFunctionsOnClient()) {
FunctionService.registerFunction(function);
}
@@ -671,20 +673,21 @@ public class PRClientServerTestBase extends
JUnit4CacheTestCase {
ResultCollector<Integer, List<Integer>> rc =
dataSet.withBucketFilter(bucketFilterSet)
.withFilter(ketFilterSet).execute(function.getId());
List<Integer> results = rc.getResult();
- assertEquals(expectedBucketSet.size(), results.size());
+ assertThat(results.size()).isEqualTo(expectedBucketSet.size());
for (Integer bucket : results) {
expectedBucketSet.remove(bucket);
}
- assertTrue(expectedBucketSet.isEmpty());
+ assertThat(expectedBucketSet).isEmpty();
}
- public static class BucketFilterPRResolver implements PartitionResolver,
Serializable {
+ public static class BucketFilterPRResolver
+ implements PartitionResolver<Object, Object>, Serializable {
@Override
public void close() {}
@Override
- public Object getRoutingObject(EntryOperation opDetails) {
+ public Object getRoutingObject(EntryOperation<Object, Object> opDetails) {
Object key = opDetails.getKey();
return getBucketID(key);
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionResultSender.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionResultSender.java
index 69a0450294..a63c8a1b52 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionResultSender.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionResultSender.java
@@ -16,6 +16,8 @@
package org.apache.geode.internal.cache.execute;
+import java.util.function.BiFunction;
+
import org.apache.logging.log4j.Logger;
import org.apache.geode.cache.execute.Function;
@@ -23,8 +25,10 @@ import org.apache.geode.cache.execute.FunctionException;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.cache.ForceReattemptException;
import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.execute.metrics.FunctionStats;
import org.apache.geode.internal.cache.execute.metrics.FunctionStatsManager;
import
org.apache.geode.internal.cache.partitioned.PartitionedRegionFunctionStreamingMessage;
import org.apache.geode.internal.serialization.KnownVersion;
@@ -43,7 +47,7 @@ public class PartitionedRegionFunctionResultSender implements
InternalResultSend
private static final Logger logger = LogService.getLogger();
- PartitionedRegionFunctionStreamingMessage msg = null;
+ private final PartitionedRegionFunctionStreamingMessage msg;
private final DistributionManager dm;
@@ -53,15 +57,15 @@ public class PartitionedRegionFunctionResultSender
implements InternalResultSend
private final boolean forwardExceptions;
- private ResultCollector rc;
+ private final ResultCollector rc;
- private ServerToClientFunctionResultSender serverSender;
+ private final ServerToClientFunctionResultSender serverSender;
private boolean localLastResultReceived = false;
- private boolean onlyLocal = false;
+ private final boolean onlyLocal;
- private boolean onlyRemote = false;
+ private final boolean onlyRemote;
private boolean completelyDoneFromRemote = false;
@@ -73,6 +77,7 @@ public class PartitionedRegionFunctionResultSender implements
InternalResultSend
private BucketMovedException bme;
+ private BiFunction<String, InternalDistributedSystem, FunctionStats>
functionStatsFunctionProvider;
public KnownVersion getClientVersion() {
if (serverSender != null && serverSender.sc != null) { // is a
client-server connection
@@ -81,41 +86,40 @@ public class PartitionedRegionFunctionResultSender
implements InternalResultSend
return null;
}
- /**
- * Have to combine next two constructor in one and make a new class which
will send Results back.
- *
- */
public PartitionedRegionFunctionResultSender(DistributionManager dm,
PartitionedRegion pr,
- long time, PartitionedRegionFunctionStreamingMessage msg, Function
function,
- int[] bucketArray) {
- this.msg = msg;
- this.dm = dm;
- this.pr = pr;
- this.time = time;
- this.function = function;
- this.bucketArray = bucketArray;
-
- forwardExceptions = false;
+ long time, PartitionedRegionFunctionStreamingMessage msg,
+ Function function, int[] bucketArray) {
+ this(dm, pr, time, null, null, false, false, false, function, bucketArray,
msg,
+ (x, y) -> FunctionStatsManager.getFunctionStats((String) x,
(InternalDistributedSystem) y));
}
- /**
- * Have to combine next two constructor in one and make a new class which
will send Results back.
- *
- */
public PartitionedRegionFunctionResultSender(DistributionManager dm,
PartitionedRegion partitionedRegion, long time, ResultCollector rc,
ServerToClientFunctionResultSender sender, boolean onlyLocal, boolean
onlyRemote,
boolean forwardExceptions, Function function, int[] bucketArray) {
+ this(dm, partitionedRegion, time, rc, sender, onlyLocal, onlyRemote,
forwardExceptions,
+ function, bucketArray, null,
+ (x, y) -> FunctionStatsManager.getFunctionStats((String) x,
(InternalDistributedSystem) y));
+ }
+
+ PartitionedRegionFunctionResultSender(DistributionManager dm,
+ PartitionedRegion partitionedRegion, long time, ResultCollector rc,
+ ServerToClientFunctionResultSender sender, boolean onlyLocal, boolean
onlyRemote,
+ boolean forwardExceptions, Function function, int[] bucketArray,
+ PartitionedRegionFunctionStreamingMessage msg,
+ BiFunction functionStatsFunctionProvider) {
this.dm = dm;
pr = partitionedRegion;
this.time = time;
this.rc = rc;
+ this.msg = msg;
serverSender = sender;
this.onlyLocal = onlyLocal;
this.onlyRemote = onlyRemote;
this.forwardExceptions = forwardExceptions;
this.function = function;
this.bucketArray = bucketArray;
+ this.functionStatsFunctionProvider = functionStatsFunctionProvider;
}
private void checkForBucketMovement(Object oneResult) {
@@ -201,7 +205,7 @@ public class PartitionedRegionFunctionResultSender
implements InternalResultSend
// call a synchronized method as local node is also waiting to send
lastResult
lastResult(oneResult, rc, false, true,
dm.getDistributionManagerId());
}
- FunctionStatsManager.getFunctionStats(function.getId(), dm.getSystem())
+ functionStatsFunctionProvider.apply(function.getId(), dm.getSystem())
.incResultsReceived();
}
// incrementing result sent stats.
@@ -210,7 +214,7 @@ public class PartitionedRegionFunctionResultSender
implements InternalResultSend
// time the stats for the result sent is again incremented : Once the PR
team comes with the
// concept of the Streaming FunctionOperation
// for the partitioned Region then it will be simple to fix this problem.
- FunctionStatsManager.getFunctionStats(function.getId(), dm.getSystem())
+ functionStatsFunctionProvider.apply(function.getId(), dm.getSystem())
.incResultsReturned();
}
}
@@ -319,14 +323,14 @@ public class PartitionedRegionFunctionResultSender
implements InternalResultSend
if (dm == null) {
FunctionStatsManager.getFunctionStats(function.getId()).incResultsReceived();
} else {
- FunctionStatsManager.getFunctionStats(function.getId(), dm.getSystem())
+ functionStatsFunctionProvider.apply(function.getId(), dm.getSystem())
.incResultsReceived();
}
}
if (dm == null) {
FunctionStatsManager.getFunctionStats(function.getId()).incResultsReturned();
} else {
- FunctionStatsManager.getFunctionStats(function.getId(), dm.getSystem())
+ functionStatsFunctionProvider.apply(function.getId(), dm.getSystem())
.incResultsReturned();
}
}
@@ -360,21 +364,31 @@ public class PartitionedRegionFunctionResultSender
implements InternalResultSend
"PartitionedRegionFunctionResultSender adding result to
ResultCollector on local node {}",
oneResult);
rc.addResult(dm.getDistributionManagerId(), oneResult);
- FunctionStatsManager.getFunctionStats(function.getId(), dm.getSystem())
+ functionStatsFunctionProvider.apply(function.getId(), dm.getSystem())
.incResultsReceived();
}
// incrementing result sent stats.
- FunctionStatsManager.getFunctionStats(function.getId(), dm.getSystem())
+ functionStatsFunctionProvider.apply(function.getId(), dm.getSystem())
.incResultsReturned();
}
}
private void clientSend(Object oneResult, DistributedMember memberID) {
- serverSender.sendResult(oneResult, memberID);
+ try {
+ serverSender.sendResult(oneResult, memberID);
+ } catch (FunctionException e) {
+ logger.warn("Exception when sending result to client", e);
+ setException(e);
+ }
}
private void lastClientSend(DistributedMember memberID, Object lastResult) {
- serverSender.lastResult(lastResult, memberID);
+ try {
+ serverSender.lastResult(lastResult, memberID);
+ } catch (FunctionException e) {
+ logger.warn("Exception when sending last result to client", e);
+ setException(e);
+ }
}
@Override
@@ -411,5 +425,4 @@ public class PartitionedRegionFunctionResultSender
implements InternalResultSend
public boolean isLastResultReceived() {
return localLastResultReceived;
}
-
}
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionResultSenderTest.java
b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionResultSenderTest.java
new file mode 100644
index 0000000000..9ac049d663
--- /dev/null
+++
b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionResultSenderTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.internal.cache.execute;
+
+import static
org.apache.geode.internal.cache.execute.PartitionedRegionFunctionResultSenderTest.MethodToInvoke.LAST_RESULT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.execute.FunctionException;
+import org.apache.geode.cache.execute.ResultCollector;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionDataStore;
+import org.apache.geode.internal.cache.execute.metrics.FunctionStats;
+import org.apache.geode.test.junit.runners.GeodeParamsRunner;
+
+@RunWith(GeodeParamsRunner.class)
+public class PartitionedRegionFunctionResultSenderTest {
+ private DistributionManager dm = mock(DistributionManager.class);
+ private PartitionedRegion region = mock(PartitionedRegion.class);
+ private PartitionedRegionDataStore dataStore =
mock(PartitionedRegionDataStore.class);
+ private ATestResultCollector rc = new ATestResultCollector();
+ private ServerToClientFunctionResultSender
serverToClientFunctionResultSender =
+ mock(ServerToClientFunctionResultSender.class);
+ private FunctionStats functionStats = mock(FunctionStats.class);
+
+ enum MethodToInvoke {
+ SEND_EXCEPTION, LAST_RESULT
+ }
+
+ @BeforeEach
+ public void setUp() {
+ when(region.getDataStore()).thenReturn(dataStore);
+ when(dataStore.areAllBucketsHosted(any())).thenReturn(true);
+ }
+
+ @Test
+ public void
whenResponseToClientInLastResultFailsEndResultsIsCalled_NotOnlyLocal_OnlyRemote()
{
+ doThrow(new FunctionException()).when(serverToClientFunctionResultSender)
+ .lastResult(any(), any());
+ PartitionedRegionFunctionResultSender sender =
+ new PartitionedRegionFunctionResultSender(dm,
+ region, 1, rc, serverToClientFunctionResultSender, false, true,
true,
+ new TestFunction(), new int[2], null, (x, y) -> functionStats);
+
+ sender.lastResult(new Object(), true, rc, null);
+
+ assertThat(rc.isEndResultsCalled()).isEqualTo(true);
+ }
+
+ @ParameterizedTest(name = "{displayName} with {arguments}")
+ @EnumSource(MethodToInvoke.class)
+ public void
whenResponseToClientInLastResultFailsEndResultsIsCalled_OnlyLocal_NotOnlyRemote(
+ MethodToInvoke methodToInvoke) {
+ doThrow(new
FunctionException("IOException")).when(serverToClientFunctionResultSender)
+ .lastResult(any(), any());
+
+ PartitionedRegionFunctionResultSender sender =
+ new PartitionedRegionFunctionResultSender(dm,
+ region, 1, rc, serverToClientFunctionResultSender, true, false,
true,
+ new TestFunction(), new int[2]);
+
+ if (methodToInvoke == LAST_RESULT) {
+ sender.lastResult(new Object());
+ } else {
+ sender.sendException(new Exception());
+ }
+
+ assertThat(rc.isEndResultsCalled()).isEqualTo(true);
+ }
+
+ @ParameterizedTest(name = "{displayName} with {arguments}")
+ @EnumSource(MethodToInvoke.class)
+ public void
whenResponseToClientInSendResultFailsEndResultsIsCalled_NotOnlyLocal_OnlyRemote(
+ MethodToInvoke methodToInvoke) {
+ doThrow(new
FunctionException("IOException")).when(serverToClientFunctionResultSender)
+ .sendResult(any(), any());
+ PartitionedRegionFunctionResultSender sender =
+ new PartitionedRegionFunctionResultSender(dm,
+ region, 1, rc, serverToClientFunctionResultSender, false, true,
true,
+ new TestFunction(), new int[2], null, (x, y) -> functionStats);
+
+ if (methodToInvoke == LAST_RESULT) {
+ sender.lastResult(new Object());
+ } else {
+ sender.sendException(new Exception());
+ }
+
+ assertThat(rc.isEndResultsCalled()).isEqualTo(true);
+ }
+
+ @ParameterizedTest(name = "{displayName} with {arguments}")
+ @EnumSource(MethodToInvoke.class)
+ public void
whenResponseToClientInSendResultFailsEndResultsIsCalled_NotOnlyLocal_NotOnlyRemote(
+ MethodToInvoke methodToInvoke) {
+ doThrow(new
FunctionException("IOException")).when(serverToClientFunctionResultSender)
+ .sendResult(any(), any());
+ PartitionedRegionFunctionResultSender sender =
+ new PartitionedRegionFunctionResultSender(dm,
+ region, 1, rc, serverToClientFunctionResultSender, false, false,
true,
+ new TestFunction(), new int[2], null, (x, y) -> functionStats);
+
+ if (methodToInvoke == LAST_RESULT) {
+ sender.lastResult(new Object(), true, rc, null);
+ } else {
+ sender.sendException(new Exception());
+ }
+
+ assertThat(rc.isEndResultsCalled()).isEqualTo(true);
+ }
+
+ private static class TestFunction implements Function {
+ @Override
+ public void execute(FunctionContext context) {}
+ }
+
+ private static class ATestResultCollector implements ResultCollector {
+ private volatile boolean isEndResultsCalled = false;
+
+ @Override
+ public Object getResult() throws FunctionException {
+ return null;
+ }
+
+ @Override
+ public Object getResult(long timeout, TimeUnit unit)
+ throws FunctionException, InterruptedException {
+ return null;
+ }
+
+ @Override
+ public void addResult(DistributedMember memberID, Object
resultOfSingleExecution) {}
+
+ @Override
+ public void endResults() {
+ isEndResultsCalled = true;
+ }
+
+ @Override
+ public void clearResults() {}
+
+ public boolean isEndResultsCalled() {
+ return isEndResultsCalled;
+ }
+ }
+}
diff --git
a/geode-dunit/src/main/java/org/apache/geode/internal/cache/functions/TestFunction.java
b/geode-dunit/src/main/java/org/apache/geode/internal/cache/functions/TestFunction.java
index 951a9b7ca1..a46840fd0e 100755
---
a/geode-dunit/src/main/java/org/apache/geode/internal/cache/functions/TestFunction.java
+++
b/geode-dunit/src/main/java/org/apache/geode/internal/cache/functions/TestFunction.java
@@ -99,6 +99,7 @@ public class TestFunction<T> implements Function<T>,
Declarable2, DataSerializab
public static final String TEST_FUNCTION_SINGLE_HOP_FORCE_NETWORK_HOP =
"executeFunctionSingleHopForceNetworkHop";
public static final String TEST_FUNCTION_GET_NETWORK_HOP =
"executeFunctionGetNetworkHop";
+ public static final String TEST_FUNCTION_SLOW = "SlowFunction";
private static final String ID = "id";
private static final String HAVE_RESULTS = "haveResults";
private final Properties props;
@@ -197,6 +198,8 @@ public class TestFunction<T> implements Function<T>,
Declarable2, DataSerializab
executeSingleHopForceNetworkHop(context);
} else if (id.equals(TEST_FUNCTION_GET_NETWORK_HOP)) {
executeGetNetworkHop(context);
+ } else if (id.equals(TEST_FUNCTION_SLOW)) {
+ executeSlowFunction(context);
} else if (noAckTest.equals("true")) {
execute1(context);
}
@@ -1041,6 +1044,22 @@ public class TestFunction<T> implements Function<T>,
Declarable2, DataSerializab
context.getResultSender().lastResult(networkHopType);
}
+ private void executeSlowFunction(FunctionContext context) {
+ int entries = 4;
+ int waitBetweenEntriesMs = 5000;
+ for (int i = 0; i < entries; i++) {
+ try {
+ Thread.sleep(waitBetweenEntriesMs);
+ } catch (InterruptedException e) {
+ context.getResultSender().sendException(e);
+ Thread.currentThread().interrupt();
+ return;
+ }
+ context.getResultSender().sendResult(i);
+ }
+ context.getResultSender().lastResult(entries);
+ }
+
/**
* Get the function identifier, used by clients to invoke this function
*
@@ -1096,12 +1115,12 @@ public class TestFunction<T> implements Function<T>,
Declarable2, DataSerializab
@Override
public boolean isHA() {
-
if (getId().equals(TEST_FUNCTION10)) {
return true;
}
if (getId().equals(TEST_FUNCTION_NONHA_SERVER) ||
getId().equals(TEST_FUNCTION_NONHA_REGION)
- || getId().equals(TEST_FUNCTION_NONHA_NOP) ||
getId().equals(TEST_FUNCTION_NONHA)) {
+ || getId().equals(TEST_FUNCTION_NONHA_NOP) ||
getId().equals(TEST_FUNCTION_NONHA)
+ || getId().equals(TEST_FUNCTION_SLOW)) {
return false;
}
return Boolean.parseBoolean(props.getProperty(HAVE_RESULTS));
diff --git
a/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/ServerStarterRule.java
b/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/ServerStarterRule.java
index bce7ed9ade..f23ee9ad23 100644
---
a/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/ServerStarterRule.java
+++
b/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/ServerStarterRule.java
@@ -66,6 +66,7 @@ public class ServerStarterRule extends
MemberStarterRule<ServerStarterRule> impl
private PdxSerializer pdxSerializer = null;
private boolean pdxReadSerialized = false;
private boolean pdxReadSerializedUserSet = false;
+ private int maxThreads = -1;
// By default we start one server per jvm
private int serverCount = 1;
@@ -123,6 +124,11 @@ public class ServerStarterRule extends
MemberStarterRule<ServerStarterRule> impl
servers.clear();
}
+ public ServerStarterRule withMaxThreads(int maxThreads) {
+ this.maxThreads = maxThreads;
+ return this;
+ }
+
public ServerStarterRule withPDXPersistent() {
pdxPersistent = true;
pdxPersistentUserSet = true;
@@ -219,6 +225,9 @@ public class ServerStarterRule extends
MemberStarterRule<ServerStarterRule> impl
} else {
server.setPort(0);
}
+ if (maxThreads >= 0) {
+ server.setMaxThreads(maxThreads);
+ }
try {
server.start();
} catch (IOException e) {
diff --git
a/geode-dunit/src/main/resources/org/apache/geode/test/dunit/internal/sanctioned-geode-dunit-serializables.txt
b/geode-dunit/src/main/resources/org/apache/geode/test/dunit/internal/sanctioned-geode-dunit-serializables.txt
index 81f334b6bf..70a8520acc 100644
---
a/geode-dunit/src/main/resources/org/apache/geode/test/dunit/internal/sanctioned-geode-dunit-serializables.txt
+++
b/geode-dunit/src/main/resources/org/apache/geode/test/dunit/internal/sanctioned-geode-dunit-serializables.txt
@@ -180,4 +180,4 @@
org/apache/geode/test/junit/rules/LocatorLauncherStartupRule,false,autoStart:boo
org/apache/geode/test/junit/rules/LocatorStarterRule,false
org/apache/geode/test/junit/rules/MemberStarterRule,false,autoStart:boolean,availableHttpPort:int,availableJmxPort:int,cleanWorkingDir:boolean,firstLevelChildrenFile:java/util/List,httpPort:int,jmxPort:int,logFile:boolean,memberPort:int,name:java/lang/String,properties:java/util/Properties,restore:org/apache/geode/test/junit/rules/accessible/AccessibleRestoreSystemProperties,systemProperties:java/util/Properties
org/apache/geode/test/junit/rules/ServerLauncherStartupRule,false,autoStart:boolean,builderOperator:java/util/function/UnaryOperator,launcher:org/apache/geode/distributed/ServerLauncher,properties:java/util/Properties,temp:org/junit/rules/TemporaryFolder
-org/apache/geode/test/junit/rules/ServerStarterRule,false,availableLocatorPort:int,embeddedLocatorPort:int,pdxPersistent:boolean,pdxPersistentUserSet:boolean,pdxReadSerialized:boolean,pdxReadSerializedUserSet:boolean,pdxSerializer:org/apache/geode/pdx/PdxSerializer,regions:java/util/Map,serverCount:int
+org/apache/geode/test/junit/rules/ServerStarterRule,false,availableLocatorPort:int,embeddedLocatorPort:int,maxThreads:int,pdxPersistent:boolean,pdxPersistentUserSet:boolean,pdxReadSerialized:boolean,pdxReadSerializedUserSet:boolean,pdxSerializer:org/apache/geode/pdx/PdxSerializer,regions:java/util/Map,serverCount:int