GEODE-1053: Adding "filter" on Function REST Invocation. Refactoring of RestAPIOnRegionFunctionExecutionDUnitTest.java, RestAPIsOnGroupsFunctionExecutionDUnitTest.java and RestAPIsOnMembersFunctionExecutionDUnitTest.java. Updating http-core and http-client in dependency-versions.properties.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/f2175524 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/f2175524 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/f2175524 Branch: refs/heads/feature/GEODE-17-3 Commit: f2175524491fcab3206b718d6de0164d4fed8906 Parents: 22ab270 Author: Udo Kohlmeyer <ukohlme...@pivotal.io> Authored: Wed Mar 9 19:58:06 2016 +1100 Committer: Udo Kohlmeyer <ukohlme...@pivotal.io> Committed: Wed Mar 16 05:53:57 2016 +1100 ---------------------------------------------------------------------- .../rest/internal/web/RestFunctionTemplate.java | 23 + ...stAPIOnRegionFunctionExecutionDUnitTest.java | 488 +++++-------------- .../web/controllers/RestAPITestBase.java | 182 +++++-- ...tAPIsOnGroupsFunctionExecutionDUnitTest.java | 334 ++++--------- ...APIsOnMembersFunctionExecutionDUnitTest.java | 314 +++--------- .../controllers/FunctionAccessController.java | 195 ++++---- .../rest/internal/web/util/ArrayUtils.java | 12 +- gradle/dependency-versions.properties | 4 +- 8 files changed, 554 insertions(+), 998 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f2175524/geode-assembly/src/test/java/com/gemstone/gemfire/rest/internal/web/RestFunctionTemplate.java ---------------------------------------------------------------------- diff --git a/geode-assembly/src/test/java/com/gemstone/gemfire/rest/internal/web/RestFunctionTemplate.java b/geode-assembly/src/test/java/com/gemstone/gemfire/rest/internal/web/RestFunctionTemplate.java new file mode 100644 index 0000000..8cd0638 --- /dev/null +++ b/geode-assembly/src/test/java/com/gemstone/gemfire/rest/internal/web/RestFunctionTemplate.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.gemstone.gemfire.rest.internal.web; + +import com.gemstone.gemfire.cache.execute.FunctionAdapter; + +public abstract class RestFunctionTemplate extends FunctionAdapter { + public int invocationCount = 0; +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f2175524/geode-assembly/src/test/java/com/gemstone/gemfire/rest/internal/web/controllers/RestAPIOnRegionFunctionExecutionDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-assembly/src/test/java/com/gemstone/gemfire/rest/internal/web/controllers/RestAPIOnRegionFunctionExecutionDUnitTest.java b/geode-assembly/src/test/java/com/gemstone/gemfire/rest/internal/web/controllers/RestAPIOnRegionFunctionExecutionDUnitTest.java index 4a958ce..63bd9fa 100644 --- a/geode-assembly/src/test/java/com/gemstone/gemfire/rest/internal/web/controllers/RestAPIOnRegionFunctionExecutionDUnitTest.java +++ b/geode-assembly/src/test/java/com/gemstone/gemfire/rest/internal/web/controllers/RestAPIOnRegionFunctionExecutionDUnitTest.java @@ -16,28 +16,8 @@ */ package com.gemstone.gemfire.rest.internal.web.controllers; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; 
-import java.util.Map; -import java.util.Random; -import java.util.Set; - -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; - import com.gemstone.gemfire.LogWriter; -import com.gemstone.gemfire.cache.AttributesFactory; -import com.gemstone.gemfire.cache.DataPolicy; -import com.gemstone.gemfire.cache.Region; -import com.gemstone.gemfire.cache.RegionAttributes; -import com.gemstone.gemfire.cache.Scope; -import com.gemstone.gemfire.cache.execute.Function; +import com.gemstone.gemfire.cache.*; import com.gemstone.gemfire.cache.execute.FunctionContext; import com.gemstone.gemfire.cache.execute.FunctionService; import com.gemstone.gemfire.cache.execute.RegionFunctionContext; @@ -47,44 +27,24 @@ import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; import com.gemstone.gemfire.internal.cache.PartitionAttributesImpl; import com.gemstone.gemfire.internal.cache.PartitionedRegion; import com.gemstone.gemfire.internal.cache.PartitionedRegionTestHelper; -import com.gemstone.gemfire.internal.cache.functions.DistributedRegionFunction; -import com.gemstone.gemfire.test.dunit.Host; -import com.gemstone.gemfire.test.dunit.IgnoredException; -import com.gemstone.gemfire.test.dunit.SerializableCallable; -import com.gemstone.gemfire.test.dunit.VM; +import com.gemstone.gemfire.rest.internal.web.RestFunctionTemplate; +import org.apache.http.client.methods.CloseableHttpResponse; + +import java.io.Serializable; +import java.util.*; /** * Dunit Test to validate OnRegion function execution with REST APIs - * + * * @author Nilkanth Patel * @since 8.0 */ public class RestAPIOnRegionFunctionExecutionDUnitTest extends RestAPITestBase { - private static final long serialVersionUID = 1L; - - public static final String REGION_NAME = "DistributedRegionFunctionExecutionDUnitTest"; - - public static final String 
PR_REGION_NAME = "samplePRRegion"; - - public static Region region = null; - - public static List<String> restURLs = new ArrayList<String>(); - - public static String restEndPoint = null; - - public static String getRestEndPoint() { - return restEndPoint; - } - - public static void setRestEndPoint(String restEndPoint) { - RestAPIOnRegionFunctionExecutionDUnitTest.restEndPoint = restEndPoint; - } - - public static final Function function = new DistributedRegionFunction(); + private final String REPLICATE_REGION_NAME = "sampleRRegion"; - public static final Function functionWithNoResultThrowsException = new MyFunctionException(); + private final String PR_REGION_NAME = "samplePRRegion"; public RestAPIOnRegionFunctionExecutionDUnitTest(String name) { super(name); @@ -92,52 +52,10 @@ public class RestAPIOnRegionFunctionExecutionDUnitTest extends RestAPITestBase { public void setUp() throws Exception { super.setUp(); - final Host host = Host.getHost(0); - } - - static class FunctionWithNoLastResult implements Function { - private static final long serialVersionUID = -1032915440862585532L; - public static final String Id = "FunctionWithNoLastResult"; - public static int invocationCount; - - @Override - public void execute(FunctionContext context) { - invocationCount++; - InternalDistributedSystem - .getConnectedInstance() - .getLogWriter() - .info( - "<ExpectedException action=add>did not send last result" - + "</ExpectedException>"); - context.getResultSender().sendResult( - (Serializable) context.getArguments()); - } - - @Override - public String getId() { - return Id; - } - - @Override - public boolean hasResult() { - return true; - } - - @Override - public boolean optimizeForWrite() { - return false; - } - - @Override - public boolean isHA() { - return false; - } } - static class SampleFunction implements Function { - private static final long serialVersionUID = -1032915440862585534L; + private class SampleFunction extends RestFunctionTemplate { public static final 
String Id = "SampleFunction"; - public static int invocationCount; @Override public void execute(FunctionContext context) { @@ -145,7 +63,7 @@ public class RestAPIOnRegionFunctionExecutionDUnitTest extends RestAPITestBase { if (context instanceof RegionFunctionContext) { RegionFunctionContext rfContext = (RegionFunctionContext) context; rfContext.getDataSet().getCache().getLogger() - .info("Executing function : TestFunction2.execute " + rfContext); + .info("Executing function : SampleFunction.execute(hasResult=true) with filter: " + rfContext.getFilter() + " " + rfContext); if (rfContext.getArguments() instanceof Boolean) { /* return rfContext.getArguments(); */ if (hasResult()) { @@ -157,7 +75,7 @@ public class RestAPIOnRegionFunctionExecutionDUnitTest extends RestAPITestBase { .getCache() .getLogger() .info( - "Executing function : TestFunction2.execute " + rfContext); + "Executing function : SampleFunction.execute(hasResult=false) " + rfContext); while (true && !rfContext.getDataSet().isDestroyed()) { rfContext.getDataSet().getCache().getLogger() .info("For Bug43513 "); @@ -172,7 +90,7 @@ public class RestAPIOnRegionFunctionExecutionDUnitTest extends RestAPITestBase { } else if (rfContext.getArguments() instanceof String) { String key = (String) rfContext.getArguments(); if (key.equals("TestingTimeOut")) { // for test - // PRFunctionExecutionDUnitTest#testRemoteMultiKeyExecution_timeout + // PRFunctionExecutionDUnitTest#testRemoteMultiKeyExecution_timeout try { Thread.sleep(2000); } catch (InterruptedException e) { @@ -208,7 +126,7 @@ public class RestAPIOnRegionFunctionExecutionDUnitTest extends RestAPITestBase { /* return vals; */ } else if (rfContext.getArguments() instanceof HashMap) { HashMap putData = (HashMap) rfContext.getArguments(); - for (Iterator i = putData.entrySet().iterator(); i.hasNext();) { + for (Iterator i = putData.entrySet().iterator(); i.hasNext(); ) { Map.Entry me = (Map.Entry) i.next(); rfContext.getDataSet().put(me.getKey(), 
me.getValue()); } @@ -222,7 +140,7 @@ public class RestAPIOnRegionFunctionExecutionDUnitTest extends RestAPITestBase { } else { DistributedSystem ds = InternalDistributedSystem.getAnyInstance(); LogWriter logger = ds.getLogWriter(); - logger.info("Executing in TestFunction on Server : " + logger.info("Executing in SampleFunction on Server : " + ds.getDistributedMember() + "with Context : " + context); while (ds.isConnected()) { logger @@ -249,7 +167,7 @@ public class RestAPIOnRegionFunctionExecutionDUnitTest extends RestAPITestBase { @Override public boolean optimizeForWrite() { - return false; + return true; } @Override @@ -258,54 +176,22 @@ public class RestAPIOnRegionFunctionExecutionDUnitTest extends RestAPITestBase { } } - private int getInvocationCount(VM vm) { - return (Integer) vm.invoke(new SerializableCallable() { - /** - * - */ - private static final long serialVersionUID = 1L; - - @Override - public Object call() throws Exception { - SampleFunction f = (SampleFunction) FunctionService - .getFunction(SampleFunction.Id); - int count = f.invocationCount; - f.invocationCount = 0; - return count; - } - }); - } - - private void verifyAndResetInvocationCount(VM vm, final int count) { - vm.invoke(new SerializableCallable() { - /** - * - */ - private static final long serialVersionUID = 1L; - - @Override - public Object call() throws Exception { - SampleFunction f = (SampleFunction) FunctionService - .getFunction(SampleFunction.Id); - assertEquals(count, f.invocationCount); - // assert succeeded, reset count - f.invocationCount = 0; - return null; - } - }); + private void verifyAndResetInvocationCount(final int count) { + SampleFunction f = (SampleFunction) FunctionService + .getFunction(SampleFunction.Id); + assertEquals(count, f.invocationCount); } - public static void createPeer(DataPolicy policy) { + private void createPeer(DataPolicy policy) { AttributesFactory factory = new AttributesFactory(); factory.setScope(Scope.DISTRIBUTED_ACK); 
factory.setDataPolicy(policy); - assertNotNull(cache); - region = cache.createRegion(REGION_NAME, factory.create()); + Region region = CacheFactory.getAnyInstance().createRegion(REPLICATE_REGION_NAME, factory.create()); com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().info("Region Created :" + region); assertNotNull(region); } - public static boolean createPeerWithPR() { + private boolean createPeerWithPR() { RegionAttributes ra = PartitionedRegionTestHelper.createRegionAttrsForPR(0, 10); AttributesFactory raf = new AttributesFactory(ra); @@ -314,41 +200,19 @@ public class RestAPIOnRegionFunctionExecutionDUnitTest extends RestAPITestBase { pa.setTotalNumBuckets(17); raf.setPartitionAttributes(pa); - if (cache == null || cache.isClosed()) { - // Cache not available - } - assertNotNull(cache); - - region = cache.createRegion(PR_REGION_NAME, raf.create()); + Region region = CacheFactory.getAnyInstance().createRegion(PR_REGION_NAME, raf.create()); com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().info("Region Created :" + region); assertNotNull(region); return Boolean.TRUE; } - public static void populateRegion() { - assertNotNull(cache); - region = cache.getRegion(REGION_NAME); - assertNotNull(region); - for (int i = 1; i <= 200; i++) { - region.put("execKey-" + i, new Integer(i)); - } - } - - public static void populatePRRegion() { - assertNotNull(cache); - region = cache.getRegion(REGION_NAME); - - PartitionedRegion pr = (PartitionedRegion) cache.getRegion(PR_REGION_NAME); + private void populatePRRegion() { + PartitionedRegion pr = (PartitionedRegion) CacheFactory.getAnyInstance().getRegion(PR_REGION_NAME); DistributedSystem.setThreadsSocketPolicy(false); - final HashSet testKeys = new HashSet(); for (int i = (pr.getTotalNumberOfBuckets() * 3); i > 0; i--) { - testKeys.add("execKey-" + i); - } - int j = 0; - for (Iterator i = testKeys.iterator(); i.hasNext();) { - Integer val = new Integer(j++); - pr.put(i.next(), val); + Integer val = new 
Integer(i + 1); + pr.put("execKey-" + i, val); } // Assert there is data in each bucket for (int bid = 0; bid < pr.getTotalNumberOfBuckets(); bid++) { @@ -356,9 +220,8 @@ public class RestAPIOnRegionFunctionExecutionDUnitTest extends RestAPITestBase { } } - public static void populateRRRegion() { - assertNotNull(cache); - region = cache.getRegion(REGION_NAME); + private void populateRRRegion() { + Region region = CacheFactory.getAnyInstance().getRegion(REPLICATE_REGION_NAME); assertNotNull(region); final HashSet testKeys = new HashSet(); @@ -366,163 +229,48 @@ public class RestAPIOnRegionFunctionExecutionDUnitTest extends RestAPITestBase { testKeys.add("execKey-" + i); } int j = 0; - for (Iterator i = testKeys.iterator(); i.hasNext();) { + for (Iterator i = testKeys.iterator(); i.hasNext(); ) { Integer val = new Integer(j++); region.put(i.next(), val); } } - public static void executeFunction_NoLastResult(String regionName) { - - try { - CloseableHttpClient httpclient = HttpClients.createDefault(); - CloseableHttpResponse response = null; - Random randomGenerator = new Random(); - int index = randomGenerator.nextInt(restURLs.size()); - HttpPost post = new HttpPost(restURLs.get(index) + "/functions/" - + "FunctionWithNoLastResult" + "?onRegion=" + regionName); - post.addHeader("Content-Type", "application/json"); - post.addHeader("Accept", "application/json"); - response = httpclient.execute(post); - } catch (Exception e) { - throw new RuntimeException("unexpected exception", e); - } - - } - - public static void executeFunctionThroughRestCall(String regionName) { - com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().info("Entering executeFunctionThroughRestCall"); - try { - CloseableHttpClient httpclient = HttpClients.createDefault(); - CloseableHttpResponse response = null; - Random randomGenerator = new Random(); - int index = randomGenerator.nextInt(restURLs.size()); - - HttpPost post = new HttpPost(restURLs.get(index) + "/functions/" - + 
"SampleFunction" + "?onRegion=" + regionName); - post.addHeader("Content-Type", "application/json"); - post.addHeader("Accept", "application/json"); - - com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().info("Request: POST " + post.toString()); - response = httpclient.execute(post); - com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().info("Response: POST " + response.toString()); - - assertEquals(response.getStatusLine().getStatusCode(), 200); - assertNotNull(response.getEntity()); - } catch (Exception e) { - throw new RuntimeException("unexpected exception", e); - } - com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().info("Exiting executeFunctionThroughRestCall"); - - } - - private void registerFunction(VM vm) { - vm.invoke(new SerializableCallable() { - private static final long serialVersionUID = 1L; - @Override - public Object call() throws Exception { - FunctionService.registerFunction(new FunctionWithNoLastResult()); - return null; - } - }); - } - - private void registerSampleFunction(VM vm) { - vm.invoke(new SerializableCallable() { - private static final long serialVersionUID = 1L; - - @Override - public Object call() throws Exception { - FunctionService.registerFunction(new SampleFunction()); - return null; - } - }); + @Override + protected String getFunctionID() { + return SampleFunction.Id; } - public void __testOnRegionExecutionOnDataPolicyEmpty_NoLastResult() { - // Step-1 : create cache on each VM, this will start HTTP service in - // embedded mode and deploy REST APIs web app on it. 
- - fail("This test is trying to invoke non existent methods"); -// String url1 = (String) vm3.invoke(() -> createCacheInVm( vm3 )); -// restURLs.add(url1); -// -// String url2 = (String) vm0.invoke(() -> createCacheInVm( vm0 )); -// restURLs.add(url2); -// -// String url3 = (String) vm1.invoke(() -> createCacheInVm( vm1 )); -// restURLs.add(url3); -// -// String url4 = (String) vm2.invoke(() -> createCacheInVm( vm2 )); -// restURLs.add(url4); - - // Step-2: Register function in all VMs - registerFunction(vm3); - registerFunction(vm0); - registerFunction(vm1); - registerFunction(vm2); - - // Step-3: Create and configure Region on all VMs - vm3.invoke(() -> createPeer( DataPolicy.EMPTY )); - vm0.invoke(() -> createPeer( DataPolicy.REPLICATE )); - vm1.invoke(() -> createPeer( DataPolicy.REPLICATE )); - vm2.invoke(() -> createPeer( DataPolicy.REPLICATE )); - - // Step-4 : Do some puts on region created earlier - vm3.invoke(() -> populateRegion()); - - // add expected exception to avoid suspect strings - final IgnoredException ex = IgnoredException.addIgnoredException("did not send last result"); - - // Step-5 : Execute function randomly (in iteration) on all available (per - // VM) REST end-points and verify its result - for (int i = 0; i < 10; i++) { - executeFunction_NoLastResult(REGION_NAME); - } - ex.remove(); + private void createCacheAndRegisterFunction() { + restURLs.add(vm0.invoke(() -> createCacheWithGroups(vm0, null))); + restURLs.add(vm1.invoke(() -> createCacheWithGroups(vm1, null))); + restURLs.add(vm2.invoke(() -> createCacheWithGroups(vm2, null))); + restURLs.add(vm3.invoke(() -> createCacheWithGroups(vm3, null))); - restURLs.clear(); + vm0.invoke(() -> FunctionService.registerFunction(new SampleFunction())); + vm1.invoke(() -> FunctionService.registerFunction(new SampleFunction())); + vm2.invoke(() -> FunctionService.registerFunction(new SampleFunction())); + vm3.invoke(() -> FunctionService.registerFunction(new SampleFunction())); } - public void 
testOnRegionExecutionWithRR() { - // Step-1 : create cache on each VM, this will start HTTP service in - // embedded mode and deploy REST APIs web app on it. - // - String url1 = (String) vm3.invoke(() -> RestAPITestBase.createCache( vm3 )); - restURLs.add(url1); + public void testOnRegionExecutionWithReplicateRegion() { + createCacheAndRegisterFunction(); - String url2 = (String) vm0.invoke(() -> RestAPITestBase.createCache( vm0 )); - restURLs.add(url2); + vm3.invoke(() -> createPeer(DataPolicy.EMPTY)); + vm0.invoke(() -> createPeer(DataPolicy.REPLICATE)); + vm1.invoke(() -> createPeer(DataPolicy.REPLICATE)); + vm2.invoke(() -> createPeer(DataPolicy.REPLICATE)); - String url3 = (String) vm1.invoke(() -> RestAPITestBase.createCache( vm1 )); - restURLs.add(url3); - - String url4 = (String) vm2.invoke(() -> RestAPITestBase.createCache( vm2 )); - restURLs.add(url4); - - // Step-2: Register function in all VMs - registerSampleFunction(vm3); - registerSampleFunction(vm0); - registerSampleFunction(vm1); - registerSampleFunction(vm2); - - // Step-3: Create and configure PR on all VMs - vm3.invoke(() -> createPeer( DataPolicy.EMPTY )); - vm0.invoke(() -> createPeer( DataPolicy.REPLICATE )); - vm1.invoke(() -> createPeer( DataPolicy.REPLICATE )); - vm2.invoke(() -> createPeer( DataPolicy.REPLICATE )); - - // Step-4 : Do some puts in Replicated region on vm3 vm3.invoke(() -> populateRRRegion()); - // Step-5 : Execute function randomly (in iteration) on all available (per - // VM) REST end-points and verify its result - executeFunctionThroughRestCall(REGION_NAME); - int c0 = getInvocationCount(vm0); - int c1 = getInvocationCount(vm1); - int c2 = getInvocationCount(vm2); - int c3 = getInvocationCount(vm3); + CloseableHttpResponse response = executeFunctionThroughRestCall("SampleFunction", REPLICATE_REGION_NAME, null, null, null, null); + assertEquals(200, response.getStatusLine().getStatusCode()); + assertNotNull(response.getEntity()); + + int c0 = vm0.invoke(() -> 
getInvocationCount()); + int c1 = vm1.invoke(() -> getInvocationCount()); + int c2 = vm2.invoke(() -> getInvocationCount()); + int c3 = vm3.invoke(() -> getInvocationCount()); assertEquals(1, c0 + c1 + c2 + c3); @@ -530,88 +278,84 @@ public class RestAPIOnRegionFunctionExecutionDUnitTest extends RestAPITestBase { restURLs.clear(); } - public void testOnRegionExecutionWithPR() throws Exception { - final String rName = getUniqueName(); - - // Step-1 : create cache on each VM, this will start HTTP service in - // embedded mode and deploy REST APIs web app on it. - String url1 = (String) vm3.invoke(() -> RestAPITestBase.createCache( vm3 )); - restURLs.add(url1); - - String url2 = (String) vm0.invoke(() -> RestAPITestBase.createCache( vm0 )); - restURLs.add(url2); + public void testOnRegionExecutionWithPartitionRegion() throws Exception { + createCacheAndRegisterFunction(); - String url3 = (String) vm1.invoke(() -> RestAPITestBase.createCache( vm1 )); - restURLs.add(url3); - - String url4 = (String) vm2.invoke(() -> RestAPITestBase.createCache( vm2 )); - restURLs.add(url4); - - // Step-2: Register function in all VMs - registerSampleFunction(vm3); - registerSampleFunction(vm0); - registerSampleFunction(vm1); - registerSampleFunction(vm2); - - // Step-3: Create and configure PR on all VMs - vm3.invoke(() -> createPeerWithPR()); vm0.invoke(() -> createPeerWithPR()); vm1.invoke(() -> createPeerWithPR()); vm2.invoke(() -> createPeerWithPR()); + vm3.invoke(() -> createPeerWithPR()); - // Step-4: Do some puts such that data exist in each bucket vm3.invoke(() -> populatePRRegion()); - // Step-5 : Execute function randomly (in iteration) on all available (per - // VM) REST end-points and verify its result - executeFunctionThroughRestCall(PR_REGION_NAME); - - // Assert that each node has executed the function once. 
- verifyAndResetInvocationCount(vm0, 1); - verifyAndResetInvocationCount(vm1, 1); - verifyAndResetInvocationCount(vm2, 1); - verifyAndResetInvocationCount(vm3, 1); + CloseableHttpResponse response = executeFunctionThroughRestCall("SampleFunction", PR_REGION_NAME, null, null, null, null); + assertEquals(200, response.getStatusLine().getStatusCode()); + assertNotNull(response.getEntity()); - int c0 = getInvocationCount(vm0); - int c1 = getInvocationCount(vm1); - int c2 = getInvocationCount(vm2); - int c3 = getInvocationCount(vm3); + vm0.invoke(() -> verifyAndResetInvocationCount(1)); + vm1.invoke(() -> verifyAndResetInvocationCount(1)); + vm2.invoke(() -> verifyAndResetInvocationCount(1)); + vm3.invoke(() -> verifyAndResetInvocationCount(1)); restURLs.clear(); } -} + public void testOnRegionWithFilterExecutionWithPartitionRegion() throws Exception { + createCacheAndRegisterFunction(); -class MyFunctionException implements Function { + vm0.invoke(() -> createPeerWithPR()); + vm1.invoke(() -> createPeerWithPR()); + vm2.invoke(() -> createPeerWithPR()); + vm3.invoke(() -> createPeerWithPR()); - /** - * - */ - private static final long serialVersionUID = 1L; + vm3.invoke(() -> populatePRRegion()); - @Override - public void execute(FunctionContext context) { - throw new RuntimeException("failure"); - } + CloseableHttpResponse response = executeFunctionThroughRestCall("SampleFunction", PR_REGION_NAME, "key2", null, null, null); + assertEquals(200, response.getStatusLine().getStatusCode()); + assertNotNull(response.getEntity()); - @Override - public String getId() { - return this.getClass().getName(); - } + int c0 = vm0.invoke(() -> getInvocationCount()); + int c1 = vm1.invoke(() -> getInvocationCount()); + int c2 = vm2.invoke(() -> getInvocationCount()); + int c3 = vm3.invoke(() -> getInvocationCount()); - @Override - public boolean hasResult() { - return true; - } + assertEquals(1, (c0 + c1 + c2 + c3)); - @Override - public boolean isHA() { - return false; + 
restURLs.clear(); } - @Override - public boolean optimizeForWrite() { - return false; - } +// public void testOnRegionWithFilterExecutionWithPartitionRegionJsonArgs() throws Exception { +// createCacheAndRegisterFunction(); +// +// vm0.invoke(() -> createPeerWithPR()); +// vm1.invoke(() -> createPeerWithPR()); +// vm2.invoke(() -> createPeerWithPR()); +// vm3.invoke(() -> createPeerWithPR()); +// +// vm3.invoke(() -> populatePRRegion()); +// +// String jsonBody = "[" +// + "{\"@type\": \"double\",\"@value\": 210}" +// + ",{\"@type\":\"com.gemstone.gemfire.web.rest.domain.Item\"," +// + "\"itemNo\":\"599\",\"description\":\"Part X Free on Bumper Offer\"," +// + "\"quantity\":\"2\"," +// + "\"unitprice\":\"5\"," +// + "\"totalprice\":\"10.00\"}" +// + "]"; +// +// CloseableHttpResponse response = executeFunctionThroughRestCall("SampleFunction", PR_REGION_NAME, null, jsonBody, null, null); +// assertEquals(200, response.getStatusLine().getStatusCode()); +// assertNotNull(response.getEntity()); +// +// // Assert that only 1 node has executed the function. 
+// int c0 = vm0.invoke(() -> getInvocationCount()); +// int c1 = vm1.invoke(() -> getInvocationCount()); +// int c2 = vm2.invoke(() -> getInvocationCount()); +// int c3 = vm3.invoke(() -> getInvocationCount()); +// +// assertEquals(1, (c0 + c1 + c2 + c3)); +// +// restURLs.clear(); +// } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f2175524/geode-assembly/src/test/java/com/gemstone/gemfire/rest/internal/web/controllers/RestAPITestBase.java ---------------------------------------------------------------------- diff --git a/geode-assembly/src/test/java/com/gemstone/gemfire/rest/internal/web/controllers/RestAPITestBase.java b/geode-assembly/src/test/java/com/gemstone/gemfire/rest/internal/web/controllers/RestAPITestBase.java index 3709475..0d1fee8 100644 --- a/geode-assembly/src/test/java/com/gemstone/gemfire/rest/internal/web/controllers/RestAPITestBase.java +++ b/geode-assembly/src/test/java/com/gemstone/gemfire/rest/internal/web/controllers/RestAPITestBase.java @@ -16,36 +16,48 @@ */ package com.gemstone.gemfire.rest.internal.web.controllers; -import java.util.Properties; - import com.gemstone.gemfire.cache.Cache; -import com.gemstone.gemfire.cache.CacheClosedException; import com.gemstone.gemfire.cache.CacheFactory; -import com.gemstone.gemfire.cache.RegionShortcut; +import com.gemstone.gemfire.cache.execute.FunctionService; import com.gemstone.gemfire.distributed.internal.DistributionConfig; import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; import com.gemstone.gemfire.internal.AvailablePortHelper; import com.gemstone.gemfire.internal.GemFireVersion; +import com.gemstone.gemfire.internal.lang.StringUtils; import com.gemstone.gemfire.management.internal.AgentUtil; -import com.gemstone.gemfire.test.dunit.DistributedTestCase; -import com.gemstone.gemfire.test.dunit.Host; -import com.gemstone.gemfire.test.dunit.Invoke; -import com.gemstone.gemfire.test.dunit.VM; -import com.gemstone.gemfire.test.dunit.Wait; +import 
com.gemstone.gemfire.rest.internal.web.RestFunctionTemplate; +import com.gemstone.gemfire.test.dunit.*; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.json.JSONArray; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.Random; public class RestAPITestBase extends DistributedTestCase { - private static final long serialVersionUID = 1L; - public static Cache cache = null; + protected Cache cache = null; + protected List<String> restURLs = new ArrayList(); VM vm0 = null; VM vm1 = null; VM vm2 = null; VM vm3 = null; - + public RestAPITestBase(String name) { super(name); } - - + @Override public void setUp() throws Exception { super.setUp(); @@ -54,7 +66,7 @@ public class RestAPITestBase extends DistributedTestCase { if (agentUtil.findWarLocation("geode-web-api") == null) { fail("unable to locate geode-web-api WAR file"); } - Wait.pause(5000); + Wait.pause(1000); final Host host = Host.getHost(0); vm0 = host.getVM(0); vm1 = host.getVM(1); @@ -62,9 +74,9 @@ public class RestAPITestBase extends DistributedTestCase { vm3 = host.getVM(3); // gradle sets a property telling us where the build is located final String buildDir = System.getProperty("geode.build.dir", System.getProperty("user.dir")); - Invoke.invokeInEveryVM(()-> System.setProperty("geode.build.dir", buildDir)); - } - + Invoke.invokeInEveryVM(() -> System.setProperty("geode.build.dir", buildDir)); + } + /** * close the clients and teh servers */ @@ -78,56 +90,122 @@ public class RestAPITestBase extends 
DistributedTestCase { /** * close the cache - * */ - public static void closeCache() { + private void closeCache() { if (cache != null && !cache.isClosed()) { cache.close(); cache.getDistributedSystem().disconnect(); } } - - protected static String createCache(VM currentVM) { - - RestAPITestBase test = new RestAPITestBase(getTestMethodName()); - - final String hostName = currentVM.getHost().getHostName(); - final int serverPort = AvailablePortHelper.getRandomAvailableTCPPort(); - - Properties props = new Properties(); - - props.setProperty(DistributionConfig.START_DEV_REST_API_NAME, "true"); - props.setProperty(DistributionConfig.HTTP_SERVICE_BIND_ADDRESS_NAME, hostName); - props.setProperty(DistributionConfig.HTTP_SERVICE_PORT_NAME,String.valueOf(serverPort)); - - InternalDistributedSystem ds = test.getSystem(props); - cache = CacheFactory.create(ds); - return "http://" + hostName + ":" + serverPort + "/gemfire-api/v1"; - - } - - public static String createCacheWithGroups (VM vm, final String groups, final String regionName ) { + public String createCacheWithGroups(VM vm, final String groups) { RestAPITestBase test = new RestAPITestBase(getTestMethodName()); - - final String hostName = vm.getHost().getHostName(); + + final String hostName = vm.getHost().getHostName(); final int serverPort = AvailablePortHelper.getRandomAvailableTCPPort(); - + Properties props = new Properties(); - - if(groups != null) { + + if (groups != null) { props.put("groups", groups); } - + props.setProperty(DistributionConfig.START_DEV_REST_API_NAME, "true"); props.setProperty(DistributionConfig.HTTP_SERVICE_BIND_ADDRESS_NAME, hostName); props.setProperty(DistributionConfig.HTTP_SERVICE_PORT_NAME, String.valueOf(serverPort)); - + InternalDistributedSystem ds = test.getSystem(props); cache = CacheFactory.create(ds); - - String restEndPoint = "http://" + hostName + ":" + serverPort + "/gemfire-api/v1"; - return restEndPoint; + + String restEndPoint = "http://" + hostName + ":" + serverPort + 
"/gemfire-api/v1"; + return restEndPoint; + } + + protected int getInvocationCount() { + RestFunctionTemplate function = (RestFunctionTemplate) FunctionService.getFunction(getFunctionID()); + return function.invocationCount; + } + + protected CloseableHttpResponse executeFunctionThroughRestCall(String function, String regionName, String filter, String jsonBody, String groups, + String members) { + LogWriterUtils.getLogWriter().info("Entering executeFunctionThroughRestCall"); + try { + CloseableHttpClient httpclient = HttpClients.createDefault(); + Random randomGenerator = new Random(); + int restURLIndex = randomGenerator.nextInt(restURLs.size()); + + HttpPost post = createHTTPPost(function, regionName, filter, restURLIndex, groups, members, jsonBody); + + LogWriterUtils.getLogWriter().info("Request: POST " + post.toString()); + return httpclient.execute(post); + } catch (Exception e) { + throw new RuntimeException("unexpected exception", e); + } } - + + private HttpPost createHTTPPost(String function, String regionName, String filter, int restUrlIndex, String groups, String members, String jsonBody) { + StringBuilder restURLBuilder = new StringBuilder(); + restURLBuilder.append(restURLs.get(restUrlIndex) + "/functions/" + function+"?"); + if (regionName != null && !regionName.isEmpty()) { + restURLBuilder.append("onRegion=" + regionName); + } + else if (groups != null && !groups.isEmpty()) { + restURLBuilder.append("onGroups=" + groups); + } + else if (members != null && !members.isEmpty()) { + restURLBuilder.append("onMembers=" + members); + } + if (filter != null && !filter.isEmpty()) { + restURLBuilder.append("&filter=" + filter); + } + String restString = restURLBuilder.toString(); + HttpPost post = new HttpPost(restString); + post.addHeader("Content-Type", "application/json"); + post.addHeader("Accept", "application/json"); + if (jsonBody != null && !StringUtils.isEmpty(jsonBody)) { + StringEntity jsonStringEntity = new StringEntity(jsonBody, 
ContentType.DEFAULT_TEXT); + post.setEntity(jsonStringEntity); + } + return post; + } + + protected String getFunctionID() { + throw new RuntimeException("This method should be overridden"); + } + + protected void assertHttpResponse(CloseableHttpResponse response, int httpCode, int expectedServerResponses) { + assertEquals(httpCode, response.getStatusLine().getStatusCode()); + + //verify response has body flag, expected is true. + assertNotNull(response.getEntity()); + try { + String httpResponseString = processHttpResponse(response); + response.close(); + LogWriterUtils.getLogWriter().info("Response : " + httpResponseString); + //verify function execution result + JSONArray resultArray = new JSONArray(httpResponseString); + assertEquals(resultArray.length(), expectedServerResponses); + } catch (Exception e) { + e.printStackTrace(); + } + } + + private String processHttpResponse(HttpResponse response) { + try { + HttpEntity entity = response.getEntity(); + InputStream content = entity.getContent(); + BufferedReader reader = new BufferedReader(new InputStreamReader( + content)); + String line; + StringBuffer sb = new StringBuffer(); + while ((line = reader.readLine()) != null) { + sb.append(line); + } + return sb.toString(); + } catch (IOException e) { + LogWriterUtils.getLogWriter().error("Error in processing Http Response", e); + } + return ""; + } + } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f2175524/geode-assembly/src/test/java/com/gemstone/gemfire/rest/internal/web/controllers/RestAPIsOnGroupsFunctionExecutionDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-assembly/src/test/java/com/gemstone/gemfire/rest/internal/web/controllers/RestAPIsOnGroupsFunctionExecutionDUnitTest.java b/geode-assembly/src/test/java/com/gemstone/gemfire/rest/internal/web/controllers/RestAPIsOnGroupsFunctionExecutionDUnitTest.java index 1ae3810..5acaccb 100644 --- 
a/geode-assembly/src/test/java/com/gemstone/gemfire/rest/internal/web/controllers/RestAPIsOnGroupsFunctionExecutionDUnitTest.java +++ b/geode-assembly/src/test/java/com/gemstone/gemfire/rest/internal/web/controllers/RestAPIsOnGroupsFunctionExecutionDUnitTest.java @@ -16,62 +16,119 @@ */ package com.gemstone.gemfire.rest.internal.web.controllers; -import java.io.BufferedReader; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Random; - -import org.apache.http.HttpEntity; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.json.JSONArray; - -import com.gemstone.gemfire.cache.execute.Function; import com.gemstone.gemfire.cache.execute.FunctionContext; import com.gemstone.gemfire.cache.execute.FunctionService; import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; -import com.gemstone.gemfire.test.dunit.Host; -import com.gemstone.gemfire.test.dunit.IgnoredException; +import com.gemstone.gemfire.rest.internal.web.RestFunctionTemplate; import com.gemstone.gemfire.test.dunit.LogWriterUtils; -import com.gemstone.gemfire.test.dunit.SerializableCallable; -import com.gemstone.gemfire.test.dunit.VM; +import org.apache.http.client.methods.CloseableHttpResponse; + +import java.util.ArrayList; +import java.util.Collections; public class RestAPIsOnGroupsFunctionExecutionDUnitTest extends RestAPITestBase { public RestAPIsOnGroupsFunctionExecutionDUnitTest(String name) { super(name); } - + public void setUp() throws Exception { super.setUp(); - final Host host = Host.getHost(0); } - - private void registerFunction(VM vm) { - vm.invoke(new SerializableCallable() { - private static final long serialVersionUID = 1L; - - @Override - public Object call() throws Exception { - 
FunctionService.registerFunction(new OnGroupsFunction()); - return null; - } - }); + + @Override + protected String getFunctionID() { + return OnGroupsFunction.Id; + } + + private void resetInvocationCount() { + OnGroupsFunction f = (OnGroupsFunction) FunctionService.getFunction(OnGroupsFunction.Id); + f.invocationCount = 0; + } + + public void testonGroupsExecutionOnAllMembers() { + setupCacheWithGroupsAndFunction(); + + for (int i = 0; i < 10; i++) { + CloseableHttpResponse response = executeFunctionThroughRestCall("OnGroupsFunction", null, null, null, "g0,g1", null); + assertHttpResponse(response, 200, 3); + } + + int c0 = vm0.invoke(() -> getInvocationCount()); + int c1 = vm1.invoke(() -> getInvocationCount()); + int c2 = vm2.invoke(() -> getInvocationCount()); + + assertEquals(30, (c0 + c1 + c2)); + + restURLs.clear(); + } + + private void setupCacheWithGroupsAndFunction() { + restURLs.add(vm0.invoke(() -> createCacheWithGroups(vm0, "g0,gm"))); + restURLs.add(vm1.invoke(() -> createCacheWithGroups(vm1, "g1"))); + restURLs.add(vm2.invoke(() -> createCacheWithGroups(vm2, "g0,g1"))); + + vm0.invoke(() -> FunctionService.registerFunction(new OnGroupsFunction())); + vm1.invoke(() -> FunctionService.registerFunction(new OnGroupsFunction())); + vm2.invoke(() -> FunctionService.registerFunction(new OnGroupsFunction())); } - - static class OnGroupsFunction implements Function { - private static final long serialVersionUID = -1032915440862585532L; + + public void testonGroupsExecutionOnAllMembersWithFilter() { + setupCacheWithGroupsAndFunction(); + + //Execute function randomly (in iteration) on all available (per VM) REST end-points and verify its result + for (int i = 0; i < 10; i++) { + CloseableHttpResponse response = executeFunctionThroughRestCall("OnGroupsFunction", null, "someKey", null, "g1", null); + assertHttpResponse(response, 500, 0); + } + + int c0 = vm0.invoke(() -> getInvocationCount()); + int c1 = vm1.invoke(() -> getInvocationCount()); + int c2 = 
vm2.invoke(() -> getInvocationCount()); + + assertEquals(0, (c0 + c1 + c2)); + restURLs.clear(); + } + + public void testBasicP2PFunctionSelectedGroup() { + setupCacheWithGroupsAndFunction(); + + //Step-3 : Execute function randomly (in iteration) on all available (per VM) REST end-points and verify its result + for (int i = 0; i < 5; i++) { + CloseableHttpResponse response = executeFunctionThroughRestCall("OnGroupsFunction", null, null, null, "no%20such%20group", null); + assertHttpResponse(response, 500, 0); + } + int c0 = vm0.invoke(() -> getInvocationCount()); + int c1 = vm1.invoke(() -> getInvocationCount()); + int c2 = vm2.invoke(() -> getInvocationCount()); + + assertEquals(0, (c0 + c1 + c2)); + + for (int i = 0; i < 5; i++) { + + CloseableHttpResponse response = executeFunctionThroughRestCall("OnGroupsFunction", null, null, null, "gm", null); + assertHttpResponse(response, 200, 1); + } + + c0 = vm0.invoke(() -> getInvocationCount()); + c1 = vm1.invoke(() -> getInvocationCount()); + c2 = vm2.invoke(() -> getInvocationCount()); + + assertEquals(5, (c0 + c1 + c2)); + + vm0.invoke(() -> resetInvocationCount()); + vm1.invoke(() -> resetInvocationCount()); + vm2.invoke(() -> resetInvocationCount()); + + restURLs.clear(); + } + + private class OnGroupsFunction extends RestFunctionTemplate { public static final String Id = "OnGroupsFunction"; - public static int invocationCount; @Override public void execute(FunctionContext context) { - LogWriterUtils.getLogWriter().fine("SWAP:1:executing OnGroupsFunction:"+invocationCount); + LogWriterUtils.getLogWriter().fine("SWAP:1:executing OnGroupsFunction:" + invocationCount); InternalDistributedSystem ds = InternalDistributedSystem.getConnectedInstance(); invocationCount++; ArrayList<String> l = (ArrayList<String>) context.getArguments(); @@ -101,206 +158,5 @@ public class RestAPIsOnGroupsFunctionExecutionDUnitTest extends RestAPITestBase return false; } } - - - public static void executeFunctionThroughRestCall(List<String> 
restURLs) { - Random randomGenerator = new Random(); - int index = randomGenerator.nextInt(restURLs.size()); - - try { - - CloseableHttpClient httpclient = HttpClients.createDefault(); - CloseableHttpResponse response = null; - HttpPost post = new HttpPost(restURLs.get(index) + "/functions/OnGroupsFunction?onGroups=g0,g1"); - post.addHeader("Content-Type", "application/json"); - post.addHeader("Accept", "application/json"); - LogWriterUtils.getLogWriter().info("Request POST : " + post.toString()); - response = httpclient.execute(post); - - HttpEntity entity = response.getEntity(); - InputStream content = entity.getContent(); - BufferedReader reader = new BufferedReader(new InputStreamReader( - content)); - String line; - StringBuffer sb = new StringBuffer(); - while ((line = reader.readLine()) != null) { - sb.append(line); - } - LogWriterUtils.getLogWriter().info("Response : " + sb.toString()); - - //verify response status code. expected status code is 200 OK. - assertEquals(response.getStatusLine().getStatusCode(), 200); - - - //verify response hasbody flag, expected is true. - assertNotNull(response.getEntity()); - - - response.close(); - - //verify function execution result - JSONArray resultArray = new JSONArray(sb.toString()); - assertEquals(resultArray.length(), 3); - - } catch (Exception e) { - throw new RuntimeException("unexpected exception", e); - } - - } - - public static void executeFunctionOnMemberThroughRestCall(List<String> restURLs) { - Random randomGenerator = new Random(); - int index = randomGenerator.nextInt(restURLs.size()); - - //Testcase-1: Executing function on non-existing group. 
- final IgnoredException ex = IgnoredException.addIgnoredException("com.gemstone.gemfire.cache.execute.FunctionException"); - try { - - CloseableHttpClient httpclient = HttpClients.createDefault(); - CloseableHttpResponse response = null; - HttpPost post = new HttpPost(restURLs.get(index) + "/functions/OnGroupsFunction?onGroups=no%20such%20group"); - post.addHeader("Content-Type", "application/json"); - post.addHeader("Accept", "application/json"); - response = httpclient.execute(post); - - if ( response.getStatusLine().getStatusCode() == 200 ) { - fail("FunctionException expected : no member(s) are found belonging to the provided group(s)"); - } - } catch (Exception e) { - throw new RuntimeException("unexpected exception", e); - - } finally { - ex.remove(); - } - - //Testcase-2: Executing function on group with args. - - final String FUNCTION_ARGS1 = "{" - + "\"@type\": \"string\"," - + "\"@value\": \"gm\"" - + "}"; - - try { - - CloseableHttpClient httpclient = HttpClients.createDefault(); - CloseableHttpResponse response = null; - HttpPost post = new HttpPost(restURLs.get(index) + "/functions/OnGroupsFunction?onGroups=gm"); - post.addHeader("Content-Type", "application/json"); - post.addHeader("Accept", "application/json"); - response = httpclient.execute(post); - - //verify response status code - assertEquals(response.getStatusLine().getStatusCode(), 200); - - //verify response hasbody flag - assertNotNull(response.getEntity()); - - HttpEntity entity = response.getEntity(); - InputStream content = entity.getContent(); - BufferedReader reader = new BufferedReader(new InputStreamReader( - content)); - String line; - StringBuffer sb = new StringBuffer(); - while ((line = reader.readLine()) != null) { - sb.append(line); - } - response.close(); - - //verify function execution result - JSONArray resultArray = new JSONArray(sb.toString()); - assertEquals(resultArray.length(), 1); - - } catch (Exception e) { - throw new RuntimeException("unexpected exception", e); - } 
- } - - private void verifyAndResetInvocationCount(VM vm, final int count) { - vm.invoke(new SerializableCallable() { - @Override - public Object call() throws Exception { - OnGroupsFunction f = (OnGroupsFunction) FunctionService.getFunction(OnGroupsFunction.Id); - assertEquals(count, f.invocationCount); - // assert succeeded, reset count - f.invocationCount = 0; - return null; - } - }); - } - - private void resetInvocationCount(VM vm) { - vm.invoke(new SerializableCallable() { - @Override - public Object call() throws Exception { - OnGroupsFunction f = (OnGroupsFunction) FunctionService.getFunction(OnGroupsFunction.Id); - f.invocationCount = 0; - return null; - } - }); - } - - public void testonGroupsExecutionOnAllMembers() { - - List<String> restURLs = new ArrayList<String>(); - //Step-1 : create cache on each VM, this will start HTTP service in embedded mode and deploy REST APIs web app on it. - // Create and configure Region on all VMs. Add Rest end-point into the restURLs list. - - String url1 = (String)vm0.invoke(() -> RestAPITestBase.createCacheWithGroups(vm0, "g0,gm", "TEST_REGION")); - restURLs.add(url1); - - String url2 = (String)vm1.invoke(() -> RestAPITestBase.createCacheWithGroups(vm1, "g1", "TEST_REGION" )); - restURLs.add(url2); - - String url3 = (String)vm2.invoke(() -> RestAPITestBase.createCacheWithGroups(vm2, "g0,g1", "TEST_REGION")); - restURLs.add(url3); - - //Step-2: Register function in all VMs - registerFunction(vm0); - registerFunction(vm1); - registerFunction(vm2); - - //Step-3 : Execute function randomly (in iteration) on all available (per VM) REST end-points and verify its result - for (int i=0; i< 10; i++) - executeFunctionThroughRestCall(restURLs); - - //Verify that each node belonging to specified group has run the function - verifyAndResetInvocationCount(vm0, 10); - verifyAndResetInvocationCount(vm1, 10); - verifyAndResetInvocationCount(vm2, 10); - - restURLs.clear(); - } - - - public void testBasicP2PFunctionSelectedGroup() { - - 
List<String> restURLs = new ArrayList<String>(); - - //Step-1 : create cache on each VM, this will start HTTP service in embedded mode and deploy REST APIs web app on it. - // Create and configure Region on all VMs. Add Rest end-point into the restURLs list. - String url1 = (String)vm0.invoke(() -> RestAPITestBase.createCacheWithGroups(vm0, "g0,gm", "null" )); - restURLs.add(url1); - - String url2 = (String)vm1.invoke(() -> RestAPITestBase.createCacheWithGroups(vm1, "g1", "null" )); - restURLs.add(url2); - - String url3 = (String)vm2.invoke(() -> RestAPITestBase.createCacheWithGroups(vm2, "g0,g1", "null" )); - restURLs.add(url3); - - //Step-2: Register function in all VMs - registerFunction(vm0); - registerFunction(vm1); - registerFunction(vm2); - - //Step-3 : Execute function randomly (in iteration) on all available (per VM) REST end-points and verify its result - for (int i=0; i< 5; i++) - executeFunctionOnMemberThroughRestCall(restURLs); - - resetInvocationCount(vm0); - resetInvocationCount(vm1); - resetInvocationCount(vm2); - - restURLs.clear(); - } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f2175524/geode-assembly/src/test/java/com/gemstone/gemfire/rest/internal/web/controllers/RestAPIsOnMembersFunctionExecutionDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-assembly/src/test/java/com/gemstone/gemfire/rest/internal/web/controllers/RestAPIsOnMembersFunctionExecutionDUnitTest.java b/geode-assembly/src/test/java/com/gemstone/gemfire/rest/internal/web/controllers/RestAPIsOnMembersFunctionExecutionDUnitTest.java index adb2b55..ac922ad 100644 --- a/geode-assembly/src/test/java/com/gemstone/gemfire/rest/internal/web/controllers/RestAPIsOnMembersFunctionExecutionDUnitTest.java +++ b/geode-assembly/src/test/java/com/gemstone/gemfire/rest/internal/web/controllers/RestAPIsOnMembersFunctionExecutionDUnitTest.java @@ -16,82 +16,49 @@ */ package com.gemstone.gemfire.rest.internal.web.controllers; 
-import java.io.BufferedReader; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; -import java.util.Random; - -import org.apache.http.HttpEntity; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.json.JSONArray; - import com.gemstone.gemfire.cache.Cache; import com.gemstone.gemfire.cache.CacheClosedException; import com.gemstone.gemfire.cache.CacheFactory; -import com.gemstone.gemfire.cache.execute.Function; import com.gemstone.gemfire.cache.execute.FunctionContext; import com.gemstone.gemfire.cache.execute.FunctionService; -import com.gemstone.gemfire.cache30.CacheTestCase; import com.gemstone.gemfire.distributed.internal.DistributionConfig; -import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; import com.gemstone.gemfire.internal.AvailablePortHelper; -import com.gemstone.gemfire.test.dunit.Assert; -import com.gemstone.gemfire.test.dunit.Host; +import com.gemstone.gemfire.rest.internal.web.RestFunctionTemplate; import com.gemstone.gemfire.test.dunit.LogWriterUtils; -import com.gemstone.gemfire.test.dunit.SerializableCallable; import com.gemstone.gemfire.test.dunit.VM; +import org.apache.http.client.methods.CloseableHttpResponse; + +import java.util.Properties; /** - * * @author Nilkanth Patel */ -public class RestAPIsOnMembersFunctionExecutionDUnitTest extends CacheTestCase { - +public class RestAPIsOnMembersFunctionExecutionDUnitTest extends RestAPITestBase { + private static final long serialVersionUID = 1L; - - VM member1 = null; - VM member2 = null; - VM member3 = null; - VM member4 = null; - - static InternalDistributedSystem ds = null; public RestAPIsOnMembersFunctionExecutionDUnitTest(String name) { super(name); } - + @Override public void setUp() throws Exception 
{ super.setUp(); - Host host = Host.getHost(0); - member1 = host.getVM(0); - member2 = host.getVM(1); - member3 = host.getVM(2); - member4 = host.getVM(3); } - - static class OnMembersFunction implements Function { - private static final long serialVersionUID = -1032915440862585532L; + + private class OnMembersFunction extends RestFunctionTemplate { public static final String Id = "OnMembersFunction"; - public static int invocationCount; @Override public void execute(FunctionContext context) { - - LogWriterUtils.getLogWriter().fine("SWAP:1:executing OnMembersFunction:"+invocationCount); - InternalDistributedSystem ds = InternalDistributedSystem.getConnectedInstance(); + + LogWriterUtils.getLogWriter().fine("SWAP:1:executing OnMembersFunction:" + invocationCount); invocationCount++; - + context.getResultSender().lastResult(Boolean.TRUE); } - + @Override public String getId() { return Id; @@ -112,217 +79,98 @@ public class RestAPIsOnMembersFunctionExecutionDUnitTest extends CacheTestCase { return false; } } - - private void verifyAndResetInvocationCount(VM vm, final int count) { - vm.invoke(new SerializableCallable() { - @Override - public Object call() throws Exception { - OnMembersFunction f = (OnMembersFunction) FunctionService.getFunction(OnMembersFunction.Id); - assertEquals(count, f.invocationCount); - // assert succeeded, reset count - f.invocationCount = 0; - return null; - } - }); - } - - private InternalDistributedSystem createSystem(Properties props){ - try { - ds = getSystem(props); - assertNotNull(ds); - FunctionService.registerFunction(new OnMembersFunction()); - - } - catch (Exception e) { - Assert.fail("Failed while creating the Distribued System", e); - } - return ds; - } - - public static String createCacheAndRegisterFunction(VM vm, String memberName) { - final String hostName = vm.getHost().getHostName(); + + private String createCacheAndRegisterFunction(VM vm, String memberName) { + final String hostName = vm.getHost().getHostName(); final int 
serverPort = AvailablePortHelper.getRandomAvailableTCPPort(); - + Properties props = new Properties(); props.setProperty(DistributionConfig.NAME_NAME, memberName); props.setProperty(DistributionConfig.START_DEV_REST_API_NAME, "true"); props.setProperty(DistributionConfig.HTTP_SERVICE_BIND_ADDRESS_NAME, hostName); props.setProperty(DistributionConfig.HTTP_SERVICE_PORT_NAME, String.valueOf(serverPort)); - + Cache c = null; try { - c = CacheFactory.getInstance( new RestAPIsOnMembersFunctionExecutionDUnitTest("temp").getSystem(props)); + c = CacheFactory.getInstance(new RestAPIsOnMembersFunctionExecutionDUnitTest("temp").getSystem(props)); c.close(); } catch (CacheClosedException cce) { } - + c = CacheFactory.create(new RestAPIsOnMembersFunctionExecutionDUnitTest("temp").getSystem(props)); FunctionService.registerFunction(new OnMembersFunction()); - - String restEndPoint = "http://" + hostName + ":" + serverPort + "/gemfire-api/v1"; + + String restEndPoint = "http://" + hostName + ":" + serverPort + "/gemfire-api/v1"; return restEndPoint; - + } - - public static void executeFunctionOnAllMembersThroughRestCall(List<String> restURLs) { - Random randomGenerator = new Random(); - int index = randomGenerator.nextInt(restURLs.size()); - - //Testcase: onMembers Function execution with No groups specified - try { - - CloseableHttpClient httpclient = HttpClients.createDefault(); - CloseableHttpResponse response = null; - HttpPost post = new HttpPost(restURLs.get(index) + "/functions/OnMembersFunction"); - post.addHeader("Content-Type", "application/json"); - post.addHeader("Accept", "application/json"); - - LogWriterUtils.getLogWriter().info("Request POST : " + post.toString()); - - response = httpclient.execute(post); - - HttpEntity entity = response.getEntity(); - InputStream content = entity.getContent(); - BufferedReader reader = new BufferedReader(new InputStreamReader( - content)); - String line; - StringBuffer sb = new StringBuffer(); - while ((line = reader.readLine()) 
!= null) { - sb.append(line); - } - LogWriterUtils.getLogWriter().info("Response : " + sb.toString()); - - - //verify response status code - assertEquals(200, response.getStatusLine().getStatusCode()); - - //verify response hasbody flag - assertNotNull(response.getEntity()); - - - response.close(); - - JSONArray resultArray = new JSONArray(sb.toString()); - assertEquals(resultArray.length(), 4); - - //fail("Expected exception while executing function onMembers without any members "); - - } catch (Exception e) { - throw new RuntimeException("unexpected exception", e); - } + + @Override + protected String getFunctionID() { + return OnMembersFunction.Id; } - - public static void executeFunctionOnGivenMembersThroughRestCall(List<String> restURLs) { - Random randomGenerator = new Random(); - int index = randomGenerator.nextInt(restURLs.size()); - - //Testcase: onMembers Function execution with valid groups - try { - - CloseableHttpClient httpclient = HttpClients.createDefault(); - CloseableHttpResponse response = null; - HttpPost post = new HttpPost(restURLs.get(index) + "/functions/OnMembersFunction?onMembers=m1,m2,m3"); - post.addHeader("Content-Type", "application/json"); - post.addHeader("Accept", "application/json"); - response = httpclient.execute(post); - - //verify response status code. expected status code is 200 OK. - assertEquals(response.getStatusLine().getStatusCode(), 200); - - //verify response hasbody flag, expected is true. 
- assertNotNull(response.getEntity()); - - - HttpEntity entity = response.getEntity(); - InputStream content = entity.getContent(); - BufferedReader reader = new BufferedReader(new InputStreamReader( - content)); - String line; - StringBuffer sb = new StringBuffer(); - while ((line = reader.readLine()) != null) { - sb.append(line); - } - response.close(); - - //verify function execution result - JSONArray resultArray = new JSONArray(sb.toString()); - assertEquals(resultArray.length(), 3); - - } catch (Exception e) { - throw new RuntimeException("unexpected exception", e); + + public void testFunctionExecutionOnAllMembers() { + restURLs.add(vm0.invoke(() -> createCacheAndRegisterFunction(vm0, "m1"))); + restURLs.add(vm1.invoke(() -> createCacheAndRegisterFunction(vm1, "m2"))); + restURLs.add(vm2.invoke(() -> createCacheAndRegisterFunction(vm2, "m3"))); + restURLs.add(vm3.invoke(() -> createCacheAndRegisterFunction(vm3, "m4"))); + + for (int i = 0; i < 10; i++) { + CloseableHttpResponse response = executeFunctionThroughRestCall("OnMembersFunction",null,null,null,null,null); + assertHttpResponse(response, 200, 4); } + + int c0 = vm0.invoke(() -> getInvocationCount()); + int c1 = vm1.invoke(() -> getInvocationCount()); + int c2 = vm2.invoke(() -> getInvocationCount()); + int c3 = vm3.invoke(() -> getInvocationCount()); + + assertEquals(40, (c0 + c1 + c2 + c3)); + + restURLs.clear(); } - - public void testFunctionExecutionOnAllMembers() { - - List<String> restURLs = new ArrayList<String>(); - - //Step-1 : create cache on each VM, this will start HTTP service in embedded mode and deploy REST APIs web app on it. - // Connect to DS and Register function. Add Rest end-point into the restURLs list. 
- - String url1 = (String)member1.invoke(() -> RestAPIsOnMembersFunctionExecutionDUnitTest.createCacheAndRegisterFunction(member1, "m1")); - restURLs.add(url1); - - String url2 = (String)member2.invoke(() -> RestAPIsOnMembersFunctionExecutionDUnitTest.createCacheAndRegisterFunction(member2, "m2")); - restURLs.add(url2); - - String url3 = (String)member3.invoke(() -> RestAPIsOnMembersFunctionExecutionDUnitTest.createCacheAndRegisterFunction(member3, "m3")); - restURLs.add(url3); - - String url4 = (String)member4.invoke(() -> RestAPIsOnMembersFunctionExecutionDUnitTest.createCacheAndRegisterFunction(member4, "m4")); - restURLs.add(url4); - - //default case, execute function on all members, register the function in controller VM - //FunctionService.registerFunction(new OnMembersFunction()); - - //Step-2 : Execute function randomly (in iteration) on all available (per VM) REST end-points and verify its result - for (int i=0; i< 10; i++) { - executeFunctionOnAllMembersThroughRestCall(restURLs); + + public void testFunctionExecutionEOnSelectedMembers() { + restURLs.add((String) vm0.invoke(() -> createCacheAndRegisterFunction(vm0, "m1"))); + restURLs.add((String) vm1.invoke(() -> createCacheAndRegisterFunction(vm1, "m2"))); + restURLs.add((String) vm2.invoke(() -> createCacheAndRegisterFunction(vm2, "m3"))); + restURLs.add((String) vm3.invoke(() -> createCacheAndRegisterFunction(vm3, "m4"))); + + for (int i = 0; i < 10; i++) { + CloseableHttpResponse response = executeFunctionThroughRestCall("OnMembersFunction",null,null,null,null,"m1,m2,m3"); + assertHttpResponse(response, 200, 3); } - - //Verify that each node (m1, m2, m3) has run the function - verifyAndResetInvocationCount(member1, 10); - verifyAndResetInvocationCount(member2, 10); - verifyAndResetInvocationCount(member3, 10); - verifyAndResetInvocationCount(member4, 10); + + int c0 = vm0.invoke(() -> getInvocationCount()); + int c1 = vm1.invoke(() -> getInvocationCount()); + int c2 = vm2.invoke(() -> 
getInvocationCount()); + int c3 = vm3.invoke(() -> getInvocationCount()); + + assertEquals(30, (c0 + c1 + c2 + c3)); restURLs.clear(); } - - public void testFunctionExecutionEOnSelectedMembers() { - - List<String> restURLs = new ArrayList<String>(); - - //Step-1 : create cache on each VM, this will start HTTP service in embedded mode and deploy REST APIs web app on it. - // Connect to DS and Register function. Add Rest end-point into the restURLs list. - - String url1 = (String)member1.invoke(() -> RestAPIsOnMembersFunctionExecutionDUnitTest.createCacheAndRegisterFunction(member1, "m1")); - restURLs.add(url1); - - String url2 = (String)member2.invoke(() -> RestAPIsOnMembersFunctionExecutionDUnitTest.createCacheAndRegisterFunction(member2, "m2")); - restURLs.add(url2); - - String url3 = (String)member3.invoke(() -> RestAPIsOnMembersFunctionExecutionDUnitTest.createCacheAndRegisterFunction(member3, "m3")); - restURLs.add(url3); - - String url4 = (String)member4.invoke(() -> RestAPIsOnMembersFunctionExecutionDUnitTest.createCacheAndRegisterFunction(member4, "m4")); - restURLs.add(url4); - - //default case, execute function on all members, register the function in controller VM - //FunctionService.registerFunction(new OnMembersFunction()); - - //Step-2 : Execute function randomly (in iteration) on all available (per VM) REST end-points and verify its result - for (int i=0; i< 10; i++) { - executeFunctionOnGivenMembersThroughRestCall(restURLs); + + public void testFunctionExecutionOnMembersWithFilter() { + restURLs.add((String) vm0.invoke(() -> createCacheAndRegisterFunction(vm0, "m1"))); + restURLs.add((String) vm1.invoke(() -> createCacheAndRegisterFunction(vm1, "m2"))); + restURLs.add((String) vm2.invoke(() -> createCacheAndRegisterFunction(vm2, "m3"))); + restURLs.add((String) vm3.invoke(() -> createCacheAndRegisterFunction(vm3, "m4"))); + + for (int i = 0; i < 10; i++) { + CloseableHttpResponse response = 
executeFunctionThroughRestCall("OnMembersFunction",null,"key2",null,null,"m1,m2,m3"); + assertHttpResponse(response, 500, 0); } - - //Verify that each node (m1, m2, m3) has run the function - verifyAndResetInvocationCount(member1, 10); - verifyAndResetInvocationCount(member2, 10); - verifyAndResetInvocationCount(member3, 10); - + + int c0 = vm0.invoke(() -> getInvocationCount()); + int c1 = vm1.invoke(() -> getInvocationCount()); + int c2 = vm2.invoke(() -> getInvocationCount()); + int c3 = vm3.invoke(() -> getInvocationCount()); + + assertEquals(0, (c0 + c1 + c2 + c3)); restURLs.clear(); } - + } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f2175524/geode-web-api/src/main/java/com/gemstone/gemfire/rest/internal/web/controllers/FunctionAccessController.java ---------------------------------------------------------------------- diff --git a/geode-web-api/src/main/java/com/gemstone/gemfire/rest/internal/web/controllers/FunctionAccessController.java b/geode-web-api/src/main/java/com/gemstone/gemfire/rest/internal/web/controllers/FunctionAccessController.java index 2c37c7f..929b70a 100644 --- a/geode-web-api/src/main/java/com/gemstone/gemfire/rest/internal/web/controllers/FunctionAccessController.java +++ b/geode-web-api/src/main/java/com/gemstone/gemfire/rest/internal/web/controllers/FunctionAccessController.java @@ -17,44 +17,32 @@ package com.gemstone.gemfire.rest.internal.web.controllers; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import org.apache.logging.log4j.Logger; -import org.springframework.http.HttpHeaders; -import org.springframework.http.HttpStatus; -import org.springframework.http.MediaType; -import org.springframework.http.ResponseEntity; -import org.springframework.stereotype.Controller; -import org.springframework.util.StringUtils; -import org.springframework.web.bind.annotation.PathVariable; -import org.springframework.web.bind.annotation.RequestBody; -import 
org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RequestMethod; -import org.springframework.web.bind.annotation.RequestParam; -import org.springframework.web.bind.annotation.ResponseBody; -import org.springframework.web.bind.annotation.ResponseStatus; - import com.gemstone.gemfire.cache.LowMemoryException; -import com.gemstone.gemfire.cache.execute.Execution; -import com.gemstone.gemfire.cache.execute.Function; -import com.gemstone.gemfire.cache.execute.FunctionException; -import com.gemstone.gemfire.cache.execute.FunctionService; -import com.gemstone.gemfire.cache.execute.ResultCollector; +import com.gemstone.gemfire.cache.execute.*; import com.gemstone.gemfire.internal.logging.LogService; import com.gemstone.gemfire.rest.internal.web.exception.GemfireRestException; import com.gemstone.gemfire.rest.internal.web.util.ArrayUtils; import com.gemstone.gemfire.rest.internal.web.util.JSONUtils; -import org.json.JSONException; import com.wordnik.swagger.annotations.Api; import com.wordnik.swagger.annotations.ApiOperation; import com.wordnik.swagger.annotations.ApiResponse; import com.wordnik.swagger.annotations.ApiResponses; +import org.apache.logging.log4j.Logger; +import org.json.JSONException; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.stereotype.Controller; +import org.springframework.util.StringUtils; +import org.springframework.web.bind.annotation.*; + +import java.util.*; /** * The FunctionsController class serving REST Requests related to the function execution - * <p/> + * <p> + * * @author Nilkanth Patel, john blum * @see org.springframework.stereotype.Controller * @since 8.0 @@ -72,172 +60,181 @@ public class FunctionAccessController extends AbstractBaseController { /** * Gets the version of the REST API implemented by this @Controller. 
- * <p/> + * <p> + * * @return a String indicating the REST API version. */ @Override protected String getRestApiVersion() { return REST_API_VERSION; } - + /** * list all registered functions in Gemfire data node + * * @return result as a JSON document. */ - @RequestMapping(method = RequestMethod.GET, produces = { MediaType.APPLICATION_JSON_VALUE }) + @RequestMapping(method = RequestMethod.GET, produces = { MediaType.APPLICATION_JSON_VALUE }) @ApiOperation( - value = "list all functions", - notes = "list all functions available in the GemFire cluster", - response = void.class + value = "list all functions", + notes = "list all functions available in the GemFire cluster", + response = void.class ) - @ApiResponses( { - @ApiResponse( code = 200, message = "OK." ), - @ApiResponse( code = 500, message = "GemFire throws an error or exception." ) - } ) + @ApiResponses({ + @ApiResponse(code = 200, message = "OK."), + @ApiResponse(code = 500, message = "GemFire throws an error or exception.") + }) @ResponseBody @ResponseStatus(HttpStatus.OK) public ResponseEntity<?> list() { - - if(logger.isDebugEnabled()){ + + if (logger.isDebugEnabled()) { logger.debug("Listing all registered Functions in GemFire..."); } - + final Map<String, Function> registeredFunctions = FunctionService.getRegisteredFunctions(); - String listFunctionsAsJson = JSONUtils.formulateJsonForListFunctionsCall(registeredFunctions.keySet()); - final HttpHeaders headers = new HttpHeaders(); + String listFunctionsAsJson = JSONUtils.formulateJsonForListFunctionsCall(registeredFunctions.keySet()); + final HttpHeaders headers = new HttpHeaders(); headers.setLocation(toUri("functions")); return new ResponseEntity<String>(listFunctionsAsJson, headers, HttpStatus.OK); - } - + } + /** * Execute a function on Gemfire data node using REST API call. - * Arguments to the function are passed as JSON string in the request body. + * Arguments to the function are passed as JSON string in the request body. 
+ * * @param functionId represents function to be executed - * @param region list of regions on which function to be executed. - * @param members list of nodes on which function to be executed. - * @param groups list of groups on which function to be executed. + * @param region list of regions on which function to be executed. + * @param members list of nodes on which function to be executed. + * @param groups list of groups on which function to be executed. + * @param filter list of keys which the function will use to determine on which node to execute the function. * @param argsInBody function argument as a JSON document * @return result as a JSON document */ @RequestMapping(method = RequestMethod.POST, value = "/{functionId}", produces = { MediaType.APPLICATION_JSON_VALUE }) @ApiOperation( - value = "execute function", - notes = "Execute function with arguments on regions, members, or group(s). By default function will be executed on all nodes if none of (onRegion, onMembers, onGroups) specified", - response = void.class + value = "execute function", + notes = "Execute function with arguments on regions, members, or group(s). By default function will be executed on all nodes if none of (onRegion, onMembers, onGroups) specified", + response = void.class ) - @ApiResponses( { - @ApiResponse( code = 200, message = "OK." 
), - @ApiResponse( code = 500, message = "if GemFire throws an error or exception" ), - @ApiResponse( code = 400, message = "if Function arguments specified as JSON document in the request body is invalid" ) - } ) + @ApiResponses({ + @ApiResponse(code = 200, message = "OK."), + @ApiResponse(code = 500, message = "if GemFire throws an error or exception"), + @ApiResponse(code = 400, message = "if Function arguments specified as JSON document in the request body is invalid") + }) @ResponseBody @ResponseStatus(HttpStatus.OK) public ResponseEntity<String> execute(@PathVariable("functionId") String functionId, - @RequestParam(value = "onRegion", required = false ) String region, - @RequestParam(value = "onMembers", required = false ) final String[] members, - @RequestParam(value = "onGroups", required = false) final String[] groups, - @RequestBody(required = false) final String argsInBody - ) - { + @RequestParam(value = "onRegion", required = false) String region, + @RequestParam(value = "onMembers", required = false) final String[] members, + @RequestParam(value = "onGroups", required = false) final String[] groups, + @RequestParam(value = "filter", required = false) final String[] filter, + @RequestBody(required = false) final String argsInBody + ) { Execution function = null; functionId = decode(functionId); - + if (StringUtils.hasText(region)) { - if(logger.isDebugEnabled()){ + if (logger.isDebugEnabled()) { logger.debug("Executing Function ({}) with arguments ({}) on Region ({})...", functionId, ArrayUtils.toString(argsInBody), region); } - + region = decode(region); try { function = FunctionService.onRegion(getRegion(region)); - } catch(FunctionException fe){ + } catch (FunctionException fe) { throw new GemfireRestException(String.format("The Region identified by name (%1$s) could not found!", region), fe); } - } - else if (ArrayUtils.isNotEmpty(members)) { - if(logger.isDebugEnabled()){ + } else if (ArrayUtils.isNotEmpty(members)) { + if (logger.isDebugEnabled()) 
{ logger.debug("Executing Function ({}) with arguments ({}) on Member ({})...", functionId, ArrayUtils.toString(argsInBody), ArrayUtils.toString(members)); } - try { + try { function = FunctionService.onMembers(getMembers(members)); - } catch(FunctionException fe){ - throw new GemfireRestException("Could not found the specified members in disributed system!", fe); + } catch (FunctionException fe) { + throw new GemfireRestException("Could not found the specified members in distributed system!", fe); } - } - else if (ArrayUtils.isNotEmpty(groups)) { - if(logger.isDebugEnabled()){ + } else if (ArrayUtils.isNotEmpty(groups)) { + if (logger.isDebugEnabled()) { logger.debug("Executing Function ({}) with arguments ({}) on Groups ({})...", functionId, ArrayUtils.toString(argsInBody), ArrayUtils.toString(groups)); } try { function = FunctionService.onMembers(groups); - } catch(FunctionException fe){ + } catch (FunctionException fe) { throw new GemfireRestException("no member(s) are found belonging to the provided group(s)!", fe); } - } - else { + } else { //Default case is to execute function on all existing data node in DS, document this. 
- if(logger.isDebugEnabled()){ + if (logger.isDebugEnabled()) { logger.debug("Executing Function ({}) with arguments ({}) on all Members...", functionId, ArrayUtils.toString(argsInBody)); } - + try { function = FunctionService.onMembers(getAllMembersInDS()); - } catch(FunctionException fe) { - throw new GemfireRestException("Disributed system does not contain any valid data node to run the specified function!", fe); + } catch (FunctionException fe) { + throw new GemfireRestException("Distributed system does not contain any valid data node to run the specified function!", fe); + } + } + + if (!ArrayUtils.isEmpty(filter)) { + if (logger.isDebugEnabled()) { + logger.debug("Executing Function ({}) with filter ({})", functionId, + ArrayUtils.toString(filter)); } + Set filter1 = ArrayUtils.asSet(filter); + function = function.withFilter(filter1); } final ResultCollector<?, ?> results; - + try { - if(argsInBody != null) - { + if (argsInBody != null) { Object[] args = jsonToObjectArray(argsInBody); - + //execute function with specified arguments - if(args.length == 1){ + if (args.length == 1) { results = function.withArgs(args[0]).execute(functionId); } else { results = function.withArgs(args).execute(functionId); } - }else { + } else { //execute function with no args results = function.execute(functionId); } - } catch(ClassCastException cce){ + } catch (ClassCastException cce) { throw new GemfireRestException("Key is of an inappropriate type for this region!", cce); - } catch(NullPointerException npe){ + } catch (NullPointerException npe) { throw new GemfireRestException("Specified key is null and this region does not permit null keys!", npe); - } catch(LowMemoryException lme){ + } catch (LowMemoryException lme) { throw new GemfireRestException("Server has encountered low memory condition!", lme); } catch (IllegalArgumentException ie) { throw new GemfireRestException("Input parameter is null! 
", ie); - }catch (FunctionException fe){ + } catch (FunctionException fe) { throw new GemfireRestException("Server has encountered error while executing the function!", fe); } - + try { Object functionResult = results.getResult(); - - if(functionResult instanceof List<?>) { + + if (functionResult instanceof List<?>) { final HttpHeaders headers = new HttpHeaders(); headers.setLocation(toUri("functions", functionId)); - + try { @SuppressWarnings("unchecked") - String functionResultAsJson = JSONUtils.convertCollectionToJson((ArrayList<Object>)functionResult); - return new ResponseEntity<String>(functionResultAsJson, headers, HttpStatus.OK); + String functionResultAsJson = JSONUtils.convertCollectionToJson((ArrayList<Object>) functionResult); + return new ResponseEntity<String>(functionResultAsJson, headers, HttpStatus.OK); } catch (JSONException e) { throw new GemfireRestException("Could not convert function results into Restful (JSON) format!", e); } - }else { + } else { throw new GemfireRestException("Function has returned results that could not be converted into Restful (JSON) format!"); } - }catch (FunctionException fe) { + } catch (FunctionException fe) { fe.printStackTrace(); throw new GemfireRestException("Server has encountered an error while processing function execution!", fe); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f2175524/geode-web-api/src/main/java/com/gemstone/gemfire/rest/internal/web/util/ArrayUtils.java ---------------------------------------------------------------------- diff --git a/geode-web-api/src/main/java/com/gemstone/gemfire/rest/internal/web/util/ArrayUtils.java b/geode-web-api/src/main/java/com/gemstone/gemfire/rest/internal/web/util/ArrayUtils.java index 261f9ad..d2d4f2f 100644 --- a/geode-web-api/src/main/java/com/gemstone/gemfire/rest/internal/web/util/ArrayUtils.java +++ b/geode-web-api/src/main/java/com/gemstone/gemfire/rest/internal/web/util/ArrayUtils.java @@ -17,6 +17,9 @@ package 
com.gemstone.gemfire.rest.internal.web.util; +import java.util.LinkedHashSet; +import java.util.Set; + /** * The ArrayUtils class is an abstract utility class for working with Object arrays. * <p/> @@ -56,5 +59,12 @@ public abstract class ArrayUtils { public static String toString(final String... array) { return toString((Object[])array); } - + + public static Set asSet(String[] filter) { + LinkedHashSet linkedHashSet = new LinkedHashSet(filter.length); + for (int i = 0; i < filter.length; i++) { + linkedHashSet.add(filter[i]); + } + return linkedHashSet; + } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f2175524/gradle/dependency-versions.properties ---------------------------------------------------------------------- diff --git a/gradle/dependency-versions.properties b/gradle/dependency-versions.properties index 8a533f6..f0738d7 100644 --- a/gradle/dependency-versions.properties +++ b/gradle/dependency-versions.properties @@ -45,8 +45,8 @@ hamcrest-all.version = 1.3 hbase.version = 0.94.27 hibernate.version = 3.5.5-Final hibernate-commons-annotations.version = 3.2.0.Final -httpclient.version = 4.3.3 -httpcore.version = 4.3.3 +httpclient.version = 4.5.1 +httpcore.version = 4.4.4 httpunit.version = 1.7.2 hsqldb.version = 2.0.0 jackson.version = 2.2.0