GEODE-1760 Sending partial results to users ResultCollector

For non-HA functions on the client, results will now be passed to the
users result collector, even if some nodes fail due to a bucket movement
or cache close.

Signed-off-by: Dan Smith <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/dbf41d1f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/dbf41d1f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/dbf41d1f

Branch: refs/heads/feature/GEODE-420
Commit: dbf41d1facddb2e0f69657b3e0e1e82161f7fa33
Parents: 6a91b9d
Author: Suranjan Kumar <[email protected]>
Authored: Fri May 29 15:29:28 2015 +0530
Committer: Dan Smith <[email protected]>
Committed: Wed Aug 17 14:23:04 2016 -0700

----------------------------------------------------------------------
 .../internal/ExecuteRegionFunctionOp.java       | 104 ++++++++++++++++-
 .../ExecuteRegionFunctionSingleHopOp.java       |  19 +++-
 .../internal/SingleHopClientExecutor.java       |  44 +++++---
 .../internal/cache/PartitionedRegion.java       |  13 ++-
 .../cache/execute/AbstractExecution.java        |  10 ++
 .../FunctionStreamingResultCollector.java       |  18 ++-
 .../cache/execute/LocalResultCollectorImpl.java |   8 +-
 .../PartitionedRegionFunctionResultSender.java  | 112 ++++++++++++++++---
 .../ServerToClientFunctionResultSender.java     |  21 +++-
 .../ServerToClientFunctionResultSender65.java   |   3 +
 .../PRFunctionStreamingResultCollector.java     |  19 +++-
 .../tier/sockets/command/ExecuteFunction70.java |   3 +
 .../internal/cache/functions/TestFunction.java  |   7 +-
 13 files changed, 329 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dbf41d1f/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionOp.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionOp.java
 
b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionOp.java
index d154020..9d8dd7d 100644
--- 
a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionOp.java
+++ 
b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionOp.java
@@ -24,16 +24,19 @@ import java.util.Set;
 import org.apache.logging.log4j.Logger;
 
 import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.CacheClosedException;
 import com.gemstone.gemfire.cache.client.ServerConnectivityException;
 import com.gemstone.gemfire.cache.client.ServerOperationException;
 import 
com.gemstone.gemfire.cache.client.internal.ExecuteRegionFunctionSingleHopOp.ExecuteRegionFunctionSingleHopOpImpl;
 import com.gemstone.gemfire.cache.execute.Function;
 import com.gemstone.gemfire.cache.execute.FunctionException;
+import com.gemstone.gemfire.cache.execute.FunctionInvocationTargetException;
 import com.gemstone.gemfire.cache.execute.ResultCollector;
 import com.gemstone.gemfire.distributed.DistributedMember;
 import com.gemstone.gemfire.internal.Version;
 import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
 import com.gemstone.gemfire.internal.cache.execute.AbstractExecution;
+import com.gemstone.gemfire.internal.cache.execute.BucketMovedException;
 import com.gemstone.gemfire.internal.cache.execute.FunctionStats;
 import com.gemstone.gemfire.internal.cache.execute.InternalFunctionException;
 import com.gemstone.gemfire.internal.cache.execute.MemberMappedArgument;
@@ -84,6 +87,7 @@ public class ExecuteRegionFunctionOp {
     
     final boolean isDebugEnabled =logger.isDebugEnabled();
     do {
+      
     try {
         if (reexecuteForServ) {
           reexecOp = new ExecuteRegionFunctionOpImpl(
@@ -319,8 +323,12 @@ public class ExecuteRegionFunctionOp {
     private Set<String> failedNodes = new HashSet<String>();
 
     private final String functionId;
+
     private final boolean executeOnBucketSet;
+    
+    private final boolean isHA;
 
+    private FunctionException functionException;
 
     public ExecuteRegionFunctionOpImpl(String region, Function function,
         ServerRegionFunctionExecutor serverRegionExecutor, ResultCollector rc,
@@ -364,6 +372,7 @@ public class ExecuteRegionFunctionOp {
       this.executor = serverRegionExecutor;
       this.hasResult = functionState;
       this.failedNodes = removedNodes;
+      this.isHA = function.isHA();
     }
     
     public ExecuteRegionFunctionOpImpl(String region, String function,
@@ -408,6 +417,7 @@ public class ExecuteRegionFunctionOp {
       this.executor = serverRegionExecutor;
       this.hasResult = functionState;
       this.failedNodes = removedNodes;
+      this.isHA = isHA;
     }
 
     public ExecuteRegionFunctionOpImpl(
@@ -431,6 +441,7 @@ public class ExecuteRegionFunctionOp {
       this.hasResult = op.hasResult;
       this.failedNodes = op.failedNodes;
       this.executeOnBucketSet = op.executeOnBucketSet;
+      this.isHA = op.isHA;
       if (isReExecute == 1) {
         this.resultCollector.endResults();
         this.resultCollector.clearResults();
@@ -501,6 +512,9 @@ public class ExecuteRegionFunctionOp {
               else {
                 result = resultResponse;
               }
+              
+              // if the function is HA throw exceptions
+              // if nonHA collect these exceptions and wait till you get last 
chunk
               if (result instanceof FunctionException) {
                 FunctionException ex = ((FunctionException)result);
                 if (ex instanceof InternalFunctionException) {
@@ -518,14 +532,90 @@ public class ExecuteRegionFunctionOp {
                   InternalFunctionInvocationTargetException ifite = 
(InternalFunctionInvocationTargetException)ex
                       .getCause();
                   this.failedNodes.addAll(ifite.getFailedNodeSet());
+                  if (this.functionException == null) {
+                    this.functionException = (FunctionException)result;
+                  }
+                  
this.functionException.addException((FunctionException)result);
+                }
+                else if(((FunctionException) result).getCause() instanceof 
FunctionInvocationTargetException){
+                  if (this.functionException == null) {
+                    this.functionException = ex;
+                  }
+                  this.functionException.addException(ex.getCause());
+                }
+                else if(result instanceof FunctionInvocationTargetException){
+                  if (this.functionException == null) {
+                    this.functionException = new 
FunctionException((FunctionInvocationTargetException)result);
+                  }
+                  this.functionException.addException(ex);
+                }
+                else if(result instanceof 
InternalFunctionInvocationTargetException){
+                  if (this.functionException == null) {
+                    this.functionException = new 
FunctionException((InternalFunctionInvocationTargetException)result);
+                  }
+                  this.functionException.addException(ex);
+                }
+                else{
+                  if (this.functionException == null) {
+                    this.functionException = ex;
+                  }
+                  this.functionException.addException(ex);
+                }
+                if (isHA) {
+                  executeFunctionResponseMsg.clear();
+                  throw ex;
                 }
-                executeFunctionResponseMsg.clear();
-                throw ex;
               }
               else if (result instanceof Throwable) {
-                String s = "While performing a remote " + getOpName();
-                executeFunctionResponseMsg.clear();
-                throw new ServerOperationException(s, (Throwable)result);
+                Throwable t = (Throwable)result;
+                boolean throwServerOp = false;
+                  if (this.functionException == null) {
+                    if(result instanceof BucketMovedException){
+                      FunctionInvocationTargetException fite;
+                      if(isHA){
+                        fite = new InternalFunctionInvocationTargetException(
+                                              
((BucketMovedException)result).getMessage());
+                      }else {
+                        fite = new FunctionInvocationTargetException(
+                            ((BucketMovedException)result).getMessage());
+                      }
+                      this.functionException =  new FunctionException(fite);
+                      this.functionException.addException(fite);
+                    }
+                    else if (result instanceof CacheClosedException) {
+                      FunctionInvocationTargetException fite;
+                      if(isHA) {
+                        fite = new 
InternalFunctionInvocationTargetException(((CacheClosedException)result).getMessage());
+                      }
+                      else{
+                        fite = new 
FunctionInvocationTargetException(((CacheClosedException)result).getMessage());
+                      }
+                      if (resultResponse instanceof ArrayList) {
+                        DistributedMember memberID = (DistributedMember) 
((ArrayList) resultResponse)
+                            .get(1);
+                        this.failedNodes.add(memberID.getId());
+                      }                   
+                      this.functionException = new FunctionException(fite);
+                      this.functionException.addException(fite);
+                    }
+                    else {
+                      throwServerOp = true;
+                      this.functionException = new FunctionException(t);
+                      this.functionException.addException(t);
+                    }
+                  } else {
+                    this.functionException.addException(t);
+                  }
+                  
+                  if (isHA) {
+                    if (throwServerOp) {
+                      String s = "While performing a remote " + getOpName();
+                      executeFunctionResponseMsg.clear();
+                      throw new ServerOperationException(s, 
this.functionException);
+                    } else {
+                       throw this.functionException;
+                    }
+                  }
               }
               else {
                 DistributedMember memberID = 
(DistributedMember)((ArrayList)resultResponse)
@@ -538,6 +628,10 @@ public class ExecuteRegionFunctionOp {
             if (isDebugEnabled) {
               logger.debug("ExecuteRegionFunctionOpImpl#processResponse: 
received all the results from server successfully.");
             }
+            // add all the exceptions here.
+            if (this.functionException != null) {
+              throw this.functionException;
+            }
             this.resultCollector.endResults();
             return null;
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dbf41d1f/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java
 
b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java
index ce7c3f8..768de5f 100644
--- 
a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java
+++ 
b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java
@@ -25,10 +25,12 @@ import java.util.Set;
 import org.apache.logging.log4j.Logger;
 
 import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.CacheClosedException;
 import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.client.ServerOperationException;
 import com.gemstone.gemfire.cache.execute.Function;
 import com.gemstone.gemfire.cache.execute.FunctionException;
+import com.gemstone.gemfire.cache.execute.FunctionInvocationTargetException;
 import com.gemstone.gemfire.cache.execute.ResultCollector;
 import com.gemstone.gemfire.distributed.DistributedMember;
 import com.gemstone.gemfire.distributed.internal.ServerLocation;
@@ -36,6 +38,7 @@ import com.gemstone.gemfire.internal.Version;
 import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
 import com.gemstone.gemfire.internal.cache.LocalRegion;
 import com.gemstone.gemfire.internal.cache.execute.AbstractExecution;
+import com.gemstone.gemfire.internal.cache.execute.BucketMovedException;
 import com.gemstone.gemfire.internal.cache.execute.FunctionStats;
 import com.gemstone.gemfire.internal.cache.execute.InternalFunctionException;
 import 
com.gemstone.gemfire.internal.cache.execute.InternalFunctionInvocationTargetException;
@@ -346,7 +349,6 @@ public class ExecuteRegionFunctionSingleHopOp {
               else {
                 result = resultResponse;
               }
-
               if (result instanceof FunctionException) {
                 FunctionException ex = ((FunctionException)result);
                 if (isDebugEnabled) {
@@ -373,6 +375,21 @@ public class ExecuteRegionFunctionSingleHopOp {
 
                 return null;
               }
+              else if(result instanceof BucketMovedException) {
+                FunctionInvocationTargetException fite = new 
InternalFunctionInvocationTargetException(
+                    ((BucketMovedException)result).getMessage());
+                throw new FunctionException(fite);
+              }
+              else if(result instanceof CacheClosedException) {
+                FunctionInvocationTargetException fite = new 
InternalFunctionInvocationTargetException(
+                    ((CacheClosedException)result).getMessage());
+                if (resultResponse instanceof ArrayList) {
+                  DistributedMember memberID = (DistributedMember) 
((ArrayList) resultResponse)
+                      .get(1);
+                  this.failedNodes.add(memberID.getId());
+                }                
+                throw new FunctionException(fite);
+              }
               else if (result instanceof Throwable) {
                 String s = "While performing a remote " + getOpName();
                 throw new ServerOperationException(s, (Throwable)result);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dbf41d1f/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/SingleHopClientExecutor.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/SingleHopClientExecutor.java
 
b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/SingleHopClientExecutor.java
index 30cac5b..c13cfea 100644
--- 
a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/SingleHopClientExecutor.java
+++ 
b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/SingleHopClientExecutor.java
@@ -29,9 +29,9 @@ import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.logging.log4j.Logger;
 
+import com.gemstone.gemfire.GemFireException;
 import com.gemstone.gemfire.InternalGemFireException;
 import com.gemstone.gemfire.cache.CacheClosedException;
 import com.gemstone.gemfire.cache.client.ServerConnectivityException;
@@ -128,6 +128,7 @@ public class SingleHopClientExecutor {
         throw new InternalGemFireException(e.getMessage());
       }
       if (futures != null) {
+        GemFireException functionExecutionException = null;
         Iterator futureItr = futures.iterator();
         Iterator taskItr = callableTasks.iterator();
         final boolean isDebugEnabled = logger.isDebugEnabled();
@@ -161,31 +162,42 @@ public class SingleHopClientExecutor {
               reexecute = true;
               failedNodes.addAll(((InternalFunctionInvocationTargetException)ee
                   .getCause()).getFailedNodeSet());
-              rc.clearResults();
-              if (!isHA) {
+              // Clear the results only if isHA so that partial results can be 
returned.
+              if (isHA) {
+                rc.clearResults();
+              } else {
                 if (ee.getCause().getCause() != null) {
-                  throw new FunctionInvocationTargetException(ee.getCause()
-                      .getCause());
+                  functionExecutionException = new 
FunctionInvocationTargetException(
+                      ee.getCause().getCause());
                 } else {
-                  throw new FunctionInvocationTargetException(
+                  functionExecutionException = new 
FunctionInvocationTargetException(
                       new BucketMovedException(
                           
LocalizedStrings.FunctionService_BUCKET_MIGRATED_TO_ANOTHER_NODE
                               .toLocalizedString()));
                 }
               }
-                
             }
             else if (ee.getCause() instanceof FunctionException) {
               if (isDebugEnabled) {
                 
logger.debug("ExecuteRegionFunctionSingleHopOp#ExecutionException.FunctionException
 : Caused by :{}", ee.getCause());
               }
-              throw (FunctionException)ee.getCause();
+              FunctionException fe = (FunctionException)ee.getCause();
+              if (isHA) {
+                throw fe;
+              } else {
+                functionExecutionException = fe;
+              }
             }
             else if (ee.getCause() instanceof ServerOperationException) {
               if (isDebugEnabled) {
                 
logger.debug("ExecuteRegionFunctionSingleHopOp#ExecutionException.ServerOperationException
 : Caused by :{}", ee.getCause());
               }
-              throw (ServerOperationException)ee.getCause();
+              ServerOperationException soe = 
(ServerOperationException)ee.getCause();
+              if (isHA) {
+                throw soe;
+              } else {
+                functionExecutionException = soe;
+              }
             }
             else if (ee.getCause() instanceof ServerConnectivityException) {
               if (isDebugEnabled) {
@@ -199,11 +211,12 @@ public class SingleHopClientExecutor {
               }
               cms.removeBucketServerLocation(server);
               cms.scheduleGetPRMetaData(region, false);
-              reexecute = true;
-              rc.clearResults();
-              if (!isHA) {
-                reexecute = false;
-                throw (ServerConnectivityException) ee.getCause();
+              // Clear the results only if isHA so that partial results can be 
returned.
+              if (isHA) {
+                reexecute = true;
+                rc.clearResults();
+              } else {
+                functionExecutionException = (ServerConnectivityException) 
ee.getCause();
               }
             }
             else {
@@ -211,6 +224,9 @@ public class SingleHopClientExecutor {
             }
           }
         }
+        if (functionExecutionException != null) {
+          throw functionExecutionException;
+        }
       }
     }
     return reexecute;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dbf41d1f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
index 9ac95a1..a87ed9c 100755
--- 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
+++ 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
@@ -3546,6 +3546,9 @@ public class PartitionedRegion extends LocalRegion 
implements
                                           localBucketSet,
                                           resultSender,
                                           execution.isReExecute());
+      if (logger.isDebugEnabled()) {
+        logger.debug("FunctionService: Executing on local node with keys.{}"+  
localKeys);
+      }
       execution.executeFunctionOnLocalPRNode(function, prContext, 
resultSender, dm, isTX());
     }
 
@@ -3563,7 +3566,9 @@ public class PartitionedRegion extends LocalRegion 
implements
                 execution.isFnSerializationReqd());
         recipMap.put(recip, context);
       }
-
+      if (logger.isDebugEnabled()) {
+        logger.debug("FunctionService: Executing on remote nodes with member 
to keys map.{}" + memberToKeysMap);
+      }
       PartitionedRegionFunctionResultWaiter resultReciever = new 
PartitionedRegionFunctionResultWaiter(
           getSystem(), this.getPRId(), localResultCollector, function,
           resultSender);
@@ -3674,9 +3679,9 @@ public class PartitionedRegion extends LocalRegion 
implements
         .singleton(targetNode);
     execution.validateExecution(function, singleMember);
     execution.setExecutionNodes(singleMember);
-    if (targetNode.equals(localVm)) {
-      final LocalResultCollector<?, ?> localRC = execution
+    LocalResultCollector<?, ?> localRC = execution
           .getLocalResultCollector(function, rc);
+    if (targetNode.equals(localVm)) {
       final DM dm = getDistributionManager();
       PartitionedRegionFunctionResultSender resultSender = new 
PartitionedRegionFunctionResultSender(
           dm, PartitionedRegion.this, 0, localRC, 
execution.getServerResultSender(), true, false, 
execution.isForwardExceptions(), function, buckets);
@@ -3690,7 +3695,7 @@ public class PartitionedRegion extends LocalRegion 
implements
     }
     else {
       return executeFunctionOnRemoteNode(targetNode, function, execution
-          .getArgumentsForMember(targetNode.getId()), routingKeys, rc, buckets,
+          .getArgumentsForMember(targetNode.getId()), routingKeys, 
function.isHA()? rc :localRC, buckets,
           execution.getServerResultSender(), execution);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dbf41d1f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/AbstractExecution.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/AbstractExecution.java
 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/AbstractExecution.java
index 2e981a8..852301c 100755
--- 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/AbstractExecution.java
+++ 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/AbstractExecution.java
@@ -37,6 +37,7 @@ import 
com.gemstone.gemfire.cache.execute.FunctionInvocationTargetException;
 import com.gemstone.gemfire.cache.execute.FunctionService;
 import com.gemstone.gemfire.cache.execute.ResultCollector;
 import com.gemstone.gemfire.cache.execute.ResultSender;
+import com.gemstone.gemfire.cache.query.QueryInvalidException;
 import com.gemstone.gemfire.distributed.internal.DM;
 import com.gemstone.gemfire.distributed.internal.DistributionManager;
 import 
com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
@@ -420,6 +421,7 @@ public abstract class AbstractExecution implements 
InternalExecution {
   }
 
   public final void setWaitOnExceptionFlag(boolean waitOnException) {
+    this.setForwardExceptions(waitOnException);
     this.waitOnException = waitOnException;
   }
 
@@ -604,6 +606,14 @@ public abstract class AbstractExecution implements 
InternalExecution {
     stats.endFunctionExecutionWithException(fn.hasResult());
     if (fn.hasResult()) {
       if (waitOnException || forwardExceptions) {
+        if(functionException instanceof FunctionException
+            && functionException.getCause() instanceof QueryInvalidException) {
+          // Handle this exception differently since it can contain
+          // non-serializable objects.
+          // java.io.NotSerializableException: antlr.CommonToken
+          // create a new FunctionException on the original one's message (not 
cause).
+          functionException = new 
FunctionException(functionException.getLocalizedMessage());
+        }
         sender.lastResult(functionException);
       } else {
         ((InternalResultSender)sender).setException(functionException);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dbf41d1f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/FunctionStreamingResultCollector.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/FunctionStreamingResultCollector.java
 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/FunctionStreamingResultCollector.java
index 7ed908e..359ae6b 100755
--- 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/FunctionStreamingResultCollector.java
+++ 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/FunctionStreamingResultCollector.java
@@ -545,7 +545,8 @@ public class FunctionStreamingResultCollector extends 
ReplyProcessor21 implement
       }
       else {
         if (execution.forwardExceptions || (execution.waitOnException 
-            && !(m.getException().getCause() instanceof 
BucketMovedException))) {
+            /*&& !(m.getException().getCause() instanceof 
BucketMovedException)*/)) {
+          // send BucketMovedException forward which will be handled by 
LocalResultCollectorImpl
           synchronized (processSingleResult) {
             this.functionResultWaiter.processData(m.getException().getCause(), 
true, msg.getSender());
           }
@@ -607,16 +608,16 @@ public class FunctionStreamingResultCollector extends 
ReplyProcessor21 implement
   @Override
   protected synchronized void processException(ReplyException ex) {
     // we have already forwarded the exception, no need to keep it here
-    if (execution.isForwardExceptions()) {
+    if (execution.isForwardExceptions() || this.execution.waitOnException) {
       return;
     }
     
     // have to keep all the exception
     // rest exception will be added to localresultcollector and it will throw
     // them
-    if (ex.getCause() instanceof CacheClosedException
+    if ((ex.getCause() instanceof CacheClosedException
         || ex.getCause() instanceof ForceReattemptException
-        || ex.getCause() instanceof BucketMovedException) {
+        || ex.getCause() instanceof BucketMovedException)) {
       this.exception = ex;
     }
     else if (!execution.getWaitOnExceptionFlag()) {
@@ -626,7 +627,14 @@ public class FunctionStreamingResultCollector extends 
ReplyProcessor21 implement
 
   @Override
   protected boolean stopBecauseOfExceptions() {
-    if (this.execution.isIgnoreDepartedMembers()) {
+    if (this.execution.isIgnoreDepartedMembers() ) {
+      return false;
+    }
+    // in case of waitOnException : keep processing
+    // the reply from other nodes
+    // this exception will be saved in this.exception 
+    // which will be thrown at the end
+    if(this.execution.waitOnException) {
       return false;
     }
     return super.stopBecauseOfExceptions();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dbf41d1f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/LocalResultCollectorImpl.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/LocalResultCollectorImpl.java
 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/LocalResultCollectorImpl.java
index 23eb70a..ce06b78 100644
--- 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/LocalResultCollectorImpl.java
+++ 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/LocalResultCollectorImpl.java
@@ -22,9 +22,11 @@ import java.util.concurrent.TimeUnit;
 import com.gemstone.gemfire.cache.execute.Execution;
 import com.gemstone.gemfire.cache.execute.Function;
 import com.gemstone.gemfire.cache.execute.FunctionException;
+import com.gemstone.gemfire.cache.execute.FunctionInvocationTargetException;
 import com.gemstone.gemfire.cache.execute.ResultCollector;
 import com.gemstone.gemfire.distributed.DistributedMember;
 import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
 import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 
 public final class LocalResultCollectorImpl implements LocalResultCollector {
@@ -45,7 +47,6 @@ public final class LocalResultCollectorImpl implements 
LocalResultCollector {
   
   private AbstractExecution execution = null;
 
-
   public LocalResultCollectorImpl(Function function, ResultCollector rc,
       Execution execution) {
     this.function = function;
@@ -69,7 +70,10 @@ public final class LocalResultCollectorImpl implements 
LocalResultCollector {
         } else {
           if (!(t instanceof InternalFunctionException)) {
             if (this.functionException == null) {
-              if (resultOfSingleExecution instanceof FunctionException) {
+              if(resultOfSingleExecution instanceof 
FunctionInvocationTargetException){
+                this.functionException = new FunctionException(t);
+              }
+              else if (resultOfSingleExecution instanceof FunctionException) {
                 this.functionException = 
(FunctionException)resultOfSingleExecution;
                 if (t.getCause() != null) {
                   t = t.getCause();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dbf41d1f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/PartitionedRegionFunctionResultSender.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/PartitionedRegionFunctionResultSender.java
 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/PartitionedRegionFunctionResultSender.java
index d16e9b5..3a3e6e3 100644
--- 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/PartitionedRegionFunctionResultSender.java
+++ 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/PartitionedRegionFunctionResultSender.java
@@ -75,6 +75,8 @@ public final class PartitionedRegionFunctionResultSender 
implements
 
   private Set<Integer> bucketSet;
   
+  private BucketMovedException bme;
+  
   /**
    * Have to combine next two constructor in one and make a new class which 
will 
    * send Results back.
@@ -120,26 +122,47 @@ public final class PartitionedRegionFunctionResultSender 
implements
     this.bucketSet = bucketSet;
   }
 
+  private void checkForBucketMovement(Object oneResult){
+    if (!(forwardExceptions && oneResult instanceof Throwable) 
+        && !pr.getDataStore().areAllBucketsHosted(bucketSet)) {
+      // making sure that we send all the local results first
+      // before sending this exception to client
+      bme =  new BucketMovedException(
+          LocalizedStrings.FunctionService_BUCKET_MIGRATED_TO_ANOTHER_NODE
+              .toLocalizedString());
+      if(function.isHA()){
+        throw bme;
+      }
+    }
+    
+    
+  }
+  
+  // this must be getting called directly from function
   public void lastResult(Object oneResult) {
     if (!this.function.hasResult()) {
       throw new IllegalStateException(
           LocalizedStrings.ExecuteFunction_CANNOT_0_RESULTS_HASRESULT_FALSE
               .toLocalizedString("send"));
     }
-    if (!(forwardExceptions && oneResult instanceof Throwable) 
-        && !pr.getDataStore().areAllBucketsHosted(bucketSet)) {
-      throw new BucketMovedException(
-          LocalizedStrings.FunctionService_BUCKET_MIGRATED_TO_ANOTHER_NODE
-              .toLocalizedString());
-    }
+    // this could be done before doing end result
+    // so that client receives all the results before
+
     if (this.serverSender != null) { // Client-Server
       if(this.localLastResultRecieved){
         return;
       }
       if (onlyLocal) {
-        lastClientSend(dm.getDistributionManagerId(), oneResult);
+        checkForBucketMovement(oneResult);
+        if (bme != null) {
+          clientSend(oneResult, dm.getDistributionManagerId());
+          lastClientSend(dm.getDistributionManagerId(), bme);
+        }else{
+          lastClientSend(dm.getDistributionManagerId(), oneResult);
+        }
         this.rc.endResults();
         this.localLastResultRecieved = true;
+        
       }
       else {
       //call a synchronized method as local node is also waiting to send 
lastResult 
@@ -149,8 +172,15 @@ public final class PartitionedRegionFunctionResultSender 
implements
     else { // P2P
 
       if (this.msg != null) {
+        checkForBucketMovement(oneResult);
         try {          
-          this.msg.sendReplyForOneResult(dm, pr, time, oneResult, true, 
enableOrderedResultStreming);
+          if(this.bme !=null){
+            this.msg.sendReplyForOneResult(dm, pr, time, oneResult, false, 
enableOrderedResultStreming);
+            throw bme;
+          }else{
+            this.msg.sendReplyForOneResult(dm, pr, time, oneResult, true, 
enableOrderedResultStreming);  
+          }
+          
         }
         catch (ForceReattemptException e) {
           throw new FunctionException(e);
@@ -164,7 +194,14 @@ public final class PartitionedRegionFunctionResultSender 
implements
           return;
         }
         if (onlyLocal) {
-          this.rc.addResult(dm.getDistributionManagerId(), oneResult);
+          checkForBucketMovement(oneResult);
+          if (bme != null) {
+            this.rc.addResult(dm.getDistributionManagerId(), oneResult);
+            this.rc.addResult(dm.getDistributionManagerId(), bme);
+          }else{
+            this.rc.addResult(dm.getDistributionManagerId(), oneResult);
+          }
+          // exception thrown will do end result
           this.rc.endResults();
           this.localLastResultRecieved = true;
         }
@@ -199,31 +236,74 @@ public final class PartitionedRegionFunctionResultSender 
implements
     
     if (this.serverSender != null) { // Client-Server
       if (this.completelyDoneFromRemote && this.localLastResultRecieved) {
-        lastClientSend(memberID, oneResult);
+        if(lastLocalResult){
+          checkForBucketMovement(oneResult);
+          if (bme != null) {
+            clientSend(oneResult, dm.getDistributionManagerId());
+            lastClientSend(dm.getDistributionManagerId(), bme);
+          }else{
+            lastClientSend(memberID, oneResult);
+          }
+        }else{
+          lastClientSend(memberID, oneResult);
+        }
+            
         collector.endResults();
       }
       else {
-        clientSend(oneResult, memberID);
+        if(lastLocalResult){
+          checkForBucketMovement(oneResult);
+          if (bme != null) {
+            clientSend(oneResult, memberID);
+            clientSend(bme,memberID);
+          }else{
+            clientSend(oneResult, memberID);
+          }
+        }else{
+          clientSend(oneResult, memberID);  
+        }
+        
       }
     }
     else { // P2P
       if (this.completelyDoneFromRemote && this.localLastResultRecieved) {
-        collector.addResult(memberID, oneResult);
+        if(lastLocalResult){
+          checkForBucketMovement(oneResult);
+          if(bme!=null){
+            collector.addResult(memberID, oneResult);
+            collector.addResult(memberID, bme);
+          }else{
+            collector.addResult(memberID, oneResult);
+          }
+        }else{
+          collector.addResult(memberID, oneResult);
+        }
         collector.endResults();
       }
       else {
-        collector.addResult(memberID, oneResult);
+        if(lastLocalResult){
+          checkForBucketMovement(oneResult);
+          if(bme!=null){
+            collector.addResult(memberID, oneResult);
+            collector.addResult(memberID, bme);
+          }else{
+            collector.addResult(memberID, oneResult);
+          }
+        }
+        else{
+          collector.addResult(memberID, oneResult);
+        }
       }
     }
   }
 
-  public void lastResult(Object oneResult, boolean completelyDone,
+  public synchronized void lastResult(Object oneResult, boolean 
completelyDoneFromRemote,
       ResultCollector reply, DistributedMember memberID) {
     logger.debug("PartitionedRegionFunctionResultSender Sending lastResult 
{}", oneResult);
 
     if (this.serverSender != null) { // Client-Server
       
-      if (completelyDone) {
+      if (completelyDoneFromRemote) {
         if (this.onlyRemote) {
           lastClientSend(memberID, oneResult);
           reply.endResults();
@@ -238,7 +318,7 @@ public final class PartitionedRegionFunctionResultSender 
implements
       }
     }
     else{
-      if (completelyDone) {
+      if (completelyDoneFromRemote) {
         if (this.onlyRemote) {
           reply.addResult(memberID, oneResult);
           reply.endResults();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dbf41d1f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/ServerToClientFunctionResultSender.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/ServerToClientFunctionResultSender.java
 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/ServerToClientFunctionResultSender.java
index 9a3b5d8..36c79c0 100755
--- 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/ServerToClientFunctionResultSender.java
+++ 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/ServerToClientFunctionResultSender.java
@@ -80,9 +80,10 @@ public class ServerToClientFunctionResultSender implements 
ResultSender {
     this.fn = function;
     this.authContext = authzContext;
     this.isSelector = sc.getAcceptor().isSelector();
+    
     if(this.isSelector){
       this.commBuffer = msg.getCommBuffer();  
-    }     
+    }
   }
 
   public synchronized void lastResult(Object oneResult) {
@@ -96,6 +97,9 @@ public class ServerToClientFunctionResultSender implements 
ResultSender {
     if(this.lastResultReceived){
       return;
     }
+    if (logger.isDebugEnabled()) {
+      logger.debug("ServerToClientFunctionResultSender sending last result1 {} 
" + oneResult);
+    }
     try {
       authorizeResult(oneResult);
       if (!this.fn.hasResult()) {
@@ -137,6 +141,9 @@ public class ServerToClientFunctionResultSender implements 
ResultSender {
       }
       return;
     }
+    if (logger.isDebugEnabled()) {
+      logger.debug("ServerToClientFunctionResultSender sending last result2 {} 
" + oneResult);
+    }
     try {
       authorizeResult(oneResult);
       if (!this.fn.hasResult()) {
@@ -175,6 +182,9 @@ public class ServerToClientFunctionResultSender implements 
ResultSender {
       }
       return;
     }
+    if (logger.isDebugEnabled()) {
+      logger.debug("ServerToClientFunctionResultSender sending result1 {} " + 
oneResult);
+    }
     try {
       authorizeResult(oneResult);
       if (!this.fn.hasResult()) {
@@ -211,6 +221,9 @@ public class ServerToClientFunctionResultSender implements 
ResultSender {
       }
       return;
     }
+    if (logger.isDebugEnabled()) {
+      logger.debug("ServerToClientFunctionResultSender sending result2 {} " + 
oneResult);
+    }
     try {
       authorizeResult(oneResult);
       if (!this.fn.hasResult()) {
@@ -263,6 +276,9 @@ public class ServerToClientFunctionResultSender implements 
ResultSender {
   }
 
   protected void sendHeader() throws IOException {
+    if (logger.isDebugEnabled()) {
+      logger.debug("ServerToClientFunctionResultSender sending header");
+    }
     this.setBuffer();
     this.msg.setMessageType(messageType);
     this.msg.setLastChunk(false);
@@ -280,6 +296,9 @@ public class ServerToClientFunctionResultSender implements 
ResultSender {
   
   public synchronized void setException(Throwable exception) {
     this.lastResultReceived = true;
+    if (logger.isDebugEnabled()) {
+      logger.debug("ServerToClientFunctionResultSender setting exception {} ", 
exception);
+    }
     synchronized (this.msg) {
       if (!this.sc.getTransientFlag(Command.RESPONDED)) {
         alreadySendException.set(true);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dbf41d1f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/ServerToClientFunctionResultSender65.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/ServerToClientFunctionResultSender65.java
 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/ServerToClientFunctionResultSender65.java
index 4cc80a3..5ac75c6 100644
--- 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/ServerToClientFunctionResultSender65.java
+++ 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/execute/ServerToClientFunctionResultSender65.java
@@ -120,6 +120,9 @@ public class ServerToClientFunctionResultSender65 extends
       return;
     }
     try {
+      if (logger.isDebugEnabled()) {
+        logger.debug("ServerToClientFunctionResultSender sending last result2 
{} " + oneResult);
+      }
       authorizeResult(oneResult);
       if (!this.fn.hasResult()) {
         throw new IllegalStateException(

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dbf41d1f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PRFunctionStreamingResultCollector.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PRFunctionStreamingResultCollector.java
 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PRFunctionStreamingResultCollector.java
index 5fe869a..7bbf8e7 100755
--- 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PRFunctionStreamingResultCollector.java
+++ 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/partitioned/PRFunctionStreamingResultCollector.java
@@ -93,7 +93,6 @@ public class PRFunctionStreamingResultCollector extends  
FunctionStreamingResult
     if (this.hasResult) {
       try {
         this.waitForCacheOrFunctionException(0);
-
         if (!this.execution.getFailedNodes().isEmpty()
             && !this.execution.isClientServerMode()) {
           // end the rc and clear it
@@ -116,6 +115,7 @@ public class PRFunctionStreamingResultCollector extends  
FunctionStreamingResult
       catch (FunctionInvocationTargetException fite) {
         // this is case of WrapperException which enforce the re execution of
         // the function.
+        if(!execution.getWaitOnExceptionFlag()) {
         if (!this.fn.isHA()) {
           throw new FunctionException(fite);
         }
@@ -137,8 +137,10 @@ public class PRFunctionStreamingResultCollector extends  
FunctionStreamingResult
           }
           return newRc.getResult();
         }
+        }
       }
       catch (BucketMovedException e) {
+        if(!execution.getWaitOnExceptionFlag()){
         if (!this.fn.isHA()) {
           //endResults();
           FunctionInvocationTargetException fite = new 
FunctionInvocationTargetException(
@@ -165,8 +167,10 @@ public class PRFunctionStreamingResultCollector extends  
FunctionStreamingResult
           }
           return newRc.getResult();
         }
+        }
       }
       catch (CacheClosedException e) {
+        if(!execution.getWaitOnExceptionFlag()) {
         if (!this.fn.isHA()) {
           //endResults();
           FunctionInvocationTargetException fite = new 
FunctionInvocationTargetException(e.getMessage());
@@ -192,6 +196,7 @@ public class PRFunctionStreamingResultCollector extends  
FunctionStreamingResult
           }
           return newRc.getResult();
         }
+        }
       }
       catch (CacheException e) {
         //endResults();
@@ -417,11 +422,19 @@ public class PRFunctionStreamingResultCollector extends  
FunctionStreamingResult
     logger.debug("StreamingPartitionResponseWithResultCollector received 
exception {} from member {}", ex.getCause(), msg.getSender());
     
     // we have already forwarded the exception, no need to keep it here
-    if (execution.isForwardExceptions()) {
+    if (execution.isForwardExceptions() || execution.getWaitOnExceptionFlag()) 
{
       return;
     }
     
-    if (ex.getCause() instanceof CacheClosedException) {
+    /** 
+     * Below two cases should also be handled
+     * and not thrown exception
+     * Saving the exception
+     * ForeceReattempt can also be added here? 
+     * Also, if multipel nodes throw exception, one may override another
+     * TODO: Wrap exception among each other or create a list of exceptions 
like this.fite.
+     */
+    if ( ex.getCause() instanceof CacheClosedException) {
       ((PartitionedRegionFunctionExecutor)this.execution).addFailedNode(msg
           .getSender().getId());
       this.exception = ex;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dbf41d1f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteFunction70.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteFunction70.java
 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteFunction70.java
index 528e462..c7defdc 100644
--- 
a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteFunction70.java
+++ 
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/ExecuteFunction70.java
@@ -134,6 +134,9 @@ public class ExecuteFunction70 extends ExecuteFunction66 {
       }
       ((AbstractExecution)execution).setIgnoreDepartedMembers(true);
     }
+    if(!functionObject.isHA()) {
+      ((AbstractExecution)execution).setWaitOnExceptionFlag(true);
+    }
     if (function instanceof String) {
       execution.execute(functionObject.getId()).getResult();
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dbf41d1f/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/functions/TestFunction.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/functions/TestFunction.java
 
b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/functions/TestFunction.java
index d1b1aa5..5fb6427 100755
--- 
a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/functions/TestFunction.java
+++ 
b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/functions/TestFunction.java
@@ -71,6 +71,7 @@ public class TestFunction extends FunctionAdapter implements 
Declarable2 {
   public static final String TEST_FUNCTION_SOCKET_TIMEOUT = 
"SocketTimeOutFunction";
   public static final String TEST_FUNCTION_TIMEOUT = "executeTimeOut";
   public static final String TEST_FUNCTION_HA = "executeFunctionHA";
+  public static final String TEST_FUNCTION_NONHA = "executeFunctionNonHA";
   public static final String TEST_FUNCTION_HA_SERVER = 
"executeFunctionHAOnServer";
   public static final String TEST_FUNCTION_NONHA_SERVER = 
"executeFunctionNonHAOnServer";
   public static final String TEST_FUNCTION_NONHA_REGION = 
"executeFunctionNonHAOnRegion";
@@ -165,6 +166,9 @@ public class TestFunction extends FunctionAdapter 
implements Declarable2 {
     else if(id.equals(TEST_FUNCTION_HA)){
       executeHA(context);
     }
+    else if(id.equals(TEST_FUNCTION_NONHA)){
+      executeHA(context);
+    }
     else if(id.equals(TEST_FUNCTION_HA_SERVER) || 
id.equals(TEST_FUNCTION_NONHA_SERVER)){
       executeHAAndNonHAOnServer(context);
     }
@@ -1136,10 +1140,11 @@ public class TestFunction extends FunctionAdapter 
implements Declarable2 {
   
   @Override
   public boolean isHA() {
+    
     if (getId().equals(TEST_FUNCTION10)) {
       return true;
     }
-    if (getId().equals(TEST_FUNCTION_NONHA_SERVER) || 
getId().equals(TEST_FUNCTION_NONHA_REGION) || 
getId().equals(TEST_FUNCTION_NONHA_NOP)) {
+    if (getId().equals(TEST_FUNCTION_NONHA_SERVER) || 
getId().equals(TEST_FUNCTION_NONHA_REGION) || 
getId().equals(TEST_FUNCTION_NONHA_NOP) || getId().equals(TEST_FUNCTION_NONHA)) 
{
       return false;
     }
     return 
Boolean.valueOf(this.props.getProperty(HAVE_RESULTS)).booleanValue();

Reply via email to