This is an automated email from the ASF dual-hosted git repository.

wirebaron pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new fa1f2ac  GEODE-4406: Improve authorization granularity for protobuf 
(#1514)
fa1f2ac is described below

commit fa1f2ac98070bf445f24ea3b2a8f3ce5cf6b5b7d
Author: Brian Rowe <[email protected]>
AuthorDate: Tue Feb 27 16:17:15 2018 -0800

    GEODE-4406: Improve authorization granularity for protobuf (#1514)
---
 .../protobuf/v1/ProtobufOperationContext.java      |  40 +++-
 .../protocol/protobuf/v1/ProtobufOpsProcessor.java |   4 +-
 .../AbstractFunctionRequestOperationHandler.java   |  13 ++
 .../operations/GetAllRequestOperationHandler.java  |  48 ++++-
 .../v1/operations/GetRequestOperationHandler.java  |   8 +
 .../operations/PutAllRequestOperationHandler.java  |  48 ++++-
 .../v1/operations/PutRequestOperationHandler.java  |   8 +
 .../operations/RemoveRequestOperationHandler.java  |   8 +
 .../registry/ProtobufOperationContextRegistry.java |  45 ++--
 ...cySecurityProtobufConnectionStateProcessor.java |   6 +-
 ...NoSecurityProtobufConnectionStateProcessor.java |   6 +-
 ...obufConnectionAuthenticatingStateProcessor.java |   6 +-
 ...rotobufConnectionAuthorizingStateProcessor.java |  18 +-
 .../ProtobufConnectionHandshakeStateProcessor.java |   6 +-
 .../v1/state/ProtobufConnectionStateProcessor.java |   7 +-
 ...rotobufConnectionTerminatingStateProcessor.java |   6 +-
 .../protobuf/v1/AuthorizationIntegrationTest.java  | 229 +++++++++++++++++----
 17 files changed, 419 insertions(+), 87 deletions(-)

diff --git 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufOperationContext.java
 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufOperationContext.java
index 8903346..8e46786 100644
--- 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufOperationContext.java
+++ 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufOperationContext.java
@@ -17,14 +17,35 @@ package org.apache.geode.internal.protocol.protobuf.v1;
 import java.util.function.Function;
 
 import org.apache.geode.internal.protocol.operations.ProtobufOperationHandler;
+import 
org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.DecodingException;
 import org.apache.geode.security.ResourcePermission;
 
 public class ProtobufOperationContext<OperationRequest, OperationResponse> {
+  @FunctionalInterface
+  public interface PermissionFunction<OperationRequest> {
+    ResourcePermission apply(OperationRequest request, 
ProtobufSerializationService service)
+        throws DecodingException;
+  }
+
   private final ProtobufOperationHandler<OperationRequest, OperationResponse> 
operationHandler;
   private final Function<ClientProtocol.Message, OperationRequest> fromRequest;
   private final Function<OperationResponse, ClientProtocol.Message.Builder> 
toResponse;
   private final Function<ClientProtocol.ErrorResponse, 
ClientProtocol.Message.Builder> toErrorResponse;
-  private final ResourcePermission accessPermissionRequired;
+  private final PermissionFunction<OperationRequest> accessPermissionRequired;
+
+  private class StaticResourcePermissionProvider implements 
PermissionFunction<OperationRequest> {
+    private final ResourcePermission permission;
+
+    StaticResourcePermissionProvider(ResourcePermission requiredPermission) {
+      permission = requiredPermission;
+    }
+
+    @Override
+    public ResourcePermission apply(OperationRequest request,
+        ProtobufSerializationService serializer) {
+      return permission;
+    }
+  }
 
   public ProtobufOperationContext(Function<ClientProtocol.Message, 
OperationRequest> fromRequest,
       ProtobufOperationHandler<OperationRequest, OperationResponse> 
operationHandler,
@@ -34,9 +55,21 @@ public class ProtobufOperationContext<OperationRequest, 
OperationResponse> {
     this.fromRequest = fromRequest;
     this.toResponse = toResponse;
     this.toErrorResponse = this::makeErrorBuilder;
+    accessPermissionRequired = new 
StaticResourcePermissionProvider(permissionRequired);
+  }
+
+  public ProtobufOperationContext(Function<ClientProtocol.Message, 
OperationRequest> fromRequest,
+      ProtobufOperationHandler<OperationRequest, OperationResponse> 
operationHandler,
+      Function<OperationResponse, ClientProtocol.Message.Builder> toResponse,
+      PermissionFunction<OperationRequest> permissionRequired) {
+    this.operationHandler = operationHandler;
+    this.fromRequest = fromRequest;
+    this.toResponse = toResponse;
+    this.toErrorResponse = this::makeErrorBuilder;
     accessPermissionRequired = permissionRequired;
   }
 
+
   protected ClientProtocol.Message.Builder makeErrorBuilder(
       ClientProtocol.ErrorResponse errorResponse) {
     return ClientProtocol.Message.newBuilder().setErrorResponse(errorResponse);
@@ -58,7 +91,8 @@ public class ProtobufOperationContext<OperationRequest, 
OperationResponse> {
     return toErrorResponse;
   }
 
-  public ResourcePermission getAccessPermissionRequired() {
-    return accessPermissionRequired;
+  public ResourcePermission getAccessPermissionRequired(OperationRequest 
request,
+      ProtobufSerializationService serializer) throws DecodingException {
+    return accessPermissionRequired.apply(request, serializer);
   }
 }
diff --git 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufOpsProcessor.java
 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufOpsProcessor.java
index 16b1e98..60df3c5 100644
--- 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufOpsProcessor.java
+++ 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/ProtobufOpsProcessor.java
@@ -51,8 +51,8 @@ public class ProtobufOpsProcessor {
     Result result;
 
     try {
-      messageExecutionContext.getConnectionStateProcessor()
-          .validateOperation(messageExecutionContext, operationContext);
+      
messageExecutionContext.getConnectionStateProcessor().validateOperation(request,
+          serializationService, messageExecutionContext, operationContext);
       result = processOperation(request, messageExecutionContext, requestType, 
operationContext);
     } catch (OperationNotAuthorizedException e) {
       // Don't move to a terminating state for authorization state failures
diff --git 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/AbstractFunctionRequestOperationHandler.java
 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/AbstractFunctionRequestOperationHandler.java
index 548e9ba..f7baef2 100644
--- 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/AbstractFunctionRequestOperationHandler.java
+++ 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/AbstractFunctionRequestOperationHandler.java
@@ -18,6 +18,7 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.logging.log4j.Logger;
+import org.apache.shiro.util.ThreadState;
 
 import org.apache.geode.cache.execute.Execution;
 import org.apache.geode.cache.execute.Function;
@@ -35,6 +36,7 @@ import 
org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationServi
 import org.apache.geode.internal.protocol.protobuf.v1.Result;
 import 
org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.DecodingException;
 import 
org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.EncodingException;
+import 
org.apache.geode.internal.protocol.protobuf.v1.state.ProtobufConnectionAuthorizingStateProcessor;
 import org.apache.geode.internal.security.SecurityService;
 import org.apache.geode.security.NotAuthorizedException;
 
@@ -59,6 +61,12 @@ public abstract class 
AbstractFunctionRequestOperationHandler<Req, Resp>
     final SecurityService securityService = 
messageExecutionContext.getCache().getSecurityService();
     final String regionName = getRegionName(request);
 
+    ThreadState threadState = null;
+    if (messageExecutionContext
+        .getConnectionStateProcessor() instanceof 
ProtobufConnectionAuthorizingStateProcessor) {
+      threadState = ((ProtobufConnectionAuthorizingStateProcessor) 
messageExecutionContext
+          .getConnectionStateProcessor()).prepareThreadForAuthorization();
+    }
     try {
       // check security for function.
       
function.getRequiredPermissions(regionName).forEach(securityService::authorize);
@@ -66,6 +74,11 @@ public abstract class 
AbstractFunctionRequestOperationHandler<Req, Resp>
       final String message = "Authorization failed for function \"" + 
functionID + "\"";
       logger.warn(message, ex);
       return Failure.of(BasicTypes.ErrorCode.AUTHORIZATION_FAILED, message);
+    } finally {
+      if (threadState != null) {
+        ((ProtobufConnectionAuthorizingStateProcessor) messageExecutionContext
+            .getConnectionStateProcessor()).restoreThreadState(threadState);
+      }
     }
 
     Object executionTarget = getExecutionTarget(request, regionName, 
messageExecutionContext);
diff --git 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetAllRequestOperationHandler.java
 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetAllRequestOperationHandler.java
index a67551c..0ca071c 100644
--- 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetAllRequestOperationHandler.java
+++ 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetAllRequestOperationHandler.java
@@ -18,6 +18,8 @@ import static 
org.apache.geode.internal.protocol.protobuf.v1.BasicTypes.ErrorCod
 import static 
org.apache.geode.internal.protocol.protobuf.v1.BasicTypes.ErrorCode.SERVER_ERROR;
 
 import org.apache.logging.log4j.Logger;
+import org.apache.shiro.authz.AuthorizationException;
+import org.apache.shiro.util.ThreadState;
 
 import org.apache.geode.annotations.Experimental;
 import org.apache.geode.cache.Region;
@@ -33,7 +35,11 @@ import org.apache.geode.internal.protocol.protobuf.v1.Result;
 import org.apache.geode.internal.protocol.protobuf.v1.Success;
 import 
org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.DecodingException;
 import 
org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.EncodingException;
+import 
org.apache.geode.internal.protocol.protobuf.v1.state.ProtobufConnectionAuthorizingStateProcessor;
 import 
org.apache.geode.internal.protocol.protobuf.v1.utilities.ProtobufUtilities;
+import org.apache.geode.internal.security.SecurityService;
+import org.apache.geode.security.NotAuthorizedException;
+import org.apache.geode.security.ResourcePermission;
 
 @Experimental
 public class GetAllRequestOperationHandler
@@ -52,21 +58,48 @@ public class GetAllRequestOperationHandler
           "Region \"" + regionName + "\" not found");
     }
 
+    ThreadState threadState = null;
+    SecurityService securityService = 
messageExecutionContext.getCache().getSecurityService();
+    boolean perKeyAuthorization = false;
+    if (messageExecutionContext
+        .getConnectionStateProcessor() instanceof 
ProtobufConnectionAuthorizingStateProcessor) {
+      threadState = ((ProtobufConnectionAuthorizingStateProcessor) 
messageExecutionContext
+          .getConnectionStateProcessor()).prepareThreadForAuthorization();
+      // Check if authorized for entire region
+      try {
+        securityService.authorize(new 
ResourcePermission(ResourcePermission.Resource.DATA,
+            ResourcePermission.Operation.READ, regionName));
+        ((ProtobufConnectionAuthorizingStateProcessor) messageExecutionContext
+            .getConnectionStateProcessor()).restoreThreadState(threadState);
+        threadState = null;
+      } catch (NotAuthorizedException ex) {
+        // Not authorized for the region, have to check keys individually
+        perKeyAuthorization = true;
+      }
+    }
+    final boolean authorizeKeys = perKeyAuthorization; // Required for use in 
lambda
+
+    long startTime = messageExecutionContext.getStatistics().startOperation();
     RegionAPI.GetAllResponse.Builder responseBuilder = 
RegionAPI.GetAllResponse.newBuilder();
     try {
       
messageExecutionContext.getCache().setReadSerializedForCurrentThread(true);
-      request.getKeyList().stream()
-          .forEach((key) -> processSingleKey(responseBuilder, 
serializationService, region, key));
+      request.getKeyList().stream().forEach((key) -> 
processSingleKey(responseBuilder,
+          serializationService, region, key, securityService, authorizeKeys));
     } finally {
       
messageExecutionContext.getCache().setReadSerializedForCurrentThread(false);
+      messageExecutionContext.getStatistics().endOperation(startTime);
+      if (threadState != null) {
+        ((ProtobufConnectionAuthorizingStateProcessor) messageExecutionContext
+            .getConnectionStateProcessor()).restoreThreadState(threadState);
+      }
     }
 
     return Success.of(responseBuilder.build());
   }
 
   private void processSingleKey(RegionAPI.GetAllResponse.Builder 
responseBuilder,
-      ProtobufSerializationService serializationService, Region region,
-      BasicTypes.EncodedValue key) {
+      ProtobufSerializationService serializationService, Region region, 
BasicTypes.EncodedValue key,
+      SecurityService securityService, boolean authorizeKeys) {
     try {
 
       Object decodedKey = serializationService.decode(key);
@@ -75,11 +108,18 @@ public class GetAllRequestOperationHandler
             .addFailures(buildKeyedError(key, "NULL is not a valid key for 
get.", INVALID_REQUEST));
         return;
       }
+      if (authorizeKeys) {
+        securityService.authorize(new 
ResourcePermission(ResourcePermission.Resource.DATA,
+            ResourcePermission.Operation.READ, region.getName(), 
decodedKey.toString()));
+      }
       Object value = region.get(decodedKey);
       BasicTypes.Entry entry =
           ProtobufUtilities.createEntry(serializationService, decodedKey, 
value);
       responseBuilder.addEntries(entry);
 
+    } catch (NotAuthorizedException ex) {
+      responseBuilder.addFailures(
+          buildKeyedError(key, "Unauthorized access", 
BasicTypes.ErrorCode.AUTHORIZATION_FAILED));
     } catch (DecodingException ex) {
       logger.info("Key encoding not supported: {}", ex);
       responseBuilder
diff --git 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetRequestOperationHandler.java
 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetRequestOperationHandler.java
index e7a27ea..856bb13 100644
--- 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetRequestOperationHandler.java
+++ 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/GetRequestOperationHandler.java
@@ -30,6 +30,7 @@ import org.apache.geode.internal.protocol.protobuf.v1.Result;
 import org.apache.geode.internal.protocol.protobuf.v1.Success;
 import 
org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.DecodingException;
 import 
org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.EncodingException;
+import org.apache.geode.security.ResourcePermission;
 
 @Experimental
 public class GetRequestOperationHandler
@@ -67,4 +68,11 @@ public class GetRequestOperationHandler
       
messageExecutionContext.getCache().setReadSerializedForCurrentThread(false);
     }
   }
+
+  public static ResourcePermission 
determineRequiredPermission(RegionAPI.GetRequest request,
+      ProtobufSerializationService serializer) throws DecodingException {
+    return new ResourcePermission(ResourcePermission.Resource.DATA,
+        ResourcePermission.Operation.READ, request.getRegionName(),
+        serializer.decode(request.getKey()).toString());
+  }
 }
diff --git 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/PutAllRequestOperationHandler.java
 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/PutAllRequestOperationHandler.java
index 3f43e0c..c882cbe 100644
--- 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/PutAllRequestOperationHandler.java
+++ 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/PutAllRequestOperationHandler.java
@@ -19,6 +19,7 @@ import static 
org.apache.geode.internal.protocol.protobuf.v1.BasicTypes.ErrorCod
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.shiro.util.ThreadState;
 
 import org.apache.geode.annotations.Experimental;
 import org.apache.geode.cache.Region;
@@ -33,6 +34,10 @@ import org.apache.geode.internal.protocol.protobuf.v1.Result;
 import org.apache.geode.internal.protocol.protobuf.v1.Success;
 import 
org.apache.geode.internal.protocol.protobuf.v1.serialization.SerializationService;
 import 
org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.DecodingException;
+import 
org.apache.geode.internal.protocol.protobuf.v1.state.ProtobufConnectionAuthorizingStateProcessor;
+import org.apache.geode.internal.security.SecurityService;
+import org.apache.geode.security.NotAuthorizedException;
+import org.apache.geode.security.ResourcePermission;
 
 @Experimental
 public class PutAllRequestOperationHandler
@@ -52,30 +57,65 @@ public class PutAllRequestOperationHandler
           "Region \"" + regionName + "\" not found");
     }
 
+    ThreadState threadState = null;
+    SecurityService securityService = 
messageExecutionContext.getCache().getSecurityService();
+    boolean perKeyAuthorization = false;
+    if (messageExecutionContext
+        .getConnectionStateProcessor() instanceof 
ProtobufConnectionAuthorizingStateProcessor) {
+      threadState = ((ProtobufConnectionAuthorizingStateProcessor) 
messageExecutionContext
+          .getConnectionStateProcessor()).prepareThreadForAuthorization();
+      // Check if authorized for entire region
+      try {
+        securityService.authorize(new 
ResourcePermission(ResourcePermission.Resource.DATA,
+            ResourcePermission.Operation.WRITE, regionName));
+        ((ProtobufConnectionAuthorizingStateProcessor) messageExecutionContext
+            .getConnectionStateProcessor()).restoreThreadState(threadState);
+        threadState = null;
+      } catch (NotAuthorizedException ex) {
+        // Not authorized for the region, have to check keys individually
+        perKeyAuthorization = true;
+      }
+    }
+    final boolean authorizeKeys = perKeyAuthorization; // Required for use in 
lambda
+
+    long startTime = messageExecutionContext.getStatistics().startOperation();
     RegionAPI.PutAllResponse.Builder builder = 
RegionAPI.PutAllResponse.newBuilder();
     try {
       
messageExecutionContext.getCache().setReadSerializedForCurrentThread(true);
 
-      putAllRequest.getEntryList().stream()
-          .forEach((entry) -> processSinglePut(builder, serializationService, 
region, entry));
+      putAllRequest.getEntryList().stream().forEach((entry) -> 
processSinglePut(builder,
+          serializationService, region, entry, securityService, 
authorizeKeys));
 
     } finally {
       
messageExecutionContext.getCache().setReadSerializedForCurrentThread(false);
+      if (threadState != null) {
+        ((ProtobufConnectionAuthorizingStateProcessor) messageExecutionContext
+            .getConnectionStateProcessor()).restoreThreadState(threadState);
+      }
     }
     return Success.of(builder.build());
   }
 
   private void processSinglePut(RegionAPI.PutAllResponse.Builder builder,
-      SerializationService serializationService, Region region, 
BasicTypes.Entry entry) {
+      SerializationService serializationService, Region region, 
BasicTypes.Entry entry,
+      SecurityService securityService, boolean authorizeKeys) {
     try {
-      Object decodedValue = serializationService.decode(entry.getValue());
+
       Object decodedKey = serializationService.decode(entry.getKey());
+      Object decodedValue = serializationService.decode(entry.getValue());
       if (decodedKey == null || decodedValue == null) {
         builder.addFailedKeys(
             buildKeyedError(entry, INVALID_REQUEST, "Key and value must both 
be non-NULL"));
       }
+      if (authorizeKeys) {
+        securityService.authorize(new 
ResourcePermission(ResourcePermission.Resource.DATA,
+            ResourcePermission.Operation.WRITE, region.getName(), 
decodedKey.toString()));
+      }
       region.put(decodedKey, decodedValue);
 
+    } catch (NotAuthorizedException ex) {
+      builder.addFailedKeys(
+          buildKeyedError(entry, BasicTypes.ErrorCode.AUTHORIZATION_FAILED, 
"Unauthorized access"));
     } catch (DecodingException ex) {
       logger.info("Encoding not supported: " + ex);
       builder.addFailedKeys(this.buildKeyedError(entry, INVALID_REQUEST, 
"Encoding not supported"));
diff --git 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/PutRequestOperationHandler.java
 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/PutRequestOperationHandler.java
index 1ea8bb3..df4c254 100644
--- 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/PutRequestOperationHandler.java
+++ 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/PutRequestOperationHandler.java
@@ -29,6 +29,7 @@ import 
org.apache.geode.internal.protocol.protobuf.v1.RegionAPI;
 import org.apache.geode.internal.protocol.protobuf.v1.Result;
 import org.apache.geode.internal.protocol.protobuf.v1.Success;
 import 
org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.DecodingException;
+import org.apache.geode.security.ResourcePermission;
 
 @Experimental
 public class PutRequestOperationHandler
@@ -64,4 +65,11 @@ public class PutRequestOperationHandler
       return Failure.of(BasicTypes.ErrorCode.SERVER_ERROR, ex.toString());
     }
   }
+
+  public static ResourcePermission 
determineRequiredPermission(RegionAPI.PutRequest request,
+      ProtobufSerializationService serializer) throws DecodingException {
+    return new ResourcePermission(ResourcePermission.Resource.DATA,
+        ResourcePermission.Operation.WRITE, request.getRegionName(),
+        serializer.decode(request.getEntry().getKey()).toString());
+  }
 }
diff --git 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/RemoveRequestOperationHandler.java
 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/RemoveRequestOperationHandler.java
index 18eeca9..b2ba6f7 100644
--- 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/RemoveRequestOperationHandler.java
+++ 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/RemoveRequestOperationHandler.java
@@ -29,6 +29,7 @@ import 
org.apache.geode.internal.protocol.protobuf.v1.RegionAPI;
 import org.apache.geode.internal.protocol.protobuf.v1.Result;
 import org.apache.geode.internal.protocol.protobuf.v1.Success;
 import 
org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.DecodingException;
+import org.apache.geode.security.ResourcePermission;
 
 @Experimental
 public class RemoveRequestOperationHandler
@@ -57,4 +58,11 @@ public class RemoveRequestOperationHandler
 
     return Success.of(RegionAPI.RemoveResponse.newBuilder().build());
   }
+
+  public static ResourcePermission 
determineRequiredPermission(RegionAPI.RemoveRequest request,
+      ProtobufSerializationService serializer) throws DecodingException {
+    return new ResourcePermission(ResourcePermission.Resource.DATA,
+        ResourcePermission.Operation.WRITE, request.getRegionName(),
+        serializer.decode(request.getKey()).toString());
+  }
 }
diff --git 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java
 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java
index 6a88e41..97a748e 100644
--- 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java
+++ 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java
@@ -21,6 +21,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.geode.annotations.Experimental;
 import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
 import org.apache.geode.internal.protocol.protobuf.v1.ProtobufOperationContext;
+import 
org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService;
+import org.apache.geode.internal.protocol.protobuf.v1.RegionAPI;
 import 
org.apache.geode.internal.protocol.protobuf.v1.operations.ExecuteFunctionOnGroupRequestOperationHandler;
 import 
org.apache.geode.internal.protocol.protobuf.v1.operations.ExecuteFunctionOnMemberRequestOperationHandler;
 import 
org.apache.geode.internal.protocol.protobuf.v1.operations.ExecuteFunctionOnRegionRequestOperationHandler;
@@ -33,6 +35,7 @@ import 
org.apache.geode.internal.protocol.protobuf.v1.operations.PutAllRequestOp
 import 
org.apache.geode.internal.protocol.protobuf.v1.operations.PutRequestOperationHandler;
 import 
org.apache.geode.internal.protocol.protobuf.v1.operations.RemoveRequestOperationHandler;
 import 
org.apache.geode.internal.protocol.protobuf.v1.operations.security.AuthenticationRequestOperationHandler;
+import org.apache.geode.management.internal.security.ResourcePermissions;
 import org.apache.geode.security.ResourcePermission;
 
 @Experimental
@@ -49,68 +52,70 @@ public class ProtobufOperationContextRegistry {
     return operationContexts.get(apiCase);
   }
 
+  private final ResourcePermission noneRequired =
+      new ResourcePermission(ResourcePermission.NULL, ResourcePermission.NULL);
+
+  private ResourcePermission skipAuthorizationCheck(Object unused,
+      ProtobufSerializationService unused2) {
+    return noneRequired;
+  }
+
   private void addContexts() {
     
operationContexts.put(ClientProtocol.Message.MessageTypeCase.AUTHENTICATIONREQUEST,
         new 
ProtobufOperationContext<>(ClientProtocol.Message::getAuthenticationRequest,
             new AuthenticationRequestOperationHandler(),
             opsResp -> 
ClientProtocol.Message.newBuilder().setAuthenticationResponse(opsResp),
-            new ResourcePermission(ResourcePermission.NULL, 
ResourcePermission.NULL)));
+            this::skipAuthorizationCheck));
 
     operationContexts.put(ClientProtocol.Message.MessageTypeCase.GETREQUEST,
         new ProtobufOperationContext<>(ClientProtocol.Message::getGetRequest,
             new GetRequestOperationHandler(),
             opsResp -> 
ClientProtocol.Message.newBuilder().setGetResponse(opsResp),
-            new ResourcePermission(ResourcePermission.Resource.DATA,
-                ResourcePermission.Operation.READ)));
+            GetRequestOperationHandler::determineRequiredPermission));
 
     operationContexts.put(ClientProtocol.Message.MessageTypeCase.GETALLREQUEST,
         new 
ProtobufOperationContext<>(ClientProtocol.Message::getGetAllRequest,
             new GetAllRequestOperationHandler(),
             opsResp -> 
ClientProtocol.Message.newBuilder().setGetAllResponse(opsResp),
-            new ResourcePermission(ResourcePermission.Resource.DATA,
-                ResourcePermission.Operation.READ)));
+            // May require per-key checks, will be handled by OperationHandler
+            this::skipAuthorizationCheck));
 
     operationContexts.put(ClientProtocol.Message.MessageTypeCase.PUTREQUEST,
         new ProtobufOperationContext<>(ClientProtocol.Message::getPutRequest,
             new PutRequestOperationHandler(),
             opsResp -> 
ClientProtocol.Message.newBuilder().setPutResponse(opsResp),
-            new ResourcePermission(ResourcePermission.Resource.DATA,
-                ResourcePermission.Operation.WRITE)));
+            PutRequestOperationHandler::determineRequiredPermission));
 
     operationContexts.put(ClientProtocol.Message.MessageTypeCase.PUTALLREQUEST,
         new 
ProtobufOperationContext<>(ClientProtocol.Message::getPutAllRequest,
             new PutAllRequestOperationHandler(),
             opsResp -> 
ClientProtocol.Message.newBuilder().setPutAllResponse(opsResp),
-            new ResourcePermission(ResourcePermission.Resource.DATA,
-                ResourcePermission.Operation.WRITE)));
+            // May require per-key checks, will be handled by OperationHandler
+            this::skipAuthorizationCheck));
 
     operationContexts.put(ClientProtocol.Message.MessageTypeCase.REMOVEREQUEST,
         new 
ProtobufOperationContext<>(ClientProtocol.Message::getRemoveRequest,
             new RemoveRequestOperationHandler(),
             opsResp -> 
ClientProtocol.Message.newBuilder().setRemoveResponse(opsResp),
-            new ResourcePermission(ResourcePermission.Resource.DATA,
-                ResourcePermission.Operation.WRITE)));
+            RemoveRequestOperationHandler::determineRequiredPermission));
 
     
operationContexts.put(ClientProtocol.Message.MessageTypeCase.GETREGIONNAMESREQUEST,
         new 
ProtobufOperationContext<>(ClientProtocol.Message::getGetRegionNamesRequest,
             new GetRegionNamesRequestOperationHandler(),
             opsResp -> 
ClientProtocol.Message.newBuilder().setGetRegionNamesResponse(opsResp),
-            new ResourcePermission(ResourcePermission.Resource.DATA,
-                ResourcePermission.Operation.READ)));
+            ResourcePermissions.DATA_READ));
 
     
operationContexts.put(ClientProtocol.Message.MessageTypeCase.GETREGIONREQUEST,
         new 
ProtobufOperationContext<>(ClientProtocol.Message::getGetRegionRequest,
             new GetRegionRequestOperationHandler(),
             opsResp -> 
ClientProtocol.Message.newBuilder().setGetRegionResponse(opsResp),
-            new ResourcePermission(ResourcePermission.Resource.DATA,
-                ResourcePermission.Operation.READ)));
+            ResourcePermissions.DATA_READ));
 
     
operationContexts.put(ClientProtocol.Message.MessageTypeCase.GETSERVERREQUEST,
         new 
ProtobufOperationContext<>(ClientProtocol.Message::getGetServerRequest,
             new GetServerOperationHandler(),
             opsResp -> 
ClientProtocol.Message.newBuilder().setGetServerResponse(opsResp),
-            new ResourcePermission(ResourcePermission.Resource.CLUSTER,
-                ResourcePermission.Operation.READ)));
+            ResourcePermissions.DATA_READ));
 
     
operationContexts.put(ClientProtocol.Message.MessageTypeCase.EXECUTEFUNCTIONONREGIONREQUEST,
         new 
ProtobufOperationContext<>(ClientProtocol.Message::getExecuteFunctionOnRegionRequest,
@@ -119,7 +124,7 @@ public class ProtobufOperationContextRegistry {
                 .setExecuteFunctionOnRegionResponse(opsResp),
             // Resource permissions get handled per-function, since they have 
varying permission
             // requirements.
-            new ResourcePermission(ResourcePermission.NULL, 
ResourcePermission.NULL)));
+            this::skipAuthorizationCheck));
 
     
operationContexts.put(ClientProtocol.Message.MessageTypeCase.EXECUTEFUNCTIONONMEMBERREQUEST,
         new 
ProtobufOperationContext<>(ClientProtocol.Message::getExecuteFunctionOnMemberRequest,
@@ -128,7 +133,7 @@ public class ProtobufOperationContextRegistry {
                 .setExecuteFunctionOnMemberResponse(opsResp),
             // Resource permissions get handled per-function, since they have 
varying permission
             // requirements.
-            new ResourcePermission(ResourcePermission.NULL, 
ResourcePermission.NULL)));
+            this::skipAuthorizationCheck));
 
     
operationContexts.put(ClientProtocol.Message.MessageTypeCase.EXECUTEFUNCTIONONGROUPREQUEST,
         new 
ProtobufOperationContext<>(ClientProtocol.Message::getExecuteFunctionOnGroupRequest,
@@ -137,6 +142,6 @@ public class ProtobufOperationContextRegistry {
                 .setExecuteFunctionOnGroupResponse(opsResp),
             // Resource permissions get handled per-function, since they have 
varying permission
             // requirements.
-            new ResourcePermission(ResourcePermission.NULL, 
ResourcePermission.NULL)));
+            this::skipAuthorizationCheck));
   }
 }
diff --git 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/LegacySecurityProtobufConnectionStateProcessor.java
 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/LegacySecurityProtobufConnectionStateProcessor.java
index e996dfd..c3afb04 100644
--- 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/LegacySecurityProtobufConnectionStateProcessor.java
+++ 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/LegacySecurityProtobufConnectionStateProcessor.java
@@ -17,13 +17,15 @@ package 
org.apache.geode.internal.protocol.protobuf.v1.state;
 import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
 import org.apache.geode.internal.protocol.protobuf.v1.MessageExecutionContext;
 import org.apache.geode.internal.protocol.protobuf.v1.ProtobufOperationContext;
+import 
org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService;
 import 
org.apache.geode.internal.protocol.protobuf.v1.state.exception.ConnectionStateException;
 
 public class LegacySecurityProtobufConnectionStateProcessor
     implements ProtobufConnectionStateProcessor {
   @Override
-  public void validateOperation(MessageExecutionContext messageContext,
-      ProtobufOperationContext operationContext) throws 
ConnectionStateException {
+  public void validateOperation(Object message, ProtobufSerializationService 
serializer,
+      MessageExecutionContext messageContext, ProtobufOperationContext 
operationContext)
+      throws ConnectionStateException {
     throw new 
ConnectionStateException(BasicTypes.ErrorCode.AUTHENTICATION_FAILED,
         "Attempting to authenticate incoming protobuf message using legacy 
security implementation. This is not supported. Failing authentication.");
   }
diff --git 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/NoSecurityProtobufConnectionStateProcessor.java
 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/NoSecurityProtobufConnectionStateProcessor.java
index d373f11..3f8e88e 100644
--- 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/NoSecurityProtobufConnectionStateProcessor.java
+++ 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/NoSecurityProtobufConnectionStateProcessor.java
@@ -17,13 +17,15 @@ package 
org.apache.geode.internal.protocol.protobuf.v1.state;
 import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
 import org.apache.geode.internal.protocol.protobuf.v1.MessageExecutionContext;
 import org.apache.geode.internal.protocol.protobuf.v1.ProtobufOperationContext;
+import 
org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService;
 import 
org.apache.geode.internal.protocol.protobuf.v1.state.exception.ConnectionStateException;
 
 public class NoSecurityProtobufConnectionStateProcessor
     implements ProtobufConnectionStateProcessor {
   @Override
-  public void validateOperation(MessageExecutionContext messageContext,
-      ProtobufOperationContext operationContext) throws 
ConnectionStateException {}
+  public void validateOperation(Object message, ProtobufSerializationService 
serializer,
+      MessageExecutionContext messageContext, ProtobufOperationContext 
operationContext)
+      throws ConnectionStateException {}
 
   public ProtobufConnectionAuthenticatingStateProcessor allowAuthentication()
       throws ConnectionStateException {
diff --git 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionAuthenticatingStateProcessor.java
 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionAuthenticatingStateProcessor.java
index 4e030ee..f2dabaf 100644
--- 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionAuthenticatingStateProcessor.java
+++ 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionAuthenticatingStateProcessor.java
@@ -21,6 +21,7 @@ import org.apache.shiro.subject.Subject;
 import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
 import org.apache.geode.internal.protocol.protobuf.v1.MessageExecutionContext;
 import org.apache.geode.internal.protocol.protobuf.v1.ProtobufOperationContext;
+import 
org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService;
 import 
org.apache.geode.internal.protocol.protobuf.v1.operations.security.AuthenticationRequestOperationHandler;
 import 
org.apache.geode.internal.protocol.protobuf.v1.state.exception.ConnectionStateException;
 import org.apache.geode.internal.security.SecurityService;
@@ -35,8 +36,9 @@ public class ProtobufConnectionAuthenticatingStateProcessor
   }
 
   @Override
-  public void validateOperation(MessageExecutionContext messageContext,
-      ProtobufOperationContext operationContext) throws 
ConnectionStateException {
+  public void validateOperation(Object message, ProtobufSerializationService 
serializer,
+      MessageExecutionContext messageContext, ProtobufOperationContext 
operationContext)
+      throws ConnectionStateException {
     if (!(operationContext
         .getOperationHandler() instanceof 
AuthenticationRequestOperationHandler)) {
       throw new 
ConnectionStateException(BasicTypes.ErrorCode.AUTHENTICATION_FAILED,
diff --git 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionAuthorizingStateProcessor.java
 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionAuthorizingStateProcessor.java
index f3590d5..356ce5b 100644
--- 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionAuthorizingStateProcessor.java
+++ 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionAuthorizingStateProcessor.java
@@ -20,6 +20,8 @@ import org.apache.shiro.util.ThreadState;
 import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
 import org.apache.geode.internal.protocol.protobuf.v1.MessageExecutionContext;
 import org.apache.geode.internal.protocol.protobuf.v1.ProtobufOperationContext;
+import 
org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService;
+import 
org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.DecodingException;
 import 
org.apache.geode.internal.protocol.protobuf.v1.state.exception.ConnectionStateException;
 import 
org.apache.geode.internal.protocol.protobuf.v1.state.exception.OperationNotAuthorizedException;
 import org.apache.geode.internal.security.SecurityService;
@@ -37,11 +39,13 @@ public class ProtobufConnectionAuthorizingStateProcessor
   }
 
   @Override
-  public void validateOperation(MessageExecutionContext messageContext,
-      ProtobufOperationContext operationContext) throws 
ConnectionStateException {
+  public void validateOperation(Object message, ProtobufSerializationService 
serializer,
+      MessageExecutionContext messageContext, ProtobufOperationContext 
operationContext)
+      throws ConnectionStateException, DecodingException {
     ThreadState threadState = securityService.bindSubject(subject);
     try {
-      
securityService.authorize(operationContext.getAccessPermissionRequired());
+      securityService.authorize(operationContext.getAccessPermissionRequired(
+          operationContext.getFromRequest().apply(message), serializer));
     } catch (NotAuthorizedException e) {
       messageContext.getStatistics().incAuthorizationViolations();
       throw new 
OperationNotAuthorizedException(BasicTypes.ErrorCode.AUTHORIZATION_FAILED,
@@ -57,4 +61,12 @@ public class ProtobufConnectionAuthorizingStateProcessor
     throw new 
ConnectionStateException(BasicTypes.ErrorCode.ALREADY_AUTHENTICATED,
         "The user has already been authenticated for this connection. 
Re-authentication is not supported at this time.");
   }
+
+  public ThreadState prepareThreadForAuthorization() {
+    return securityService.bindSubject(subject);
+  }
+
+  public void restoreThreadState(ThreadState state) {
+    state.restore();
+  }
 }
diff --git 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionHandshakeStateProcessor.java
 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionHandshakeStateProcessor.java
index b28d62d..3886680 100644
--- 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionHandshakeStateProcessor.java
+++ 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionHandshakeStateProcessor.java
@@ -23,6 +23,7 @@ import org.apache.geode.internal.cache.tier.CommunicationMode;
 import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
 import org.apache.geode.internal.protocol.protobuf.v1.MessageExecutionContext;
 import org.apache.geode.internal.protocol.protobuf.v1.ProtobufOperationContext;
+import 
org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService;
 import 
org.apache.geode.internal.protocol.protobuf.v1.operations.ProtocolVersionHandler;
 import 
org.apache.geode.internal.protocol.protobuf.v1.state.exception.ConnectionStateException;
 import org.apache.geode.internal.security.SecurityService;
@@ -35,8 +36,9 @@ public class ProtobufConnectionHandshakeStateProcessor 
implements ProtobufConnec
   }
 
   @Override
-  public void validateOperation(MessageExecutionContext messageContext,
-      ProtobufOperationContext operationContext) throws 
ConnectionStateException {
+  public void validateOperation(Object message, ProtobufSerializationService 
serializer,
+      MessageExecutionContext messageContext, ProtobufOperationContext 
operationContext)
+      throws ConnectionStateException {
     throw new ConnectionStateException(BasicTypes.ErrorCode.INVALID_REQUEST,
         "Connection processing should never be asked to validate an 
operation");
   }
diff --git 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionStateProcessor.java
 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionStateProcessor.java
index 1898f59..c0254c4 100644
--- 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionStateProcessor.java
+++ 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionStateProcessor.java
@@ -21,6 +21,8 @@ import java.io.OutputStream;
 import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
 import org.apache.geode.internal.protocol.protobuf.v1.MessageExecutionContext;
 import org.apache.geode.internal.protocol.protobuf.v1.ProtobufOperationContext;
+import 
org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService;
+import 
org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.DecodingException;
 import 
org.apache.geode.internal.protocol.protobuf.v1.state.exception.ConnectionStateException;
 
 /**
@@ -35,8 +37,9 @@ public interface ProtobufConnectionStateProcessor {
    *         connection is in the state contained in the provided 
messageContext. Otherwise, does
    *         nothing.
    */
-  void validateOperation(MessageExecutionContext messageContext,
-      ProtobufOperationContext operationContext) throws 
ConnectionStateException;
+  void validateOperation(Object message, ProtobufSerializationService 
serializer,
+      MessageExecutionContext messageContext, ProtobufOperationContext 
operationContext)
+      throws ConnectionStateException, DecodingException;
 
   /**
    * This indicates whether this specific state processor is able to handle 
authentication requests.
diff --git 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionTerminatingStateProcessor.java
 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionTerminatingStateProcessor.java
index e884e2c..40f37e3 100644
--- 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionTerminatingStateProcessor.java
+++ 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/state/ProtobufConnectionTerminatingStateProcessor.java
@@ -17,13 +17,15 @@ package 
org.apache.geode.internal.protocol.protobuf.v1.state;
 import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
 import org.apache.geode.internal.protocol.protobuf.v1.MessageExecutionContext;
 import org.apache.geode.internal.protocol.protobuf.v1.ProtobufOperationContext;
+import 
org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService;
 import 
org.apache.geode.internal.protocol.protobuf.v1.state.exception.ConnectionStateException;
 
 public class ProtobufConnectionTerminatingStateProcessor
     implements ProtobufConnectionStateProcessor {
   @Override
-  public void validateOperation(MessageExecutionContext messageContext,
-      ProtobufOperationContext operationContext) throws 
ConnectionStateException {
+  public void validateOperation(Object message, ProtobufSerializationService 
serializer,
+      MessageExecutionContext messageContext, ProtobufOperationContext 
operationContext)
+      throws ConnectionStateException {
     throw new ConnectionStateException(BasicTypes.ErrorCode.SERVER_ERROR,
         "This connection has been marked as terminating.");
   }
diff --git 
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/AuthorizationIntegrationTest.java
 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/AuthorizationIntegrationTest.java
index b0429bd..2eaed1e 100644
--- 
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/AuthorizationIntegrationTest.java
+++ 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/AuthorizationIntegrationTest.java
@@ -15,17 +15,18 @@
 package org.apache.geode.internal.protocol.protobuf.v1;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.same;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.Socket;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.awaitility.Awaitility;
@@ -45,6 +46,8 @@ import 
org.apache.geode.internal.protocol.protobuf.v1.serializer.ProtobufProtoco
 import 
org.apache.geode.internal.protocol.protobuf.v1.serializer.exception.InvalidProtocolMessageException;
 import 
org.apache.geode.internal.protocol.protobuf.v1.utilities.ProtobufUtilities;
 import org.apache.geode.management.internal.security.ResourceConstants;
+import org.apache.geode.management.internal.security.ResourcePermissions;
+import org.apache.geode.security.AuthenticationFailedException;
 import org.apache.geode.security.ResourcePermission;
 import org.apache.geode.security.SecurityManager;
 import org.apache.geode.test.junit.categories.IntegrationTest;
@@ -54,7 +57,11 @@ public class AuthorizationIntegrationTest {
 
   private static final String TEST_USERNAME = "bob";
   private static final String TEST_PASSWORD = "bobspassword";
-  public static final String TEST_REGION = "testRegion";
+
+  private static final String TEST_REGION1 = "testRegion1";
+  private static final String TEST_REGION2 = "testRegion2";
+  private static final String TEST_KEY1 = "testKey1";
+  private static final String TEST_KEY2 = "testKey2";
 
   @Rule
   public final RestoreSystemProperties restoreSystemProperties = new 
RestoreSystemProperties();
@@ -68,11 +75,47 @@ public class AuthorizationIntegrationTest {
   private InputStream inputStream;
   private ProtobufProtocolSerializer protobufProtocolSerializer;
   private Object securityPrincipal;
-  private SecurityManager mockSecurityManager;
-  public static final ResourcePermission READ_PERMISSION =
-      new ResourcePermission(ResourcePermission.Resource.DATA, 
ResourcePermission.Operation.READ);
-  public static final ResourcePermission WRITE_PERMISSION =
-      new ResourcePermission(ResourcePermission.Resource.DATA, 
ResourcePermission.Operation.WRITE);
+  private TestSecurityManager securityManager;
+
+  // Read access to all regions, all keys
+  public static final ResourcePermission READ_PERMISSION = 
ResourcePermissions.DATA_READ;
+
+  // Write access to all regions, all keys
+  public static final ResourcePermission WRITE_PERMISSION = 
ResourcePermissions.DATA_WRITE;
+
+  private class TestSecurityManager implements SecurityManager {
+    private Set<ResourcePermission> allowedPermissions = new HashSet<>();
+
+    void addAllowedPermission(ResourcePermission permission) {
+      allowedPermissions.add(permission);
+    }
+
+    @Override
+    public Object authenticate(Properties credentials) throws 
AuthenticationFailedException {
+      return securityPrincipal;
+    }
+
+    @Override
+    public boolean authorize(Object principal, ResourcePermission permission) {
+      // Only allow data operations and only from the expected principal
+      if (principal != securityPrincipal
+          || permission.getResource() != ResourcePermission.Resource.DATA) {
+        return false;
+      }
+      // Succeed if user has permission for all regions and all keys for the 
given operation
+      if (allowedPermissions.contains(
+          new ResourcePermission(ResourcePermission.Resource.DATA, 
permission.getOperation()))) {
+        return true;
+      }
+      // Succeed if user has permission for all keys in the given region for 
the given operation
+      if (allowedPermissions.contains(new 
ResourcePermission(ResourcePermission.Resource.DATA,
+          permission.getOperation(), permission.getTarget()))) {
+        return true;
+      }
+
+      return allowedPermissions.contains(permission);
+    }
+  }
 
   @Before
   public void setUp() throws IOException, InvalidProtocolMessageException {
@@ -81,14 +124,13 @@ public class AuthorizationIntegrationTest {
     expectedAuthProperties.setProperty(ResourceConstants.PASSWORD, 
TEST_PASSWORD);
 
     securityPrincipal = "mockSecurityPrincipal";
-    mockSecurityManager = mock(SecurityManager.class);
-    
when(mockSecurityManager.authenticate(expectedAuthProperties)).thenReturn(securityPrincipal);
+    securityManager = new TestSecurityManager();
 
     Properties properties = new Properties();
     CacheFactory cacheFactory = new CacheFactory(properties);
     cacheFactory.set("mcast-port", "0"); // sometimes it isn't due to other 
tests.
 
-    cacheFactory.setSecurityManager(mockSecurityManager);
+    cacheFactory.setSecurityManager(securityManager);
     cache = cacheFactory.create();
 
     cacheServer = cache.addCacheServer();
@@ -96,7 +138,8 @@ public class AuthorizationIntegrationTest {
     cacheServer.setPort(cacheServerPort);
     cacheServer.start();
 
-    cache.createRegionFactory().create(TEST_REGION);
+    cache.createRegionFactory().create(TEST_REGION1);
+    cache.createRegionFactory().create(TEST_REGION2);
 
     System.setProperty("geode.feature-protobuf-protocol", "true");
     System.setProperty("geode.protocol-authentication-mode", "SIMPLE");
@@ -109,8 +152,6 @@ public class AuthorizationIntegrationTest {
     serializationService = new ProtobufSerializationService();
     protobufProtocolSerializer = new ProtobufProtocolSerializer();
 
-    when(mockSecurityManager.authorize(same(securityPrincipal), 
any())).thenReturn(false);
-
     MessageUtil.performAndVerifyHandshake(socket);
 
     ClientProtocol.Message authenticationRequest = 
ClientProtocol.Message.newBuilder()
@@ -137,57 +178,167 @@ public class AuthorizationIntegrationTest {
 
   @Test
   public void validateNoPermissions() throws Exception {
-    when(mockSecurityManager.authorize(securityPrincipal, 
READ_PERMISSION)).thenReturn(false);
-    when(mockSecurityManager.authorize(securityPrincipal, 
WRITE_PERMISSION)).thenReturn(false);
-
-    verifyOperations(false, false);
+    verifyLocatorOperation(false);
+    verifyOperations(false, false, TEST_REGION1, TEST_KEY1);
   }
 
   @Test
   public void validateWritePermission() throws Exception {
-    when(mockSecurityManager.authorize(securityPrincipal, 
READ_PERMISSION)).thenReturn(false);
-    when(mockSecurityManager.authorize(securityPrincipal, 
WRITE_PERMISSION)).thenReturn(true);
+    securityManager.addAllowedPermission(WRITE_PERMISSION);
 
-    verifyOperations(false, true);
+    verifyLocatorOperation(false);
+    verifyOperations(false, true, TEST_REGION1, TEST_KEY1);
   }
 
   @Test
   public void validateReadPermission() throws Exception {
-    when(mockSecurityManager.authorize(securityPrincipal, 
READ_PERMISSION)).thenReturn(true);
-    when(mockSecurityManager.authorize(securityPrincipal, 
WRITE_PERMISSION)).thenReturn(false);
+    securityManager.addAllowedPermission(READ_PERMISSION);
 
-    verifyOperations(true, false);
+    verifyLocatorOperation(true);
+    verifyOperations(true, false, TEST_REGION1, TEST_KEY1);
   }
 
   @Test
   public void validateReadAndWritePermission() throws Exception {
-    when(mockSecurityManager.authorize(securityPrincipal, 
READ_PERMISSION)).thenReturn(true);
-    when(mockSecurityManager.authorize(securityPrincipal, 
WRITE_PERMISSION)).thenReturn(true);
+    securityManager.addAllowedPermission(WRITE_PERMISSION);
+    securityManager.addAllowedPermission(READ_PERMISSION);
+
+    verifyLocatorOperation(true);
+    verifyOperations(true, true, TEST_REGION1, TEST_KEY1);
+  }
 
-    verifyOperations(true, true);
+  @Test
+  public void validateRegionLevelPermissions() throws Exception {
+    securityManager.addAllowedPermission(new 
ResourcePermission(ResourcePermission.Resource.DATA,
+        ResourcePermission.Operation.WRITE, TEST_REGION1));
+    securityManager.addAllowedPermission(new 
ResourcePermission(ResourcePermission.Resource.DATA,
+        ResourcePermission.Operation.READ, TEST_REGION2));
+
+    verifyOperations(false, true, TEST_REGION1, TEST_KEY1);
+    verifyOperations(false, true, TEST_REGION1, TEST_KEY2);
+    verifyOperations(true, false, TEST_REGION2, TEST_KEY1);
+    verifyOperations(true, false, TEST_REGION2, TEST_KEY2);
   }
 
-  private void verifyOperations(boolean readAllowed, boolean writeAllowed) 
throws Exception {
+  @Test
+  public void validateKeyLevelPermissions() throws Exception {
+    securityManager.addAllowedPermission(new 
ResourcePermission(ResourcePermission.Resource.DATA,
+        ResourcePermission.Operation.WRITE, TEST_REGION1, TEST_KEY1));
+    securityManager.addAllowedPermission(new 
ResourcePermission(ResourcePermission.Resource.DATA,
+        ResourcePermission.Operation.READ, TEST_REGION2, TEST_KEY2));
+
+    verifyOperations(false, true, TEST_REGION1, TEST_KEY1);
+    verifyOperations(false, false, TEST_REGION1, TEST_KEY2);
+    verifyOperations(false, false, TEST_REGION2, TEST_KEY1);
+    verifyOperations(true, false, TEST_REGION2, TEST_KEY2);
+  }
+
+  @Test
+  public void validateRegionLevelPermissionsOnBatchOperations() throws 
Exception {
+    securityManager.addAllowedPermission(new 
ResourcePermission(ResourcePermission.Resource.DATA,
+        ResourcePermission.Operation.WRITE, TEST_REGION1));
+    securityManager.addAllowedPermission(new 
ResourcePermission(ResourcePermission.Resource.DATA,
+        ResourcePermission.Operation.READ, TEST_REGION2));
+
+    verifyBatchOperation(true, TEST_REGION1, false, false);
+    verifyBatchOperation(false, TEST_REGION1, true, true);
+    verifyBatchOperation(true, TEST_REGION2, true, true);
+    verifyBatchOperation(false, TEST_REGION2, false, false);
+  }
+
+  @Test
+  public void validateKeyLevelPermissionsOnBatchOperations() throws Exception {
+    securityManager.addAllowedPermission(new 
ResourcePermission(ResourcePermission.Resource.DATA,
+        ResourcePermission.Operation.WRITE, TEST_REGION1, TEST_KEY1));
+    securityManager.addAllowedPermission(new 
ResourcePermission(ResourcePermission.Resource.DATA,
+        ResourcePermission.Operation.READ, TEST_REGION2, TEST_KEY2));
+
+    verifyBatchOperation(true, TEST_REGION1, false, false);
+    verifyBatchOperation(false, TEST_REGION1, true, false);
+    verifyBatchOperation(true, TEST_REGION2, false, true);
+    verifyBatchOperation(false, TEST_REGION2, false, false);
+  }
+
+  private void verifyBatchOperation(boolean testRead, String region, boolean 
expectedKey1Success,
+      boolean expectedKey2Success) throws Exception {
+    ClientProtocol.Message request;
+    if (testRead) {
+      request = ClientProtocol.Message.newBuilder()
+          
.setGetAllRequest(RegionAPI.GetAllRequest.newBuilder().setRegionName(region)
+              .addKey(serializationService.encode(TEST_KEY1))
+              .addKey(serializationService.encode(TEST_KEY2)))
+          .build();
+    } else {
+      request = 
ClientProtocol.Message.newBuilder().setPutAllRequest(RegionAPI.PutAllRequest
+          .newBuilder().setRegionName(region)
+          .addEntry(ProtobufUtilities.createEntry(serializationService, 
TEST_KEY1, "TEST_VALUE"))
+          .addEntry(ProtobufUtilities.createEntry(serializationService, 
TEST_KEY2, "TEST_VALUE")))
+          .build();
+    }
+
+    protobufProtocolSerializer.serialize(request, outputStream);
+    ClientProtocol.Message response = 
protobufProtocolSerializer.deserialize(inputStream);
+    assertNotEquals(ClientProtocol.Message.MessageTypeCase.ERRORRESPONSE,
+        response.getMessageTypeCase());
+
+    List<BasicTypes.KeyedError> keyedErrors =
+        testRead ? response.getGetAllResponse().getFailuresList()
+            : response.getPutAllResponse().getFailedKeysList();
+    String operation = testRead ? "getAll" : "putAll";
+    if (errorListContainsKey(keyedErrors, 
serializationService.encode(TEST_KEY1))) {
+      if (expectedKey1Success) {
+        fail("Unexpectedly failed " + operation + " operation for key " + 
TEST_KEY1);
+      }
+    } else if (!expectedKey1Success) {
+      fail("Unexpected success in " + operation + " operation for key " + 
TEST_KEY1);
+    }
+    if (errorListContainsKey(keyedErrors, 
serializationService.encode(TEST_KEY2))) {
+      if (expectedKey2Success) {
+        fail("Unexpectedly failed " + operation + " operation for key " + 
TEST_KEY2);
+      }
+    } else if (!expectedKey2Success) {
+      fail("Unexpected success in " + operation + " operation for key " + 
TEST_KEY2);
+    }
+  }
+
+  private boolean errorListContainsKey(List<BasicTypes.KeyedError> errors,
+      BasicTypes.EncodedValue key) {
+    for (BasicTypes.KeyedError error : errors) {
+      if (error.getKey().equals(key)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private void verifyLocatorOperation(boolean readAllowed) throws Exception {
     ClientProtocol.Message getRegionsMessage = 
ClientProtocol.Message.newBuilder()
         
.setGetRegionNamesRequest(RegionAPI.GetRegionNamesRequest.newBuilder()).build();
     validateOperationAuthorized(getRegionsMessage, inputStream, outputStream,
         readAllowed ? 
ClientProtocol.Message.MessageTypeCase.GETREGIONNAMESRESPONSE
             : ClientProtocol.Message.MessageTypeCase.ERRORRESPONSE);
+  }
+
+  private void verifyOperations(boolean readAllowed, boolean writeAllowed, 
String region,
+      String key) throws Exception {
+    ClientProtocol.Message getMessage =
+        
ClientProtocol.Message.newBuilder().setGetRequest(RegionAPI.GetRequest.newBuilder()
+            
.setRegionName(region).setKey(serializationService.encode(key))).build();
+    validateOperationAuthorized(getMessage, inputStream, outputStream,
+        readAllowed ? ClientProtocol.Message.MessageTypeCase.GETRESPONSE
+            : ClientProtocol.Message.MessageTypeCase.ERRORRESPONSE);
 
-    ClientProtocol.Message putMessage =
-        ClientProtocol.Message.newBuilder()
-            
.setPutRequest(RegionAPI.PutRequest.newBuilder().setRegionName(TEST_REGION).setEntry(
-                ProtobufUtilities.createEntry(serializationService, 
"TEST_KEY", "TEST_VALUE")))
-            .build();
+    ClientProtocol.Message putMessage = ClientProtocol.Message.newBuilder()
+        .setPutRequest(RegionAPI.PutRequest.newBuilder().setRegionName(region)
+            .setEntry(ProtobufUtilities.createEntry(serializationService, key, 
"TEST_VALUE")))
+        .build();
     validateOperationAuthorized(putMessage, inputStream, outputStream,
         writeAllowed ? ClientProtocol.Message.MessageTypeCase.PUTRESPONSE
             : ClientProtocol.Message.MessageTypeCase.ERRORRESPONSE);
 
     ClientProtocol.Message removeMessage =
-        ClientProtocol.Message
-            .newBuilder().setRemoveRequest(RegionAPI.RemoveRequest.newBuilder()
-                
.setRegionName(TEST_REGION).setKey(serializationService.encode("TEST_KEY")))
-            .build();
+        
ClientProtocol.Message.newBuilder().setRemoveRequest(RegionAPI.RemoveRequest.newBuilder()
+            
.setRegionName(region).setKey(serializationService.encode(key))).build();
     validateOperationAuthorized(removeMessage, inputStream, outputStream,
         writeAllowed ? ClientProtocol.Message.MessageTypeCase.REMOVERESPONSE
             : ClientProtocol.Message.MessageTypeCase.ERRORRESPONSE);

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to