This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new b7c4bd8  merge from feature/GEODE-3643
b7c4bd8 is described below

commit b7c4bd835f21342f686cf86d4ce2f073be2f7086
Author: Bruce Schuchardt <bschucha...@pivotal.io>
AuthorDate: Fri Feb 9 08:48:55 2018 -0800

    merge from feature/GEODE-3643
    
    commit 026c32604e932659c348a5f1078a972c98ef8ef9
    commit f82f458469ddb0621d4a8c4ce9305efd7969aee3
    
        GEODE-3643: Add function execution on specific member
    
        This implements function execution onMember and onMembers.  Members are
        identified by their names - see ConfigurationProperties.NAME.
    
    This closes #1409
---
 .../internal/ClusterDistributionManager.java       |  13 ++
 .../distributed/internal/DistributionManager.java  |   5 +
 .../internal/LonerDistributionManager.java         |  27 ++-
 .../src/main/proto/v1/clientProtocol.proto         |   7 +-
 .../src/main/proto/v1/function_API.proto           |  13 +-
 ...teFunctionOnMemberRequestOperationHandler.java} |  84 +++++----
 ...uteFunctionOnRegionRequestOperationHandler.java |  12 +-
 .../registry/ProtobufOperationContextRegistry.java |  10 ++
 ...=> ExecuteFunctionOnMemberIntegrationTest.java} |  81 ++++-----
 ...=> ExecuteFunctionOnRegionIntegrationTest.java} |   7 +-
 ...onOnMemberRequestOperationHandlerJUnitTest.java | 189 +++++++++++++++++++++
 11 files changed, 346 insertions(+), 102 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
index 103baf8..ba87f52 100644
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java
@@ -27,6 +27,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -1451,6 +1452,18 @@ public class ClusterDistributionManager implements 
DistributionManager {
   }
 
 
+  @Override
+  public DistributedMember getMemberWithName(String name) {
+    for (DistributedMember id : members.values()) {
+      if (Objects.equals(id.getName(), name)) {
+        return id;
+      }
+    }
+    if (Objects.equals(localAddress, name)) {
+      return localAddress;
+    }
+    return null;
+  }
 
   /**
    * Returns the id of this distribution manager.
diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java
index ade3daa..d05d248 100644
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java
@@ -489,4 +489,9 @@ public interface DistributionManager extends ReplySender {
   boolean exceptionInThreads();
 
   void clearExceptionInThreads();
+
+  /**
+   * returns the ID of a member having the given name, or null if no such 
member exists
+   */
+  DistributedMember getMemberWithName(String name);
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
index cde316a..c5b2f41 100644
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/LonerDistributionManager.java
@@ -62,8 +62,8 @@ public class LonerDistributionManager implements 
DistributionManager {
   public LonerDistributionManager(InternalDistributedSystem system, 
InternalLogWriter logger) {
     this.system = system;
     this.logger = logger;
-    this.id = generateMemberId();
-    this.allIds = Collections.singleton(id);
+    this.localAddress = generateMemberId();
+    this.allIds = Collections.singleton(localAddress);
     this.viewMembers = new ArrayList<InternalDistributedMember>(allIds);
     DistributionStats.enableClockStats = 
this.system.getConfig().getEnableTimeStatistics();
   }
@@ -83,7 +83,7 @@ public class LonerDistributionManager implements 
DistributionManager {
     }
   }
 
-  private final InternalDistributedMember id;
+  private final InternalDistributedMember localAddress;
 
   /*
    * static { // Make the id a little unique String host; try { host =
@@ -112,7 +112,7 @@ public class LonerDistributionManager implements 
DistributionManager {
   }
 
   public InternalDistributedMember getDistributionManagerId() {
-    return id;
+    return localAddress;
   }
 
   public Set getDistributionManagerIds() {
@@ -136,6 +136,19 @@ public class LonerDistributionManager implements 
DistributionManager {
     return iid;
   }
 
+  @Override
+  public DistributedMember getMemberWithName(String name) {
+    for (DistributedMember id : canonicalIds.values()) {
+      if (Objects.equals(id.getName(), name)) {
+        return id;
+      }
+    }
+    if (Objects.equals(localAddress.getName(), name)) {
+      return localAddress;
+    }
+    return null;
+  }
+
   public Set getOtherDistributionManagerIds() {
     return Collections.EMPTY_SET;
   }
@@ -1154,17 +1167,17 @@ public class LonerDistributionManager implements 
DistributionManager {
 
   /** Returns count of members filling the specified role */
   public int getRoleCount(Role role) {
-    return id.getRoles().contains(role) ? 1 : 0;
+    return localAddress.getRoles().contains(role) ? 1 : 0;
   }
 
   /** Returns true if at least one member is filling the specified role */
   public boolean isRolePresent(Role role) {
-    return id.getRoles().contains(role);
+    return localAddress.getRoles().contains(role);
   }
 
   /** Returns a set of all roles currently in the distributed system. */
   public Set getAllRoles() {
-    return id.getRoles();
+    return localAddress.getRoles();
   }
 
   private int lonerPort = 0;
diff --git a/geode-protobuf-messages/src/main/proto/v1/clientProtocol.proto 
b/geode-protobuf-messages/src/main/proto/v1/clientProtocol.proto
index 0a899ee..13498d4 100644
--- a/geode-protobuf-messages/src/main/proto/v1/clientProtocol.proto
+++ b/geode-protobuf-messages/src/main/proto/v1/clientProtocol.proto
@@ -58,8 +58,11 @@ message Message {
         ExecuteFunctionOnRegionRequest executeFunctionOnRegionRequest = 18;
         ExecuteFunctionOnRegionResponse executeFunctionOnRegionResponse= 19;
 
-        AuthenticationRequest authenticationRequest = 20;
-        AuthenticationResponse authenticationResponse = 21;
+        ExecuteFunctionOnMemberRequest executeFunctionOnMemberRequest = 20;
+        ExecuteFunctionOnMemberResponse executeFunctionOnMemberResponse= 21;
+
+        AuthenticationRequest authenticationRequest = 22;
+        AuthenticationResponse authenticationResponse = 23;
     }
 }
 
diff --git a/geode-protobuf-messages/src/main/proto/v1/function_API.proto 
b/geode-protobuf-messages/src/main/proto/v1/function_API.proto
index 77d2280..9558e72 100644
--- a/geode-protobuf-messages/src/main/proto/v1/function_API.proto
+++ b/geode-protobuf-messages/src/main/proto/v1/function_API.proto
@@ -25,5 +25,16 @@ message ExecuteFunctionOnRegionRequest {
 }
 
 message ExecuteFunctionOnRegionResponse {
-    repeated EncodedValue results = 1; // some functions don't return 
arguments.
+    repeated EncodedValue results = 1; // some functions don't return results.
 }
+
+message ExecuteFunctionOnMemberRequest {
+    string functionID = 1;
+    repeated string memberName = 2;
+    EncodedValue arguments = 3;
+}
+
+message ExecuteFunctionOnMemberResponse {
+    repeated EncodedValue results = 1; // some functions don't return results.
+}
+
diff --git 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnRegionRequestOperationHandler.java
 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnMemberRequestOperationHandler.java
similarity index 66%
copy from 
geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnRegionRequestOperationHandler.java
copy to 
geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnMemberRequestOperationHandler.java
index 932db99..4519f5a 100644
--- 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnRegionRequestOperationHandler.java
+++ 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnMemberRequestOperationHandler.java
@@ -18,16 +18,17 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.logging.log4j.Logger;
+import com.google.protobuf.ProtocolStringList;
 
-import org.apache.geode.cache.Region;
 import org.apache.geode.cache.execute.Execution;
 import org.apache.geode.cache.execute.Function;
 import org.apache.geode.cache.execute.FunctionException;
 import org.apache.geode.cache.execute.FunctionService;
 import org.apache.geode.cache.execute.ResultCollector;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.internal.exception.InvalidExecutionContextException;
-import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.protocol.operations.ProtobufOperationHandler;
 import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
 import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
@@ -42,31 +43,23 @@ import 
org.apache.geode.internal.protocol.protobuf.v1.state.exception.Connection
 import org.apache.geode.internal.security.SecurityService;
 import org.apache.geode.security.NotAuthorizedException;
 
-public class ExecuteFunctionOnRegionRequestOperationHandler implements
-    ProtobufOperationHandler<FunctionAPI.ExecuteFunctionOnRegionRequest, 
FunctionAPI.ExecuteFunctionOnRegionResponse> {
+public class ExecuteFunctionOnMemberRequestOperationHandler implements
+    ProtobufOperationHandler<FunctionAPI.ExecuteFunctionOnMemberRequest, 
FunctionAPI.ExecuteFunctionOnMemberResponse> {
   @Override
-  public Result<FunctionAPI.ExecuteFunctionOnRegionResponse, 
ClientProtocol.ErrorResponse> process(
+  public Result<FunctionAPI.ExecuteFunctionOnMemberResponse, 
ClientProtocol.ErrorResponse> process(
       ProtobufSerializationService serializationService,
-      FunctionAPI.ExecuteFunctionOnRegionRequest request,
-      MessageExecutionContext messageExecutionContext)
-      throws InvalidExecutionContextException, ConnectionStateException {
+      FunctionAPI.ExecuteFunctionOnMemberRequest request,
+      MessageExecutionContext messageExecutionContext) throws 
InvalidExecutionContextException {
 
     final String functionID = request.getFunctionID();
-    final String regionName = request.getRegion();
 
     final Function<?> function = FunctionService.getFunction(functionID);
     if (function == null) {
       return Failure.of(ClientProtocol.ErrorResponse.newBuilder()
           
.setError(BasicTypes.Error.newBuilder().setErrorCode(BasicTypes.ErrorCode.INVALID_REQUEST)
-              .setMessage("Function with ID \"" + functionID + "\" not 
found").build())
-          .build());
-    }
-
-    final Region<Object, Object> region = 
messageExecutionContext.getCache().getRegion(regionName);
-    if (region == null) {
-      return Failure.of(ClientProtocol.ErrorResponse.newBuilder()
-          
.setError(BasicTypes.Error.newBuilder().setErrorCode(BasicTypes.ErrorCode.INVALID_REQUEST)
-              .setMessage("Region \"" + regionName + "\" not found"))
+              
.setMessage(LocalizedStrings.ExecuteFunction_FUNCTION_NAMED_0_IS_NOT_REGISTERED
+                  .toLocalizedString(functionID))
+              .build())
           .build());
     }
 
@@ -74,7 +67,8 @@ public class ExecuteFunctionOnRegionRequestOperationHandler 
implements
 
     try {
       // check security for function.
-      
function.getRequiredPermissions(regionName).forEach(securityService::authorize);
+      final String noRegion = null;
+      
function.getRequiredPermissions(noRegion).forEach(securityService::authorize);
     } catch (NotAuthorizedException ex) {
       return Failure.of(ClientProtocol.ErrorResponse.newBuilder()
           .setError(BasicTypes.Error.newBuilder()
@@ -83,8 +77,38 @@ public class ExecuteFunctionOnRegionRequestOperationHandler 
implements
           .build());
     }
 
+    ProtocolStringList memberNameList = request.getMemberNameList();
+
+    Set<DistributedMember> memberIds = new HashSet<>(memberNameList.size());
+    DistributionManager distributionManager =
+        messageExecutionContext.getCache().getDistributionManager();
+    for (String name : memberNameList) {
+      DistributedMember member = distributionManager.getMemberWithName(name);
+      if (member == null) {
+        return Failure.of(ClientProtocol.ErrorResponse.newBuilder()
+            .setError(BasicTypes.Error.newBuilder()
+                .setMessage("Member " + name + " not found to execute \"" + 
functionID + "\"")
+                .setErrorCode(BasicTypes.ErrorCode.NO_AVAILABLE_SERVER))
+            .build());
+      }
+      memberIds.add(member);
+    }
+
+    if (memberIds.isEmpty()) {
+      return Failure.of(ClientProtocol.ErrorResponse.newBuilder()
+          .setError(BasicTypes.Error.newBuilder()
+              .setMessage("No members found to execute \"" + functionID + "\"")
+              .setErrorCode(BasicTypes.ErrorCode.NO_AVAILABLE_SERVER))
+          .build());
+    }
+
     try {
-      Execution execution = FunctionService.onRegion(region);
+      Execution execution;
+      if (memberIds.size() == 1) {
+        execution = FunctionService.onMember(memberIds.iterator().next());
+      } else {
+        execution = FunctionService.onMembers(memberIds);
+      }
 
       final Object arguments = 
serializationService.decode(request.getArguments());
 
@@ -92,22 +116,20 @@ public class 
ExecuteFunctionOnRegionRequestOperationHandler implements
         execution = execution.setArguments(arguments);
       }
 
-      execution = execution.withFilter(parseFilter(serializationService, 
request));
-
       final ResultCollector<Object, List<Object>> resultCollector = 
execution.execute(functionID);
 
       if (function.hasResult()) {
         List<Object> results = resultCollector.getResult();
 
-        final FunctionAPI.ExecuteFunctionOnRegionResponse.Builder 
responseMessage =
-            FunctionAPI.ExecuteFunctionOnRegionResponse.newBuilder();
+        final FunctionAPI.ExecuteFunctionOnMemberResponse.Builder 
responseMessage =
+            FunctionAPI.ExecuteFunctionOnMemberResponse.newBuilder();
         for (Object result : results) {
           responseMessage.addResults(serializationService.encode(result));
         }
         return Success.of(responseMessage.build());
       } else {
         // This is fire and forget.
-        return 
Success.of(FunctionAPI.ExecuteFunctionOnRegionResponse.newBuilder().build());
+        return 
Success.of(FunctionAPI.ExecuteFunctionOnMemberResponse.newBuilder().build());
       }
     } catch (FunctionException ex) {
       return Failure.of(ClientProtocol.ErrorResponse.newBuilder()
@@ -122,14 +144,4 @@ public class 
ExecuteFunctionOnRegionRequestOperationHandler implements
     }
   }
 
-  private Set<Object> parseFilter(ProtobufSerializationService 
serializationService,
-      FunctionAPI.ExecuteFunctionOnRegionRequest request) throws 
EncodingException {
-    List<BasicTypes.EncodedValue> encodedFilter = request.getKeyFilterList();
-    Set<Object> filter = new HashSet<>();
-
-    for (BasicTypes.EncodedValue filterKey : encodedFilter) {
-      filter.add(serializationService.decode(filterKey));
-    }
-    return filter;
-  }
 }
diff --git 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnRegionRequestOperationHandler.java
 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnRegionRequestOperationHandler.java
index 932db99..17e9f09 100644
--- 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnRegionRequestOperationHandler.java
+++ 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnRegionRequestOperationHandler.java
@@ -18,8 +18,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.logging.log4j.Logger;
-
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.execute.Execution;
 import org.apache.geode.cache.execute.Function;
@@ -27,7 +25,7 @@ import org.apache.geode.cache.execute.FunctionException;
 import org.apache.geode.cache.execute.FunctionService;
 import org.apache.geode.cache.execute.ResultCollector;
 import org.apache.geode.internal.exception.InvalidExecutionContextException;
-import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.protocol.operations.ProtobufOperationHandler;
 import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
 import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
@@ -38,7 +36,6 @@ import 
org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationServi
 import org.apache.geode.internal.protocol.protobuf.v1.Result;
 import org.apache.geode.internal.protocol.protobuf.v1.Success;
 import 
org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.EncodingException;
-import 
org.apache.geode.internal.protocol.protobuf.v1.state.exception.ConnectionStateException;
 import org.apache.geode.internal.security.SecurityService;
 import org.apache.geode.security.NotAuthorizedException;
 
@@ -48,8 +45,7 @@ public class ExecuteFunctionOnRegionRequestOperationHandler 
implements
   public Result<FunctionAPI.ExecuteFunctionOnRegionResponse, 
ClientProtocol.ErrorResponse> process(
       ProtobufSerializationService serializationService,
       FunctionAPI.ExecuteFunctionOnRegionRequest request,
-      MessageExecutionContext messageExecutionContext)
-      throws InvalidExecutionContextException, ConnectionStateException {
+      MessageExecutionContext messageExecutionContext) throws 
InvalidExecutionContextException {
 
     final String functionID = request.getFunctionID();
     final String regionName = request.getRegion();
@@ -58,7 +54,9 @@ public class ExecuteFunctionOnRegionRequestOperationHandler 
implements
     if (function == null) {
       return Failure.of(ClientProtocol.ErrorResponse.newBuilder()
           
.setError(BasicTypes.Error.newBuilder().setErrorCode(BasicTypes.ErrorCode.INVALID_REQUEST)
-              .setMessage("Function with ID \"" + functionID + "\" not 
found").build())
+              
.setMessage(LocalizedStrings.ExecuteFunction_FUNCTION_NAMED_0_IS_NOT_REGISTERED
+                  .toLocalizedString(functionID))
+              .build())
           .build());
     }
 
diff --git 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java
 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java
index 43c4595..f9bbba1 100644
--- 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java
+++ 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java
@@ -21,6 +21,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.geode.annotations.Experimental;
 import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
 import org.apache.geode.internal.protocol.protobuf.v1.ProtobufOperationContext;
+import 
org.apache.geode.internal.protocol.protobuf.v1.operations.ExecuteFunctionOnMemberRequestOperationHandler;
 import 
org.apache.geode.internal.protocol.protobuf.v1.operations.ExecuteFunctionOnRegionRequestOperationHandler;
 import 
org.apache.geode.internal.protocol.protobuf.v1.operations.GetAllRequestOperationHandler;
 import 
org.apache.geode.internal.protocol.protobuf.v1.operations.GetRegionNamesRequestOperationHandler;
@@ -118,5 +119,14 @@ public class ProtobufOperationContextRegistry {
             // Resource permissions get handled per-function, since they have 
varying permission
             // requirements.
             new ResourcePermission(ResourcePermission.NULL, 
ResourcePermission.NULL)));
+
+    
operationContexts.put(ClientProtocol.Message.MessageTypeCase.EXECUTEFUNCTIONONMEMBERREQUEST,
+        new 
ProtobufOperationContext<>(ClientProtocol.Message::getExecuteFunctionOnMemberRequest,
+            new ExecuteFunctionOnMemberRequestOperationHandler(),
+            opsResp -> ClientProtocol.Message.newBuilder()
+                .setExecuteFunctionOnMemberResponse(opsResp),
+            // Resource permissions get handled per-function, since they have 
varying permission
+            // requirements.
+            new ResourcePermission(ResourcePermission.NULL, 
ResourcePermission.NULL)));
   }
 }
diff --git 
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/FunctionExecutionIntegrationTest.java
 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/ExecuteFunctionOnMemberIntegrationTest.java
similarity index 81%
copy from 
geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/FunctionExecutionIntegrationTest.java
copy to 
geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/ExecuteFunctionOnMemberIntegrationTest.java
index 06611d4..b44a23f 100644
--- 
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/FunctionExecutionIntegrationTest.java
+++ 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/ExecuteFunctionOnMemberIntegrationTest.java
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.net.Socket;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
@@ -37,7 +38,9 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.awaitility.Awaitility;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.contrib.java.lang.system.RestoreSystemProperties;
 import org.junit.experimental.categories.Category;
 
 import org.apache.geode.cache.Cache;
@@ -59,21 +62,26 @@ import org.apache.geode.security.SecurityManager;
 import org.apache.geode.test.junit.categories.IntegrationTest;
 
 @Category(IntegrationTest.class)
-public class FunctionExecutionIntegrationTest {
+public class ExecuteFunctionOnMemberIntegrationTest {
   private static final String TEST_REGION = "testRegion";
   private static final String TEST_FUNCTION_ID = "testFunction";
-  public static final String SECURITY_PRINCIPAL = "principle";
+  private static final String SECURITY_PRINCIPAL = "principle";
+  private static final String SERVER_NAME = "pericles";
   private ProtobufSerializationService serializationService;
   private Socket socket;
   private Cache cache;
   private SecurityManager securityManager;
 
+  @Rule
+  public RestoreSystemProperties restoreSystemProperties = new 
RestoreSystemProperties();
+
   @Before
   public void setUp() throws Exception {
     CacheFactory cacheFactory = new CacheFactory(new Properties());
     cacheFactory.set(ConfigurationProperties.MCAST_PORT, "0");
     cacheFactory.set(ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION, 
"false");
     cacheFactory.set(ConfigurationProperties.USE_CLUSTER_CONFIGURATION, 
"false");
+    cacheFactory.set(ConfigurationProperties.NAME, SERVER_NAME);
 
     securityManager = mock(SecurityManager.class);
     cacheFactory.setSecurityManager(securityManager);
@@ -89,7 +97,7 @@ public class FunctionExecutionIntegrationTest {
 
     RegionFactory<Object, Object> regionFactory = cache.createRegionFactory();
     regionFactory.setDataPolicy(DataPolicy.PARTITION);
-    Region<Object, Object> testRegion = regionFactory.create(TEST_REGION);
+    regionFactory.create(TEST_REGION);
 
 
     System.setProperty("geode.feature-protobuf-protocol", "true");
@@ -165,12 +173,12 @@ public class FunctionExecutionIntegrationTest {
     final ClientProtocol.Message responseMessage = 
authenticateAndSendMessage();
 
     assertNotNull(responseMessage);
-    
assertEquals(ClientProtocol.Message.MessageTypeCase.EXECUTEFUNCTIONONREGIONRESPONSE,
+    
assertEquals(ClientProtocol.Message.MessageTypeCase.EXECUTEFUNCTIONONMEMBERRESPONSE,
         responseMessage.getMessageTypeCase());
-    final FunctionAPI.ExecuteFunctionOnRegionResponse 
executeFunctionOnRegionResponse =
-        responseMessage.getExecuteFunctionOnRegionResponse();
+    final FunctionAPI.ExecuteFunctionOnMemberResponse 
executeFunctionOnMemberResponse =
+        responseMessage.getExecuteFunctionOnMemberResponse();
 
-    assertEquals(0, executeFunctionOnRegionResponse.getResultsCount());
+    assertEquals(0, executeFunctionOnMemberResponse.getResultsCount());
 
     Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> 
testFunction.getContext() != null);
   }
@@ -182,13 +190,13 @@ public class FunctionExecutionIntegrationTest {
     FunctionService.registerFunction(testFunction);
     final ClientProtocol.Message responseMessage = 
authenticateAndSendMessage();
 
-    final FunctionAPI.ExecuteFunctionOnRegionResponse 
executeFunctionOnRegionResponse =
+    final FunctionAPI.ExecuteFunctionOnMemberResponse 
executeFunctionOnMemberResponse =
         getFunctionResponse(responseMessage);
 
-    assertEquals(1, executeFunctionOnRegionResponse.getResultsCount());
+    assertEquals(1, executeFunctionOnMemberResponse.getResultsCount());
 
     final Object responseValue =
-        
serializationService.decode(executeFunctionOnRegionResponse.getResults(0));
+        
serializationService.decode(executeFunctionOnMemberResponse.getResults(0));
     assertTrue(responseValue instanceof Integer);
     assertEquals(22, responseValue);
   }
@@ -217,6 +225,7 @@ public class FunctionExecutionIntegrationTest {
 
     final ClientProtocol.Message message = authenticateAndSendMessage();
 
+
     assertEquals(ClientProtocol.Message.MessageTypeCase.ERRORRESPONSE,
         message.getMessageTypeCase());
     final BasicTypes.Error error = message.getErrorResponse().getError();
@@ -231,13 +240,13 @@ public class FunctionExecutionIntegrationTest {
     FunctionService.registerFunction(testFunction);
     final ClientProtocol.Message responseMessage = 
authenticateAndSendMessage();
 
-    final FunctionAPI.ExecuteFunctionOnRegionResponse 
executeFunctionOnRegionResponse =
+    final FunctionAPI.ExecuteFunctionOnMemberResponse 
executeFunctionOnMemberResponse =
         getFunctionResponse(responseMessage);
 
-    assertEquals(1, executeFunctionOnRegionResponse.getResultsCount());
+    assertEquals(1, executeFunctionOnMemberResponse.getResultsCount());
 
     final Object responseValue =
-        
serializationService.decode(executeFunctionOnRegionResponse.getResults(0));
+        
serializationService.decode(executeFunctionOnMemberResponse.getResults(0));
     assertNull(responseValue);
   }
 
@@ -247,43 +256,18 @@ public class FunctionExecutionIntegrationTest {
         new TestFunction<>(functionContext -> functionContext.getArguments(), 
true);
     FunctionService.registerFunction(testFunction);
     ClientProtocol.Message.Builder message = createRequestMessageBuilder(
-        
FunctionAPI.ExecuteFunctionOnRegionRequest.newBuilder().setFunctionID(TEST_FUNCTION_ID)
-            
.setRegion(TEST_REGION).setArguments(serializationService.encode("hello")));
+        
FunctionAPI.ExecuteFunctionOnMemberRequest.newBuilder().setFunctionID(TEST_FUNCTION_ID)
+            
.addMemberName(SERVER_NAME).setArguments(serializationService.encode("hello")));
 
     authenticateWithServer();
     final ClientProtocol.Message responseMessage = 
writeMessage(message.build());
 
-    FunctionAPI.ExecuteFunctionOnRegionResponse response = 
getFunctionResponse(responseMessage);
+    FunctionAPI.ExecuteFunctionOnMemberResponse response = 
getFunctionResponse(responseMessage);
 
     assertEquals("hello", serializationService.decode(response.getResults(0)));
   }
 
   @Test
-  public void filterIsPassedToFunction() throws Exception {
-    final TestFunction<Object> testFunction = new TestFunction<>(context -> 
"result", true);
-    FunctionService.registerFunction(testFunction);
-    Set<Object> expectedFilter = new HashSet<>(Arrays.asList("key1", "key2"));
-
-    final ClientProtocol.Message.Builder message = createRequestMessageBuilder(
-        
FunctionAPI.ExecuteFunctionOnRegionRequest.newBuilder().setFunctionID(TEST_FUNCTION_ID)
-            
.setRegion(TEST_REGION).addKeyFilter(serializationService.encode("key1"))
-            .addKeyFilter(serializationService.encode("key2")));
-
-    authenticateWithServer();
-    final ClientProtocol.Message responseMessage = 
writeMessage(message.build());
-
-    FunctionAPI.ExecuteFunctionOnRegionResponse response = 
getFunctionResponse(responseMessage);
-    assertEquals("result", 
serializationService.decode(response.getResults(0)));
-
-    final RegionFunctionContext context = (RegionFunctionContext) 
testFunction.getContext();
-
-    final Set<?> filter = context.getFilter();
-
-    assertEquals(expectedFilter, filter);
-
-  }
-
-  @Test
   public void permissionsAreRequiredToExecute() throws IOException {
     final ResourcePermission requiredPermission = new ResourcePermission(
         ResourcePermission.Resource.DATA, ResourcePermission.Operation.MANAGE);
@@ -308,12 +292,12 @@ public class FunctionExecutionIntegrationTest {
     verify(securityManager).authorize(eq(SECURITY_PRINCIPAL), 
eq(requiredPermission));
   }
 
-  private FunctionAPI.ExecuteFunctionOnRegionResponse getFunctionResponse(
+  private FunctionAPI.ExecuteFunctionOnMemberResponse getFunctionResponse(
       ClientProtocol.Message responseMessage) {
     assertEquals(responseMessage.toString(),
-        ClientProtocol.Message.MessageTypeCase.EXECUTEFUNCTIONONREGIONRESPONSE,
+        ClientProtocol.Message.MessageTypeCase.EXECUTEFUNCTIONONMEMBERRESPONSE,
         responseMessage.getMessageTypeCase());
-    return responseMessage.getExecuteFunctionOnRegionResponse();
+    return responseMessage.getExecuteFunctionOnMemberResponse();
   }
 
   private void authenticateWithServer() throws IOException {
@@ -327,20 +311,21 @@ public class FunctionExecutionIntegrationTest {
         response.getAuthenticationResponse().getAuthenticated());
   }
 
+
   private ClientProtocol.Message authenticateAndSendMessage() throws 
IOException {
     authenticateWithServer();
 
     final ClientProtocol.Message request =
-        
createRequestMessageBuilder(FunctionAPI.ExecuteFunctionOnRegionRequest.newBuilder()
-            .setFunctionID(TEST_FUNCTION_ID).setRegion(TEST_REGION)).build();
+        
createRequestMessageBuilder(FunctionAPI.ExecuteFunctionOnMemberRequest.newBuilder()
+            
.setFunctionID(TEST_FUNCTION_ID).addMemberName(SERVER_NAME)).build();
 
     return writeMessage(request);
   }
 
 
   private ClientProtocol.Message.Builder createRequestMessageBuilder(
-      FunctionAPI.ExecuteFunctionOnRegionRequest.Builder functionRequest) {
-    return 
ClientProtocol.Message.newBuilder().setExecuteFunctionOnRegionRequest(functionRequest);
+      FunctionAPI.ExecuteFunctionOnMemberRequest.Builder functionRequest) {
+    return 
ClientProtocol.Message.newBuilder().setExecuteFunctionOnMemberRequest(functionRequest);
   }
 
   private ClientProtocol.Message writeMessage(ClientProtocol.Message request) 
throws IOException {
diff --git 
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/FunctionExecutionIntegrationTest.java
 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/ExecuteFunctionOnRegionIntegrationTest.java
similarity index 98%
rename from 
geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/FunctionExecutionIntegrationTest.java
rename to 
geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/ExecuteFunctionOnRegionIntegrationTest.java
index 06611d4..066dd90 100644
--- 
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/FunctionExecutionIntegrationTest.java
+++ 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/ExecuteFunctionOnRegionIntegrationTest.java
@@ -37,7 +37,9 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.awaitility.Awaitility;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.contrib.java.lang.system.RestoreSystemProperties;
 import org.junit.experimental.categories.Category;
 
 import org.apache.geode.cache.Cache;
@@ -59,7 +61,7 @@ import org.apache.geode.security.SecurityManager;
 import org.apache.geode.test.junit.categories.IntegrationTest;
 
 @Category(IntegrationTest.class)
-public class FunctionExecutionIntegrationTest {
+public class ExecuteFunctionOnRegionIntegrationTest {
   private static final String TEST_REGION = "testRegion";
   private static final String TEST_FUNCTION_ID = "testFunction";
   public static final String SECURITY_PRINCIPAL = "principle";
@@ -68,6 +70,9 @@ public class FunctionExecutionIntegrationTest {
   private Cache cache;
   private SecurityManager securityManager;
 
+  @Rule
+  public RestoreSystemProperties restoreSystemProperties = new 
RestoreSystemProperties();
+
   @Before
   public void setUp() throws Exception {
     CacheFactory cacheFactory = new CacheFactory(new Properties());
diff --git 
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnMemberRequestOperationHandlerJUnitTest.java
 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnMemberRequestOperationHandlerJUnitTest.java
new file mode 100644
index 0000000..59016df
--- /dev/null
+++ 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnMemberRequestOperationHandlerJUnitTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.protocol.protobuf.v1.operations;
+
+import static 
org.apache.geode.internal.protocol.protobuf.v1.BasicTypes.ErrorCode.AUTHORIZATION_FAILED;
+import static 
org.apache.geode.internal.protocol.protobuf.v1.BasicTypes.ErrorCode.INVALID_REQUEST;
+import static 
org.apache.geode.internal.protocol.protobuf.v1.BasicTypes.ErrorCode.NO_AVAILABLE_SERVER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.execute.FunctionService;
+import org.apache.geode.distributed.DistributedSystemDisconnectedException;
+import org.apache.geode.distributed.internal.DistributionManager;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+import 
org.apache.geode.internal.protocol.protobuf.statistics.ProtobufClientStatistics;
+import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
+import org.apache.geode.internal.protocol.protobuf.v1.Failure;
+import org.apache.geode.internal.protocol.protobuf.v1.FunctionAPI;
+import 
org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService;
+import org.apache.geode.internal.protocol.protobuf.v1.Result;
+import 
org.apache.geode.internal.protocol.protobuf.v1.ServerMessageExecutionContext;
+import org.apache.geode.internal.security.SecurityService;
+import org.apache.geode.management.internal.security.ResourcePermissions;
+import org.apache.geode.security.NotAuthorizedException;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class ExecuteFunctionOnMemberRequestOperationHandlerJUnitTest {
+  private static final String TEST_MEMBER1 = "member1";
+  private static final String TEST_MEMBER2 = "member2";
+  private static final String TEST_FUNCTION_ID = "testFunction";
+  public static final String NOT_A_MEMBER = "notAMember";
+  private InternalCache cacheStub;
+  private DistributionManager distributionManager;
+  private ExecuteFunctionOnMemberRequestOperationHandler operationHandler;
+  private ProtobufSerializationService serializationService;
+  private TestFunction function;
+
+  private static class TestFunction implements Function {
+    // non-null iff function has been executed.
+    private AtomicReference<FunctionContext> context = new AtomicReference<>();
+
+    @Override
+    public String getId() {
+      return TEST_FUNCTION_ID;
+    }
+
+    @Override
+    public void execute(FunctionContext context) {
+      this.context.set(context);
+      context.getResultSender().lastResult("result");
+    }
+
+    FunctionContext getContext() {
+      return context.get();
+    }
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    cacheStub = mock(InternalCache.class);
+    serializationService = new ProtobufSerializationService();
+    
when(cacheStub.getSecurityService()).thenReturn(mock(SecurityService.class));
+
+    distributionManager = mock(DistributionManager.class);
+    when(cacheStub.getDistributionManager()).thenReturn(distributionManager);
+
+
+    operationHandler = new ExecuteFunctionOnMemberRequestOperationHandler();
+
+    function = new TestFunction();
+    FunctionService.registerFunction(function);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    FunctionService.unregisterFunction(TEST_FUNCTION_ID);
+  }
+
+  @Test
+  public void failsOnUnknownMember() throws Exception {
+    final FunctionAPI.ExecuteFunctionOnMemberRequest request =
+        
FunctionAPI.ExecuteFunctionOnMemberRequest.newBuilder().setFunctionID(TEST_FUNCTION_ID)
+            .addMemberName(NOT_A_MEMBER).build();
+
+    final Result<FunctionAPI.ExecuteFunctionOnMemberResponse, 
ClientProtocol.ErrorResponse> result =
+        operationHandler.process(serializationService, request, 
mockedMessageExecutionContext());
+
+    assertTrue(result instanceof Failure);
+    assertEquals(NO_AVAILABLE_SERVER, 
result.getErrorMessage().getError().getErrorCode());
+  }
+
+  @Test
+  public void failsIfNoMemberSpecified() throws Exception {
+    final FunctionAPI.ExecuteFunctionOnMemberRequest request =
+        
FunctionAPI.ExecuteFunctionOnMemberRequest.newBuilder().setFunctionID(TEST_FUNCTION_ID)
+            .build();
+
+    final Result<FunctionAPI.ExecuteFunctionOnMemberResponse, 
ClientProtocol.ErrorResponse> result =
+        operationHandler.process(serializationService, request, 
mockedMessageExecutionContext());
+
+    assertTrue(result instanceof Failure);
+    assertEquals(NO_AVAILABLE_SERVER, 
result.getErrorMessage().getError().getErrorCode());
+  }
+
+  @Test(expected = DistributedSystemDisconnectedException.class)
+  public void succeedsWithValidMembers() throws Exception {
+    when(distributionManager.getMemberWithName(any(String.class))).thenReturn(
+        new InternalDistributedMember("localhost", 0),
+        new InternalDistributedMember("localhost", 1), null);
+
+    final FunctionAPI.ExecuteFunctionOnMemberRequest request =
+        
FunctionAPI.ExecuteFunctionOnMemberRequest.newBuilder().setFunctionID(TEST_FUNCTION_ID)
+            .addMemberName(TEST_MEMBER1).addMemberName(TEST_MEMBER2).build();
+
+    final Result<FunctionAPI.ExecuteFunctionOnMemberResponse, 
ClientProtocol.ErrorResponse> result =
+        operationHandler.process(serializationService, request, 
mockedMessageExecutionContext());
+
+    // unfortunately FunctionService fishes for a DistributedSystem and throws 
an exception
+    // if it can't find one. It uses a static method on 
InternalDistributedSystem, so no
+    // mocking is possible. If the test throws 
DistributedSystemDisconnectedException it
+    // means that the operation handler got to the point of trying get an 
execution
+    // context
+  }
+
+  @Test
+  public void requiresPermissions() throws Exception {
+    final SecurityService securityService = mock(SecurityService.class);
+    doThrow(new NotAuthorizedException("we should catch 
this")).when(securityService)
+        .authorize(ResourcePermissions.DATA_WRITE);
+    when(cacheStub.getSecurityService()).thenReturn(securityService);
+
+    final FunctionAPI.ExecuteFunctionOnMemberRequest request =
+        
FunctionAPI.ExecuteFunctionOnMemberRequest.newBuilder().setFunctionID(TEST_FUNCTION_ID)
+            .addMemberName(TEST_MEMBER1).build();
+
+    final Result<FunctionAPI.ExecuteFunctionOnMemberResponse, 
ClientProtocol.ErrorResponse> result =
+        operationHandler.process(serializationService, request, 
mockedMessageExecutionContext());
+
+    assertTrue(result instanceof Failure);
+
+    assertEquals(AUTHORIZATION_FAILED, 
result.getErrorMessage().getError().getErrorCode());
+
+  }
+
+  @Test
+  public void functionNotFound() throws Exception {
+    final FunctionAPI.ExecuteFunctionOnMemberRequest request =
+        FunctionAPI.ExecuteFunctionOnMemberRequest.newBuilder()
+            .setFunctionID("I am not a function, I am a 
human").addMemberName(TEST_MEMBER1).build();
+
+    final Result<FunctionAPI.ExecuteFunctionOnMemberResponse, 
ClientProtocol.ErrorResponse> result =
+        operationHandler.process(serializationService, request, 
mockedMessageExecutionContext());
+
+    final ClientProtocol.ErrorResponse errorMessage = result.getErrorMessage();
+
+    assertEquals(INVALID_REQUEST, errorMessage.getError().getErrorCode());
+  }
+
+  private ServerMessageExecutionContext mockedMessageExecutionContext() {
+    return new ServerMessageExecutionContext(cacheStub, 
mock(ProtobufClientStatistics.class), null);
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
bschucha...@apache.org.

Reply via email to