This is an automated email from the ASF dual-hosted git repository. mivanac pushed a commit to branch revert-7381-newfeature1/GEODE-9484 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 758ef27045019cbe5654aed42b52898cc41ceaa8 Author: Mario Ivanac <48509724+miva...@users.noreply.github.com> AuthorDate: Wed May 4 21:02:16 2022 +0200 Revert "GEODE-9484: Improve sending message to multy destinations (#7381)" This reverts commit 62cd12c7f0bbb3d092011555e714e57ce041791a. --- ...edTest.java => UpdatePropagationDUnitTest.java} | 109 +++---------------- ...Test.java => UpdatePropagationPRDUnitTest.java} | 2 +- .../geode/internal/tcp/CloseConnectionTest.java | 2 +- .../geode/internal/tcp/TCPConduitDUnitTest.java | 2 +- .../distributed/internal/direct/DirectChannel.java | 44 +++----- .../org/apache/geode/internal/tcp/Connection.java | 6 +- .../apache/geode/internal/tcp/ConnectionTable.java | 30 ++---- .../org/apache/geode/internal/tcp/TCPConduit.java | 117 +++------------------ .../internal/tcp/ConnectionTransmissionTest.java | 2 +- .../apache/geode/internal/tcp/TCPConduitTest.java | 74 +++---------- 10 files changed, 68 insertions(+), 320 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationDUnitTest.java similarity index 78% rename from geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationDistributedTest.java rename to geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationDUnitTest.java index 58de5b4762..0b99a144e5 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationDistributedTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationDUnitTest.java @@ -20,7 +20,6 @@ import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPort; import static org.apache.geode.test.awaitility.GeodeAwaitility.await; -import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; import java.io.IOException; @@ -51,8 +50,6 @@ import org.apache.geode.cache.server.CacheServer; import org.apache.geode.cache.util.CacheListenerAdapter; import org.apache.geode.cache30.CacheSerializableRunnable; import org.apache.geode.distributed.internal.ServerLocationAndMemberId; -import org.apache.geode.distributed.internal.membership.api.MembershipManagerHelper; -import org.apache.geode.test.dunit.AsyncInvocation; import org.apache.geode.test.dunit.Host; import org.apache.geode.test.dunit.IgnoredException; import org.apache.geode.test.dunit.NetworkUtils; @@ -71,76 +68,45 @@ import org.apache.geode.util.internal.GeodeGlossary; * the same across servers */ @Category({ClientSubscriptionTest.class}) -public class UpdatePropagationDistributedTest extends JUnit4CacheTestCase { +public class UpdatePropagationDUnitTest extends JUnit4CacheTestCase { private static final String REGION_NAME = "UpdatePropagationDUnitTest_region"; private VM server1 = null; private VM server2 = null; - private VM server3 = null; private VM client1 = null; private VM client2 = null; private int PORT1; private int PORT2; - private int PORT3; - - private final int minNumEntries = 2; - - private String hostnameServer1; - private String hostnameServer3; @Override public final void postSetUp() throws Exception { disconnectAllFromDS(); final Host host = Host.getHost(0); - + // Server1 VM server1 = host.getVM(0); + // Server2 VM server2 = host.getVM(1); - server3 = host.getVM(2); - - client1 = host.getVM(3); - - client2 = host.getVM(4); + // Client 1 VM + client1 = host.getVM(2); - PORT1 = server1.invoke(() -> createServerCache()); - PORT2 = server2.invoke(() -> createServerCache()); - PORT3 = server3.invoke(() -> createServerCache()); + // client 2 VM + client2 = host.getVM(3); - hostnameServer1 = NetworkUtils.getServerHostName(server1.getHost()); - hostnameServer3 = NetworkUtils.getServerHostName(server3.getHost()); - - IgnoredException.addIgnoredException("java.net.SocketException"); - IgnoredException.addIgnoredException("Unexpected IOException"); - } + PORT1 = server1.invoke(this::createServerCache); + PORT2 = server2.invoke(this::createServerCache); - - - @Test - public void updatesArePropagatedToAllMembersWhenOneKilled() throws Exception { client1.invoke( - () -> createClientCache(hostnameServer1, PORT1)); + () -> createClientCache(NetworkUtils.getServerHostName(server1.getHost()), PORT1, PORT2)); client2.invoke( - () -> createClientCache(hostnameServer3, PORT3)); - int entries = 20; - AsyncInvocation invocation = client1.invokeAsync(() -> doPuts(entries)); - - // Wait for some entries to be put - server1.invoke(this::verifyMinEntriesInserted); + () -> createClientCache(NetworkUtils.getServerHostName(server1.getHost()), PORT1, PORT2)); - // Simulate crash - server2.invoke(() -> { - MembershipManagerHelper.crashDistributedSystem(getSystemStatic()); - }); - - invocation.await(); - - int notNullEntriesIn1 = client1.invoke(() -> getNotNullEntriesNumber(entries)); - int notNullEntriesIn3 = client2.invoke(() -> getNotNullEntriesNumber(entries)); - assertThat(notNullEntriesIn3).isEqualTo(notNullEntriesIn1); + IgnoredException.addIgnoredException("java.net.SocketException"); + IgnoredException.addIgnoredException("Unexpected IOException"); } /** @@ -149,11 +115,6 @@ public class UpdatePropagationDistributedTest extends JUnit4CacheTestCase { */ @Test public void updatesAreProgegatedAfterFailover() { - client1.invoke( - () -> createClientCache(hostnameServer1, PORT1, PORT2)); - client2.invoke( - () -> createClientCache(hostnameServer1, PORT1, PORT2)); - // First create entries on both servers via the two client client1.invoke(this::createEntriesK1andK2); client2.invoke(this::createEntriesK1andK2); @@ -287,18 +248,6 @@ public class UpdatePropagationDistributedTest extends JUnit4CacheTestCase { .addCacheListener(new EventTrackingCacheListener()).create(REGION_NAME); } - private void createClientCache(String host, Integer port1) { - Properties props = new Properties(); - props.setProperty(LOCATORS, ""); - ClientCacheFactory cf = new ClientCacheFactory(); - cf.addPoolServer(host, port1).setPoolSubscriptionEnabled(false) - .setPoolSubscriptionRedundancy(-1).setPoolMinConnections(4).setPoolSocketBufferSize(1000) - .setPoolReadTimeout(100).setPoolPingInterval(300); - ClientCache cache = getClientCache(cf); - cache.createClientRegionFactory(ClientRegionShortcut.PROXY) - .create(REGION_NAME); - } - private Integer createServerCache() throws Exception { Cache cache = getCache(); RegionAttributes attrs = createCacheServerAttributes(); @@ -309,7 +258,7 @@ public class UpdatePropagationDistributedTest extends JUnit4CacheTestCase { server.setPort(port); server.setNotifyBySubscription(true); server.start(); - return new Integer(server.getPort()); + return server.getPort(); } protected RegionAttributes createCacheServerAttributes() { @@ -356,36 +305,6 @@ public class UpdatePropagationDistributedTest extends JUnit4CacheTestCase { }); } - private void verifyMinEntriesInserted() { - await().untilAsserted(() -> assertThat(getCache().getRegion(SEPARATOR + REGION_NAME)) - .hasSizeGreaterThan(minNumEntries)); - } - - private void doPuts(int entries) throws Exception { - Region<String, String> r1 = getCache().getRegion(REGION_NAME); - assertThat(r1).isNotNull(); - for (int i = 0; i < entries; i++) { - try { - r1.put("" + i, "" + i); - } catch (Exception e) { - } - Thread.sleep(1000); - } - } - - private int getNotNullEntriesNumber(int entries) { - int notNullEntries = 0; - Region<String, String> r1 = getCache().getRegion(SEPARATOR + REGION_NAME); - assertThat(r1).isNotNull(); - for (int i = 0; i < entries; i++) { - Object value = r1.get("" + i, "" + i); - if (value != null) { - notNullEntries++; - } - } - return notNullEntries; - } - private static class EventTrackingCacheListener extends CacheListenerAdapter { List<EntryEvent> receivedEvents = new ArrayList<>(); diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationPRDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationPRDUnitTest.java similarity index 93% rename from geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationPRDistributedTest.java rename to geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationPRDUnitTest.java index 77d903ee0e..47721ceb2c 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationPRDistributedTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/tier/sockets/UpdatePropagationPRDUnitTest.java @@ -21,7 +21,7 @@ import org.apache.geode.cache.RegionAttributes; /** * subclass of UpdatePropagationDUnitTest to exercise partitioned regions */ -public class UpdatePropagationPRDistributedTest extends UpdatePropagationDistributedTest { +public class UpdatePropagationPRDUnitTest extends UpdatePropagationDUnitTest { @Override protected RegionAttributes createCacheServerAttributes() { diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/CloseConnectionTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/CloseConnectionTest.java index 5aeba3fac2..cdb5432399 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/CloseConnectionTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/CloseConnectionTest.java @@ -110,7 +110,7 @@ public class CloseConnectionTest implements Serializable { InternalDistributedSystem distributedSystem = getCache().getInternalDistributedSystem(); InternalDistributedMember otherMember = distributedSystem.getDistributionManager() .getOtherNormalDistributionManagerIds().iterator().next(); - Connection connection = conTable.getConduit().getConnection(otherMember, true, + Connection connection = conTable.getConduit().getConnection(otherMember, true, false, System.currentTimeMillis(), 15000, 0); await().untilAsserted(() -> { // grab the shared, ordered "sender" connection to vm0. It should have a residual diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitDUnitTest.java index 794d6e093d..41d64c67f6 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitDUnitTest.java @@ -110,7 +110,7 @@ public class TCPConduitDUnitTest extends DistributedTestCase { assertThat(connectionTable.hasReceiversFor(otherMember)).isTrue(); Connection sharedUnordered = connectionTable.get(otherMember, false, - System.currentTimeMillis(), 15000, 0, false); + System.currentTimeMillis(), 15000, 0); sharedUnordered.requestClose("for testing"); // the sender connection has been closed so we should only have 2 senders now assertThat(ConnectionTable.getNumSenderSharedConnections()).isEqualTo(2); diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java index eaac79f2b8..a8a7bb8c20 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java @@ -281,17 +281,11 @@ public class DirectChannel { directReply = false; } if (ce != null) { - - if (!retry) { - retryInfo = ce; + if (failedCe != null) { + failedCe.getMembers().addAll(ce.getMembers()); + failedCe.getCauses().addAll(ce.getCauses()); } else { - - if (failedCe != null) { - failedCe.getMembers().addAll(ce.getMembers()); - failedCe.getCauses().addAll(ce.getCauses()); - } else { - failedCe = ce; - } + failedCe = ce; } ce = null; } @@ -299,9 +293,6 @@ public class DirectChannel { if (failedCe != null) { throw failedCe; } - if (retryInfo != null) { - continue; - } return bytesWritten; } @@ -347,12 +338,7 @@ public class DirectChannel { } if (ce != null) { - if (retryInfo != null) { - retryInfo.getMembers().addAll(ce.getMembers()); - retryInfo.getCauses().addAll(ce.getCauses()); - } else { - retryInfo = ce; - } + retryInfo = ce; ce = null; } @@ -437,13 +423,13 @@ public class DirectChannel { * @param retry whether this is a retransmission * @param ackTimeout the ack warning timeout * @param ackSDTimeout the ack severe alert timeout - * @param connectionsList a list to hold the connections + * @param cons a list to hold the connections * @return null if everything went okay, or a ConnectExceptions object if some connections * couldn't be obtained */ private ConnectExceptions getConnections(Membership mgr, DistributionMessage msg, InternalDistributedMember[] destinations, boolean preserveOrder, boolean retry, - long ackTimeout, long ackSDTimeout, List<Connection> connectionsList) { + long ackTimeout, long ackSDTimeout, List cons) { ConnectExceptions ce = null; for (InternalDistributedMember destination : destinations) { if (destination == null) { @@ -472,18 +458,12 @@ public class DirectChannel { if (ackTimeout > 0) { startTime = System.currentTimeMillis(); } - final Connection connection; - if (!retry) { - connection = conduit.getFirstScanForConnection(destination, preserveOrder, startTime, - ackTimeout, ackSDTimeout); - } else { - connection = conduit.getConnection(destination, preserveOrder, startTime, - ackTimeout, ackSDTimeout); - } + Connection con = conduit.getConnection(destination, preserveOrder, retry, startTime, + ackTimeout, ackSDTimeout); - connection.setInUse(true, startTime, 0, 0, null); // fix for bug#37657 - connectionsList.add(connection); - if (connection.isSharedResource() && msg instanceof DirectReplyMessage) { + con.setInUse(true, startTime, 0, 0, null); // fix for bug#37657 + cons.add(con); + if (con.isSharedResource() && msg instanceof DirectReplyMessage) { DirectReplyMessage directMessage = (DirectReplyMessage) msg; directMessage.registerProcessor(); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java index 9e921d7d03..44205d4d63 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java @@ -961,7 +961,7 @@ public class Connection implements Runnable { final ConnectionTable t, final boolean preserveOrder, final InternalDistributedMember remoteAddr, final boolean sharedResource, - final long startTime, final long ackTimeout, final long ackSATimeout, boolean doNotRetry) + final long startTime, final long ackTimeout, final long ackSATimeout) throws IOException, DistributedSystemDisconnectedException { boolean success = false; Connection conn = null; @@ -1021,9 +1021,7 @@ public class Connection implements Runnable { // do not change the text of this exception - it is looked for in exception handlers throw new IOException("Cannot form connection to alert listener " + remoteAddr); } - if (doNotRetry) { - throw new IOException("Connection not created in first try to " + remoteAddr); - } + // Wait briefly... interrupted = Thread.interrupted() || interrupted; try { diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java index f1d157d27f..f54f7bd9cd 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java @@ -269,7 +269,6 @@ public class ConnectionTable { * @param startTime the ms clock start time for the operation * @param ackThreshold the ms ack-wait-threshold, or zero * @param ackSAThreshold the ms ack-severe_alert-threshold, or zero - * @param doNotRetry whether we should perform reattempt to create connection * @return the Connection, or null if someone else already created or closed it * @throws IOException if unable to connect */ @@ -277,14 +276,13 @@ public class ConnectionTable { boolean sharedResource, boolean preserveOrder, Map<DistributedMember, Object> m, PendingConnection pc, long startTime, long ackThreshold, - long ackSAThreshold, boolean doNotRetry) - throws IOException, DistributedSystemDisconnectedException { + long ackSAThreshold) throws IOException, DistributedSystemDisconnectedException { // handle new pending connection Connection con = null; try { long senderCreateStartTime = owner.getStats().startSenderCreate(); con = Connection.createSender(owner.getMembership(), this, preserveOrder, id, - sharedResource, startTime, ackThreshold, ackSAThreshold, doNotRetry); + sharedResource, startTime, ackThreshold, ackSAThreshold); owner.getStats().incSenders(sharedResource, preserveOrder, senderCreateStartTime); } finally { // our connection failed to notify anyone waiting for our pending con @@ -352,14 +350,11 @@ public class ConnectionTable { * @param startTime the ms clock start time for the operation * @param ackTimeout the ms ack-wait-threshold, or zero * @param ackSATimeout the ms ack-severe-alert-threshold, or zero - * @param doNotRetryWaitForConnection whether we should perform reattempt (or wait) to create - * connection * @return the new Connection, or null if an error * @throws IOException if unable to create the connection */ private Connection getSharedConnection(InternalDistributedMember id, boolean scheduleTimeout, - boolean preserveOrder, long startTime, long ackTimeout, long ackSATimeout, - boolean doNotRetryWaitForConnection) + boolean preserveOrder, long startTime, long ackTimeout, long ackSATimeout) throws IOException, DistributedSystemDisconnectedException { final Map<DistributedMember, Object> m = @@ -392,7 +387,7 @@ public class ConnectionTable { logger.debug("created PendingConnection {}", pc); } result = handleNewPendingConnection(id, true, preserveOrder, m, pc, - startTime, ackTimeout, ackSATimeout, doNotRetryWaitForConnection); + startTime, ackTimeout, ackSATimeout); if (!preserveOrder && scheduleTimeout) { scheduleIdleTimeout(result); } @@ -405,10 +400,6 @@ public class ConnectionTable { throw new IOException("Cannot form connection to alert listener " + id); } - if (doNotRetryWaitForConnection) { - return null; - } - result = ((PendingConnection) mEntry).waitForConnect(owner.getMembership(), startTime, ackTimeout, ackSATimeout); if (logger.isDebugEnabled()) { @@ -434,13 +425,11 @@ public class ConnectionTable { * @param startTime the ms clock start time for the operation * @param ackTimeout the ms ack-wait-threshold, or zero * @param ackSATimeout the ms ack-severe-alert-threshold, or zero - * @param doNotRetry whether we should perform reattempt to create connection * @return the connection, or null if an error * @throws IOException if the connection could not be created */ Connection getThreadOwnedConnection(InternalDistributedMember id, long startTime, long ackTimeout, - long ackSATimeout, boolean doNotRetry) - throws IOException, DistributedSystemDisconnectedException { + long ackSATimeout) throws IOException, DistributedSystemDisconnectedException { Connection result; // Look for result in the thread local @@ -460,7 +449,7 @@ public class ConnectionTable { // OK, we have to create a new connection. long senderCreateStartTime = owner.getStats().startSenderCreate(); result = Connection.createSender(owner.getMembership(), this, true, id, false, startTime, - ackTimeout, ackSATimeout, doNotRetry); + ackTimeout, ackSATimeout); owner.getStats().incSenders(false, true, senderCreateStartTime); if (logger.isDebugEnabled()) { logger.debug("ConnectionTable: created an ordered connection: {}", result); @@ -532,12 +521,11 @@ public class ConnectionTable { * @param startTime the ms clock start time * @param ackTimeout the ms ack-wait-threshold, or zero * @param ackSATimeout the ms ack-severe-alert-threshold, or zero - * @param doNotRetry whether we should perform reattempt to create connection * @return the new Connection, or null if a problem * @throws IOException if the connection could not be created */ protected Connection get(InternalDistributedMember id, boolean preserveOrder, long startTime, - long ackTimeout, long ackSATimeout, boolean doNotRetry) + long ackTimeout, long ackSATimeout) throws IOException, DistributedSystemDisconnectedException { if (closed) { owner.getCancelCriterion().checkCancelInProgress(null); @@ -547,9 +535,9 @@ public class ConnectionTable { boolean threadOwnsResources = threadOwnsResources(); if (!preserveOrder || !threadOwnsResources) { result = getSharedConnection(id, threadOwnsResources, preserveOrder, startTime, ackTimeout, - ackSATimeout, doNotRetry); + ackSATimeout); } else { - result = getThreadOwnedConnection(id, startTime, ackTimeout, ackSATimeout, doNotRetry); + result = getThreadOwnedConnection(id, startTime, ackTimeout, ackSATimeout); } if (result != null) { Assert.assertTrue(result.getPreserveOrder() == preserveOrder); diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java index 1b75be5465..4d6d9c8216 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java @@ -719,6 +719,7 @@ public class TCPConduit implements Runnable { * * @param memberAddress the IDS associated with the remoteId * @param preserveOrder whether this is an ordered or unordered connection + * @param retry false if this is the first attempt * @param startTime the time this operation started * @param ackTimeout the ack-wait-threshold * 1000 for the operation to be transmitted (or zero) * @param ackSATimeout the ack-severe-alert-threshold * 1000 for the operation to be transmitted @@ -727,7 +728,7 @@ public class TCPConduit implements Runnable { * @return the connection */ public Connection getConnection(InternalDistributedMember memberAddress, - final boolean preserveOrder, long startTime, long ackTimeout, + final boolean preserveOrder, boolean retry, long startTime, long ackTimeout, long ackSATimeout) throws IOException, DistributedSystemDisconnectedException { if (stopped) { throw new DistributedSystemDisconnectedException("The conduit is stopped"); @@ -741,7 +742,7 @@ public class TCPConduit implements Runnable { try { // If this is the second time through this loop, we had problems. // Tear down the connection so that it gets rebuilt. - if (conn != null) { // not first time in loop + if (retry || conn != null) { // not first time in loop if (!membership.memberExists(memberAddress) || membership.isShunned(memberAddress) || membership.shutdownInProgress()) { @@ -776,15 +777,18 @@ public class TCPConduit implements Runnable { // Close the connection (it will get rebuilt later). getStats().incReconnectAttempts(); - try { - if (logger.isDebugEnabled()) { - logger.debug("Closing old connection. conn={} before retrying. memberInTrouble={}", - conn, memberInTrouble); + if (conn != null) { + try { + if (logger.isDebugEnabled()) { + logger.debug("Closing old connection. conn={} before retrying. memberInTrouble={}", + conn, memberInTrouble); + } + conn.closeForReconnect("closing before retrying"); + } catch (CancelException ex) { + throw ex; + } catch (Exception ex) { + // ignored } - conn.closeForReconnect("closing before retrying"); - } catch (CancelException ex) { - throw ex; - } catch (Exception ignored) { } } // not first time in loop @@ -797,7 +801,7 @@ public class TCPConduit implements Runnable { do { retryForOldConnection = false; conn = getConTable().get(memberAddress, preserveOrder, startTime, ackTimeout, - ackSATimeout, false); + ackSATimeout); if (conn == null) { // conduit may be closed - otherwise an ioexception would be thrown problem = new IOException( @@ -905,97 +909,6 @@ public class TCPConduit implements Runnable { } } - /** - * Return a connection to the given member. This method performs quick scan for connection. - * Only one attempt to create a connection to the given member . - * - * @param memberAddress the IDS associated with the remoteId - * @param preserveOrder whether this is an ordered or unordered connection - * @param startTime the time this operation started - * @param ackTimeout the ack-wait-threshold * 1000 for the operation to be transmitted (or zero) - * @param ackSATimeout the ack-severe-alert-threshold * 1000 for the operation to be transmitted - * (or zero) - * - * @return the connection - */ - public Connection getFirstScanForConnection(InternalDistributedMember memberAddress, - final boolean preserveOrder, long startTime, long ackTimeout, - long ackSATimeout) throws IOException, DistributedSystemDisconnectedException { - if (stopped) { - throw new DistributedSystemDisconnectedException("The conduit is stopped"); - } - - Connection connection = null; - stopper.checkCancelInProgress(null); - boolean interrupted = Thread.interrupted(); - try { - - Exception problem = null; - try { - connection = getConnectionThatIsNotClosed(memberAddress, preserveOrder, startTime, - ackTimeout, ackSATimeout); - - // we have a connection; fall through and return it - } catch (ConnectionException e) { - // Race condition between acquiring the connection and attempting - // to use it: another thread closed it. - problem = e; - // No need to retry since Connection.createSender has already - // done retries and now member is really unreachable for some reason - // even though it may be in the view - } catch (IOException e) { - problem = e; - // don't keep trying to connect to an alert listener - if (AlertingAction.isThreadAlerting()) { - if (logger.isDebugEnabled()) { - logger.debug("Giving up connecting to alert listener {}", memberAddress); - } - } - } - - if (problem != null) { - if (problem instanceof IOException) { - if (problem.getMessage().startsWith("Cannot form connection to alert listener")) { - throw new AlertingIOException((IOException) problem); - } - throw (IOException) problem; - } - throw new IOException( - String.format("Problem connecting to %s", memberAddress), problem); - } - // Success! - - return connection; - } finally { - if (interrupted) { - Thread.currentThread().interrupt(); - } - } - } - - private Connection getConnectionThatIsNotClosed(InternalDistributedMember memberAddress, - final boolean preserveOrder, long startTime, long ackTimeout, long ackSATimeout) - throws IOException, ConnectionException { - boolean debugEnabled = logger.isDebugEnabled(); - Connection connection; - while (true) { - connection = getConTable().get(memberAddress, preserveOrder, startTime, ackTimeout, - ackSATimeout, true); - if (connection == null) { - throw new IOException("Unable to reconnect to server; possible shutdown: " + memberAddress); - } - - if (!connection.isClosing() && connection.getRemoteAddress().equals(memberAddress)) { - return connection; - } - if (debugEnabled) { - logger.debug("Got an old connection for {}: {}@{}", memberAddress, connection, - connection.hashCode()); - } - connection.closeOldConnection("closing old connection"); - } - } - @Override public String toString() { return String.valueOf(id); diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTransmissionTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTransmissionTest.java index 906a021dec..5a041eb167 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTransmissionTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTransmissionTest.java @@ -168,7 +168,7 @@ public class ConnectionTransmissionTest { senderAddr.setDirectChannelPort(conduit.getPort()); return spy(Connection.createSender(membership, writerTable, true, remoteAddr, true, - System.currentTimeMillis(), 1000, 1000, false)); + System.currentTimeMillis(), 1000, 1000)); } private Connection createReceiverConnectionOnFirstAccept(final ServerSocketChannel acceptorSocket, diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/TCPConduitTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/TCPConduitTest.java index 3f380fa5f1..392a5993c0 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/tcp/TCPConduitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/TCPConduitTest.java @@ -94,8 +94,7 @@ public class TCPConduitTest { TCPConduit -> connectionTable, socketCreator, doNothing(), false); InternalDistributedMember member = mock(InternalDistributedMember.class); doThrow(new IOException("Cannot form connection to alert listener")) - .when(connectionTable) - .get(eq(member), anyBoolean(), anyLong(), anyLong(), anyLong(), anyBoolean()); + .when(connectionTable).get(eq(member), anyBoolean(), anyLong(), anyLong(), anyLong()); when(membership.memberExists(eq(member))) .thenReturn(true); when(membership.isShunned(same(member))) @@ -103,7 +102,7 @@ public class TCPConduitTest { AlertingAction.execute(() -> { Throwable thrown = catchThrowable(() -> { - tcpConduit.getConnection(member, false, 0L, 0L, 0L); + tcpConduit.getConnection(member, false, false, 0L, 0L, 0L); }); assertThat(thrown) @@ -124,14 +123,13 @@ public class TCPConduitTest { doThrow(new IOException("Cannot form connection to alert listener")) // getConnection will loop indefinitely until connectionTable returns connection .doReturn(connection) - .when(connectionTable) - .get(eq(member), anyBoolean(), anyLong(), anyLong(), anyLong(), anyBoolean()); + .when(connectionTable).get(eq(member), anyBoolean(), anyLong(), anyLong(), anyLong()); when(membership.memberExists(eq(member))) .thenReturn(true); when(membership.isShunned(same(member))) .thenReturn(false); - Connection value = tcpConduit.getConnection(member, false, 0L, 0L, 0L); + Connection value = tcpConduit.getConnection(member, false, false, 0L, 0L, 0L); assertThat(value) .isSameAs(connection); @@ -145,13 +143,12 @@ public class TCPConduitTest { TCPConduit -> connectionTable, socketCreator, doNothing(), false); InternalDistributedMember member = mock(InternalDistributedMember.class); doThrow(new IOException("Cannot form connection to alert listener")) - .when(connectionTable) - .get(eq(member), anyBoolean(), anyLong(), anyLong(), anyLong(), anyBoolean()); + .when(connectionTable).get(eq(member), anyBoolean(), anyLong(), anyLong(), anyLong()); when(membership.memberExists(eq(member))) .thenReturn(false); Throwable thrown = catchThrowable(() -> { - tcpConduit.getConnection(member, false, 0L, 0L, 0L); + tcpConduit.getConnection(member, false, false, 0L, 0L, 0L); }); assertThat(thrown) @@ -167,15 +164,14 @@ public class TCPConduitTest { TCPConduit -> connectionTable, socketCreator, doNothing(), false); InternalDistributedMember member = mock(InternalDistributedMember.class); doThrow(new IOException("Cannot form connection to alert listener")) - .when(connectionTable) - .get(same(member), anyBoolean(), anyLong(), anyLong(), anyLong(), anyBoolean()); + .when(connectionTable).get(same(member), anyBoolean(), anyLong(), anyLong(), anyLong()); when(membership.memberExists(same(member))) .thenReturn(true); when(membership.isShunned(same(member))) .thenReturn(true); Throwable thrown = catchThrowable(() -> { - tcpConduit.getConnection(member, false, 0L, 0L, 0L); + tcpConduit.getConnection(member, false, false, 0L, 0L, 0L); }); assertThat(thrown) @@ -192,8 +188,7 @@ public class TCPConduitTest { TCPConduit -> connectionTable, socketCreator, doNothing(), false); InternalDistributedMember member = mock(InternalDistributedMember.class); doThrow(new IOException("Cannot form connection to alert listener")) - .when(connectionTable) - .get(same(member), anyBoolean(), anyLong(), anyLong(), anyLong(), anyBoolean()); + .when(connectionTable).get(same(member), anyBoolean(), anyLong(), anyLong(), anyLong()); when(membership.memberExists(same(member))) .thenReturn(true); when(membership.isShunned(same(member))) @@ -202,7 +197,7 @@ public class TCPConduitTest { .thenReturn(true); Throwable thrown = catchThrowable(() -> { - tcpConduit.getConnection(member, false, 0L, 0L, 0L); + tcpConduit.getConnection(member, false, false, 0L, 0L, 0L); }); assertThat(thrown) @@ -219,8 +214,7 @@ public class TCPConduitTest { TCPConduit -> connectionTable, socketCreator, doNothing(), false); InternalDistributedMember member = mock(InternalDistributedMember.class); doThrow(new IOException("Cannot form connection to alert listener")) - .when(connectionTable) - .get(same(member), anyBoolean(), anyLong(), anyLong(), anyLong(), anyBoolean()); + .when(connectionTable).get(same(member), anyBoolean(), anyLong(), anyLong(), anyLong()); when(membership.memberExists(same(member))) .thenReturn(true); when(membership.isShunned(same(member))) @@ -229,7 +223,7 @@ public class TCPConduitTest { .thenReturn(true); Throwable thrown = catchThrowable(() -> { - tcpConduit.getConnection(member, false, 0L, 0L, 0L); + tcpConduit.getConnection(member, false, false, 0L, 0L, 0L); }); assertThat(thrown) @@ -237,50 +231,6 @@ public class TCPConduitTest { .hasMessage("Abandoned because shutdown is in progress"); } - @Test - public void getFirstScanForConnectionThrowsAlertingIOException_ifCaughtIOException_whileAlerting() - throws Exception { - TCPConduit tcpConduit = - new TCPConduit(membership, 0, localHost, false, directChannel, mock(BufferPool.class), - new Properties(), - TCPConduit -> connectionTable, socketCreator, doNothing(), false); - InternalDistributedMember member = mock(InternalDistributedMember.class); - doThrow(new IOException("Cannot form connection to alert listener")) - .when(connectionTable) - .get(eq(member), anyBoolean(), anyLong(), anyLong(), anyLong(), anyBoolean()); - - AlertingAction.execute(() -> { - Throwable thrown = catchThrowable(() -> { - tcpConduit.getFirstScanForConnection(member, false, 0L, 0L, 0L); - }); - - assertThat(thrown) - .isInstanceOf(AlertingIOException.class); - }); - } - - @Test - public void getFirstScanForConnectionRethrows_ifCaughtIOException_whileNotAlerting() - throws Exception { - TCPConduit tcpConduit = - new TCPConduit(membership, 0, localHost, false, directChannel, mock(BufferPool.class), - new Properties(), - TCPConduit -> connectionTable, socketCreator, doNothing(), false); - InternalDistributedMember member = mock(InternalDistributedMember.class); - Connection connection = mock(Connection.class); - doThrow(new IOException("Connection not created in first try")) - .doReturn(connection) - .when(connectionTable) - .get(eq(member), anyBoolean(), anyLong(), anyLong(), anyLong(), anyBoolean()); - - Throwable thrown = catchThrowable(() -> { - tcpConduit.getFirstScanForConnection(member, false, 0L, 0L, 0L); - }); - - assertThat(thrown) - .isInstanceOf(IOException.class); - } - private Runnable doNothing() { return () -> { // nothing