This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch support/1.13
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.13 by this push:
     new 120f94a  GEODE-8020: buffer management problems (#5048)
120f94a is described below

commit 120f94a3ee1b7934673978ae9c82f1d3e30cb9c8
Author: Bruce Schuchardt <[email protected]>
AuthorDate: Thu May 7 11:33:02 2020 -0700

    GEODE-8020: buffer management problems (#5048)
    
    * GEODE-8020: buffer management problems
    
    This fixes some buffer handling in MsgStreamerList and alters
    MstStreamer to avoid creating MsgStreamerList and VersionedMsgStreamers
    during normal, non-upgrade, operations.
    
    It also changes NioSslEngine to use synchronization in more places,
    notably the close() method, which was possibly allowing multiple threads to
    change the state of the engine.
    
    * revert unnecessary change to ClusterCommunicationsDUnitTest
    
    * fixing another null version check
    
    * renamed new BufferPool property
    
    * restore logging of ssl exceptions
    
    (cherry picked from commit 7375c591f25bbba413237aed1f56f8a9f70075df)
---
 .../internal/ClusterDistributionManager.java       |   5 +-
 .../apache/geode/internal/cache/properties.html    |   9 +-
 .../org/apache/geode/internal/net/BufferPool.java  |  11 ++-
 .../apache/geode/internal/net/NioSslEngine.java    |   7 +-
 .../org/apache/geode/internal/tcp/Connection.java  |  20 ++--
 .../org/apache/geode/internal/tcp/MsgStreamer.java |  16 ++--
 .../apache/geode/internal/tcp/MsgStreamerList.java |  29 ++----
 .../geode/internal/net/NioSslEngineTest.java       |   5 +
 .../apache/geode/internal/tcp/MsgStreamerTest.java | 101 +++++++++++++++++++++
 9 files changed, 157 insertions(+), 46 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
index 338a8b7..a5e363c 100644
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
@@ -608,9 +608,8 @@ public class ClusterDistributionManager implements 
DistributionManager {
     }
     if (member != getDistributionManagerId()) {
       String relationship = areInSameZone(getDistributionManagerId(), member) 
? "" : "not ";
-      Object[] logArgs = new Object[] {member, relationship};
-      logger.info("Member {} is {} equivalent or in the same redundancy zone.",
-          logArgs);
+      logger.info("Member {} is {}equivalent or in the same redundancy zone.",
+          member, relationship);
     }
   }
 
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/properties.html 
b/geode-core/src/main/java/org/apache/geode/internal/cache/properties.html
index 1f5b88d..0419553 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/properties.html
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/properties.html
@@ -2787,16 +2787,19 @@ TBA
 </dd>
 
 <!-- -------------------------------------------------------  -->
-<dt><strong>p2p.nodirectBuffers</strong></dt>
+<dt><strong>gemfire.BufferPool.useHeapBuffers</strong></dt>
 <dd>
 <em>Public:</em> false
 <p>
 <em>Boolean</em> (default is false)
 <p>
-See <code>org.apache.geode.internal.tcp.TCPConduit#useDirectBuffers</code>.
+See <code>org.apache.geode.internal.net.BufferPool#useDirectBuffers</code>.
 <p>
 <pre>
-  use direct ByteBuffers instead of heap ByteBuffers for NIO operations
+  use java "heap" ByteBuffers instead of direct ByteBuffers for NIO 
operations.  Recommended if TLSv1
+    is being used or if you find you are running out of direct-memory and do 
not want to
+    increase the amount of direct-memory available to the JVM.  Use of heap 
buffers can
+    reduce performance in some cases.
 </pre>
 <p>
 TBA
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java 
b/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
index c156c2c..26d069b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
@@ -19,13 +19,18 @@ import java.nio.ByteBuffer;
 import java.util.IdentityHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
+import org.apache.logging.log4j.Logger;
+
 import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.tcp.Connection;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.util.internal.GeodeGlossary;
 
 public class BufferPool {
   private final DMStats stats;
+  private static final Logger logger = LogService.getLogger();
 
   /**
    * Buffers may be acquired from the Buffers pool
@@ -69,7 +74,8 @@ public class BufferPool {
   /**
    * use direct ByteBuffers instead of heap ByteBuffers for NIO operations
    */
-  public static final boolean useDirectBuffers = 
!Boolean.getBoolean("p2p.nodirectBuffers");
+  public static final boolean useDirectBuffers = 
!Boolean.getBoolean("p2p.nodirectBuffers")
+      || Boolean.getBoolean(GeodeGlossary.GEMFIRE_PREFIX + 
"BufferPool.useHeapBuffers");
 
   /**
    * Should only be called by threads that have currently acquired send 
permission.
@@ -183,9 +189,6 @@ public class BufferPool {
           alreadySeen = new IdentityHashMap<>();
         }
         if (alreadySeen.put(ref, ref) != null) {
-          // if it returns non-null then we have already seen this item
-          // so we have worked all the way through the queue once.
-          // So it is time to give up and allocate a new buffer.
           break;
         }
       }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java 
b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
index 2d55fa3..d948ae7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
@@ -54,7 +54,7 @@ public class NioSslEngine implements NioFilter {
 
   private final BufferPool bufferPool;
 
-  private volatile boolean closed;
+  private boolean closed;
 
   SSLEngine engine;
 
@@ -212,7 +212,7 @@ public class NioSslEngine implements NioFilter {
     return bufferPool.expandWriteBufferIfNeeded(type, existing, 
desiredCapacity);
   }
 
-  void checkClosed() throws IOException {
+  synchronized void checkClosed() throws IOException {
     if (closed) {
       throw new IOException("NioSslEngine has been closed");
     }
@@ -364,7 +364,7 @@ public class NioSslEngine implements NioFilter {
 
 
   @Override
-  public void close(SocketChannel socketChannel) {
+  public synchronized void close(SocketChannel socketChannel) {
     if (closed) {
       return;
     }
@@ -398,6 +398,7 @@ public class NioSslEngine implements NioFilter {
     } finally {
       bufferPool.releaseBuffer(TRACKED_SENDER, myNetData);
       bufferPool.releaseBuffer(TRACKED_RECEIVER, peerAppData);
+      myNetData = null;
       this.closed = true;
     }
   }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 841b86c..48bd1b5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -1659,11 +1659,11 @@ public class Connection implements Runnable {
         } catch (IOException e) {
           // "Socket closed" check needed for Solaris jdk 1.4.2_08
           if (!isSocketClosed() && !"Socket 
closed".equalsIgnoreCase(e.getMessage())) {
-            if (logger.isDebugEnabled() && !isIgnorableIOException(e)) {
-              logger.debug("{} io exception for {}", p2pReaderName(), this, e);
+            if (logger.isInfoEnabled() && !isIgnorableIOException(e)) {
+              logger.info("{} io exception for {}", p2pReaderName(), this, e);
             }
-            if (e.getMessage().contains("interrupted by a call to 
WSACancelBlockingCall")) {
-              if (logger.isDebugEnabled()) {
+            if (logger.isDebugEnabled()) {
+              if (e.getMessage().contains("interrupted by a call to 
WSACancelBlockingCall")) {
                 logger.debug(
                     "{} received unexpected WSACancelBlockingCall exception, 
which may result in a hang",
                     p2pReaderName());
@@ -1760,9 +1760,13 @@ public class Connection implements Runnable {
     }
 
     msg = msg.toLowerCase();
-    return msg.contains("forcibly closed")
-        || msg.contains("reset by peer")
-        || msg.contains("connection reset");
+
+    if (e instanceof SSLException && msg.contains("status = closed")) {
+      return true; // engine has been closed - this is normal
+    }
+
+    return (msg.contains("forcibly closed") || msg.contains("reset by peer")
+        || msg.contains("connection reset") || msg.contains("socket is 
closed"));
   }
 
   private static boolean validMsgType(int msgType) {
@@ -3170,7 +3174,7 @@ public class Connection implements Runnable {
     Thread.currentThread().setName(THREAD_KIND_IDENTIFIER + " for " + 
remoteAddr + " "
         + (sharedResource ? "" : "un") + "shared" + " " + (preserveOrder ? "" 
: "un")
         + "ordered" + " uid=" + uniqueId + (dominoNumber > 0 ? " dom #" + 
dominoNumber : "")
-        + " port=" + socket.getPort());
+        + " local port=" + socket.getLocalPort() + " remote port=" + 
socket.getPort());
   }
 
   private void compactOrResizeBuffer(int messageLength) {
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
index 9783397..ae48a46 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
@@ -155,7 +155,9 @@ public class MsgStreamer extends OutputStream
       int numVersioned = 0;
       for (Object c : cons) {
         con = (Connection) c;
-        if ((version = con.getRemoteVersion()) != null) {
+        version = con.getRemoteVersion();
+        if (version != null
+            && Version.CURRENT_ORDINAL > version.ordinal()) {
           if (versionToConnMap == null) {
             versionToConnMap = new Object2ObjectOpenHashMap();
           }
@@ -181,15 +183,17 @@ public class MsgStreamer extends OutputStream
         if (numCons > numVersioned) {
           // allocating list of numCons size so that as the result of
           // getSentConnections it may not need to be reallocted later
-          final ArrayList<Object> unversionedCons = new 
ArrayList<Object>(numCons);
+          final ArrayList<Object> currentVersionConnections = new 
ArrayList<Object>(numCons);
           for (Object c : cons) {
             con = (Connection) c;
-            if ((version = con.getRemoteVersion()) == null) {
-              unversionedCons.add(con);
+            version = con.getRemoteVersion();
+            if (version == null || version.ordinal() >= 
Version.CURRENT_ORDINAL) {
+              currentVersionConnections.add(con);
             }
           }
-          streamers.add(new MsgStreamer(unversionedCons, msg, directReply, 
stats, sendBufferSize,
-              bufferPool));
+          streamers.add(
+              new MsgStreamer(currentVersionConnections, msg, directReply, 
stats, sendBufferSize,
+                  bufferPool));
         }
         for (ObjectIterator<Object2ObjectMap.Entry> itr =
             versionToConnMap.object2ObjectEntrySet().fastIterator(); 
itr.hasNext();) {
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamerList.java 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamerList.java
index 3d2446c..08b573c 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamerList.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamerList.java
@@ -63,25 +63,16 @@ public class MsgStreamerList implements BaseMsgStreamer {
     for (MsgStreamer streamer : this.streamers) {
       if (ex != null) {
         streamer.release();
-        // TODO: shouldn't we call continue here?
-        // It seems wrong to call writeMessage on a streamer we have just 
released.
-        // But why do we call release on a streamer when we had an exception 
on one
-        // of the previous streamer?
-        // release clears the direct bb and returns it to the pool but leaves
-        // it has the "buffer". THen we call writeMessage and it will use 
"buffer"
-        // that has also been returned to the pool.
-        // I think we only have a MsgStreamerList when a DS has a mix of 
versions
-        // which usually is just during a rolling upgrade so that might be why 
we
-        // haven't noticed this causing a bug.
-      }
-      try {
-        result += streamer.writeMessage();
-        // if there is an exception we need to finish the
-        // loop and release the other streamer's buffers
-      } catch (RuntimeException e) {
-        ex = e;
-      } catch (IOException e) {
-        ioex = e;
+      } else {
+        try {
+          result += streamer.writeMessage();
+          // if there is an exception we need to finish the
+          // loop and release the other streamer's buffers
+        } catch (RuntimeException e) {
+          ex = e;
+        } catch (IOException e) {
+          ioex = e;
+        }
       }
     }
     if (ex != null) {
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java 
b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
index b42d566..9a2197d 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
@@ -86,6 +86,11 @@ public class NioSslEngineTest {
   }
 
   @Test
+  public void engineUsesDirectBuffers() {
+    assertThat(nioSslEngine.myNetData.isDirect()).isTrue();
+  }
+
+  @Test
   public void handshake() throws Exception {
     SocketChannel mockChannel = mock(SocketChannel.class);
     when(mockChannel.read(any(ByteBuffer.class))).thenReturn(100, 100, 100, 0);
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/tcp/MsgStreamerTest.java 
b/geode-core/src/test/java/org/apache/geode/internal/tcp/MsgStreamerTest.java
new file mode 100644
index 0000000..8094a37
--- /dev/null
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/tcp/MsgStreamerTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.tcp;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+
+import javax.net.ssl.SSLException;
+
+import org.junit.Test;
+
+import org.apache.geode.distributed.internal.DMStats;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.SerialAckedMessage;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.net.BufferPool;
+import org.apache.geode.internal.serialization.Version;
+
+public class MsgStreamerTest {
+  private DMStats stats = mock(DMStats.class);
+  private BufferPool pool = spy(new BufferPool(stats));
+  Connection connection1 = mock(Connection.class);
+  Connection connection2 = mock(Connection.class);
+
+  @Test
+  public void create() {
+    final BaseMsgStreamer msgStreamer = createMsgStreamer(false);
+    assertThat(msgStreamer).isInstanceOf(MsgStreamer.class);
+  }
+
+  @Test
+  public void createWithMixedVersions() {
+    final BaseMsgStreamer msgStreamer = createMsgStreamer(true);
+    assertThat(msgStreamer).isInstanceOf(MsgStreamerList.class);
+  }
+
+  @Test
+  public void streamerListRelease() throws IOException {
+    final MsgStreamerList msgStreamer = (MsgStreamerList) 
createMsgStreamer(true);
+    msgStreamer.writeMessage();
+    verify(pool, times(2)).releaseSenderBuffer(isA(ByteBuffer.class));
+  }
+
+  @Test
+  public void streamerListReleaseWithException() throws IOException {
+    final MsgStreamerList msgStreamer = (MsgStreamerList) 
createMsgStreamer(true);
+    // if the first streamer throws an exception while writing the message we 
should still only
+    // release two buffers (one for each streamer)
+    doThrow(new 
SSLException("")).when(connection1).sendPreserialized(any(ByteBuffer.class),
+        any(Boolean.class), any(DistributionMessage.class));
+    msgStreamer.writeMessage();
+    verify(pool, times(2)).releaseSenderBuffer(isA(ByteBuffer.class));
+  }
+
+  protected BaseMsgStreamer createMsgStreamer(boolean 
mixedDestinationVersions) {
+
+    InternalDistributedMember member1, member2;
+    member1 = new InternalDistributedMember("localhost", 1234);
+    member2 = new InternalDistributedMember("localhost", 2345);
+
+    DistributionMessage message = new SerialAckedMessage();
+    message.setRecipients(Arrays.asList(member1, member2));
+
+    when(connection1.getRemoteAddress()).thenReturn(member1);
+    when(connection1.getRemoteVersion()).thenReturn(Version.CURRENT);
+    when(connection2.getRemoteAddress()).thenReturn(member2);
+    if (mixedDestinationVersions) {
+      when(connection1.getRemoteVersion()).thenReturn(Version.GEODE_1_12_0);
+    } else {
+      when(connection1.getRemoteVersion()).thenReturn(Version.CURRENT);
+    }
+    List<Connection> connections = Arrays.asList(connection1, connection2);
+
+    return MsgStreamer.create(connections, message, false, stats, pool);
+  }
+}

Reply via email to