This is an automated email from the ASF dual-hosted git repository. echobravo pushed a commit to branch support/1.12 in repository https://gitbox.apache.org/repos/asf/geode.git
commit c13a62199a3720c69c142b0dcd2f20130ac3b1a4 Author: Ernest Burghardt <eburgha...@pivotal.io> AuthorDate: Thu Jun 17 10:39:47 2021 -0500 Revert "GEODE-9141: (2 of 2) Handle in-buffer concurrency" This reverts commit 1a8eb5aec580eb75871060793ea65d62f5f2d959. --- ...LSocketHostNameVerificationIntegrationTest.java | 6 +- .../internal/net/SSLSocketIntegrationTest.java | 3 +- .../apache/geode/codeAnalysis/excludedClasses.txt | 2 +- .../geode/internal/net/ByteBufferSharing.java | 15 - .../geode/internal/net/ByteBufferSharingNoOp.java | 5 - .../geode/internal/net/ByteBufferVendor.java | 144 +++------ .../apache/geode/internal/net/NioSslEngine.java | 50 +-- .../apache/geode/internal/net/SocketCreator.java | 9 +- .../org/apache/geode/internal/tcp/Connection.java | 334 ++++++++++----------- .../geode/internal/net/ByteBufferVendorTest.java | 36 +-- .../geode/internal/net/NioSslEngineTest.java | 41 ++- .../apache/geode/internal/tcp/ConnectionTest.java | 1 - 12 files changed, 285 insertions(+), 361 deletions(-) diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java index e86bfea..a70f3b1 100755 --- a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketHostNameVerificationIntegrationTest.java @@ -103,9 +103,6 @@ public class SSLSocketHostNameVerificationIntegrationTest { @Before public void setUp() throws Exception { - - SocketCreatorFactory.close(); // to clear socket creators made in previous tests - IgnoredException.addIgnoredException("javax.net.ssl.SSLException: Read timed out"); this.localHost = InetAddress.getLoopbackAddress(); @@ -175,7 +172,7 @@ public class SSLSocketHostNameVerificationIntegrationTest { try { this.socketCreator.handshakeSSLSocketChannel(clientSocket.getChannel(), - sslEngine, 0, + sslEngine, 0, true, ByteBuffer.allocate(sslEngine.getSession().getPacketBufferSize()), new BufferPool(mock(DMStats.class))); @@ -208,6 +205,7 @@ public class SSLSocketHostNameVerificationIntegrationTest { sc.handshakeSSLSocketChannel(socket.getChannel(), sslEngine, timeoutMillis, + false, ByteBuffer.allocate(sslEngine.getSession().getPacketBufferSize()), new BufferPool(mock(DMStats.class))); } catch (Throwable throwable) { diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java index 13e9d5b..e7ac191 100755 --- a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java @@ -217,7 +217,7 @@ public class SSLSocketIntegrationTest { clientSocket = clientChannel.socket(); NioSslEngine engine = clusterSocketCreator.handshakeSSLSocketChannel(clientSocket.getChannel(), - clusterSocketCreator.createSSLEngine("localhost", 1234, true), 0, + clusterSocketCreator.createSSLEngine("localhost", 1234, true), 0, true, ByteBuffer.allocate(65535), new BufferPool(mock(DMStats.class))); clientChannel.configureBlocking(true); @@ -267,6 +267,7 @@ public class SSLSocketIntegrationTest { sc.handshakeSSLSocketChannel(socket.getChannel(), sc.createSSLEngine("localhost", 1234, false), timeoutMillis, + false, ByteBuffer.allocate(65535), new BufferPool(mock(DMStats.class))); diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt index cd1af3a..af3bd1e 100644 --- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt +++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt @@ -104,4 +104,4 @@ org/apache/geode/cache/query/internal/xml/ElementType org/apache/geode/cache/query/internal/xml/ElementType$1 org/apache/geode/cache/query/internal/xml/ElementType$2 org/apache/geode/cache/query/internal/xml/ElementType$3 -org/apache/geode/internal/net/ByteBufferVendor$OpenAttemptTimedOut \ No newline at end of file +org/apache/geode/internal/net/ByteBufferSharingImpl$OpenAttemptTimedOut \ No newline at end of file diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java index c8a94ce..cdfa897 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java +++ b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharing.java @@ -42,27 +42,12 @@ public interface ByteBufferSharing extends AutoCloseable { * * Subsequent calls to {@link #getBuffer()} will return that new buffer too. * - * This variant is for use when the buffer is being written to. - * * @return the same buffer or a different (bigger) buffer * @throws IOException if the buffer is no longer accessible */ ByteBuffer expandWriteBufferIfNeeded(final int newCapacity) throws IOException; /** - * Expand the buffer if needed. This may return a different object so be sure to pay attention to - * the return value if you need access to the potentially- expanded buffer. - * - * Subsequent calls to {@link #getBuffer()} will return that new buffer too. - * - * This variant is for use when the buffer is being read from. - * - * @return the same buffer or a different (bigger) buffer - * @throws IOException if the buffer is no longer accessible - */ - ByteBuffer expandReadBufferIfNeeded(final int newCapacity) throws IOException; - - /** * Override {@link AutoCloseable#close()} without throws clause since we don't need one. */ @Override diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java index 4f36e5b..4a8bc49 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java +++ b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferSharingNoOp.java @@ -48,10 +48,5 @@ class ByteBufferSharingNoOp implements ByteBufferSharing { } @Override - public ByteBuffer expandReadBufferIfNeeded(final int newCapacity) throws IOException { - throw new UnsupportedOperationException("Can't expand buffer when using NioPlainEngine"); - } - - @Override public void close() {} } diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferVendor.java b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferVendor.java index 1dc74f0..4933247 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferVendor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/net/ByteBufferVendor.java @@ -17,7 +17,6 @@ package org.apache.geode.internal.net; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Objects; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -28,49 +27,49 @@ import org.apache.geode.annotations.VisibleForTesting; import org.apache.geode.internal.net.BufferPool.BufferType; /** - * Produces (via {@link #open()}) an {@link ByteBufferSharing} meant to used only within a - * try-with-resources block. The resource controls access to a secondary resource - * (via {@link ByteBufferSharing#getBuffer()}) within the scope of try-with-resources. - * Neither the object returned by {@link #open()}, nor the object returned by invoking - * {@link ByteBufferSharing#getBuffer()} on that object may be used outside the scope of + * An {@link AutoCloseable} meant to be acquired in a try-with-resources statement. The resource (a + * {@link ByteBuffer}) is available (for reading and modification) in the scope of the * try-with-resources. */ -public class ByteBufferVendor { +class ByteBufferVendor implements ByteBufferSharing { static class OpenAttemptTimedOut extends Exception { } - private interface ByteBufferSharingInternal extends ByteBufferSharing { - void releaseBuffer(); - } - - private final Lock lock = new ReentrantLock(); - private final AtomicBoolean isDestructed = new AtomicBoolean(false); - private final AtomicInteger counter = new AtomicInteger(1); - // the object referenced by sharing is guarded by lock - private final ByteBufferSharingInternal sharing; + private final Lock lock; + private final AtomicBoolean isDestructed; + // mutable because in general our ByteBuffer may need to be resized (grown or compacted) + private volatile ByteBuffer buffer; + private final BufferType bufferType; + private final AtomicInteger counter; + private final BufferPool bufferPool; - /* - * These constructors are for use only by the owner of the shared resource. + /** + * This constructor is for use only by the owner of the shared resource (a {@link ByteBuffer}). * * A resource owner must invoke {@link #open()} once for each reference that escapes (is passed * to an external object or is returned to an external caller.) * - * Constructors acquire no locks. The reference count will be 1 after a constructor + * This constructor acquires no lock. The reference count will be 1 after this constructor * completes. */ + ByteBufferVendor(final ByteBuffer buffer, final BufferType bufferType, + final BufferPool bufferPool) { + this.buffer = buffer; + this.bufferType = bufferType; + this.bufferPool = bufferPool; + lock = new ReentrantLock(); + counter = new AtomicInteger(1); + isDestructed = new AtomicBoolean(false); + } /** - * When you have a ByteBuffer available before construction, use this constructor. - * - * @param bufferArg is the ByteBuffer - * @param bufferType needed for freeing the buffer later - * @param bufferPool needed for freeing the buffer later + * The destructor. Called by the resource owner to undo the work of the constructor. */ - public ByteBufferVendor(final ByteBuffer bufferArg, - final BufferType bufferType, - final BufferPool bufferPool) { - sharing = new ByteBufferSharingInternalImpl(bufferArg, bufferType, bufferPool); + void destruct() { + if (isDestructed.compareAndSet(false, true)) { + dropReference(); + } } /** @@ -79,19 +78,18 @@ public class ByteBufferVendor { * * Resource owners call this method as the last thing before returning a reference to the caller. * That caller binds that reference to a variable in a try-with-resources statement and relies on - * the AutoCloseable protocol to invoke {@link AutoCloseable#close()} on the object at - * the end of the block. + * the AutoCloseable protocol to invoke {@link #close()} on the object at the end of the block. */ - public ByteBufferSharing open() throws IOException { + ByteBufferSharing open() throws IOException { lock.lock(); addReferenceAfterLock(); - return sharing; + return this; } /** * This variant throws {@link OpenAttemptTimedOut} if it can't acquire the lock in time. */ - public ByteBufferSharing open(final long time, final TimeUnit unit) + ByteBufferSharing open(final long time, final TimeUnit unit) throws OpenAttemptTimedOut, IOException { try { if (!lock.tryLock(time, unit)) { @@ -102,25 +100,24 @@ public class ByteBufferVendor { throw new OpenAttemptTimedOut(); } addReferenceAfterLock(); - return sharing; - } - - /** - * The destructor. Called by the resource owner to undo the work of the constructor. - */ - public void destruct() { - if (isDestructed.compareAndSet(false, true)) { - dropReference(); - } + return this; } - private void exposingResource() throws IOException { + @Override + public ByteBuffer getBuffer() throws IOException { if (isDestructed.get()) { throwClosed(); } + return buffer; } - private void close() { + @Override + public ByteBuffer expandWriteBufferIfNeeded(final int newCapacity) throws IOException { + return buffer = bufferPool.expandWriteBufferIfNeeded(bufferType, getBuffer(), newCapacity); + } + + @Override + public void close() { /* * We are counting on our ReentrantLock throwing an exception if the current thread * does not hold the lock. In that case dropReference() will not be called. This @@ -145,11 +142,16 @@ public class ByteBufferVendor { private int dropReference() { final int usages = counter.decrementAndGet(); if (usages == 0) { - sharing.releaseBuffer(); + bufferPool.releaseBuffer(bufferType, buffer); } return usages; } + @VisibleForTesting + public void setBufferForTestingOnly(final ByteBuffer newBufferForTesting) { + buffer = newBufferForTesting; + } + private void addReferenceAfterLock() throws IOException { try { addReference(); @@ -163,54 +165,4 @@ public class ByteBufferVendor { throw new IOException("NioSslEngine has been closed"); } - private class ByteBufferSharingInternalImpl implements ByteBufferSharingInternal { - - /* - * mutable because in general our ByteBuffer may need to be resized (grown or compacted) - * no concurrency concerns since ByteBufferSharingNotNull is guarded by ByteBufferVendor.lock - */ - private ByteBuffer buffer; - private final BufferType bufferType; - private final BufferPool bufferPool; - - public ByteBufferSharingInternalImpl(final ByteBuffer buffer, - final BufferType bufferType, - final BufferPool bufferPool) { - Objects.requireNonNull(buffer); - this.buffer = buffer; - this.bufferType = bufferType; - this.bufferPool = bufferPool; - } - - @Override - public ByteBuffer getBuffer() throws IOException { - exposingResource(); - return buffer; - } - - @Override - public ByteBuffer expandWriteBufferIfNeeded(final int newCapacity) throws IOException { - return buffer = bufferPool.expandWriteBufferIfNeeded(bufferType, getBuffer(), newCapacity); - } - - @Override - public ByteBuffer expandReadBufferIfNeeded(final int newCapacity) throws IOException { - return buffer = bufferPool.expandReadBufferIfNeeded(bufferType, getBuffer(), newCapacity); - } - - @Override - public void close() { - ByteBufferVendor.this.close(); - } - - @Override - public void releaseBuffer() { - bufferPool.releaseBuffer(bufferType, buffer); - } - } - - @VisibleForTesting - public void setBufferForTestingOnly(final ByteBuffer newBufferForTesting) { - ((ByteBufferSharingInternalImpl) sharing).buffer = newBufferForTesting; - } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java index 4c603a0..fc91a31 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java +++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java @@ -62,12 +62,12 @@ public class NioSslEngine implements NioFilter { /** * holds bytes wrapped by the SSLEngine; a.k.a. myNetData */ - private final ByteBufferVendor outputBufferVendor; + private final ByteBufferVendor outputSharing; /** * holds the last unwrapped data from a peer; a.k.a. peerAppData */ - private final ByteBufferVendor inputBufferVendor; + private final ByteBufferVendor inputSharing; NioSslEngine(SSLEngine engine, BufferPool bufferPool) { SSLSession session = engine.getSession(); @@ -76,10 +76,10 @@ public class NioSslEngine implements NioFilter { closed = false; this.engine = engine; this.bufferPool = bufferPool; - outputBufferVendor = + outputSharing = new ByteBufferVendor(bufferPool.acquireDirectSenderBuffer(packetBufferSize), TRACKED_SENDER, bufferPool); - inputBufferVendor = + inputSharing = new ByteBufferVendor(bufferPool.acquireNonDirectReceiveBuffer(appBufferSize), TRACKED_RECEIVER, bufferPool); } @@ -98,7 +98,7 @@ public class NioSslEngine implements NioFilter { peerNetData.capacity(), engine.getSession().getPacketBufferSize())); } - final ByteBuffer handshakeBuffer = peerNetData; + ByteBuffer handshakeBuffer = peerNetData; handshakeBuffer.clear(); ByteBuffer myAppData = ByteBuffer.wrap(new byte[0]); @@ -135,7 +135,7 @@ public class NioSslEngine implements NioFilter { switch (status) { case NEED_UNWRAP: - try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) { + try (final ByteBufferSharing inputSharing = shareInputBuffer()) { final ByteBuffer peerAppData = inputSharing.getBuffer(); // Receive handshaking data from peer @@ -162,7 +162,7 @@ public class NioSslEngine implements NioFilter { } case NEED_WRAP: - try (final ByteBufferSharing outputSharing = outputBufferVendor.open()) { + try (final ByteBufferSharing outputSharing = shareOutputBuffer()) { final ByteBuffer myNetData = outputSharing.getBuffer(); // Empty the local network packet buffer. @@ -231,7 +231,7 @@ public class NioSslEngine implements NioFilter { @Override public ByteBufferSharing wrap(ByteBuffer appData) throws IOException { - try (final ByteBufferSharing outputSharing = outputBufferVendor.open()) { + try (final ByteBufferSharing outputSharing = shareOutputBuffer()) { ByteBuffer myNetData = outputSharing.getBuffer(); @@ -260,13 +260,13 @@ public class NioSslEngine implements NioFilter { myNetData.flip(); - return outputBufferVendor.open(); + return shareOutputBuffer(); } } @Override public ByteBufferSharing unwrap(ByteBuffer wrappedBuffer) throws IOException { - try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) { + try (final ByteBufferSharing inputSharing = shareInputBuffer()) { ByteBuffer peerAppData = inputSharing.getBuffer(); @@ -292,7 +292,7 @@ public class NioSslEngine implements NioFilter { // partial data - need to read more. When this happens the SSLEngine will not have // changed the buffer position wrappedBuffer.compact(); - return inputBufferVendor.open(); + return shareInputBuffer(); case OK: break; default:// if there is data in the decrypted buffer return it. Otherwise signal that we're @@ -305,7 +305,7 @@ public class NioSslEngine implements NioFilter { } } wrappedBuffer.clear(); - return inputBufferVendor.open(); + return shareInputBuffer(); } } @@ -325,7 +325,7 @@ public class NioSslEngine implements NioFilter { @Override public ByteBufferSharing readAtLeast(SocketChannel channel, int bytes, ByteBuffer wrappedBuffer) throws IOException { - try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) { + try (final ByteBufferSharing inputSharing = shareInputBuffer()) { ByteBuffer peerAppData = inputSharing.getBuffer(); @@ -355,13 +355,13 @@ public class NioSslEngine implements NioFilter { } } } - return inputBufferVendor.open(); + return shareInputBuffer(); } } @Override public ByteBufferSharing getUnwrappedBuffer() throws IOException { - return inputBufferVendor.open(); + return shareInputBuffer(); } @Override @@ -377,8 +377,8 @@ public class NioSslEngine implements NioFilter { return; } closed = true; - inputBufferVendor.destruct(); - try (final ByteBufferSharing outputSharing = outputBufferVendor.open(1, TimeUnit.MINUTES)) { + inputSharing.destruct(); + try (final ByteBufferSharing outputSharing = shareOutputBuffer(1, TimeUnit.MINUTES)) { final ByteBuffer myNetData = outputSharing.getBuffer(); if (!engine.isOutboundDone()) { @@ -412,7 +412,7 @@ public class NioSslEngine implements NioFilter { engine.closeOutbound(); } } finally { - outputBufferVendor.destruct(); + outputSharing.destruct(); } } @@ -422,12 +422,16 @@ public class NioSslEngine implements NioFilter { } @VisibleForTesting - public ByteBufferVendor getOutputBufferVendorForTestingOnly() throws IOException { - return outputBufferVendor; + public ByteBufferSharing shareOutputBuffer() throws IOException { + return outputSharing.open(); } - @VisibleForTesting - public ByteBufferVendor getInputBufferVendorForTestingOnly() throws IOException { - return inputBufferVendor; + private ByteBufferSharing shareOutputBuffer(final long time, final TimeUnit unit) + throws OpenAttemptTimedOut, IOException { + return outputSharing.open(time, unit); + } + + public ByteBufferSharing shareInputBuffer() throws IOException { + return inputSharing.open(); } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java index a31bbb2..a232fca 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java +++ b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java @@ -803,16 +803,21 @@ public class SocketCreator extends TcpSocketCreatorImpl { * @param socketChannel the socket's NIO channel * @param engine the sslEngine (see createSSLEngine) * @param timeout handshake timeout in milliseconds. No timeout if <= 0 + * @param clientSocket set to true if you initiated the connect(), false if you accepted it * @param peerNetBuffer the buffer to use in reading data fron socketChannel. This should also be * used in subsequent I/O operations * @return The SSLEngine to be used in processing data for sending/receiving from the channel */ - public NioSslEngine handshakeSSLSocketChannel(SocketChannel socketChannel, - SSLEngine engine, + public NioSslEngine handshakeSSLSocketChannel(SocketChannel socketChannel, SSLEngine engine, int timeout, + boolean clientSocket, ByteBuffer peerNetBuffer, BufferPool bufferPool) throws IOException { + engine.setUseClientMode(clientSocket); + if (!clientSocket) { + engine.setNeedClientAuth(sslConfig.isRequireAuth()); + } while (!socketChannel.finishConnect()) { try { Thread.sleep(50); diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java index 107aa9f..f5a1886 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java @@ -18,7 +18,6 @@ import static java.lang.Boolean.FALSE; import static java.lang.ThreadLocal.withInitial; import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_PEER_AUTH_INIT; import static org.apache.geode.distributed.internal.DistributionConfigImpl.SECURITY_SYSTEM_PREFIX; -import static org.apache.geode.internal.net.BufferPool.BufferType.TRACKED_RECEIVER; import static org.apache.geode.util.internal.GeodeGlossary.GEMFIRE_PREFIX; import java.io.DataInput; @@ -80,7 +79,6 @@ import org.apache.geode.internal.SystemTimer; import org.apache.geode.internal.SystemTimer.SystemTimerTask; import org.apache.geode.internal.net.BufferPool; import org.apache.geode.internal.net.ByteBufferSharing; -import org.apache.geode.internal.net.ByteBufferVendor; import org.apache.geode.internal.net.NioFilter; import org.apache.geode.internal.net.NioPlainEngine; import org.apache.geode.internal.net.SocketCreator; @@ -119,7 +117,7 @@ public class Connection implements Runnable { * Small buffer used for send socket buffer on receiver connections and receive buffer on sender * connections. */ - public static final int SMALL_BUFFER_SIZE = + static final int SMALL_BUFFER_SIZE = Integer.getInteger(GEMFIRE_PREFIX + "SMALL_BUFFER_SIZE", 4096); /** @@ -311,14 +309,11 @@ public class Connection implements Runnable { /** name of thread that we're currently performing an operation in (may be null) */ private String ackThreadName; - /* - * This object mediates access to the input ByteBuffer and ensures its return to - * pool after last use. This reference couldn't be final since it is initialized - * in createIoFilter() not in the constructors. It had to be initialized there - * because in general we have to construct an SSLEngine before we know the buffer - * size and createIoFilter() is where we create that object. - */ - private ByteBufferVendor inputBufferVendor; + /** the buffer used for message receipt */ + private ByteBuffer inputBuffer; + + /** Lock used to protect the input buffer */ + public final Object inputBufferLock = new Object(); /** the length of the next message to be dispatched */ private int messageLength; @@ -894,28 +889,27 @@ public class Connection implements Runnable { waitForAddressCompletion(); InternalDistributedMember myAddr = owner.getConduit().getMemberId(); - try (final MsgOutputStream connectHandshake = new MsgOutputStream(CONNECT_HANDSHAKE_SIZE)) { - /* - * Note a byte of zero is always written because old products serialized a member id with - * always sends an ip address. My reading of the ip-address specs indicated that the first - * byte of a valid address would never be 0. - */ - connectHandshake.writeByte(0); - connectHandshake.writeByte(HANDSHAKE_VERSION); - // NOTE: if you add or remove code in this section bump HANDSHAKE_VERSION - InternalDataSerializer.invokeToData(myAddr, connectHandshake); - connectHandshake.writeBoolean(sharedResource); - connectHandshake.writeBoolean(preserveOrder); - connectHandshake.writeLong(uniqueId); - // write the product version ordinal - Version.CURRENT.writeOrdinal(connectHandshake, true); - connectHandshake.writeInt(dominoCount.get() + 1); - // this writes the sending member + thread name that is stored in senderName - // on the receiver to show the cause of reader thread creation - connectHandshake.setMessageHeader(NORMAL_MSG_TYPE, OperationExecutors.STANDARD_EXECUTOR, - MsgIdGenerator.NO_MSG_ID); - writeFully(getSocket().getChannel(), connectHandshake.getContentBuffer(), false, null); - } + final MsgOutputStream connectHandshake = new MsgOutputStream(CONNECT_HANDSHAKE_SIZE); + /* + * Note a byte of zero is always written because old products serialized a member id with always + * sends an ip address. My reading of the ip-address specs indicated that the first byte of a + * valid address would never be 0. + */ + connectHandshake.writeByte(0); + connectHandshake.writeByte(HANDSHAKE_VERSION); + // NOTE: if you add or remove code in this section bump HANDSHAKE_VERSION + InternalDataSerializer.invokeToData(myAddr, connectHandshake); + connectHandshake.writeBoolean(sharedResource); + connectHandshake.writeBoolean(preserveOrder); + connectHandshake.writeLong(uniqueId); + // write the product version ordinal + Version.CURRENT.writeOrdinal(connectHandshake, true); + connectHandshake.writeInt(dominoCount.get() + 1); + // this writes the sending member + thread name that is stored in senderName + // on the receiver to show the cause of reader thread creation + connectHandshake.setMessageHeader(NORMAL_MSG_TYPE, OperationExecutors.STANDARD_EXECUTOR, + MsgIdGenerator.NO_MSG_ID); + writeFully(getSocket().getChannel(), connectHandshake.getContentBuffer(), false, null); } /** @@ -1359,7 +1353,7 @@ public class Connection implements Runnable { if (!isReceiver && !hasResidualReaderThread()) { // receivers release the input buffer when exiting run(). Senders use the // inputBuffer for reading direct-reply responses - inputBufferVendor.destruct(); + releaseInputBuffer(); } lengthSet = false; } @@ -1500,7 +1494,7 @@ public class Connection implements Runnable { } } - inputBufferVendor.destruct(); + releaseInputBuffer(); // make sure that if the reader thread exits we notify a thread waiting for the handshake. notifyHandshakeWaiter(false); @@ -1512,6 +1506,16 @@ public class Connection implements Runnable { } } + private void releaseInputBuffer() { + synchronized (inputBufferLock) { + ByteBuffer tmp = inputBuffer; + if (tmp != null) { + inputBuffer = null; + getBufferPool().releaseReceiveBuffer(tmp); + } + } + } + BufferPool getBufferPool() { return owner.getBufferPool(); } @@ -1533,6 +1537,9 @@ public class Connection implements Runnable { } private void readMessages() { + if (closing.get()) { + return; + } // take a snapshot of uniqueId to detect reconnect attempts SocketChannel channel; try { @@ -1578,6 +1585,8 @@ public class Connection implements Runnable { // we should not change the state of the connection if we are a handshake reader thread // as there is a race between this thread and the application thread doing direct ack boolean handshakeHasBeenRead = false; + // if we're using SSL/TLS the input buffer may already have data to process + boolean skipInitialRead = getInputBuffer().position() > 0; try { for (boolean isInitialRead = true;;) { if (stopped) { @@ -1597,9 +1606,8 @@ public class Connection implements Runnable { break; } - try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) { - ByteBuffer buff = inputSharing.getBuffer(); - + try { + ByteBuffer buff = getInputBuffer(); synchronized (stateLock) { connectionState = STATE_READING; } @@ -1608,8 +1616,6 @@ public class Connection implements Runnable { amountRead = channel.read(buff); } else { isInitialRead = false; - // if we're using SSL/TLS the input buffer may already have data to process - final boolean skipInitialRead = buff.position() > 0; if (!skipInitialRead) { amountRead = channel.read(buff); } else { @@ -1674,11 +1680,11 @@ public class Connection implements Runnable { } catch (IOException e) { // "Socket closed" check needed for Solaris jdk 1.4.2_08 if (!isSocketClosed() && !"Socket closed".equalsIgnoreCase(e.getMessage())) { - if (logger.isInfoEnabled() && !isIgnorableIOException(e)) { - logger.info("{} io exception for {}", p2pReaderName(), this, e); + if (logger.isDebugEnabled() && !isIgnorableIOException(e)) { + logger.debug("{} io exception for {}", p2pReaderName(), this, e); } - if (logger.isDebugEnabled()) { - if (e.getMessage().contains("interrupted by a call to WSACancelBlockingCall")) { + if (e.getMessage().contains("interrupted by a call to WSACancelBlockingCall")) { + if (logger.isDebugEnabled()) { logger.debug( "{} received unexpected WSACancelBlockingCall exception, which may result in a hang", p2pReaderName()); @@ -1722,55 +1728,28 @@ public class Connection implements Runnable { private void createIoFilter(SocketChannel channel, boolean clientSocket) throws IOException { if (getConduit().useSSL() && channel != null) { InetSocketAddress address = (InetSocketAddress) channel.getRemoteAddress(); - String hostName; - if (remoteAddr != null) { - hostName = remoteAddr.getHostName(); - } else { - hostName = SocketCreator.getHostName(address.getAddress()); - } SSLEngine engine = - getConduit().getSocketCreator().createSSLEngine(hostName, - address.getPort(), clientSocket); - - final int packetBufferSize = engine.getSession().getPacketBufferSize(); - - inputBufferVendor = - new ByteBufferVendor( - getBufferPool().acquireDirectReceiveBuffer(packetBufferSize), - TRACKED_RECEIVER, - getBufferPool()); + getConduit().getSocketCreator().createSSLEngine(address.getHostName(), address.getPort(), + clientSocket); + int packetBufferSize = engine.getSession().getPacketBufferSize(); + if (inputBuffer == null || inputBuffer.capacity() < packetBufferSize) { + // TLS has a minimum input buffer size constraint + if (inputBuffer != null) { + getBufferPool().releaseReceiveBuffer(inputBuffer); + } + inputBuffer = getBufferPool().acquireDirectReceiveBuffer(packetBufferSize); + } if (channel.socket().getReceiveBufferSize() < packetBufferSize) { channel.socket().setReceiveBufferSize(packetBufferSize); } if (channel.socket().getSendBufferSize() < packetBufferSize) { channel.socket().setSendBufferSize(packetBufferSize); } - try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) { - final ByteBuffer inputBuffer = inputSharing.getBuffer(); - /* - * It's ok to share the inputBuffer with handshakeSSLSocketChannel() since that method - * accesses the referenced buffer for the handshake which completes before returning - * control here. The NioSslEngine retains no reference to the buffer. - */ - ioFilter = getConduit().getSocketCreator().handshakeSSLSocketChannel(channel, engine, - getConduit().idleConnectionTimeout, inputBuffer, - getBufferPool()); - } + ioFilter = getConduit().getSocketCreator().handshakeSSLSocketChannel(channel, engine, + getConduit().idleConnectionTimeout, clientSocket, inputBuffer, + getBufferPool()); } else { - final int allocSize; - if (recvBufferSize == -1) { - allocSize = owner.getConduit().tcpBufferSize; - } else { - allocSize = recvBufferSize; - } - - inputBufferVendor = - new ByteBufferVendor( - getBufferPool().acquireDirectReceiveBuffer(allocSize), - TRACKED_RECEIVER, - getBufferPool()); - ioFilter = new NioPlainEngine(getBufferPool()); } } @@ -1802,13 +1781,9 @@ public class Connection implements Runnable { } msg = msg.toLowerCase(); - - if (e instanceof SSLException && msg.contains("status = closed")) { - return true; // engine has been closed - this is normal - } - - return (msg.contains("forcibly closed") || msg.contains("reset by peer") - || msg.contains("connection reset") || msg.contains("socket is closed")); + return msg.contains("forcibly closed") + || msg.contains("reset by peer") + || msg.contains("connection reset"); } private static boolean validMsgType(int msgType) { @@ -2652,6 +2627,20 @@ public class Connection implements Runnable { } /** + * gets the buffer for receiving message length bytes + */ + private ByteBuffer getInputBuffer() { + if (inputBuffer == null) { + int allocSize = recvBufferSize; + if (allocSize == -1) { + allocSize = owner.getConduit().tcpBufferSize; + } + inputBuffer = getBufferPool().acquireDirectReceiveBuffer(allocSize); + } + return inputBuffer; + } + + /** * @throws SocketTimeoutException if wait expires. * @throws ConnectionException if ack is not received */ @@ -2758,93 +2747,72 @@ public class Connection implements Runnable { * deserialized and passed to TCPConduit for further processing */ private void processInputBuffer() throws ConnectionException, IOException { - try (final ByteBufferSharing inputSharing = inputBufferVendor.open()) { - // can't be final because in some cases we expand the buffer (resulting in a new object) - ByteBuffer inputBuffer = inputSharing.getBuffer(); - inputBuffer.flip(); + inputBuffer.flip(); - try (final ByteBufferSharing sharedBuffer = ioFilter.unwrap(inputBuffer)) { - final ByteBuffer peerDataBuffer = sharedBuffer.getBuffer(); + try (final ByteBufferSharing sharedBuffer = ioFilter.unwrap(inputBuffer)) { + final ByteBuffer peerDataBuffer = sharedBuffer.getBuffer(); - peerDataBuffer.flip(); + peerDataBuffer.flip(); - boolean done = false; + boolean done = false; - while (!done && connected) { - owner.getConduit().getCancelCriterion().checkCancelInProgress(null); - int remaining = peerDataBuffer.remaining(); - if (lengthSet || remaining >= MSG_HEADER_BYTES) { - if (!lengthSet) { - if (readMessageHeader(peerDataBuffer)) { - break; - } + while (!done && connected) { + owner.getConduit().getCancelCriterion().checkCancelInProgress(null); + int remaining = peerDataBuffer.remaining(); + if (lengthSet || remaining >= MSG_HEADER_BYTES) { + if (!lengthSet) { + if (readMessageHeader(peerDataBuffer)) { + break; } - if (remaining >= messageLength + MSG_HEADER_BYTES) { - lengthSet = false; - peerDataBuffer.position(peerDataBuffer.position() + MSG_HEADER_BYTES); - // don't trust the message deserialization to leave the position in - // the correct spot. Some of the serialization uses buffered - // streams that can leave the position at the wrong spot - int startPos = peerDataBuffer.position(); - int oldLimit = peerDataBuffer.limit(); - peerDataBuffer.limit(startPos + messageLength); - - if (handshakeRead) { - try { - readMessage(peerDataBuffer); - } catch (SerializationException e) { - logger.info("input buffer startPos {} oldLimit {}", startPos, oldLimit); - throw e; - } - } else { - try (ByteBufferInputStream bbis = new ByteBufferInputStream(peerDataBuffer); - DataInputStream dis = new DataInputStream(bbis)) { - if (!isReceiver) { - // we read the handshake and then stop processing since we don't want - // to process the input buffer anymore in a handshake thread - readHandshakeForSender(dis, peerDataBuffer); - return; - } - if (readHandshakeForReceiver(dis)) { - ioFilter.doneReading(peerDataBuffer); - return; - } - } - } - if (!connected) { - continue; + } + if (remaining >= messageLength + MSG_HEADER_BYTES) { + lengthSet = false; + peerDataBuffer.position(peerDataBuffer.position() + MSG_HEADER_BYTES); + // don't trust the message deserialization to leave the position in + // the correct spot. Some of the serialization uses buffered + // streams that can leave the position at the wrong spot + int startPos = peerDataBuffer.position(); + int oldLimit = peerDataBuffer.limit(); + peerDataBuffer.limit(startPos + messageLength); + + if (handshakeRead) { + try { + readMessage(peerDataBuffer); + } catch (SerializationException e) { + logger.info("input buffer startPos {} oldLimit {}", startPos, oldLimit); + throw e; } - accessed(); - peerDataBuffer.limit(oldLimit); - peerDataBuffer.position(startPos + messageLength); } else { - done = true; - if (getConduit().useSSL()) { + ByteBufferInputStream bbis = new ByteBufferInputStream(peerDataBuffer); + DataInputStream dis = new DataInputStream(bbis); + if (!isReceiver) { + // we read the handshake and then stop processing since we don't want + // to process the input buffer anymore in a handshake thread + readHandshakeForSender(dis, peerDataBuffer); + return; + } + if (readHandshakeForReceiver(dis)) { ioFilter.doneReading(peerDataBuffer); - } else { - // compact or resize the buffer - final int oldBufferSize = inputBuffer.capacity(); - final int allocSize = messageLength + MSG_HEADER_BYTES; - if (oldBufferSize < allocSize) { - // need a bigger buffer - logger.info( - "Allocating larger network read buffer, new size is {} old size was {}.", - allocSize, oldBufferSize); - inputBuffer = inputSharing.expandReadBufferIfNeeded(allocSize); - } else { - if (inputBuffer.position() != 0) { - inputBuffer.compact(); - } else { - inputBuffer.position(inputBuffer.limit()); - inputBuffer.limit(inputBuffer.capacity()); - } - } + return; } } + if (!connected) { + continue; + } + accessed(); + peerDataBuffer.limit(oldLimit); + peerDataBuffer.position(startPos + messageLength); } else { - ioFilter.doneReading(peerDataBuffer); done = true; + if (getConduit().useSSL()) { + ioFilter.doneReading(peerDataBuffer); + } else { + compactOrResizeBuffer(messageLength); + } } + } else { + ioFilter.doneReading(peerDataBuffer); + done = true; } } } @@ -2979,9 +2947,10 @@ public class Connection implements Runnable { private void readMessage(ByteBuffer peerDataBuffer) { if (messageType == NORMAL_MSG_TYPE) { owner.getConduit().getStats().incMessagesBeingReceived(true, messageLength); - try (ByteBufferInputStream bbis = + ByteBufferInputStream bbis = remoteVersion == null ? new ByteBufferInputStream(peerDataBuffer) - : new VersionedByteBufferInputStream(peerDataBuffer, remoteVersion)) { + : new VersionedByteBufferInputStream(peerDataBuffer, remoteVersion); + try { ReplyProcessor21.initMessageRPId(); // add serialization stats long startSer = owner.getConduit().getStats().startMsgDeserialization(); @@ -3234,8 +3203,34 @@ public class Connection implements Runnable { private void setThreadName(int dominoNumber) { Thread.currentThread().setName(THREAD_KIND_IDENTIFIER + " for " + remoteAddr + " " + (sharedResource ? "" : "un") + "shared" + " " + (preserveOrder ? "" : "un") - + "ordered sender uid=" + uniqueId + (dominoNumber > 0 ? " dom #" + dominoNumber : "") - + " local port=" + socket.getLocalPort() + " remote port=" + socket.getPort()); + + "ordered" + " uid=" + uniqueId + (dominoNumber > 0 ? " dom #" + dominoNumber : "") + + " port=" + socket.getPort()); + } + + private void compactOrResizeBuffer(int messageLength) { + final int oldBufferSize = inputBuffer.capacity(); + int allocSize = messageLength + MSG_HEADER_BYTES; + if (oldBufferSize < allocSize) { + // need a bigger buffer + logger.info("Allocating larger network read buffer, new size is {} old size was {}.", + allocSize, oldBufferSize); + ByteBuffer oldBuffer = inputBuffer; + inputBuffer = getBufferPool().acquireDirectReceiveBuffer(allocSize); + + if (oldBuffer != null) { + int oldByteCount = oldBuffer.remaining(); + inputBuffer.put(oldBuffer); + inputBuffer.position(oldByteCount); + getBufferPool().releaseReceiveBuffer(oldBuffer); + } + } else { + if (inputBuffer.position() != 0) { + inputBuffer.compact(); + } else { + inputBuffer.position(inputBuffer.limit()); + inputBuffer.limit(inputBuffer.capacity()); + } + } } private boolean dispatchMessage(DistributionMessage msg, int bytesRead, boolean directAck) @@ -3320,7 +3315,6 @@ public class Connection implements Runnable { * socket is properly closed at this end. When that is the case isResidualReaderThread * will return true. */ - @VisibleForTesting public boolean hasResidualReaderThread() { return hasResidualReaderThread; } diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferVendorTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferVendorTest.java index fe4f08b..359d8aa 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferVendorTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/net/ByteBufferVendorTest.java @@ -32,11 +32,11 @@ import org.junit.Test; public class ByteBufferVendorTest { @FunctionalInterface - private interface Foo { + private static interface Foo { void run() throws IOException; } - private ByteBufferVendor sharingVendor; + private ByteBufferVendor sharing; private BufferPool poolMock; private CountDownLatch clientHasOpenedResource; private CountDownLatch clientMayComplete; @@ -44,7 +44,7 @@ public class ByteBufferVendorTest { @Before public void before() { poolMock = mock(BufferPool.class); - sharingVendor = + sharing = new ByteBufferVendor(mock(ByteBuffer.class), BufferPool.BufferType.TRACKED_SENDER, poolMock); clientHasOpenedResource = new CountDownLatch(1); @@ -54,7 +54,7 @@ public class ByteBufferVendorTest { @Test public void balancedCloseOwnerIsLastReferenceHolder() throws InterruptedException { resourceOwnerIsLastReferenceHolder("client with balanced close calls", () -> { - try (final ByteBufferSharing _unused = sharingVendor.open()) { + try (final ByteBufferSharing _unused = sharing.open()) { } }); } @@ -62,7 +62,7 @@ public class ByteBufferVendorTest { @Test public void extraCloseOwnerIsLastReferenceHolder() throws InterruptedException { resourceOwnerIsLastReferenceHolder("client with extra close calls", () -> { - final ByteBufferSharing sharing2 = sharingVendor.open(); + final ByteBufferSharing sharing2 = sharing.open(); sharing2.close(); verify(poolMock, times(0)).releaseBuffer(any(), any()); assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class); @@ -73,7 +73,7 @@ public class ByteBufferVendorTest { @Test public void balancedCloseClientIsLastReferenceHolder() throws InterruptedException { clientIsLastReferenceHolder("client with balanced close calls", () -> { - try (final ByteBufferSharing _unused = sharingVendor.open()) { + try (final ByteBufferSharing _unused = sharing.open()) { clientHasOpenedResource.countDown(); blockClient(); } @@ -83,42 +83,36 @@ public class ByteBufferVendorTest { @Test public void extraCloseClientIsLastReferenceHolder() throws InterruptedException { clientIsLastReferenceHolder("client with extra close calls", () -> { - final ByteBufferSharing sharing2 = sharingVendor.open(); + final ByteBufferSharing sharing2 = sharing.open(); clientHasOpenedResource.countDown(); blockClient(); sharing2.close(); verify(poolMock, times(1)).releaseBuffer(any(), any()); assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class); + System.out.println("here"); }); } @Test public void extraCloseDoesNotPrematurelyReturnBufferToPool() throws IOException { - final ByteBufferSharing sharing2 = sharingVendor.open(); + final ByteBufferSharing sharing2 = sharing.open(); sharing2.close(); assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class); verify(poolMock, times(0)).releaseBuffer(any(), any()); - sharingVendor.destruct(); + sharing.destruct(); verify(poolMock, times(1)).releaseBuffer(any(), any()); } @Test public void extraCloseDoesNotDecrementRefCount() throws IOException { - final ByteBufferSharing sharing2 = sharingVendor.open(); + final ByteBufferSharing sharing2 = sharing.open(); sharing2.close(); assertThatThrownBy(() -> sharing2.close()).isInstanceOf(IllegalMonitorStateException.class); - final ByteBufferSharing sharing3 = this.sharingVendor.open(); - sharingVendor.destruct(); + final ByteBufferSharing sharing3 = this.sharing.open(); + sharing.destruct(); verify(poolMock, times(0)).releaseBuffer(any(), any()); } - @Test - public void destructIsIdempotent() { - sharingVendor.destruct(); - sharingVendor.destruct(); - verify(poolMock, times(1)).releaseBuffer(any(), any()); - } - private void resourceOwnerIsLastReferenceHolder(final String name, final Foo client) throws InterruptedException { /* @@ -134,7 +128,7 @@ public class ByteBufferVendorTest { verify(poolMock, times(0)).releaseBuffer(any(), any()); - sharingVendor.destruct(); + sharing.destruct(); verify(poolMock, times(1)).releaseBuffer(any(), any()); } @@ -153,7 +147,7 @@ public class ByteBufferVendorTest { clientHasOpenedResource.await(); - sharingVendor.destruct(); + sharing.destruct(); verify(poolMock, times(0)).releaseBuffer(any(), any()); diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java index d6b9aa6..62a858c 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java @@ -90,8 +90,7 @@ public class NioSslEngineTest { @Test public void engineUsesDirectBuffers() throws IOException { - try (final ByteBufferSharing outputSharing = - nioSslEngine.getOutputBufferVendorForTestingOnly().open()) { + try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) { assertThat(outputSharing.getBuffer().isDirect()).isTrue(); } } @@ -191,8 +190,7 @@ public class NioSslEngineTest { @Test public void wrap() throws Exception { - try (final ByteBufferSharing outputSharing = - nioSslEngine.getOutputBufferVendorForTestingOnly().open()) { + try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) { // make the application data too big to fit into the engine's encryption buffer ByteBuffer appData = @@ -223,8 +221,7 @@ public class NioSslEngineTest { @Test public void wrapFails() throws IOException { - try (final ByteBufferSharing outputSharing = - nioSslEngine.getOutputBufferVendorForTestingOnly().open()) { + try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) { // make the application data too big to fit into the engine's encryption buffer ByteBuffer appData = ByteBuffer.allocate(outputSharing.getBuffer().capacity() + 100); @@ -247,8 +244,7 @@ public class NioSslEngineTest { @Test public void unwrapWithBufferOverflow() throws Exception { - try (final ByteBufferSharing inputSharing = - nioSslEngine.getInputBufferVendorForTestingOnly().open()) { + try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) { // make the application data too big to fit into the engine's encryption buffer final ByteBuffer peerAppData = inputSharing.getBuffer(); @@ -288,8 +284,7 @@ public class NioSslEngineTest { @Test public void unwrapWithBufferUnderflow() throws Exception { - try (final ByteBufferSharing inputSharing = - nioSslEngine.getInputBufferVendorForTestingOnly().open()) { + try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) { ByteBuffer wrappedData = ByteBuffer.allocate(inputSharing.getBuffer().capacity()); byte[] netBytes = new byte[wrappedData.capacity() / 2]; @@ -314,8 +309,7 @@ public class NioSslEngineTest { @Test public void unwrapWithDecryptionError() throws IOException { - try (final ByteBufferSharing inputSharing = - nioSslEngine.getInputBufferVendorForTestingOnly().open()) { + try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) { // make the application data too big to fit into the engine's encryption buffer ByteBuffer wrappedData = ByteBuffer.allocate(inputSharing.getBuffer().capacity()); @@ -374,10 +368,10 @@ public class NioSslEngineTest { when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenReturn( new SSLEngineResult(CLOSED, FINISHED, 0, 0)); nioSslEngine.close(mockChannel); - assertThatThrownBy(() -> nioSslEngine.getOutputBufferVendorForTestingOnly().open().getBuffer()) + assertThatThrownBy(() -> nioSslEngine.shareOutputBuffer().getBuffer()) .isInstanceOf(IOException.class) .hasMessageContaining("NioSslEngine has been closed"); - assertThatThrownBy(() -> nioSslEngine.getInputBufferVendorForTestingOnly().open().getBuffer()) + assertThatThrownBy(() -> nioSslEngine.shareInputBuffer().getBuffer()) .isInstanceOf(IOException.class) .hasMessageContaining("NioSslEngine has been closed"); nioSslEngine.close(mockChannel); @@ -407,8 +401,7 @@ public class NioSslEngineTest { when(mockEngine.isOutboundDone()).thenReturn(Boolean.FALSE); when(mockEngine.wrap(any(ByteBuffer.class), any(ByteBuffer.class))).thenAnswer((x) -> { - try (final ByteBufferSharing outputSharing = - nioSslEngine.getOutputBufferVendorForTestingOnly().open()) { + try (final ByteBufferSharing outputSharing = nioSslEngine.shareOutputBuffer()) { // give the NioSslEngine something to write on its socket channel, simulating a TLS close // message outputSharing.getBuffer().put("Goodbye cruel world".getBytes()); @@ -444,8 +437,7 @@ public class NioSslEngineTest { ByteBuffer wrappedBuffer = ByteBuffer.allocate(1000); SocketChannel mockChannel = mock(SocketChannel.class); - try (final ByteBufferSharing inputSharing = - nioSslEngine.getInputBufferVendorForTestingOnly().open()) { + try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) { // force a compaction by making the decoded buffer appear near to being full ByteBuffer unwrappedBuffer = inputSharing.getBuffer(); unwrappedBuffer.position(unwrappedBuffer.capacity() - individualRead); @@ -495,7 +487,10 @@ public class NioSslEngineTest { ByteBuffer unwrappedBuffer = ByteBuffer.allocate(initialUnwrappedBufferSize); unwrappedBuffer.position(7).limit(preexistingBytes + 7); // 7 bytes of message header - ignored - nioSslEngine.getInputBufferVendorForTestingOnly().setBufferForTestingOnly(unwrappedBuffer); + try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) { + final ByteBufferVendor inputSharingImpl = (ByteBufferVendor) inputSharing; + inputSharingImpl.setBufferForTestingOnly(unwrappedBuffer); + } // simulate some socket reads when(mockChannel.read(any(ByteBuffer.class))).thenAnswer(new Answer<Integer>() { @@ -523,8 +518,7 @@ public class NioSslEngineTest { assertThat(data.limit()).isEqualTo(individualRead * 3 + preexistingBytes); // The initial available space in the unwrapped buffer should have doubled int initialFreeSpace = initialUnwrappedBufferSize - preexistingBytes; - try (final ByteBufferSharing inputSharing = - nioSslEngine.getInputBufferVendorForTestingOnly().open()) { + try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) { assertThat(inputSharing.getBuffer().capacity()) .isEqualTo(2 * initialFreeSpace + preexistingBytes); } @@ -550,7 +544,10 @@ public class NioSslEngineTest { // force buffer expansion by making a small decoded buffer appear near to being full ByteBuffer unwrappedBuffer = ByteBuffer.allocate(initialUnwrappedBufferSize); unwrappedBuffer.position(7).limit(preexistingBytes + 7); // 7 bytes of message header - ignored - nioSslEngine.getInputBufferVendorForTestingOnly().setBufferForTestingOnly(unwrappedBuffer); + try (final ByteBufferSharing inputSharing = nioSslEngine.shareInputBuffer()) { + final ByteBufferVendor inputSharingImpl = (ByteBufferVendor) inputSharing; + inputSharingImpl.setBufferForTestingOnly(unwrappedBuffer); + } // simulate some socket reads when(mockChannel.read(any(ByteBuffer.class))).thenAnswer(new Answer<Integer>() { diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java index 40f1ed3..c064afb 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionTest.java @@ -103,7 +103,6 @@ public class ConnectionTest { TCPConduit tcpConduit = mock(TCPConduit.class); when(connectionTable.getConduit()).thenReturn(tcpConduit); - when(connectionTable.getBufferPool()).thenReturn(mock(BufferPool.class)); when(distributionConfig.getMemberTimeout()).thenReturn(100); when(tcpConduit.getSocketId()).thenReturn(new InetSocketAddress(getLocalHost(), 12345));