GEODE-2582: Add get request; add serializer to new protocol handler.

Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/9973c493
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/9973c493
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/9973c493

Branch: refs/heads/feature/GEODE-2580
Commit: 9973c4938a0c27f0a53aedf3460995a47ce78427
Parents: 511e2a3
Author: Galen OSullivan <gosulli...@pivotal.io>
Authored: Thu May 18 17:17:09 2017 -0700
Committer: Udo Kohlmeyer <ukohlme...@pivotal.io>
Committed: Mon May 22 11:27:03 2017 -0700

----------------------------------------------------------------------
 .../client/NewClientProtocolTestClient.java     |  6 +-
 .../client/ProtobufProtocolMessageHandler.java  | 49 ++++++++++++----
 .../geode/protocol/client/MessageUtils.java     | 15 +++++
 .../client/ProtobufProtocolIntegrationTest.java | 62 ++++++++++++++++++--
 ...rotobufSerializationDeserializationTest.java |  4 +-
 .../sockets/ClientProtocolMessageHandler.java   |  5 +-
 .../cache/tier/sockets/ServerConnection.java    |  3 +-
 7 files changed, 120 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/9973c493/geode-client-protobuf/src/main/java/org/apache/geode/protocol/client/NewClientProtocolTestClient.java
----------------------------------------------------------------------
diff --git a/geode-client-protobuf/src/main/java/org/apache/geode/protocol/client/NewClientProtocolTestClient.java b/geode-client-protobuf/src/main/java/org/apache/geode/protocol/client/NewClientProtocolTestClient.java
index a4476e1..834d2f6 100644
--- a/geode-client-protobuf/src/main/java/org/apache/geode/protocol/client/NewClientProtocolTestClient.java
+++ b/geode-client-protobuf/src/main/java/org/apache/geode/protocol/client/NewClientProtocolTestClient.java
@@ -43,6 +43,8 @@ public class NewClientProtocolTestClient implements AutoCloseable {
     socketChannel = SocketChannel.open(new InetSocketAddress(hostname, port));
     inputStream = socketChannel.socket().getInputStream();
     outputStream = socketChannel.socket().getOutputStream();
+
+    sendHeader(outputStream);
   }
 
   @Override
@@ -55,15 +57,13 @@ public class NewClientProtocolTestClient implements AutoCloseable {
   }
 
   public Message blockingSendMessage(Message message) throws IOException {
-    sendHeader(outputStream);
-
     message.writeDelimitedTo(outputStream);
     outputStream.flush();
 
     return ClientProtocol.Message.parseDelimitedFrom(inputStream);
   }
 
-  void parseResponse(Message response) {
+  void printResponse(Message response) {
     System.out.println("response = " + response.toString());
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/9973c493/geode-client-protobuf/src/main/java/org/apache/geode/protocol/client/ProtobufProtocolMessageHandler.java
----------------------------------------------------------------------
diff --git a/geode-client-protobuf/src/main/java/org/apache/geode/protocol/client/ProtobufProtocolMessageHandler.java b/geode-client-protobuf/src/main/java/org/apache/geode/protocol/client/ProtobufProtocolMessageHandler.java
index a6993c4..10ad7e9 100644
--- a/geode-client-protobuf/src/main/java/org/apache/geode/protocol/client/ProtobufProtocolMessageHandler.java
+++ b/geode-client-protobuf/src/main/java/org/apache/geode/protocol/client/ProtobufProtocolMessageHandler.java
@@ -16,7 +16,6 @@
 package org.apache.geode.protocol.client;
 
 import com.google.protobuf.ByteString;
-import com.google.protobuf.Parser;
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.CacheWriterException;
 import org.apache.geode.cache.Region;
@@ -24,10 +23,11 @@ import org.apache.geode.cache.TimeoutException;
 import org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandler;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.protocol.protobuf.BasicTypes;
+import org.apache.geode.protocol.protobuf.RegionAPI;
 import org.apache.geode.protocol.protobuf.RegionAPI.GetRequest;
-import org.apache.geode.protocol.protobuf.RegionAPI.GetResponse;
 import org.apache.geode.protocol.protobuf.RegionAPI.PutResponse;
 import org.apache.geode.serialization.Deserializer;
+import org.apache.geode.serialization.Serializer;
 import org.apache.logging.log4j.Logger;
 
 import java.io.IOException;
@@ -49,7 +49,7 @@ public class ProtobufProtocolMessageHandler implements ClientProtocolMessageHand
 
   @Override
   public void receiveMessage(InputStream inputStream, OutputStream outputStream,
-      Deserializer deserializer, Cache cache) throws IOException {
+                             Deserializer deserializer, Serializer serializer, Cache cache) throws IOException {
     final Message message = Message.parseDelimitedFrom(inputStream);
     // can be null at EOF, see Parser.parseDelimitedFrom(java.io.InputStream)
     if (message == null) {
@@ -61,14 +61,23 @@ public class ProtobufProtocolMessageHandler implements ClientProtocolMessageHand
       logger.error(() -> "Got message of type response: " + ErrorMessageFromMessage(message));
     }
 
-    Request request = message.getRequest();
-    Message putResponseMessage = doPutRequest(request.getPutRequest(), deserializer, cache);
+    Message responseMessage = null;
 
-    putResponseMessage.writeDelimitedTo(outputStream);
+    Request request = message.getRequest();
+    Request.RequestAPICase requestAPICase = request.getRequestAPICase();
+    if (requestAPICase == Request.RequestAPICase.GETREQUEST) {
+      responseMessage = doGetRequest(request.getGetRequest(), deserializer, serializer, cache);
+    } else if (requestAPICase == Request.RequestAPICase.PUTREQUEST) {
+      responseMessage = doPutRequest(request.getPutRequest(), deserializer, cache);
+    } else {
+      // TODO
+    }
+    if (responseMessage != null) {
+    responseMessage.writeDelimitedTo(outputStream);
+    }
   }
 
   private Message doPutRequest(PutRequest request, Deserializer dataDeserializer, Cache cache) {
-    logger.error("Doing put request.");
     final String regionName = request.getRegionName();
     final BasicTypes.Entry entry = request.getEntry();
     final ByteString key = entry.getKey().getKey();
@@ -80,7 +89,7 @@ public class ProtobufProtocolMessageHandler implements ClientProtocolMessageHand
           dataDeserializer.deserialize(value.toByteArray()));
       return putResponseWithStatus(true);
     } catch (TimeoutException | CacheWriterException ex) {
-      logger.error("Caught normal-ish exception doing region put", ex);
+      logger.warn("Caught normal-ish exception doing region put", ex);
       return putResponseWithStatus(false);
     }
   }
@@ -91,9 +100,27 @@ public class ProtobufProtocolMessageHandler implements ClientProtocolMessageHand
         .build();
   }
 
-  private GetResponse doGetRequest(GetRequest request, Deserializer deserializer, Cache cache) {
-    // TODO
-    return null;
+  private Message doGetRequest(GetRequest request, Deserializer deserializer, Serializer serializer, Cache cache) {
+    String regionName = request.getRegionName();
+    BasicTypes.Key key = request.getKey();
+    byte[] keyBytes = key.getKey().toByteArray();
+    Region<Object, Object> region = cache.getRegion(regionName);
+
+    Object returnValue = region.get(deserializer.deserialize(keyBytes));
+
+    if (returnValue == null) {
+      return getResponseWithValue(new byte[0]);
+    } else {
+      // TODO types in the region?
+      return getResponseWithValue(serializer.serialize(returnValue));
+    }
+  }
+
+  private Message getResponseWithValue(byte[] value) {
+    return Message.newBuilder()
+      .setResponse(Response.newBuilder().setGetResponse(RegionAPI.GetResponse.newBuilder()
+        .setResult(BasicTypes.Value.newBuilder().setValue(ByteString.copyFrom(value)))))
+      .build();
   }
 
   public ProtobufProtocolMessageHandler() {}

http://git-wip-us.apache.org/repos/asf/geode/blob/9973c493/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/MessageUtils.java
----------------------------------------------------------------------
diff --git a/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/MessageUtils.java b/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/MessageUtils.java
index 02e2a58..4991467 100644
--- a/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/MessageUtils.java
+++ b/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/MessageUtils.java
@@ -82,6 +82,21 @@ public class MessageUtils {
     return message.build();
   }
 
+  public static ClientProtocol.Message makeGetMessageFor(String region, String key) {
+    Random random = new Random();
+    ClientProtocol.MessageHeader.Builder messageHeader =
+      ClientProtocol.MessageHeader.newBuilder().setCorrelationId(random.nextInt());
+
+    BasicTypes.Key.Builder keyBuilder =
+      BasicTypes.Key.newBuilder().setKey(ByteString.copyFromUtf8(key));
+
+    RegionAPI.GetRequest.Builder getRequest = RegionAPI.GetRequest.newBuilder().setRegionName(region).setKey(keyBuilder);
+    ClientProtocol.Request.Builder request = ClientProtocol.Request.newBuilder().setGetRequest(getRequest);
+
+    return ClientProtocol.Message.newBuilder().setMessageHeader(messageHeader).setRequest(request)
+      .build();
+  }
+
   private static byte[] createByteArrayOfSize(int msgSize) {
     byte[] array = new byte[msgSize];
     for (int i = 0; i < msgSize; i++) {

http://git-wip-us.apache.org/repos/asf/geode/blob/9973c493/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/ProtobufProtocolIntegrationTest.java
----------------------------------------------------------------------
diff --git a/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/ProtobufProtocolIntegrationTest.java b/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/ProtobufProtocolIntegrationTest.java
index 5d92cf0..b3864a2 100644
--- a/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/ProtobufProtocolIntegrationTest.java
+++ b/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/ProtobufProtocolIntegrationTest.java
@@ -15,11 +15,13 @@
 
 package org.apache.geode.protocol.client;
 
+import com.google.protobuf.ByteString;
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.protocol.protobuf.BasicTypes;
 import org.apache.geode.protocol.protobuf.ClientProtocol;
 import org.apache.geode.test.junit.categories.IntegrationTest;
 import org.junit.Test;
@@ -28,12 +30,15 @@ import org.junit.experimental.categories.Category;
 import java.io.IOException;
 import java.util.Properties;
 
+import static org.apache.geode.protocol.protobuf.ClientProtocol.Message.MessageTypeCase.RESPONSE;
+import static org.apache.geode.protocol.protobuf.ClientProtocol.Response.ResponseAPICase.GETRESPONSE;
+import static org.apache.geode.protocol.protobuf.ClientProtocol.Response.ResponseAPICase.PUTRESPONSE;
 import static org.junit.Assert.*;
 
 @Category(IntegrationTest.class)
 public class ProtobufProtocolIntegrationTest {
   @Test
-  public void testRoundTripClientCommunicationWorks() throws IOException {
+  public void testRoundTripPutRequest() throws IOException {
     try (Cache cache = createCacheOnPort(40404);
         NewClientProtocolTestClient client = new NewClientProtocolTestClient("localhost", 40404)) {
       final String testRegion = "testRegion";
@@ -44,11 +49,11 @@ public class ProtobufProtocolIntegrationTest {
       ClientProtocol.Message message =
           MessageUtils.makePutMessageFor(testRegion, testKey, testValue);
       ClientProtocol.Message response = client.blockingSendMessage(message);
-      client.parseResponse(response);
+      client.printResponse(response);
 
-      assertEquals(response.getMessageTypeCase(), ClientProtocol.Message.MessageTypeCase.RESPONSE);
-      assertEquals(response.getResponse().getResponseAPICase(),
-          ClientProtocol.Response.ResponseAPICase.PUTRESPONSE);
+      assertEquals(RESPONSE, response.getMessageTypeCase());
+      assertEquals(PUTRESPONSE,
+      response.getResponse().getResponseAPICase());
       assertTrue(response.getResponse().getPutResponse().getSuccess());
 
       assertEquals(1, region.size());
@@ -58,6 +63,53 @@ public class ProtobufProtocolIntegrationTest {
   }
 
   @Test
+  public void testRoundTripEmptyGetRequest() throws IOException {
+    try (Cache cache = createCacheOnPort(40404);
+         NewClientProtocolTestClient client = new NewClientProtocolTestClient("localhost", 40404)) {
+      final String testRegion = "testRegion";
+      final String testKey = "testKey";
+      Region<Object, Object> region = cache.createRegionFactory().create("testRegion");
+
+      ClientProtocol.Message message = MessageUtils.makeGetMessageFor(testRegion, testKey);
+      ClientProtocol.Message response = client.blockingSendMessage(message);
+
+      assertEquals(RESPONSE, response.getMessageTypeCase());
+      assertEquals(GETRESPONSE,
+        response.getResponse().getResponseAPICase());
+      BasicTypes.Value value = response.getResponse().getGetResponse().getResult();
+
+      assertTrue(value.getValue().isEmpty());
+    }
+  }
+
+  @Test
+  public void testRoundTripNonEmptyGetRequest() throws IOException {
+    try (Cache cache = createCacheOnPort(40404);
+         NewClientProtocolTestClient client = new NewClientProtocolTestClient("localhost", 40404)) {
+      final String testRegion = "testRegion";
+      final String testKey = "testKey";
+      final String testValue = "testValue";
+      Region<Object, Object> region = cache.createRegionFactory().create("testRegion");
+
+
+      ClientProtocol.Message putMessage =
+        MessageUtils.makePutMessageFor(testRegion, testKey, testValue);
+      ClientProtocol.Message putResponse = client.blockingSendMessage(putMessage);
+      client.printResponse(putResponse);
+
+      ClientProtocol.Message getMessage = MessageUtils.makeGetMessageFor(testRegion, testKey);
+      ClientProtocol.Message getResponse = client.blockingSendMessage(getMessage);
+
+      assertEquals(RESPONSE, getResponse.getMessageTypeCase());
+      assertEquals(GETRESPONSE,
+        getResponse.getResponse().getResponseAPICase());
+      BasicTypes.Value value = getResponse.getResponse().getGetResponse().getResult();
+
+      assertEquals(value.getValue().toStringUtf8(), testValue);
+    }
+  }
+
+  @Test
   public void startCache() throws IOException {
     try (Cache cache = createCacheOnPort(40404)) {
       while (true) {

http://git-wip-us.apache.org/repos/asf/geode/blob/9973c493/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/ProtobufSerializationDeserializationTest.java
----------------------------------------------------------------------
diff --git a/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/ProtobufSerializationDeserializationTest.java b/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/ProtobufSerializationDeserializationTest.java
index 8cf36ca..05d2216 100644
--- a/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/ProtobufSerializationDeserializationTest.java
+++ b/geode-client-protobuf/src/test/java/org/apache/geode/protocol/client/ProtobufSerializationDeserializationTest.java
@@ -66,7 +66,7 @@ public class ProtobufSerializationDeserializationTest {
 
     ProtobufProtocolMessageHandler newClientProtocol = new ProtobufProtocolMessageHandler();
     newClientProtocol.receiveMessage(MessageUtils.loadMessageIntoInputStream(message),
-        mockOutputStream, deserializer, mockCache);
+        mockOutputStream, deserializer, SerializationType.STRING.serializer, mockCache);
 
     verify(mockRegion).put(testKey.getBytes(), testValue.getBytes());
   }
@@ -86,7 +86,7 @@ public class ProtobufSerializationDeserializationTest {
 
     ProtobufProtocolMessageHandler newClientProtocol = new ProtobufProtocolMessageHandler();
     newClientProtocol.receiveMessage(MessageUtils.loadMessageIntoInputStream(message), outputStream,
-        deserializer, mockCache);
+        deserializer, SerializationType.STRING.serializer, mockCache);
 
     ClientProtocol.Message responseMessage = ClientProtocol.Message
         .parseDelimitedFrom(new ByteArrayInputStream(outputStream.toByteArray()));

http://git-wip-us.apache.org/repos/asf/geode/blob/9973c493/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
index aa6d4cb..e7e75bf 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
@@ -17,12 +17,13 @@ package org.apache.geode.internal.cache.tier.sockets;
 
 import org.apache.geode.cache.Cache;
 import org.apache.geode.serialization.Deserializer;
+import org.apache.geode.serialization.Serializer;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 
 public interface ClientProtocolMessageHandler {
-  void receiveMessage(InputStream inputStream, OutputStream outputStream, Deserializer serializer,
-      Cache cache) throws IOException;
+  void receiveMessage(InputStream inputStream, OutputStream outputStream, Deserializer deserializer,
+                      Serializer serializer, Cache cache) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/9973c493/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index e58e213..58f1709 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -957,7 +957,8 @@ public class ServerConnection implements Runnable {
         OutputStream outputStream = socket.getOutputStream();
         // TODO serialization types?
         newClientProtocol.receiveMessage(inputStream, outputStream,
-            SerializationType.STRING.deserializer, this.getCache());
+            SerializationType.STRING.deserializer, SerializationType.STRING.serializer,
+          this.getCache());
       } catch (IOException e) {
         // TODO?
       }

Reply via email to