This is an automated email from the ASF dual-hosted git repository. bschuchardt pushed a commit to branch feature/GEODE-6008 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 071c4e4c98144741e30adcee19d72bec2cdc2c34 Author: Bruce Schuchardt <bschucha...@pivotal.io> AuthorDate: Fri Mar 20 15:56:22 2020 -0700 GEODE-6008 work in progress --- ...tServerHostNameVerificationDistributedTest.java | 2 + .../internal/tcp/TCPConduitBareBonesDUnitTest.java | 76 ++++++++++++++++++++++ .../apache/geode/distributed/internal/DMStats.java | 2 + .../distributed/internal/DistributionStats.java | 9 +++ .../internal/LonerDistributionManager.java | 5 ++ .../org/apache/geode/internal/tcp/Connection.java | 29 +++++++-- .../org/apache/geode/internal/tcp/TCPConduit.java | 4 +- 7 files changed, 118 insertions(+), 9 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/client/internal/ClientServerHostNameVerificationDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/client/internal/ClientServerHostNameVerificationDistributedTest.java index 8f1db5f..8ce2b34 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/cache/client/internal/ClientServerHostNameVerificationDistributedTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/client/internal/ClientServerHostNameVerificationDistributedTest.java @@ -39,6 +39,7 @@ import org.apache.geode.cache.client.NoAvailableServersException; import org.apache.geode.cache.ssl.CertStores; import org.apache.geode.cache.ssl.CertificateBuilder; import org.apache.geode.cache.ssl.CertificateMaterial; +import org.apache.geode.internal.inet.LocalHostUtil; import org.apache.geode.internal.net.SocketCreatorFactory; import org.apache.geode.test.dunit.IgnoredException; import org.apache.geode.test.dunit.rules.ClusterStartupRule; @@ -162,6 +163,7 @@ public class ClientServerHostNameVerificationDistributedTest { .sanDnsName(InetAddress.getLoopbackAddress().getHostName()) .sanDnsName(InetAddress.getLocalHost().getHostName()) .sanDnsName(InetAddress.getLocalHost().getCanonicalHostName()) + .sanIpAddress(LocalHostUtil.getLocalHost()) 
.sanIpAddress(InetAddress.getLocalHost()) .generate(); diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitBareBonesDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitBareBonesDUnitTest.java new file mode 100644 index 0000000..77eb02e --- /dev/null +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/tcp/TCPConduitBareBonesDUnitTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.internal.tcp; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Properties; + +import org.junit.Rule; +import org.junit.Test; + +import org.apache.geode.distributed.ConfigurationProperties; +import org.apache.geode.distributed.internal.DistributionConfig; +import org.apache.geode.distributed.internal.DistributionConfigImpl; +import org.apache.geode.distributed.internal.DistributionImpl; +import org.apache.geode.distributed.internal.InternalDistributedSystem; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.test.dunit.DistributedTestUtils; +import org.apache.geode.test.dunit.rules.DistributedRule; + +public class TCPConduitBareBonesDUnitTest { + + @Rule + public DistributedRule distributedRule = new DistributedRule(0); + + @Test + public void testShortTermSharedConnections() throws Exception { + // create and close many shared, ordered connections. Also see CloseConnectionTest, + // which ensures that the outgoing socket in a shared connection is properly closed + Properties properties = new Properties(); + properties.put(ConfigurationProperties.LOG_LEVEL, "fine"); + properties.put(ConfigurationProperties.LOCATORS, DistributedTestUtils.getLocators()); + DistributionConfig configuration = new DistributionConfigImpl(properties); + InternalDistributedSystem distributedSystem = + (InternalDistributedSystem) InternalDistributedSystem.connect(configuration.toProperties()); + TCPConduit conduit = + ((DistributionImpl) distributedSystem.getDistributionManager().getDistribution()) + .getDirectChannel().getConduit(); + + InternalDistributedMember memberID = + distributedSystem.getDistributionManager().getDistribution().getView().getCreator(); + ConnectionTable.threadWantsSharedResources(); + + try { + for (int i = 0; i < 1000; i++) { + long buffersSize = + distributedSystem.getDistributionManager().getStats().getSenderBufferSize(true); + // connect to the 
dunit locator. The first iteration will get the Connection that was formed + // during startup, but we'll close it and start creating new ones. + Connection connection = + conduit.getConnection(memberID, true, false, System.currentTimeMillis(), 15000, 0); + System.out.println("Test iteration " + i + ": " + connection); + connection.requestClose("for test"); + // the connection should be stopped at this point + assertThat(connection.isReceiverStopped()).isTrue(); + // make sure there are no double releases of ByteBuffers + assertThat(distributedSystem.getDistributionManager().getStats().getSenderBufferSize(true)) + .isEqualTo(buffersSize); + } + } finally { + distributedSystem.disconnect(); + } + } +} diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DMStats.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DMStats.java index 0f880b2..f910e9d 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DMStats.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DMStats.java @@ -475,6 +475,8 @@ public interface DMStats extends MembershipStatistics { */ void incSenderBufferSize(int inc, boolean direct); + long getSenderBufferSize(boolean direct); + /** * @since GemFire 5.0.2.4 */ diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java index b08a512..c44f29a 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java @@ -2287,6 +2287,15 @@ public class DistributionStats implements DMStats { } @Override + public long getSenderBufferSize(boolean direct) { + if (direct) { + return stats.getLong(senderDirectBufferSizeId); + } else { + return stats.getLong(senderHeapBufferSizeId); + } + } + + @Override public void 
incMessagesBeingReceived(boolean newMsg, int bytes) { if (newMsg) { stats.incInt(messagesBeingReceivedId, 1); diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java index f2e9f51..9cb34d2 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java @@ -800,6 +800,11 @@ public class LonerDistributionManager implements DistributionManager { public void incSenderBufferSize(int inc, boolean direct) {} @Override + public long getSenderBufferSize(boolean direct) { + return 0; + } + + @Override public long startSocketLock() { return 0; } diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java index 8c8a2fc..893d42d 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java @@ -471,6 +471,12 @@ public class Connection implements Runnable { private volatile boolean ackTimedOut; /** + * a Reader thread for a shared Connection will remain around in order to + * ensure that the socket is properly closed. + */ + private volatile boolean isResidualReaderThread; + + /** * creates a "reader" connection that we accepted (it was initiated by an explicit connect being * done on the other side). */ @@ -1326,7 +1332,7 @@ public class Connection implements Runnable { } // make sure our socket is closed asyncClose(false); - if (!isReceiver) { + if (!isReceiver && !isResidualReaderThread) { // receivers release the input buffer when exiting run(). 
Senders use the // inputBuffer for reading direct-reply responses releaseInputBuffer(); @@ -1465,6 +1471,7 @@ public class Connection implements Runnable { asyncClose(false); } } + releaseInputBuffer(); // make sure that if the reader thread exits we notify a thread waiting for the handshake. @@ -1508,6 +1515,9 @@ public class Connection implements Runnable { } private void readMessages() { + if (closing.get()) { + return; + } // take a snapshot of uniqueId to detect reconnect attempts SocketChannel channel; try { @@ -1552,7 +1562,7 @@ public class Connection implements Runnable { } // we should not change the state of the connection if we are a handshake reader thread // as there is a race between this thread and the application thread doing direct ack - boolean isHandShakeReader = false; + boolean handshakeHasBeenRead = false; // if we're using SSL/TLS the input buffer may already have data to process boolean skipInitialRead = getInputBuffer().position() > 0; try { @@ -1607,7 +1617,7 @@ public class Connection implements Runnable { } processInputBuffer(); - if (!isHandShakeReader && !isReceiver && (handshakeRead || handshakeCancelled)) { + if (!handshakeHasBeenRead && !isReceiver && (handshakeRead || handshakeCancelled)) { if (logger.isDebugEnabled()) { if (handshakeRead) { logger.debug("handshake has been read {}", this); @@ -1615,12 +1625,16 @@ public class Connection implements Runnable { logger.debug("handshake has been cancelled {}", this); } } - isHandShakeReader = true; + handshakeHasBeenRead = true; // Once we have read the handshake for unshared connections, the reader can skip // processing messages if (!sharedResource || asyncMode) { break; + } else { + // not exiting and not a Reader spawned from a ServerSocket.accept(), so + // let's set some state noting that this is happening + isResidualReaderThread = true; } } @@ -1663,7 +1677,7 @@ public class Connection implements Runnable { return; } catch (Exception e) { - 
owner.getConduit().getCancelCriterion().checkCancelInProgress(null); + owner.getConduit().getCancelCriterion().checkCancelInProgress(e); if (!stopped && !isSocketClosed()) { logger.fatal(String.format("%s exception in channel read", p2pReaderName()), e); } @@ -1676,14 +1690,15 @@ public class Connection implements Runnable { } } } finally { - if (!isHandShakeReader || (sharedResource && !asyncMode)) { + isResidualReaderThread = false; + if (!handshakeHasBeenRead || (sharedResource && !asyncMode)) { synchronized (stateLock) { connectionState = STATE_IDLE; } } if (logger.isDebugEnabled()) { logger.debug("readMessages terminated id={} from {} isHandshakeReader={}", conduitIdStr, - remoteAddr, isHandShakeReader); + remoteAddr, handshakeHasBeenRead); } } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java index 6f5edda..51af0de 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java @@ -305,14 +305,14 @@ public class TCPConduit implements Runnable { logger.warn("exception parsing p2p.idleConnectionTimeout", e); } - s = p.getProperty("membership_port_range_start"); + s = p.getProperty("membership_port_range_start", "" + DEFAULT_MEMBERSHIP_PORT_RANGE[0]); try { tcpPortRange[0] = Integer.parseInt(s); } catch (Exception e) { logger.warn("Exception parsing membership-port-range start port.", e); } - s = p.getProperty("membership_port_range_end"); + s = p.getProperty("membership_port_range_end", "" + DEFAULT_MEMBERSHIP_PORT_RANGE[1]); try { tcpPortRange[1] = Integer.parseInt(s); } catch (Exception e) {