This is an automated email from the ASF dual-hosted git repository. bschuchardt pushed a commit to branch feature/GEODE-8020 in repository https://gitbox.apache.org/repos/asf/geode.git
commit a0b7880c978c23fd13da31a7161c1ffec2d9285e Author: Bruce Schuchardt <[email protected]> AuthorDate: Fri Apr 24 15:40:54 2020 -0700 GEODE-8020: buffer corruption in SSL communications. Revert change in GEODE-6661 that made NioSslEngine use a direct buffer. This class is not designed to share its buffer with a pool in BufferPool. Connection is also modified to use a heap buffer for reading encrypted SSL packets for consistency. New tests ensure that these buffers are the correct type when using SSL or plain sockets. --- .../geode/ClusterCommunicationsDUnitTest.java | 33 +++++++++++++++++++++- .../apache/geode/internal/net/NioSslEngine.java | 4 +-- .../org/apache/geode/internal/tcp/Connection.java | 16 +++++++---- .../geode/internal/net/NioSslEngineTest.java | 5 ++++ 4 files changed, 49 insertions(+), 9 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java index 3ae79fd..09f94e5 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java @@ -43,6 +43,8 @@ import java.io.DataOutput; import java.io.File; import java.io.IOException; import java.io.Serializable; +import java.lang.reflect.Field; +import java.nio.Buffer; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -69,6 +71,7 @@ import org.apache.geode.distributed.Locator; import org.apache.geode.distributed.internal.ClusterDistributionManager; import org.apache.geode.distributed.internal.DMStats; import org.apache.geode.distributed.internal.DirectReplyProcessor; +import org.apache.geode.distributed.internal.DistributionImpl; import org.apache.geode.distributed.internal.DistributionMessage; import org.apache.geode.distributed.internal.InternalDistributedSystem; import 
org.apache.geode.distributed.internal.MessageWithReply; @@ -76,12 +79,15 @@ import org.apache.geode.distributed.internal.OperationExecutors; import org.apache.geode.distributed.internal.ReplyException; import org.apache.geode.distributed.internal.ReplyMessage; import org.apache.geode.distributed.internal.SerialAckedMessage; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.distributed.internal.membership.gms.membership.GMSJoinLeave; import org.apache.geode.internal.AvailablePortHelper; import org.apache.geode.internal.InternalDataSerializer; import org.apache.geode.internal.cache.DirectReplyMessage; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.serialization.DeserializationContext; import org.apache.geode.internal.serialization.SerializationContext; +import org.apache.geode.internal.tcp.Connection; import org.apache.geode.test.dunit.DistributedTestUtils; import org.apache.geode.test.dunit.Host; import org.apache.geode.test.dunit.VM; @@ -143,7 +149,7 @@ public class ClusterCommunicationsDUnitTest implements Serializable { } @Test - public void createEntryAndVerifyUpdate() { + public void createEntryAndVerifyUpdate() throws Exception { int locatorPort = createLocator(getVM(0)); for (int i = 1; i <= NUM_SERVERS; i++) { createCacheAndRegion(getVM(i), locatorPort); @@ -157,6 +163,9 @@ public class ClusterCommunicationsDUnitTest implements Serializable { } for (int i = 1; i <= NUM_SERVERS; i++) { verifyUpdatedEntry(getVM(i)); + if (!disableTcp) { + verifyBufferType(getVM(i)); + } } } @@ -245,6 +254,28 @@ public class ClusterCommunicationsDUnitTest implements Serializable { }); } + private void verifyBufferType(VM vm) throws Exception { + vm.invoke("verify connection type", () -> { + assertThat(cache).isNotNull(); + InternalCache internalCache = (InternalCache) cache; + final DistributionImpl distribution = + (DistributionImpl) 
internalCache.getDistributionManager().getDistribution(); + InternalDistributedMember locatorMember = + (InternalDistributedMember) distribution.getCoordinator(); + final Connection connection = + distribution.getDirectChannel().getConduit().getConnection(locatorMember, false, false, + System.currentTimeMillis(), 15000, 0); + Field inputBufferField = Connection.class.getDeclaredField("inputBuffer"); + inputBufferField.setAccessible(true); + Buffer inputBuffer = (Buffer) inputBufferField.get(connection); + // SSL connections use heap buffers for decoding efficiency. Non-SSL connections use + // direct buffers since they don't need to be accessed as much + assertThat(inputBuffer.isDirect()).isNotEqualTo(useSSL); + }); + } + + + private void performCreate(VM memberVM) { memberVM.invoke("perform create", () -> cache .getRegion(regionName).put("testKey", "testValue")); diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java index 2d55fa3..424e53a 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java +++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java @@ -74,7 +74,7 @@ public class NioSslEngine implements NioFilter { int packetBufferSize = engine.getSession().getPacketBufferSize(); this.engine = engine; this.bufferPool = bufferPool; - this.myNetData = bufferPool.acquireDirectSenderBuffer(packetBufferSize); + this.myNetData = bufferPool.acquireNonDirectSenderBuffer(packetBufferSize); this.peerAppData = bufferPool.acquireNonDirectReceiveBuffer(appBufferSize); } @@ -301,7 +301,7 @@ public class NioSslEngine implements NioFilter { ByteBuffer buffer = wrappedBuffer; int requiredSize = engine.getSession().getPacketBufferSize(); if (buffer == null) { - buffer = bufferPool.acquireDirectBuffer(bufferType, requiredSize); + buffer = bufferPool.acquireNonDirectBuffer(bufferType, requiredSize); } else if (buffer.capacity() 
< requiredSize) { buffer = bufferPool.expandWriteBufferIfNeeded(bufferType, buffer, requiredSize); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java index 7fbf4b2..f8f6932 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java @@ -1662,8 +1662,8 @@ public class Connection implements Runnable { } catch (IOException e) { // "Socket closed" check needed for Solaris jdk 1.4.2_08 if (!isSocketClosed() && !"Socket closed".equalsIgnoreCase(e.getMessage())) { - if (logger.isDebugEnabled() && !isIgnorableIOException(e)) { - logger.debug("{} io exception for {}", p2pReaderName(), this, e); + if (logger.isInfoEnabled() && !isIgnorableIOException(e)) { + logger.info("{} io exception for {}", p2pReaderName(), this, e); } if (e.getMessage().contains("interrupted by a call to WSACancelBlockingCall")) { if (logger.isDebugEnabled()) { @@ -1720,7 +1720,7 @@ public class Connection implements Runnable { if (inputBuffer != null) { getBufferPool().releaseReceiveBuffer(inputBuffer); } - inputBuffer = getBufferPool().acquireDirectReceiveBuffer(packetBufferSize); + inputBuffer = getBufferPool().acquireNonDirectReceiveBuffer(packetBufferSize); } if (channel.socket().getReceiveBufferSize() < packetBufferSize) { channel.socket().setReceiveBufferSize(packetBufferSize); @@ -1763,9 +1763,13 @@ public class Connection implements Runnable { } msg = msg.toLowerCase(); - return msg.contains("forcibly closed") - || msg.contains("reset by peer") - || msg.contains("connection reset"); + + if (e instanceof SSLException && msg.contains("status = closed")) { + return true; // engine has been closed - this is normal + } + + return (msg.contains("forcibly closed") || msg.contains("reset by peer") + || msg.contains("connection reset") || msg.contains("socket is closed")); } private static boolean 
validMsgType(int msgType) { diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java index ef16a21..2c9be77 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java @@ -86,6 +86,11 @@ public class NioSslEngineTest { } @Test + public void engineUsesHeapBuffers() { + assertThat(nioSslEngine.myNetData.isDirect()).isFalse(); + } + + @Test public void handshake() throws Exception { SocketChannel mockChannel = mock(SocketChannel.class); when(mockChannel.read(any(ByteBuffer.class))).thenReturn(100, 100, 100, 0);
