This is an automated email from the ASF dual-hosted git repository.

upthewaterspout pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new c303d01  GEODE-5043: Buffering protobuf output to avoid multiple packet sends (#1768)
c303d01 is described below

commit c303d01e408bc903f8ff5ac54b551618a4c5e782
Author: Dan Smith <[email protected]>
AuthorDate: Wed Apr 18 15:54:03 2018 -0700

    GEODE-5043: Buffering protobuf output to avoid multiple packet sends (#1768)
    
    GEODE-5043: Buffering protobuf output to avoid multiple packet sends
    
    Based on the stats, all protobuf requests and responses were being sent
    as two packets. Use a BufferedOutputStream to make sure we only send one
    packet.
---
 .../cache/tier/sockets/OriginalServerConnection.java        |  2 ++
 .../cache/tier/sockets/ProtobufServerConnection.java        | 13 +++++++++----
 .../geode/internal/cache/tier/sockets/ServerConnection.java | 11 ++++-------
 .../cache/tier/sockets/ServerConnectionFactory.java         |  2 +-
 .../apache/geode/experimental/driver/ProtobufChannel.java   |  7 +++++--
 .../tier/sockets/OutputCapturingServerConnectionTest.java   |  8 +++++---
 .../cache/tier/sockets/ProtobufServerConnectionTest.java    | 13 +++++++------
 7 files changed, 33 insertions(+), 23 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/OriginalServerConnection.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/OriginalServerConnection.java
index 56700e4..f21f701 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/OriginalServerConnection.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/OriginalServerConnection.java
@@ -47,6 +47,8 @@ public class OriginalServerConnection extends 
ServerConnection {
       SecurityService securityService) {
     super(socket, internalCache, helper, stats, hsTimeout, socketBufferSize, 
communicationModeStr,
         communicationMode, acceptor, securityService);
+
+    initStreams(socket, socketBufferSize, stats);
   }
 
   @Override
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ProtobufServerConnection.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ProtobufServerConnection.java
index 4601bd7..8392eaf 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ProtobufServerConnection.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ProtobufServerConnection.java
@@ -15,10 +15,10 @@
 
 package org.apache.geode.internal.cache.tier.sockets;
 
+import java.io.BufferedOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 
@@ -41,6 +41,7 @@ public class ProtobufServerConnection extends 
ServerConnection {
   private final ClientProtocolProcessor protocolProcessor;
   private boolean cleanedUp;
   private ClientProxyMembershipID clientProxyMembershipID;
+  private final BufferedOutputStream output;
 
   /**
    * Creates a new <code>ProtobufServerConnection</code> that processes 
messages received from an
@@ -49,11 +50,12 @@ public class ProtobufServerConnection extends 
ServerConnection {
   public ProtobufServerConnection(Socket socket, InternalCache c, 
CachedRegionHelper helper,
       CacheServerStats stats, int hsTimeout, int socketBufferSize, String 
communicationModeStr,
       byte communicationMode, Acceptor acceptor, ClientProtocolProcessor 
clientProtocolProcessor,
-      SecurityService securityService) {
+      SecurityService securityService) throws IOException {
     super(socket, c, helper, stats, hsTimeout, socketBufferSize, 
communicationModeStr,
         communicationMode, acceptor, securityService);
     this.protocolProcessor = clientProtocolProcessor;
 
+    this.output = new BufferedOutputStream(socket.getOutputStream(), 
socketBufferSize);
     setClientProxyMembershipId();
 
     
doHandShake(CommunicationMode.ProtobufClientServerProtocol.getModeNumber(), 0);
@@ -64,12 +66,15 @@ public class ProtobufServerConnection extends 
ServerConnection {
     Socket socket = this.getSocket();
     try {
       InputStream inputStream = socket.getInputStream();
-      OutputStream outputStream = socket.getOutputStream();
 
       InternalCache cache = getCache();
       cache.setReadSerializedForCurrentThread(true);
       try {
-        protocolProcessor.processMessage(inputStream, outputStream);
+        try {
+          protocolProcessor.processMessage(inputStream, output);
+        } finally {
+          output.flush();
+        }
       } finally {
         cache.setReadSerializedForCurrentThread(false);
       }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index ccfb00d..0707e73 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -289,8 +289,9 @@ public abstract class ServerConnection implements Runnable {
 
     final boolean isDebugEnabled = logger.isDebugEnabled();
     try {
-
-      initStreams(socket, socketBufferSize, stats);
+      theSocket = socket;
+      theSocket.setSendBufferSize(socketBufferSize);
+      theSocket.setReceiveBufferSize(socketBufferSize);
 
       if (isDebugEnabled) {
         logger.debug(
@@ -1480,12 +1481,8 @@ public abstract class ServerConnection implements 
Runnable {
     }
   }
 
-
-  private void initStreams(Socket s, int socketBufferSize, MessageStats 
msgStats) {
+  protected void initStreams(Socket s, int socketBufferSize, MessageStats 
msgStats) {
     try {
-      theSocket = s;
-      theSocket.setSendBufferSize(socketBufferSize);
-      theSocket.setReceiveBufferSize(socketBufferSize);
       if (getAcceptor().isSelector()) {
         // set it on the message to null. This causes Message
         // to fetch it from a thread local. That way we only need
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
index c09a1f4..16e1eb3 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
@@ -78,7 +78,7 @@ public class ServerConnectionFactory {
   private ServerConnection createProtobufServerConnection(Socket socket, 
InternalCache cache,
       CachedRegionHelper helper, CacheServerStats stats, int hsTimeout, int 
socketBufferSize,
       String communicationModeStr, byte communicationMode, Acceptor acceptor,
-      SecurityService securityService) {
+      SecurityService securityService) throws IOException {
     ClientProtocolService service =
         getClientProtocolService(cache.getDistributedSystem(), 
acceptor.getServerName());
 
diff --git 
a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufChannel.java
 
b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufChannel.java
index afbe909..b309053 100644
--- 
a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufChannel.java
+++ 
b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufChannel.java
@@ -14,6 +14,7 @@
  */
 package org.apache.geode.experimental.driver;
 
+import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -38,6 +39,7 @@ class ProtobufChannel {
    * Socket to a GemFire server that has Protobuf enabled.
    */
   final Socket socket;
+  final BufferedOutputStream output;
   private final ValueSerializer serializer;
 
   public ProtobufChannel(final Set<InetSocketAddress> locators, String 
username, String password,
@@ -46,6 +48,7 @@ class ProtobufChannel {
     this.serializer = serializer;
     socket = connectToAServer(locators, username, password, keyStorePath, 
trustStorePath, protocols,
         ciphers);
+    output = new BufferedOutputStream(socket.getOutputStream(), 
socket.getSendBufferSize());
   }
 
   public void close() throws IOException {
@@ -181,8 +184,8 @@ class ProtobufChannel {
   }
 
   Message sendRequest(final Message request, MessageTypeCase expectedResult) 
throws IOException {
-    final OutputStream outputStream = socket.getOutputStream();
-    request.writeDelimitedTo(outputStream);
+    request.writeDelimitedTo(output);
+    output.flush();
     Message response = readResponse();
 
     if (!response.getMessageTypeCase().equals(expectedResult)) {
diff --git 
a/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/OutputCapturingServerConnectionTest.java
 
b/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/OutputCapturingServerConnectionTest.java
index 3fc9970..8c1d6f1 100644
--- 
a/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/OutputCapturingServerConnectionTest.java
+++ 
b/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/OutputCapturingServerConnectionTest.java
@@ -21,11 +21,11 @@ import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
-import java.net.UnknownHostException;
 
 import org.junit.Rule;
 import org.junit.Test;
@@ -52,6 +52,8 @@ public class OutputCapturingServerConnectionTest {
     Socket socketMock = mock(Socket.class);
     
when(socketMock.getInetAddress()).thenReturn(InetAddress.getByName("localhost"));
     when(socketMock.isClosed()).thenReturn(true);
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    when(socketMock.getOutputStream()).thenReturn(outputStream);
 
     AcceptorImpl acceptorStub = mock(AcceptorImpl.class);
     ClientProtocolProcessor clientProtocolProcessor = 
mock(ClientProtocolProcessor.class);
@@ -76,7 +78,7 @@ public class OutputCapturingServerConnectionTest {
 
   private ProtobufServerConnection getServerConnection(Socket socketMock,
       ClientProtocolProcessor clientProtocolProcessorMock, AcceptorImpl 
acceptorStub)
-      throws UnknownHostException {
+      throws IOException {
     ClientHealthMonitor clientHealthMonitorMock = 
mock(ClientHealthMonitor.class);
     
when(acceptorStub.getClientHealthMonitor()).thenReturn(clientHealthMonitorMock);
     InetSocketAddress inetSocketAddressStub = 
InetSocketAddress.createUnresolved("localhost", 9071);
@@ -89,7 +91,7 @@ public class OutputCapturingServerConnectionTest {
     CachedRegionHelper cachedRegionHelper = mock(CachedRegionHelper.class);
     when(cachedRegionHelper.getCache()).thenReturn(cache);
     return new ProtobufServerConnection(socketMock, cache, cachedRegionHelper,
-        mock(CacheServerStats.class), 0, 0, "",
+        mock(CacheServerStats.class), 0, 1024, "",
         CommunicationMode.ProtobufClientServerProtocol.getModeNumber(), 
acceptorStub,
         clientProtocolProcessorMock, mock(SecurityService.class));
   }
diff --git 
a/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/ProtobufServerConnectionTest.java
 
b/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/ProtobufServerConnectionTest.java
index ce829f5..57e6603 100644
--- 
a/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/ProtobufServerConnectionTest.java
+++ 
b/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/ProtobufServerConnectionTest.java
@@ -22,11 +22,11 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
-import java.net.UnknownHostException;
 
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -71,7 +71,7 @@ public class ProtobufServerConnectionTest {
   }
 
   @Test
-  public void testClientHealthMonitorRegistration() throws 
UnknownHostException {
+  public void testClientHealthMonitorRegistration() throws IOException {
     AcceptorImpl acceptorStub = mock(AcceptorImpl.class);
 
     ClientProtocolProcessor clientProtocolProcessor = 
mock(ClientProtocolProcessor.class);
@@ -94,7 +94,7 @@ public class ProtobufServerConnectionTest {
   }
 
   @Test
-  public void testDoOneMessageNotifiesClientHealthMonitor() throws 
UnknownHostException {
+  public void testDoOneMessageNotifiesClientHealthMonitor() throws IOException 
{
     AcceptorImpl acceptorStub = mock(AcceptorImpl.class);
     ClientProtocolProcessor clientProtocolProcessor = 
mock(ClientProtocolProcessor.class);
 
@@ -117,7 +117,7 @@ public class ProtobufServerConnectionTest {
 
   private ProtobufServerConnection getServerConnection(Socket socketMock,
       ClientProtocolProcessor clientProtocolProcessorMock, AcceptorImpl 
acceptorStub)
-      throws UnknownHostException {
+      throws IOException {
     clientHealthMonitorMock = mock(ClientHealthMonitor.class);
     
when(acceptorStub.getClientHealthMonitor()).thenReturn(clientHealthMonitorMock);
     InetSocketAddress inetSocketAddressStub = 
InetSocketAddress.createUnresolved("localhost", 9071);
@@ -130,15 +130,16 @@ public class ProtobufServerConnectionTest {
     CachedRegionHelper cachedRegionHelper = mock(CachedRegionHelper.class);
     when(cachedRegionHelper.getCache()).thenReturn(cache);
     return new ProtobufServerConnection(socketMock, cache, cachedRegionHelper,
-        mock(CacheServerStats.class), 0, 0, "",
+        mock(CacheServerStats.class), 0, 1024, "",
         CommunicationMode.ProtobufClientServerProtocol.getModeNumber(), 
acceptorStub,
         clientProtocolProcessorMock, mock(SecurityService.class));
   }
 
   private ProtobufServerConnection getServerConnection(
       ClientProtocolProcessor clientProtocolProcessorMock, AcceptorImpl 
acceptorStub)
-      throws UnknownHostException {
+      throws IOException {
     Socket socketMock = mock(Socket.class);
+    when(socketMock.getOutputStream()).thenReturn(new ByteArrayOutputStream());
     return getServerConnection(socketMock, clientProtocolProcessorMock, 
acceptorStub);
   }
 }

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to