This is an automated email from the ASF dual-hosted git repository.
bschuchardt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new f3f07a0 GEODE-8767: NullPointerException in TCPConduit.getBufferPool
due to conTable being null (#5962)
f3f07a0 is described below
commit f3f07a006c30f54fd4ae84309c4c4ede4af22f36
Author: Bruce Schuchardt <[email protected]>
AuthorDate: Fri Jan 29 08:07:17 2021 -0800
GEODE-8767: NullPointerException in TCPConduit.getBufferPool due to
conTable being null (#5962)
An NPE was being thrown in TCPConduit.getBufferPool() because the conTable
instance variable is nulled out when the TCPConduit is stopped. This commit
moves the buffer pool from TCPConduit's conTable object up to the enclosing
DirectChannel object. This removes the need for DirectChannel to ask
TCPConduit for the buffer pool.
---
.../distributed/internal/direct/DirectChannel.java | 22 ++++++++++-----
.../apache/geode/internal/tcp/ConnectionTable.java | 2 +-
.../org/apache/geode/internal/tcp/TCPConduit.java | 11 +++++---
.../apache/geode/internal/tcp/TCPConduitTest.java | 31 +++++++++++++++++-----
4 files changed, 48 insertions(+), 18 deletions(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
index c195655..fc8800f 100644
---
a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
+++
b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java
@@ -49,6 +49,7 @@ import
org.apache.geode.distributed.internal.membership.api.MessageListener;
import org.apache.geode.internal.cache.DirectReplyMessage;
import org.apache.geode.internal.inet.LocalHostUtil;
import org.apache.geode.internal.logging.log4j.LogMarker;
+import org.apache.geode.internal.net.BufferPool;
import org.apache.geode.internal.tcp.BaseMsgStreamer;
import org.apache.geode.internal.tcp.ConnectExceptions;
import org.apache.geode.internal.tcp.Connection;
@@ -71,6 +72,8 @@ public class DirectChannel {
/** this is the conduit used for communications */
private final transient TCPConduit conduit;
private final ClusterDistributionManager dm;
+ private final DMStats stats;
+ private final BufferPool bufferPool;
private volatile boolean disconnected = true;
@@ -112,6 +115,8 @@ public class DirectChannel {
throws ConnectionException {
this.receiver = listener;
this.dm = dm;
+ this.stats = dm.getStats();
+ this.bufferPool = new BufferPool(stats);
DistributionConfig dc = dm.getConfig();
this.address = initAddress(dc);
@@ -137,7 +142,7 @@ public class DirectChannel {
props.setProperty("membership_port_range_start", "" + range[0]);
props.setProperty("membership_port_range_end", "" + range[1]);
- this.conduit = new TCPConduit(mgr, port, address, isBindAddress, this,
props);
+ this.conduit = new TCPConduit(mgr, port, address, isBindAddress, this,
bufferPool, props);
disconnected = false;
disconnectCompleted = false;
logger.info("GemFire P2P Listener started on {}",
@@ -184,6 +189,13 @@ public class DirectChannel {
/**
+ * Returns the buffer pool used for direct-memory byte buffers in this
DirectChannel
+ */
+ public BufferPool getBufferPool() {
+ return bufferPool;
+ }
+
+ /**
* Sends a msg to a list of destinations. This code does some special
optimizations to stream
* large messages
*
@@ -295,7 +307,7 @@ public class DirectChannel {
List<?> sentCons; // used for cons we sent to this time
final BaseMsgStreamer ms =
- MsgStreamer.create(cons, msg, directReply, stats,
getConduit().getBufferPool());
+ MsgStreamer.create(cons, msg, directReply, stats, bufferPool);
try {
startTime = 0;
if (ackTimeout > 0) {
@@ -520,11 +532,7 @@ public class DirectChannel {
* Returns null if no stats available.
*/
public DMStats getDMStats() {
- if (dm != null) {
- return dm.getStats(); // fix for bug#34004
- } else {
- return null;
- }
+ return stats;
}
/**
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
index df9a4a5..614b502 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
@@ -207,7 +207,7 @@ public class ConnectionTable {
threadConnectionMap = new ConcurrentHashMap();
p2pReaderThreadPool =
createThreadPoolForIO(conduit.getDM().getSystem().isShareSockets());
socketCloser = new SocketCloser();
- bufferPool = new BufferPool(owner.getStats());
+ bufferPool = conduit.getBufferPool();
}
private Executor createThreadPoolForIO(boolean conserveSockets) {
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
index 6f5edda..4d6d9c8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java
@@ -112,6 +112,8 @@ public class TCPConduit implements Runnable {
*/
private final boolean useSSL;
+ private final BufferPool bufferPool;
+
/**
* The socket producer used by the cluster
*/
@@ -217,8 +219,8 @@ public class TCPConduit implements Runnable {
* </pre>
*/
public TCPConduit(Membership mgr, int port, InetAddress address, boolean
isBindAddress,
- DirectChannel receiver, Properties props) throws ConnectionException {
- this(mgr, port, address, isBindAddress, receiver, props,
ConnectionTable::create,
+ DirectChannel receiver, BufferPool bufferPool, Properties props) throws
ConnectionException {
+ this(mgr, port, address, isBindAddress, receiver, bufferPool, props,
ConnectionTable::create,
SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER),
() -> {
try {
@@ -232,7 +234,7 @@ public class TCPConduit implements Runnable {
@VisibleForTesting
TCPConduit(Membership mgr, int port, InetAddress address, boolean
isBindAddress,
- DirectChannel receiver, Properties props,
+ DirectChannel receiver, BufferPool bufferPool, Properties props,
Function<TCPConduit, ConnectionTable> connectionTableFactory,
SocketCreator socketCreator,
Runnable localHostValidation, boolean startAcceptor) throws
ConnectionException {
parseProperties(props);
@@ -241,6 +243,7 @@ public class TCPConduit implements Runnable {
this.isBindAddress = isBindAddress;
this.port = port;
directChannel = receiver;
+ this.bufferPool = bufferPool;
stats = null;
config = null;
membership = mgr;
@@ -946,7 +949,7 @@ public class TCPConduit implements Runnable {
}
public BufferPool getBufferPool() {
- return conTable.getBufferPool();
+ return bufferPool;
}
public CancelCriterion getCancelCriterion() {
diff --git
a/geode-core/src/test/java/org/apache/geode/internal/tcp/TCPConduitTest.java
b/geode-core/src/test/java/org/apache/geode/internal/tcp/TCPConduitTest.java
index c0bf0a5..392a599 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/tcp/TCPConduitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/TCPConduitTest.java
@@ -46,6 +46,7 @@ import
org.apache.geode.distributed.internal.direct.DirectChannel;
import
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.distributed.internal.membership.api.Membership;
import org.apache.geode.internal.inet.LocalHostUtil;
+import org.apache.geode.internal.net.BufferPool;
import org.apache.geode.internal.net.SSLConfig;
import org.apache.geode.internal.net.SocketCreator;
@@ -73,10 +74,23 @@ public class TCPConduitTest {
}
@Test
+ public void closedConduitDoesNotThrowNPEWhenAskedForBufferPool() {
+ directChannel.getDM(); // Mockito demands that this mock be used in this
test
+ TCPConduit tcpConduit =
+ new TCPConduit(membership, 0, localHost, false, directChannel,
mock(BufferPool.class),
+ new Properties(),
+ TCPConduit -> connectionTable, socketCreator, doNothing(), false);
+ InternalDistributedMember member = mock(InternalDistributedMember.class);
+ tcpConduit.stop(null);
+ assertThat(tcpConduit.getBufferPool()).isNotNull();
+ }
+
+ @Test
public void
getConnectionThrowsAlertingIOException_ifCaughtIOException_whileAlerting()
throws Exception {
TCPConduit tcpConduit =
- new TCPConduit(membership, 0, localHost, false, directChannel, new
Properties(),
+ new TCPConduit(membership, 0, localHost, false, directChannel,
mock(BufferPool.class),
+ new Properties(),
TCPConduit -> connectionTable, socketCreator, doNothing(), false);
InternalDistributedMember member = mock(InternalDistributedMember.class);
doThrow(new IOException("Cannot form connection to alert listener"))
@@ -99,7 +113,8 @@ public class TCPConduitTest {
@Test
public void getConnectionRethrows_ifCaughtIOException_whileNotAlerting()
throws Exception {
TCPConduit tcpConduit =
- new TCPConduit(membership, 0, localHost, false, directChannel, new
Properties(),
+ new TCPConduit(membership, 0, localHost, false, directChannel,
mock(BufferPool.class),
+ new Properties(),
TCPConduit -> connectionTable, socketCreator, doNothing(), false);
InternalDistributedMember member = mock(InternalDistributedMember.class);
Connection connection = mock(Connection.class);
@@ -123,7 +138,8 @@ public class TCPConduitTest {
@Test
public void
getConnectionRethrows_ifCaughtIOException_whenMemberDoesNotExist() throws
Exception {
TCPConduit tcpConduit =
- new TCPConduit(membership, 0, localHost, false, directChannel, new
Properties(),
+ new TCPConduit(membership, 0, localHost, false, directChannel,
mock(BufferPool.class),
+ new Properties(),
TCPConduit -> connectionTable, socketCreator, doNothing(), false);
InternalDistributedMember member = mock(InternalDistributedMember.class);
doThrow(new IOException("Cannot form connection to alert listener"))
@@ -143,7 +159,8 @@ public class TCPConduitTest {
@Test
public void getConnectionRethrows_ifCaughtIOException_whenMemberIsShunned()
throws Exception {
TCPConduit tcpConduit =
- new TCPConduit(membership, 0, localHost, false, directChannel, new
Properties(),
+ new TCPConduit(membership, 0, localHost, false, directChannel,
mock(BufferPool.class),
+ new Properties(),
TCPConduit -> connectionTable, socketCreator, doNothing(), false);
InternalDistributedMember member = mock(InternalDistributedMember.class);
doThrow(new IOException("Cannot form connection to alert listener"))
@@ -166,7 +183,8 @@ public class TCPConduitTest {
public void
getConnectionThrowsDistributedSystemDisconnectedException_ifCaughtIOException_whenShutdownIsInProgress()
throws Exception {
TCPConduit tcpConduit =
- new TCPConduit(membership, 0, localHost, false, directChannel, new
Properties(),
+ new TCPConduit(membership, 0, localHost, false, directChannel,
mock(BufferPool.class),
+ new Properties(),
TCPConduit -> connectionTable, socketCreator, doNothing(), false);
InternalDistributedMember member = mock(InternalDistributedMember.class);
doThrow(new IOException("Cannot form connection to alert listener"))
@@ -191,7 +209,8 @@ public class TCPConduitTest {
public void
getConnectionThrowsDistributedSystemDisconnectedException_ifCaughtIOException_whenShutdownIsInProgress_andCancelIsInProgress()
throws Exception {
TCPConduit tcpConduit =
- new TCPConduit(membership, 0, localHost, false, directChannel, new
Properties(),
+ new TCPConduit(membership, 0, localHost, false, directChannel,
mock(BufferPool.class),
+ new Properties(),
TCPConduit -> connectionTable, socketCreator, doNothing(), false);
InternalDistributedMember member = mock(InternalDistributedMember.class);
doThrow(new IOException("Cannot form connection to alert listener"))