This is an automated email from the ASF dual-hosted git repository. bschuchardt pushed a commit to branch feature/GEODE-6733 in repository https://gitbox.apache.org/repos/asf/geode.git
commit a1b95388c76522be199ebb50d8b455e0e6411f3c Author: Bruce Schuchardt <[email protected]> AuthorDate: Fri May 17 15:57:56 2019 -0700 GEODE-6733 Remove mutable static org.apache.geode.internal.net.Buffers.buffersQueue Converted static Buffers class to be a non-static buffer pool. --- .../internal/net/SSLSocketIntegrationTest.java | 4 +- .../distributed/internal/DistributionStats.java | 2 - .../distributed/internal/direct/DirectChannel.java | 3 +- .../internal/net/{Buffers.java => BufferPool.java} | 75 ++++++++++------------ .../org/apache/geode/internal/net/NioFilter.java | 8 +-- .../apache/geode/internal/net/NioPlainEngine.java | 22 +++---- .../apache/geode/internal/net/NioSslEngine.java | 48 +++++++------- .../apache/geode/internal/net/SocketCreator.java | 5 +- .../org/apache/geode/internal/tcp/Connection.java | 70 ++++++-------------- .../apache/geode/internal/tcp/ConnectionTable.java | 8 +++ .../geode/internal/tcp/DirectReplySender.java | 3 +- .../apache/geode/internal/tcp/MsgOutputStream.java | 4 +- .../org/apache/geode/internal/tcp/MsgReader.java | 8 +-- .../org/apache/geode/internal/tcp/MsgStreamer.java | 31 +++++---- .../org/apache/geode/internal/tcp/TCPConduit.java | 5 ++ .../geode/internal/tcp/VersionedMsgStreamer.java | 5 +- .../net/{BuffersTest.java => BufferPoolTest.java} | 26 +++++--- .../geode/internal/net/NioPlainEngineTest.java | 20 +++--- .../geode/internal/net/NioSslEngineTest.java | 18 +++--- .../geode/internal/tcp/ConnectionJUnitTest.java | 5 +- 20 files changed, 173 insertions(+), 197 deletions(-) diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java index 8e27671..5a09285 100755 --- a/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/net/SSLSocketIntegrationTest.java @@ -216,7 +216,7 @@ public class SSLSocketIntegrationTest { NioSslEngine engine = clusterSocketCreator.handshakeSSLSocketChannel(clientSocket.getChannel(), clusterSocketCreator.createSSLEngine("localhost", 1234), 0, true, - ByteBuffer.allocate(65535), mock(DMStats.class)); + ByteBuffer.allocate(65535), new BufferPool(mock(DMStats.class))); clientChannel.configureBlocking(true); // transmit expected string from Client to Server @@ -264,7 +264,7 @@ public class SSLSocketIntegrationTest { timeoutMillis, false, ByteBuffer.allocate(500), - mock(DMStats.class)); + new BufferPool(mock(DMStats.class))); readMessageFromNIOSSLClient(socket, buffer, engine); readMessageFromNIOSSLClient(socket, buffer, engine); diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java index 91c47e2..13a77fb 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionStats.java @@ -25,7 +25,6 @@ import org.apache.geode.annotations.Immutable; import org.apache.geode.annotations.internal.MakeNotStatic; import org.apache.geode.internal.NanoTimer; import org.apache.geode.internal.logging.LogService; -import org.apache.geode.internal.net.Buffers; import org.apache.geode.internal.statistics.StatisticsTypeFactoryImpl; import org.apache.geode.internal.util.Breadcrumbs; @@ -954,7 +953,6 @@ public class DistributionStats implements DMStats { // this.replyWaitHistogram = new HistogramStats("ReplyWait", "nanoseconds", f, // new long[] {100000, 200000, 300000, 400000, 500000, 600000, 700000, 800000, 900000, 1000000}, // false); - Buffers.initBufferStats(this); } /** diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java index 7d6d046..ecf37f2 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/direct/DirectChannel.java @@ -373,7 +373,8 @@ public class DirectChannel { DMStats stats = getDMStats(); List<?> sentCons; // used for cons we sent to this time - final BaseMsgStreamer ms = MsgStreamer.create(cons, msg, directReply, stats); + final BaseMsgStreamer ms = + MsgStreamer.create(cons, msg, directReply, stats, getConduit().getBufferPool()); try { startTime = 0; if (ackTimeout > 0) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/Buffers.java b/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java similarity index 73% rename from geode-core/src/main/java/org/apache/geode/internal/net/Buffers.java rename to geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java index c77803d..d796ed6 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/net/Buffers.java +++ b/geode-core/src/main/java/org/apache/geode/internal/net/BufferPool.java @@ -19,11 +19,12 @@ import java.nio.ByteBuffer; import java.util.IdentityHashMap; import java.util.concurrent.ConcurrentLinkedQueue; -import org.apache.geode.annotations.internal.MakeNotStatic; import org.apache.geode.distributed.internal.DMStats; import org.apache.geode.internal.Assert; -public class Buffers { +public class BufferPool { + private final DMStats stats; + /** * Buffers may be acquired from the Buffers pool * or they may be allocated using Buffer.allocate(). This enum is used @@ -34,11 +35,15 @@ public class Buffers { UNTRACKED, TRACKED_SENDER, TRACKED_RECEIVER } + + public BufferPool(DMStats stats) { + this.stats = stats; + } + /** * A list of soft references to byte buffers. */ - @MakeNotStatic - private static final ConcurrentLinkedQueue<BBSoftReference> bufferQueue = + private final ConcurrentLinkedQueue<BBSoftReference> bufferQueue = new ConcurrentLinkedQueue<>(); /** @@ -51,15 +56,15 @@ public class Buffers { * * @return a byte buffer to be used for sending on this connection. */ - public static ByteBuffer acquireSenderBuffer(int size, DMStats stats) { - return acquireBuffer(size, stats, true); + public ByteBuffer acquireSenderBuffer(int size) { + return acquireBuffer(size, true); } - public static ByteBuffer acquireReceiveBuffer(int size, DMStats stats) { - return acquireBuffer(size, stats, false); + public ByteBuffer acquireReceiveBuffer(int size) { + return acquireBuffer(size, false); } - private static ByteBuffer acquireBuffer(int size, DMStats stats, boolean send) { + private ByteBuffer acquireBuffer(int size, boolean send) { ByteBuffer result; if (useDirectBuffers) { IdentityHashMap<BBSoftReference, BBSoftReference> alreadySeen = null; // keys are used like a @@ -109,19 +114,19 @@ public class Buffers { return result; } - public static void releaseSenderBuffer(ByteBuffer bb, DMStats stats) { - releaseBuffer(bb, stats, true); + public void releaseSenderBuffer(ByteBuffer bb) { + releaseBuffer(bb, true); } - public static void releaseReceiveBuffer(ByteBuffer bb, DMStats stats) { - releaseBuffer(bb, stats, false); + public void releaseReceiveBuffer(ByteBuffer bb) { + releaseBuffer(bb, false); } /** * expand a buffer that's currently being read from */ - static ByteBuffer expandReadBufferIfNeeded(BufferType type, ByteBuffer existing, - int desiredCapacity, DMStats stats) { + ByteBuffer expandReadBufferIfNeeded(BufferType type, ByteBuffer existing, + int desiredCapacity) { if (existing.capacity() >= desiredCapacity) { if (existing.position() > 0) { existing.compact(); @@ -129,51 +134,51 @@ public class Buffers { } return existing; } - ByteBuffer newBuffer = acquireBuffer(type, desiredCapacity, stats); + ByteBuffer newBuffer = acquireBuffer(type, desiredCapacity); newBuffer.clear(); newBuffer.put(existing); newBuffer.flip(); - releaseBuffer(type, existing, stats); + releaseBuffer(type, existing); return newBuffer; } /** * expand a buffer that's currently being written to */ - static ByteBuffer expandWriteBufferIfNeeded(BufferType type, ByteBuffer existing, - int desiredCapacity, DMStats stats) { + ByteBuffer expandWriteBufferIfNeeded(BufferType type, ByteBuffer existing, + int desiredCapacity) { if (existing.capacity() >= desiredCapacity) { return existing; } - ByteBuffer newBuffer = acquireBuffer(type, desiredCapacity, stats); + ByteBuffer newBuffer = acquireBuffer(type, desiredCapacity); newBuffer.clear(); existing.flip(); newBuffer.put(existing); - releaseBuffer(type, existing, stats); + releaseBuffer(type, existing); return newBuffer; } - static ByteBuffer acquireBuffer(Buffers.BufferType type, int capacity, DMStats stats) { + ByteBuffer acquireBuffer(BufferPool.BufferType type, int capacity) { switch (type) { case UNTRACKED: return ByteBuffer.allocate(capacity); case TRACKED_SENDER: - return Buffers.acquireSenderBuffer(capacity, stats); + return acquireSenderBuffer(capacity); case TRACKED_RECEIVER: - return Buffers.acquireReceiveBuffer(capacity, stats); + return acquireReceiveBuffer(capacity); } throw new IllegalArgumentException("Unexpected buffer type " + type.toString()); } - static void releaseBuffer(Buffers.BufferType type, ByteBuffer buffer, DMStats stats) { + void releaseBuffer(BufferPool.BufferType type, ByteBuffer buffer) { switch (type) { case UNTRACKED: return; case TRACKED_SENDER: - Buffers.releaseSenderBuffer(buffer, stats); + releaseSenderBuffer(buffer); return; case TRACKED_RECEIVER: - Buffers.releaseReceiveBuffer(buffer, stats); + releaseReceiveBuffer(buffer); return; } throw new IllegalArgumentException("Unexpected buffer type " + type.toString()); @@ -183,7 +188,7 @@ public class Buffers { /** * Releases a previously acquired buffer. */ - private static void releaseBuffer(ByteBuffer bb, DMStats stats, boolean send) { + private void releaseBuffer(ByteBuffer bb, boolean send) { if (useDirectBuffers) { BBSoftReference bbRef = new BBSoftReference(bb, send); bufferQueue.offer(bbRef); @@ -196,20 +201,6 @@ public class Buffers { } } - public static void initBufferStats(DMStats stats) { // fixes 46773 - if (useDirectBuffers) { - for (BBSoftReference ref : bufferQueue) { - if (ref.getBB() != null) { - if (ref.getSend()) { // fix bug 46773 - stats.incSenderBufferSize(ref.getSize(), true); - } else { - stats.incReceiverBufferSize(ref.getSize(), true); - } - } - } - } - } - /** * A soft reference that remembers the size of the byte buffer it refers to. TODO Dan - I really * think this should be a weak reference. The JVM doesn't seem to clear soft references if it is diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java index 6cb40ec..8e41ef1 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java +++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioFilter.java @@ -18,8 +18,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; -import org.apache.geode.distributed.internal.DMStats; - /** * Prior to transmitting a buffer or processing a received buffer * a NioFilter should be called to wrap (transmit) or unwrap (received) @@ -44,7 +42,7 @@ public interface NioFilter { * This must be invoked before readAtLeast. A new buffer may be returned by this method. */ ByteBuffer ensureWrappedCapacity(int amount, ByteBuffer wrappedBuffer, - Buffers.BufferType bufferType, DMStats stats); + BufferPool.BufferType bufferType); /** * read at least the indicated amount of bytes from the given @@ -55,8 +53,8 @@ public interface NioFilter { * wrappedBuffer = filter.ensureWrappedCapacity(amount, wrappedBuffer, etc.);<br> * unwrappedBuffer = filter.readAtLeast(channel, amount, wrappedBuffer, etc.) */ - ByteBuffer readAtLeast(SocketChannel channel, int amount, ByteBuffer wrappedBuffer, - DMStats stats) throws IOException; + ByteBuffer readAtLeast(SocketChannel channel, int amount, ByteBuffer wrappedBuffer) + throws IOException; /** * You must invoke this when done reading from the unwrapped buffer diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java index 8a3e3fb..32fa297 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java +++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioPlainEngine.java @@ -20,24 +20,22 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; -import org.apache.logging.log4j.Logger; - -import org.apache.geode.distributed.internal.DMStats; import org.apache.geode.internal.Assert; -import org.apache.geode.internal.logging.LogService; /** * A pass-through implementation of NioFilter. Use this if you don't need * secure communications. */ public class NioPlainEngine implements NioFilter { - private static final Logger logger = LogService.getLogger(); + private final BufferPool bufferPool; int lastReadPosition; int lastProcessedPosition; - public NioPlainEngine() {} + public NioPlainEngine(BufferPool bufferPool) { + this.bufferPool = bufferPool; + } @Override public ByteBuffer wrap(ByteBuffer buffer) { @@ -52,11 +50,11 @@ public class NioPlainEngine implements NioFilter { @Override public ByteBuffer ensureWrappedCapacity(int amount, ByteBuffer wrappedBuffer, - Buffers.BufferType bufferType, DMStats stats) { + BufferPool.BufferType bufferType) { ByteBuffer buffer = wrappedBuffer; if (buffer == null) { - buffer = Buffers.acquireBuffer(bufferType, amount, stats); + buffer = bufferPool.acquireBuffer(bufferType, amount); buffer.clear(); lastProcessedPosition = 0; lastReadPosition = 0; @@ -73,10 +71,10 @@ public class NioPlainEngine implements NioFilter { ByteBuffer oldBuffer = buffer; oldBuffer.limit(lastReadPosition); oldBuffer.position(lastProcessedPosition); - buffer = Buffers.acquireBuffer(bufferType, amount, stats); + buffer = bufferPool.acquireBuffer(bufferType, amount); buffer.clear(); buffer.put(oldBuffer); - Buffers.releaseBuffer(bufferType, oldBuffer, stats); + bufferPool.releaseBuffer(bufferType, oldBuffer); lastReadPosition = buffer.position(); lastProcessedPosition = 0; } @@ -84,8 +82,8 @@ public class NioPlainEngine implements NioFilter { } @Override - public ByteBuffer readAtLeast(SocketChannel channel, int bytes, ByteBuffer wrappedBuffer, - DMStats stats) throws IOException { + public ByteBuffer readAtLeast(SocketChannel channel, int bytes, ByteBuffer wrappedBuffer) + throws IOException { ByteBuffer buffer = wrappedBuffer; Assert.assertTrue(buffer.capacity() - lastProcessedPosition >= bytes); diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java index dd71d75..9bf969d 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java +++ b/geode-core/src/main/java/org/apache/geode/internal/net/NioSslEngine.java @@ -19,9 +19,8 @@ import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NEED_TASK; import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NEED_UNWRAP; import static javax.net.ssl.SSLEngineResult.Status.BUFFER_OVERFLOW; import static javax.net.ssl.SSLEngineResult.Status.OK; -import static org.apache.geode.internal.net.Buffers.BufferType.TRACKED_RECEIVER; -import static org.apache.geode.internal.net.Buffers.BufferType.TRACKED_SENDER; -import static org.apache.geode.internal.net.Buffers.releaseBuffer; +import static org.apache.geode.internal.net.BufferPool.BufferType.TRACKED_RECEIVER; +import static org.apache.geode.internal.net.BufferPool.BufferType.TRACKED_SENDER; import java.io.EOFException; import java.io.IOException; @@ -41,8 +40,8 @@ import javax.net.ssl.SSLSession; import org.apache.logging.log4j.Logger; import org.apache.geode.GemFireIOException; -import org.apache.geode.distributed.internal.DMStats; import org.apache.geode.internal.logging.LogService; +import org.apache.geode.internal.net.BufferPool.BufferType; /** @@ -53,7 +52,7 @@ import org.apache.geode.internal.logging.LogService; public class NioSslEngine implements NioFilter { private static final Logger logger = LogService.getLogger(); - private final DMStats stats; + private final BufferPool bufferPool; private volatile boolean closed; @@ -74,14 +73,14 @@ public class NioSslEngine implements NioFilter { */ ByteBuffer handshakeBuffer; - NioSslEngine(SSLEngine engine, DMStats stats) { - this.stats = stats; + NioSslEngine(SSLEngine engine, BufferPool bufferPool) { SSLSession session = engine.getSession(); int appBufferSize = session.getApplicationBufferSize(); int packetBufferSize = engine.getSession().getPacketBufferSize(); this.myNetData = ByteBuffer.allocate(packetBufferSize); this.peerAppData = ByteBuffer.allocate(appBufferSize); this.engine = engine; + this.bufferPool = bufferPool; } /** @@ -97,7 +96,7 @@ public class NioSslEngine implements NioFilter { logger.debug("Allocating new buffer for SSL handshake"); } this.handshakeBuffer = - Buffers.acquireReceiveBuffer(engine.getSession().getPacketBufferSize(), stats); + bufferPool.acquireReceiveBuffer(engine.getSession().getPacketBufferSize()); } else { this.handshakeBuffer = peerNetData; } @@ -154,8 +153,7 @@ public class NioSslEngine implements NioFilter { if (engineResult.getStatus() == BUFFER_OVERFLOW) { peerAppData = - expandWriteBuffer(TRACKED_RECEIVER, peerAppData, peerAppData.capacity() * 2, - stats); + expandWriteBuffer(TRACKED_RECEIVER, peerAppData, peerAppData.capacity() * 2); } break; @@ -172,7 +170,7 @@ public class NioSslEngine implements NioFilter { case BUFFER_OVERFLOW: myNetData = expandWriteBuffer(TRACKED_SENDER, myNetData, - myNetData.capacity() * 2, stats); + myNetData.capacity() * 2); break; case OK: myNetData.flip(); @@ -216,9 +214,9 @@ public class NioSslEngine implements NioFilter { return true; } - ByteBuffer expandWriteBuffer(Buffers.BufferType type, ByteBuffer existing, - int desiredCapacity, DMStats stats) { - return Buffers.expandWriteBufferIfNeeded(type, existing, desiredCapacity, stats); + ByteBuffer expandWriteBuffer(BufferType type, ByteBuffer existing, + int desiredCapacity) { + return bufferPool.expandWriteBufferIfNeeded(type, existing, desiredCapacity); } void checkClosed() { @@ -248,7 +246,7 @@ public class NioSslEngine implements NioFilter { if (remaining < (appData.remaining() * 2)) { int newCapacity = expandedCapacity(appData, myNetData); - myNetData = expandWriteBuffer(TRACKED_SENDER, myNetData, newCapacity, stats); + myNetData = expandWriteBuffer(TRACKED_SENDER, myNetData, newCapacity); } SSLEngineResult wrapResult = engine.wrap(appData, myNetData); @@ -303,27 +301,27 @@ public class NioSslEngine implements NioFilter { void expandPeerAppData(ByteBuffer wrappedBuffer) { if (peerAppData.capacity() - peerAppData.position() < 2 * wrappedBuffer.remaining()) { peerAppData = - Buffers.expandWriteBufferIfNeeded(TRACKED_RECEIVER, peerAppData, - expandedCapacity(wrappedBuffer, peerAppData), stats); + bufferPool.expandWriteBufferIfNeeded(TRACKED_RECEIVER, peerAppData, + expandedCapacity(wrappedBuffer, peerAppData)); } } @Override public ByteBuffer ensureWrappedCapacity(int amount, ByteBuffer wrappedBuffer, - Buffers.BufferType bufferType, DMStats stats) { + BufferType bufferType) { ByteBuffer buffer = wrappedBuffer; int requiredSize = engine.getSession().getPacketBufferSize(); if (buffer == null) { - buffer = Buffers.acquireBuffer(bufferType, requiredSize, stats); + buffer = bufferPool.acquireBuffer(bufferType, requiredSize); } else if (buffer.capacity() < requiredSize) { - buffer = Buffers.expandWriteBufferIfNeeded(bufferType, buffer, requiredSize, stats); + buffer = bufferPool.expandWriteBufferIfNeeded(bufferType, buffer, requiredSize); } return buffer; } @Override public ByteBuffer readAtLeast(SocketChannel channel, int bytes, - ByteBuffer wrappedBuffer, DMStats stats) throws IOException { + ByteBuffer wrappedBuffer) throws IOException { if (peerAppData.capacity() > bytes) { // we already have a buffer that's big enough if (peerAppData.capacity() - peerAppData.position() < bytes) { @@ -332,7 +330,7 @@ public class NioSslEngine implements NioFilter { } } else { peerAppData = - Buffers.expandReadBufferIfNeeded(TRACKED_RECEIVER, peerAppData, bytes, this.stats); + bufferPool.expandReadBufferIfNeeded(TRACKED_RECEIVER, peerAppData, bytes); } while (peerAppData.remaining() < bytes) { @@ -367,7 +365,7 @@ public class NioSslEngine implements NioFilter { // for TTLS the app-data buffers do not need to be tracked direct-buffers since we // do not use them for I/O operations peerAppData = - Buffers.expandReadBufferIfNeeded(TRACKED_RECEIVER, peerAppData, amount, this.stats); + bufferPool.expandReadBufferIfNeeded(TRACKED_RECEIVER, peerAppData, amount); return peerAppData; } @@ -405,8 +403,8 @@ public class NioSslEngine implements NioFilter { } catch (IOException e) { throw new GemFireIOException("exception closing SSL session", e); } finally { - releaseBuffer(TRACKED_SENDER, myNetData, stats); - releaseBuffer(TRACKED_RECEIVER, peerAppData, stats); + bufferPool.releaseBuffer(TRACKED_SENDER, myNetData); + bufferPool.releaseBuffer(TRACKED_RECEIVER, peerAppData); this.closed = true; } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java index 412c423..bc136f5 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java +++ b/geode-core/src/main/java/org/apache/geode/internal/net/SocketCreator.java @@ -84,7 +84,6 @@ import org.apache.geode.annotations.internal.MakeNotStatic; import org.apache.geode.cache.wan.GatewaySender; import org.apache.geode.cache.wan.GatewayTransportFilter; import org.apache.geode.distributed.ClientSocketFactory; -import org.apache.geode.distributed.internal.DMStats; import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.distributed.internal.DistributionConfigImpl; import org.apache.geode.distributed.internal.InternalDistributedSystem; @@ -940,7 +939,7 @@ public class SocketCreator { int timeout, boolean clientSocket, ByteBuffer peerNetBuffer, - DMStats stats) + BufferPool bufferPool) throws IOException { engine.setUseClientMode(clientSocket); while (!socketChannel.finishConnect()) { @@ -954,7 +953,7 @@ public class SocketCreator { } } - NioSslEngine nioSslEngine = new NioSslEngine(engine, stats); + NioSslEngine nioSslEngine = new NioSslEngine(engine, bufferPool); boolean blocking = socketChannel.isBlocking(); if (blocking) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java index 2ba313e..a9cb8d9 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java @@ -18,8 +18,6 @@ import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_PEER import java.io.DataInputStream; import java.io.IOException; -import java.io.InputStream; -import java.io.InterruptedIOException; import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.Socket; @@ -78,7 +76,7 @@ import org.apache.geode.internal.Version; import org.apache.geode.internal.alerting.AlertingAction; import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.logging.LoggingThread; -import org.apache.geode.internal.net.Buffers; +import org.apache.geode.internal.net.BufferPool; import org.apache.geode.internal.net.NioFilter; import org.apache.geode.internal.net.NioPlainEngine; import org.apache.geode.internal.net.SocketCreator; @@ -593,7 +591,7 @@ public class Connection implements Runnable { bytes[MSG_HEADER_BYTES] = REPLY_CODE_OK; int allocSize = bytes.length; ByteBuffer bb; - if (Buffers.useDirectBuffers) { + if (BufferPool.useDirectBuffers) { bb = ByteBuffer.allocateDirect(allocSize); } else { bb = ByteBuffer.allocate(allocSize); @@ -637,7 +635,7 @@ public class Connection implements Runnable { if (this.isReceiver) { DistributionConfig cfg = owner.getConduit().config; ByteBuffer bb; - if (Buffers.useDirectBuffers) { + if (BufferPool.useDirectBuffers) { bb = ByteBuffer.allocateDirect(128); } else { bb = ByteBuffer.allocate(128); @@ -1200,7 +1198,7 @@ public class Connection implements Runnable { private BatchBufferFlusher batchFlusher; private void createBatchSendBuffer() { - if (Buffers.useDirectBuffers) { + if (BufferPool.useDirectBuffers) { this.fillBatchBuffer = ByteBuffer.allocateDirect(BATCH_BUFFER_SIZE); this.sendBatchBuffer = ByteBuffer.allocateDirect(BATCH_BUFFER_SIZE); } else { @@ -1613,10 +1611,14 @@ public class Connection implements Runnable { if (tmp != null) { this.inputBuffer = null; final DMStats stats = this.owner.getConduit().getStats(); - Buffers.releaseReceiveBuffer(tmp, stats); + getBufferPool().releaseReceiveBuffer(tmp); } } + BufferPool getBufferPool() { + return owner.getBufferPool(); + } + private String p2pReaderName() { StringBuilder sb = new StringBuilder(64); if (this.isReceiver) { @@ -1840,9 +1842,9 @@ public class Connection implements Runnable { || (inputBuffer.capacity() < packetBufferSize)) { // TLS has a minimum input buffer size constraint if (inputBuffer != null) { - Buffers.releaseReceiveBuffer(inputBuffer, getConduit().getStats()); + getBufferPool().releaseReceiveBuffer(inputBuffer); } - inputBuffer = Buffers.acquireReceiveBuffer(packetBufferSize, getConduit().getStats()); + inputBuffer = getBufferPool().acquireReceiveBuffer(packetBufferSize); } if (channel.socket().getReceiveBufferSize() < packetBufferSize) { channel.socket().setReceiveBufferSize(packetBufferSize); @@ -1851,9 +1853,10 @@ public class Connection implements Runnable { channel.socket().setSendBufferSize(packetBufferSize); } ioFilter = getConduit().getSocketCreator().handshakeSSLSocketChannel(channel, engine, - getConduit().idleConnectionTimeout, clientSocket, inputBuffer, getConduit().getStats()); + getConduit().idleConnectionTimeout, clientSocket, inputBuffer, + getBufferPool()); } else { - ioFilter = new NioPlainEngine(); + ioFilter = new NioPlainEngine(getBufferPool()); } } @@ -1958,42 +1961,6 @@ public class Connection implements Runnable { } - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DE_MIGHT_IGNORE") - void readFully(InputStream input, byte[] buffer, int len) throws IOException { - int bytesSoFar = 0; - while (bytesSoFar < len) { - this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null); - try { - synchronized (stateLock) { - connectionState = STATE_READING; - } - int bytesThisTime = input.read(buffer, bytesSoFar, len - bytesSoFar); - if (bytesThisTime < 0) { - this.readerShuttingDown = true; - try { - requestClose("Stream read returned non-positive length"); - } catch (Exception ignored) { - } - return; - } - bytesSoFar += bytesThisTime; - } catch (InterruptedIOException io) { - // Current thread has been interrupted. Regard it similar to an EOF - this.readerShuttingDown = true; - try { - requestClose("Current thread interrupted"); - } catch (Exception ignored) { - } - Thread.currentThread().interrupt(); - this.owner.getConduit().getCancelCriterion().checkCancelInProgress(null); - } finally { - synchronized (stateLock) { - connectionState = STATE_IDLE; - } - } - } // while - } - /** * sends a serialized message to the other end of this connection. This is used by the * DirectChannel in GemFire when the message is going to be sent to multiple recipients. @@ -2011,7 +1978,7 @@ public class Connection implements Runnable { return; } final boolean origSocketInUse = this.socketInUse; - byte originalState = -1; + byte originalState; synchronized (stateLock) { originalState = this.connectionState; this.connectionState = STATE_SENDING; @@ -2237,7 +2204,6 @@ public class Connection implements Runnable { ck.setBuffer(oldBuffer); } else { // old buffer was not large enough - oldBuffer = null; ByteBuffer newbb = ByteBuffer.allocate(newBytes); newbb.put(buffer); newbb.flip(); @@ -2787,7 +2753,7 @@ public class Connection implements Runnable { if (allocSize == -1) { allocSize = this.owner.getConduit().tcpBufferSize; } - inputBuffer = Buffers.acquireReceiveBuffer(allocSize, this.owner.getConduit().getStats()); + inputBuffer = getBufferPool().acquireReceiveBuffer(allocSize); } return inputBuffer; } @@ -3398,13 +3364,13 @@ public class Connection implements Runnable { logger.info("Allocating larger network read buffer, new size is {} old size was {}.", allocSize, oldBufferSize); ByteBuffer oldBuffer = inputBuffer; - inputBuffer = Buffers.acquireReceiveBuffer(allocSize, stats); + inputBuffer = getBufferPool().acquireReceiveBuffer(allocSize); if (oldBuffer != null) { int oldByteCount = oldBuffer.remaining(); inputBuffer.put(oldBuffer); inputBuffer.position(oldByteCount); - Buffers.releaseReceiveBuffer(oldBuffer, stats); + getBufferPool().releaseReceiveBuffer(oldBuffer); } } else { if (inputBuffer.position() != 0) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java index f3a4432..50c3cf8 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java @@ -48,6 +48,7 @@ import org.apache.geode.internal.SystemTimer; import org.apache.geode.internal.alerting.AlertingAction; import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.logging.LoggingExecutors; +import org.apache.geode.internal.net.BufferPool; import org.apache.geode.internal.net.SocketCloser; /** @@ -124,6 +125,8 @@ public class ConnectionTable { */ private final TCPConduit owner; + private final BufferPool bufferPool; + /** * true if this table is no longer in use */ @@ -199,6 +202,7 @@ public class ConnectionTable { this.threadConnectionMap = new ConcurrentHashMap(); this.p2pReaderThreadPool = createThreadPoolForIO(conduit.getDM().getSystem().isShareSockets()); this.socketCloser = new SocketCloser(); + this.bufferPool = new BufferPool(owner.getStats()); } private Executor createThreadPoolForIO(boolean conserveSockets) { @@ -611,6 +615,10 @@ public class ConnectionTable { return owner; } + public BufferPool getBufferPool() { + return bufferPool; + } + public boolean isClosed() { return this.closed; } diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java index 73d1582..dbb828d 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java @@ -65,7 +65,8 @@ class DirectReplySender implements ReplySender { } ArrayList<Connection> conns = new ArrayList<Connection>(1); conns.add(conn); - MsgStreamer ms = (MsgStreamer) MsgStreamer.create(conns, msg, false, DUMMY_STATS); + MsgStreamer ms = (MsgStreamer) MsgStreamer.create(conns, msg, false, DUMMY_STATS, + conn.getBufferPool()); try { ms.writeMessage(); ConnectExceptions ce = ms.getConnectExceptions(); diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgOutputStream.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgOutputStream.java index 2d767b8..98669b3 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgOutputStream.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgOutputStream.java @@ -22,7 +22,7 @@ import java.nio.ByteBuffer; import org.apache.geode.DataSerializer; import org.apache.geode.internal.InternalDataSerializer; import org.apache.geode.internal.ObjToByteArraySerializer; -import org.apache.geode.internal.net.Buffers; +import org.apache.geode.internal.net.BufferPool; /** * MsgOutputStream should no longer be used except in Connection to do the handshake. Otherwise @@ -38,7 +38,7 @@ public class MsgOutputStream extends OutputStream implements ObjToByteArraySeria * The caller of this constructor is responsible for managing the allocated instance. */ public MsgOutputStream(int allocSize) { - if (Buffers.useDirectBuffers) { + if (BufferPool.useDirectBuffers) { this.buffer = ByteBuffer.allocateDirect(allocSize); } else { this.buffer = ByteBuffer.allocate(allocSize); diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java index afb0272..0a33428 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java @@ -27,7 +27,7 @@ import org.apache.geode.internal.Assert; import org.apache.geode.internal.InternalDataSerializer; import org.apache.geode.internal.Version; import org.apache.geode.internal.logging.LogService; -import org.apache.geode.internal.net.Buffers; +import org.apache.geode.internal.net.BufferPool; import org.apache.geode.internal.net.NioFilter; /** @@ -125,13 +125,13 @@ public class MsgReader { private ByteBuffer readAtLeast(int bytes) throws IOException { peerNetData = ioFilter.ensureWrappedCapacity(bytes, peerNetData, - Buffers.BufferType.TRACKED_RECEIVER, getStats()); - return ioFilter.readAtLeast(conn.getSocket().getChannel(), bytes, peerNetData, getStats()); + BufferPool.BufferType.TRACKED_RECEIVER); + return ioFilter.readAtLeast(conn.getSocket().getChannel(), bytes, peerNetData); } public void close() { if (peerNetData != null) { - Buffers.releaseReceiveBuffer(peerNetData, getStats()); + conn.getBufferPool().releaseReceiveBuffer(peerNetData); } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java index 22a385b..d42bef1 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgStreamer.java @@ -25,7 +25,6 @@ import java.util.List; import it.unimi.dsi.fastutil.objects.Object2ObjectMap; import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap; import it.unimi.dsi.fastutil.objects.ObjectIterator; -import org.apache.logging.log4j.Logger; import org.apache.geode.DataSerializer; import org.apache.geode.distributed.internal.DMStats; @@ -37,8 +36,7 @@ import org.apache.geode.internal.HeapDataOutputStream; import org.apache.geode.internal.InternalDataSerializer; import org.apache.geode.internal.ObjToByteArraySerializer; import org.apache.geode.internal.Version; -import org.apache.geode.internal.logging.LogService; -import org.apache.geode.internal.net.Buffers; +import org.apache.geode.internal.net.BufferPool; /** * <p> @@ -52,13 +50,13 @@ import org.apache.geode.internal.net.Buffers; public class MsgStreamer extends OutputStream implements ObjToByteArraySerializer, BaseMsgStreamer, ByteBufferWriter { - private static final Logger logger = LogService.getLogger(); - /** * List of connections to send this msg to. */ private final List<?> cons; + private final BufferPool bufferPool; + /** * Any exceptions that happen during sends */ @@ -98,7 +96,7 @@ public class MsgStreamer extends OutputStream MsgIdGenerator.release(this.msgId); this.buffer.clear(); this.overflowBuf = null; - Buffers.releaseSenderBuffer(this.buffer, this.stats); + bufferPool.releaseSenderBuffer(this.buffer); } /** @@ -126,15 +124,16 @@ public class MsgStreamer extends OutputStream * now be used. */ MsgStreamer(List<?> cons, DistributionMessage msg, boolean directReply, DMStats stats, - int sendBufferSize) { + int sendBufferSize, BufferPool bufferPool) { this.stats = stats; this.msg = msg; this.cons = cons; - this.buffer = Buffers.acquireSenderBuffer(sendBufferSize, stats); + this.buffer = bufferPool.acquireSenderBuffer(sendBufferSize); this.buffer.clear(); this.buffer.position(Connection.MSG_HEADER_BYTES); this.msgId = MsgIdGenerator.NO_MSG_ID; this.directReply = directReply; + this.bufferPool = bufferPool; startSerialization(); } @@ -144,7 +143,7 @@ public class MsgStreamer extends OutputStream * List of MsgStreamer objects. */ public static BaseMsgStreamer create(List<?> cons, final DistributionMessage msg, - final boolean directReply, final DMStats stats) { + final boolean directReply, final DMStats stats, BufferPool bufferPool) { final Connection firstCon = (Connection) cons.get(0); // split into different versions if required Version version; @@ -170,7 +169,8 @@ public class MsgStreamer extends OutputStream } } if (versionToConnMap == null) { - return new MsgStreamer(cons, msg, directReply, stats, firstCon.getSendBufferSize()); + return new MsgStreamer(cons, msg, directReply, stats, firstCon.getSendBufferSize(), + bufferPool); } else { // if there is a versioned stream created, then split remaining // connections to unversioned stream @@ -187,7 +187,8 @@ public class MsgStreamer extends OutputStream unversionedCons.add(con); } } - streamers.add(new MsgStreamer(unversionedCons, msg, directReply, stats, sendBufferSize)); + streamers.add(new MsgStreamer(unversionedCons, msg, directReply, stats, sendBufferSize, + bufferPool)); } for (ObjectIterator<Object2ObjectMap.Entry> itr = versionToConnMap.object2ObjectEntrySet().fastIterator(); itr.hasNext();) { @@ -195,15 +196,17 @@ public class MsgStreamer extends OutputStream Object ver = entry.getKey(); Object l = entry.getValue(); streamers.add(new VersionedMsgStreamer((List<?>) l, msg, directReply, stats, - sendBufferSize, (Version) ver)); + bufferPool, sendBufferSize, (Version) ver)); } return new MsgStreamerList(streamers); } } else if ((version = firstCon.getRemoteVersion()) == null) { - return new MsgStreamer(cons, msg, directReply, stats, firstCon.getSendBufferSize()); + return new MsgStreamer(cons, msg, directReply, stats, firstCon.getSendBufferSize(), + bufferPool); } else { // create a single VersionedMsgStreamer - return new VersionedMsgStreamer(cons, msg, directReply, stats, firstCon.getSendBufferSize(), + return new VersionedMsgStreamer(cons, msg, directReply, stats, bufferPool, + firstCon.getSendBufferSize(), version); } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java index 97d748f..699c706 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/TCPConduit.java @@ -47,6 +47,7 @@ import org.apache.geode.internal.alerting.AlertingAction; import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.logging.LoggingThread; import org.apache.geode.internal.logging.log4j.LogMarker; +import org.apache.geode.internal.net.BufferPool; import org.apache.geode.internal.net.SocketCreator; import org.apache.geode.internal.net.SocketCreatorFactory; import org.apache.geode.internal.security.SecurableCommunicationChannel; @@ -975,6 +976,10 @@ public class TCPConduit implements Runnable { return useSSL; } + public BufferPool getBufferPool() { + return this.conTable.getBufferPool(); + } + protected class Stopper extends CancelCriterion { @Override diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/VersionedMsgStreamer.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/VersionedMsgStreamer.java index 4fe7b32..72b68f1 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/VersionedMsgStreamer.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/VersionedMsgStreamer.java @@ -21,6 +21,7 @@ import org.apache.geode.distributed.internal.DMStats; import org.apache.geode.distributed.internal.DistributionMessage; import org.apache.geode.internal.Version; import org.apache.geode.internal.VersionedDataStream; +import org.apache.geode.internal.net.BufferPool; /** * An extension of {@link MsgStreamer} that implements {@link VersionedDataStream}. @@ -32,8 +33,8 @@ class VersionedMsgStreamer extends MsgStreamer implements VersionedDataStream { private final Version version; VersionedMsgStreamer(List<?> cons, DistributionMessage msg, boolean directReply, DMStats stats, - int sendBufferSize, Version version) { - super(cons, msg, directReply, stats, sendBufferSize); + BufferPool bufferPool, int sendBufferSize, Version version) { + super(cons, msg, directReply, stats, sendBufferSize, bufferPool); this.version = version; } diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/BuffersTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/BufferPoolTest.java similarity index 84% rename from geode-core/src/test/java/org/apache/geode/internal/net/BuffersTest.java rename to geode-core/src/test/java/org/apache/geode/internal/net/BufferPoolTest.java index 96a4ac6..cc441e4 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/net/BuffersTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/net/BufferPoolTest.java @@ -21,11 +21,19 @@ import static org.mockito.Mockito.mock; import java.nio.ByteBuffer; +import org.junit.Before; import org.junit.Test; import org.apache.geode.distributed.internal.DMStats; -public class BuffersTest { +public class BufferPoolTest { + + private BufferPool bufferPool; + + @Before + public void setup() { + bufferPool = new BufferPool(mock(DMStats.class)); + } @Test public void expandBuffer() throws Exception { @@ -50,8 +58,7 @@ public class BuffersTest { private void createAndVerifyNewWriteBuffer(ByteBuffer buffer, boolean useDirectBuffer) { buffer.position(buffer.capacity()); ByteBuffer newBuffer = - Buffers.expandWriteBufferIfNeeded(Buffers.BufferType.UNTRACKED, buffer, 500, - mock(DMStats.class)); + bufferPool.expandWriteBufferIfNeeded(BufferPool.BufferType.UNTRACKED, buffer, 500); assertEquals(buffer.position(), newBuffer.position()); assertEquals(500, newBuffer.capacity()); newBuffer.flip(); @@ -66,8 +73,7 @@ public class BuffersTest { buffer.position(0); buffer.limit(256); ByteBuffer newBuffer = - Buffers.expandReadBufferIfNeeded(Buffers.BufferType.UNTRACKED, buffer, 500, - mock(DMStats.class)); + bufferPool.expandReadBufferIfNeeded(BufferPool.BufferType.UNTRACKED, buffer, 500); assertEquals(0, newBuffer.position()); assertEquals(500, newBuffer.capacity()); for (int i = 0; i < 256; i++) { @@ -84,8 +90,9 @@ public class BuffersTest { ByteBuffer buffer = ByteBuffer.allocate(33842); buffer.position(7); buffer.limit(16384); - ByteBuffer newBuffer = Buffers.expandReadBufferIfNeeded(Buffers.BufferType.UNTRACKED, buffer, - 40899, mock(DMStats.class)); + ByteBuffer newBuffer = + bufferPool.expandReadBufferIfNeeded(BufferPool.BufferType.UNTRACKED, buffer, + 40899); assertThat(newBuffer.capacity()).isGreaterThanOrEqualTo(40899); // buffer should be ready to read the same amount of data assertThat(newBuffer.position()).isEqualTo(0); @@ -98,8 +105,9 @@ public class BuffersTest { ByteBuffer buffer = ByteBuffer.allocate(33842); buffer.position(16384); buffer.limit(buffer.capacity()); - ByteBuffer newBuffer = Buffers.expandWriteBufferIfNeeded(Buffers.BufferType.UNTRACKED, buffer, - 40899, mock(DMStats.class)); + ByteBuffer newBuffer = + bufferPool.expandWriteBufferIfNeeded(BufferPool.BufferType.UNTRACKED, buffer, + 40899); assertThat(newBuffer.capacity()).isGreaterThanOrEqualTo(40899); // buffer should have the same amount of data as the old one assertThat(newBuffer.position()).isEqualTo(16384); diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java index 5fe4def..133d827 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioPlainEngineTest.java @@ -36,17 +36,15 @@ import org.apache.geode.distributed.internal.DMStats; public class NioPlainEngineTest { - private static final int netBufferSize = 10000; - private static final int appBufferSize = 20000; - private DMStats mockStats; private NioPlainEngine nioEngine; + private BufferPool bufferPool; @Before public void setUp() throws Exception { mockStats = mock(DMStats.class); - - nioEngine = new NioPlainEngine(); + bufferPool = new BufferPool(mockStats); + nioEngine = new NioPlainEngine(bufferPool); } @Test @@ -60,13 +58,13 @@ public class NioPlainEngineTest { @Test @Ignore("Pending fix of GEODE-6733 to remove static from Buffers implementation") public void ensureWrappedCapacity() { - ByteBuffer wrappedBuffer = Buffers.acquireReceiveBuffer(100, mockStats); + ByteBuffer wrappedBuffer = bufferPool.acquireReceiveBuffer(100); verify(mockStats, times(1)).incReceiverBufferSize(any(Integer.class), any(Boolean.class)); wrappedBuffer.put(new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}); nioEngine.lastReadPosition = 10; int requestedCapacity = 210; ByteBuffer result = nioEngine.ensureWrappedCapacity(requestedCapacity, wrappedBuffer, - Buffers.BufferType.TRACKED_RECEIVER, mockStats); + BufferPool.BufferType.TRACKED_RECEIVER); verify(mockStats, times(2)).incReceiverBufferSize(any(Integer.class), any(Boolean.class)); assertThat(result.capacity()).isGreaterThanOrEqualTo(requestedCapacity); assertThat(result).isNotSameAs(wrappedBuffer); @@ -90,7 +88,7 @@ public class NioPlainEngineTest { nioEngine.lastReadPosition = consumedDataPresentInBuffer + unconsumedDataPresentInBuffer; ByteBuffer result = wrappedBuffer = nioEngine.ensureWrappedCapacity(requestedCapacity, wrappedBuffer, - Buffers.BufferType.UNTRACKED, mockStats); + BufferPool.BufferType.UNTRACKED); assertThat(result.capacity()).isEqualTo(requestedCapacity + unconsumedDataPresentInBuffer); assertThat(result).isSameAs(wrappedBuffer); // make sure that data was transferred to the new buffer @@ -121,14 +119,14 @@ public class NioPlainEngineTest { nioEngine.lastReadPosition = 10; - ByteBuffer data = nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer, mockStats); + ByteBuffer data = nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer); verify(mockChannel, times(3)).read(isA(ByteBuffer.class)); assertThat(data.position()).isEqualTo(0); assertThat(data.limit()).isEqualTo(amountToRead); assertThat(nioEngine.lastReadPosition).isEqualTo(individualRead * 3 + preexistingBytes); assertThat(nioEngine.lastProcessedPosition).isEqualTo(amountToRead); - data = nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer, mockStats); + data = nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer); verify(mockChannel, times(5)).read(any(ByteBuffer.class)); // at end of last readAtLeast data assertThat(data.position()).isEqualTo(amountToRead); @@ -152,7 +150,7 @@ public class NioPlainEngineTest { nioEngine.lastReadPosition = 10; - nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer, mockStats); + nioEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer); } } diff --git a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java index b12df09..7236895 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/net/NioSslEngineTest.java @@ -80,7 +80,7 @@ public class NioSslEngineTest { mockStats = mock(DMStats.class); - nioSslEngine = new NioSslEngine(mockEngine, mockStats); + nioSslEngine = new NioSslEngine(mockEngine, new BufferPool(mockStats)); spyNioSslEngine = spy(nioSslEngine); } @@ -113,8 +113,8 @@ public class NioSslEngineTest { verify(mockEngine, atLeast(2)).getHandshakeStatus(); verify(mockEngine, times(3)).wrap(any(ByteBuffer.class), any(ByteBuffer.class)); verify(mockEngine, times(3)).unwrap(any(ByteBuffer.class), any(ByteBuffer.class)); - verify(spyNioSslEngine, times(2)).expandWriteBuffer(any(Buffers.BufferType.class), - any(ByteBuffer.class), any(Integer.class), any(DMStats.class)); + verify(spyNioSslEngine, times(2)).expandWriteBuffer(any(BufferPool.BufferType.class), + any(ByteBuffer.class), any(Integer.class)); verify(spyNioSslEngine, times(1)).handleBlockingTasks(); verify(mockChannel, times(3)).read(any(ByteBuffer.class)); } @@ -228,8 +228,8 @@ public class NioSslEngineTest { ByteBuffer wrappedBuffer = spyNioSslEngine.wrap(appData); - verify(spyNioSslEngine, times(1)).expandWriteBuffer(any(Buffers.BufferType.class), - any(ByteBuffer.class), any(Integer.class), any(DMStats.class)); + verify(spyNioSslEngine, times(1)).expandWriteBuffer(any(BufferPool.BufferType.class), + any(ByteBuffer.class), any(Integer.class)); appData.flip(); assertThat(wrappedBuffer).isEqualTo(appData); verify(spyNioSslEngine, times(1)).handleBlockingTasks(); @@ -376,14 +376,14 @@ public class NioSslEngineTest { public void ensureWrappedCapacityOfSmallMessage() { ByteBuffer buffer = ByteBuffer.allocate(netBufferSize); assertThat( - nioSslEngine.ensureWrappedCapacity(10, buffer, Buffers.BufferType.UNTRACKED, mockStats)) + nioSslEngine.ensureWrappedCapacity(10, buffer, BufferPool.BufferType.UNTRACKED)) .isEqualTo(buffer); } @Test public void ensureWrappedCapacityWithNoBuffer() { assertThat( - nioSslEngine.ensureWrappedCapacity(10, null, Buffers.BufferType.UNTRACKED, mockStats) + nioSslEngine.ensureWrappedCapacity(10, null, BufferPool.BufferType.UNTRACKED) .capacity()) .isEqualTo(netBufferSize); } @@ -415,7 +415,7 @@ public class NioSslEngineTest { testSSLEngine.addReturnResult(new SSLEngineResult(OK, NEED_UNWRAP, 0, 0)); nioSslEngine.engine = testSSLEngine; - ByteBuffer data = nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer, mockStats); + ByteBuffer data = nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer); verify(mockChannel, times(3)).read(isA(ByteBuffer.class)); assertThat(data.position()).isEqualTo(0); assertThat(data.limit()).isEqualTo(individualRead * 3 + preexistingBytes); @@ -459,7 +459,7 @@ public class NioSslEngineTest { new SSLEngineResult(OK, NEED_UNWRAP, 0, 0)); // 130 + 60 bytes = 190 nioSslEngine.engine = testSSLEngine; - ByteBuffer data = nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer, mockStats); + ByteBuffer data = nioSslEngine.readAtLeast(mockChannel, amountToRead, wrappedBuffer); verify(mockChannel, times(3)).read(isA(ByteBuffer.class)); assertThat(data.position()).isEqualTo(0); assertThat(data.limit()).isEqualTo(individualRead * 3 + preexistingBytes); diff --git a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionJUnitTest.java index 73cf06c..854685f 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/tcp/ConnectionJUnitTest.java @@ -31,6 +31,7 @@ import org.apache.geode.distributed.internal.DMStats; import org.apache.geode.distributed.internal.DistributionManager; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.distributed.internal.membership.MembershipManager; +import org.apache.geode.internal.net.BufferPool; import org.apache.geode.internal.net.SocketCloser; import org.apache.geode.internal.net.SocketCreator; import org.apache.geode.test.junit.categories.MembershipTest; @@ -52,10 +53,12 @@ public class ConnectionJUnitTest { DistributionManager distMgr = mock(DistributionManager.class); MembershipManager membership = mock(MembershipManager.class); TCPConduit conduit = mock(TCPConduit.class); + DMStats stats = mock(DMStats.class); // mock the connection table and conduit when(table.getConduit()).thenReturn(conduit); + when(table.getBufferPool()).thenReturn(new BufferPool(stats)); CancelCriterion stopper = mock(CancelCriterion.class); when(stopper.cancelInProgress()).thenReturn(null); @@ -67,7 +70,7 @@ public class ConnectionJUnitTest { // mock the distribution manager and membership manager when(distMgr.getMembershipManager()).thenReturn(membership); when(conduit.getDM()).thenReturn(distMgr); - when(conduit.getStats()).thenReturn(mock(DMStats.class)); + when(conduit.getStats()).thenReturn(stats); when(table.getDM()).thenReturn(distMgr); SocketCloser closer = mock(SocketCloser.class); when(table.getSocketCloser()).thenReturn(closer);
