This is an automated email from the ASF dual-hosted git repository.
onichols pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 0ad4c8a9f6 Revert "GEODE-9484: Improve sending message to multy
destinations (#7381)" (#7655)
0ad4c8a9f6 is described below
commit 0ad4c8a9f66b1e9dd861efd7c76b28135a07cb36
Author: Mario Ivanac <[email protected]>
AuthorDate: Fri May 6 01:47:13 2022 +0200
Revert "GEODE-9484: Improve sending message to multy destinations (#7381)"
(#7655)
This reverts commit 62cd12c7f0bbb3d092011555e714e57ce041791a.
---
...edTest.java => UpdatePropagationDUnitTest.java} | 109 +++----------------
...Test.java => UpdatePropagationPRDUnitTest.java} | 2 +-
.../geode/internal/tcp/CloseConnectionTest.java | 2 +-
.../geode/internal/tcp/TCPConduitDUnitTest.java | 2 +-
.../distributed/internal/direct/DirectChannel.java | 44 +++-----
.../org/apache/geode/internal/tcp/Connection.java | 6 +-
.../apache/geode/internal/tcp/ConnectionTable.java | 30 ++----
.../org/apache/geode/internal/tcp/TCPConduit.java | 117 +++------------------
.../internal/tcp/ConnectionTransmissionTest.java | 2 +-
.../apache/geode/internal/tcp/TCPConduitTest.java | 74 +++----------
10 files changed, 68 insertions(+), 320 deletions(-)
diff --git
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationDistributedTest.java
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationDUnitTest.java
similarity index 78%
rename from
geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationDistributedTest.java
rename to
geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationDUnitTest.java
index 58de5b4762..0b99a144e5 100644
---
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationDistributedTest.java
+++
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationDUnitTest.java
@@ -20,7 +20,6 @@ import static
org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static
org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPort;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
-import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
@@ -51,8 +50,6 @@ import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.cache30.CacheSerializableRunnable;
import org.apache.geode.distributed.internal.ServerLocationAndMemberId;
-import
org.apache.geode.distributed.internal.membership.api.MembershipManagerHelper;
-import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.NetworkUtils;
@@ -71,76 +68,45 @@ import org.apache.geode.util.internal.GeodeGlossary;
* the same across servers
*/
@Category({ClientSubscriptionTest.class})
-public class UpdatePropagationDistributedTest extends JUnit4CacheTestCase {
+public class UpdatePropagationDUnitTest extends JUnit4CacheTestCase {
private static final String REGION_NAME =
"UpdatePropagationDUnitTest_region";
private VM server1 = null;
private VM server2 = null;
- private VM server3 = null;
private VM client1 = null;
private VM client2 = null;
private int PORT1;
private int PORT2;
- private int PORT3;
-
- private final int minNumEntries = 2;
-
- private String hostnameServer1;
- private String hostnameServer3;
@Override
public final void postSetUp() throws Exception {
disconnectAllFromDS();
final Host host = Host.getHost(0);
-
+ // Server1 VM
server1 = host.getVM(0);
+ // Server2 VM
server2 = host.getVM(1);
- server3 = host.getVM(2);
-
- client1 = host.getVM(3);
-
- client2 = host.getVM(4);
+ // Client 1 VM
+ client1 = host.getVM(2);
- PORT1 = server1.invoke(() -> createServerCache());
- PORT2 = server2.invoke(() -> createServerCache());
- PORT3 = server3.invoke(() -> createServerCache());
+ // client 2 VM
+ client2 = host.getVM(3);
- hostnameServer1 = NetworkUtils.getServerHostName(server1.getHost());
- hostnameServer3 = NetworkUtils.getServerHostName(server3.getHost());
-
- IgnoredException.addIgnoredException("java.net.SocketException");
- IgnoredException.addIgnoredException("Unexpected IOException");
- }
+ PORT1 = server1.invoke(this::createServerCache);
+ PORT2 = server2.invoke(this::createServerCache);
-
-
- @Test
- public void updatesArePropagatedToAllMembersWhenOneKilled() throws Exception
{
client1.invoke(
- () -> createClientCache(hostnameServer1, PORT1));
+ () ->
createClientCache(NetworkUtils.getServerHostName(server1.getHost()), PORT1,
PORT2));
client2.invoke(
- () -> createClientCache(hostnameServer3, PORT3));
- int entries = 20;
- AsyncInvocation invocation = client1.invokeAsync(() -> doPuts(entries));
-
- // Wait for some entries to be put
- server1.invoke(this::verifyMinEntriesInserted);
+ () ->
createClientCache(NetworkUtils.getServerHostName(server1.getHost()), PORT1,
PORT2));
- // Simulate crash
- server2.invoke(() -> {
- MembershipManagerHelper.crashDistributedSystem(getSystemStatic());
- });
-
- invocation.await();
-
- int notNullEntriesIn1 = client1.invoke(() ->
getNotNullEntriesNumber(entries));
- int notNullEntriesIn3 = client2.invoke(() ->
getNotNullEntriesNumber(entries));
- assertThat(notNullEntriesIn3).isEqualTo(notNullEntriesIn1);
+ IgnoredException.addIgnoredException("java.net.SocketException");
+ IgnoredException.addIgnoredException("Unexpected IOException");
}
/**
@@ -149,11 +115,6 @@ public class UpdatePropagationDistributedTest extends
JUnit4CacheTestCase {
*/
@Test
public void updatesAreProgegatedAfterFailover() {
- client1.invoke(
- () -> createClientCache(hostnameServer1, PORT1, PORT2));
- client2.invoke(
- () -> createClientCache(hostnameServer1, PORT1, PORT2));
-
// First create entries on both servers via the two client
client1.invoke(this::createEntriesK1andK2);
client2.invoke(this::createEntriesK1andK2);
@@ -287,18 +248,6 @@ public class UpdatePropagationDistributedTest extends
JUnit4CacheTestCase {
.addCacheListener(new
EventTrackingCacheListener()).create(REGION_NAME);
}
- private void createClientCache(String host, Integer port1) {
- Properties props = new Properties();
- props.setProperty(LOCATORS, "");
- ClientCacheFactory cf = new ClientCacheFactory();
- cf.addPoolServer(host, port1).setPoolSubscriptionEnabled(false)
-
.setPoolSubscriptionRedundancy(-1).setPoolMinConnections(4).setPoolSocketBufferSize(1000)
- .setPoolReadTimeout(100).setPoolPingInterval(300);
- ClientCache cache = getClientCache(cf);
- cache.createClientRegionFactory(ClientRegionShortcut.PROXY)
- .create(REGION_NAME);
- }
-
private Integer createServerCache() throws Exception {
Cache cache = getCache();
RegionAttributes attrs = createCacheServerAttributes();
@@ -309,7 +258,7 @@ public class UpdatePropagationDistributedTest extends
JUnit4CacheTestCase {
server.setPort(port);
server.setNotifyBySubscription(true);
server.start();
- return new Integer(server.getPort());
+ return server.getPort();
}
protected RegionAttributes createCacheServerAttributes() {
@@ -356,36 +305,6 @@ public class UpdatePropagationDistributedTest extends
JUnit4CacheTestCase {
});
}
- private void verifyMinEntriesInserted() {
- await().untilAsserted(() -> assertThat(getCache().getRegion(SEPARATOR +
REGION_NAME))
- .hasSizeGreaterThan(minNumEntries));
- }
-
- private void doPuts(int entries) throws Exception {
- Region<String, String> r1 = getCache().getRegion(REGION_NAME);
- assertThat(r1).isNotNull();
- for (int i = 0; i < entries; i++) {
- try {
- r1.put("" + i, "" + i);
- } catch (Exception e) {
- }
- Thread.sleep(1000);
- }
- }
-
- private int getNotNullEntriesNumber(int entries) {
- int notNullEntries = 0;
- Region<String, String> r1 = getCache().getRegion(SEPARATOR + REGION_NAME);
- assertThat(r1).isNotNull();
- for (int i = 0; i < entries; i++) {
- Object value = r1.get("" + i, "" + i);
- if (value != null) {
- notNullEntries++;
- }
- }
- return notNullEntries;
- }
-
private static class EventTrackingCacheListener extends CacheListenerAdapter
{
List<EntryEvent> receivedEvents = new ArrayList<>();
diff --git
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationPRDistributedTest.java
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationPRDUnitTest.java
similarity index 93%
rename from
geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationPRDistributedTest.java
rename to
geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationPRDUnitTest.java
index 77d903ee0e..47721ceb2c 100644
---
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationPRDistributedTest.java
+++
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationPRDUnitTest.java
@@ -21,7 +21,7 @@ import org.apache.geode.cache.RegionAttributes;
/**
* subclass of UpdatePropagationDUnitTest to exercise partitioned regions
*/
-public class UpdatePropagationPRDistributedTest extends
UpdatePropagationDistributedTest {
+public class UpdatePropagationPRDUnitTest extends UpdatePropagationDUnitTest {
@Override
protected RegionAttributes createCacheServerAttributes() {
diff --git
a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/CloseConnectionTest.java
b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/CloseConnectionTest.java
index 5aeba3fac2..cdb5432399 100644
---
a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/CloseConnectionTest.java
+++
b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/CloseConnectionTest.java
@@ -110,7 +110,7 @@ public class CloseConnectionTest implements Serializable {
InternalDistributedSystem distributedSystem =
getCache().getInternalDistributedSystem();
InternalDistributedMember otherMember =
distributedSystem.getDistributionManager()
.getOtherNormalDistributionManagerIds().iterator().next();
- Connection connection = conTable.getConduit().getConnection(otherMember,
true,
+ Connection connection = conTable.getConduit().getConnection(otherMember,
true, false,
System.currentTimeMillis(), 15000, 0);
await().untilAsserted(() -> {
// grab the shared, ordered "sender" connection to vm0. It should have
a residual
diff --git
a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitDUnitTest.java
b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitDUnitTest.java
index 794d6e093d..41d64c67f6 100644
---
a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitDUnitTest.java
+++
b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitDUnitTest.java
@@ -110,7 +110,7 @@ public class TCPConduitDUnitTest extends
DistributedTestCase {
assertThat(connectionTable.hasReceiversFor(otherMember)).isTrue();
Connection sharedUnordered = connectionTable.get(otherMember, false,
- System.currentTimeMillis(), 15000, 0, false);
+ System.currentTimeMillis(), 15000, 0);
sharedUnordered.requestClose("for testing");
// the sender connection has been closed so we should only have 2 senders
now
assertThat(ConnectionTable.getNumSenderSharedConnections()).isEqualTo(2);
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
index eaac79f2b8..a8a7bb8c20 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
@@ -281,17 +281,11 @@ public class DirectChannel {
directReply = false;
}
if (ce != null) {
-
- if (!retry) {
- retryInfo = ce;
+ if (failedCe != null) {
+ failedCe.getMembers().addAll(ce.getMembers());
+ failedCe.getCauses().addAll(ce.getCauses());
} else {
-
- if (failedCe != null) {
- failedCe.getMembers().addAll(ce.getMembers());
- failedCe.getCauses().addAll(ce.getCauses());
- } else {
- failedCe = ce;
- }
+ failedCe = ce;
}
ce = null;
}
@@ -299,9 +293,6 @@ public class DirectChannel {
if (failedCe != null) {
throw failedCe;
}
- if (retryInfo != null) {
- continue;
- }
return bytesWritten;
}
@@ -347,12 +338,7 @@ public class DirectChannel {
}
if (ce != null) {
- if (retryInfo != null) {
- retryInfo.getMembers().addAll(ce.getMembers());
- retryInfo.getCauses().addAll(ce.getCauses());
- } else {
- retryInfo = ce;
- }
+ retryInfo = ce;
ce = null;
}
@@ -437,13 +423,13 @@ public class DirectChannel {
* @param retry whether this is a retransmission
* @param ackTimeout the ack warning timeout
* @param ackSDTimeout the ack severe alert timeout
- * @param connectionsList a list to hold the connections
+ * @param cons a list to hold the connections
* @return null if everything went okay, or a ConnectExceptions object if
some connections
* couldn't be obtained
*/
private ConnectExceptions getConnections(Membership mgr, DistributionMessage
msg,
InternalDistributedMember[] destinations, boolean preserveOrder, boolean
retry,
- long ackTimeout, long ackSDTimeout, List<Connection> connectionsList) {
+ long ackTimeout, long ackSDTimeout, List cons) {
ConnectExceptions ce = null;
for (InternalDistributedMember destination : destinations) {
if (destination == null) {
@@ -472,18 +458,12 @@ public class DirectChannel {
if (ackTimeout > 0) {
startTime = System.currentTimeMillis();
}
- final Connection connection;
- if (!retry) {
- connection = conduit.getFirstScanForConnection(destination,
preserveOrder, startTime,
- ackTimeout, ackSDTimeout);
- } else {
- connection = conduit.getConnection(destination, preserveOrder,
startTime,
- ackTimeout, ackSDTimeout);
- }
+ Connection con = conduit.getConnection(destination, preserveOrder,
retry, startTime,
+ ackTimeout, ackSDTimeout);
- connection.setInUse(true, startTime, 0, 0, null); // fix for
bug#37657
- connectionsList.add(connection);
- if (connection.isSharedResource() && msg instanceof
DirectReplyMessage) {
+ con.setInUse(true, startTime, 0, 0, null); // fix for bug#37657
+ cons.add(con);
+ if (con.isSharedResource() && msg instanceof DirectReplyMessage) {
DirectReplyMessage directMessage = (DirectReplyMessage) msg;
directMessage.registerProcessor();
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 9e921d7d03..44205d4d63 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -961,7 +961,7 @@ public class Connection implements Runnable {
final ConnectionTable t,
final boolean preserveOrder, final InternalDistributedMember remoteAddr,
final boolean sharedResource,
- final long startTime, final long ackTimeout, final long ackSATimeout,
boolean doNotRetry)
+ final long startTime, final long ackTimeout, final long ackSATimeout)
throws IOException, DistributedSystemDisconnectedException {
boolean success = false;
Connection conn = null;
@@ -1021,9 +1021,7 @@ public class Connection implements Runnable {
// do not change the text of this exception - it is looked for in
exception handlers
throw new IOException("Cannot form connection to alert listener "
+ remoteAddr);
}
- if (doNotRetry) {
- throw new IOException("Connection not created in first try to " +
remoteAddr);
- }
+
// Wait briefly...
interrupted = Thread.interrupted() || interrupted;
try {
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
index f1d157d27f..f54f7bd9cd 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
@@ -269,7 +269,6 @@ public class ConnectionTable {
* @param startTime the ms clock start time for the operation
* @param ackThreshold the ms ack-wait-threshold, or zero
* @param ackSAThreshold the ms ack-severe_alert-threshold, or zero
- * @param doNotRetry whether we should perform reattempt to create connection
* @return the Connection, or null if someone else already created or closed
it
* @throws IOException if unable to connect
*/
@@ -277,14 +276,13 @@ public class ConnectionTable {
boolean sharedResource,
boolean preserveOrder, Map<DistributedMember, Object> m,
PendingConnection pc, long startTime,
long ackThreshold,
- long ackSAThreshold, boolean doNotRetry)
- throws IOException, DistributedSystemDisconnectedException {
+ long ackSAThreshold) throws IOException,
DistributedSystemDisconnectedException {
// handle new pending connection
Connection con = null;
try {
long senderCreateStartTime = owner.getStats().startSenderCreate();
con = Connection.createSender(owner.getMembership(), this,
preserveOrder, id,
- sharedResource, startTime, ackThreshold, ackSAThreshold, doNotRetry);
+ sharedResource, startTime, ackThreshold, ackSAThreshold);
owner.getStats().incSenders(sharedResource, preserveOrder,
senderCreateStartTime);
} finally {
// our connection failed to notify anyone waiting for our pending con
@@ -352,14 +350,11 @@ public class ConnectionTable {
* @param startTime the ms clock start time for the operation
* @param ackTimeout the ms ack-wait-threshold, or zero
* @param ackSATimeout the ms ack-severe-alert-threshold, or zero
- * @param doNotRetryWaitForConnection whether we should perform reattempt
(or wait) to create
- * connection
* @return the new Connection, or null if an error
* @throws IOException if unable to create the connection
*/
private Connection getSharedConnection(InternalDistributedMember id, boolean
scheduleTimeout,
- boolean preserveOrder, long startTime, long ackTimeout, long
ackSATimeout,
- boolean doNotRetryWaitForConnection)
+ boolean preserveOrder, long startTime, long ackTimeout, long
ackSATimeout)
throws IOException, DistributedSystemDisconnectedException {
final Map<DistributedMember, Object> m =
@@ -392,7 +387,7 @@ public class ConnectionTable {
logger.debug("created PendingConnection {}", pc);
}
result = handleNewPendingConnection(id, true, preserveOrder, m, pc,
- startTime, ackTimeout, ackSATimeout, doNotRetryWaitForConnection);
+ startTime, ackTimeout, ackSATimeout);
if (!preserveOrder && scheduleTimeout) {
scheduleIdleTimeout(result);
}
@@ -405,10 +400,6 @@ public class ConnectionTable {
throw new IOException("Cannot form connection to alert listener " +
id);
}
- if (doNotRetryWaitForConnection) {
- return null;
- }
-
result = ((PendingConnection)
mEntry).waitForConnect(owner.getMembership(),
startTime, ackTimeout, ackSATimeout);
if (logger.isDebugEnabled()) {
@@ -434,13 +425,11 @@ public class ConnectionTable {
* @param startTime the ms clock start time for the operation
* @param ackTimeout the ms ack-wait-threshold, or zero
* @param ackSATimeout the ms ack-severe-alert-threshold, or zero
- * @param doNotRetry whether we should perform reattempt to create connection
* @return the connection, or null if an error
* @throws IOException if the connection could not be created
*/
Connection getThreadOwnedConnection(InternalDistributedMember id, long
startTime, long ackTimeout,
- long ackSATimeout, boolean doNotRetry)
- throws IOException, DistributedSystemDisconnectedException {
+ long ackSATimeout) throws IOException,
DistributedSystemDisconnectedException {
Connection result;
// Look for result in the thread local
@@ -460,7 +449,7 @@ public class ConnectionTable {
// OK, we have to create a new connection.
long senderCreateStartTime = owner.getStats().startSenderCreate();
result = Connection.createSender(owner.getMembership(), this, true, id,
false, startTime,
- ackTimeout, ackSATimeout, doNotRetry);
+ ackTimeout, ackSATimeout);
owner.getStats().incSenders(false, true, senderCreateStartTime);
if (logger.isDebugEnabled()) {
logger.debug("ConnectionTable: created an ordered connection: {}",
result);
@@ -532,12 +521,11 @@ public class ConnectionTable {
* @param startTime the ms clock start time
* @param ackTimeout the ms ack-wait-threshold, or zero
* @param ackSATimeout the ms ack-severe-alert-threshold, or zero
- * @param doNotRetry whether we should perform reattempt to create connection
* @return the new Connection, or null if a problem
* @throws IOException if the connection could not be created
*/
protected Connection get(InternalDistributedMember id, boolean
preserveOrder, long startTime,
- long ackTimeout, long ackSATimeout, boolean doNotRetry)
+ long ackTimeout, long ackSATimeout)
throws IOException, DistributedSystemDisconnectedException {
if (closed) {
owner.getCancelCriterion().checkCancelInProgress(null);
@@ -547,9 +535,9 @@ public class ConnectionTable {
boolean threadOwnsResources = threadOwnsResources();
if (!preserveOrder || !threadOwnsResources) {
result = getSharedConnection(id, threadOwnsResources, preserveOrder,
startTime, ackTimeout,
- ackSATimeout, doNotRetry);
+ ackSATimeout);
} else {
- result = getThreadOwnedConnection(id, startTime, ackTimeout,
ackSATimeout, doNotRetry);
+ result = getThreadOwnedConnection(id, startTime, ackTimeout,
ackSATimeout);
}
if (result != null) {
Assert.assertTrue(result.getPreserveOrder() == preserveOrder);
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
index 1b75be5465..4d6d9c8216 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
@@ -719,6 +719,7 @@ public class TCPConduit implements Runnable {
*
* @param memberAddress the IDS associated with the remoteId
* @param preserveOrder whether this is an ordered or unordered connection
+ * @param retry false if this is the first attempt
* @param startTime the time this operation started
* @param ackTimeout the ack-wait-threshold * 1000 for the operation to be
transmitted (or zero)
* @param ackSATimeout the ack-severe-alert-threshold * 1000 for the
operation to be transmitted
@@ -727,7 +728,7 @@ public class TCPConduit implements Runnable {
* @return the connection
*/
public Connection getConnection(InternalDistributedMember memberAddress,
- final boolean preserveOrder, long startTime, long ackTimeout,
+ final boolean preserveOrder, boolean retry, long startTime, long
ackTimeout,
long ackSATimeout) throws IOException,
DistributedSystemDisconnectedException {
if (stopped) {
throw new DistributedSystemDisconnectedException("The conduit is
stopped");
@@ -741,7 +742,7 @@ public class TCPConduit implements Runnable {
try {
// If this is the second time through this loop, we had problems.
// Tear down the connection so that it gets rebuilt.
- if (conn != null) { // not first time in loop
+ if (retry || conn != null) { // not first time in loop
if (!membership.memberExists(memberAddress)
|| membership.isShunned(memberAddress)
|| membership.shutdownInProgress()) {
@@ -776,15 +777,18 @@ public class TCPConduit implements Runnable {
// Close the connection (it will get rebuilt later).
getStats().incReconnectAttempts();
- try {
- if (logger.isDebugEnabled()) {
- logger.debug("Closing old connection. conn={} before retrying.
memberInTrouble={}",
- conn, memberInTrouble);
+ if (conn != null) {
+ try {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Closing old connection. conn={} before
retrying. memberInTrouble={}",
+ conn, memberInTrouble);
+ }
+ conn.closeForReconnect("closing before retrying");
+ } catch (CancelException ex) {
+ throw ex;
+ } catch (Exception ex) {
+ // ignored
}
- conn.closeForReconnect("closing before retrying");
- } catch (CancelException ex) {
- throw ex;
- } catch (Exception ignored) {
}
} // not first time in loop
@@ -797,7 +801,7 @@ public class TCPConduit implements Runnable {
do {
retryForOldConnection = false;
conn = getConTable().get(memberAddress, preserveOrder, startTime,
ackTimeout,
- ackSATimeout, false);
+ ackSATimeout);
if (conn == null) {
// conduit may be closed - otherwise an ioexception would be
thrown
problem = new IOException(
@@ -905,97 +909,6 @@ public class TCPConduit implements Runnable {
}
}
- /**
- * Return a connection to the given member. This method performs quick scan
for connection.
- * Only one attempt to create a connection to the given member .
- *
- * @param memberAddress the IDS associated with the remoteId
- * @param preserveOrder whether this is an ordered or unordered connection
- * @param startTime the time this operation started
- * @param ackTimeout the ack-wait-threshold * 1000 for the operation to be
transmitted (or zero)
- * @param ackSATimeout the ack-severe-alert-threshold * 1000 for the
operation to be transmitted
- * (or zero)
- *
- * @return the connection
- */
- public Connection getFirstScanForConnection(InternalDistributedMember
memberAddress,
- final boolean preserveOrder, long startTime, long ackTimeout,
- long ackSATimeout) throws IOException,
DistributedSystemDisconnectedException {
- if (stopped) {
- throw new DistributedSystemDisconnectedException("The conduit is
stopped");
- }
-
- Connection connection = null;
- stopper.checkCancelInProgress(null);
- boolean interrupted = Thread.interrupted();
- try {
-
- Exception problem = null;
- try {
- connection = getConnectionThatIsNotClosed(memberAddress,
preserveOrder, startTime,
- ackTimeout, ackSATimeout);
-
- // we have a connection; fall through and return it
- } catch (ConnectionException e) {
- // Race condition between acquiring the connection and attempting
- // to use it: another thread closed it.
- problem = e;
- // No need to retry since Connection.createSender has already
- // done retries and now member is really unreachable for some reason
- // even though it may be in the view
- } catch (IOException e) {
- problem = e;
- // don't keep trying to connect to an alert listener
- if (AlertingAction.isThreadAlerting()) {
- if (logger.isDebugEnabled()) {
- logger.debug("Giving up connecting to alert listener {}",
memberAddress);
- }
- }
- }
-
- if (problem != null) {
- if (problem instanceof IOException) {
- if (problem.getMessage().startsWith("Cannot form connection to alert
listener")) {
- throw new AlertingIOException((IOException) problem);
- }
- throw (IOException) problem;
- }
- throw new IOException(
- String.format("Problem connecting to %s", memberAddress), problem);
- }
- // Success!
-
- return connection;
- } finally {
- if (interrupted) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
- private Connection getConnectionThatIsNotClosed(InternalDistributedMember
memberAddress,
- final boolean preserveOrder, long startTime, long ackTimeout, long
ackSATimeout)
- throws IOException, ConnectionException {
- boolean debugEnabled = logger.isDebugEnabled();
- Connection connection;
- while (true) {
- connection = getConTable().get(memberAddress, preserveOrder, startTime,
ackTimeout,
- ackSATimeout, true);
- if (connection == null) {
- throw new IOException("Unable to reconnect to server; possible
shutdown: " + memberAddress);
- }
-
- if (!connection.isClosing() &&
connection.getRemoteAddress().equals(memberAddress)) {
- return connection;
- }
- if (debugEnabled) {
- logger.debug("Got an old connection for {}: {}@{}", memberAddress,
connection,
- connection.hashCode());
- }
- connection.closeOldConnection("closing old connection");
- }
- }
-
@Override
public String toString() {
return String.valueOf(id);
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTransmissionTest.java
b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTransmissionTest.java
index 906a021dec..5a041eb167 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTransmissionTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTransmissionTest.java
@@ -168,7 +168,7 @@ public class ConnectionTransmissionTest {
senderAddr.setDirectChannelPort(conduit.getPort());
return spy(Connection.createSender(membership, writerTable, true,
remoteAddr, true,
- System.currentTimeMillis(), 1000, 1000, false));
+ System.currentTimeMillis(), 1000, 1000));
}
private Connection createReceiverConnectionOnFirstAccept(final
ServerSocketChannel acceptorSocket,
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/tcp/TCPConduitTest.java
b/geode-core/src/test/java/org/apache/geode/internal/tcp/TCPConduitTest.java
index 3f380fa5f1..392a5993c0 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/tcp/TCPConduitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/TCPConduitTest.java
@@ -94,8 +94,7 @@ public class TCPConduitTest {
TCPConduit -> connectionTable, socketCreator, doNothing(), false);
InternalDistributedMember member = mock(InternalDistributedMember.class);
doThrow(new IOException("Cannot form connection to alert listener"))
- .when(connectionTable)
- .get(eq(member), anyBoolean(), anyLong(), anyLong(), anyLong(),
anyBoolean());
+ .when(connectionTable).get(eq(member), anyBoolean(), anyLong(),
anyLong(), anyLong());
when(membership.memberExists(eq(member)))
.thenReturn(true);
when(membership.isShunned(same(member)))
@@ -103,7 +102,7 @@ public class TCPConduitTest {
AlertingAction.execute(() -> {
Throwable thrown = catchThrowable(() -> {
- tcpConduit.getConnection(member, false, 0L, 0L, 0L);
+ tcpConduit.getConnection(member, false, false, 0L, 0L, 0L);
});
assertThat(thrown)
@@ -124,14 +123,13 @@ public class TCPConduitTest {
doThrow(new IOException("Cannot form connection to alert listener"))
// getConnection will loop indefinitely until connectionTable returns
connection
.doReturn(connection)
- .when(connectionTable)
- .get(eq(member), anyBoolean(), anyLong(), anyLong(), anyLong(),
anyBoolean());
+ .when(connectionTable).get(eq(member), anyBoolean(), anyLong(),
anyLong(), anyLong());
when(membership.memberExists(eq(member)))
.thenReturn(true);
when(membership.isShunned(same(member)))
.thenReturn(false);
- Connection value = tcpConduit.getConnection(member, false, 0L, 0L, 0L);
+ Connection value = tcpConduit.getConnection(member, false, false, 0L, 0L,
0L);
assertThat(value)
.isSameAs(connection);
@@ -145,13 +143,12 @@ public class TCPConduitTest {
TCPConduit -> connectionTable, socketCreator, doNothing(), false);
InternalDistributedMember member = mock(InternalDistributedMember.class);
doThrow(new IOException("Cannot form connection to alert listener"))
- .when(connectionTable)
- .get(eq(member), anyBoolean(), anyLong(), anyLong(), anyLong(),
anyBoolean());
+ .when(connectionTable).get(eq(member), anyBoolean(), anyLong(),
anyLong(), anyLong());
when(membership.memberExists(eq(member)))
.thenReturn(false);
Throwable thrown = catchThrowable(() -> {
- tcpConduit.getConnection(member, false, 0L, 0L, 0L);
+ tcpConduit.getConnection(member, false, false, 0L, 0L, 0L);
});
assertThat(thrown)
@@ -167,15 +164,14 @@ public class TCPConduitTest {
TCPConduit -> connectionTable, socketCreator, doNothing(), false);
InternalDistributedMember member = mock(InternalDistributedMember.class);
doThrow(new IOException("Cannot form connection to alert listener"))
- .when(connectionTable)
- .get(same(member), anyBoolean(), anyLong(), anyLong(), anyLong(),
anyBoolean());
+ .when(connectionTable).get(same(member), anyBoolean(), anyLong(),
anyLong(), anyLong());
when(membership.memberExists(same(member)))
.thenReturn(true);
when(membership.isShunned(same(member)))
.thenReturn(true);
Throwable thrown = catchThrowable(() -> {
- tcpConduit.getConnection(member, false, 0L, 0L, 0L);
+ tcpConduit.getConnection(member, false, false, 0L, 0L, 0L);
});
assertThat(thrown)
@@ -192,8 +188,7 @@ public class TCPConduitTest {
TCPConduit -> connectionTable, socketCreator, doNothing(), false);
InternalDistributedMember member = mock(InternalDistributedMember.class);
doThrow(new IOException("Cannot form connection to alert listener"))
- .when(connectionTable)
- .get(same(member), anyBoolean(), anyLong(), anyLong(), anyLong(),
anyBoolean());
+ .when(connectionTable).get(same(member), anyBoolean(), anyLong(),
anyLong(), anyLong());
when(membership.memberExists(same(member)))
.thenReturn(true);
when(membership.isShunned(same(member)))
@@ -202,7 +197,7 @@ public class TCPConduitTest {
.thenReturn(true);
Throwable thrown = catchThrowable(() -> {
- tcpConduit.getConnection(member, false, 0L, 0L, 0L);
+ tcpConduit.getConnection(member, false, false, 0L, 0L, 0L);
});
assertThat(thrown)
@@ -219,8 +214,7 @@ public class TCPConduitTest {
TCPConduit -> connectionTable, socketCreator, doNothing(), false);
InternalDistributedMember member = mock(InternalDistributedMember.class);
doThrow(new IOException("Cannot form connection to alert listener"))
- .when(connectionTable)
- .get(same(member), anyBoolean(), anyLong(), anyLong(), anyLong(),
anyBoolean());
+ .when(connectionTable).get(same(member), anyBoolean(), anyLong(),
anyLong(), anyLong());
when(membership.memberExists(same(member)))
.thenReturn(true);
when(membership.isShunned(same(member)))
@@ -229,7 +223,7 @@ public class TCPConduitTest {
.thenReturn(true);
Throwable thrown = catchThrowable(() -> {
- tcpConduit.getConnection(member, false, 0L, 0L, 0L);
+ tcpConduit.getConnection(member, false, false, 0L, 0L, 0L);
});
assertThat(thrown)
@@ -237,50 +231,6 @@ public class TCPConduitTest {
.hasMessage("Abandoned because shutdown is in progress");
}
- @Test
- public void
getFirstScanForConnectionThrowsAlertingIOException_ifCaughtIOException_whileAlerting()
- throws Exception {
- TCPConduit tcpConduit =
- new TCPConduit(membership, 0, localHost, false, directChannel,
mock(BufferPool.class),
- new Properties(),
- TCPConduit -> connectionTable, socketCreator, doNothing(), false);
- InternalDistributedMember member = mock(InternalDistributedMember.class);
- doThrow(new IOException("Cannot form connection to alert listener"))
- .when(connectionTable)
- .get(eq(member), anyBoolean(), anyLong(), anyLong(), anyLong(),
anyBoolean());
-
- AlertingAction.execute(() -> {
- Throwable thrown = catchThrowable(() -> {
- tcpConduit.getFirstScanForConnection(member, false, 0L, 0L, 0L);
- });
-
- assertThat(thrown)
- .isInstanceOf(AlertingIOException.class);
- });
- }
-
- @Test
- public void
getFirstScanForConnectionRethrows_ifCaughtIOException_whileNotAlerting()
- throws Exception {
- TCPConduit tcpConduit =
- new TCPConduit(membership, 0, localHost, false, directChannel,
mock(BufferPool.class),
- new Properties(),
- TCPConduit -> connectionTable, socketCreator, doNothing(), false);
- InternalDistributedMember member = mock(InternalDistributedMember.class);
- Connection connection = mock(Connection.class);
- doThrow(new IOException("Connection not created in first try"))
- .doReturn(connection)
- .when(connectionTable)
- .get(eq(member), anyBoolean(), anyLong(), anyLong(), anyLong(),
anyBoolean());
-
- Throwable thrown = catchThrowable(() -> {
- tcpConduit.getFirstScanForConnection(member, false, 0L, 0L, 0L);
- });
-
- assertThat(thrown)
- .isInstanceOf(IOException.class);
- }
-
private Runnable doNothing() {
return () -> {
// nothing