GEODE-1760 Sending partial results to user's ResultCollector. For non-HA functions on the client, results will now be passed to the user's result collector, even if some nodes fail due to a bucket movement or cache close.
Signed-off-by: Dan Smith <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/dbf41d1f Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/dbf41d1f Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/dbf41d1f Branch: refs/heads/feature/GEODE-420 Commit: dbf41d1facddb2e0f69657b3e0e1e82161f7fa33 Parents: 6a91b9d Author: Suranjan Kumar <[email protected]> Authored: Fri May 29 15:29:28 2015 +0530 Committer: Dan Smith <[email protected]> Committed: Wed Aug 17 14:23:04 2016 -0700 ---------------------------------------------------------------------- .../internal/ExecuteRegionFunctionOp.java | 104 ++++++++++++++++- .../ExecuteRegionFunctionSingleHopOp.java | 19 +++- .../internal/SingleHopClientExecutor.java | 44 +++++--- .../internal/cache/PartitionedRegion.java | 13 ++- .../cache/execute/AbstractExecution.java | 10 ++ .../FunctionStreamingResultCollector.java | 18 ++- .../cache/execute/LocalResultCollectorImpl.java | 8 +- .../PartitionedRegionFunctionResultSender.java | 112 ++++++++++++++++--- .../ServerToClientFunctionResultSender.java | 21 +++- .../ServerToClientFunctionResultSender65.java | 3 + .../PRFunctionStreamingResultCollector.java | 19 +++- .../tier/sockets/command/ExecuteFunction70.java | 3 + .../internal/cache/functions/TestFunction.java | 7 +- 13 files changed, 329 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dbf41d1f/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionOp.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionOp.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionOp.java index d154020..9d8dd7d 
100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionOp.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionOp.java @@ -24,16 +24,19 @@ import java.util.Set; import org.apache.logging.log4j.Logger; import com.gemstone.gemfire.InternalGemFireError; +import com.gemstone.gemfire.cache.CacheClosedException; import com.gemstone.gemfire.cache.client.ServerConnectivityException; import com.gemstone.gemfire.cache.client.ServerOperationException; import com.gemstone.gemfire.cache.client.internal.ExecuteRegionFunctionSingleHopOp.ExecuteRegionFunctionSingleHopOpImpl; import com.gemstone.gemfire.cache.execute.Function; import com.gemstone.gemfire.cache.execute.FunctionException; +import com.gemstone.gemfire.cache.execute.FunctionInvocationTargetException; import com.gemstone.gemfire.cache.execute.ResultCollector; import com.gemstone.gemfire.distributed.DistributedMember; import com.gemstone.gemfire.internal.Version; import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; import com.gemstone.gemfire.internal.cache.execute.AbstractExecution; +import com.gemstone.gemfire.internal.cache.execute.BucketMovedException; import com.gemstone.gemfire.internal.cache.execute.FunctionStats; import com.gemstone.gemfire.internal.cache.execute.InternalFunctionException; import com.gemstone.gemfire.internal.cache.execute.MemberMappedArgument; @@ -84,6 +87,7 @@ public class ExecuteRegionFunctionOp { final boolean isDebugEnabled =logger.isDebugEnabled(); do { + try { if (reexecuteForServ) { reexecOp = new ExecuteRegionFunctionOpImpl( @@ -319,8 +323,12 @@ public class ExecuteRegionFunctionOp { private Set<String> failedNodes = new HashSet<String>(); private final String functionId; + private final boolean executeOnBucketSet; + + private final boolean isHA; + private FunctionException functionException; public ExecuteRegionFunctionOpImpl(String region, Function function, 
ServerRegionFunctionExecutor serverRegionExecutor, ResultCollector rc, @@ -364,6 +372,7 @@ public class ExecuteRegionFunctionOp { this.executor = serverRegionExecutor; this.hasResult = functionState; this.failedNodes = removedNodes; + this.isHA = function.isHA(); } public ExecuteRegionFunctionOpImpl(String region, String function, @@ -408,6 +417,7 @@ public class ExecuteRegionFunctionOp { this.executor = serverRegionExecutor; this.hasResult = functionState; this.failedNodes = removedNodes; + this.isHA = isHA; } public ExecuteRegionFunctionOpImpl( @@ -431,6 +441,7 @@ public class ExecuteRegionFunctionOp { this.hasResult = op.hasResult; this.failedNodes = op.failedNodes; this.executeOnBucketSet = op.executeOnBucketSet; + this.isHA = op.isHA; if (isReExecute == 1) { this.resultCollector.endResults(); this.resultCollector.clearResults(); @@ -501,6 +512,9 @@ public class ExecuteRegionFunctionOp { else { result = resultResponse; } + + // if the function is HA throw exceptions + // if nonHA collect these exceptions and wait till you get last chunk if (result instanceof FunctionException) { FunctionException ex = ((FunctionException)result); if (ex instanceof InternalFunctionException) { @@ -518,14 +532,90 @@ public class ExecuteRegionFunctionOp { InternalFunctionInvocationTargetException ifite = (InternalFunctionInvocationTargetException)ex .getCause(); this.failedNodes.addAll(ifite.getFailedNodeSet()); + if (this.functionException == null) { + this.functionException = (FunctionException)result; + } + this.functionException.addException((FunctionException)result); + } + else if(((FunctionException) result).getCause() instanceof FunctionInvocationTargetException){ + if (this.functionException == null) { + this.functionException = ex; + } + this.functionException.addException(ex.getCause()); + } + else if(result instanceof FunctionInvocationTargetException){ + if (this.functionException == null) { + this.functionException = new 
FunctionException((FunctionInvocationTargetException)result); + } + this.functionException.addException(ex); + } + else if(result instanceof InternalFunctionInvocationTargetException){ + if (this.functionException == null) { + this.functionException = new FunctionException((InternalFunctionInvocationTargetException)result); + } + this.functionException.addException(ex); + } + else{ + if (this.functionException == null) { + this.functionException = ex; + } + this.functionException.addException(ex); + } + if (isHA) { + executeFunctionResponseMsg.clear(); + throw ex; } - executeFunctionResponseMsg.clear(); - throw ex; } else if (result instanceof Throwable) { - String s = "While performing a remote " + getOpName(); - executeFunctionResponseMsg.clear(); - throw new ServerOperationException(s, (Throwable)result); + Throwable t = (Throwable)result; + boolean throwServerOp = false; + if (this.functionException == null) { + if(result instanceof BucketMovedException){ + FunctionInvocationTargetException fite; + if(isHA){ + fite = new InternalFunctionInvocationTargetException( + ((BucketMovedException)result).getMessage()); + }else { + fite = new FunctionInvocationTargetException( + ((BucketMovedException)result).getMessage()); + } + this.functionException = new FunctionException(fite); + this.functionException.addException(fite); + } + else if (result instanceof CacheClosedException) { + FunctionInvocationTargetException fite; + if(isHA) { + fite = new InternalFunctionInvocationTargetException(((CacheClosedException)result).getMessage()); + } + else{ + fite = new FunctionInvocationTargetException(((CacheClosedException)result).getMessage()); + } + if (resultResponse instanceof ArrayList) { + DistributedMember memberID = (DistributedMember) ((ArrayList) resultResponse) + .get(1); + this.failedNodes.add(memberID.getId()); + } + this.functionException = new FunctionException(fite); + this.functionException.addException(fite); + } + else { + throwServerOp = true; + 
this.functionException = new FunctionException(t); + this.functionException.addException(t); + } + } else { + this.functionException.addException(t); + } + + if (isHA) { + if (throwServerOp) { + String s = "While performing a remote " + getOpName(); + executeFunctionResponseMsg.clear(); + throw new ServerOperationException(s, this.functionException); + } else { + throw this.functionException; + } + } } else { DistributedMember memberID = (DistributedMember)((ArrayList)resultResponse) @@ -538,6 +628,10 @@ public class ExecuteRegionFunctionOp { if (isDebugEnabled) { logger.debug("ExecuteRegionFunctionOpImpl#processResponse: received all the results from server successfully."); } + // add all the exceptions here. + if (this.functionException != null) { + throw this.functionException; + } this.resultCollector.endResults(); return null; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dbf41d1f/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java index ce7c3f8..768de5f 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java @@ -25,10 +25,12 @@ import java.util.Set; import org.apache.logging.log4j.Logger; import com.gemstone.gemfire.InternalGemFireError; +import com.gemstone.gemfire.cache.CacheClosedException; import com.gemstone.gemfire.cache.Region; import com.gemstone.gemfire.cache.client.ServerOperationException; import com.gemstone.gemfire.cache.execute.Function; import com.gemstone.gemfire.cache.execute.FunctionException; +import 
com.gemstone.gemfire.cache.execute.FunctionInvocationTargetException; import com.gemstone.gemfire.cache.execute.ResultCollector; import com.gemstone.gemfire.distributed.DistributedMember; import com.gemstone.gemfire.distributed.internal.ServerLocation; @@ -36,6 +38,7 @@ import com.gemstone.gemfire.internal.Version; import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; import com.gemstone.gemfire.internal.cache.LocalRegion; import com.gemstone.gemfire.internal.cache.execute.AbstractExecution; +import com.gemstone.gemfire.internal.cache.execute.BucketMovedException; import com.gemstone.gemfire.internal.cache.execute.FunctionStats; import com.gemstone.gemfire.internal.cache.execute.InternalFunctionException; import com.gemstone.gemfire.internal.cache.execute.InternalFunctionInvocationTargetException; @@ -346,7 +349,6 @@ public class ExecuteRegionFunctionSingleHopOp { else { result = resultResponse; } - if (result instanceof FunctionException) { FunctionException ex = ((FunctionException)result); if (isDebugEnabled) { @@ -373,6 +375,21 @@ public class ExecuteRegionFunctionSingleHopOp { return null; } + else if(result instanceof BucketMovedException) { + FunctionInvocationTargetException fite = new InternalFunctionInvocationTargetException( + ((BucketMovedException)result).getMessage()); + throw new FunctionException(fite); + } + else if(result instanceof CacheClosedException) { + FunctionInvocationTargetException fite = new InternalFunctionInvocationTargetException( + ((CacheClosedException)result).getMessage()); + if (resultResponse instanceof ArrayList) { + DistributedMember memberID = (DistributedMember) ((ArrayList) resultResponse) + .get(1); + this.failedNodes.add(memberID.getId()); + } + throw new FunctionException(fite); + } else if (result instanceof Throwable) { String s = "While performing a remote " + getOpName(); throw new ServerOperationException(s, (Throwable)result); 
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dbf41d1f/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/SingleHopClientExecutor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/SingleHopClientExecutor.java b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/SingleHopClientExecutor.java index 30cac5b..c13cfea 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/SingleHopClientExecutor.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/SingleHopClientExecutor.java @@ -29,9 +29,9 @@ import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; - import org.apache.logging.log4j.Logger; +import com.gemstone.gemfire.GemFireException; import com.gemstone.gemfire.InternalGemFireException; import com.gemstone.gemfire.cache.CacheClosedException; import com.gemstone.gemfire.cache.client.ServerConnectivityException; @@ -128,6 +128,7 @@ public class SingleHopClientExecutor { throw new InternalGemFireException(e.getMessage()); } if (futures != null) { + GemFireException functionExecutionException = null; Iterator futureItr = futures.iterator(); Iterator taskItr = callableTasks.iterator(); final boolean isDebugEnabled = logger.isDebugEnabled(); @@ -161,31 +162,42 @@ public class SingleHopClientExecutor { reexecute = true; failedNodes.addAll(((InternalFunctionInvocationTargetException)ee .getCause()).getFailedNodeSet()); - rc.clearResults(); - if (!isHA) { + // Clear the results only if isHA so that partial results can be returned. 
+ if (isHA) { + rc.clearResults(); + } else { if (ee.getCause().getCause() != null) { - throw new FunctionInvocationTargetException(ee.getCause() - .getCause()); + functionExecutionException = new FunctionInvocationTargetException( + ee.getCause().getCause()); } else { - throw new FunctionInvocationTargetException( + functionExecutionException = new FunctionInvocationTargetException( new BucketMovedException( LocalizedStrings.FunctionService_BUCKET_MIGRATED_TO_ANOTHER_NODE .toLocalizedString())); } } - } else if (ee.getCause() instanceof FunctionException) { if (isDebugEnabled) { logger.debug("ExecuteRegionFunctionSingleHopOp#ExecutionException.FunctionException : Caused by :{}", ee.getCause()); } - throw (FunctionException)ee.getCause(); + FunctionException fe = (FunctionException)ee.getCause(); + if (isHA) { + throw fe; + } else { + functionExecutionException = fe; + } } else if (ee.getCause() instanceof ServerOperationException) { if (isDebugEnabled) { logger.debug("ExecuteRegionFunctionSingleHopOp#ExecutionException.ServerOperationException : Caused by :{}", ee.getCause()); } - throw (ServerOperationException)ee.getCause(); + ServerOperationException soe = (ServerOperationException)ee.getCause(); + if (isHA) { + throw soe; + } else { + functionExecutionException = soe; + } } else if (ee.getCause() instanceof ServerConnectivityException) { if (isDebugEnabled) { @@ -199,11 +211,12 @@ public class SingleHopClientExecutor { } cms.removeBucketServerLocation(server); cms.scheduleGetPRMetaData(region, false); - reexecute = true; - rc.clearResults(); - if (!isHA) { - reexecute = false; - throw (ServerConnectivityException) ee.getCause(); + // Clear the results only if isHA so that partial results can be returned. 
+ if (isHA) { + reexecute = true; + rc.clearResults(); + } else { + functionExecutionException = (ServerConnectivityException) ee.getCause(); } } else { @@ -211,6 +224,9 @@ public class SingleHopClientExecutor { } } } + if (functionExecutionException != null) { + throw functionExecutionException; + } } } return reexecute; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dbf41d1f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java index 9ac95a1..a87ed9c 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java @@ -3546,6 +3546,9 @@ public class PartitionedRegion extends LocalRegion implements localBucketSet, resultSender, execution.isReExecute()); + if (logger.isDebugEnabled()) { + logger.debug("FunctionService: Executing on local node with keys.{}"+ localKeys); + } execution.executeFunctionOnLocalPRNode(function, prContext, resultSender, dm, isTX()); } @@ -3563,7 +3566,9 @@ public class PartitionedRegion extends LocalRegion implements execution.isFnSerializationReqd()); recipMap.put(recip, context); } - + if (logger.isDebugEnabled()) { + logger.debug("FunctionService: Executing on remote nodes with member to keys map.{}" + memberToKeysMap); + } PartitionedRegionFunctionResultWaiter resultReciever = new PartitionedRegionFunctionResultWaiter( getSystem(), this.getPRId(), localResultCollector, function, resultSender); @@ -3674,9 +3679,9 @@ public class PartitionedRegion extends LocalRegion implements .singleton(targetNode); execution.validateExecution(function, singleMember); execution.setExecutionNodes(singleMember); - if 
(targetNode.equals(localVm)) { - final LocalResultCollector<?, ?> localRC = execution + LocalResultCollector<?, ?> localRC = execution .getLocalResultCollector(function, rc); + if (targetNode.equals(localVm)) { final DM dm = getDistributionManager(); PartitionedRegionFunctionResultSender resultSender = new PartitionedRegionFunctionResultSender( dm, PartitionedRegion.this, 0, localRC, execution.getServerResultSender(), true, false, execution.isForwardExceptions(), function, buckets); @@ -3690,7 +3695,7 @@ public class PartitionedRegion extends LocalRegion implements } else { return executeFunctionOnRemoteNode(targetNode, function, execution - .getArgumentsForMember(targetNode.getId()), routingKeys, rc, buckets, + .getArgumentsForMember(targetNode.getId()), routingKeys, function.isHA()? rc :localRC, buckets, execution.getServerResultSender(), execution); } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dbf41d1f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/AbstractExecution.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/AbstractExecution.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/AbstractExecution.java index 2e981a8..852301c 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/AbstractExecution.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/AbstractExecution.java @@ -37,6 +37,7 @@ import com.gemstone.gemfire.cache.execute.FunctionInvocationTargetException; import com.gemstone.gemfire.cache.execute.FunctionService; import com.gemstone.gemfire.cache.execute.ResultCollector; import com.gemstone.gemfire.cache.execute.ResultSender; +import com.gemstone.gemfire.cache.query.QueryInvalidException; import com.gemstone.gemfire.distributed.internal.DM; import com.gemstone.gemfire.distributed.internal.DistributionManager; import 
com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; @@ -420,6 +421,7 @@ public abstract class AbstractExecution implements InternalExecution { } public final void setWaitOnExceptionFlag(boolean waitOnException) { + this.setForwardExceptions(waitOnException); this.waitOnException = waitOnException; } @@ -604,6 +606,14 @@ public abstract class AbstractExecution implements InternalExecution { stats.endFunctionExecutionWithException(fn.hasResult()); if (fn.hasResult()) { if (waitOnException || forwardExceptions) { + if(functionException instanceof FunctionException + && functionException.getCause() instanceof QueryInvalidException) { + // Handle this exception differently since it can contain + // non-serializable objects. + // java.io.NotSerializableException: antlr.CommonToken + // create a new FunctionException on the original one's message (not cause). + functionException = new FunctionException(functionException.getLocalizedMessage()); + } sender.lastResult(functionException); } else { ((InternalResultSender)sender).setException(functionException); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dbf41d1f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/FunctionStreamingResultCollector.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/FunctionStreamingResultCollector.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/FunctionStreamingResultCollector.java index 7ed908e..359ae6b 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/FunctionStreamingResultCollector.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/FunctionStreamingResultCollector.java @@ -545,7 +545,8 @@ public class FunctionStreamingResultCollector extends ReplyProcessor21 implement } else { if (execution.forwardExceptions || (execution.waitOnException - 
&& !(m.getException().getCause() instanceof BucketMovedException))) { + /*&& !(m.getException().getCause() instanceof BucketMovedException)*/)) { + // send BucketMovedException forward which will be handled by LocalResultCollectorImpl synchronized (processSingleResult) { this.functionResultWaiter.processData(m.getException().getCause(), true, msg.getSender()); } @@ -607,16 +608,16 @@ public class FunctionStreamingResultCollector extends ReplyProcessor21 implement @Override protected synchronized void processException(ReplyException ex) { // we have already forwarded the exception, no need to keep it here - if (execution.isForwardExceptions()) { + if (execution.isForwardExceptions() || this.execution.waitOnException) { return; } // have to keep all the exception // rest exception will be added to localresultcollector and it will throw // them - if (ex.getCause() instanceof CacheClosedException + if ((ex.getCause() instanceof CacheClosedException || ex.getCause() instanceof ForceReattemptException - || ex.getCause() instanceof BucketMovedException) { + || ex.getCause() instanceof BucketMovedException)) { this.exception = ex; } else if (!execution.getWaitOnExceptionFlag()) { @@ -626,7 +627,14 @@ public class FunctionStreamingResultCollector extends ReplyProcessor21 implement @Override protected boolean stopBecauseOfExceptions() { - if (this.execution.isIgnoreDepartedMembers()) { + if (this.execution.isIgnoreDepartedMembers() ) { + return false; + } + // in case of waitOnException : keep processing + // the reply from other nodes + // this exception will be saved in this.exception + // which will be thrown at the end + if(this.execution.waitOnException) { return false; } return super.stopBecauseOfExceptions(); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dbf41d1f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/LocalResultCollectorImpl.java ---------------------------------------------------------------------- diff --git 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/LocalResultCollectorImpl.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/LocalResultCollectorImpl.java index 23eb70a..ce06b78 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/LocalResultCollectorImpl.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/LocalResultCollectorImpl.java @@ -22,9 +22,11 @@ import java.util.concurrent.TimeUnit; import com.gemstone.gemfire.cache.execute.Execution; import com.gemstone.gemfire.cache.execute.Function; import com.gemstone.gemfire.cache.execute.FunctionException; +import com.gemstone.gemfire.cache.execute.FunctionInvocationTargetException; import com.gemstone.gemfire.cache.execute.ResultCollector; import com.gemstone.gemfire.distributed.DistributedMember; import com.gemstone.gemfire.distributed.internal.ReplyProcessor21; +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; import com.gemstone.gemfire.internal.i18n.LocalizedStrings; public final class LocalResultCollectorImpl implements LocalResultCollector { @@ -45,7 +47,6 @@ public final class LocalResultCollectorImpl implements LocalResultCollector { private AbstractExecution execution = null; - public LocalResultCollectorImpl(Function function, ResultCollector rc, Execution execution) { this.function = function; @@ -69,7 +70,10 @@ public final class LocalResultCollectorImpl implements LocalResultCollector { } else { if (!(t instanceof InternalFunctionException)) { if (this.functionException == null) { - if (resultOfSingleExecution instanceof FunctionException) { + if(resultOfSingleExecution instanceof FunctionInvocationTargetException){ + this.functionException = new FunctionException(t); + } + else if (resultOfSingleExecution instanceof FunctionException) { this.functionException = (FunctionException)resultOfSingleExecution; if (t.getCause() != null) { t = t.getCause(); 
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dbf41d1f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/PartitionedRegionFunctionResultSender.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/PartitionedRegionFunctionResultSender.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/PartitionedRegionFunctionResultSender.java index d16e9b5..3a3e6e3 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/PartitionedRegionFunctionResultSender.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/PartitionedRegionFunctionResultSender.java @@ -75,6 +75,8 @@ public final class PartitionedRegionFunctionResultSender implements private Set<Integer> bucketSet; + private BucketMovedException bme; + /** * Have to combine next two constructor in one and make a new class which will * send Results back. 
@@ -120,26 +122,47 @@ public final class PartitionedRegionFunctionResultSender implements this.bucketSet = bucketSet; } + private void checkForBucketMovement(Object oneResult){ + if (!(forwardExceptions && oneResult instanceof Throwable) + && !pr.getDataStore().areAllBucketsHosted(bucketSet)) { + // making sure that we send all the local results first + // before sending this exception to client + bme = new BucketMovedException( + LocalizedStrings.FunctionService_BUCKET_MIGRATED_TO_ANOTHER_NODE + .toLocalizedString()); + if(function.isHA()){ + throw bme; + } + } + + + } + + // this must be getting called directly from function public void lastResult(Object oneResult) { if (!this.function.hasResult()) { throw new IllegalStateException( LocalizedStrings.ExecuteFunction_CANNOT_0_RESULTS_HASRESULT_FALSE .toLocalizedString("send")); } - if (!(forwardExceptions && oneResult instanceof Throwable) - && !pr.getDataStore().areAllBucketsHosted(bucketSet)) { - throw new BucketMovedException( - LocalizedStrings.FunctionService_BUCKET_MIGRATED_TO_ANOTHER_NODE - .toLocalizedString()); - } + // this could be done before doing end result + // so that client receives all the results before + if (this.serverSender != null) { // Client-Server if(this.localLastResultRecieved){ return; } if (onlyLocal) { - lastClientSend(dm.getDistributionManagerId(), oneResult); + checkForBucketMovement(oneResult); + if (bme != null) { + clientSend(oneResult, dm.getDistributionManagerId()); + lastClientSend(dm.getDistributionManagerId(), bme); + }else{ + lastClientSend(dm.getDistributionManagerId(), oneResult); + } this.rc.endResults(); this.localLastResultRecieved = true; + } else { //call a synchronized method as local node is also waiting to send lastResult @@ -149,8 +172,15 @@ public final class PartitionedRegionFunctionResultSender implements else { // P2P if (this.msg != null) { + checkForBucketMovement(oneResult); try { - this.msg.sendReplyForOneResult(dm, pr, time, oneResult, true, 
enableOrderedResultStreming); + if(this.bme !=null){ + this.msg.sendReplyForOneResult(dm, pr, time, oneResult, false, enableOrderedResultStreming); + throw bme; + }else{ + this.msg.sendReplyForOneResult(dm, pr, time, oneResult, true, enableOrderedResultStreming); + } + } catch (ForceReattemptException e) { throw new FunctionException(e); @@ -164,7 +194,14 @@ public final class PartitionedRegionFunctionResultSender implements return; } if (onlyLocal) { - this.rc.addResult(dm.getDistributionManagerId(), oneResult); + checkForBucketMovement(oneResult); + if (bme != null) { + this.rc.addResult(dm.getDistributionManagerId(), oneResult); + this.rc.addResult(dm.getDistributionManagerId(), bme); + }else{ + this.rc.addResult(dm.getDistributionManagerId(), oneResult); + } + // exception thrown will do end result this.rc.endResults(); this.localLastResultRecieved = true; } @@ -199,31 +236,74 @@ public final class PartitionedRegionFunctionResultSender implements if (this.serverSender != null) { // Client-Server if (this.completelyDoneFromRemote && this.localLastResultRecieved) { - lastClientSend(memberID, oneResult); + if(lastLocalResult){ + checkForBucketMovement(oneResult); + if (bme != null) { + clientSend(oneResult, dm.getDistributionManagerId()); + lastClientSend(dm.getDistributionManagerId(), bme); + }else{ + lastClientSend(memberID, oneResult); + } + }else{ + lastClientSend(memberID, oneResult); + } + collector.endResults(); } else { - clientSend(oneResult, memberID); + if(lastLocalResult){ + checkForBucketMovement(oneResult); + if (bme != null) { + clientSend(oneResult, memberID); + clientSend(bme,memberID); + }else{ + clientSend(oneResult, memberID); + } + }else{ + clientSend(oneResult, memberID); + } + } } else { // P2P if (this.completelyDoneFromRemote && this.localLastResultRecieved) { - collector.addResult(memberID, oneResult); + if(lastLocalResult){ + checkForBucketMovement(oneResult); + if(bme!=null){ + collector.addResult(memberID, oneResult); + 
collector.addResult(memberID, bme); + }else{ + collector.addResult(memberID, oneResult); + } + }else{ + collector.addResult(memberID, oneResult); + } collector.endResults(); } else { - collector.addResult(memberID, oneResult); + if(lastLocalResult){ + checkForBucketMovement(oneResult); + if(bme!=null){ + collector.addResult(memberID, oneResult); + collector.addResult(memberID, bme); + }else{ + collector.addResult(memberID, oneResult); + } + } + else{ + collector.addResult(memberID, oneResult); + } } } } - public void lastResult(Object oneResult, boolean completelyDone, + public synchronized void lastResult(Object oneResult, boolean completelyDoneFromRemote, ResultCollector reply, DistributedMember memberID) { logger.debug("PartitionedRegionFunctionResultSender Sending lastResult {}", oneResult); if (this.serverSender != null) { // Client-Server - if (completelyDone) { + if (completelyDoneFromRemote) { if (this.onlyRemote) { lastClientSend(memberID, oneResult); reply.endResults(); @@ -238,7 +318,7 @@ public final class PartitionedRegionFunctionResultSender implements } } else{ - if (completelyDone) { + if (completelyDoneFromRemote) { if (this.onlyRemote) { reply.addResult(memberID, oneResult); reply.endResults(); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dbf41d1f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/ServerToClientFunctionResultSender.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/ServerToClientFunctionResultSender.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/ServerToClientFunctionResultSender.java index 9a3b5d8..36c79c0 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/ServerToClientFunctionResultSender.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/ServerToClientFunctionResultSender.java @@ -80,9 +80,10 @@ public 
class ServerToClientFunctionResultSender implements ResultSender { this.fn = function; this.authContext = authzContext; this.isSelector = sc.getAcceptor().isSelector(); + if(this.isSelector){ this.commBuffer = msg.getCommBuffer(); - } + } } public synchronized void lastResult(Object oneResult) { @@ -96,6 +97,9 @@ public class ServerToClientFunctionResultSender implements ResultSender { if(this.lastResultReceived){ return; } + if (logger.isDebugEnabled()) { + logger.debug("ServerToClientFunctionResultSender sending last result1 {} " + oneResult); + } try { authorizeResult(oneResult); if (!this.fn.hasResult()) { @@ -137,6 +141,9 @@ public class ServerToClientFunctionResultSender implements ResultSender { } return; } + if (logger.isDebugEnabled()) { + logger.debug("ServerToClientFunctionResultSender sending last result2 {} " + oneResult); + } try { authorizeResult(oneResult); if (!this.fn.hasResult()) { @@ -175,6 +182,9 @@ public class ServerToClientFunctionResultSender implements ResultSender { } return; } + if (logger.isDebugEnabled()) { + logger.debug("ServerToClientFunctionResultSender sending result1 {} " + oneResult); + } try { authorizeResult(oneResult); if (!this.fn.hasResult()) { @@ -211,6 +221,9 @@ public class ServerToClientFunctionResultSender implements ResultSender { } return; } + if (logger.isDebugEnabled()) { + logger.debug("ServerToClientFunctionResultSender sending result2 {} " + oneResult); + } try { authorizeResult(oneResult); if (!this.fn.hasResult()) { @@ -263,6 +276,9 @@ public class ServerToClientFunctionResultSender implements ResultSender { } protected void sendHeader() throws IOException { + if (logger.isDebugEnabled()) { + logger.debug("ServerToClientFunctionResultSender sending header"); + } this.setBuffer(); this.msg.setMessageType(messageType); this.msg.setLastChunk(false); @@ -280,6 +296,9 @@ public class ServerToClientFunctionResultSender implements ResultSender { public synchronized void setException(Throwable exception) { 
this.lastResultReceived = true; + if (logger.isDebugEnabled()) { + logger.debug("ServerToClientFunctionResultSender setting exception {} ", exception); + } synchronized (this.msg) { if (!this.sc.getTransientFlag(Command.RESPONDED)) { alreadySendException.set(true); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dbf41d1f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/ServerToClientFunctionResultSender65.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/ServerToClientFunctionResultSender65.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/ServerToClientFunctionResultSender65.java index 4cc80a3..5ac75c6 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/ServerToClientFunctionResultSender65.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/ServerToClientFunctionResultSender65.java @@ -120,6 +120,9 @@ public class ServerToClientFunctionResultSender65 extends return; } try { + if (logger.isDebugEnabled()) { + logger.debug("ServerToClientFunctionResultSender sending last result2 {} " + oneResult); + } authorizeResult(oneResult); if (!this.fn.hasResult()) { throw new IllegalStateException( http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dbf41d1f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PRFunctionStreamingResultCollector.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PRFunctionStreamingResultCollector.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PRFunctionStreamingResultCollector.java index 5fe869a..7bbf8e7 100755 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PRFunctionStreamingResultCollector.java +++ 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PRFunctionStreamingResultCollector.java @@ -93,7 +93,6 @@ public class PRFunctionStreamingResultCollector extends FunctionStreamingResult if (this.hasResult) { try { this.waitForCacheOrFunctionException(0); - if (!this.execution.getFailedNodes().isEmpty() && !this.execution.isClientServerMode()) { // end the rc and clear it @@ -116,6 +115,7 @@ public class PRFunctionStreamingResultCollector extends FunctionStreamingResult catch (FunctionInvocationTargetException fite) { // this is case of WrapperException which enforce the re execution of // the function. + if(!execution.getWaitOnExceptionFlag()) { if (!this.fn.isHA()) { throw new FunctionException(fite); } @@ -137,8 +137,10 @@ public class PRFunctionStreamingResultCollector extends FunctionStreamingResult } return newRc.getResult(); } + } } catch (BucketMovedException e) { + if(!execution.getWaitOnExceptionFlag()){ if (!this.fn.isHA()) { //endResults(); FunctionInvocationTargetException fite = new FunctionInvocationTargetException( @@ -165,8 +167,10 @@ public class PRFunctionStreamingResultCollector extends FunctionStreamingResult } return newRc.getResult(); } + } } catch (CacheClosedException e) { + if(!execution.getWaitOnExceptionFlag()) { if (!this.fn.isHA()) { //endResults(); FunctionInvocationTargetException fite = new FunctionInvocationTargetException(e.getMessage()); @@ -192,6 +196,7 @@ public class PRFunctionStreamingResultCollector extends FunctionStreamingResult } return newRc.getResult(); } + } } catch (CacheException e) { //endResults(); @@ -417,11 +422,19 @@ public class PRFunctionStreamingResultCollector extends FunctionStreamingResult logger.debug("StreamingPartitionResponseWithResultCollector received exception {} from member {}", ex.getCause(), msg.getSender()); // we have already forwarded the exception, no need to keep it here - if (execution.isForwardExceptions()) { + if (execution.isForwardExceptions() || 
execution.getWaitOnExceptionFlag()) { return; } - if (ex.getCause() instanceof CacheClosedException) { + /** + * The two cases below should also be handled + * and not thrown as exceptions + * Saving the exception + * ForceReattempt can also be added here? + * Also, if multiple nodes throw exceptions, one may override another + * TODO: Wrap the exceptions in one another or create a list of exceptions like this.fite. + */ + if ( ex.getCause() instanceof CacheClosedException) { ((PartitionedRegionFunctionExecutor)this.execution).addFailedNode(msg .getSender().getId()); this.exception = ex; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dbf41d1f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteFunction70.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteFunction70.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteFunction70.java index 528e462..c7defdc 100644 --- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteFunction70.java +++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteFunction70.java @@ -134,6 +134,9 @@ public class ExecuteFunction70 extends ExecuteFunction66 { } ((AbstractExecution)execution).setIgnoreDepartedMembers(true); } + if(!functionObject.isHA()) { + ((AbstractExecution)execution).setWaitOnExceptionFlag(true); + } if (function instanceof String) { execution.execute(functionObject.getId()).getResult(); } else { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dbf41d1f/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/functions/TestFunction.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/functions/TestFunction.java 
b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/functions/TestFunction.java index d1b1aa5..5fb6427 100755 --- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/functions/TestFunction.java +++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/functions/TestFunction.java @@ -71,6 +71,7 @@ public class TestFunction extends FunctionAdapter implements Declarable2 { public static final String TEST_FUNCTION_SOCKET_TIMEOUT = "SocketTimeOutFunction"; public static final String TEST_FUNCTION_TIMEOUT = "executeTimeOut"; public static final String TEST_FUNCTION_HA = "executeFunctionHA"; + public static final String TEST_FUNCTION_NONHA = "executeFunctionNonHA"; public static final String TEST_FUNCTION_HA_SERVER = "executeFunctionHAOnServer"; public static final String TEST_FUNCTION_NONHA_SERVER = "executeFunctionNonHAOnServer"; public static final String TEST_FUNCTION_NONHA_REGION = "executeFunctionNonHAOnRegion"; @@ -165,6 +166,9 @@ public class TestFunction extends FunctionAdapter implements Declarable2 { else if(id.equals(TEST_FUNCTION_HA)){ executeHA(context); } + else if(id.equals(TEST_FUNCTION_NONHA)){ + executeHA(context); + } else if(id.equals(TEST_FUNCTION_HA_SERVER) || id.equals(TEST_FUNCTION_NONHA_SERVER)){ executeHAAndNonHAOnServer(context); } @@ -1136,10 +1140,11 @@ public class TestFunction extends FunctionAdapter implements Declarable2 { @Override public boolean isHA() { + if (getId().equals(TEST_FUNCTION10)) { return true; } - if (getId().equals(TEST_FUNCTION_NONHA_SERVER) || getId().equals(TEST_FUNCTION_NONHA_REGION) || getId().equals(TEST_FUNCTION_NONHA_NOP)) { + if (getId().equals(TEST_FUNCTION_NONHA_SERVER) || getId().equals(TEST_FUNCTION_NONHA_REGION) || getId().equals(TEST_FUNCTION_NONHA_NOP) || getId().equals(TEST_FUNCTION_NONHA)) { return false; } return Boolean.valueOf(this.props.getProperty(HAVE_RESULTS)).booleanValue();
