This is an automated email from the ASF dual-hosted git repository.

alberto pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 7ccdc8297b GEODE-10155: Avoid threads hanging when function execution 
times-out (#7493)
7ccdc8297b is described below

commit 7ccdc8297b1ded06175f41543884d945d5995c60
Author: Alberto Gomez <alberto.go...@est.tech>
AuthorDate: Mon Jun 6 20:18:46 2022 +0200

    GEODE-10155: Avoid threads hanging when function execution times-out (#7493)
    
    * GEODE-10155: Avoid threads hanging when function execution times-out
    
    * GEODE-10155: Updated after review
    
    * GEODE-10155: More changes after review
    
    * GEODE-10155: Changes after more reviews
    
    * GEODE-10155: Some more changes after review
    
    * GEODE-10155: More changes after review
    
    * GEODE-10155: More clean-up after review
---
 ...nctionExecutionNoSingleHopDistributedTest.java} | 895 ++++++++++-----------
 .../cache/execute/PRClientServerTestBase.java      | 351 ++++----
 .../PartitionedRegionFunctionResultSender.java     |  77 +-
 .../PartitionedRegionFunctionResultSenderTest.java | 174 ++++
 .../internal/cache/functions/TestFunction.java     |  23 +-
 .../geode/test/junit/rules/ServerStarterRule.java  |   9 +
 .../sanctioned-geode-dunit-serializables.txt       |   2 +-
 7 files changed, 864 insertions(+), 667 deletions(-)

diff --git 
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest.java
 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest.java
similarity index 53%
rename from 
geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest.java
rename to 
geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest.java
index a76a7157b8..1c66b3d2e0 100644
--- 
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest.java
+++ 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest.java
@@ -14,10 +14,9 @@
  */
 package org.apache.geode.internal.cache.execute;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -27,6 +26,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
 
 import org.apache.logging.log4j.Logger;
 import org.junit.Test;
@@ -37,6 +38,7 @@ import 
org.junit.runners.Parameterized.UseParametersRunnerFactory;
 
 import org.apache.geode.cache.CacheClosedException;
 import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.ServerConnectivityException;
 import org.apache.geode.cache.execute.Execution;
 import org.apache.geode.cache.execute.Function;
 import org.apache.geode.cache.execute.FunctionAdapter;
@@ -50,11 +52,10 @@ import org.apache.geode.distributed.DistributedSystem;
 import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.cache.functions.TestFunction;
 import org.apache.geode.logging.internal.log4j.api.LogService;
-import org.apache.geode.test.dunit.Assert;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
 import org.apache.geode.test.dunit.AsyncInvocation;
 import org.apache.geode.test.dunit.IgnoredException;
 import org.apache.geode.test.dunit.SerializableRunnableIF;
-import org.apache.geode.test.dunit.ThreadUtils;
 import org.apache.geode.test.dunit.Wait;
 import org.apache.geode.test.dunit.WaitCriterion;
 import org.apache.geode.test.junit.categories.ClientServerTest;
@@ -64,7 +65,7 @@ import 
org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactor
 @Category({ClientServerTest.class, FunctionServiceTest.class})
 @RunWith(Parameterized.class)
 @UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class)
-public class PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
+public class PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest
     extends PRClientServerTestBase {
 
   private static final Logger logger = LogService.getLogger();
@@ -77,7 +78,7 @@ public class 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
 
   private static final int retryCount = 0;
 
-  public PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest() {
+  public PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest() {
     super();
   }
 
@@ -87,10 +88,10 @@ public class 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
   @Test
   public void testServerAllKeyExecution_byInstance() {
     createScenario();
-    Function function = new TestFunction(true, TEST_FUNCTION2);
+    Function<Object> function = new TestFunction<>(true, TEST_FUNCTION2);
     registerFunctionAtServer(function);
     isByName = Boolean.FALSE;
-    client.invoke(() -> 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
+    client.invoke(() -> 
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest
         .serverAllKeyExecution(isByName));
   }
 
@@ -100,7 +101,7 @@ public class 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
   @Test
   public void testServerGetAllFunction() {
     createScenario();
-    
client.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::getAll);
+    
client.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::getAll);
   }
 
   /*
@@ -109,7 +110,7 @@ public class 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
   @Test
   public void testServerPutAllFunction() {
     createScenario();
-    
client.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::putAll);
+    
client.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::putAll);
   }
 
   /*
@@ -119,10 +120,10 @@ public class 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
   @Test
   public void testServerSingleKeyExecution_byName() {
     createScenario();
-    Function function = new TestFunction(true, TEST_FUNCTION2);
+    Function<Object> function = new TestFunction<>(true, TEST_FUNCTION2);
     registerFunctionAtServer(function);
     isByName = Boolean.TRUE;
-    client.invoke(() -> 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
+    client.invoke(() -> 
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest
         .serverSingleKeyExecution(isByName));
   }
 
@@ -135,7 +136,7 @@ public class 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
   public void testserverSingleKeyExecution_FunctionInvocationTargetException() 
{
     createScenario();
     client.invoke(
-        
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::serverSingleKeyExecution_FunctionInvocationTargetException);
+        
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::serverSingleKeyExecution_FunctionInvocationTargetException);
   }
 
   /*
@@ -144,7 +145,7 @@ public class 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
   @Test
   public void testBucketFilter() {
     createScenarioForBucketFilter();
-    Function function = new TestFunction(true, 
TestFunction.TEST_FUNCTION_BUCKET_FILTER);
+    Function<Object> function = new TestFunction<>(true, 
TestFunction.TEST_FUNCTION_BUCKET_FILTER);
     registerFunctionAtServer(function);
 
     Set<Integer> bucketFilterSet = new HashSet<>();
@@ -161,7 +162,7 @@ public class 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
   @Test
   public void testBucketFilterOverride() {
     createScenarioForBucketFilter();
-    Function function = new TestFunction(true, 
TestFunction.TEST_FUNCTION_BUCKET_FILTER);
+    Function<Object> function = new TestFunction<>(true, 
TestFunction.TEST_FUNCTION_BUCKET_FILTER);
     registerFunctionAtServer(function);
     // test multi key filter
     Set<Integer> bucketFilterSet = new HashSet<>();
@@ -181,10 +182,10 @@ public class 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
   @Test
   public void testServerSingleKeyExecution_SocketTimeOut() {
     createScenario();
-    Function function = new TestFunction(true, 
TestFunction.TEST_FUNCTION_SOCKET_TIMEOUT);
+    Function<Object> function = new TestFunction<>(true, 
TestFunction.TEST_FUNCTION_SOCKET_TIMEOUT);
     registerFunctionAtServer(function);
     isByName = Boolean.TRUE;
-    client.invoke(() -> 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
+    client.invoke(() -> 
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest
         .serverSingleKeyExecutionSocketTimeOut(isByName));
   }
 
@@ -195,10 +196,10 @@ public class 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
   @Test
   public void testServerSingleKeyExecution_byInstance() {
     createScenario();
-    Function function = new TestFunction(true, TEST_FUNCTION2);
+    Function<Object> function = new TestFunction<>(true, TEST_FUNCTION2);
     registerFunctionAtServer(function);
     isByName = Boolean.FALSE;
-    client.invoke(() -> 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
+    client.invoke(() -> 
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest
         .serverSingleKeyExecution(isByName));
   }
 
@@ -209,7 +210,7 @@ public class 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
   public void testServerSingleKeyExecution_byInlineFunction() {
     createScenario();
     client.invoke(
-        
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::serverSingleKeyExecution_Inline);
+        
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::serverSingleKeyExecution_Inline);
   }
 
   /*
@@ -219,26 +220,26 @@ public class 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
   @Test
   public void testserverMultiKeyExecution_byName() {
     createScenario();
-    Function function = new TestFunction(true, TEST_FUNCTION2);
+    Function<Object> function = new TestFunction<>(true, TEST_FUNCTION2);
     registerFunctionAtServer(function);
     isByName = Boolean.TRUE;
-    client.invoke(() -> 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
+    client.invoke(() -> 
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest
         .serverMultiKeyExecution(isByName));
     server1.invoke(
-        
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::checkBucketsOnServer);
+        
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::checkBucketsOnServer);
     server2.invoke(
-        
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::checkBucketsOnServer);
+        
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::checkBucketsOnServer);
     server3.invoke(
-        
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::checkBucketsOnServer);
+        
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::checkBucketsOnServer);
   }
 
   @Test
-  public void testserverMultiKeyExecution_SocektTimeOut() {
+  public void testserverMultiKeyExecution_SocketTimeOut() {
     createScenario();
-    Function function = new TestFunction(true, 
TestFunction.TEST_FUNCTION_SOCKET_TIMEOUT);
+    Function<Object> function = new TestFunction<>(true, 
TestFunction.TEST_FUNCTION_SOCKET_TIMEOUT);
     registerFunctionAtServer(function);
     isByName = Boolean.TRUE;
-    client.invoke(() -> 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
+    client.invoke(() -> 
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest
         .serverMultiKeyExecutionSocketTimeOut(isByName));
   }
 
@@ -249,7 +250,7 @@ public class 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
   public void testserverMultiKeyExecution_byInlineFunction() {
     createScenario();
     client.invoke(
-        
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::serverMultiKeyExecution_Inline);
+        
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::serverMultiKeyExecution_Inline);
   }
 
   /*
@@ -262,7 +263,7 @@ public class 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
   public void testserverMultiKeyExecution_FunctionInvocationTargetException() {
     createScenario();
     client.invoke(
-        
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::serverMultiKeyExecution_FunctionInvocationTargetException);
+        
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::serverMultiKeyExecution_FunctionInvocationTargetException);
   }
 
   /*
@@ -272,10 +273,10 @@ public class 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
   @Test
   public void testserverMultiKeyExecutionNoResult_byName() {
     createScenario();
-    Function function = new TestFunction(false, TEST_FUNCTION7);
+    Function<Object> function = new TestFunction<>(false, TEST_FUNCTION7);
     registerFunctionAtServer(function);
     isByName = Boolean.TRUE;
-    client.invoke(() -> 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
+    client.invoke(() -> 
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest
         .serverMultiKeyExecutionNoResult(isByName));
   }
 
@@ -286,10 +287,10 @@ public class 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
   @Test
   public void testserverMultiKeyExecution_byInstance() {
     createScenario();
-    Function function = new TestFunction(true, TEST_FUNCTION2);
+    Function<Object> function = new TestFunction<>(true, TEST_FUNCTION2);
     registerFunctionAtServer(function);
     isByName = Boolean.FALSE;
-    client.invoke(() -> 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
+    client.invoke(() -> 
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest
         .serverMultiKeyExecution(isByName));
   }
 
@@ -300,10 +301,10 @@ public class 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
   @Test
   public void testserverMultiKeyExecutionOnASingleBucket_byName() {
     createScenario();
-    Function function = new TestFunction(true, TEST_FUNCTION2);
+    Function<Object> function = new TestFunction<>(true, TEST_FUNCTION2);
     registerFunctionAtServer(function);
     isByName = Boolean.TRUE;
-    client.invoke(() -> 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
+    client.invoke(() -> 
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest
         .serverMultiKeyExecutionOnASingleBucket(isByName));
   }
 
@@ -314,10 +315,10 @@ public class 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
   @Test
   public void testserverMultiKeyExecutionOnASingleBucket_byInstance() {
     createScenario();
-    Function function = new TestFunction(true, TEST_FUNCTION2);
+    Function<Object> function = new TestFunction<>(true, TEST_FUNCTION2);
     registerFunctionAtServer(function);
     isByName = Boolean.FALSE;
-    client.invoke(() -> 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
+    client.invoke(() -> 
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest
         .serverMultiKeyExecutionOnASingleBucket(isByName));
   }
 
@@ -326,33 +327,30 @@ public class 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
    * failover to other available server
    */
   @Test
-  public void testServerFailoverWithTwoServerAliveHA() {
+  public void testServerFailoverWithTwoServerAliveHA() throws 
InterruptedException {
     IgnoredException.addIgnoredException("FunctionInvocationTargetException");
-    ArrayList commonAttributes =
+    ArrayList<Object> commonAttributes =
         createCommonServerAttributes("TestPartitionedRegion", null, 1, null);
     createClientServerScenarion(commonAttributes, 20, 20, 20);
-    Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_HA);
+    Function<Object> function = new TestFunction<>(true, 
TestFunction.TEST_FUNCTION_HA);
     registerFunctionAtServer(function);
-    
server2.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::stopServerHA);
-    
server3.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::stopServerHA);
-    
client.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::putOperation);
+    
server2.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::stopServerHA);
+    
server3.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::stopServerHA);
+    
client.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::putOperation);
 
     int AsyncInvocationArrSize = 1;
-    AsyncInvocation[] async = new AsyncInvocation[AsyncInvocationArrSize];
+    AsyncInvocation<?>[] async = new 
AsyncInvocation<?>[AsyncInvocationArrSize];
     async[0] = client.invokeAsync(
-        
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::executeFunctionHA);
-    
server2.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::startServerHA);
-    
server3.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::startServerHA);
-    
server1.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::stopServerHA);
+        
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::executeFunctionHA);
+    
server2.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::startServerHA);
+    
server3.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::startServerHA);
+    
server1.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::stopServerHA);
     client.invoke(() -> PRClientServerRegionFunctionExecutionDUnitTest
         .verifyDeadAndLiveServers(2));
-    ThreadUtils.join(async[0], 6 * 60 * 1000);
-    if (async[0].getException() != null) {
-      Assert.fail("UnExpected Exception Occurred : ", async[0].getException());
-    }
-    List l = (List) async[0].getReturnValue();
 
-    assertEquals(2, l.size());
+    List<?> l = (List<?>) async[0].get();
+
+    assertThat(l).hasSize(2);
   }
 
   /*
@@ -360,31 +358,28 @@ public class 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
    * failover to other available server
    */
   @Test
-  public void testServerCacheClosedFailoverWithTwoServerAliveHA() {
+  public void testServerCacheClosedFailoverWithTwoServerAliveHA() throws 
InterruptedException {
     IgnoredException.addIgnoredException("FunctionInvocationTargetException");
-    ArrayList commonAttributes =
+    ArrayList<Object> commonAttributes =
         createCommonServerAttributes("TestPartitionedRegion", null, 1, null);
     createClientServerScenarion(commonAttributes, 20, 20, 20);
-    Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_HA);
+    Function<Object> function = new TestFunction<>(true, 
TestFunction.TEST_FUNCTION_HA);
     registerFunctionAtServer(function);
-    
server2.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::stopServerHA);
-    
server3.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::stopServerHA);
-    
client.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::putOperation);
+    
server2.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::stopServerHA);
+    
server3.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::stopServerHA);
+    
client.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::putOperation);
     int AsyncInvocationArrSize = 1;
-    AsyncInvocation[] async = new AsyncInvocation[AsyncInvocationArrSize];
+    AsyncInvocation<?>[] async = new 
AsyncInvocation<?>[AsyncInvocationArrSize];
     async[0] = client.invokeAsync(
-        
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::executeFunctionHA);
-    
server2.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::startServerHA);
-    
server3.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::startServerHA);
-    
server1.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::closeCacheHA);
-    client.invoke(() -> 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
+        
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::executeFunctionHA);
+    
server2.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::startServerHA);
+    
server3.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::startServerHA);
+    
server1.invoke(PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::closeCacheHA);
+    client.invoke(() -> 
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest
         .verifyDeadAndLiveServers(2));
-    ThreadUtils.join(async[0], 5 * 60 * 1000);
-    if (async[0].getException() != null) {
-      Assert.fail("UnExpected Exception Occurred : ", async[0].getException());
-    }
-    List l = (List) async[0].getReturnValue();
-    assertEquals(2, l.size());
+
+    List<?> l = (List<?>) async[0].get();
+    assertThat(l).hasSize(2);
   }
 
   @Test
@@ -392,20 +387,90 @@ public class 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
     createScenario();
     server1
         .invoke(
-            (SerializableRunnableIF) 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::registerFunction);
+            (SerializableRunnableIF) 
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::registerFunction);
     server1
         .invoke(
-            (SerializableRunnableIF) 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::registerFunction);
+            (SerializableRunnableIF) 
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::registerFunction);
     server1
         .invoke(
-            (SerializableRunnableIF) 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::registerFunction);
+            (SerializableRunnableIF) 
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::registerFunction);
     client
         .invoke(
-            (SerializableRunnableIF) 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest::registerFunction);
+            (SerializableRunnableIF) 
PRClientServerRegionFunctionExecutionNoSingleHopDistributedTest::registerFunction);
     client.invoke(
         
PRClientServerRegionFunctionExecutionDUnitTest::FunctionExecution_Inline_Bug40714);
   }
 
+  /**
+   * This test case verifies that if the execution of a function handled
+   * by a Function Execution thread times out at the client, the 
ServerConnection
+   * thread will eventually be released.
+   * In order to test this, a slow function will be executed by a client
+   * with a small time-out a number of times equal to the number of servers
+   * in the cluster * the max number of threads configured.
+   * After the function executions have timed-out, another request will be
+   * sent by the client to any server and it should be served timely.
+   * If the ServerConnection threads had not been released, this new
+   * request will never be served because there would be not ServerConnection
+   * threads available and the test case will time-out.
+   */
+  @Test
+  public void 
testClientFunctionExecutionTimingOutDoesNotLeaveServerConnectionThreadsHanged() 
{
+    // Set client connect-timeout to a very high value so that if there are no
+    // ServerConnection threads available the test will time-out before the 
client times-out.
+    int connectTimeout = (int) (GeodeAwaitility.getTimeout().toMillis() * 2);
+    int maxThreads = 2;
+    createScenarioWithClientConnectTimeout(connectTimeout, maxThreads);
+
+    // The function must be executed a number of times equal
+    // to the number of servers * the max-threads, to check if all the
+    // threads are hanged.
+    int executions = (3 * maxThreads);
+
+    // functionTimeoutSecs should be lower than the
+    // time taken by the slow function to return all
+    // the results
+    int functionTimeoutSecs = 2;
+
+    Function<?> function = new TestFunction<>(true, 
TestFunction.TEST_FUNCTION_SLOW);
+    registerFunctionAtServer(function);
+
+    // Run the function that will time-out at the client
+    // the number of specified times.
+    IntStream.range(0, executions)
+        .forEach(i -> assertThatThrownBy(() -> client
+            .invoke(() -> executeSlowFunctionOnRegionNoFilter(function, 
PartitionedRegionName,
+                functionTimeoutSecs)))
+                    
.getCause().getCause().isInstanceOf(ServerConnectivityException.class));
+
+    // Make sure that the get returns timely. If it hangs, it means
+    // that there are no threads available in the servers to handle the
+    // request because they were hanged due to the previous function
+    // executions.
+    await().until(() -> {
+      client.invoke(() -> executeGet(PartitionedRegionName, "key"));
+      return true;
+    });
+  }
+
+  private Object executeGet(String regionName, Object key) {
+    Region<?, ?> region = cache.getRegion(regionName);
+    return region.get(key);
+  }
+
+  private Object executeSlowFunctionOnRegionNoFilter(Function<?> function, 
String regionName,
+      int functionTimeoutSecs) {
+    FunctionService.registerFunction(function);
+    Region<?, ?> region = cache.getRegion(regionName);
+
+    Execution execution = FunctionService.onRegion(region);
+
+    Object[] args = {Boolean.TRUE};
+    return execution.setArguments(args).execute(function.getId(), 
functionTimeoutSecs,
+        TimeUnit.SECONDS).getResult();
+  }
+
+
   public static void registerFunction() {
     FunctionService.registerFunction(new FunctionAdapter() {
       @Override
@@ -457,57 +522,56 @@ public class 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
 
   public static void executeFunction() {
 
-    Region region = cache.getRegion(PartitionedRegionName);
-    assertNotNull(region);
+    Region<Object, Object> region = cache.getRegion(PartitionedRegionName);
+    assertThat(region).isNotNull();
     final HashSet<String> testKeysSet = new HashSet<>();
     for (int i = (totalNumBuckets * 10); i > 0; i--) {
       testKeysSet.add("execKey-" + i);
     }
     DistributedSystem.setThreadsSocketPolicy(false);
-    Function function = new TestFunction(true, TEST_FUNCTION2);
+    Function<Object> function = new TestFunction<>(true, TEST_FUNCTION2);
     FunctionService.registerFunction(function);
     Execution dataSet = FunctionService.onRegion(region);
     try {
-      ResultCollector rc1 =
+      ResultCollector<?, ?> rc1 =
           
dataSet.withFilter(testKeysSet).setArguments(Boolean.TRUE).execute(function.getId());
 
-      HashMap resultMap = ((HashMap) rc1.getResult());
-      assertEquals(3, resultMap.size());
+      HashMap<?, ?> resultMap = ((HashMap<?, ?>) rc1.getResult());
+      assertThat(resultMap).hasSize(3);
 
-      for (Object o : resultMap.entrySet()) {
-        Map.Entry entry = (Map.Entry) o;
-        ArrayList resultListForMember = (ArrayList) entry.getValue();
+      for (Map.Entry<?, ?> o : resultMap.entrySet()) {
+        ArrayList<?> resultListForMember = (ArrayList<?>) o.getValue();
 
         for (Object result : resultListForMember) {
-          assertEquals(Boolean.TRUE, result);
+          assertThat(result).isEqualTo(true);
         }
       }
     } catch (Exception e) {
       logger.info("Got an exception : " + e.getMessage());
-      assertTrue(e instanceof CacheClosedException);
+      assertThat(e).isInstanceOf(CacheClosedException.class);
     }
   }
 
   private static Object executeFunctionHA() {
-    Region region = cache.getRegion(PartitionedRegionName);
+    Region<Object, Object> region = cache.getRegion(PartitionedRegionName);
     final HashSet<String> testKeysSet = new HashSet<>();
     for (int i = (totalNumBuckets * 10); i > 0; i--) {
       testKeysSet.add("execKey-" + i);
     }
     DistributedSystem.setThreadsSocketPolicy(false);
-    Function function = new TestFunction(true, TestFunction.TEST_FUNCTION_HA);
+    Function<Object> function = new TestFunction<>(true, 
TestFunction.TEST_FUNCTION_HA);
     FunctionService.registerFunction(function);
     Execution dataSet = FunctionService.onRegion(region);
-    ResultCollector rc1 =
+    ResultCollector<?, ?> rc1 =
         
dataSet.withFilter(testKeysSet).setArguments(Boolean.TRUE).execute(function.getId());
-    List l = ((List) rc1.getResult());
+    List<?> l = ((List<?>) rc1.getResult());
     logger.info("Result size : " + l.size());
     return l;
   }
 
   private static void putOperation() {
     Region<String, Integer> region = cache.getRegion(PartitionedRegionName);
-    assertNotNull(region);
+    assertThat(region).isNotNull();
     final HashSet<String> testKeysSet = new HashSet<>();
     for (int i = (totalNumBuckets * 10); i > 0; i--) {
       testKeysSet.add("execKey-" + i);
@@ -520,132 +584,119 @@ public class 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
   }
 
   private void createScenario() {
-    ArrayList commonAttributes =
+    ArrayList<Object> commonAttributes =
         createCommonServerAttributes("TestPartitionedRegion", null, 0, null);
     createClientServerScenarioNoSingleHop(commonAttributes, 20, 20, 20);
   }
 
+  private void createScenarioWithClientConnectTimeout(int connectTimeout, int 
maxThreads) {
+    ArrayList<Object> commonAttributes =
+        createCommonServerAttributes("TestPartitionedRegion", null, 0, null);
+    createClientServerScenarioNoSingleHop(commonAttributes, 20, 20, 20, 
maxThreads, connectTimeout);
+  }
+
+
   private void createScenarioForBucketFilter() {
-    ArrayList commonAttributes = 
createCommonServerAttributes("TestPartitionedRegion",
+    ArrayList<Object> commonAttributes = 
createCommonServerAttributes("TestPartitionedRegion",
         new BucketFilterPRResolver(), 0, null);
     createClientServerScenarioNoSingleHop(commonAttributes, 20, 20, 20);
   }
 
   private static void checkBucketsOnServer() {
     PartitionedRegion region = (PartitionedRegion) 
cache.getRegion(PartitionedRegionName);
-    HashMap localBucket2RegionMap = (HashMap) 
region.getDataStore().getSizeLocally();
+    HashMap<Integer, Integer> localBucket2RegionMap =
+        (HashMap<Integer, Integer>) region.getDataStore().getSizeLocally();
     logger.info(
         "Size of the " + PartitionedRegionName + " in this VM :- " + 
localBucket2RegionMap.size());
-    Set entrySet = localBucket2RegionMap.entrySet();
-    assertNotNull(entrySet);
+    Set<Map.Entry<Integer, Integer>> entrySet = 
localBucket2RegionMap.entrySet();
+    assertThat(entrySet).isNotNull();
   }
 
   private static void serverAllKeyExecution(Boolean isByName) {
     Region<String, Integer> region = cache.getRegion(PartitionedRegionName);
-    assertNotNull(region);
+    assertThat(region).isNotNull();
     final HashSet<String> testKeysSet = new HashSet<>();
     for (int i = (totalNumBuckets / 2); i > 0; i--) {
       testKeysSet.add("execKey-" + i);
     }
     DistributedSystem.setThreadsSocketPolicy(false);
-    Function function = new TestFunction(true, TEST_FUNCTION2);
+    Function<Object> function = new TestFunction<>(true, TEST_FUNCTION2);
     FunctionService.registerFunction(function);
     Execution dataSet = FunctionService.onRegion(region);
-    try {
-      int j = 0;
-      HashSet<Integer> origVals = new HashSet<>();
-      for (String item : testKeysSet) {
-        Integer val = j++;
-        origVals.add(val);
-        region.put(item, val);
-      }
-      ResultCollector rc1 = executeOnAll(dataSet, Boolean.TRUE, function, 
isByName);
-      List resultList = (List) rc1.getResult();
-      logger.info("Result size : " + resultList.size());
-      logger.info("Result are SSSS : " + resultList);
-      assertEquals(3, resultList.size());
-
-      for (Object result : resultList) {
-        assertEquals(Boolean.TRUE, result);
-      }
-      ResultCollector rc2 = executeOnAll(dataSet, testKeysSet, function, 
isByName);
-      List l2 = ((List) rc2.getResult());
-      assertEquals(3, l2.size());
-      HashSet<Integer> foundVals = new HashSet<>();
-      for (Object value : l2) {
-        ArrayList subL = (ArrayList) (value);
-        assertTrue(subL.size() > 0);
-        for (Object o : subL) {
-          assertTrue(foundVals.add((Integer) o));
-        }
-      }
-      assertEquals(origVals, foundVals);
 
-    } catch (Exception e) {
-      Assert.fail("Test failed after the put operation", e);
+    int j = 0;
+    HashSet<Integer> origVals = new HashSet<>();
+    for (String item : testKeysSet) {
+      Integer val = j++;
+      origVals.add(val);
+      region.put(item, val);
+    }
+    ResultCollector<?, ?> rc1 = executeOnAll(dataSet, Boolean.TRUE, function, 
isByName);
+    List<?> resultList = (List<?>) rc1.getResult();
+    assertThat(resultList).hasSize(3);
 
+    for (Object result : resultList) {
+      assertThat(result).isEqualTo(true);
     }
+    ResultCollector<?, ?> rc2 = executeOnAll(dataSet, testKeysSet, function, 
isByName);
+    List<?> l2 = (List<?>) rc2.getResult();
+    assertThat(l2).hasSize(3);
+    HashSet<Integer> foundVals = new HashSet<>();
+    for (Object value : l2) {
+      List<?> subL = (List<?>) value;
+      assertThat(subL).hasSizeGreaterThan(0);
+      for (Object o : subL) {
+        assertThat(foundVals.add((Integer) o)).isTrue();
+      }
+    }
+    assertThat(foundVals).containsExactlyInAnyOrderElementsOf(origVals);
   }
 
   public static void getAll() {
     Region<String, Integer> region = cache.getRegion(PartitionedRegionName);
-    assertNotNull(region);
+    assertThat(region).isNotNull();
     final List<String> testKeysList = new ArrayList<>();
     for (int i = (totalNumBuckets * 3); i > 0; i--) {
       testKeysList.add("execKey-" + i);
     }
     DistributedSystem.setThreadsSocketPolicy(false);
-    try {
-      int j = 0;
-      Map<String, Integer> origVals = new HashMap<>();
-      for (String key : testKeysList) {
-        Integer val = j++;
-        origVals.put(key, val);
-        region.put(key, val);
-      }
-      Map resultMap = region.getAll(testKeysList);
-      assertEquals(resultMap, origVals);
-      Wait.pause(2000);
-      Map secondResultMap = region.getAll(testKeysList);
-      assertEquals(secondResultMap, origVals);
-
-    } catch (Exception e) {
-      Assert.fail("Test failed after the put operation", e);
-
+    int j = 0;
+    Map<String, Integer> origVals = new HashMap<>();
+    for (String key : testKeysList) {
+      Integer val = j++;
+      origVals.put(key, val);
+      region.put(key, val);
     }
+    Map<String, Integer> resultMap = region.getAll(testKeysList);
+    assertThat(resultMap).containsExactlyInAnyOrderEntriesOf(origVals);
+    await().untilAsserted(
+        () -> 
assertThat(region.getAll(testKeysList)).containsExactlyInAnyOrderEntriesOf(origVals));
   }
 
   public static void putAll() {
     Region<String, Integer> region = cache.getRegion(PartitionedRegionName);
-    assertNotNull(region);
+    assertThat(region).isNotNull();
     final List<String> testKeysList = new ArrayList<>();
     for (int i = (totalNumBuckets * 3); i > 0; i--) {
       testKeysList.add("execKey-" + i);
     }
     DistributedSystem.setThreadsSocketPolicy(false);
-    try {
-      int j = 0;
-      Map<String, Integer> origVals = new HashMap<>();
-      for (String key : testKeysList) {
-        Integer val = j++;
-        origVals.put(key, val);
-        region.put(key, val);
-      }
-      Map resultMap = region.getAll(testKeysList);
-      assertEquals(resultMap, origVals);
-      Wait.pause(2000);
-      Map secondResultMap = region.getAll(testKeysList);
-      assertEquals(secondResultMap, origVals);
-
-    } catch (Exception e) {
-      Assert.fail("Test failed after the put operation", e);
-
+    int j = 0;
+    Map<String, Integer> origVals = new HashMap<>();
+    for (String key : testKeysList) {
+      Integer val = j++;
+      origVals.put(key, val);
+      region.put(key, val);
     }
+    Map<String, Integer> resultMap = region.getAll(testKeysList);
+    assertThat(resultMap).containsExactlyInAnyOrderEntriesOf(origVals);
+    await().untilAsserted(
+        () -> 
assertThat(region.getAll(testKeysList)).containsExactlyInAnyOrderEntriesOf(origVals));
   }
 
   private static void serverMultiKeyExecutionOnASingleBucket(Boolean isByName) 
{
     Region<String, Integer> region = cache.getRegion(PartitionedRegionName);
-    assertNotNull(region);
+    assertThat(region).isNotNull();
     final HashSet<String> testKeysSet = new HashSet<>();
     for (int i = (totalNumBuckets * 2); i > 0; i--) {
       testKeysSet.add("execKey-" + i);
@@ -656,192 +707,163 @@ public class 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
       region.put(value, val);
     }
     DistributedSystem.setThreadsSocketPolicy(false);
-    for (String o : testKeysSet) {
-      try {
-        Set<String> singleKeySet = Collections.singleton(o);
-        Function function = new TestFunction(true, TEST_FUNCTION2);
-        FunctionService.registerFunction(function);
-        Execution dataSet = FunctionService.onRegion(region);
-        ResultCollector rc1 = execute(dataSet, singleKeySet, Boolean.TRUE, 
function, isByName);
-        List l = ((List) rc1.getResult());
-        assertEquals(1, l.size());
-
-        ResultCollector rc2 =
-            execute(dataSet, singleKeySet, new HashSet<>(singleKeySet), 
function, isByName);
-        List l2 = ((List) rc2.getResult());
-
-        assertEquals(1, l2.size());
-        List subList = (List) l2.iterator().next();
-        assertEquals(1, subList.size());
-        assertEquals(region.get(singleKeySet.iterator().next()), 
subList.iterator().next());
-      } catch (Exception expected) {
-        logger.info("Exception : " + expected.getMessage());
-        expected.printStackTrace();
-        fail("Test failed after the put operation");
-      }
+    for (String key : testKeysSet) {
+      Set<String> singleKeySet = Collections.singleton(key);
+      Function<Object> function = new TestFunction<>(true, TEST_FUNCTION2);
+      FunctionService.registerFunction(function);
+      Execution dataSet = FunctionService.onRegion(region);
+      ResultCollector<?, ?> rc1 = execute(dataSet, singleKeySet, Boolean.TRUE, 
function, isByName);
+      List<?> list1 = (List<?>) rc1.getResult();
+      assertThat(list1).hasSize(1);
+
+      ResultCollector<?, ?> rc2 =
+          execute(dataSet, singleKeySet, new HashSet<>(singleKeySet), 
function, isByName);
+      List<?> list2 = (List<?>) rc2.getResult();
+
+      assertThat(list2).hasSize(1);
+      List<Integer> subList = (List<Integer>) list2.iterator().next();
+      assertThat(subList).hasSize(1);
+      
assertThat(subList).containsOnly(region.get(singleKeySet.iterator().next()));
     }
   }
 
   private static void serverMultiKeyExecution(Boolean isByName) {
     Region<String, Integer> region = cache.getRegion(PartitionedRegionName);
-    assertNotNull(region);
+    assertThat(region).isNotNull();
     final HashSet<String> testKeysSet = new HashSet<>();
     for (int i = (totalNumBuckets * 2); i > 0; i--) {
       testKeysSet.add("execKey-" + i);
     }
     DistributedSystem.setThreadsSocketPolicy(false);
-    Function function = new TestFunction(true, TEST_FUNCTION2);
+    Function<Object> function = new TestFunction<>(true, TEST_FUNCTION2);
     FunctionService.registerFunction(function);
     Execution dataSet = FunctionService.onRegion(region);
-    try {
-      int j = 0;
-      HashSet<Integer> origVals = new HashSet<>();
-      for (String element : testKeysSet) {
-        Integer val = j++;
-        origVals.add(val);
-        region.put(element, val);
-      }
-      ResultCollector rc1 = execute(dataSet, testKeysSet, Boolean.TRUE, 
function, isByName);
-      List l = ((List) rc1.getResult());
-      logger.info("Result size : " + l.size());
-      assertEquals(3, l.size());
-      for (Object item : l) {
-        assertEquals(Boolean.TRUE, item);
-      }
-
-      ResultCollector rc2 = execute(dataSet, testKeysSet, testKeysSet, 
function, isByName);
-      List l2 = ((List) rc2.getResult());
-      assertEquals(3, l2.size());
-      HashSet<Integer> foundVals = new HashSet<>();
-      for (Object value : l2) {
-        ArrayList subL = (ArrayList) value;
-        assertTrue(subL.size() > 0);
-        for (Object o : subL) {
-          assertTrue(foundVals.add((Integer) o));
-        }
-      }
-      assertEquals(origVals, foundVals);
 
-    } catch (Exception e) {
-      Assert.fail("Test failed after the put operation", e);
+    int j = 0;
+    HashSet<Integer> origVals = new HashSet<>();
+    for (String element : testKeysSet) {
+      Integer val = j++;
+      origVals.add(val);
+      region.put(element, val);
+    }
+    ResultCollector<?, ?> rc1 = execute(dataSet, testKeysSet, Boolean.TRUE, 
function, isByName);
+    List<?> l = (List<?>) rc1.getResult();
+    assertThat(l).hasSize(3);
+    for (Object item : l) {
+      assertThat(item).isEqualTo(true);
+    }
 
+    ResultCollector<?, ?> rc2 = execute(dataSet, testKeysSet, testKeysSet, 
function, isByName);
+    List<?> l2 = (List<?>) rc2.getResult();
+    assertThat(l2).hasSize(3);
+    HashSet<Integer> foundVals = new HashSet<>();
+    for (Object value : l2) {
+      List<?> subL = (List<?>) value;
+      assertThat(subL).hasSizeGreaterThan(0);
+      for (Object o : subL) {
+        assertThat(foundVals.add((Integer) o)).isTrue();
+      }
     }
+    assertThat(foundVals).containsExactlyInAnyOrderElementsOf(origVals);
   }
 
 
   private static void serverMultiKeyExecutionSocketTimeOut(Boolean isByName) {
     Region<String, Integer> region = cache.getRegion(PartitionedRegionName);
-    assertNotNull(region);
+    assertThat(region).isNotNull();
     final HashSet<String> testKeysSet = new HashSet<>();
     for (int i = (totalNumBuckets * 2); i > 0; i--) {
       testKeysSet.add("execKey-" + i);
     }
     DistributedSystem.setThreadsSocketPolicy(false);
-    Function function = new TestFunction(true, 
TestFunction.TEST_FUNCTION_SOCKET_TIMEOUT);
+    Function<Object> function = new TestFunction<>(true, 
TestFunction.TEST_FUNCTION_SOCKET_TIMEOUT);
     FunctionService.registerFunction(function);
     Execution dataSet = FunctionService.onRegion(region);
-    try {
-      int j = 0;
-      for (String value : testKeysSet) {
-        Integer val = j++;
-        region.put(value, val);
-      }
-      ResultCollector rc1 = execute(dataSet, testKeysSet, Boolean.TRUE, 
function, isByName);
-      List l = ((List) rc1.getResult());
-      logger.info("Result size : " + l.size());
-      assertEquals(3, l.size());
-      for (Object o : l) {
-        assertEquals(Boolean.TRUE, o);
-      }
-
-    } catch (Exception e) {
-      Assert.fail("Test failed after the function execution", e);
 
+    int j = 0;
+    for (String value : testKeysSet) {
+      Integer val = j++;
+      region.put(value, val);
+    }
+    ResultCollector<?, ?> rc1 = execute(dataSet, testKeysSet, Boolean.TRUE, 
function, isByName);
+    List<?> l = (List<?>) rc1.getResult();
+    logger.info("Result size : " + l.size());
+    assertThat(l).hasSize(3);
+    for (Object o : l) {
+      assertThat(o).isEqualTo(true);
     }
   }
 
   private static void serverSingleKeyExecutionSocketTimeOut(Boolean isByName) {
     Region<String, Integer> region = cache.getRegion(PartitionedRegionName);
-    assertNotNull(region);
+    assertThat(region).isNotNull();
     final String testKey = "execKey";
     final Set<String> testKeysSet = new HashSet<>();
     testKeysSet.add(testKey);
     DistributedSystem.setThreadsSocketPolicy(false);
 
-    Function function = new TestFunction(true, 
TestFunction.TEST_FUNCTION_SOCKET_TIMEOUT);
+    Function<Object> function = new TestFunction<>(true, 
TestFunction.TEST_FUNCTION_SOCKET_TIMEOUT);
     FunctionService.registerFunction(function);
     Execution dataSet = FunctionService.onRegion(region);
 
     region.put(testKey, 1);
-    try {
-      ResultCollector rs = execute(dataSet, testKeysSet, Boolean.TRUE, 
function, isByName);
-      assertEquals(Boolean.TRUE, ((List) rs.getResult()).get(0));
 
-      ResultCollector rs2 = execute(dataSet, testKeysSet, testKey, function, 
isByName);
-      assertEquals(testKey, ((List) rs2.getResult()).get(0));
+    ResultCollector<?, ?> rs = execute(dataSet, testKeysSet, Boolean.TRUE, 
function, isByName);
+    assertThat(((List<?>) rs.getResult()).get(0)).isEqualTo(true);
 
-    } catch (Exception ex) {
-      ex.printStackTrace();
-      logger.info("Exception : ", ex);
-      Assert.fail("Test failed after the put operation", ex);
-    }
+    ResultCollector<?, ?> rs2 = execute(dataSet, testKeysSet, testKey, 
function, isByName);
+    assertThat(((List<?>) rs2.getResult()).get(0)).isEqualTo(testKey);
   }
 
   private static void serverMultiKeyExecution_Inline() {
     Region<String, Integer> region = cache.getRegion(PartitionedRegionName);
-    assertNotNull(region);
+    assertThat(region).isNotNull();
     final HashSet<String> testKeysSet = new HashSet<>();
     for (int i = (totalNumBuckets * 2); i > 0; i--) {
       testKeysSet.add("execKey-" + i);
     }
     DistributedSystem.setThreadsSocketPolicy(false);
     Execution dataSet = FunctionService.onRegion(region);
-    try {
-      int j = 0;
-      for (String value : testKeysSet) {
-        Integer val = j++;
-        region.put(value, val);
-      }
-      ResultCollector rc1 =
-          
dataSet.withFilter(testKeysSet).setArguments(Boolean.TRUE).execute(new 
FunctionAdapter() {
-            @Override
-            public void execute(FunctionContext context) {
-              @SuppressWarnings("unchecked")
-              final ResultSender<Object> resultSender = 
context.getResultSender();
-              if (context.getArguments() instanceof String) {
-                resultSender.lastResult("Success");
-              } else if (context.getArguments() instanceof Boolean) {
-                resultSender.lastResult(Boolean.TRUE);
-              }
-            }
 
-            @Override
-            public String getId() {
-              return getClass().getName();
+    int j = 0;
+    for (String value : testKeysSet) {
+      Integer val = j++;
+      region.put(value, val);
+    }
+    ResultCollector<?, ?> rc1 =
+        dataSet.withFilter(testKeysSet).setArguments(Boolean.TRUE).execute(new 
FunctionAdapter() {
+          @Override
+          public void execute(FunctionContext context) {
+            @SuppressWarnings("unchecked")
+            final ResultSender<Object> resultSender = 
context.getResultSender();
+            if (context.getArguments() instanceof String) {
+              resultSender.lastResult("Success");
+            } else if (context.getArguments() instanceof Boolean) {
+              resultSender.lastResult(Boolean.TRUE);
             }
+          }
 
-            @Override
-            public boolean hasResult() {
-              return true;
-            }
-          });
-      List l = ((List) rc1.getResult());
-      logger.info("Result size : " + l.size());
-      assertEquals(3, l.size());
-      for (Object o : l) {
-        assertEquals(Boolean.TRUE, o);
-      }
-    } catch (Exception e) {
-      logger.info("Exception : " + e.getMessage());
-      e.printStackTrace();
-      fail("Test failed after the put operation");
+          @Override
+          public String getId() {
+            return getClass().getName();
+          }
 
+          @Override
+          public boolean hasResult() {
+            return true;
+          }
+        });
+    List<?> list = (List<?>) rc1.getResult();
+    logger.info("Result size : " + list.size());
+    assertThat(list).hasSize(3);
+    for (Object item : list) {
+      assertThat(item).isEqualTo(true);
     }
   }
 
   private static void 
serverMultiKeyExecution_FunctionInvocationTargetException() {
     Region<String, Integer> region = cache.getRegion(PartitionedRegionName);
-    assertNotNull(region);
+    assertThat(region).isNotNull();
     final HashSet<String> testKeysSet = new HashSet<>();
     for (int i = (totalNumBuckets * 2); i > 0; i--) {
       testKeysSet.add("execKey-" + i);
@@ -853,49 +875,43 @@ public class 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
       Integer val = j++;
       region.put(o, val);
     }
-    try {
-      ResultCollector rc1 =
-          
dataSet.withFilter(testKeysSet).setArguments(Boolean.TRUE).execute(new 
FunctionAdapter() {
-            @Override
-            public void execute(FunctionContext context) {
-              if (context.isPossibleDuplicate()) {
-                context.getResultSender().lastResult(retryCount);
-                return;
-              }
-              if (context.getArguments() instanceof Boolean) {
-                throw new FunctionInvocationTargetException("I have been 
thrown from TestFunction");
-              }
+    ResultCollector<?, ?> rc1 =
+        dataSet.withFilter(testKeysSet).setArguments(Boolean.TRUE).execute(new 
FunctionAdapter() {
+          @Override
+          public void execute(FunctionContext context) {
+            if (context.isPossibleDuplicate()) {
+              context.getResultSender().lastResult(retryCount);
+              return;
             }
-
-            @Override
-            public String getId() {
-              return getClass().getName();
+            if (context.getArguments() instanceof Boolean) {
+              throw new FunctionInvocationTargetException("I have been thrown 
from TestFunction");
             }
+          }
 
-            @Override
-            public boolean hasResult() {
-              return true;
-            }
-          });
+          @Override
+          public String getId() {
+            return getClass().getName();
+          }
 
-      List list = (ArrayList) rc1.getResult();
-      assertEquals(list.get(0), 0);
-    } catch (Throwable e) {
-      e.printStackTrace();
-      Assert.fail("This is not expected Exception", e);
-    }
+          @Override
+          public boolean hasResult() {
+            return true;
+          }
+        });
 
+    List<?> list = (List<?>) rc1.getResult();
+    assertThat(list.get(0)).isEqualTo(0);
   }
 
   private static void serverMultiKeyExecutionNoResult(Boolean isByName) {
     Region<String, Integer> region = cache.getRegion(PartitionedRegionName);
-    assertNotNull(region);
+    assertThat(region).isNotNull();
     final HashSet<String> testKeysSet = new HashSet<>();
     for (int i = (totalNumBuckets * 2); i > 0; i--) {
       testKeysSet.add("execKey-" + i);
     }
     DistributedSystem.setThreadsSocketPolicy(false);
-    Function function = new TestFunction(false, TEST_FUNCTION7);
+    Function<Object> function = new TestFunction<>(false, TEST_FUNCTION7);
     FunctionService.registerFunction(function);
     Execution dataSet = FunctionService.onRegion(region);
     try {
@@ -906,18 +922,11 @@ public class 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
         Integer val = j++;
         region.put(o, val);
       }
-      ResultCollector rc1 = execute(dataSet, testKeysSet, Boolean.TRUE, 
function, isByName);
-      rc1.getResult();
-      Thread.sleep(20000);
-      fail("Test failed after the put operation");
-    } catch (FunctionException expected) {
-      expected.printStackTrace();
-      logger.info("Exception : " + expected.getMessage());
-      assertTrue(expected.getMessage()
-          .startsWith((String.format("Cannot %s result as the 
Function#hasResult() is false",
-              "return any"))));
-    } catch (Exception notexpected) {
-      Assert.fail("Test failed during execute or sleeping", notexpected);
+      ResultCollector<?, ?> rc1 = execute(dataSet, testKeysSet, Boolean.TRUE, 
function, isByName);
+      
assertThatThrownBy(rc1::getResult).isExactlyInstanceOf(FunctionException.class)
+          .hasMessageStartingWith(
+              String.format("Cannot %s result as the Function#hasResult() is 
false",
+                  "return any"));
     } finally {
       cache.getLogger()
           .info("<ExpectedException action=remove>" + "FunctionException" + 
"</ExpectedException>");
@@ -926,179 +935,149 @@ public class 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
 
   private static void serverSingleKeyExecution(Boolean isByName) {
     Region<String, Integer> region = cache.getRegion(PartitionedRegionName);
-    assertNotNull(region);
+    assertThat(region).isNotNull();
     final String testKey = "execKey";
     final Set<String> testKeysSet = new HashSet<>();
     testKeysSet.add(testKey);
     DistributedSystem.setThreadsSocketPolicy(false);
 
-    Function function = new TestFunction(true, TEST_FUNCTION2);
+    Function<Object> function = new TestFunction<>(true, TEST_FUNCTION2);
     FunctionService.registerFunction(function);
     Execution dataSet = FunctionService.onRegion(region);
-    try {
-      execute(dataSet, testKeysSet, Boolean.TRUE, function, isByName);
-    } catch (Exception expected) {
-      assertTrue(expected.getMessage().contains("No target node found for KEY 
= " + testKey)
-          || expected.getMessage().startsWith("Server could not send the 
reply")
-          || expected.getMessage().startsWith("Unexpected exception during"));
-    }
+
+    execute(dataSet, testKeysSet, Boolean.TRUE, function, isByName);
 
     region.put(testKey, 1);
-    try {
-      ResultCollector rs = execute(dataSet, testKeysSet, Boolean.TRUE, 
function, isByName);
-      assertEquals(Boolean.TRUE, ((List) rs.getResult()).get(0));
 
-      ResultCollector rs2 = execute(dataSet, testKeysSet, testKey, function, 
isByName);
-      assertEquals(1, ((List) rs2.getResult()).get(0));
+    ResultCollector<?, ?> rs = execute(dataSet, testKeysSet, Boolean.TRUE, 
function, isByName);
+    assertThat(((List<?>) rs.getResult()).get(0)).isEqualTo(true);
 
-      HashMap<String, Integer> putData = new HashMap<>();
-      putData.put(testKey + "1", 2);
-      putData.put(testKey + "2", 3);
+    ResultCollector<?, ?> rs2 = execute(dataSet, testKeysSet, testKey, 
function, isByName);
+    assertThat(((List<?>) rs2.getResult()).get(0)).isEqualTo(1);
 
-      ResultCollector rs1 = execute(dataSet, testKeysSet, putData, function, 
isByName);
-      assertEquals(Boolean.TRUE, ((List) rs1.getResult()).get(0));
+    HashMap<String, Integer> putData = new HashMap<>();
+    putData.put(testKey + "1", 2);
+    putData.put(testKey + "2", 3);
 
-      assertEquals((Integer) 2, region.get(testKey + "1"));
-      assertEquals((Integer) 3, region.get(testKey + "2"));
+    ResultCollector<?, ?> rs1 = execute(dataSet, testKeysSet, putData, 
function, isByName);
+    assertThat(((List<?>) rs1.getResult()).get(0)).isEqualTo(true);
 
-    } catch (Exception ex) {
-      ex.printStackTrace();
-      logger.info("Exception : ", ex);
-      Assert.fail("Test failed after the put operation", ex);
-    }
+    assertThat(region.get(testKey + "1")).isEqualTo(2);
+    assertThat(region.get(testKey + "2")).isEqualTo(3);
   }
 
   private static void 
serverSingleKeyExecution_FunctionInvocationTargetException() {
     Region<String, Integer> region = cache.getRegion(PartitionedRegionName);
-    assertNotNull(region);
+    assertThat(region).isNotNull();
     final String testKey = "execKey";
     final Set<String> testKeysSet = new HashSet<>();
     testKeysSet.add(testKey);
     DistributedSystem.setThreadsSocketPolicy(false);
 
-    Function function = new TestFunction(true, 
TestFunction.TEST_FUNCTION_REEXECUTE_EXCEPTION);
+    Function<Object> function =
+        new TestFunction<>(true, 
TestFunction.TEST_FUNCTION_REEXECUTE_EXCEPTION);
     FunctionService.registerFunction(function);
     Execution dataSet = FunctionService.onRegion(region);
 
     region.put(testKey, 1);
-    try {
-      ResultCollector rs = execute(dataSet, testKeysSet, Boolean.TRUE, 
function, false);
-      ArrayList list = (ArrayList) rs.getResult();
-      assertTrue(((Integer) list.get(0)) >= 5);
-    } catch (Exception ex) {
-      ex.printStackTrace();
-      Assert.fail("This is not expected Exception", ex);
-    }
+
+    ResultCollector<?, ?> rs = execute(dataSet, testKeysSet, Boolean.TRUE, 
function, false);
+    ArrayList<?> list = (ArrayList<?>) rs.getResult();
+    assertThat(((Integer) list.get(0))).isGreaterThanOrEqualTo(5);
   }
 
   private static void serverSingleKeyExecution_Inline() {
     Region<String, Integer> region = cache.getRegion(PartitionedRegionName);
-    assertNotNull(region);
+    assertThat(region).isNotNull();
     final String testKey = "execKey";
     final Set<String> testKeysSet = new HashSet<>();
     testKeysSet.add(testKey);
     DistributedSystem.setThreadsSocketPolicy(false);
 
     Execution dataSet = FunctionService.onRegion(region);
-    try {
-      cache.getLogger()
-          .info("<ExpectedException action=add>" + "No target node found for 
KEY = "
-              + "|Server could not send the reply" + "|Unexpected exception 
during"
-              + "</ExpectedException>");
-      dataSet.withFilter(testKeysSet).setArguments(Boolean.TRUE).execute(new 
FunctionAdapter() {
-        @Override
-        public void execute(FunctionContext context) {
-          @SuppressWarnings("unchecked")
-          final ResultSender<Object> resultSender = context.getResultSender();
-          if (context.getArguments() instanceof String) {
-            resultSender.lastResult("Success");
-          }
-          resultSender.lastResult("Failure");
-        }
 
-        @Override
-        public String getId() {
-          return getClass().getName();
+    FunctionAdapter functionAdapter = new FunctionAdapter() {
+      @Override
+      public void execute(FunctionContext context) {
+        @SuppressWarnings("unchecked")
+        final ResultSender<Object> resultSender = context.getResultSender();
+        if (context.getArguments() instanceof String) {
+          resultSender.lastResult("Success");
         }
+        resultSender.lastResult("Failure");
+      }
 
-        @Override
-        public boolean hasResult() {
-          return true;
-        }
-      });
-    } catch (Exception expected) {
-      logger.debug("Exception occurred : " + expected.getMessage());
-      assertTrue(expected.getMessage().contains("No target node found for KEY 
= " + testKey)
-          || expected.getMessage().startsWith("Server could not send the 
reply")
-          || expected.getMessage().startsWith("Unexpected exception during"));
-    } finally {
-      cache.getLogger()
-          .info("<ExpectedException action=remove>" + "No target node found 
for KEY = "
-              + "|Server could not send the reply" + "|Unexpected exception 
during"
-              + "</ExpectedException>");
-    }
+      @Override
+      public String getId() {
+        return getClass().getName();
+      }
+
+      @Override
+      public boolean hasResult() {
+        return true;
+      }
+    };
+
+    
dataSet.withFilter(testKeysSet).setArguments(Boolean.TRUE).execute(functionAdapter);
 
     region.put(testKey, 1);
-    try {
-      ResultCollector rs =
-          
dataSet.withFilter(testKeysSet).setArguments(Boolean.TRUE).execute(new 
FunctionAdapter() {
-            @Override
-            public void execute(FunctionContext context) {
-              @SuppressWarnings("unchecked")
-              final ResultSender<Object> resultSender = 
context.getResultSender();
-              if (context.getArguments() instanceof String) {
-                resultSender.lastResult("Success");
-              } else {
-                resultSender.lastResult("Failure");
-              }
-            }
 
-            @Override
-            public String getId() {
-              return getClass().getName();
+    ResultCollector<?, ?> rs =
+        dataSet.withFilter(testKeysSet).setArguments(Boolean.TRUE).execute(new 
FunctionAdapter() {
+          @Override
+          public void execute(FunctionContext context) {
+            @SuppressWarnings("unchecked")
+            final ResultSender<Object> resultSender = 
context.getResultSender();
+            if (context.getArguments() instanceof String) {
+              resultSender.lastResult("Success");
+            } else {
+              resultSender.lastResult("Failure");
             }
+          }
 
-            @Override
-            public boolean hasResult() {
-              return true;
-            }
-          });
-      assertEquals("Failure", ((List) rs.getResult()).get(0));
-
-      ResultCollector rs2 =
-          dataSet.withFilter(testKeysSet).setArguments(testKey).execute(new 
FunctionAdapter() {
-            @Override
-            public void execute(FunctionContext context) {
-              @SuppressWarnings("unchecked")
-              final ResultSender<Object> resultSender = 
context.getResultSender();
-              if (context.getArguments() instanceof String) {
-                resultSender.lastResult("Success");
-              } else {
-                resultSender.lastResult("Failure");
-              }
-            }
+          @Override
+          public String getId() {
+            return getClass().getName();
+          }
 
-            @Override
-            public String getId() {
-              return getClass().getName();
+          @Override
+          public boolean hasResult() {
+            return true;
+          }
+        });
+    assertThat(((List<?>) rs.getResult()).get(0)).isEqualTo("Failure");
+
+    ResultCollector<?, ?> rs2 =
+        dataSet.withFilter(testKeysSet).setArguments(testKey).execute(new 
FunctionAdapter() {
+          @Override
+          public void execute(FunctionContext context) {
+            @SuppressWarnings("unchecked")
+            final ResultSender<Object> resultSender = 
context.getResultSender();
+            if (context.getArguments() instanceof String) {
+              resultSender.lastResult("Success");
+            } else {
+              resultSender.lastResult("Failure");
             }
+          }
 
-            @Override
-            public boolean hasResult() {
-              return true;
-            }
-          });
-      assertEquals("Success", ((List) rs2.getResult()).get(0));
+          @Override
+          public String getId() {
+            return getClass().getName();
+          }
+
+          @Override
+          public boolean hasResult() {
+            return true;
+          }
+        });
+
+    assertThat(((List<?>) rs2.getResult()).get(0)).isEqualTo("Success");
 
-    } catch (Exception ex) {
-      ex.printStackTrace();
-      logger.info("Exception : ", ex);
-      Assert.fail("Test failed after the put operation", ex);
-    }
   }
 
-  private static ResultCollector execute(Execution dataSet, Set testKeysSet, 
Serializable args,
-      Function function, Boolean isByName) {
+  private static ResultCollector<?, ?> execute(Execution dataSet, Set<?> 
testKeysSet,
+      Serializable args,
+      Function<?> function, Boolean isByName) {
     if (isByName) {// by name
       return 
dataSet.withFilter(testKeysSet).setArguments(args).execute(function.getId());
     } else { // By Instance
@@ -1106,8 +1085,8 @@ public class 
PRClientServerRegionFunctionExecutionNoSingleHopDUnitTest
     }
   }
 
-  private static ResultCollector executeOnAll(Execution dataSet, Serializable 
args,
-      Function function, Boolean isByName) {
+  private static ResultCollector<?, ?> executeOnAll(Execution dataSet, 
Serializable args,
+      Function<?> function, Boolean isByName) {
     if (isByName) {// by name
       return dataSet.setArguments(args).execute(function.getId());
     } else { // By Instance
diff --git 
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRClientServerTestBase.java
 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRClientServerTestBase.java
index 1a0c92d1aa..107b50a26e 100755
--- 
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRClientServerTestBase.java
+++ 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRClientServerTestBase.java
@@ -17,11 +17,9 @@ package org.apache.geode.internal.cache.execute;
 import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
 import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
 import static 
org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPort;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
 
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -46,6 +44,7 @@ import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionAttributes;
 import org.apache.geode.cache.Scope;
 import org.apache.geode.cache.client.Pool;
+import org.apache.geode.cache.client.PoolFactory;
 import org.apache.geode.cache.client.PoolManager;
 import org.apache.geode.cache.client.internal.PoolImpl;
 import org.apache.geode.cache.execute.Function;
@@ -57,8 +56,6 @@ import org.apache.geode.distributed.DistributedSystem;
 import org.apache.geode.internal.cache.functions.TestFunction;
 import org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil;
 import org.apache.geode.logging.internal.log4j.api.LogService;
-import org.apache.geode.test.dunit.Assert;
-import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.NetworkUtils;
 import org.apache.geode.test.dunit.SerializableCallableIF;
 import org.apache.geode.test.dunit.SerializableRunnable;
@@ -79,6 +76,8 @@ public class PRClientServerTestBase extends 
JUnit4CacheTestCase {
 
   protected static Cache cache = null;
 
+  protected String hostName;
+
   static String PartitionedRegionName = "TestPartitionedRegion"; // default 
name
 
   protected static String regionName = "TestRegion"; // default name
@@ -93,15 +92,15 @@ public class PRClientServerTestBase extends 
JUnit4CacheTestCase {
 
   @Override
   public final void postSetUp() throws Exception {
-    Host host = Host.getHost(0);
-    server1 = host.getVM(0);
-    server2 = host.getVM(1);
-    server3 = host.getVM(2);
-    client = host.getVM(3);
+    server1 = VM.getVM(0);
+    server2 = VM.getVM(1);
+    server3 = VM.getVM(2);
+    client = VM.getVM(3);
+    hostName = NetworkUtils.getServerHostName();
     postSetUpPRClientServerTestBase();
   }
 
-  protected void postSetUpPRClientServerTestBase() throws Exception {}
+  protected void postSetUpPRClientServerTestBase() {}
 
   private enum ExecuteFunctionMethod {
     ExecuteFunctionByObject, ExecuteFunctionById
@@ -119,7 +118,8 @@ public class PRClientServerTestBase extends 
JUnit4CacheTestCase {
     return ExecuteFunctionMethod.ExecuteFunctionByObject == 
functionExecutionType;
   }
 
-  ArrayList createCommonServerAttributes(String regionName, PartitionResolver 
pr, int red,
+  ArrayList<Object> createCommonServerAttributes(String regionName, 
PartitionResolver<?, ?> pr,
+      int red,
       String colocatedWithRegion) {
     ArrayList<Object> commonAttributes = new ArrayList<>();
     commonAttributes.add(regionName); // 0
@@ -130,123 +130,111 @@ public class PRClientServerTestBase extends 
JUnit4CacheTestCase {
     return commonAttributes;
   }
 
-  public static Integer createCacheServer(ArrayList commonAttributes, Integer 
localMaxMemory) {
-    AttributesFactory factory = new AttributesFactory();
-    PartitionAttributesFactory paf = new PartitionAttributesFactory();
+  public static Integer createCacheServer(ArrayList<Object> commonAttributes,
+      Integer localMaxMemory) {
+    return createCacheServer(commonAttributes, localMaxMemory, -1);
+  }
+
+  public static Integer createCacheServer(ArrayList<Object> commonAttributes,
+      Integer localMaxMemory,
+      int maxThreads) {
+    AttributesFactory<Object, Object> factory = new AttributesFactory<>();
+    PartitionAttributesFactory<Object, Object> paf = new 
PartitionAttributesFactory<>();
 
-    paf.setPartitionResolver((PartitionResolver) commonAttributes.get(1));
+    paf.setPartitionResolver((PartitionResolver<Object, Object>) 
commonAttributes.get(1));
     paf.setRedundantCopies((Integer) commonAttributes.get(2));
     paf.setTotalNumBuckets((Integer) commonAttributes.get(3));
     paf.setColocatedWith((String) commonAttributes.get(4));
     paf.setLocalMaxMemory(localMaxMemory);
-    PartitionAttributes partitionAttributes = paf.create();
+    PartitionAttributes<?, ?> partitionAttributes = paf.create();
     factory.setDataPolicy(DataPolicy.PARTITION);
     factory.setPartitionAttributes(partitionAttributes);
-    RegionAttributes attrs = factory.create();
+    RegionAttributes<Object, Object> attrs = factory.create();
 
-    Region region = cache.createRegion((String) commonAttributes.get(0), 
attrs);
-    assertNotNull(region);
+    Region<Object, Object> region = cache.createRegion((String) 
commonAttributes.get(0), attrs);
+    assertThat(region).isNotNull();
     CacheServer server1 = cache.addCacheServer();
-    assertNotNull(server1);
+    assertThat(server1).isNotNull();
     int port = getRandomAvailableTCPPort();
     server1.setPort(port);
-    try {
-      server1.start();
-    } catch (IOException e) {
-      Assert.fail("Failed to start the Server", e);
+    if (maxThreads > 0) {
+      server1.setMaxThreads(maxThreads);
     }
-    assertTrue(server1.isRunning());
+    assertThatNoException().isThrownBy(server1::start);
+    assertThat(server1.isRunning()).isTrue();
 
     return server1.getPort();
   }
 
-  private static Integer createSelectorCacheServer(ArrayList commonAttributes,
+  private static Integer createSelectorCacheServer(ArrayList<Object> 
commonAttributes,
       Integer localMaxMemory) throws Exception {
-    AttributesFactory factory = new AttributesFactory();
-    PartitionAttributesFactory paf = new PartitionAttributesFactory();
+    AttributesFactory<Object, Object> factory = new AttributesFactory<>();
+    PartitionAttributesFactory<Object, Object> paf = new 
PartitionAttributesFactory<>();
 
-    paf.setPartitionResolver((PartitionResolver) commonAttributes.get(1));
+    paf.setPartitionResolver((PartitionResolver<Object, Object>) 
commonAttributes.get(1));
     paf.setRedundantCopies((Integer) commonAttributes.get(2));
     paf.setTotalNumBuckets((Integer) commonAttributes.get(3));
     paf.setColocatedWith((String) commonAttributes.get(4));
     paf.setLocalMaxMemory(localMaxMemory);
-    PartitionAttributes partitionAttributes = paf.create();
+    PartitionAttributes<?, ?> partitionAttributes = paf.create();
     factory.setDataPolicy(DataPolicy.PARTITION);
     factory.setPartitionAttributes(partitionAttributes);
-    RegionAttributes attrs = factory.create();
+    RegionAttributes<Object, Object> attrs = factory.create();
 
-    Region region = cache.createRegion((String) commonAttributes.get(0), 
attrs);
-    assertNotNull(region);
+    Region<Object, Object> region = cache.createRegion((String) 
commonAttributes.get(0), attrs);
+    assertThat(region).isNotNull();
     CacheServer server1 = cache.addCacheServer();
-    assertNotNull(server1);
+    assertThat(server1).isNotNull();
     int port = getRandomAvailableTCPPort();
     server1.setPort(port);
     server1.setMaxThreads(16);
     server1.start();
-    assertTrue(server1.isRunning());
+    assertThat(server1.isRunning()).isTrue();
 
     return server1.getPort();
   }
 
-  private static Integer createCacheServerWith2Regions(ArrayList 
commonAttributes,
+  private static Integer createCacheServerWith2Regions(ArrayList<Object> 
commonAttributes,
       Integer localMaxMemory) throws Exception {
-    AttributesFactory factory = new AttributesFactory();
-    PartitionAttributesFactory paf = new PartitionAttributesFactory();
+    AttributesFactory<Object, Object> factory = new AttributesFactory<>();
+    PartitionAttributesFactory<Object, Object> paf = new 
PartitionAttributesFactory<>();
 
-    paf.setPartitionResolver((PartitionResolver) commonAttributes.get(1));
+    paf.setPartitionResolver((PartitionResolver<Object, Object>) 
commonAttributes.get(1));
     paf.setRedundantCopies((Integer) commonAttributes.get(2));
     paf.setTotalNumBuckets((Integer) commonAttributes.get(3));
     paf.setColocatedWith((String) commonAttributes.get(4));
     paf.setLocalMaxMemory(localMaxMemory);
-    PartitionAttributes partitionAttributes = paf.create();
+    PartitionAttributes<?, ?> partitionAttributes = paf.create();
     factory.setDataPolicy(DataPolicy.PARTITION);
     factory.setPartitionAttributes(partitionAttributes);
-    RegionAttributes attrs = factory.create();
+    RegionAttributes<Object, Object> attrs = factory.create();
 
-    Region region1 = cache.createRegion(PartitionedRegionName + "1", attrs);
-    assertNotNull(region1);
-    Region region2 = cache.createRegion(PartitionedRegionName + "2", attrs);
-    assertNotNull(region2);
+    Region<Object, Object> region1 = cache.createRegion(PartitionedRegionName 
+ "1", attrs);
+    assertThat(region1).isNotNull();
+    Region<Object, Object> region2 = cache.createRegion(PartitionedRegionName 
+ "2", attrs);
+    assertThat(region2).isNotNull();
     CacheServer server1 = cache.addCacheServer();
-    assertNotNull(server1);
+    assertThat(server1).isNotNull();
     int port = getRandomAvailableTCPPort();
     server1.setPort(port);
     server1.start();
-    assertTrue(server1.isRunning());
+    assertThat(server1.isRunning()).isTrue();
 
     return server1.getPort();
   }
 
   public static Integer createCacheServer() throws Exception {
     CacheServer server1 = cache.addCacheServer();
-    assertNotNull(server1);
+    assertThat(server1).isNotNull();
     int port = getRandomAvailableTCPPort();
     server1.setPort(port);
     server1.start();
-    assertTrue(server1.isRunning());
+    assertThat(server1.isRunning()).isTrue();
 
     return server1.getPort();
   }
 
-  private static Integer createCacheServerWithDR() throws Exception {
-    AttributesFactory factory = new AttributesFactory();
-    factory.setScope(Scope.DISTRIBUTED_ACK);
-    factory.setDataPolicy(DataPolicy.REPLICATE);
-    assertNotNull(cache);
-    Region region = cache.createRegion(regionName, factory.create());
-    assertNotNull(region);
-
-    CacheServer server1 = cache.addCacheServer();
-    assertNotNull(server1);
-    int port = getRandomAvailableTCPPort();
-    server1.setPort(port);
-    server1.start();
-    assertTrue(server1.isRunning());
-
-    return server1.getPort();
-  }
-
-  public static void createCacheClient(String host, Integer port1, Integer 
port2, Integer port3) {
+  public static void createCacheClient(String host, int port1, int port2, int 
port3) {
     CacheServerTestUtil.disableShufflingOfEndpoints();
     Pool p;
 
@@ -260,16 +248,16 @@ public class PRClientServerTestBase extends 
JUnit4CacheTestCase {
       CacheServerTestUtil.enableShufflingOfEndpoints();
     }
     pool = (PoolImpl) p;
-    AttributesFactory factory = new AttributesFactory();
+    AttributesFactory<Object, Object> factory = new AttributesFactory<>();
     factory.setScope(Scope.LOCAL);
     factory.setDataPolicy(DataPolicy.EMPTY);
     factory.setPoolName(p.getName());
-    RegionAttributes attrs = factory.create();
-    Region region = cache.createRegion(PartitionedRegionName, attrs);
-    assertNotNull(region);
+    RegionAttributes<Object, Object> attrs = factory.create();
+    Region<Object, Object> region = cache.createRegion(PartitionedRegionName, 
attrs);
+    assertThat(region).isNotNull();
   }
 
-  private static void createCacheClient_SingleConnection(String host, Integer 
port1) {
+  private static void createCacheClient_SingleConnection(String host, int 
port1) {
     CacheServerTestUtil.disableShufflingOfEndpoints();
     Pool p;
 
@@ -282,17 +270,17 @@ public class PRClientServerTestBase extends 
JUnit4CacheTestCase {
       CacheServerTestUtil.enableShufflingOfEndpoints();
     }
     pool = (PoolImpl) p;
-    AttributesFactory factory = new AttributesFactory();
+    AttributesFactory<Object, Object> factory = new AttributesFactory<>();
     factory.setScope(Scope.LOCAL);
     factory.setDataPolicy(DataPolicy.EMPTY);
     factory.setPoolName(p.getName());
-    RegionAttributes attrs = factory.create();
-    Region region = cache.createRegion(PartitionedRegionName, attrs);
-    assertNotNull(region);
+    RegionAttributes<Object, Object> attrs = factory.create();
+    Region<Object, Object> region = cache.createRegion(PartitionedRegionName, 
attrs);
+    assertThat(region).isNotNull();
   }
 
-  private static void createCacheClientWith2Regions(String host, Integer 
port1, Integer port2,
-      Integer port3) {
+  private static void createCacheClientWith2Regions(String host, int port1, 
int port2,
+      int port3) {
     CacheServerTestUtil.disableShufflingOfEndpoints();
     Pool p;
 
@@ -306,22 +294,22 @@ public class PRClientServerTestBase extends 
JUnit4CacheTestCase {
       CacheServerTestUtil.enableShufflingOfEndpoints();
     }
     pool = (PoolImpl) p;
-    AttributesFactory factory = new AttributesFactory();
+    AttributesFactory<Object, Object> factory = new AttributesFactory<>();
     factory.setDataPolicy(DataPolicy.EMPTY);
     factory.setPoolName(p.getName());
-    RegionAttributes attrs = factory.create();
-    Region region1 = cache.createRegion(PartitionedRegionName + "1", attrs);
-    assertNotNull(region1);
+    RegionAttributes<Object, Object> attrs = factory.create();
+    Region<Object, Object> region1 = cache.createRegion(PartitionedRegionName 
+ "1", attrs);
+    assertThat(region1).isNotNull();
 
-    factory = new AttributesFactory();
+    factory = new AttributesFactory<>();
     factory.setDataPolicy(DataPolicy.EMPTY);
     attrs = factory.create();
-    Region region2 = cache.createRegion(PartitionedRegionName + "2", attrs);
-    assertNotNull(region2);
+    Region<Object, Object> region2 = cache.createRegion(PartitionedRegionName 
+ "2", attrs);
+    assertThat(region2).isNotNull();
   }
 
-  private static void createSingleHopCacheClient(String host, Integer port1, 
Integer port2,
-      Integer port3) {
+  private static void createSingleHopCacheClient(String host, int port1, int 
port2,
+      int port3) {
     CacheServerTestUtil.disableShufflingOfEndpoints();
 
     Pool p;
@@ -335,17 +323,17 @@ public class PRClientServerTestBase extends 
JUnit4CacheTestCase {
       CacheServerTestUtil.enableShufflingOfEndpoints();
     }
     pool = (PoolImpl) p;
-    AttributesFactory factory = new AttributesFactory();
+    AttributesFactory<Object, Object> factory = new AttributesFactory<>();
     factory.setScope(Scope.LOCAL);
     factory.setDataPolicy(DataPolicy.EMPTY);
     factory.setPoolName(p.getName());
-    RegionAttributes attrs = factory.create();
-    Region region = cache.createRegion(PartitionedRegionName, attrs);
-    assertNotNull(region);
+    RegionAttributes<Object, Object> attrs = factory.create();
+    Region<Object, Object> region = cache.createRegion(PartitionedRegionName, 
attrs);
+    assertThat(region).isNotNull();
   }
 
-  private static void createNoSingleHopCacheClient(String host, Integer port1, 
Integer port2,
-      Integer port3) {
+  private static void createNoSingleHopCacheClient(String host, int port1, int 
port2,
+      int port3) {
     CacheServerTestUtil.disableShufflingOfEndpoints();
 
     Pool p;
@@ -359,36 +347,44 @@ public class PRClientServerTestBase extends 
JUnit4CacheTestCase {
       CacheServerTestUtil.enableShufflingOfEndpoints();
     }
     pool = (PoolImpl) p;
-    AttributesFactory factory = new AttributesFactory();
+    AttributesFactory<Object, Object> factory = new AttributesFactory<>();
     factory.setScope(Scope.LOCAL);
     factory.setDataPolicy(DataPolicy.EMPTY);
     factory.setPoolName(p.getName());
-    RegionAttributes attrs = factory.create();
-    Region region = cache.createRegion(PartitionedRegionName, attrs);
-    assertNotNull(region);
+    RegionAttributes<Object, Object> attrs = factory.create();
+    Region<Object, Object> region = cache.createRegion(PartitionedRegionName, 
attrs);
+    assertThat(region).isNotNull();
   }
 
-  private static void createCacheClientWithoutRegion(String host, Integer 
port1, Integer port2,
-      Integer port3) {
+  private static void createNoSingleHopCacheClient(String host,
+      int port1, int port2, int port3, int connectTimeout) {
     CacheServerTestUtil.disableShufflingOfEndpoints();
-    logger
-        .info("PRClientServerTestBase#createCacheClientWithoutRegion : 
creating pool");
 
     Pool p;
     try {
-      p = PoolManager.createFactory().addServer(host, port1)
-          .addServer(host, port2).addServer(host, port3).setPingInterval(250)
-          
.setSubscriptionEnabled(true).setSubscriptionRedundancy(-1).setReadTimeout(2000)
-          
.setSocketBufferSize(1000).setMinConnections(6).setMaxConnections(10).setRetryAttempts(1)
-          .create("PRClientServerTestBaseWithoutRegion");
+      PoolFactory factory = PoolManager.createFactory().addServer(host, port1)
+          .addServer(host, port2).addServer(host, port3).setPingInterval(2000)
+          .setSubscriptionEnabled(true).setReadTimeout(2000)
+          .setSocketBufferSize(1000).setRetryAttempts(0)
+          .setSocketConnectTimeout(connectTimeout)
+          .setPRSingleHopEnabled(false);
+
+      p = factory.create("PRClientServerTestBase");
     } finally {
       CacheServerTestUtil.enableShufflingOfEndpoints();
     }
     pool = (PoolImpl) p;
+    AttributesFactory<Object, Object> factory = new AttributesFactory<>();
+    factory.setScope(Scope.LOCAL);
+    factory.setDataPolicy(DataPolicy.EMPTY);
+    factory.setPoolName(p.getName());
+    RegionAttributes<Object, Object> attrs = factory.create();
+    Region<Object, Object> region = cache.createRegion(PartitionedRegionName, 
attrs);
+    assertThat(region).isNotNull();
   }
 
-  private static void createCacheClientWithDistributedRegion(String host, 
Integer port1,
-      Integer port2, Integer port3) throws Exception {
+  private static void createCacheClientWithoutRegion(String host, int port1, 
int port2,
+      int port3) {
     CacheServerTestUtil.disableShufflingOfEndpoints();
     logger
         .info("PRClientServerTestBase#createCacheClientWithoutRegion : 
creating pool");
@@ -398,70 +394,64 @@ public class PRClientServerTestBase extends 
JUnit4CacheTestCase {
       p = PoolManager.createFactory().addServer(host, port1)
           .addServer(host, port2).addServer(host, port3).setPingInterval(250)
           
.setSubscriptionEnabled(true).setSubscriptionRedundancy(-1).setReadTimeout(2000)
-          
.setSocketBufferSize(1000).setMinConnections(6).setMaxConnections(10).setRetryAttempts(0)
+          
.setSocketBufferSize(1000).setMinConnections(6).setMaxConnections(10).setRetryAttempts(1)
           .create("PRClientServerTestBaseWithoutRegion");
     } finally {
       CacheServerTestUtil.enableShufflingOfEndpoints();
     }
     pool = (PoolImpl) p;
-    AttributesFactory factory = new AttributesFactory();
-    factory.setScope(Scope.DISTRIBUTED_ACK);
-    factory.setDataPolicy(DataPolicy.REPLICATE);
-    assertNotNull(cache);
-    Region region = cache.createRegion(regionName, factory.create());
-    assertNotNull(region);
   }
 
-  void createClientServerScenarion(ArrayList commonAttributes, int 
localMaxMemoryServer1,
+  void createClientServerScenarion(ArrayList<Object> commonAttributes, int 
localMaxMemoryServer1,
       int localMaxMemoryServer2, int localMaxMemoryServer3) {
     createCacheInClientServer();
-    Integer port1 = server1.invoke(() -> PRClientServerTestBase
+    int port1 = server1.invoke(() -> PRClientServerTestBase
         .createCacheServer(commonAttributes, localMaxMemoryServer1));
-    Integer port2 = server2.invoke(() -> PRClientServerTestBase
+    int port2 = server2.invoke(() -> PRClientServerTestBase
         .createCacheServer(commonAttributes, localMaxMemoryServer2));
-    Integer port3 = server3.invoke(() -> PRClientServerTestBase
+    int port3 = server3.invoke(() -> PRClientServerTestBase
         .createCacheServer(commonAttributes, localMaxMemoryServer3));
     client.invoke(() -> PRClientServerTestBase
-        .createCacheClient(NetworkUtils.getServerHostName(server1.getHost()), 
port1, port2, port3));
+        .createCacheClient(hostName, port1, port2, port3));
   }
 
-  void createClientServerScenarion_SingleConnection(ArrayList commonAttributes,
+  void createClientServerScenarion_SingleConnection(ArrayList<Object> 
commonAttributes,
       int localMaxMemoryServer1,
       int localMaxMemoryServer2) {
     createCacheInClientServer();
-    Integer port1 = server1.invoke(() -> PRClientServerTestBase
+    int port1 = server1.invoke(() -> PRClientServerTestBase
         .createCacheServer(commonAttributes, localMaxMemoryServer1));
     server2.invoke(() -> 
PRClientServerTestBase.createCacheServer(commonAttributes,
         localMaxMemoryServer2));
     client.invoke(() -> 
PRClientServerTestBase.createCacheClient_SingleConnection(
-        NetworkUtils.getServerHostName(server1.getHost()), port1));
+        hostName, port1));
   }
 
 
 
-  void createClientServerScenarionWith2Regions(ArrayList commonAttributes,
+  void createClientServerScenarionWith2Regions(ArrayList<Object> 
commonAttributes,
       int localMaxMemoryServer1, int localMaxMemoryServer2,
       int localMaxMemoryServer3) {
     createCacheInClientServer();
-    Integer port1 = server1.invoke(() -> PRClientServerTestBase
+    int port1 = server1.invoke(() -> PRClientServerTestBase
         .createCacheServerWith2Regions(commonAttributes, 
localMaxMemoryServer1));
-    Integer port2 = server2.invoke(() -> PRClientServerTestBase
+    int port2 = server2.invoke(() -> PRClientServerTestBase
         .createCacheServerWith2Regions(commonAttributes, 
localMaxMemoryServer2));
-    Integer port3 = server3.invoke(() -> PRClientServerTestBase
+    int port3 = server3.invoke(() -> PRClientServerTestBase
         .createCacheServerWith2Regions(commonAttributes, 
localMaxMemoryServer3));
     client.invoke(() -> PRClientServerTestBase.createCacheClientWith2Regions(
-        NetworkUtils.getServerHostName(server1.getHost()), port1, port2, 
port3));
+        hostName, port1, port2, port3));
   }
 
-  void createClientServerScenarioSingleHop(ArrayList commonAttributes,
+  void createClientServerScenarioSingleHop(ArrayList<Object> commonAttributes,
       int localMaxMemoryServer1, int localMaxMemoryServer2,
       int localMaxMemoryServer3) {
     createCacheInClientServer();
-    Integer port1 = server1.invoke(() -> PRClientServerTestBase
+    int port1 = server1.invoke(() -> PRClientServerTestBase
         .createCacheServer(commonAttributes, localMaxMemoryServer1));
-    Integer port2 = server2.invoke(() -> PRClientServerTestBase
+    int port2 = server2.invoke(() -> PRClientServerTestBase
         .createCacheServer(commonAttributes, localMaxMemoryServer2));
-    Integer port3 = server3.invoke(() -> PRClientServerTestBase
+    int port3 = server3.invoke(() -> PRClientServerTestBase
         .createCacheServer(commonAttributes, localMaxMemoryServer3));
     // Workaround for the issue that hostnames returned by the client metadata 
may
     // not match those configured by the pool, leading to multiple copies
@@ -475,33 +465,49 @@ public class PRClientServerTestBase extends 
JUnit4CacheTestCase {
     return cache.getDistributedSystem().getDistributedMember().getHost();
   }
 
-  void createClientServerScenarioNoSingleHop(ArrayList commonAttributes,
+  void createClientServerScenarioNoSingleHop(ArrayList<Object> 
commonAttributes,
       int localMaxMemoryServer1, int localMaxMemoryServer2,
       int localMaxMemoryServer3) {
     createCacheInClientServer();
-    Integer port1 = server1.invoke(() -> PRClientServerTestBase
+    int port1 = server1.invoke(() -> PRClientServerTestBase
         .createCacheServer(commonAttributes, localMaxMemoryServer1));
-    Integer port2 = server2.invoke(() -> PRClientServerTestBase
+    int port2 = server2.invoke(() -> PRClientServerTestBase
         .createCacheServer(commonAttributes, localMaxMemoryServer2));
-    Integer port3 = server3.invoke(() -> PRClientServerTestBase
+    int port3 = server3.invoke(() -> PRClientServerTestBase
         .createCacheServer(commonAttributes, localMaxMemoryServer3));
     client.invoke(() -> PRClientServerTestBase.createNoSingleHopCacheClient(
-        NetworkUtils.getServerHostName(server1.getHost()), port1, port2, 
port3));
+        hostName, port1, port2, port3));
+  }
+
+  void createClientServerScenarioNoSingleHop(ArrayList<Object> 
commonAttributes,
+      int localMaxMemoryServer1, int localMaxMemoryServer2,
+      int localMaxMemoryServer3,
+      int maxThreads,
+      int connectTimeout) {
+    createCacheInClientServer();
+    int port1 = server1.invoke(() -> PRClientServerTestBase
+        .createCacheServer(commonAttributes, localMaxMemoryServer1, 
maxThreads));
+    int port2 = server2.invoke(() -> PRClientServerTestBase
+        .createCacheServer(commonAttributes, localMaxMemoryServer2, 
maxThreads));
+    int port3 = server3.invoke(() -> PRClientServerTestBase
+        .createCacheServer(commonAttributes, localMaxMemoryServer3, 
maxThreads));
+    client.invoke(() -> PRClientServerTestBase.createNoSingleHopCacheClient(
+        hostName, port1, port2, port3, connectTimeout));
   }
 
-  void createClientServerScenarioSelectorNoSingleHop(ArrayList 
commonAttributes,
+  void createClientServerScenarioSelectorNoSingleHop(ArrayList<Object> 
commonAttributes,
       int localMaxMemoryServer1,
       int localMaxMemoryServer2,
       int localMaxMemoryServer3) {
     createCacheInClientServer();
-    Integer port1 = server1.invoke(() -> PRClientServerTestBase
+    int port1 = server1.invoke(() -> PRClientServerTestBase
         .createSelectorCacheServer(commonAttributes, localMaxMemoryServer1));
-    Integer port2 = server2.invoke(() -> PRClientServerTestBase
+    int port2 = server2.invoke(() -> PRClientServerTestBase
         .createSelectorCacheServer(commonAttributes, localMaxMemoryServer2));
-    Integer port3 = server3.invoke(() -> PRClientServerTestBase
+    int port3 = server3.invoke(() -> PRClientServerTestBase
         .createSelectorCacheServer(commonAttributes, localMaxMemoryServer3));
     client.invoke(() -> PRClientServerTestBase.createNoSingleHopCacheClient(
-        NetworkUtils.getServerHostName(server1.getHost()), port1, port2, 
port3));
+        hostName, port1, port2, port3));
   }
 
 
@@ -509,15 +515,15 @@ public class PRClientServerTestBase extends 
JUnit4CacheTestCase {
     logger.info(
         "PRClientServerTestBase#createClientServerScenarionWithoutRegion : 
creating client server");
     createCacheInClientServer();
-    Integer port1 = server1.invoke(
+    int port1 = server1.invoke(
         (SerializableCallableIF<Integer>) 
PRClientServerTestBase::createCacheServer);
-    Integer port2 = server2.invoke(
+    int port2 = server2.invoke(
         (SerializableCallableIF<Integer>) 
PRClientServerTestBase::createCacheServer);
-    Integer port3 = server3.invoke(
+    int port3 = server3.invoke(
         (SerializableCallableIF<Integer>) 
PRClientServerTestBase::createCacheServer);
 
     client.invoke(() -> PRClientServerTestBase.createCacheClientWithoutRegion(
-        NetworkUtils.getServerHostName(server1.getHost()), port1, port2, 
port3));
+        hostName, port1, port2, port3));
   }
 
   void runOnAllServers(SerializableRunnable runnable) {
@@ -526,7 +532,7 @@ public class PRClientServerTestBase extends 
JUnit4CacheTestCase {
     server3.invoke(runnable);
   }
 
-  void registerFunctionAtServer(Function function) {
+  void registerFunctionAtServer(Function<?> function) {
     server1.invoke(PRClientServerTestBase.class, "registerFunction", new 
Object[] {function});
 
     server2.invoke(PRClientServerTestBase.class, "registerFunction", new 
Object[] {function});
@@ -534,7 +540,7 @@ public class PRClientServerTestBase extends 
JUnit4CacheTestCase {
     server3.invoke(PRClientServerTestBase.class, "registerFunction", new 
Object[] {function});
   }
 
-  public static void registerFunction(Function function) {
+  public static void registerFunction(Function<?> function) {
     FunctionService.registerFunction(function);
   }
 
@@ -559,24 +565,20 @@ public class PRClientServerTestBase extends 
JUnit4CacheTestCase {
   }
 
   private void createCache(Properties props) {
-    try {
-      DistributedSystem ds = getSystem(props);
-      assertNotNull(ds);
-      ds.disconnect();
-      ds = getSystem(props);
-      cache = CacheFactory.create(ds);
-      assertNotNull(cache);
-    } catch (Exception e) {
-      Assert.fail("Failed while creating the cache", e);
-    }
+    DistributedSystem ds = getSystem(props);
+    assertThat(ds).isNotNull();
+    ds.disconnect();
+    ds = getSystem(props);
+    cache = CacheFactory.create(ds);
+    assertThat(cache).isNotNull();
   }
 
   static void startServerHA() throws Exception {
     Wait.pause(2000);
-    Collection bridgeServers = cache.getCacheServers();
+    Collection<?> bridgeServers = cache.getCacheServers();
     logger
         .info("Start Server cache servers list : " + bridgeServers.size());
-    Iterator bridgeIterator = bridgeServers.iterator();
+    Iterator<?> bridgeIterator = bridgeServers.iterator();
     CacheServer bridgeServer = (CacheServer) bridgeIterator.next();
     logger.info("start Server cache server" + bridgeServer);
     bridgeServer.start();
@@ -584,7 +586,7 @@ public class PRClientServerTestBase extends 
JUnit4CacheTestCase {
 
   static void stopServerHA() {
     Wait.pause(1000);
-    Iterator iter = cache.getCacheServers().iterator();
+    Iterator<?> iter = cache.getCacheServers().iterator();
     if (iter.hasNext()) {
       CacheServer server = (CacheServer) iter.next();
       server.stop();
@@ -616,13 +618,13 @@ public class PRClientServerTestBase extends 
JUnit4CacheTestCase {
 
   void serverBucketFilterExecution(Set<Integer> bucketFilterSet) {
     Region<Integer, Integer> region = cache.getRegion(PartitionedRegionName);
-    assertNotNull(region);
+    assertThat(region).isNotNull();
     final HashSet<Integer> testKeysSet = new HashSet<>();
     for (int i = 150; i > 0; i--) {
       testKeysSet.add(i);
     }
     DistributedSystem.setThreadsSocketPolicy(false);
-    Function function = new TestFunction(true, 
TestFunction.TEST_FUNCTION_BUCKET_FILTER);
+    Function<?> function = new TestFunction<>(true, 
TestFunction.TEST_FUNCTION_BUCKET_FILTER);
     if (shouldRegisterFunctionsOnClient()) {
       FunctionService.registerFunction(function);
     }
@@ -636,24 +638,24 @@ public class PRClientServerTestBase extends 
JUnit4CacheTestCase {
     ResultCollector<Integer, List<Integer>> rc =
         dataSet.withBucketFilter(bucketFilterSet).execute(function.getId());
     List<Integer> results = rc.getResult();
-    assertEquals(bucketFilterSet.size(), results.size());
+    assertThat(results.size()).isEqualTo(bucketFilterSet.size());
     for (Integer bucket : results) {
       bucketFilterSet.remove(bucket);
     }
-    assertTrue(bucketFilterSet.isEmpty());
+    assertThat(bucketFilterSet).isEmpty();
   }
 
   void serverBucketFilterOverrideExecution(Set<Integer> bucketFilterSet,
       Set<Integer> ketFilterSet) {
 
     Region<Integer, Integer> region = cache.getRegion(PartitionedRegionName);
-    assertNotNull(region);
+    assertThat(region).isNotNull();
     final HashSet<Integer> testKeysSet = new HashSet<>();
     for (int i = 150; i > 0; i--) {
       testKeysSet.add(i);
     }
     DistributedSystem.setThreadsSocketPolicy(false);
-    Function function = new TestFunction(true, 
TestFunction.TEST_FUNCTION_BUCKET_FILTER);
+    Function<?> function = new TestFunction<>(true, 
TestFunction.TEST_FUNCTION_BUCKET_FILTER);
     if (shouldRegisterFunctionsOnClient()) {
       FunctionService.registerFunction(function);
     }
@@ -671,20 +673,21 @@ public class PRClientServerTestBase extends 
JUnit4CacheTestCase {
     ResultCollector<Integer, List<Integer>> rc = 
dataSet.withBucketFilter(bucketFilterSet)
         .withFilter(ketFilterSet).execute(function.getId());
     List<Integer> results = rc.getResult();
-    assertEquals(expectedBucketSet.size(), results.size());
+    assertThat(results.size()).isEqualTo(expectedBucketSet.size());
     for (Integer bucket : results) {
       expectedBucketSet.remove(bucket);
     }
-    assertTrue(expectedBucketSet.isEmpty());
+    assertThat(expectedBucketSet).isEmpty();
   }
 
-  public static class BucketFilterPRResolver implements PartitionResolver, 
Serializable {
+  public static class BucketFilterPRResolver
+      implements PartitionResolver<Object, Object>, Serializable {
 
     @Override
     public void close() {}
 
     @Override
-    public Object getRoutingObject(EntryOperation opDetails) {
+    public Object getRoutingObject(EntryOperation<Object, Object> opDetails) {
       Object key = opDetails.getKey();
       return getBucketID(key);
     }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionResultSender.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionResultSender.java
index 69a0450294..a63c8a1b52 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionResultSender.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionResultSender.java
@@ -16,6 +16,8 @@
 package org.apache.geode.internal.cache.execute;
 
 
+import java.util.function.BiFunction;
+
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.cache.execute.Function;
@@ -23,8 +25,10 @@ import org.apache.geode.cache.execute.FunctionException;
 import org.apache.geode.cache.execute.ResultCollector;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.internal.cache.ForceReattemptException;
 import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.execute.metrics.FunctionStats;
 import org.apache.geode.internal.cache.execute.metrics.FunctionStatsManager;
 import 
org.apache.geode.internal.cache.partitioned.PartitionedRegionFunctionStreamingMessage;
 import org.apache.geode.internal.serialization.KnownVersion;
@@ -43,7 +47,7 @@ public class PartitionedRegionFunctionResultSender implements 
InternalResultSend
 
   private static final Logger logger = LogService.getLogger();
 
-  PartitionedRegionFunctionStreamingMessage msg = null;
+  private final PartitionedRegionFunctionStreamingMessage msg;
 
   private final DistributionManager dm;
 
@@ -53,15 +57,15 @@ public class PartitionedRegionFunctionResultSender 
implements InternalResultSend
 
   private final boolean forwardExceptions;
 
-  private ResultCollector rc;
+  private final ResultCollector rc;
 
-  private ServerToClientFunctionResultSender serverSender;
+  private final ServerToClientFunctionResultSender serverSender;
 
   private boolean localLastResultReceived = false;
 
-  private boolean onlyLocal = false;
+  private final boolean onlyLocal;
 
-  private boolean onlyRemote = false;
+  private final boolean onlyRemote;
 
   private boolean completelyDoneFromRemote = false;
 
@@ -73,6 +77,7 @@ public class PartitionedRegionFunctionResultSender implements 
InternalResultSend
 
   private BucketMovedException bme;
 
+  private BiFunction<String, InternalDistributedSystem, FunctionStats> 
functionStatsFunctionProvider;
 
   public KnownVersion getClientVersion() {
     if (serverSender != null && serverSender.sc != null) { // is a 
client-server connection
@@ -81,41 +86,40 @@ public class PartitionedRegionFunctionResultSender 
implements InternalResultSend
     return null;
   }
 
-  /**
-   * Have to combine next two constructor in one and make a new class which 
will send Results back.
-   *
-   */
   public PartitionedRegionFunctionResultSender(DistributionManager dm, 
PartitionedRegion pr,
-      long time, PartitionedRegionFunctionStreamingMessage msg, Function 
function,
-      int[] bucketArray) {
-    this.msg = msg;
-    this.dm = dm;
-    this.pr = pr;
-    this.time = time;
-    this.function = function;
-    this.bucketArray = bucketArray;
-
-    forwardExceptions = false;
+      long time, PartitionedRegionFunctionStreamingMessage msg,
+      Function function, int[] bucketArray) {
+    this(dm, pr, time, null, null, false, false, false, function, bucketArray, 
msg,
+        (x, y) -> FunctionStatsManager.getFunctionStats((String) x, 
(InternalDistributedSystem) y));
   }
 
-  /**
-   * Have to combine next two constructor in one and make a new class which 
will send Results back.
-   *
-   */
   public PartitionedRegionFunctionResultSender(DistributionManager dm,
       PartitionedRegion partitionedRegion, long time, ResultCollector rc,
       ServerToClientFunctionResultSender sender, boolean onlyLocal, boolean 
onlyRemote,
       boolean forwardExceptions, Function function, int[] bucketArray) {
+    this(dm, partitionedRegion, time, rc, sender, onlyLocal, onlyRemote, 
forwardExceptions,
+        function, bucketArray, null,
+        (x, y) -> FunctionStatsManager.getFunctionStats((String) x, 
(InternalDistributedSystem) y));
+  }
+
+  PartitionedRegionFunctionResultSender(DistributionManager dm,
+      PartitionedRegion partitionedRegion, long time, ResultCollector rc,
+      ServerToClientFunctionResultSender sender, boolean onlyLocal, boolean 
onlyRemote,
+      boolean forwardExceptions, Function function, int[] bucketArray,
+      PartitionedRegionFunctionStreamingMessage msg,
+      BiFunction functionStatsFunctionProvider) {
     this.dm = dm;
     pr = partitionedRegion;
     this.time = time;
     this.rc = rc;
+    this.msg = msg;
     serverSender = sender;
     this.onlyLocal = onlyLocal;
     this.onlyRemote = onlyRemote;
     this.forwardExceptions = forwardExceptions;
     this.function = function;
     this.bucketArray = bucketArray;
+    this.functionStatsFunctionProvider = functionStatsFunctionProvider;
   }
 
   private void checkForBucketMovement(Object oneResult) {
@@ -201,7 +205,7 @@ public class PartitionedRegionFunctionResultSender 
implements InternalResultSend
           // call a synchronized method as local node is also waiting to send 
lastResult
           lastResult(oneResult, rc, false, true, 
dm.getDistributionManagerId());
         }
-        FunctionStatsManager.getFunctionStats(function.getId(), dm.getSystem())
+        functionStatsFunctionProvider.apply(function.getId(), dm.getSystem())
             .incResultsReceived();
       }
       // incrementing result sent stats.
@@ -210,7 +214,7 @@ public class PartitionedRegionFunctionResultSender 
implements InternalResultSend
       // time the stats for the result sent is again incremented : Once the PR 
team comes with the
       // concept of the Streaming FunctionOperation
       // for the partitioned Region then it will be simple to fix this problem.
-      FunctionStatsManager.getFunctionStats(function.getId(), dm.getSystem())
+      functionStatsFunctionProvider.apply(function.getId(), dm.getSystem())
           .incResultsReturned();
     }
   }
@@ -319,14 +323,14 @@ public class PartitionedRegionFunctionResultSender 
implements InternalResultSend
       if (dm == null) {
         
FunctionStatsManager.getFunctionStats(function.getId()).incResultsReceived();
       } else {
-        FunctionStatsManager.getFunctionStats(function.getId(), dm.getSystem())
+        functionStatsFunctionProvider.apply(function.getId(), dm.getSystem())
             .incResultsReceived();
       }
     }
     if (dm == null) {
       
FunctionStatsManager.getFunctionStats(function.getId()).incResultsReturned();
     } else {
-      FunctionStatsManager.getFunctionStats(function.getId(), dm.getSystem())
+      functionStatsFunctionProvider.apply(function.getId(), dm.getSystem())
           .incResultsReturned();
     }
   }
@@ -360,21 +364,31 @@ public class PartitionedRegionFunctionResultSender 
implements InternalResultSend
             "PartitionedRegionFunctionResultSender adding result to 
ResultCollector on local node {}",
             oneResult);
         rc.addResult(dm.getDistributionManagerId(), oneResult);
-        FunctionStatsManager.getFunctionStats(function.getId(), dm.getSystem())
+        functionStatsFunctionProvider.apply(function.getId(), dm.getSystem())
             .incResultsReceived();
       }
       // incrementing result sent stats.
-      FunctionStatsManager.getFunctionStats(function.getId(), dm.getSystem())
+      functionStatsFunctionProvider.apply(function.getId(), dm.getSystem())
           .incResultsReturned();
     }
   }
 
   private void clientSend(Object oneResult, DistributedMember memberID) {
-    serverSender.sendResult(oneResult, memberID);
+    try {
+      serverSender.sendResult(oneResult, memberID);
+    } catch (FunctionException e) {
+      logger.warn("Exception when sending result to client", e);
+      setException(e);
+    }
   }
 
   private void lastClientSend(DistributedMember memberID, Object lastResult) {
-    serverSender.lastResult(lastResult, memberID);
+    try {
+      serverSender.lastResult(lastResult, memberID);
+    } catch (FunctionException e) {
+      logger.warn("Exception when sending last result to client", e);
+      setException(e);
+    }
   }
 
   @Override
@@ -411,5 +425,4 @@ public class PartitionedRegionFunctionResultSender 
implements InternalResultSend
   public boolean isLastResultReceived() {
     return localLastResultReceived;
   }
-
 }
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionResultSenderTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionResultSenderTest.java
new file mode 100644
index 0000000000..9ac049d663
--- /dev/null
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionResultSenderTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.internal.cache.execute;
+
+import static 
org.apache.geode.internal.cache.execute.PartitionedRegionFunctionResultSenderTest.MethodToInvoke.LAST_RESULT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.execute.FunctionException;
+import org.apache.geode.cache.execute.ResultCollector;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionDataStore;
+import org.apache.geode.internal.cache.execute.metrics.FunctionStats;
+import org.apache.geode.test.junit.runners.GeodeParamsRunner;
+
+@RunWith(GeodeParamsRunner.class)
+public class PartitionedRegionFunctionResultSenderTest {
+  private DistributionManager dm = mock(DistributionManager.class);
+  private PartitionedRegion region = mock(PartitionedRegion.class);
+  private PartitionedRegionDataStore dataStore = 
mock(PartitionedRegionDataStore.class);
+  private ATestResultCollector rc = new ATestResultCollector();
+  private ServerToClientFunctionResultSender 
serverToClientFunctionResultSender =
+      mock(ServerToClientFunctionResultSender.class);
+  private FunctionStats functionStats = mock(FunctionStats.class);
+
+  enum MethodToInvoke {
+    SEND_EXCEPTION, LAST_RESULT
+  }
+
+  @BeforeEach
+  public void setUp() {
+    when(region.getDataStore()).thenReturn(dataStore);
+    when(dataStore.areAllBucketsHosted(any())).thenReturn(true);
+  }
+
+  @Test
+  public void 
whenResponseToClientInLastResultFailsEndResultsIsCalled_NotOnlyLocal_OnlyRemote()
 {
+    doThrow(new FunctionException()).when(serverToClientFunctionResultSender)
+        .lastResult(any(), any());
+    PartitionedRegionFunctionResultSender sender =
+        new PartitionedRegionFunctionResultSender(dm,
+            region, 1, rc, serverToClientFunctionResultSender, false, true, 
true,
+            new TestFunction(), new int[2], null, (x, y) -> functionStats);
+
+    sender.lastResult(new Object(), true, rc, null);
+
+    assertThat(rc.isEndResultsCalled()).isEqualTo(true);
+  }
+
+  @ParameterizedTest(name = "{displayName} with {arguments}")
+  @EnumSource(MethodToInvoke.class)
+  public void 
whenResponseToClientInLastResultFailsEndResultsIsCalled_OnlyLocal_NotOnlyRemote(
+      MethodToInvoke methodToInvoke) {
+    doThrow(new 
FunctionException("IOException")).when(serverToClientFunctionResultSender)
+        .lastResult(any(), any());
+
+    PartitionedRegionFunctionResultSender sender =
+        new PartitionedRegionFunctionResultSender(dm,
+            region, 1, rc, serverToClientFunctionResultSender, true, false, 
true,
+            new TestFunction(), new int[2]);
+
+    if (methodToInvoke == LAST_RESULT) {
+      sender.lastResult(new Object());
+    } else {
+      sender.sendException(new Exception());
+    }
+
+    assertThat(rc.isEndResultsCalled()).isEqualTo(true);
+  }
+
+  @ParameterizedTest(name = "{displayName} with {arguments}")
+  @EnumSource(MethodToInvoke.class)
+  public void 
whenResponseToClientInSendResultFailsEndResultsIsCalled_NotOnlyLocal_OnlyRemote(
+      MethodToInvoke methodToInvoke) {
+    doThrow(new 
FunctionException("IOException")).when(serverToClientFunctionResultSender)
+        .sendResult(any(), any());
+    PartitionedRegionFunctionResultSender sender =
+        new PartitionedRegionFunctionResultSender(dm,
+            region, 1, rc, serverToClientFunctionResultSender, false, true, 
true,
+            new TestFunction(), new int[2], null, (x, y) -> functionStats);
+
+    if (methodToInvoke == LAST_RESULT) {
+      sender.lastResult(new Object());
+    } else {
+      sender.sendException(new Exception());
+    }
+
+    assertThat(rc.isEndResultsCalled()).isEqualTo(true);
+  }
+
+  @ParameterizedTest(name = "{displayName} with {arguments}")
+  @EnumSource(MethodToInvoke.class)
+  public void 
whenResponseToClientInSendResultFailsEndResultsIsCalled_NotOnlyLocal_NotOnlyRemote(
+      MethodToInvoke methodToInvoke) {
+    doThrow(new 
FunctionException("IOException")).when(serverToClientFunctionResultSender)
+        .sendResult(any(), any());
+    PartitionedRegionFunctionResultSender sender =
+        new PartitionedRegionFunctionResultSender(dm,
+            region, 1, rc, serverToClientFunctionResultSender, false, false, 
true,
+            new TestFunction(), new int[2], null, (x, y) -> functionStats);
+
+    if (methodToInvoke == LAST_RESULT) {
+      sender.lastResult(new Object(), true, rc, null);
+    } else {
+      sender.sendException(new Exception());
+    }
+
+    assertThat(rc.isEndResultsCalled()).isEqualTo(true);
+  }
+
+  private static class TestFunction implements Function {
+    @Override
+    public void execute(FunctionContext context) {}
+  }
+
+  private static class ATestResultCollector implements ResultCollector {
+    private volatile boolean isEndResultsCalled = false;
+
+    @Override
+    public Object getResult() throws FunctionException {
+      return null;
+    }
+
+    @Override
+    public Object getResult(long timeout, TimeUnit unit)
+        throws FunctionException, InterruptedException {
+      return null;
+    }
+
+    @Override
+    public void addResult(DistributedMember memberID, Object 
resultOfSingleExecution) {}
+
+    @Override
+    public void endResults() {
+      isEndResultsCalled = true;
+    }
+
+    @Override
+    public void clearResults() {}
+
+    public boolean isEndResultsCalled() {
+      return isEndResultsCalled;
+    }
+  }
+}
diff --git 
a/geode-dunit/src/main/java/org/apache/geode/internal/cache/functions/TestFunction.java
 
b/geode-dunit/src/main/java/org/apache/geode/internal/cache/functions/TestFunction.java
index 951a9b7ca1..a46840fd0e 100755
--- 
a/geode-dunit/src/main/java/org/apache/geode/internal/cache/functions/TestFunction.java
+++ 
b/geode-dunit/src/main/java/org/apache/geode/internal/cache/functions/TestFunction.java
@@ -99,6 +99,7 @@ public class TestFunction<T> implements Function<T>, 
Declarable2, DataSerializab
   public static final String TEST_FUNCTION_SINGLE_HOP_FORCE_NETWORK_HOP =
       "executeFunctionSingleHopForceNetworkHop";
   public static final String TEST_FUNCTION_GET_NETWORK_HOP = 
"executeFunctionGetNetworkHop";
+  public static final String TEST_FUNCTION_SLOW = "SlowFunction";
   private static final String ID = "id";
   private static final String HAVE_RESULTS = "haveResults";
   private final Properties props;
@@ -197,6 +198,8 @@ public class TestFunction<T> implements Function<T>, 
Declarable2, DataSerializab
       executeSingleHopForceNetworkHop(context);
     } else if (id.equals(TEST_FUNCTION_GET_NETWORK_HOP)) {
       executeGetNetworkHop(context);
+    } else if (id.equals(TEST_FUNCTION_SLOW)) {
+      executeSlowFunction(context);
     } else if (noAckTest.equals("true")) {
       execute1(context);
     }
@@ -1041,6 +1044,22 @@ public class TestFunction<T> implements Function<T>, 
Declarable2, DataSerializab
     context.getResultSender().lastResult(networkHopType);
   }
 
+  private void executeSlowFunction(FunctionContext context) {
+    int entries = 4;
+    int waitBetweenEntriesMs = 5000;
+    for (int i = 0; i < entries; i++) {
+      try {
+        Thread.sleep(waitBetweenEntriesMs);
+      } catch (InterruptedException e) {
+        context.getResultSender().sendException(e);
+        Thread.currentThread().interrupt();
+        return;
+      }
+      context.getResultSender().sendResult(i);
+    }
+    context.getResultSender().lastResult(entries);
+  }
+
   /**
    * Get the function identifier, used by clients to invoke this function
    *
@@ -1096,12 +1115,12 @@ public class TestFunction<T> implements Function<T>, 
Declarable2, DataSerializab
 
   @Override
   public boolean isHA() {
-
     if (getId().equals(TEST_FUNCTION10)) {
       return true;
     }
     if (getId().equals(TEST_FUNCTION_NONHA_SERVER) || 
getId().equals(TEST_FUNCTION_NONHA_REGION)
-        || getId().equals(TEST_FUNCTION_NONHA_NOP) || 
getId().equals(TEST_FUNCTION_NONHA)) {
+        || getId().equals(TEST_FUNCTION_NONHA_NOP) || 
getId().equals(TEST_FUNCTION_NONHA)
+        || getId().equals(TEST_FUNCTION_SLOW)) {
       return false;
     }
     return Boolean.parseBoolean(props.getProperty(HAVE_RESULTS));
diff --git 
a/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/ServerStarterRule.java
 
b/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/ServerStarterRule.java
index bce7ed9ade..f23ee9ad23 100644
--- 
a/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/ServerStarterRule.java
+++ 
b/geode-dunit/src/main/java/org/apache/geode/test/junit/rules/ServerStarterRule.java
@@ -66,6 +66,7 @@ public class ServerStarterRule extends 
MemberStarterRule<ServerStarterRule> impl
   private PdxSerializer pdxSerializer = null;
   private boolean pdxReadSerialized = false;
   private boolean pdxReadSerializedUserSet = false;
+  private int maxThreads = -1;
   // By default we start one server per jvm
   private int serverCount = 1;
 
@@ -123,6 +124,11 @@ public class ServerStarterRule extends 
MemberStarterRule<ServerStarterRule> impl
     servers.clear();
   }
 
+  public ServerStarterRule withMaxThreads(int maxThreads) {
+    this.maxThreads = maxThreads;
+    return this;
+  }
+
   public ServerStarterRule withPDXPersistent() {
     pdxPersistent = true;
     pdxPersistentUserSet = true;
@@ -219,6 +225,9 @@ public class ServerStarterRule extends 
MemberStarterRule<ServerStarterRule> impl
       } else {
         server.setPort(0);
       }
+      if (maxThreads >= 0) {
+        server.setMaxThreads(maxThreads);
+      }
       try {
         server.start();
       } catch (IOException e) {
diff --git 
a/geode-dunit/src/main/resources/org/apache/geode/test/dunit/internal/sanctioned-geode-dunit-serializables.txt
 
b/geode-dunit/src/main/resources/org/apache/geode/test/dunit/internal/sanctioned-geode-dunit-serializables.txt
index 81f334b6bf..70a8520acc 100644
--- 
a/geode-dunit/src/main/resources/org/apache/geode/test/dunit/internal/sanctioned-geode-dunit-serializables.txt
+++ 
b/geode-dunit/src/main/resources/org/apache/geode/test/dunit/internal/sanctioned-geode-dunit-serializables.txt
@@ -180,4 +180,4 @@ 
org/apache/geode/test/junit/rules/LocatorLauncherStartupRule,false,autoStart:boo
 org/apache/geode/test/junit/rules/LocatorStarterRule,false
 
org/apache/geode/test/junit/rules/MemberStarterRule,false,autoStart:boolean,availableHttpPort:int,availableJmxPort:int,cleanWorkingDir:boolean,firstLevelChildrenFile:java/util/List,httpPort:int,jmxPort:int,logFile:boolean,memberPort:int,name:java/lang/String,properties:java/util/Properties,restore:org/apache/geode/test/junit/rules/accessible/AccessibleRestoreSystemProperties,systemProperties:java/util/Properties
 
org/apache/geode/test/junit/rules/ServerLauncherStartupRule,false,autoStart:boolean,builderOperator:java/util/function/UnaryOperator,launcher:org/apache/geode/distributed/ServerLauncher,properties:java/util/Properties,temp:org/junit/rules/TemporaryFolder
-org/apache/geode/test/junit/rules/ServerStarterRule,false,availableLocatorPort:int,embeddedLocatorPort:int,pdxPersistent:boolean,pdxPersistentUserSet:boolean,pdxReadSerialized:boolean,pdxReadSerializedUserSet:boolean,pdxSerializer:org/apache/geode/pdx/PdxSerializer,regions:java/util/Map,serverCount:int
+org/apache/geode/test/junit/rules/ServerStarterRule,false,availableLocatorPort:int,embeddedLocatorPort:int,maxThreads:int,pdxPersistent:boolean,pdxPersistentUserSet:boolean,pdxReadSerialized:boolean,pdxReadSerializedUserSet:boolean,pdxSerializer:org/apache/geode/pdx/PdxSerializer,regions:java/util/Map,serverCount:int

Reply via email to