This is an automated email from the ASF dual-hosted git repository.

mkevo pushed a commit to branch support/1.15
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.15 by this push:
     new 7838d5d521 GEODE-9484: Improve sending message to multiple destinations (#7664) (#7853)
7838d5d521 is described below

commit 7838d5d5210ab3afe243b2045242d7bfd6acc6ed
Author: Mario Kevo <48509719+mk...@users.noreply.github.com>
AuthorDate: Mon Sep 12 10:02:59 2022 +0200

    GEODE-9484: Improve sending message to multiple destinations (#7664) (#7853)
    
    Co-authored-by: Mario Ivanac <48509724+miva...@users.noreply.github.com>
---
 ....java => UpdatePropagationDistributedTest.java} | 107 ++++++++++++++++---
 ...ava => UpdatePropagationPRDistributedTest.java} |   2 +-
 .../geode/internal/tcp/CloseConnectionTest.java    |   2 +-
 .../geode/internal/tcp/TCPConduitDUnitTest.java    |   2 +-
 .../distributed/internal/direct/DirectChannel.java |  44 +++++---
 .../org/apache/geode/internal/tcp/Connection.java  |   6 +-
 .../apache/geode/internal/tcp/ConnectionTable.java |  30 ++++--
 .../org/apache/geode/internal/tcp/TCPConduit.java  | 118 ++++++++++++++++++---
 .../internal/tcp/ConnectionTransmissionTest.java   |   2 +-
 .../apache/geode/internal/tcp/TCPConduitTest.java  |  97 ++++++++++++++---
 10 files changed, 343 insertions(+), 67 deletions(-)

diff --git 
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationDUnitTest.java
 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationDistributedTest.java
similarity index 78%
rename from 
geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationDUnitTest.java
rename to 
geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationDistributedTest.java
index 0b99a144e5..055780782f 100644
--- 
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationDUnitTest.java
+++ 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationDistributedTest.java
@@ -20,6 +20,7 @@ import static 
org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
 import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
 import static 
org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPort;
 import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
@@ -50,6 +51,8 @@ import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.cache.util.CacheListenerAdapter;
 import org.apache.geode.cache30.CacheSerializableRunnable;
 import org.apache.geode.distributed.internal.ServerLocationAndMemberId;
+import 
org.apache.geode.distributed.internal.membership.api.MembershipManagerHelper;
+import org.apache.geode.test.dunit.AsyncInvocation;
 import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.IgnoredException;
 import org.apache.geode.test.dunit.NetworkUtils;
@@ -68,53 +71,89 @@ import org.apache.geode.util.internal.GeodeGlossary;
  * the same across servers
  */
 @Category({ClientSubscriptionTest.class})
-public class UpdatePropagationDUnitTest extends JUnit4CacheTestCase {
+public class UpdatePropagationDistributedTest extends JUnit4CacheTestCase {
 
   private static final String REGION_NAME = 
"UpdatePropagationDUnitTest_region";
 
   private VM server1 = null;
   private VM server2 = null;
+  private VM server3 = null;
   private VM client1 = null;
   private VM client2 = null;
 
   private int PORT1;
   private int PORT2;
+  private int PORT3;
+
+  private final int minNumEntries = 2;
+
+  private String hostnameServer1;
+  private String hostnameServer3;
 
   @Override
   public final void postSetUp() throws Exception {
     disconnectAllFromDS();
 
     final Host host = Host.getHost(0);
-    // Server1 VM
+
     server1 = host.getVM(0);
 
-    // Server2 VM
     server2 = host.getVM(1);
 
-    // Client 1 VM
-    client1 = host.getVM(2);
+    server3 = host.getVM(2);
 
-    // client 2 VM
-    client2 = host.getVM(3);
+    client1 = host.getVM(3);
 
-    PORT1 = server1.invoke(this::createServerCache);
-    PORT2 = server2.invoke(this::createServerCache);
+    client2 = host.getVM(4);
 
-    client1.invoke(
-        () -> 
createClientCache(NetworkUtils.getServerHostName(server1.getHost()), PORT1, 
PORT2));
-    client2.invoke(
-        () -> 
createClientCache(NetworkUtils.getServerHostName(server1.getHost()), PORT1, 
PORT2));
+    PORT1 = server1.invoke(() -> createServerCache());
+    PORT2 = server2.invoke(() -> createServerCache());
+    PORT3 = server3.invoke(() -> createServerCache());
+
+    hostnameServer1 = NetworkUtils.getServerHostName(server1.getHost());
+    hostnameServer3 = NetworkUtils.getServerHostName(server3.getHost());
 
     IgnoredException.addIgnoredException("java.net.SocketException");
     IgnoredException.addIgnoredException("Unexpected IOException");
   }
 
+
+
+  @Test
+  public void updatesArePropagatedToAllMembersWhenOneKilled() throws Exception 
{
+    client1.invoke(
+        () -> createClientCache(hostnameServer1, PORT1));
+    client2.invoke(
+        () -> createClientCache(hostnameServer3, PORT3));
+    int entries = 20;
+    AsyncInvocation invocation = client1.invokeAsync(() -> doPuts(entries));
+
+    // Wait for some entries to be put
+    server1.invoke(this::verifyMinEntriesInserted);
+
+    // Simulate crash
+    server2.invoke(() -> {
+      MembershipManagerHelper.crashDistributedSystem(getSystemStatic());
+    });
+
+    invocation.await();
+
+    int notNullEntriesIn1 = client1.invoke(() -> 
getNotNullEntriesNumber(entries));
+    int notNullEntriesIn3 = client2.invoke(() -> 
getNotNullEntriesNumber(entries));
+    assertThat(notNullEntriesIn3).isEqualTo(notNullEntriesIn1);
+  }
+
   /**
    * This tests whether the updates are received by other clients or not , if 
there are situation of
    * Interest List fail over
    */
   @Test
   public void updatesAreProgegatedAfterFailover() {
+    client1.invoke(
+        () -> createClientCache(hostnameServer1, PORT1, PORT2));
+    client2.invoke(
+        () -> createClientCache(hostnameServer1, PORT1, PORT2));
+
     // First create entries on both servers via the two client
     client1.invoke(this::createEntriesK1andK2);
     client2.invoke(this::createEntriesK1andK2);
@@ -248,6 +287,18 @@ public class UpdatePropagationDUnitTest extends 
JUnit4CacheTestCase {
         .addCacheListener(new 
EventTrackingCacheListener()).create(REGION_NAME);
   }
 
+  private void createClientCache(String host, Integer port1) {
+    Properties props = new Properties();
+    props.setProperty(LOCATORS, "");
+    ClientCacheFactory cf = new ClientCacheFactory();
+    cf.addPoolServer(host, port1).setPoolSubscriptionEnabled(false)
+        
.setPoolSubscriptionRedundancy(-1).setPoolMinConnections(4).setPoolSocketBufferSize(1000)
+        .setPoolReadTimeout(100).setPoolPingInterval(300);
+    ClientCache cache = getClientCache(cf);
+    cache.createClientRegionFactory(ClientRegionShortcut.PROXY)
+        .create(REGION_NAME);
+  }
+
   private Integer createServerCache() throws Exception {
     Cache cache = getCache();
     RegionAttributes attrs = createCacheServerAttributes();
@@ -305,6 +356,36 @@ public class UpdatePropagationDUnitTest extends 
JUnit4CacheTestCase {
     });
   }
 
+  private void verifyMinEntriesInserted() {
+    await().untilAsserted(() -> assertThat(getCache().getRegion(SEPARATOR + 
REGION_NAME))
+        .hasSizeGreaterThan(minNumEntries));
+  }
+
+  private void doPuts(int entries) throws Exception {
+    Region<String, String> r1 = getCache().getRegion(REGION_NAME);
+    assertThat(r1).isNotNull();
+    for (int i = 0; i < entries; i++) {
+      try {
+        r1.put("" + i, "" + i);
+      } catch (Exception e) {
+      }
+      Thread.sleep(1000);
+    }
+  }
+
+  private int getNotNullEntriesNumber(int entries) {
+    int notNullEntries = 0;
+    Region<String, String> r1 = getCache().getRegion(SEPARATOR + REGION_NAME);
+    assertThat(r1).isNotNull();
+    for (int i = 0; i < entries; i++) {
+      Object value = r1.get("" + i, "" + i);
+      if (value != null) {
+        notNullEntries++;
+      }
+    }
+    return notNullEntries;
+  }
+
   private static class EventTrackingCacheListener extends CacheListenerAdapter 
{
 
     List<EntryEvent> receivedEvents = new ArrayList<>();
diff --git 
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationPRDUnitTest.java
 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationPRDistributedTest.java
similarity index 93%
rename from 
geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationPRDUnitTest.java
rename to 
geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationPRDistributedTest.java
index 47721ceb2c..77d903ee0e 100644
--- 
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationPRDUnitTest.java
+++ 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationPRDistributedTest.java
@@ -21,7 +21,7 @@ import org.apache.geode.cache.RegionAttributes;
 /**
  * subclass of UpdatePropagationDUnitTest to exercise partitioned regions
  */
-public class UpdatePropagationPRDUnitTest extends UpdatePropagationDUnitTest {
+public class UpdatePropagationPRDistributedTest extends 
UpdatePropagationDistributedTest {
 
   @Override
   protected RegionAttributes createCacheServerAttributes() {
diff --git 
a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/CloseConnectionTest.java
 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/CloseConnectionTest.java
index cdb5432399..5aeba3fac2 100644
--- 
a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/CloseConnectionTest.java
+++ 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/CloseConnectionTest.java
@@ -110,7 +110,7 @@ public class CloseConnectionTest implements Serializable {
       InternalDistributedSystem distributedSystem = 
getCache().getInternalDistributedSystem();
       InternalDistributedMember otherMember = 
distributedSystem.getDistributionManager()
           .getOtherNormalDistributionManagerIds().iterator().next();
-      Connection connection = conTable.getConduit().getConnection(otherMember, 
true, false,
+      Connection connection = conTable.getConduit().getConnection(otherMember, 
true,
           System.currentTimeMillis(), 15000, 0);
       await().untilAsserted(() -> {
         // grab the shared, ordered "sender" connection to vm0. It should have 
a residual
diff --git 
a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitDUnitTest.java
 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitDUnitTest.java
index 41d64c67f6..794d6e093d 100644
--- 
a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitDUnitTest.java
+++ 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitDUnitTest.java
@@ -110,7 +110,7 @@ public class TCPConduitDUnitTest extends 
DistributedTestCase {
     assertThat(connectionTable.hasReceiversFor(otherMember)).isTrue();
 
     Connection sharedUnordered = connectionTable.get(otherMember, false,
-        System.currentTimeMillis(), 15000, 0);
+        System.currentTimeMillis(), 15000, 0, false);
     sharedUnordered.requestClose("for testing");
     // the sender connection has been closed so we should only have 2 senders 
now
     assertThat(ConnectionTable.getNumSenderSharedConnections()).isEqualTo(2);
diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
index a8a7bb8c20..eaac79f2b8 100644
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
@@ -281,11 +281,17 @@ public class DirectChannel {
           directReply = false;
         }
         if (ce != null) {
-          if (failedCe != null) {
-            failedCe.getMembers().addAll(ce.getMembers());
-            failedCe.getCauses().addAll(ce.getCauses());
+
+          if (!retry) {
+            retryInfo = ce;
           } else {
-            failedCe = ce;
+
+            if (failedCe != null) {
+              failedCe.getMembers().addAll(ce.getMembers());
+              failedCe.getCauses().addAll(ce.getCauses());
+            } else {
+              failedCe = ce;
+            }
           }
           ce = null;
         }
@@ -293,6 +299,9 @@ public class DirectChannel {
           if (failedCe != null) {
             throw failedCe;
           }
+          if (retryInfo != null) {
+            continue;
+          }
           return bytesWritten;
         }
 
@@ -338,7 +347,12 @@ public class DirectChannel {
         }
 
         if (ce != null) {
-          retryInfo = ce;
+          if (retryInfo != null) {
+            retryInfo.getMembers().addAll(ce.getMembers());
+            retryInfo.getCauses().addAll(ce.getCauses());
+          } else {
+            retryInfo = ce;
+          }
           ce = null;
         }
 
@@ -423,13 +437,13 @@ public class DirectChannel {
    * @param retry whether this is a retransmission
    * @param ackTimeout the ack warning timeout
    * @param ackSDTimeout the ack severe alert timeout
-   * @param cons a list to hold the connections
+   * @param connectionsList a list to hold the connections
    * @return null if everything went okay, or a ConnectExceptions object if 
some connections
    *         couldn't be obtained
    */
   private ConnectExceptions getConnections(Membership mgr, DistributionMessage 
msg,
       InternalDistributedMember[] destinations, boolean preserveOrder, boolean 
retry,
-      long ackTimeout, long ackSDTimeout, List cons) {
+      long ackTimeout, long ackSDTimeout, List<Connection> connectionsList) {
     ConnectExceptions ce = null;
     for (InternalDistributedMember destination : destinations) {
       if (destination == null) {
@@ -458,12 +472,18 @@ public class DirectChannel {
           if (ackTimeout > 0) {
             startTime = System.currentTimeMillis();
           }
-          Connection con = conduit.getConnection(destination, preserveOrder, 
retry, startTime,
-              ackTimeout, ackSDTimeout);
+          final Connection connection;
+          if (!retry) {
+            connection = conduit.getFirstScanForConnection(destination, 
preserveOrder, startTime,
+                ackTimeout, ackSDTimeout);
+          } else {
+            connection = conduit.getConnection(destination, preserveOrder, 
startTime,
+                ackTimeout, ackSDTimeout);
+          }
 
-          con.setInUse(true, startTime, 0, 0, null); // fix for bug#37657
-          cons.add(con);
-          if (con.isSharedResource() && msg instanceof DirectReplyMessage) {
+          connection.setInUse(true, startTime, 0, 0, null); // fix for 
bug#37657
+          connectionsList.add(connection);
+          if (connection.isSharedResource() && msg instanceof 
DirectReplyMessage) {
             DirectReplyMessage directMessage = (DirectReplyMessage) msg;
             directMessage.registerProcessor();
           }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 44205d4d63..9e921d7d03 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -961,7 +961,7 @@ public class Connection implements Runnable {
       final ConnectionTable t,
       final boolean preserveOrder, final InternalDistributedMember remoteAddr,
       final boolean sharedResource,
-      final long startTime, final long ackTimeout, final long ackSATimeout)
+      final long startTime, final long ackTimeout, final long ackSATimeout, 
boolean doNotRetry)
       throws IOException, DistributedSystemDisconnectedException {
     boolean success = false;
     Connection conn = null;
@@ -1021,7 +1021,9 @@ public class Connection implements Runnable {
             // do not change the text of this exception - it is looked for in 
exception handlers
             throw new IOException("Cannot form connection to alert listener " 
+ remoteAddr);
           }
-
+          if (doNotRetry) {
+            throw new IOException("Connection not created in first try to " + 
remoteAddr);
+          }
           // Wait briefly...
           interrupted = Thread.interrupted() || interrupted;
           try {
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
index f54f7bd9cd..f1d157d27f 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
@@ -269,6 +269,7 @@ public class ConnectionTable {
    * @param startTime the ms clock start time for the operation
    * @param ackThreshold the ms ack-wait-threshold, or zero
    * @param ackSAThreshold the ms ack-severe_alert-threshold, or zero
+   * @param doNotRetry if true, make a single attempt to create the connection instead of retrying
    * @return the Connection, or null if someone else already created or closed 
it
    * @throws IOException if unable to connect
    */
@@ -276,13 +277,14 @@ public class ConnectionTable {
       boolean sharedResource,
       boolean preserveOrder, Map<DistributedMember, Object> m, 
PendingConnection pc, long startTime,
       long ackThreshold,
-      long ackSAThreshold) throws IOException, 
DistributedSystemDisconnectedException {
+      long ackSAThreshold, boolean doNotRetry)
+      throws IOException, DistributedSystemDisconnectedException {
     // handle new pending connection
     Connection con = null;
     try {
       long senderCreateStartTime = owner.getStats().startSenderCreate();
       con = Connection.createSender(owner.getMembership(), this, 
preserveOrder, id,
-          sharedResource, startTime, ackThreshold, ackSAThreshold);
+          sharedResource, startTime, ackThreshold, ackSAThreshold, doNotRetry);
       owner.getStats().incSenders(sharedResource, preserveOrder, 
senderCreateStartTime);
     } finally {
       // our connection failed to notify anyone waiting for our pending con
@@ -350,11 +352,14 @@ public class ConnectionTable {
    * @param startTime the ms clock start time for the operation
    * @param ackTimeout the ms ack-wait-threshold, or zero
    * @param ackSATimeout the ms ack-severe-alert-threshold, or zero
+   * @param doNotRetryWaitForConnection if true, make a single attempt to create the connection
+   *        and return null instead of waiting for a connection another thread is creating
    * @return the new Connection, or null if an error
    * @throws IOException if unable to create the connection
    */
   private Connection getSharedConnection(InternalDistributedMember id, boolean 
scheduleTimeout,
-      boolean preserveOrder, long startTime, long ackTimeout, long 
ackSATimeout)
+      boolean preserveOrder, long startTime, long ackTimeout, long 
ackSATimeout,
+      boolean doNotRetryWaitForConnection)
       throws IOException, DistributedSystemDisconnectedException {
 
     final Map<DistributedMember, Object> m =
@@ -387,7 +392,7 @@ public class ConnectionTable {
         logger.debug("created PendingConnection {}", pc);
       }
       result = handleNewPendingConnection(id, true, preserveOrder, m, pc,
-          startTime, ackTimeout, ackSATimeout);
+          startTime, ackTimeout, ackSATimeout, doNotRetryWaitForConnection);
       if (!preserveOrder && scheduleTimeout) {
         scheduleIdleTimeout(result);
       }
@@ -400,6 +405,10 @@ public class ConnectionTable {
           throw new IOException("Cannot form connection to alert listener " + 
id);
         }
 
+        if (doNotRetryWaitForConnection) {
+          return null;
+        }
+
         result = ((PendingConnection) 
mEntry).waitForConnect(owner.getMembership(),
             startTime, ackTimeout, ackSATimeout);
         if (logger.isDebugEnabled()) {
@@ -425,11 +434,13 @@ public class ConnectionTable {
    * @param startTime the ms clock start time for the operation
    * @param ackTimeout the ms ack-wait-threshold, or zero
    * @param ackSATimeout the ms ack-severe-alert-threshold, or zero
+   * @param doNotRetry if true, make a single attempt to create the connection instead of retrying
    * @return the connection, or null if an error
    * @throws IOException if the connection could not be created
    */
   Connection getThreadOwnedConnection(InternalDistributedMember id, long 
startTime, long ackTimeout,
-      long ackSATimeout) throws IOException, 
DistributedSystemDisconnectedException {
+      long ackSATimeout, boolean doNotRetry)
+      throws IOException, DistributedSystemDisconnectedException {
     Connection result;
 
     // Look for result in the thread local
@@ -449,7 +460,7 @@ public class ConnectionTable {
     // OK, we have to create a new connection.
     long senderCreateStartTime = owner.getStats().startSenderCreate();
     result = Connection.createSender(owner.getMembership(), this, true, id, 
false, startTime,
-        ackTimeout, ackSATimeout);
+        ackTimeout, ackSATimeout, doNotRetry);
     owner.getStats().incSenders(false, true, senderCreateStartTime);
     if (logger.isDebugEnabled()) {
       logger.debug("ConnectionTable: created an ordered connection: {}", 
result);
@@ -521,11 +532,12 @@ public class ConnectionTable {
    * @param startTime the ms clock start time
    * @param ackTimeout the ms ack-wait-threshold, or zero
    * @param ackSATimeout the ms ack-severe-alert-threshold, or zero
+   * @param doNotRetry if true, make a single attempt to create the connection instead of retrying
    * @return the new Connection, or null if a problem
    * @throws IOException if the connection could not be created
    */
   protected Connection get(InternalDistributedMember id, boolean 
preserveOrder, long startTime,
-      long ackTimeout, long ackSATimeout)
+      long ackTimeout, long ackSATimeout, boolean doNotRetry)
       throws IOException, DistributedSystemDisconnectedException {
     if (closed) {
       owner.getCancelCriterion().checkCancelInProgress(null);
@@ -535,9 +547,9 @@ public class ConnectionTable {
     boolean threadOwnsResources = threadOwnsResources();
     if (!preserveOrder || !threadOwnsResources) {
       result = getSharedConnection(id, threadOwnsResources, preserveOrder, 
startTime, ackTimeout,
-          ackSATimeout);
+          ackSATimeout, doNotRetry);
     } else {
-      result = getThreadOwnedConnection(id, startTime, ackTimeout, 
ackSATimeout);
+      result = getThreadOwnedConnection(id, startTime, ackTimeout, 
ackSATimeout, doNotRetry);
     }
     if (result != null) {
       Assert.assertTrue(result.getPreserveOrder() == preserveOrder);
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
index 4d6d9c8216..843b49c25f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
@@ -719,7 +719,6 @@ public class TCPConduit implements Runnable {
    *
    * @param memberAddress the IDS associated with the remoteId
    * @param preserveOrder whether this is an ordered or unordered connection
-   * @param retry false if this is the first attempt
    * @param startTime the time this operation started
    * @param ackTimeout the ack-wait-threshold * 1000 for the operation to be 
transmitted (or zero)
    * @param ackSATimeout the ack-severe-alert-threshold * 1000 for the 
operation to be transmitted
@@ -728,7 +727,7 @@ public class TCPConduit implements Runnable {
    * @return the connection
    */
   public Connection getConnection(InternalDistributedMember memberAddress,
-      final boolean preserveOrder, boolean retry, long startTime, long 
ackTimeout,
+      final boolean preserveOrder, long startTime, long ackTimeout,
       long ackSATimeout) throws IOException, 
DistributedSystemDisconnectedException {
     if (stopped) {
       throw new DistributedSystemDisconnectedException("The conduit is 
stopped");
@@ -742,7 +741,7 @@ public class TCPConduit implements Runnable {
       try {
         // If this is the second time through this loop, we had problems.
         // Tear down the connection so that it gets rebuilt.
-        if (retry || conn != null) { // not first time in loop
+        if (conn != null) { // not first time in loop
           if (!membership.memberExists(memberAddress)
               || membership.isShunned(memberAddress)
               || membership.shutdownInProgress()) {
@@ -777,18 +776,15 @@ public class TCPConduit implements Runnable {
 
           // Close the connection (it will get rebuilt later).
           getStats().incReconnectAttempts();
-          if (conn != null) {
-            try {
-              if (logger.isDebugEnabled()) {
-                logger.debug("Closing old connection.  conn={} before 
retrying. memberInTrouble={}",
-                    conn, memberInTrouble);
-              }
-              conn.closeForReconnect("closing before retrying");
-            } catch (CancelException ex) {
-              throw ex;
-            } catch (Exception ex) {
-              // ignored
+          try {
+            if (logger.isDebugEnabled()) {
+              logger.debug("Closing old connection.  conn={} before retrying. 
memberInTrouble={}",
+                  conn, memberInTrouble);
             }
+            conn.closeForReconnect("closing before retrying");
+          } catch (CancelException ex) {
+            throw ex;
+          } catch (Exception ignored) {
           }
         } // not first time in loop
 
@@ -801,7 +797,7 @@ public class TCPConduit implements Runnable {
           do {
             retryForOldConnection = false;
             conn = getConTable().get(memberAddress, preserveOrder, startTime, 
ackTimeout,
-                ackSATimeout);
+                ackSATimeout, false);
             if (conn == null) {
               // conduit may be closed - otherwise an ioexception would be 
thrown
               problem = new IOException(
@@ -909,6 +905,98 @@ public class TCPConduit implements Runnable {
     }
   }
 
+  /**
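+   * Return a connection to the given member, making only a single attempt to create it. Unlike
+   * {@link #getConnection}, this does not retry or wait for a connection that another thread is
+   * creating; failures are reported to the caller as an IOException without retrying.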
+   *
+   * @param memberAddress the IDS associated with the remoteId
+   * @param preserveOrder whether this is an ordered or unordered connection
+   * @param startTime the time this operation started
+   * @param ackTimeout the ack-wait-threshold * 1000 for the operation to be 
transmitted (or zero)
+   * @param ackSATimeout the ack-severe-alert-threshold * 1000 for the 
operation to be transmitted
+   *        (or zero)
+   *
+   * @return the connection
+   */
+  public Connection getFirstScanForConnection(InternalDistributedMember 
memberAddress,
+      final boolean preserveOrder, long startTime, long ackTimeout,
+      long ackSATimeout) throws IOException, 
DistributedSystemDisconnectedException {
+    if (stopped) {
+      throw new DistributedSystemDisconnectedException("The conduit is 
stopped");
+    }
+
+    Connection connection = null;
+    stopper.checkCancelInProgress(null);
+    boolean interrupted = Thread.interrupted();
+    try {
+
+      Exception problem = null;
+      try {
+        connection = getConnectionThatIsNotClosed(memberAddress, 
preserveOrder, startTime,
+            ackTimeout, ackSATimeout);
+
+        // we have a connection; fall through and return it
+      } catch (ConnectionException e) {
+        // Race condition between acquiring the connection and attempting
+        // to use it: another thread closed it.
+        problem = e;
+        // Do not retry here: this is a single-attempt scan. The caller (DirectChannel)
+        // retries failed destinations through getConnection, which performs the full
+        // reconnect logic.
+      } catch (IOException e) {
+        problem = e;
+        // don't keep trying to connect to an alert listener
+        if (AlertingAction.isThreadAlerting()) {
+          if (logger.isDebugEnabled()) {
+            logger.debug("Giving up connecting to alert listener {}", 
memberAddress);
+          }
+        }
+      }
+
+      if (problem != null) {
+        if (problem instanceof IOException) {
+          if (problem.getMessage() != null
+              && problem.getMessage().startsWith("Cannot form connection to 
alert listener")) {
+            throw new AlertingIOException((IOException) problem);
+          }
+          throw (IOException) problem;
+        }
+        throw new IOException(
+            String.format("Problem connecting to %s", memberAddress), problem);
+      }
+      // Success!
+
+      return connection;
+    } finally {
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  private Connection getConnectionThatIsNotClosed(InternalDistributedMember 
memberAddress,
+      final boolean preserveOrder, long startTime, long ackTimeout, long 
ackSATimeout)
+      throws IOException, ConnectionException {
+    boolean debugEnabled = logger.isDebugEnabled();
+    Connection connection;
+    while (true) {
+      connection = getConTable().get(memberAddress, preserveOrder, startTime, 
ackTimeout,
+          ackSATimeout, true);
+      if (connection == null) {
+        throw new IOException("Unable to reconnect to server; possible 
shutdown: " + memberAddress);
+      }
+
+      if (!connection.isClosing() && 
connection.getRemoteAddress().equals(memberAddress)) {
+        return connection;
+      }
+      if (debugEnabled) {
+        logger.debug("Got an old connection for {}: {}@{}", memberAddress, 
connection,
+            connection.hashCode());
+      }
+      connection.closeOldConnection("closing old connection");
+    }
+  }
+
   @Override
   public String toString() {
     return String.valueOf(id);
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTransmissionTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTransmissionTest.java
index 5a041eb167..906a021dec 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTransmissionTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTransmissionTest.java
@@ -168,7 +168,7 @@ public class ConnectionTransmissionTest {
     senderAddr.setDirectChannelPort(conduit.getPort());
 
     return spy(Connection.createSender(membership, writerTable, true, 
remoteAddr, true,
-        System.currentTimeMillis(), 1000, 1000));
+        System.currentTimeMillis(), 1000, 1000, false));
   }
 
   private Connection createReceiverConnectionOnFirstAccept(final 
ServerSocketChannel acceptorSocket,
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/tcp/TCPConduitTest.java 
b/geode-core/src/test/java/org/apache/geode/internal/tcp/TCPConduitTest.java
index 392a5993c0..e1b3ddf49e 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/tcp/TCPConduitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/TCPConduitTest.java
@@ -94,7 +94,8 @@ public class TCPConduitTest {
             TCPConduit -> connectionTable, socketCreator, doNothing(), false);
     InternalDistributedMember member = mock(InternalDistributedMember.class);
     doThrow(new IOException("Cannot form connection to alert listener"))
-        .when(connectionTable).get(eq(member), anyBoolean(), anyLong(), 
anyLong(), anyLong());
+        .when(connectionTable)
+        .get(eq(member), anyBoolean(), anyLong(), anyLong(), anyLong(), 
anyBoolean());
     when(membership.memberExists(eq(member)))
         .thenReturn(true);
     when(membership.isShunned(same(member)))
@@ -102,7 +103,7 @@ public class TCPConduitTest {
 
     AlertingAction.execute(() -> {
       Throwable thrown = catchThrowable(() -> {
-        tcpConduit.getConnection(member, false, false, 0L, 0L, 0L);
+        tcpConduit.getConnection(member, false, 0L, 0L, 0L);
       });
 
       assertThat(thrown)
@@ -123,13 +124,14 @@ public class TCPConduitTest {
     doThrow(new IOException("Cannot form connection to alert listener"))
         // getConnection will loop indefinitely until connectionTable returns 
connection
         .doReturn(connection)
-        .when(connectionTable).get(eq(member), anyBoolean(), anyLong(), 
anyLong(), anyLong());
+        .when(connectionTable)
+        .get(eq(member), anyBoolean(), anyLong(), anyLong(), anyLong(), 
anyBoolean());
     when(membership.memberExists(eq(member)))
         .thenReturn(true);
     when(membership.isShunned(same(member)))
         .thenReturn(false);
 
-    Connection value = tcpConduit.getConnection(member, false, false, 0L, 0L, 
0L);
+    Connection value = tcpConduit.getConnection(member, false, 0L, 0L, 0L);
 
     assertThat(value)
         .isSameAs(connection);
@@ -143,12 +145,13 @@ public class TCPConduitTest {
             TCPConduit -> connectionTable, socketCreator, doNothing(), false);
     InternalDistributedMember member = mock(InternalDistributedMember.class);
     doThrow(new IOException("Cannot form connection to alert listener"))
-        .when(connectionTable).get(eq(member), anyBoolean(), anyLong(), 
anyLong(), anyLong());
+        .when(connectionTable)
+        .get(eq(member), anyBoolean(), anyLong(), anyLong(), anyLong(), 
anyBoolean());
     when(membership.memberExists(eq(member)))
         .thenReturn(false);
 
     Throwable thrown = catchThrowable(() -> {
-      tcpConduit.getConnection(member, false, false, 0L, 0L, 0L);
+      tcpConduit.getConnection(member, false, 0L, 0L, 0L);
     });
 
     assertThat(thrown)
@@ -164,14 +167,15 @@ public class TCPConduitTest {
             TCPConduit -> connectionTable, socketCreator, doNothing(), false);
     InternalDistributedMember member = mock(InternalDistributedMember.class);
     doThrow(new IOException("Cannot form connection to alert listener"))
-        .when(connectionTable).get(same(member), anyBoolean(), anyLong(), 
anyLong(), anyLong());
+        .when(connectionTable)
+        .get(same(member), anyBoolean(), anyLong(), anyLong(), anyLong(), 
anyBoolean());
     when(membership.memberExists(same(member)))
         .thenReturn(true);
     when(membership.isShunned(same(member)))
         .thenReturn(true);
 
     Throwable thrown = catchThrowable(() -> {
-      tcpConduit.getConnection(member, false, false, 0L, 0L, 0L);
+      tcpConduit.getConnection(member, false, 0L, 0L, 0L);
     });
 
     assertThat(thrown)
@@ -188,7 +192,8 @@ public class TCPConduitTest {
             TCPConduit -> connectionTable, socketCreator, doNothing(), false);
     InternalDistributedMember member = mock(InternalDistributedMember.class);
     doThrow(new IOException("Cannot form connection to alert listener"))
-        .when(connectionTable).get(same(member), anyBoolean(), anyLong(), 
anyLong(), anyLong());
+        .when(connectionTable)
+        .get(same(member), anyBoolean(), anyLong(), anyLong(), anyLong(), 
anyBoolean());
     when(membership.memberExists(same(member)))
         .thenReturn(true);
     when(membership.isShunned(same(member)))
@@ -197,7 +202,7 @@ public class TCPConduitTest {
         .thenReturn(true);
 
     Throwable thrown = catchThrowable(() -> {
-      tcpConduit.getConnection(member, false, false, 0L, 0L, 0L);
+      tcpConduit.getConnection(member, false, 0L, 0L, 0L);
     });
 
     assertThat(thrown)
@@ -214,7 +219,8 @@ public class TCPConduitTest {
             TCPConduit -> connectionTable, socketCreator, doNothing(), false);
     InternalDistributedMember member = mock(InternalDistributedMember.class);
     doThrow(new IOException("Cannot form connection to alert listener"))
-        .when(connectionTable).get(same(member), anyBoolean(), anyLong(), 
anyLong(), anyLong());
+        .when(connectionTable)
+        .get(same(member), anyBoolean(), anyLong(), anyLong(), anyLong(), 
anyBoolean());
     when(membership.memberExists(same(member)))
         .thenReturn(true);
     when(membership.isShunned(same(member)))
@@ -223,7 +229,7 @@ public class TCPConduitTest {
         .thenReturn(true);
 
     Throwable thrown = catchThrowable(() -> {
-      tcpConduit.getConnection(member, false, false, 0L, 0L, 0L);
+      tcpConduit.getConnection(member, false, 0L, 0L, 0L);
     });
 
     assertThat(thrown)
@@ -231,6 +237,73 @@ public class TCPConduitTest {
         .hasMessage("Abandoned because shutdown is in progress");
   }
 
+  @Test
+  public void getFirstScanForConnectionThrowsAlertingIOException_ifCaughtIOException_whileAlerting()
+      throws Exception {
+    // Conduit wired to the mocked connection table.
+    TCPConduit conduit = new TCPConduit(membership, 0, localHost, false, directChannel,
+        mock(BufferPool.class), new Properties(), ignored -> connectionTable, socketCreator,
+        doNothing(), false);
+    InternalDistributedMember alertListener = mock(InternalDistributedMember.class);
+
+    // Every lookup of a connection to the listener fails with an IOException.
+    doThrow(new IOException("Cannot form connection to alert listener"))
+        .when(connectionTable)
+        .get(eq(alertListener), anyBoolean(), anyLong(), anyLong(), anyLong(), anyBoolean());
+
+    // While the thread is marked as alerting, the failure must surface as AlertingIOException.
+    AlertingAction.execute(() -> {
+      Throwable failure = catchThrowable(
+          () -> conduit.getFirstScanForConnection(alertListener, false, 0L, 0L, 0L));
+      assertThat(failure).isInstanceOf(AlertingIOException.class);
+    });
+  }
+
+  @Test
+  public void getFirstScanForConnectionRethrows_ifCaughtIOException_whileNotAlerting()
+      throws Exception {
+    TCPConduit tcpConduit =
+        new TCPConduit(membership, 0, localHost, false, directChannel, mock(BufferPool.class),
+            new Properties(),
+            TCPConduit -> connectionTable, socketCreator, doNothing(), false);
+    InternalDistributedMember member = mock(InternalDistributedMember.class);
+    Connection connection = mock(Connection.class);
+    // A second table lookup would succeed, so the IOException escapes only if it is not retried.
+    doThrow(new IOException("Connection not created in first try"))
+        .doReturn(connection)
+        .when(connectionTable)
+        .get(eq(member), anyBoolean(), anyLong(), anyLong(), anyLong(), anyBoolean());
+
+    Throwable thrown = catchThrowable(
+        () -> tcpConduit.getFirstScanForConnection(member, false, 0L, 0L, 0L));
+
+    // Not alerting: rule out the alerting wrapper explicitly, since isInstanceOf alone may match it.
+    assertThat(thrown).isInstanceOf(IOException.class)
+        .isNotInstanceOf(AlertingIOException.class);
+  }
+
+
+  @Test
+  public void getFirstScanForConnectionRethrows_ifCaughtIOException_withoutMessage()
+      throws Exception {
+    TCPConduit tcpConduit =
+        new TCPConduit(membership, 0, localHost, false, directChannel, mock(BufferPool.class),
+            new Properties(),
+            TCPConduit -> connectionTable, socketCreator, doNothing(), false);
+    InternalDistributedMember member = mock(InternalDistributedMember.class);
+    Connection connection = mock(Connection.class);
+    // An IOException carrying a null message must still be rethrown as a plain IOException.
+    doThrow(new IOException())
+        .doReturn(connection)
+        .when(connectionTable)
+        .get(eq(member), anyBoolean(), anyLong(), anyLong(), anyLong(), anyBoolean());
+
+    Throwable thrown = catchThrowable(
+        () -> tcpConduit.getFirstScanForConnection(member, false, 0L, 0L, 0L));
+
+    assertThat(thrown).isInstanceOf(IOException.class)
+        .isNotInstanceOf(AlertingIOException.class);
+  }
+
   private Runnable doNothing() {
     return () -> {
       // nothing

Reply via email to