This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch feature/GEODE-6661
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 06615ac4752be45e2d9e315ae9f9ffd93bf50e09
Author: Bruce Schuchardt <[email protected]>
AuthorDate: Tue Nov 5 10:49:39 2019 -0800

    GEODE-6661 NioSslEngine has some problems in its ByteBuffer management
    
    Reverting the change to use a temporary byte buffer for SSL handshakes.
    At the end of a handshake the buffer may contain application data that
    must be available for subsequent decryption.  In the case of TCPConduit
    this is usually the "handshake" bytes transmitted for that package's
    communications protocol.
    
    Since we really need those bytes I've removed the option of expanding
    the handshake buffer if it's smaller than the SSL session's required
    packet size.  TCPConduit uses that figure to allocate the buffer so this
    should be safe.  I've added a test for this.
---
 .../geode/ClusterCommunicationsDUnitTest.java      |  53 +++++--
 .../apache/geode/internal/net/NioSslEngine.java    | 161 ++++++++++-----------
 .../geode/internal/net/NioSslEngineTest.java       |  32 +++-
 3 files changed, 155 insertions(+), 91 deletions(-)

diff --git 
a/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java
 
b/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java
index 492ec3a..8d313a1 100644
--- 
a/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java
+++ 
b/geode-core/src/distributedTest/java/org/apache/geode/ClusterCommunicationsDUnitTest.java
@@ -167,10 +167,17 @@ public class ClusterCommunicationsDUnitTest implements 
Serializable {
       createCacheAndRegion(getVM(i), locatorPort);
     }
     performCreateWithLargeValue(getVM(1));
-    // fault the value into an empty cache - forces use of message chunking
-    for (int i = 1; i <= NUM_SERVERS - 1; i++) {
-      verifyCreatedEntry(getVM(i));
+    performUpdateWithLargeValue(getVM(1));
+  }
+
+  @Test
+  public void createEntryWithSmallMessage() {
+    int locatorPort = createLocator(getVM(0));
+    for (int i = 1; i <= NUM_SERVERS; i++) {
+      createCacheAndRegion(getVM(i), locatorPort);
     }
+    performCreateWithSmallValue(getVM(1));
+    performUpdateWithSmallValue(getVM(1));
   }
 
   @Test
@@ -262,12 +269,40 @@ public class ClusterCommunicationsDUnitTest implements 
Serializable {
 
   private void performCreateWithLargeValue(VM memberVM) {
     memberVM.invoke("perform create", () -> {
-      byte[] value = new byte[SMALL_BUFFER_SIZE * 20];
+      byte[] value = new byte[1024];
       Arrays.fill(value, (byte) 1);
       cache.getRegion(regionName).put("testKey", value);
     });
   }
 
+  private void performUpdateWithLargeValue(VM memberVM) {
+    memberVM.invoke("perform update", () -> {
+      byte[] value = new byte[1024];
+      Arrays.fill(value, (byte) 1);
+      for (int i = 0; i < 1000; i++) {
+        cache.getRegion(regionName).put("testKey", value);
+      }
+    });
+  }
+
+  private void performCreateWithSmallValue(VM memberVM) {
+    memberVM.invoke("perform create", () -> {
+      byte[] value = new byte[512];
+      Arrays.fill(value, (byte) 1);
+      cache.getRegion(regionName).put("testKey", value);
+    });
+  }
+
+  private void performUpdateWithSmallValue(VM memberVM) {
+    memberVM.invoke("perform update", () -> {
+      byte[] value = new byte[512];
+      Arrays.fill(value, (byte) 1);
+      for (int i = 0; i < 1000; i++) {
+        cache.getRegion(regionName).put("testKey", value);
+      }
+    });
+  }
+
   private void verifyCreatedEntry(VM memberVM) {
     memberVM.invoke("verify entry created", () -> Assert.assertTrue(cache
         .getRegion(regionName).containsKey("testKey")));
@@ -339,11 +374,11 @@ public class ClusterCommunicationsDUnitTest implements 
Serializable {
   }
 
   enum RunConfiguration {
-    SHARED_CONNECTIONS(true, false, false),
-    SHARED_CONNECTIONS_WITH_SSL(true, true, false),
-    UNSHARED_CONNECTIONS(false, false, false),
-    UNSHARED_CONNECTIONS_WITH_SSL(false, true, false),
-    UDP_CONNECTIONS(true, false, true);
+    // SHARED_CONNECTIONS(true, false, false),
+    // SHARED_CONNECTIONS_WITH_SSL(true, true, false),
+    // UNSHARED_CONNECTIONS(false, false, false),
+    UNSHARED_CONNECTIONS_WITH_SSL(false, true, false);
+    // UDP_CONNECTIONS(true, false, true);
 
     boolean useSSL;
     boolean conserveSockets;
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java 
b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
index ba5fa16..e914847 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
@@ -86,11 +86,14 @@ public class NioSslEngine implements NioFilter {
       ByteBuffer peerNetData)
       throws IOException, InterruptedException {
 
-    if (logger.isDebugEnabled()) {
-      logger.debug("Allocating new buffer for SSL handshake");
+    if (peerNetData.capacity() < engine.getSession().getPacketBufferSize()) {
+      throw new IllegalArgumentException(String.format("Provided buffer is too 
small to perform "
+          + "SSL handshake.  Buffer capacity is %s but need %s",
+          peerNetData.capacity(), engine.getSession().getPacketBufferSize()));
     }
-    ByteBuffer handshakeBuffer =
-        
bufferPool.acquireDirectReceiveBuffer(engine.getSession().getPacketBufferSize());
+
+    ByteBuffer handshakeBuffer = peerNetData;
+    handshakeBuffer.clear();
 
     ByteBuffer myAppData = ByteBuffer.wrap(new byte[0]);
 
@@ -109,88 +112,84 @@ public class NioSslEngine implements NioFilter {
     SSLEngineResult.HandshakeStatus status = engine.getHandshakeStatus();
     SSLEngineResult engineResult = null;
 
-    try {
-      // Process handshaking message
-      while (status != FINISHED &&
-          status != SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING) {
-        if (socketChannel.socket().isClosed()) {
-          logger.info("Handshake terminated because socket is closed");
-          throw new SocketException("handshake terminated - socket is closed");
+    // Process handshaking message
+    while (status != FINISHED &&
+        status != SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING) {
+      if (socketChannel.socket().isClosed()) {
+        logger.info("Handshake terminated because socket is closed");
+        throw new SocketException("handshake terminated - socket is closed");
+      }
+
+      if (timeoutNanos > 0) {
+        if (timeoutNanos < System.nanoTime()) {
+          logger.info("TLS handshake is timing out");
+          throw new SocketTimeoutException("handshake timed out");
         }
+      }
 
-        if (timeoutNanos > 0) {
-          if (timeoutNanos < System.nanoTime()) {
-            logger.info("TLS handshake is timing out");
-            throw new SocketTimeoutException("handshake timed out");
+      switch (status) {
+        case NEED_UNWRAP:
+          // Receive handshaking data from peer
+          int dataRead = socketChannel.read(handshakeBuffer);
+
+          // Process incoming handshaking data
+          handshakeBuffer.flip();
+          engineResult = engine.unwrap(handshakeBuffer, peerAppData);
+          handshakeBuffer.compact();
+          status = engineResult.getHandshakeStatus();
+
+          // if we're not finished, there's nothing to process and no data was 
read let's hang out
+          // for a little
+          if (peerAppData.remaining() == 0 && dataRead == 0 && status == 
NEED_UNWRAP) {
+            Thread.sleep(10);
           }
-        }
 
-        switch (status) {
-          case NEED_UNWRAP:
-            // Receive handshaking data from peer
-            int dataRead = socketChannel.read(handshakeBuffer);
-
-            // Process incoming handshaking data
-            handshakeBuffer.flip();
-            engineResult = engine.unwrap(handshakeBuffer, peerAppData);
-            handshakeBuffer.compact();
-            status = engineResult.getHandshakeStatus();
-
-            // if we're not finished, there's nothing to process and no data 
was read let's hang out
-            // for a little
-            if (peerAppData.remaining() == 0 && dataRead == 0 && status == 
NEED_UNWRAP) {
-              Thread.sleep(10);
-            }
-
-            if (engineResult.getStatus() == BUFFER_OVERFLOW) {
-              peerAppData =
-                  expandWriteBuffer(TRACKED_RECEIVER, peerAppData, 
peerAppData.capacity() * 2);
-            }
-            break;
-
-          case NEED_WRAP:
-            // Empty the local network packet buffer.
-            myNetData.clear();
-
-            // Generate handshaking data
-            engineResult = engine.wrap(myAppData, myNetData);
-            status = engineResult.getHandshakeStatus();
-
-            // Check status
-            switch (engineResult.getStatus()) {
-              case BUFFER_OVERFLOW:
-                myNetData =
-                    expandWriteBuffer(TRACKED_SENDER, myNetData,
-                        myNetData.capacity() * 2);
-                break;
-              case OK:
-                myNetData.flip();
-                // Send the handshaking data to peer
-                while (myNetData.hasRemaining()) {
-                  socketChannel.write(myNetData);
-                }
-                break;
-              case CLOSED:
-                break;
-              default:
-                logger.info("handshake terminated with illegal state due to 
{}", status);
-                throw new IllegalStateException(
-                    "Unknown SSLEngineResult status: " + 
engineResult.getStatus());
-            }
-            break;
-          case NEED_TASK:
-            // Handle blocking tasks
-            handleBlockingTasks();
-            status = engine.getHandshakeStatus();
-            break;
-          default:
-            logger.info("handshake terminated with illegal state due to {}", 
status);
-            throw new IllegalStateException("Unknown SSL Handshake state: " + 
status);
-        }
-        Thread.sleep(10);
+          if (engineResult.getStatus() == BUFFER_OVERFLOW) {
+            peerAppData =
+                expandWriteBuffer(TRACKED_RECEIVER, peerAppData, 
peerAppData.capacity() * 2);
+          }
+          break;
+
+        case NEED_WRAP:
+          // Empty the local network packet buffer.
+          myNetData.clear();
+
+          // Generate handshaking data
+          engineResult = engine.wrap(myAppData, myNetData);
+          status = engineResult.getHandshakeStatus();
+
+          // Check status
+          switch (engineResult.getStatus()) {
+            case BUFFER_OVERFLOW:
+              myNetData =
+                  expandWriteBuffer(TRACKED_SENDER, myNetData,
+                      myNetData.capacity() * 2);
+              break;
+            case OK:
+              myNetData.flip();
+              // Send the handshaking data to peer
+              while (myNetData.hasRemaining()) {
+                socketChannel.write(myNetData);
+              }
+              break;
+            case CLOSED:
+              break;
+            default:
+              logger.info("handshake terminated with illegal state due to {}", 
status);
+              throw new IllegalStateException(
+                  "Unknown SSLEngineResult status: " + 
engineResult.getStatus());
+          }
+          break;
+        case NEED_TASK:
+          // Handle blocking tasks
+          handleBlockingTasks();
+          status = engine.getHandshakeStatus();
+          break;
+        default:
+          logger.info("handshake terminated with illegal state due to {}", 
status);
+          throw new IllegalStateException("Unknown SSL Handshake state: " + 
status);
       }
-    } finally {
-      bufferPool.releaseReceiveBuffer(handshakeBuffer);
+      Thread.sleep(10);
     }
     if (status != FINISHED) {
       logger.info("handshake terminated with exception due to {}", status);
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java 
b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
index e5a4963..e50b878 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
@@ -109,7 +109,7 @@ public class NioSslEngineTest {
         new SSLEngineResult(BUFFER_OVERFLOW, NEED_WRAP, 0, 0),
         new SSLEngineResult(CLOSED, FINISHED, 100, 0));
 
-    spyNioSslEngine.handshake(mockChannel, 10000, 
ByteBuffer.allocate(netBufferSize / 2));
+    spyNioSslEngine.handshake(mockChannel, 10000, 
ByteBuffer.allocate(netBufferSize));
     verify(mockEngine, atLeast(2)).getHandshakeStatus();
     verify(mockEngine, times(3)).wrap(any(ByteBuffer.class), 
any(ByteBuffer.class));
     verify(mockEngine, times(3)).unwrap(any(ByteBuffer.class), 
any(ByteBuffer.class));
@@ -120,6 +120,36 @@ public class NioSslEngineTest {
   }
 
   @Test
+  public void handshakeWithInsufficientBufferSize() throws Exception {
+    SocketChannel mockChannel = mock(SocketChannel.class);
+    when(mockChannel.read(any(ByteBuffer.class))).thenReturn(100, 100, 100, 0);
+    Socket mockSocket = mock(Socket.class);
+    when(mockChannel.socket()).thenReturn(mockSocket);
+    when(mockSocket.isClosed()).thenReturn(false);
+
+    // // initial read of handshake status followed by read of handshake 
status after task execution
+    // when(mockEngine.getHandshakeStatus()).thenReturn(NEED_UNWRAP, 
NEED_WRAP);
+    //
+    // // interleaved wraps/unwraps/task-execution
+    // when(mockEngine.unwrap(any(ByteBuffer.class), 
any(ByteBuffer.class))).thenReturn(
+    // new SSLEngineResult(OK, NEED_WRAP, 100, 100),
+    // new SSLEngineResult(BUFFER_OVERFLOW, NEED_UNWRAP, 0, 0),
+    // new SSLEngineResult(OK, NEED_TASK, 100, 0));
+    //
+    // when(mockEngine.getDelegatedTask()).thenReturn(() -> {
+    // }, (Runnable) null);
+    //
+    // when(mockEngine.wrap(any(ByteBuffer.class), 
any(ByteBuffer.class))).thenReturn(
+    // new SSLEngineResult(OK, NEED_UNWRAP, 100, 100),
+    // new SSLEngineResult(BUFFER_OVERFLOW, NEED_WRAP, 0, 0),
+    // new SSLEngineResult(CLOSED, FINISHED, 100, 0));
+    //
+    assertThatThrownBy(() -> spyNioSslEngine.handshake(mockChannel, 10000,
+        ByteBuffer.allocate(netBufferSize / 
2))).isExactlyInstanceOf(IllegalArgumentException.class)
+            .hasMessageContaining("Provided buffer is too small");
+  }
+
+  @Test
   public void handshakeDetectsClosedSocket() throws Exception {
     SocketChannel mockChannel = mock(SocketChannel.class);
     when(mockChannel.read(any(ByteBuffer.class))).thenReturn(100, 100, 100, 0);

Reply via email to