GEODE-1776: Always read all of the results in ExecuteRegionFunctionOp

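Read all of the results from the wire in
ExecuteRegionFunctionOp.processResponse, even if one of the responses is
an exception. An exception found among the result chunks is now recorded
and thrown only after the last chunk has been received, which ensures
that all data is read off the wire. The same pattern is applied to
ExecuteFunctionOp and ExecuteRegionFunctionSingleHopOp.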


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/a5daa924
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/a5daa924
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/a5daa924

Branch: refs/heads/feature/GEODE-420
Commit: a5daa924a5921fbe6fac4bd5f5b88c9bd3678c8d
Parents: dbf41d1
Author: Dan Smith <[email protected]>
Authored: Fri Aug 12 17:12:36 2016 -0700
Committer: Dan Smith <[email protected]>
Committed: Wed Aug 17 14:27:58 2016 -0700

----------------------------------------------------------------------
 .../client/internal/ExecuteFunctionOp.java      | 10 +++-
 .../internal/ExecuteRegionFunctionOp.java       | 61 ++++++--------------
 .../ExecuteRegionFunctionSingleHopOp.java       | 16 +++--
 3 files changed, 35 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a5daa924/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteFunctionOp.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteFunctionOp.java
 
b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteFunctionOp.java
index 278f1f7..105638a 100755
--- 
a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteFunctionOp.java
+++ 
b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteFunctionOp.java
@@ -531,6 +531,7 @@ public class ExecuteFunctionOp {
             if (logger.isDebugEnabled()) {
               logger.debug("ExecuteFunctionOpImpl#processResponse: received 
message of type EXECUTE_FUNCTION_RESULT.");
             }
+            Exception exception = null;
             // Read the chunk
             do{
               executeFunctionResponseMsg.receiveChunk();
@@ -556,11 +557,11 @@ public class ExecuteFunctionOp {
                   continue;
                 }
                 else {
-                  throw ex;
+                  exception = ex;
                 }
               }else if (result instanceof Throwable) {
                 String s = "While performing a remote " + getOpName();
-                throw new ServerOperationException(s, (Throwable)result);
+                exception = new ServerOperationException(s, (Throwable)result);
                 // Get the exception toString part.
                 // This was added for c++ thin client and not used in java
                 //Part exceptionToStringPart = msg.getPart(1);
@@ -575,6 +576,11 @@ public class ExecuteFunctionOp {
                     .incResultsReceived();
               }
             }while(!executeFunctionResponseMsg.isLastChunk());
+
+            if(exception != null) {
+              throw exception;
+            }
+
             if (logger.isDebugEnabled()) {
               logger.debug("ExecuteFunctionOpImpl#processResponse: received 
all the results from server successfully.");
             }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a5daa924/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionOp.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionOp.java
 
b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionOp.java
index 9d8dd7d..0260f77 100644
--- 
a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionOp.java
+++ 
b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionOp.java
@@ -501,6 +501,7 @@ public class ExecuteRegionFunctionOp {
               logger.debug("ExecuteRegionFunctionOpImpl#processResponse: 
received message of type EXECUTE_REGION_FUNCTION_RESULT. The number of parts 
are : {}", executeFunctionResponseMsg.getNumberOfParts());
             }
             // Read the chunk
+            boolean throwServerOp = false;
             do {
               executeFunctionResponseMsg.receiveChunk();
               Object resultResponse = executeFunctionResponseMsg.getPart(0)
@@ -532,43 +533,14 @@ public class ExecuteRegionFunctionOp {
                   InternalFunctionInvocationTargetException ifite = 
(InternalFunctionInvocationTargetException)ex
                       .getCause();
                   this.failedNodes.addAll(ifite.getFailedNodeSet());
-                  if (this.functionException == null) {
-                    this.functionException = (FunctionException)result;
-                  }
-                  
this.functionException.addException((FunctionException)result);
-                }
-                else if(((FunctionException) result).getCause() instanceof 
FunctionInvocationTargetException){
-                  if (this.functionException == null) {
-                    this.functionException = ex;
-                  }
-                  this.functionException.addException(ex.getCause());
-                }
-                else if(result instanceof FunctionInvocationTargetException){
-                  if (this.functionException == null) {
-                    this.functionException = new 
FunctionException((FunctionInvocationTargetException)result);
-                  }
-                  this.functionException.addException(ex);
-                }
-                else if(result instanceof 
InternalFunctionInvocationTargetException){
-                  if (this.functionException == null) {
-                    this.functionException = new 
FunctionException((InternalFunctionInvocationTargetException)result);
-                  }
-                  this.functionException.addException(ex);
+                  addFunctionException((FunctionException) result);
                 }
                 else{
-                  if (this.functionException == null) {
-                    this.functionException = ex;
-                  }
-                  this.functionException.addException(ex);
-                }
-                if (isHA) {
-                  executeFunctionResponseMsg.clear();
-                  throw ex;
+                  addFunctionException((FunctionException) result);
                 }
               }
               else if (result instanceof Throwable) {
                 Throwable t = (Throwable)result;
-                boolean throwServerOp = false;
                   if (this.functionException == null) {
                     if(result instanceof BucketMovedException){
                       FunctionInvocationTargetException fite;
@@ -606,16 +578,6 @@ public class ExecuteRegionFunctionOp {
                   } else {
                     this.functionException.addException(t);
                   }
-                  
-                  if (isHA) {
-                    if (throwServerOp) {
-                      String s = "While performing a remote " + getOpName();
-                      executeFunctionResponseMsg.clear();
-                      throw new ServerOperationException(s, 
this.functionException);
-                    } else {
-                       throw this.functionException;
-                    }
-                  }
               }
               else {
                 DistributedMember memberID = 
(DistributedMember)((ArrayList)resultResponse)
@@ -628,6 +590,13 @@ public class ExecuteRegionFunctionOp {
             if (isDebugEnabled) {
               logger.debug("ExecuteRegionFunctionOpImpl#processResponse: 
received all the results from server successfully.");
             }
+
+
+            if (isHA && throwServerOp) {
+              String s = "While performing a remote " + getOpName();
+              throw new ServerOperationException(s, this.functionException);
+            }
+
             // add all the exceptions here.
             if (this.functionException != null) {
               throw this.functionException;
@@ -651,11 +620,9 @@ public class ExecuteRegionFunctionOp {
                     .getCause();
                 this.failedNodes.addAll(ifite.getFailedNodeSet());
               }
-              executeFunctionResponseMsg.clear();
               throw ex;
             }
             else if (obj instanceof Throwable) {
-              executeFunctionResponseMsg.clear();
               String s = "While performing a remote " + getOpName();
               throw new ServerOperationException(s, (Throwable)obj);
             }
@@ -668,7 +635,6 @@ public class ExecuteRegionFunctionOp {
             executeFunctionResponseMsg.receiveChunk();
             String errorMessage = executeFunctionResponseMsg.getPart(0)
                 .getString();
-            executeFunctionResponseMsg.clear();
             throw new ServerOperationException(errorMessage);
           default:
             throw new InternalGemFireError("Unknown message type "
@@ -681,6 +647,13 @@ public class ExecuteRegionFunctionOp {
       return null;
     }
 
+    private void addFunctionException(final FunctionException result) {
+      if (this.functionException == null) {
+        this.functionException = result;
+      }
+      this.functionException.addException(result);
+    }
+
     @Override
     protected boolean isErrorResponse(int msgType) {
       return msgType == MessageType.EXECUTE_REGION_FUNCTION_ERROR;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/a5daa924/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java
 
b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java
index 768de5f..c917d4d 100644
--- 
a/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java
+++ 
b/geode-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java
@@ -338,6 +338,7 @@ public class ExecuteRegionFunctionSingleHopOp {
             if (isDebugEnabled) {
               
logger.debug("ExecuteRegionFunctionSingleHopOpImpl#processResponse: received 
message of type EXECUTE_REGION_FUNCTION_RESULT.");
             }
+            Exception exception = null;
             do {
               executeFunctionResponseMsg.receiveChunk();
               Object resultResponse = executeFunctionResponseMsg.getPart(0)
@@ -371,14 +372,12 @@ public class ExecuteRegionFunctionSingleHopOp {
                   this.failedNodes.addAll(ifite.getFailedNodeSet());
                 }
                 if (!ex.getMessage().equals("Buckets are null"))
-                  throw ex;
-
-                return null;
+                  exception = ex;
               }
               else if(result instanceof BucketMovedException) {
                 FunctionInvocationTargetException fite = new 
InternalFunctionInvocationTargetException(
                     ((BucketMovedException)result).getMessage());
-                throw new FunctionException(fite);
+                exception = new FunctionException(fite);
               }
               else if(result instanceof CacheClosedException) {
                 FunctionInvocationTargetException fite = new 
InternalFunctionInvocationTargetException(
@@ -388,11 +387,11 @@ public class ExecuteRegionFunctionSingleHopOp {
                       .get(1);
                   this.failedNodes.add(memberID.getId());
                 }                
-                throw new FunctionException(fite);
+                exception = new FunctionException(fite);
               }
               else if (result instanceof Throwable) {
                 String s = "While performing a remote " + getOpName();
-                throw new ServerOperationException(s, (Throwable)result);
+                exception = new ServerOperationException(s, (Throwable)result);
               }
               else {
                 DistributedMember memberID = 
(DistributedMember)((ArrayList)resultResponse)
@@ -405,6 +404,11 @@ public class ExecuteRegionFunctionSingleHopOp {
                     
this.executor.getRegion().getSystem()).incResultsReceived();
               }
             } while (!executeFunctionResponseMsg.isLastChunk());
+
+            if (exception != null) {
+              throw exception;
+            }
+
             if (isDebugEnabled) {
               
logger.debug("ExecuteRegionFunctionSingleHopOpImpl#processResponse: received 
all the results from server successfully.");
             }

Reply via email to