Repository: geode Updated Branches: refs/heads/develop cc8aacb43 -> cd1a32357
GEODE-2997: New flow getAll/putAll. This now closes #629 Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/cd1a3235 Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/cd1a3235 Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/cd1a3235 Branch: refs/heads/develop Commit: cd1a3235761610a5454e215f6ceb096e6fd6df77 Parents: cc8aacb Author: Alexander Murmann <[email protected]> Authored: Wed Jul 5 17:36:20 2017 -0700 Committer: Udo Kohlmeyer <[email protected]> Committed: Wed Jul 12 17:00:29 2017 -0700 ---------------------------------------------------------------------- .../protobuf/ProtobufStreamProcessor.java | 11 +- .../GetAllRequestOperationHandler.java | 75 +++++++++ .../operations/GetRequestOperationHandler.java | 14 +- .../PutAllRequestOperationHandler.java | 109 ++++++++++++ .../operations/PutRequestOperationHandler.java | 23 ++- .../RemoveRequestOperationHandler.java | 15 +- .../utilities/ProtobufRequestUtilities.java | 36 +++- .../utilities/ProtobufResponseUtilities.java | 41 +++-- .../protobuf/utilities/ProtobufUtilities.java | 24 ++- .../src/main/proto/clientProtocol.proto | 13 +- geode-protobuf/src/main/proto/region_API.proto | 2 +- .../org/apache/geode/protocol/MessageUtil.java | 2 +- .../RoundTripCacheConnectionJUnitTest.java | 139 +++++++++++++++- .../GetAllRequestOperationHandlerJUnitTest.java | 159 ++++++++++++++++++ .../GetRequestOperationHandlerJUnitTest.java | 42 ++++- .../PutAllRequestOperationHandlerJUnitTest.java | 166 +++++++++++++++++++ 16 files changed, 798 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/cd1a3235/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessor.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessor.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessor.java index 7146392..ef4affa 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessor.java +++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessor.java @@ -21,10 +21,7 @@ import org.apache.geode.protocol.exception.InvalidProtocolMessageException; import org.apache.geode.protocol.operations.registry.OperationsHandlerRegistry; import org.apache.geode.protocol.operations.registry.exception.OperationHandlerAlreadyRegisteredException; import org.apache.geode.protocol.operations.registry.exception.OperationHandlerNotRegisteredException; -import org.apache.geode.protocol.protobuf.operations.GetRegionNamesRequestOperationHandler; -import org.apache.geode.protocol.protobuf.operations.GetRequestOperationHandler; -import org.apache.geode.protocol.protobuf.operations.PutRequestOperationHandler; -import org.apache.geode.protocol.protobuf.operations.RemoveRequestOperationHandler; +import org.apache.geode.protocol.protobuf.operations.*; import org.apache.geode.protocol.protobuf.serializer.ProtobufProtocolSerializer; import org.apache.geode.protocol.protobuf.utilities.ProtobufUtilities; import org.apache.geode.serialization.registry.exception.CodecAlreadyRegisteredForTypeException; @@ -65,6 +62,12 @@ public class ProtobufStreamProcessor implements ClientProtocolMessageHandler { ClientProtocol.Request.RequestAPICase.GETREGIONNAMESREQUEST.getNumber(), new GetRegionNamesRequestOperationHandler()); registry.registerOperationHandlerForOperationId( + ClientProtocol.Request.RequestAPICase.GETALLREQUEST.getNumber(), + new GetAllRequestOperationHandler()); + registry.registerOperationHandlerForOperationId( + ClientProtocol.Request.RequestAPICase.PUTALLREQUEST.getNumber(), + new PutAllRequestOperationHandler()); + registry.registerOperationHandlerForOperationId( ClientProtocol.Request.RequestAPICase.REMOVEREQUEST.getNumber(), new RemoveRequestOperationHandler()); } http://git-wip-us.apache.org/repos/asf/geode/blob/cd1a3235/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandler.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandler.java new file mode 100644 index 0000000..75ae842 --- /dev/null +++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandler.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.protocol.protobuf.operations; + +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.Region; +import org.apache.geode.protocol.operations.OperationHandler; +import org.apache.geode.protocol.protobuf.BasicTypes; +import org.apache.geode.protocol.protobuf.ClientProtocol; +import org.apache.geode.protocol.protobuf.RegionAPI; +import org.apache.geode.protocol.protobuf.utilities.ProtobufResponseUtilities; +import org.apache.geode.protocol.protobuf.utilities.ProtobufUtilities; +import org.apache.geode.serialization.SerializationService; +import org.apache.geode.serialization.exception.UnsupportedEncodingTypeException; +import org.apache.geode.serialization.registry.exception.CodecNotRegisteredForTypeException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class GetAllRequestOperationHandler + implements OperationHandler<ClientProtocol.Request, ClientProtocol.Response> { + private static Logger logger = LogManager.getLogger(); + + @Override + public ClientProtocol.Response process(SerializationService serializationService, + ClientProtocol.Request request, Cache cache) { + if (request.getRequestAPICase() != ClientProtocol.Request.RequestAPICase.GETALLREQUEST) { + return ProtobufResponseUtilities + .createAndLogErrorResponse("Improperly formatted getAll request message.", logger, null); + } + RegionAPI.GetAllRequest getAllRequest = request.getGetAllRequest(); + + String regionName = getAllRequest.getRegionName(); + Region region = cache.getRegion(regionName); + if (region == null) { + return ProtobufResponseUtilities.createErrorResponse("Region not found"); + } + + try { + Set<Object> keys = new HashSet<>(); + for (BasicTypes.EncodedValue key : getAllRequest.getKeyList()) { + keys.add(ProtobufUtilities.decodeValue(serializationService, key)); + } + Map<Object, Object> results = region.getAll(keys); + Set<BasicTypes.Entry> entries = new HashSet<>(); + for (Map.Entry<Object, Object> entry : results.entrySet()) { + entries.add( + ProtobufUtilities.createEntry(serializationService, entry.getKey(), entry.getValue())); + } + return ProtobufResponseUtilities.createGetAllResponse(entries); + } catch (UnsupportedEncodingTypeException ex) { + // can be thrown by encoding or decoding. + return ProtobufResponseUtilities.createAndLogErrorResponse("Encoding not supported.", logger, + ex); + } catch (CodecNotRegisteredForTypeException ex) { + return ProtobufResponseUtilities + .createAndLogErrorResponse("Codec error in protobuf deserialization.", logger, ex); + } + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/cd1a3235/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandler.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandler.java index 13b156f..d5bcfb9 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandler.java +++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandler.java @@ -34,15 +34,15 @@ public class GetRequestOperationHandler public ClientProtocol.Response process(SerializationService serializationService, ClientProtocol.Request request, Cache cache) { if (request.getRequestAPICase() != ClientProtocol.Request.RequestAPICase.GETREQUEST) { - return ProtobufResponseUtilities.createAndLogErrorResponse(false, false, - "Improperly formatted get request message.", logger, null); + return ProtobufResponseUtilities + .createAndLogErrorResponse("Improperly formatted get request message.", logger, null); } RegionAPI.GetRequest getRequest = request.getGetRequest(); String regionName = getRequest.getRegionName(); Region region = cache.getRegion(regionName); if (region == null) { - return ProtobufResponseUtilities.createErrorResponse(false, false, "Region not found"); + return ProtobufResponseUtilities.createErrorResponse("Region not found"); } try { @@ -58,11 +58,11 @@ public class GetRequestOperationHandler return ProtobufResponseUtilities.createGetResponse(encodedValue); } catch (UnsupportedEncodingTypeException ex) { // can be thrown by encoding or decoding. - return ProtobufResponseUtilities.createAndLogErrorResponse(false, false, - "Encoding not supported.", logger, ex); + return ProtobufResponseUtilities.createAndLogErrorResponse("Encoding not supported.", logger, + ex); } catch (CodecNotRegisteredForTypeException ex) { - return ProtobufResponseUtilities.createAndLogErrorResponse(true, false, - "Codec error in protobuf deserialization.", logger, ex); + return ProtobufResponseUtilities + .createAndLogErrorResponse("Codec error in protobuf deserialization.", logger, ex); } } } http://git-wip-us.apache.org/repos/asf/geode/blob/cd1a3235/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandler.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandler.java new file mode 100644 index 0000000..7e62bba --- /dev/null +++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandler.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.protocol.protobuf.operations; + +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.Region; +import org.apache.geode.protocol.operations.OperationHandler; +import org.apache.geode.protocol.protobuf.BasicTypes; +import org.apache.geode.protocol.protobuf.ClientProtocol; +import org.apache.geode.protocol.protobuf.RegionAPI; +import org.apache.geode.protocol.protobuf.utilities.ProtobufResponseUtilities; +import org.apache.geode.protocol.protobuf.utilities.ProtobufUtilities; +import org.apache.geode.serialization.SerializationService; +import org.apache.geode.serialization.exception.UnsupportedEncodingTypeException; +import org.apache.geode.serialization.registry.exception.CodecNotRegisteredForTypeException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.HashMap; +import java.util.Map; + +public class PutAllRequestOperationHandler + implements OperationHandler<ClientProtocol.Request, ClientProtocol.Response> { + private static Logger logger = LogManager.getLogger(); + + private RegionAPI.PutAllRequest putAllRequest = null; + private Region region = null; + private Map<Object, Object> entries = null; + + @Override + public ClientProtocol.Response process(SerializationService serializationService, + ClientProtocol.Request request, Cache cache) { + ClientProtocol.Response errorResponse = validatePutAllRequest(request); + if (errorResponse == null) { + errorResponse = determinePutAllRegion(cache); + } + if (errorResponse == null) { + errorResponse = extractPutAllEntries(serializationService); + } + if (errorResponse == null) { + try { + region.putAll(entries); + } catch (Exception ex) { + return ProtobufResponseUtilities.createAndLogErrorResponse(ex.getMessage(), logger, ex); + } + + return ProtobufResponseUtilities.createPutAllResponse(); + } else { + return errorResponse; + } + } + + private ClientProtocol.Response validatePutAllRequest(ClientProtocol.Request request) { + if (request.getRequestAPICase() != ClientProtocol.Request.RequestAPICase.PUTALLREQUEST) { + return ProtobufResponseUtilities + .createAndLogErrorResponse("Improperly formatted put request message.", logger, null); + } + + putAllRequest = request.getPutAllRequest(); + return null; + } + + private ClientProtocol.Response determinePutAllRegion(Cache cache) { + String regionName = putAllRequest.getRegionName(); + region = cache.getRegion(regionName); + + if (region == null) { + return ProtobufResponseUtilities.createAndLogErrorResponse( + "Region passed by client did not exist: " + regionName, logger, null); + } else { + return null; + } + } + + // Read all of the entries out of the protobuf and return an error (without performing any puts) + // if any of the entries can't be decoded + private ClientProtocol.Response extractPutAllEntries(SerializationService serializationService) { + entries = new HashMap(); + try { + for (BasicTypes.Entry entry : putAllRequest.getEntryList()) { + Object decodedValue = ProtobufUtilities.decodeValue(serializationService, entry.getValue()); + Object decodedKey = ProtobufUtilities.decodeValue(serializationService, entry.getKey()); + + entries.put(decodedKey, decodedValue); + } + } catch (UnsupportedEncodingTypeException ex) { + return ProtobufResponseUtilities.createAndLogErrorResponse("Encoding not supported ", logger, + ex); + } catch (CodecNotRegisteredForTypeException ex) { + return ProtobufResponseUtilities + .createAndLogErrorResponse("Codec error in protobuf deserialization ", logger, ex); + } + + return null; + } + +} http://git-wip-us.apache.org/repos/asf/geode/blob/cd1a3235/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutRequestOperationHandler.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutRequestOperationHandler.java index fecf01d..195aa7a 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutRequestOperationHandler.java +++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/PutRequestOperationHandler.java @@ -34,15 +34,15 @@ public class PutRequestOperationHandler public ClientProtocol.Response process(SerializationService serializationService, ClientProtocol.Request request, Cache cache) { if (request.getRequestAPICase() != ClientProtocol.Request.RequestAPICase.PUTREQUEST) { - return ProtobufResponseUtilities.createAndLogErrorResponse(false, false, - "Improperly formatted put request message.", logger, null); + return ProtobufResponseUtilities + .createAndLogErrorResponse("Improperly formatted put request message.", logger, null); } RegionAPI.PutRequest putRequest = request.getPutRequest(); String regionName = putRequest.getRegionName(); Region region = cache.getRegion(regionName); if (region == null) { - return ProtobufResponseUtilities.createAndLogErrorResponse(false, false, + return ProtobufResponseUtilities.createAndLogErrorResponse( "Region passed by client did not exist: " + regionName, logger, null); } @@ -55,18 +55,17 @@ public class PutRequestOperationHandler region.put(decodedKey, decodedValue); return ProtobufResponseUtilities.createPutResponse(); } catch (ClassCastException ex) { - return ProtobufResponseUtilities.createAndLogErrorResponse(false, false, - "invalid key or value type for region " + regionName + ",passed key: " - + entry.getKey().getEncodingType() + " value: " - + entry.getValue().getEncodingType(), - logger, ex); + return ProtobufResponseUtilities + .createAndLogErrorResponse("invalid key or value type for region " + regionName + + ",passed key: " + entry.getKey().getEncodingType() + " value: " + + entry.getValue().getEncodingType(), logger, ex); } } catch (UnsupportedEncodingTypeException ex) { - return ProtobufResponseUtilities.createAndLogErrorResponse(false, false, - "encoding not supported ", logger, ex); + return ProtobufResponseUtilities.createAndLogErrorResponse("encoding not supported ", logger, + ex); } catch (CodecNotRegisteredForTypeException ex) { - return ProtobufResponseUtilities.createAndLogErrorResponse(true, false, - "codec error in protobuf deserialization ", logger, ex); + return ProtobufResponseUtilities + .createAndLogErrorResponse("codec error in protobuf deserialization ", logger, ex); } } } http://git-wip-us.apache.org/repos/asf/geode/blob/cd1a3235/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/RemoveRequestOperationHandler.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/RemoveRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/RemoveRequestOperationHandler.java index e1fef85..725a338 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/RemoveRequestOperationHandler.java +++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/operations/RemoveRequestOperationHandler.java @@ -17,7 +17,6 @@ package org.apache.geode.protocol.protobuf.operations; import org.apache.geode.cache.Cache; import org.apache.geode.cache.Region; import org.apache.geode.protocol.operations.OperationHandler; -import org.apache.geode.protocol.protobuf.BasicTypes; import org.apache.geode.protocol.protobuf.ClientProtocol; import org.apache.geode.protocol.protobuf.RegionAPI; import org.apache.geode.protocol.protobuf.utilities.ProtobufResponseUtilities; @@ -36,15 +35,15 @@ public class RemoveRequestOperationHandler public ClientProtocol.Response process(SerializationService serializationService, ClientProtocol.Request request, Cache cache) { if (request.getRequestAPICase() != ClientProtocol.Request.RequestAPICase.REMOVEREQUEST) { - return ProtobufResponseUtilities.createAndLogErrorResponse(false, false, - "Improperly formatted get request message.", logger, null); + return ProtobufResponseUtilities + .createAndLogErrorResponse("Improperly formatted get request message.", logger, null); } RegionAPI.RemoveRequest removeRequest = request.getRemoveRequest(); String regionName = removeRequest.getRegionName(); Region region = cache.getRegion(regionName); if (region == null) { - return ProtobufResponseUtilities.createErrorResponse(false, false, "Region not found"); + return ProtobufResponseUtilities.createErrorResponse("Region not found"); } try { @@ -55,11 +54,11 @@ public class RemoveRequestOperationHandler return ProtobufResponseUtilities.createRemoveResponse(); } catch (UnsupportedEncodingTypeException ex) { // can be thrown by encoding or decoding. - return ProtobufResponseUtilities.createAndLogErrorResponse(false, false, - "Encoding not supported.", logger, ex); + return ProtobufResponseUtilities.createAndLogErrorResponse("Encoding not supported.", logger, + ex); } catch (CodecNotRegisteredForTypeException ex) { - return ProtobufResponseUtilities.createAndLogErrorResponse(true, false, - "Codec error in protobuf deserialization.", logger, ex); + return ProtobufResponseUtilities + .createAndLogErrorResponse("Codec error in protobuf deserialization.", logger, ex); } } } http://git-wip-us.apache.org/repos/asf/geode/blob/cd1a3235/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/utilities/ProtobufRequestUtilities.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/utilities/ProtobufRequestUtilities.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/utilities/ProtobufRequestUtilities.java index b246a50..b0f4795 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/utilities/ProtobufRequestUtilities.java +++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/utilities/ProtobufRequestUtilities.java @@ -14,7 +14,11 @@ */ package org.apache.geode.protocol.protobuf.utilities; -import org.apache.geode.protocol.protobuf.*; +import org.apache.geode.protocol.protobuf.BasicTypes; +import org.apache.geode.protocol.protobuf.ClientProtocol; +import org.apache.geode.protocol.protobuf.RegionAPI; + +import java.util.Set; /** * This class contains helper functions for generating ClientProtocol.Request objects @@ -73,4 +77,34 @@ public abstract class ProtobufRequestUtilities { RegionAPI.PutRequest.newBuilder().setRegionName(region).setEntry(entry).build(); return ClientProtocol.Request.newBuilder().setPutRequest(putRequest).build(); } + + /** + * Create a request to get the values for multiple keys + * + * @param regionName - Name of the region to fetch from + * @param keys - Set of keys being fetched + * @return Request object containing the getAll request + */ + public static ClientProtocol.Request createGetAllRequest(String regionName, + Set<BasicTypes.EncodedValue> keys) { + RegionAPI.GetAllRequest.Builder getAllRequestBuilder = + RegionAPI.GetAllRequest.newBuilder().setRegionName(regionName); + getAllRequestBuilder.addAllKey(keys); + return ClientProtocol.Request.newBuilder().setGetAllRequest(getAllRequestBuilder).build(); + } + + /** + * Create a request to insert multiple entries in a region + * + * @param regionName - Region to which entries are being added + * @param entries - key, value pairs to add to the region + * @return Request object containing the putAll request for the passed parameters + */ + public static ClientProtocol.Request createPutAllRequest(String regionName, + Set<BasicTypes.Entry> entries) { + RegionAPI.PutAllRequest.Builder putAllRequestBuilder = + RegionAPI.PutAllRequest.newBuilder().setRegionName(regionName); + putAllRequestBuilder.addAllEntry(entries); + return ClientProtocol.Request.newBuilder().setPutAllRequest(putAllRequestBuilder).build(); + } } http://git-wip-us.apache.org/repos/asf/geode/blob/cd1a3235/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/utilities/ProtobufResponseUtilities.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/utilities/ProtobufResponseUtilities.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/utilities/ProtobufResponseUtilities.java index d6ef278..9a1a9ce 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/utilities/ProtobufResponseUtilities.java +++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/utilities/ProtobufResponseUtilities.java @@ -24,7 +24,7 @@ import java.util.Set; /** * This class contains helper functions for generating ClientProtocol.Response objects. - * + * <p> * Request building helpers can be found in {@link ProtobufRequestUtilities}, while more general * purpose helpers can be found in {@link ProtobufUtilities} */ @@ -32,16 +32,12 @@ public abstract class ProtobufResponseUtilities { /** * This creates response object containing a ClientProtocol.ErrorResponse * - * @param serverInternal - is this error internal to the server - * @param retriable - can the operation be retried with a potentially different result * @param errorMessage - description of the error * @return An error response containing the above parameters */ - public static ClientProtocol.Response createErrorResponse(boolean serverInternal, - boolean retriable, String errorMessage) { + public static ClientProtocol.Response createErrorResponse(String errorMessage) { ClientProtocol.ErrorResponse error = - ClientProtocol.ErrorResponse.newBuilder().setInternalServerError(serverInternal) - .setRetriable(retriable).setMessage(errorMessage).build(); + ClientProtocol.ErrorResponse.newBuilder().setMessage(errorMessage).build(); return ClientProtocol.Response.newBuilder().setErrorResponse(error).build(); } @@ -49,21 +45,19 @@ public abstract class ProtobufResponseUtilities { * This creates response object containing a ClientProtocol.ErrorResponse, and also logs the * passed error message and exception (if present) to the provided logger. * - * @param serverInternal - is this error internal to the server - * @param retriable - can the operation be retried with a potentially different result * @param errorMessage - description of the error * @param logger - logger to write the error message to * @param ex - exception which should be logged * @return An error response containing the first three parameters. */ - public static ClientProtocol.Response createAndLogErrorResponse(boolean serverInternal, - boolean retriable, String errorMessage, Logger logger, Exception ex) { + public static ClientProtocol.Response createAndLogErrorResponse(String errorMessage, + Logger logger, Exception ex) { if (ex != null) { logger.error(errorMessage, ex); } else { logger.error(errorMessage); } - return createErrorResponse(serverInternal, retriable, errorMessage); + return createErrorResponse(errorMessage); } /** @@ -123,4 +117,27 @@ public abstract class ProtobufResponseUtilities { return ClientProtocol.Response.newBuilder().setPutResponse(RegionAPI.PutResponse.newBuilder()) .build(); } + + /** + * This creates a response object containing a RegionAPI.GetAllResponse + * + * @param entries - key, value pairs for which lookups succeeded + * @return A response object containing all the passed results + */ + public static ClientProtocol.Response createGetAllResponse(Set<BasicTypes.Entry> entries) { + RegionAPI.GetAllResponse.Builder builder = RegionAPI.GetAllResponse.newBuilder(); + builder.addAllEntries(entries); + return ClientProtocol.Response.newBuilder().setGetAllResponse(builder).build(); + } + + /** + * This creates a response for a putAll request + * + * @return A response object indicating any invalid keys (all others are assumed to have + * succeeded) + */ + public static ClientProtocol.Response createPutAllResponse() { + return ClientProtocol.Response.newBuilder() + .setPutAllResponse(RegionAPI.PutAllResponse.newBuilder()).build(); + } } http://git-wip-us.apache.org/repos/asf/geode/blob/cd1a3235/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/utilities/ProtobufUtilities.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/utilities/ProtobufUtilities.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/utilities/ProtobufUtilities.java index 9249793..b632037 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/utilities/ProtobufUtilities.java +++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/utilities/ProtobufUtilities.java @@ -16,8 +16,6 @@ package org.apache.geode.protocol.protobuf.utilities; import com.google.protobuf.ByteString; import org.apache.geode.protocol.protobuf.*; -import org.apache.geode.protocol.protobuf.utilities.ProtobufRequestUtilities; -import org.apache.geode.protocol.protobuf.utilities.ProtobufResponseUtilities; import org.apache.geode.serialization.SerializationService; import org.apache.geode.serialization.exception.UnsupportedEncodingTypeException; import org.apache.geode.serialization.registry.exception.CodecNotRegisteredForTypeException; @@ -55,7 +53,7 @@ public abstract class ProtobufUtilities { } /** - * Creates a protobuf key,value pair + * Creates a protobuf key,value pair from an encoded key and value * * @param key - an EncodedValue containing the key of the entry * @param value - an EncodedValue containing the value of the entry @@ -67,6 +65,26 @@ public abstract class ProtobufUtilities { } /** + * Creates a protobuf key,value pair from unencoded data + * + * @param serializationService - object which knows how to encode objects for the protobuf + * protocol {@link ProtobufSerializationService} + * @param unencodedKey - the unencoded key for the entry + * @param unencodedValue - the unencoded value for the entry + * @return a protobuf Entry containing the encoded key and value + * @throws UnsupportedEncodingTypeException - The key or value passed doesn't have a corresponding + * SerializationType + * @throws CodecNotRegisteredForTypeException - There isn't a protobuf codec for the + * SerializationType of the passed key or value + */ + public static BasicTypes.Entry createEntry(SerializationService serializationService, + Object unencodedKey, Object unencodedValue) + throws UnsupportedEncodingTypeException, CodecNotRegisteredForTypeException { + return createEntry(createEncodedValue(serializationService, unencodedKey), + createEncodedValue(serializationService, unencodedValue)); + } + + /** * This creates a protobuf message containing a ClientProtocol.Response * * @param messageHeader - The header for the message http://git-wip-us.apache.org/repos/asf/geode/blob/cd1a3235/geode-protobuf/src/main/proto/clientProtocol.proto ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/main/proto/clientProtocol.proto b/geode-protobuf/src/main/proto/clientProtocol.proto index d94c0f3..42de37c 100644 --- a/geode-protobuf/src/main/proto/clientProtocol.proto +++ b/geode-protobuf/src/main/proto/clientProtocol.proto @@ -55,7 +55,6 @@ message Request { } message Response { - ResponseHeader responseHeader = 1; oneof responseAPI { PutResponse putResponse = 2; GetResponse getResponse = 3; @@ -77,16 +76,8 @@ message Response { } message ErrorResponse { - bool internalServerError = 1; - bool retriable = 2; - string message = 3; -} - -message ResponseHeader { - oneof reponseType { - int32 responseTypeID = 1; - int32 errorCode = 2; - } + int32 errorCode = 1; + string message = 2; } message MetaData { http://git-wip-us.apache.org/repos/asf/geode/blob/cd1a3235/geode-protobuf/src/main/proto/region_API.proto ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/main/proto/region_API.proto b/geode-protobuf/src/main/proto/region_API.proto index 3108cb7..b8f410d 100644 --- a/geode-protobuf/src/main/proto/region_API.proto +++ b/geode-protobuf/src/main/proto/region_API.proto @@ -42,7 +42,7 @@ message PutAllRequest { } message PutAllResponse { - repeated EncodedValue failedKeys = 1; + // message presence indicates success. } message GetAllRequest { http://git-wip-us.apache.org/repos/asf/geode/blob/cd1a3235/geode-protobuf/src/test/java/org/apache/geode/protocol/MessageUtil.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/MessageUtil.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/MessageUtil.java index fee9448..174ccfa 100644 --- a/geode-protobuf/src/test/java/org/apache/geode/protocol/MessageUtil.java +++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/MessageUtil.java @@ -47,7 +47,7 @@ public class MessageUtil { } public static ClientProtocol.Message makeGetRequestMessage( - SerializationService serializationService, String requestKey, String requestRegion, + SerializationService serializationService, Object requestKey, String requestRegion, ClientProtocol.MessageHeader header) throws CodecAlreadyRegisteredForTypeException, UnsupportedEncodingTypeException, CodecNotRegisteredForTypeException { ClientProtocol.Request request = ProtobufRequestUtilities.createGetRequest(requestRegion, http://git-wip-us.apache.org/repos/asf/geode/blob/cd1a3235/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripCacheConnectionJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripCacheConnectionJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripCacheConnectionJUnitTest.java index 612b9c9..74c1b0e 100644 --- a/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripCacheConnectionJUnitTest.java +++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripCacheConnectionJUnitTest.java @@ -17,7 +17,6 @@ package org.apache.geode.protocol; import org.apache.geode.cache.Cache; import org.apache.geode.cache.CacheFactory; -import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionFactory; import org.apache.geode.cache.server.CacheServer; import org.apache.geode.distributed.ConfigurationProperties; @@ -42,6 +41,7 @@ import org.apache.geode.test.junit.categories.IntegrationTest; import org.apache.geode.util.test.TestUtil; import org.awaitility.Awaitility; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -52,13 +52,20 @@ import org.junit.rules.TestName; import java.io.IOException; import java.io.OutputStream; import java.net.Socket; +import java.util.HashSet; import java.util.Properties; +import java.util.Set; import java.util.concurrent.TimeUnit; -import static org.apache.geode.distributed.ConfigurationProperties.*; +import static org.apache.geode.distributed.ConfigurationProperties.SSL_ENABLED_COMPONENTS; +import static org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE; +import static org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE_PASSWORD; +import static org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE_TYPE; +import static org.apache.geode.distributed.ConfigurationProperties.SSL_REQUIRE_AUTHENTICATION; +import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE; +import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE_PASSWORD; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; /** * Test that switching on the header byte makes instances of @@ -77,6 +84,13 @@ public class RoundTripCacheConnectionJUnitTest { private static final String SSL_PROTOCOLS = "any"; private static final String SSL_CIPHERS = "any"; + public static final String TEST_MULTIOP_KEY1 = "multiopKey1"; + public static final String TEST_MULTIOP_KEY2 = "multiopKey2"; + public static final String TEST_MULTIOP_KEY3 = "multiopKey3"; + public static final String TEST_MULTIOP_VALUE1 = "multiopValue1"; + public static final String TEST_MULTIOP_VALUE2 = "multiopValue2"; + public static final String TEST_MULTIOP_VALUE3 = "multiopValue3"; + private Cache cache; private int cacheServerPort; private SerializationService serializationService; @@ -144,7 +158,76 @@ public class RoundTripCacheConnectionJUnitTest { ClientProtocol.Message getMessage = MessageUtil.makeGetRequestMessage(serializationService, TEST_KEY, TEST_REGION, ProtobufUtilities.createMessageHeader(TEST_GET_CORRELATION_ID)); protobufProtocolSerializer.serialize(getMessage, outputStream); - validateGetResponse(socket, protobufProtocolSerializer); + validateGetResponse(socket, protobufProtocolSerializer, TEST_VALUE); + } + + @Test + public void testNewProtocolWithMultikeyOperations() throws Exception { + System.setProperty("geode.feature-protobuf-protocol", "true"); + + Socket socket = new Socket("localhost", cacheServerPort); + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected); + OutputStream outputStream = socket.getOutputStream(); + outputStream.write(110); + + ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer(); + Set<BasicTypes.Entry> putEntries = new HashSet<>(); + putEntries.add(ProtobufUtilities.createEntry(serializationService, TEST_MULTIOP_KEY1, + TEST_MULTIOP_VALUE1)); + putEntries.add(ProtobufUtilities.createEntry(serializationService, TEST_MULTIOP_KEY2, + TEST_MULTIOP_VALUE2)); + putEntries.add(ProtobufUtilities.createEntry(serializationService, TEST_MULTIOP_KEY3, + TEST_MULTIOP_VALUE3)); + ClientProtocol.Message putAllMessage = ProtobufUtilities.createProtobufRequest( + ProtobufUtilities.createMessageHeader(TEST_PUT_CORRELATION_ID), + ProtobufRequestUtilities.createPutAllRequest(TEST_REGION, putEntries)); + protobufProtocolSerializer.serialize(putAllMessage, outputStream); + validatePutAllResponse(socket, protobufProtocolSerializer, + ClientProtocol.Response.ResponseAPICase.PUTALLRESPONSE); + + Set<BasicTypes.EncodedValue> getEntries = new HashSet<>(); + getEntries.add(ProtobufUtilities.createEncodedValue(serializationService, TEST_MULTIOP_KEY1)); + getEntries.add(ProtobufUtilities.createEncodedValue(serializationService, TEST_MULTIOP_KEY2)); + getEntries.add(ProtobufUtilities.createEncodedValue(serializationService, TEST_MULTIOP_KEY3)); + ClientProtocol.Message getAllMessage = ProtobufUtilities.createProtobufRequest( + ProtobufUtilities.createMessageHeader(TEST_GET_CORRELATION_ID), + ProtobufRequestUtilities.createGetAllRequest(TEST_REGION, getEntries)); + protobufProtocolSerializer.serialize(getAllMessage, outputStream); + validateGetAllResponse(socket, protobufProtocolSerializer); + } + + @Test + public void multiKeyOperationErrorsWithClasscastException() throws Exception { + RegionFactory<Float, Object> regionFactory = cache.createRegionFactory(); + regionFactory.setKeyConstraint(Float.class); + String regionName = "constraintRegion"; + regionFactory.create(regionName); + System.setProperty("geode.feature-protobuf-protocol", "true"); + + Socket socket = new Socket("localhost", cacheServerPort); + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected); + OutputStream outputStream = socket.getOutputStream(); + outputStream.write(110); + + ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer(); + Set<BasicTypes.Entry> putEntries = new HashSet<>(); + putEntries.add( + ProtobufUtilities.createEntry(serializationService, new Float(2.2), TEST_MULTIOP_VALUE1)); + putEntries.add(ProtobufUtilities.createEntry(serializationService, TEST_MULTIOP_KEY2, + TEST_MULTIOP_VALUE2)); + putEntries.add(ProtobufUtilities.createEntry(serializationService, TEST_MULTIOP_KEY3, + TEST_MULTIOP_VALUE3)); + ClientProtocol.Message putAllMessage = ProtobufUtilities.createProtobufRequest( + ProtobufUtilities.createMessageHeader(TEST_PUT_CORRELATION_ID), + ProtobufRequestUtilities.createPutAllRequest(regionName, putEntries)); + protobufProtocolSerializer.serialize(putAllMessage, outputStream); + validatePutAllResponse(socket, protobufProtocolSerializer, + ClientProtocol.Response.ResponseAPICase.ERRORRESPONSE); + + ClientProtocol.Message getMessage = MessageUtil.makeGetRequestMessage(serializationService, + new Float(2.2), regionName, ProtobufUtilities.createMessageHeader(TEST_GET_CORRELATION_ID)); + protobufProtocolSerializer.serialize(getMessage, outputStream); + validateGetResponse(socket, protobufProtocolSerializer, TEST_MULTIOP_VALUE1); ClientProtocol.Message removeMessage = ProtobufUtilities.createProtobufRequest( ProtobufUtilities.createMessageHeader(TEST_REMOVE_CORRELATION_ID), @@ -207,7 +290,7 @@ public class RoundTripCacheConnectionJUnitTest { } private void validateGetResponse(Socket socket, - ProtobufProtocolSerializer protobufProtocolSerializer) + ProtobufProtocolSerializer protobufProtocolSerializer, Object expectedValue) throws InvalidProtocolMessageException, IOException, UnsupportedEncodingTypeException, CodecNotRegisteredForTypeException, CodecAlreadyRegisteredForTypeException { ClientProtocol.Message message = @@ -220,7 +303,7 @@ public class RoundTripCacheConnectionJUnitTest { RegionAPI.GetResponse getResponse = response.getGetResponse(); BasicTypes.EncodedValue result = getResponse.getResult(); assertEquals(BasicTypes.EncodingType.STRING, result.getEncodingType()); - assertEquals(TEST_VALUE, new ProtobufSerializationService().decode(result.getEncodingType(), + assertEquals(expectedValue, new ProtobufSerializationService().decode(result.getEncodingType(), result.getValue().toByteArray())); } @@ -239,6 +322,50 @@ public class RoundTripCacheConnectionJUnitTest { assertEquals(TEST_REGION, getRegionsResponse.getRegions(0)); } + private void validatePutAllResponse(Socket socket, + ProtobufProtocolSerializer protobufProtocolSerializer, + ClientProtocol.Response.ResponseAPICase responseCase) throws Exception { + ClientProtocol.Message message = + protobufProtocolSerializer.deserialize(socket.getInputStream()); + assertEquals(TEST_PUT_CORRELATION_ID, message.getMessageHeader().getCorrelationId()); + assertEquals(ClientProtocol.Message.MessageTypeCase.RESPONSE, message.getMessageTypeCase()); + ClientProtocol.Response response = message.getResponse(); + assertEquals(responseCase, response.getResponseAPICase()); + } + + private void validateGetAllResponse(Socket socket, + ProtobufProtocolSerializer protobufProtocolSerializer) + throws InvalidProtocolMessageException, IOException, UnsupportedEncodingTypeException, + CodecNotRegisteredForTypeException, CodecAlreadyRegisteredForTypeException { + ClientProtocol.Message message = + protobufProtocolSerializer.deserialize(socket.getInputStream()); + assertEquals(TEST_GET_CORRELATION_ID, message.getMessageHeader().getCorrelationId()); + assertEquals(ClientProtocol.Message.MessageTypeCase.RESPONSE, message.getMessageTypeCase()); + ClientProtocol.Response response = message.getResponse(); + assertEquals(ClientProtocol.Response.ResponseAPICase.GETALLRESPONSE, + response.getResponseAPICase()); + RegionAPI.GetAllResponse getAllResponse = response.getGetAllResponse(); + assertEquals(3, getAllResponse.getEntriesCount()); + for (BasicTypes.Entry result : getAllResponse.getEntriesList()) { + String key = (String) ProtobufUtilities.decodeValue(serializationService, result.getKey()); + String value = + (String) ProtobufUtilities.decodeValue(serializationService, result.getValue()); + switch (key) { + case TEST_MULTIOP_KEY1: + assertEquals(TEST_MULTIOP_VALUE1, value); + break; + case TEST_MULTIOP_KEY2: + assertEquals(TEST_MULTIOP_VALUE2, value); + break; + case TEST_MULTIOP_KEY3: + assertEquals(TEST_MULTIOP_VALUE3, value); + break; + default: + Assert.fail("Unexpected key found by getAll: " + key); + } + } + } + private void validateRemoveResponse(Socket socket, ProtobufProtocolSerializer protobufProtocolSerializer) throws Exception { ClientProtocol.Message message = http://git-wip-us.apache.org/repos/asf/geode/blob/cd1a3235/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandlerJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandlerJUnitTest.java new file mode 100644 index 0000000..0e716fb --- /dev/null +++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandlerJUnitTest.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.protocol.protobuf.operations; + +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.Region; +import org.apache.geode.protocol.protobuf.BasicTypes; +import org.apache.geode.protocol.protobuf.ClientProtocol; +import org.apache.geode.protocol.protobuf.RegionAPI; +import org.apache.geode.protocol.protobuf.utilities.ProtobufRequestUtilities; +import org.apache.geode.protocol.protobuf.utilities.ProtobufUtilities; +import org.apache.geode.serialization.SerializationService; +import org.apache.geode.serialization.codec.StringCodec; +import org.apache.geode.serialization.exception.UnsupportedEncodingTypeException; +import org.apache.geode.serialization.registry.exception.CodecAlreadyRegisteredForTypeException; +import org.apache.geode.serialization.registry.exception.CodecNotRegisteredForTypeException; +import org.apache.geode.test.dunit.Assert; +import org.apache.geode.test.junit.categories.UnitTest; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.nio.charset.Charset; +import java.util.*; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@Category(UnitTest.class) +public class GetAllRequestOperationHandlerJUnitTest { + private static final String TEST_KEY1 = "my key1"; + private static final String TEST_VALUE1 = "my value1"; + private static final String TEST_KEY2 = "my key2"; + private static final String TEST_VALUE2 = "my value2"; + private static final String TEST_KEY3 = "my key3"; + private static final String TEST_VALUE3 = "my value3"; + private static final String TEST_REGION = "test region"; + private Cache cacheStub; + private SerializationService serializationServiceStub; + private GetAllRequestOperationHandler operationHandler; + private StringCodec stringDecoder; + + @Before + public void setUp() throws Exception { + serializationServiceStub = mock(SerializationService.class); + addStringMockEncoding(serializationServiceStub, TEST_KEY1, true, true); + addStringMockEncoding(serializationServiceStub, TEST_KEY2, true, true); + addStringMockEncoding(serializationServiceStub, TEST_KEY3, true, true); + addStringMockEncoding(serializationServiceStub, TEST_VALUE1, true, false); + addStringMockEncoding(serializationServiceStub, TEST_VALUE2, true, false); + addStringMockEncoding(serializationServiceStub, TEST_VALUE3, true, false); + + Region regionStub = mock(Region.class); + when(regionStub.getAll(new HashSet<Object>() { + { + add(TEST_KEY1); + add(TEST_KEY2); + add(TEST_KEY3); + } + })).thenReturn(new HashMap() { + { + put(TEST_KEY1, TEST_VALUE1); + put(TEST_KEY2, TEST_VALUE2); + put(TEST_KEY3, TEST_VALUE3); + } + }); + + cacheStub = mock(Cache.class); + when(cacheStub.getRegion(TEST_REGION)).thenReturn(regionStub); + operationHandler = new GetAllRequestOperationHandler(); + stringDecoder = new StringCodec(); + } + + private void addStringMockEncoding(SerializationService mock, String s, boolean add_encoding, + boolean add_decoding) throws Exception { + if (add_encoding) { + when(mock.encode(BasicTypes.EncodingType.STRING, s)) + .thenReturn(s.getBytes(Charset.forName("UTF-8"))); + } + if (add_decoding) { + when(mock.decode(BasicTypes.EncodingType.STRING, s.getBytes(Charset.forName("UTF-8")))) + .thenReturn(s); + } + } + + @Test + public void processReturnsExpectedValuesForValidKeys() + throws CodecAlreadyRegisteredForTypeException, UnsupportedEncodingTypeException, + CodecNotRegisteredForTypeException { + ClientProtocol.Request getRequest = generateTestRequest(true); + ClientProtocol.Response response = + operationHandler.process(serializationServiceStub, getRequest, cacheStub); + + Assert.assertEquals(ClientProtocol.Response.ResponseAPICase.GETALLRESPONSE, + response.getResponseAPICase()); + RegionAPI.GetAllResponse getAllResponse = response.getGetAllResponse(); + Assert.assertEquals(3, getAllResponse.getEntriesCount()); + + List<BasicTypes.Entry> entriesList = getAllResponse.getEntriesList(); + Map<String, String> responseEntries = convertEntryListToMap(entriesList); + + Assert.assertEquals(TEST_VALUE1, responseEntries.get(TEST_KEY1)); + Assert.assertEquals(TEST_VALUE2, responseEntries.get(TEST_KEY2)); + Assert.assertEquals(TEST_VALUE3, responseEntries.get(TEST_KEY3)); + } + + @Test + public void processReturnsNoEntriesForNoKeysRequested() + throws UnsupportedEncodingTypeException, CodecNotRegisteredForTypeException { + ClientProtocol.Request getRequest = generateTestRequest(false); + ClientProtocol.Response response = + operationHandler.process(serializationServiceStub, getRequest, cacheStub); + + Assert.assertEquals(ClientProtocol.Response.ResponseAPICase.GETALLRESPONSE, + response.getResponseAPICase()); + + RegionAPI.GetAllResponse getAllResponse = response.getGetAllResponse(); + List<BasicTypes.Entry> entriesList = getAllResponse.getEntriesList(); + Map<String, String> responseEntries = convertEntryListToMap(entriesList); + Assert.assertEquals(0, responseEntries.size()); + } + + private ClientProtocol.Request generateTestRequest(boolean addKeys) + throws UnsupportedEncodingTypeException, CodecNotRegisteredForTypeException { + HashSet<BasicTypes.EncodedValue> testKeys = new HashSet<>(); + if (addKeys) { + testKeys.add(ProtobufUtilities.createEncodedValue(serializationServiceStub, TEST_KEY1)); + testKeys.add(ProtobufUtilities.createEncodedValue(serializationServiceStub, TEST_KEY2)); + testKeys.add(ProtobufUtilities.createEncodedValue(serializationServiceStub, TEST_KEY3)); + } + return ProtobufRequestUtilities.createGetAllRequest(TEST_REGION, testKeys); + } + + private Map<String, String> convertEntryListToMap(List<BasicTypes.Entry> entriesList) { + Map<String, String> result = new HashMap<>(); + for (BasicTypes.Entry entry : entriesList) { + BasicTypes.EncodedValue encodedKey = entry.getKey(); + Assert.assertEquals(BasicTypes.EncodingType.STRING, encodedKey.getEncodingType()); + String key = stringDecoder.decode(encodedKey.getValue().toByteArray()); + BasicTypes.EncodedValue encodedValue = entry.getValue(); + Assert.assertEquals(BasicTypes.EncodingType.STRING, encodedValue.getEncodingType()); + String value = stringDecoder.decode(encodedValue.getValue().toByteArray()); + result.put(key, value); + } + return result; + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/cd1a3235/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandlerJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandlerJUnitTest.java index b7d5201..2f2e47e 100644 --- a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandlerJUnitTest.java +++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandlerJUnitTest.java @@ -16,7 +16,9 @@ package org.apache.geode.protocol.protobuf.operations; import org.apache.geode.cache.Cache; import org.apache.geode.cache.Region; -import org.apache.geode.protocol.protobuf.*; +import org.apache.geode.protocol.protobuf.BasicTypes; +import org.apache.geode.protocol.protobuf.ClientProtocol; +import org.apache.geode.protocol.protobuf.RegionAPI; import org.apache.geode.protocol.protobuf.utilities.ProtobufRequestUtilities; import org.apache.geode.protocol.protobuf.utilities.ProtobufUtilities; import org.apache.geode.serialization.SerializationService; @@ -42,6 +44,7 @@ public class GetRequestOperationHandlerJUnitTest { public static final String TEST_REGION = "test region"; public static final String MISSING_REGION = "missing region"; public static final String MISSING_KEY = "missing key"; + public static final String NULLED_KEY = "nulled key"; public Cache cacheStub; public SerializationService serializationServiceStub; private GetRequestOperationHandler operationHandler; @@ -58,9 +61,19 @@ public class GetRequestOperationHandlerJUnitTest { .thenReturn(TEST_KEY.getBytes(Charset.forName("UTF-8"))); when(serializationServiceStub.encode(BasicTypes.EncodingType.STRING, MISSING_KEY)) .thenReturn(MISSING_KEY.getBytes(Charset.forName("UTF-8"))); + when(serializationServiceStub.decode(BasicTypes.EncodingType.STRING, + MISSING_KEY.getBytes(Charset.forName("UTF-8")))).thenReturn(MISSING_KEY); + when(serializationServiceStub.encode(BasicTypes.EncodingType.STRING, NULLED_KEY)) + .thenReturn(NULLED_KEY.getBytes(Charset.forName("UTF-8"))); + when(serializationServiceStub.decode(BasicTypes.EncodingType.STRING, + NULLED_KEY.getBytes(Charset.forName("UTF-8")))).thenReturn(NULLED_KEY); Region regionStub = mock(Region.class); when(regionStub.get(TEST_KEY)).thenReturn(TEST_VALUE); + when(regionStub.get(MISSING_KEY)).thenReturn(null); + when(regionStub.get(NULLED_KEY)).thenReturn(null); + when(regionStub.containsKey(MISSING_KEY)).thenReturn(false); + when(regionStub.containsKey(NULLED_KEY)).thenReturn(true); cacheStub = mock(Cache.class); when(cacheStub.getRegion(TEST_REGION)).thenReturn(regionStub); @@ -73,7 +86,7 @@ public class GetRequestOperationHandlerJUnitTest { public void processReturnsTheEncodedValueFromTheRegion() throws CodecAlreadyRegisteredForTypeException, UnsupportedEncodingTypeException, CodecNotRegisteredForTypeException { - ClientProtocol.Request getRequest = generateTestRequest(false, false); + ClientProtocol.Request getRequest = generateTestRequest(false, false, false); ClientProtocol.Response response = operationHandler.process(serializationServiceStub, getRequest, cacheStub); @@ -89,7 +102,7 @@ public class GetRequestOperationHandlerJUnitTest { public void processReturnsUnsucessfulResponseForInvalidRegion() throws CodecAlreadyRegisteredForTypeException, UnsupportedEncodingTypeException, CodecNotRegisteredForTypeException { - ClientProtocol.Request getRequest = generateTestRequest(true, false); + ClientProtocol.Request getRequest = generateTestRequest(true, false, false); ClientProtocol.Response response = operationHandler.process(serializationServiceStub, getRequest, cacheStub); @@ -101,7 +114,21 @@ public class GetRequestOperationHandlerJUnitTest { public void processReturnsKeyNotFoundWhenKeyIsNotFound() throws CodecAlreadyRegisteredForTypeException, UnsupportedEncodingTypeException, CodecNotRegisteredForTypeException { - ClientProtocol.Request getRequest = generateTestRequest(false, true); + ClientProtocol.Request getRequest = generateTestRequest(false, true, false); + ClientProtocol.Response response = + operationHandler.process(serializationServiceStub, getRequest, cacheStub); + + Assert.assertEquals(ClientProtocol.Response.ResponseAPICase.GETRESPONSE, + response.getResponseAPICase()); + RegionAPI.GetResponse getResponse = response.getGetResponse(); + Assert.assertFalse(getResponse.hasResult()); + } + + @Test + public void processReturnsLookupFailureWhenKeyFoundWithNoValue() + throws CodecAlreadyRegisteredForTypeException, UnsupportedEncodingTypeException, + CodecNotRegisteredForTypeException { + ClientProtocol.Request getRequest = generateTestRequest(false, false, true); ClientProtocol.Response response = operationHandler.process(serializationServiceStub, getRequest, cacheStub); @@ -120,7 +147,7 @@ public class GetRequestOperationHandlerJUnitTest { when(serializationServiceStub.decode(BasicTypes.EncodingType.STRING, TEST_KEY.getBytes(Charset.forName("UTF-8")))).thenThrow(exception); - ClientProtocol.Request getRequest = generateTestRequest(false, false); + ClientProtocol.Request getRequest = generateTestRequest(false, false, false); ClientProtocol.Response response = operationHandler.process(serializationServiceStub, getRequest, cacheStub); @@ -128,10 +155,11 @@ public class GetRequestOperationHandlerJUnitTest { response.getResponseAPICase()); } - private ClientProtocol.Request generateTestRequest(boolean missingRegion, boolean missingKey) + private ClientProtocol.Request generateTestRequest(boolean missingRegion, boolean missingKey, + boolean nulledKey) throws UnsupportedEncodingTypeException, CodecNotRegisteredForTypeException { String region = missingRegion ? MISSING_REGION : TEST_REGION; - String key = missingKey ? MISSING_KEY : TEST_KEY; + String key = missingKey ? MISSING_KEY : (nulledKey ? NULLED_KEY : TEST_KEY); BasicTypes.EncodedValue testKey = ProtobufUtilities.createEncodedValue(serializationServiceStub, key); return ProtobufRequestUtilities.createGetRequest(region, testKey); http://git-wip-us.apache.org/repos/asf/geode/blob/cd1a3235/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandlerJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandlerJUnitTest.java new file mode 100644 index 0000000..1b9648e --- /dev/null +++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandlerJUnitTest.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.protocol.protobuf.operations; + +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.Region; +import org.apache.geode.protocol.protobuf.BasicTypes; +import org.apache.geode.protocol.protobuf.ClientProtocol; +import org.apache.geode.protocol.protobuf.utilities.ProtobufRequestUtilities; +import org.apache.geode.protocol.protobuf.utilities.ProtobufUtilities; +import org.apache.geode.serialization.SerializationService; +import org.apache.geode.serialization.exception.UnsupportedEncodingTypeException; +import org.apache.geode.serialization.registry.exception.CodecAlreadyRegisteredForTypeException; +import org.apache.geode.serialization.registry.exception.CodecNotRegisteredForTypeException; +import org.apache.geode.test.dunit.Assert; +import org.apache.geode.test.junit.categories.UnitTest; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.mockito.Mockito.*; +import org.hamcrest.CoreMatchers; +import org.mockito.ArgumentMatcher; + +@Category(UnitTest.class) +public class PutAllRequestOperationHandlerJUnitTest { + private static final String TEST_KEY1 = "my key1"; + private static final String TEST_KEY2 = "my key2"; + private static final String TEST_KEY3 = "my key3"; + private static final String TEST_INVALID_KEY = "invalid key"; + private static final String TEST_VALUE1 = "my value1"; + private static final String TEST_VALUE2 = "my value2"; + private static final String TEST_VALUE3 = "my value3"; + private static final Integer TEST_INVALID_VALUE = 732; + private static final String TEST_REGION = "test region"; + private static final String EXCEPTION_TEXT = "Simulating put failure"; + private Cache cacheStub; + private SerializationService serializationServiceStub; + private Region regionMock; + + class isAMapContainingInvalidKey implements ArgumentMatcher<Map> { + @Override + public boolean matches(Map argument) { + return argument.containsKey(TEST_INVALID_KEY); + } + } + + @Before + public void setUp() throws Exception { + serializationServiceStub = mock(SerializationService.class); + addStringStubEncoding(serializationServiceStub, TEST_KEY1); + addStringStubEncoding(serializationServiceStub, TEST_KEY2); + addStringStubEncoding(serializationServiceStub, TEST_KEY3); + addStringStubEncoding(serializationServiceStub, TEST_INVALID_KEY); + addStringStubEncoding(serializationServiceStub, TEST_VALUE1); + addStringStubEncoding(serializationServiceStub, TEST_VALUE2); + addStringStubEncoding(serializationServiceStub, TEST_VALUE3); + when(serializationServiceStub.encode(BasicTypes.EncodingType.INT, TEST_INVALID_VALUE)) + .thenReturn(ByteBuffer.allocate(Integer.BYTES).putInt(TEST_INVALID_VALUE).array()); + when(serializationServiceStub.decode(BasicTypes.EncodingType.INT, + ByteBuffer.allocate(Integer.BYTES).putInt(TEST_INVALID_VALUE).array())) + .thenReturn(TEST_INVALID_VALUE); + + regionMock = mock(Region.class); + + doThrow(new ClassCastException(EXCEPTION_TEXT)).when(regionMock) + .putAll(argThat(new isAMapContainingInvalidKey())); + + cacheStub = mock(Cache.class); + when(cacheStub.getRegion(TEST_REGION)).thenReturn(regionMock); + } + + private void addStringStubEncoding(SerializationService stub, String s) throws Exception { + when(stub.encode(BasicTypes.EncodingType.STRING, s)) + .thenReturn(s.getBytes(Charset.forName("UTF-8"))); + when(stub.decode(BasicTypes.EncodingType.STRING, s.getBytes(Charset.forName("UTF-8")))) + .thenReturn(s); + } + + @Test + public void processInsertsMultipleValidEntriesInCache() throws UnsupportedEncodingTypeException, + CodecNotRegisteredForTypeException, CodecAlreadyRegisteredForTypeException { + PutAllRequestOperationHandler operationHandler = new PutAllRequestOperationHandler(); + + ClientProtocol.Response response = operationHandler.process(serializationServiceStub, + generateTestRequest(false, true), cacheStub); + + Assert.assertEquals(ClientProtocol.Response.ResponseAPICase.PUTALLRESPONSE, + response.getResponseAPICase()); + + HashMap<Object, Object> expectedValues = new HashMap<>(); + expectedValues.put(TEST_KEY1, TEST_VALUE1); + expectedValues.put(TEST_KEY2, TEST_VALUE2); + expectedValues.put(TEST_KEY3, TEST_VALUE3); + + verify(regionMock).putAll(expectedValues); + } + + @Test + public void processWithInvalidEntryReturnsError() throws Exception { + PutAllRequestOperationHandler operationHandler = new PutAllRequestOperationHandler(); + + ClientProtocol.Response response = operationHandler.process(serializationServiceStub, + generateTestRequest(true, true), cacheStub); + + Assert.assertEquals(ClientProtocol.Response.ResponseAPICase.ERRORRESPONSE, + response.getResponseAPICase()); + Assert.assertThat(response.getErrorResponse().getMessage(), + CoreMatchers.containsString(EXCEPTION_TEXT)); + // can't verify anything about put keys because we make no guarantees. + } + + @Test + public void processWithNoEntriesPasses() throws Exception { + PutAllRequestOperationHandler operationHandler = new PutAllRequestOperationHandler(); + + ClientProtocol.Response response = operationHandler.process(serializationServiceStub, + generateTestRequest(false, false), cacheStub); + + Assert.assertEquals(ClientProtocol.Response.ResponseAPICase.PUTALLRESPONSE, + response.getResponseAPICase()); + + verify(regionMock, times(0)).put(any(), any()); + } + + private ClientProtocol.Request generateTestRequest(boolean addInvalidKey, boolean addValidKeys) + throws UnsupportedEncodingTypeException, CodecNotRegisteredForTypeException { + Set<BasicTypes.Entry> entries = new HashSet<>(); + if (addInvalidKey) { + entries.add(ProtobufUtilities.createEntry( + ProtobufUtilities.createEncodedValue(serializationServiceStub, TEST_INVALID_KEY), + ProtobufUtilities.createEncodedValue(serializationServiceStub, TEST_INVALID_VALUE))); + } + if (addValidKeys) { + entries.add(ProtobufUtilities.createEntry( + ProtobufUtilities.createEncodedValue(serializationServiceStub, TEST_KEY1), + ProtobufUtilities.createEncodedValue(serializationServiceStub, TEST_VALUE1))); + entries.add(ProtobufUtilities.createEntry( + ProtobufUtilities.createEncodedValue(serializationServiceStub, TEST_KEY2), + ProtobufUtilities.createEncodedValue(serializationServiceStub, TEST_VALUE2))); + entries.add(ProtobufUtilities.createEntry( + ProtobufUtilities.createEncodedValue(serializationServiceStub, TEST_KEY3), + ProtobufUtilities.createEncodedValue(serializationServiceStub, TEST_VALUE3))); + } + return ProtobufRequestUtilities.createPutAllRequest(TEST_REGION, entries); + } +}
