This is an automated email from the ASF dual-hosted git repository.
upthewaterspout pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new c303d01 GEODE-5043: Buffering protobuf output to avoid multiple
packet sends (#1768)
c303d01 is described below
commit c303d01e408bc903f8ff5ac54b551618a4c5e782
Author: Dan Smith <[email protected]>
AuthorDate: Wed Apr 18 15:54:03 2018 -0700
GEODE-5043: Buffering protobuf output to avoid multiple packet sends (#1768)
GEODE-5043: Buffering protobuf output to avoid multiple packet sends
Based on the stats, all protobuf requests and responses were being sent
as two packets. Use a BufferedOutputStream to make sure we send only one
packet.
---
.../cache/tier/sockets/OriginalServerConnection.java | 2 ++
.../cache/tier/sockets/ProtobufServerConnection.java | 13 +++++++++----
.../geode/internal/cache/tier/sockets/ServerConnection.java | 11 ++++-------
.../cache/tier/sockets/ServerConnectionFactory.java | 2 +-
.../apache/geode/experimental/driver/ProtobufChannel.java | 7 +++++--
.../tier/sockets/OutputCapturingServerConnectionTest.java | 8 +++++---
.../cache/tier/sockets/ProtobufServerConnectionTest.java | 13 +++++++------
7 files changed, 33 insertions(+), 23 deletions(-)
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/OriginalServerConnection.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/OriginalServerConnection.java
index 56700e4..f21f701 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/OriginalServerConnection.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/OriginalServerConnection.java
@@ -47,6 +47,8 @@ public class OriginalServerConnection extends
ServerConnection {
SecurityService securityService) {
super(socket, internalCache, helper, stats, hsTimeout, socketBufferSize,
communicationModeStr,
communicationMode, acceptor, securityService);
+
+ initStreams(socket, socketBufferSize, stats);
}
@Override
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ProtobufServerConnection.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ProtobufServerConnection.java
index 4601bd7..8392eaf 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ProtobufServerConnection.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ProtobufServerConnection.java
@@ -15,10 +15,10 @@
package org.apache.geode.internal.cache.tier.sockets;
+import java.io.BufferedOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
@@ -41,6 +41,7 @@ public class ProtobufServerConnection extends
ServerConnection {
private final ClientProtocolProcessor protocolProcessor;
private boolean cleanedUp;
private ClientProxyMembershipID clientProxyMembershipID;
+ private final BufferedOutputStream output;
/**
* Creates a new <code>ProtobufServerConnection</code> that processes
messages received from an
@@ -49,11 +50,12 @@ public class ProtobufServerConnection extends
ServerConnection {
public ProtobufServerConnection(Socket socket, InternalCache c,
CachedRegionHelper helper,
CacheServerStats stats, int hsTimeout, int socketBufferSize, String
communicationModeStr,
byte communicationMode, Acceptor acceptor, ClientProtocolProcessor
clientProtocolProcessor,
- SecurityService securityService) {
+ SecurityService securityService) throws IOException {
super(socket, c, helper, stats, hsTimeout, socketBufferSize,
communicationModeStr,
communicationMode, acceptor, securityService);
this.protocolProcessor = clientProtocolProcessor;
+ this.output = new BufferedOutputStream(socket.getOutputStream(),
socketBufferSize);
setClientProxyMembershipId();
doHandShake(CommunicationMode.ProtobufClientServerProtocol.getModeNumber(), 0);
@@ -64,12 +66,15 @@ public class ProtobufServerConnection extends
ServerConnection {
Socket socket = this.getSocket();
try {
InputStream inputStream = socket.getInputStream();
- OutputStream outputStream = socket.getOutputStream();
InternalCache cache = getCache();
cache.setReadSerializedForCurrentThread(true);
try {
- protocolProcessor.processMessage(inputStream, outputStream);
+ try {
+ protocolProcessor.processMessage(inputStream, output);
+ } finally {
+ output.flush();
+ }
} finally {
cache.setReadSerializedForCurrentThread(false);
}
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index ccfb00d..0707e73 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -289,8 +289,9 @@ public abstract class ServerConnection implements Runnable {
final boolean isDebugEnabled = logger.isDebugEnabled();
try {
-
- initStreams(socket, socketBufferSize, stats);
+ theSocket = socket;
+ theSocket.setSendBufferSize(socketBufferSize);
+ theSocket.setReceiveBufferSize(socketBufferSize);
if (isDebugEnabled) {
logger.debug(
@@ -1480,12 +1481,8 @@ public abstract class ServerConnection implements
Runnable {
}
}
-
- private void initStreams(Socket s, int socketBufferSize, MessageStats
msgStats) {
+ protected void initStreams(Socket s, int socketBufferSize, MessageStats
msgStats) {
try {
- theSocket = s;
- theSocket.setSendBufferSize(socketBufferSize);
- theSocket.setReceiveBufferSize(socketBufferSize);
if (getAcceptor().isSelector()) {
// set it on the message to null. This causes Message
// to fetch it from a thread local. That way we only need
diff --git
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
index c09a1f4..16e1eb3 100644
---
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
+++
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
@@ -78,7 +78,7 @@ public class ServerConnectionFactory {
private ServerConnection createProtobufServerConnection(Socket socket,
InternalCache cache,
CachedRegionHelper helper, CacheServerStats stats, int hsTimeout, int
socketBufferSize,
String communicationModeStr, byte communicationMode, Acceptor acceptor,
- SecurityService securityService) {
+ SecurityService securityService) throws IOException {
ClientProtocolService service =
getClientProtocolService(cache.getDistributedSystem(),
acceptor.getServerName());
diff --git
a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufChannel.java
b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufChannel.java
index afbe909..b309053 100644
---
a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufChannel.java
+++
b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufChannel.java
@@ -14,6 +14,7 @@
*/
package org.apache.geode.experimental.driver;
+import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -38,6 +39,7 @@ class ProtobufChannel {
* Socket to a GemFire server that has Protobuf enabled.
*/
final Socket socket;
+ final BufferedOutputStream output;
private final ValueSerializer serializer;
public ProtobufChannel(final Set<InetSocketAddress> locators, String
username, String password,
@@ -46,6 +48,7 @@ class ProtobufChannel {
this.serializer = serializer;
socket = connectToAServer(locators, username, password, keyStorePath,
trustStorePath, protocols,
ciphers);
+ output = new BufferedOutputStream(socket.getOutputStream(),
socket.getSendBufferSize());
}
public void close() throws IOException {
@@ -181,8 +184,8 @@ class ProtobufChannel {
}
Message sendRequest(final Message request, MessageTypeCase expectedResult)
throws IOException {
- final OutputStream outputStream = socket.getOutputStream();
- request.writeDelimitedTo(outputStream);
+ request.writeDelimitedTo(output);
+ output.flush();
Message response = readResponse();
if (!response.getMessageTypeCase().equals(expectedResult)) {
diff --git
a/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/OutputCapturingServerConnectionTest.java
b/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/OutputCapturingServerConnectionTest.java
index 3fc9970..8c1d6f1 100644
---
a/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/OutputCapturingServerConnectionTest.java
+++
b/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/OutputCapturingServerConnectionTest.java
@@ -21,11 +21,11 @@ import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
-import java.net.UnknownHostException;
import org.junit.Rule;
import org.junit.Test;
@@ -52,6 +52,8 @@ public class OutputCapturingServerConnectionTest {
Socket socketMock = mock(Socket.class);
when(socketMock.getInetAddress()).thenReturn(InetAddress.getByName("localhost"));
when(socketMock.isClosed()).thenReturn(true);
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ when(socketMock.getOutputStream()).thenReturn(outputStream);
AcceptorImpl acceptorStub = mock(AcceptorImpl.class);
ClientProtocolProcessor clientProtocolProcessor =
mock(ClientProtocolProcessor.class);
@@ -76,7 +78,7 @@ public class OutputCapturingServerConnectionTest {
private ProtobufServerConnection getServerConnection(Socket socketMock,
ClientProtocolProcessor clientProtocolProcessorMock, AcceptorImpl
acceptorStub)
- throws UnknownHostException {
+ throws IOException {
ClientHealthMonitor clientHealthMonitorMock =
mock(ClientHealthMonitor.class);
when(acceptorStub.getClientHealthMonitor()).thenReturn(clientHealthMonitorMock);
InetSocketAddress inetSocketAddressStub =
InetSocketAddress.createUnresolved("localhost", 9071);
@@ -89,7 +91,7 @@ public class OutputCapturingServerConnectionTest {
CachedRegionHelper cachedRegionHelper = mock(CachedRegionHelper.class);
when(cachedRegionHelper.getCache()).thenReturn(cache);
return new ProtobufServerConnection(socketMock, cache, cachedRegionHelper,
- mock(CacheServerStats.class), 0, 0, "",
+ mock(CacheServerStats.class), 0, 1024, "",
CommunicationMode.ProtobufClientServerProtocol.getModeNumber(),
acceptorStub,
clientProtocolProcessorMock, mock(SecurityService.class));
}
diff --git
a/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/ProtobufServerConnectionTest.java
b/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/ProtobufServerConnectionTest.java
index ce829f5..57e6603 100644
---
a/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/ProtobufServerConnectionTest.java
+++
b/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/ProtobufServerConnectionTest.java
@@ -22,11 +22,11 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
-import java.net.UnknownHostException;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -71,7 +71,7 @@ public class ProtobufServerConnectionTest {
}
@Test
- public void testClientHealthMonitorRegistration() throws
UnknownHostException {
+ public void testClientHealthMonitorRegistration() throws IOException {
AcceptorImpl acceptorStub = mock(AcceptorImpl.class);
ClientProtocolProcessor clientProtocolProcessor =
mock(ClientProtocolProcessor.class);
@@ -94,7 +94,7 @@ public class ProtobufServerConnectionTest {
}
@Test
- public void testDoOneMessageNotifiesClientHealthMonitor() throws
UnknownHostException {
+ public void testDoOneMessageNotifiesClientHealthMonitor() throws IOException
{
AcceptorImpl acceptorStub = mock(AcceptorImpl.class);
ClientProtocolProcessor clientProtocolProcessor =
mock(ClientProtocolProcessor.class);
@@ -117,7 +117,7 @@ public class ProtobufServerConnectionTest {
private ProtobufServerConnection getServerConnection(Socket socketMock,
ClientProtocolProcessor clientProtocolProcessorMock, AcceptorImpl
acceptorStub)
- throws UnknownHostException {
+ throws IOException {
clientHealthMonitorMock = mock(ClientHealthMonitor.class);
when(acceptorStub.getClientHealthMonitor()).thenReturn(clientHealthMonitorMock);
InetSocketAddress inetSocketAddressStub =
InetSocketAddress.createUnresolved("localhost", 9071);
@@ -130,15 +130,16 @@ public class ProtobufServerConnectionTest {
CachedRegionHelper cachedRegionHelper = mock(CachedRegionHelper.class);
when(cachedRegionHelper.getCache()).thenReturn(cache);
return new ProtobufServerConnection(socketMock, cache, cachedRegionHelper,
- mock(CacheServerStats.class), 0, 0, "",
+ mock(CacheServerStats.class), 0, 1024, "",
CommunicationMode.ProtobufClientServerProtocol.getModeNumber(),
acceptorStub,
clientProtocolProcessorMock, mock(SecurityService.class));
}
private ProtobufServerConnection getServerConnection(
ClientProtocolProcessor clientProtocolProcessorMock, AcceptorImpl
acceptorStub)
- throws UnknownHostException {
+ throws IOException {
Socket socketMock = mock(Socket.class);
+ when(socketMock.getOutputStream()).thenReturn(new ByteArrayOutputStream());
return getServerConnection(socketMock, clientProtocolProcessorMock,
acceptorStub);
}
}
--
To stop receiving notification emails like this one, please contact
[email protected].