This is an automated email from the ASF dual-hosted git repository.
bschuchardt pushed a commit to branch support/1.13
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.13 by this push:
new 120f94a GEODE-8020: buffer management problems (#5048)
120f94a is described below
commit 120f94a3ee1b7934673978ae9c82f1d3e30cb9c8
Author: Bruce Schuchardt <[email protected]>
AuthorDate: Thu May 7 11:33:02 2020 -0700
GEODE-8020: buffer management problems (#5048)
* GEODE-8020: buffer management problems
This fixes some buffer handling in MsgStreamerList and alters
MsgStreamer to avoid creating MsgStreamerList and VersionedMsgStreamers
during normal (non-upgrade) operations.
It also changes NioSslEngine to use synchronization in more places,
notably the close() method, whose lack of synchronization could allow
multiple threads to change the state of the engine concurrently.
* revert unnecessary change to ClusterCommunicationsDUnitTest
* fixing another null version check
* renamed new BufferPool property
* restore logging of ssl exceptions
(cherry picked from commit 7375c591f25bbba413237aed1f56f8a9f70075df)
---
.../internal/ClusterDistributionManager.java | 5 +-
.../apache/geode/internal/cache/properties.html | 9 +-
.../org/apache/geode/internal/net/BufferPool.java | 11 ++-
.../apache/geode/internal/net/NioSslEngine.java | 7 +-
.../org/apache/geode/internal/tcp/Connection.java | 20 ++--
.../org/apache/geode/internal/tcp/MsgStreamer.java | 16 ++--
.../apache/geode/internal/tcp/MsgStreamerList.java | 29 ++----
.../geode/internal/net/NioSslEngineTest.java | 5 +
.../apache/geode/internal/tcp/MsgStreamerTest.java | 101 +++++++++++++++++++++
9 files changed, 157 insertions(+), 46 deletions(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
index 338a8b7..a5e363c 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
@@ -608,9 +608,8 @@ public class ClusterDistributionManager implements
DistributionManager {
}
if (member != getDistributionManagerId()) {
String relationship = areInSameZone(getDistributionManagerId(), member)
? "" : "not ";
- Object[] logArgs = new Object[] {member, relationship};
- logger.info("Member {} is {} equivalent or in the same redundancy zone.",
- logArgs);
+ logger.info("Member {} is {}equivalent or in the same redundancy zone.",
+ member, relationship);
}
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/properties.html
b/geode-core/src/main/java/org/apache/geode/internal/cache/properties.html
index 1f5b88d..0419553 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/properties.html
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/properties.html
@@ -2787,16 +2787,19 @@ TBA
</dd>
<!-- ------------------------------------------------------- -->
-<dt><strong>p2p.nodirectBuffers</strong></dt>
+<dt><strong>gemfire.BufferPool.useHeapBuffers</strong></dt>
<dd>
<em>Public:</em> false
<p>
<em>Boolean</em> (default is false)
<p>
-See <code>org.apache.geode.internal.tcp.TCPConduit#useDirectBuffers</code>.
+See <code>org.apache.geode.internal.net.BufferPool#useDirectBuffers</code>.
<p>
<pre>
- use direct ByteBuffers instead of heap ByteBuffers for NIO operations
+ use java "heap" ByteBuffers instead of direct ByteBuffers for NIO
operations. Recommended if TLSv1
+ is being used or if you find you are running out of direct-memory and do
not want to
+ increase the amount of direct-memory available to the JVM. Use of heap
buffers can
+ reduce performance in some cases.
</pre>
<p>
TBA
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
b/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
index c156c2c..26d069b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java
@@ -19,13 +19,18 @@ import java.nio.ByteBuffer;
import java.util.IdentityHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.logging.log4j.Logger;
+
import org.apache.geode.distributed.internal.DMStats;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.tcp.Connection;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.util.internal.GeodeGlossary;
public class BufferPool {
private final DMStats stats;
+ private static final Logger logger = LogService.getLogger();
/**
* Buffers may be acquired from the Buffers pool
@@ -69,7 +74,8 @@ public class BufferPool {
/**
* use direct ByteBuffers instead of heap ByteBuffers for NIO operations
*/
- public static final boolean useDirectBuffers =
!Boolean.getBoolean("p2p.nodirectBuffers");
+ public static final boolean useDirectBuffers =
!Boolean.getBoolean("p2p.nodirectBuffers")
+ && !Boolean.getBoolean(GeodeGlossary.GEMFIRE_PREFIX +
"BufferPool.useHeapBuffers"); // either property opts out of direct buffers
/**
* Should only be called by threads that have currently acquired send
permission.
@@ -183,9 +189,6 @@ public class BufferPool {
alreadySeen = new IdentityHashMap<>();
}
if (alreadySeen.put(ref, ref) != null) {
- // if it returns non-null then we have already seen this item
- // so we have worked all the way through the queue once.
- // So it is time to give up and allocate a new buffer.
break;
}
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
index 2d55fa3..d948ae7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java
@@ -54,7 +54,7 @@ public class NioSslEngine implements NioFilter {
private final BufferPool bufferPool;
- private volatile boolean closed;
+ private boolean closed;
SSLEngine engine;
@@ -212,7 +212,7 @@ public class NioSslEngine implements NioFilter {
return bufferPool.expandWriteBufferIfNeeded(type, existing,
desiredCapacity);
}
- void checkClosed() throws IOException {
+ synchronized void checkClosed() throws IOException {
if (closed) {
throw new IOException("NioSslEngine has been closed");
}
@@ -364,7 +364,7 @@ public class NioSslEngine implements NioFilter {
@Override
- public void close(SocketChannel socketChannel) {
+ public synchronized void close(SocketChannel socketChannel) {
if (closed) {
return;
}
@@ -398,6 +398,7 @@ public class NioSslEngine implements NioFilter {
} finally {
bufferPool.releaseBuffer(TRACKED_SENDER, myNetData);
bufferPool.releaseBuffer(TRACKED_RECEIVER, peerAppData);
+ myNetData = null;
this.closed = true;
}
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 841b86c..48bd1b5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -1659,11 +1659,11 @@ public class Connection implements Runnable {
} catch (IOException e) {
// "Socket closed" check needed for Solaris jdk 1.4.2_08
if (!isSocketClosed() && !"Socket
closed".equalsIgnoreCase(e.getMessage())) {
- if (logger.isDebugEnabled() && !isIgnorableIOException(e)) {
- logger.debug("{} io exception for {}", p2pReaderName(), this, e);
+ if (logger.isInfoEnabled() && !isIgnorableIOException(e)) {
+ logger.info("{} io exception for {}", p2pReaderName(), this, e);
}
- if (e.getMessage().contains("interrupted by a call to
WSACancelBlockingCall")) {
- if (logger.isDebugEnabled()) {
+ if (logger.isDebugEnabled()) {
+ if (e.getMessage().contains("interrupted by a call to
WSACancelBlockingCall")) {
logger.debug(
"{} received unexpected WSACancelBlockingCall exception,
which may result in a hang",
p2pReaderName());
@@ -1760,9 +1760,13 @@ public class Connection implements Runnable {
}
msg = msg.toLowerCase();
- return msg.contains("forcibly closed")
- || msg.contains("reset by peer")
- || msg.contains("connection reset");
+
+ if (e instanceof SSLException && msg.contains("status = closed")) {
+ return true; // engine has been closed - this is normal
+ }
+
+ return (msg.contains("forcibly closed") || msg.contains("reset by peer")
+ || msg.contains("connection reset") || msg.contains("socket is
closed"));
}
private static boolean validMsgType(int msgType) {
@@ -3170,7 +3174,7 @@ public class Connection implements Runnable {
Thread.currentThread().setName(THREAD_KIND_IDENTIFIER + " for " +
remoteAddr + " "
+ (sharedResource ? "" : "un") + "shared" + " " + (preserveOrder ? ""
: "un")
+ "ordered" + " uid=" + uniqueId + (dominoNumber > 0 ? " dom #" +
dominoNumber : "")
- + " port=" + socket.getPort());
+ + " local port=" + socket.getLocalPort() + " remote port=" +
socket.getPort());
}
private void compactOrResizeBuffer(int messageLength) {
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
index 9783397..ae48a46 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java
@@ -155,7 +155,9 @@ public class MsgStreamer extends OutputStream
int numVersioned = 0;
for (Object c : cons) {
con = (Connection) c;
- if ((version = con.getRemoteVersion()) != null) {
+ version = con.getRemoteVersion();
+ if (version != null
+ && Version.CURRENT_ORDINAL > version.ordinal()) {
if (versionToConnMap == null) {
versionToConnMap = new Object2ObjectOpenHashMap();
}
@@ -181,15 +183,17 @@ public class MsgStreamer extends OutputStream
if (numCons > numVersioned) {
// allocating list of numCons size so that as the result of
// getSentConnections it may not need to be reallocted later
- final ArrayList<Object> unversionedCons = new
ArrayList<Object>(numCons);
+ final ArrayList<Object> currentVersionConnections = new
ArrayList<Object>(numCons);
for (Object c : cons) {
con = (Connection) c;
- if ((version = con.getRemoteVersion()) == null) {
- unversionedCons.add(con);
+ version = con.getRemoteVersion();
+ if (version == null || version.ordinal() >=
Version.CURRENT_ORDINAL) {
+ currentVersionConnections.add(con);
}
}
- streamers.add(new MsgStreamer(unversionedCons, msg, directReply,
stats, sendBufferSize,
- bufferPool));
+ streamers.add(
+ new MsgStreamer(currentVersionConnections, msg, directReply,
stats, sendBufferSize,
+ bufferPool));
}
for (ObjectIterator<Object2ObjectMap.Entry> itr =
versionToConnMap.object2ObjectEntrySet().fastIterator();
itr.hasNext();) {
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamerList.java
b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamerList.java
index 3d2446c..08b573c 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamerList.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamerList.java
@@ -63,25 +63,16 @@ public class MsgStreamerList implements BaseMsgStreamer {
for (MsgStreamer streamer : this.streamers) {
if (ex != null) {
streamer.release();
- // TODO: shouldn't we call continue here?
- // It seems wrong to call writeMessage on a streamer we have just
released.
- // But why do we call release on a streamer when we had an exception
on one
- // of the previous streamer?
- // release clears the direct bb and returns it to the pool but leaves
- // it has the "buffer". THen we call writeMessage and it will use
"buffer"
- // that has also been returned to the pool.
- // I think we only have a MsgStreamerList when a DS has a mix of
versions
- // which usually is just during a rolling upgrade so that might be why
we
- // haven't noticed this causing a bug.
- }
- try {
- result += streamer.writeMessage();
- // if there is an exception we need to finish the
- // loop and release the other streamer's buffers
- } catch (RuntimeException e) {
- ex = e;
- } catch (IOException e) {
- ioex = e;
+ } else {
+ try {
+ result += streamer.writeMessage();
+ // if there is an exception we need to finish the
+ // loop and release the other streamer's buffers
+ } catch (RuntimeException e) {
+ ex = e;
+ } catch (IOException e) {
+ ioex = e;
+ }
}
}
if (ex != null) {
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
index b42d566..9a2197d 100644
---
a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
+++
b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java
@@ -86,6 +86,11 @@ public class NioSslEngineTest {
}
@Test
+ public void engineUsesDirectBuffers() {
+ assertThat(nioSslEngine.myNetData.isDirect()).isTrue();
+ }
+
+ @Test
public void handshake() throws Exception {
SocketChannel mockChannel = mock(SocketChannel.class);
when(mockChannel.read(any(ByteBuffer.class))).thenReturn(100, 100, 100, 0);
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/tcp/MsgStreamerTest.java
b/geode-core/src/test/java/org/apache/geode/internal/tcp/MsgStreamerTest.java
new file mode 100644
index 0000000..8094a37
--- /dev/null
+++
b/geode-core/src/test/java/org/apache/geode/internal/tcp/MsgStreamerTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.tcp;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+
+import javax.net.ssl.SSLException;
+
+import org.junit.Test;
+
+import org.apache.geode.distributed.internal.DMStats;
+import org.apache.geode.distributed.internal.DistributionMessage;
+import org.apache.geode.distributed.internal.SerialAckedMessage;
+import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.net.BufferPool;
+import org.apache.geode.internal.serialization.Version;
+
+public class MsgStreamerTest {
+ private DMStats stats = mock(DMStats.class);
+ private BufferPool pool = spy(new BufferPool(stats));
+ Connection connection1 = mock(Connection.class);
+ Connection connection2 = mock(Connection.class);
+
+ @Test
+ public void create() {
+ final BaseMsgStreamer msgStreamer = createMsgStreamer(false);
+ assertThat(msgStreamer).isInstanceOf(MsgStreamer.class);
+ }
+
+ @Test
+ public void createWithMixedVersions() {
+ final BaseMsgStreamer msgStreamer = createMsgStreamer(true);
+ assertThat(msgStreamer).isInstanceOf(MsgStreamerList.class);
+ }
+
+ @Test
+ public void streamerListRelease() throws IOException {
+ final MsgStreamerList msgStreamer = (MsgStreamerList)
createMsgStreamer(true);
+ msgStreamer.writeMessage();
+ verify(pool, times(2)).releaseSenderBuffer(isA(ByteBuffer.class));
+ }
+
+ @Test
+ public void streamerListReleaseWithException() throws IOException {
+ final MsgStreamerList msgStreamer = (MsgStreamerList)
createMsgStreamer(true);
+ // if the first streamer throws an exception while writing the message we
should still only
+ // release two buffers (one for each streamer)
+ doThrow(new
SSLException("")).when(connection1).sendPreserialized(any(ByteBuffer.class),
+ any(Boolean.class), any(DistributionMessage.class));
+ msgStreamer.writeMessage();
+ verify(pool, times(2)).releaseSenderBuffer(isA(ByteBuffer.class));
+ }
+
+ protected BaseMsgStreamer createMsgStreamer(boolean
mixedDestinationVersions) {
+ // connection1/connection2 are mocks whose remote addresses are member1/member2
+ InternalDistributedMember member1, member2;
+ member1 = new InternalDistributedMember("localhost", 1234);
+ member2 = new InternalDistributedMember("localhost", 2345);
+
+ DistributionMessage message = new SerialAckedMessage();
+ message.setRecipients(Arrays.asList(member1, member2));
+
+ when(connection1.getRemoteAddress()).thenReturn(member1);
+ // connection2's version stays unstubbed (null); MsgStreamer treats null as current
+ when(connection2.getRemoteAddress()).thenReturn(member2);
+ if (mixedDestinationVersions) {
+ when(connection1.getRemoteVersion()).thenReturn(Version.GEODE_1_12_0);
+ } else {
+ when(connection1.getRemoteVersion()).thenReturn(Version.CURRENT);
+ }
+ List<Connection> connections = Arrays.asList(connection1, connection2);
+
+ return MsgStreamer.create(connections, message, false, stats, pool);
+ }
+}