DRILL-1118: In case of error, always set QueryState to FAILED

+ Added a null check in o.a.d.exec.physical.impl.producer.ProducerConsumerBatch.clearQueue()


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/9abffa17
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/9abffa17
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/9abffa17

Branch: refs/heads/master
Commit: 9abffa177d017e2b781c48003401196a596987be
Parents: d22f325
Author: Aditya Kishore <adi...@maprtech.com>
Authored: Tue Jul 8 16:51:04 2014 -0700
Committer: Aditya Kishore <adi...@maprtech.com>
Committed: Wed Jul 9 11:55:46 2014 -0700

----------------------------------------------------------------------
 .../apache/drill/exec/client/DrillClient.java   |  3 ---
 .../drill/exec/physical/impl/ScreenCreator.java |  2 ++
 .../impl/producer/ProducerConsumerBatch.java    |  4 ++-
 .../drill/exec/rpc/user/QueryResultHandler.java | 26 ++++++++++----------
 4 files changed, 18 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9abffa17/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 090a760..3a9d015 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -289,9 +289,6 @@ public class DrillClient implements Closeable, ConnectionThrottle{
     @Override
     public void resultArrived(QueryResultBatch result, ConnectionThrottle 
throttle) {
 //      logger.debug("Result arrived.  Is Last Chunk: {}.  Full Result: {}", 
result.getHeader().getIsLastChunk(), result);
-      if (result.getHeader().getErrorCount() > 0) {
-        fail(new Exception(result.getHeader().getError(0).getMessage()));
-      }
       results.add(result);
       if(result.getHeader().getIsLastChunk()){
         future.set(results);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9abffa17/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index 146d72d..853969d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -32,6 +32,7 @@ import 
org.apache.drill.exec.physical.impl.materialize.RecordMaterializer;
 import 
org.apache.drill.exec.physical.impl.materialize.VectorRecordMaterializer;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult;
+import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
@@ -101,6 +102,7 @@ public class ScreenCreator implements RootCreator<Screen>{
           QueryResult header = QueryResult.newBuilder() //
               .setQueryId(context.getHandle().getQueryId()) //
               .setRowCount(0) //
+              .setQueryState(QueryState.FAILED)
               .addError(ErrorHelper.logAndConvertError(context.getIdentity(), 
"Screen received stop request sent.", context.getFailureCause(), logger))
               .setDef(RecordBatchDef.getDefaultInstance()) //
               .setIsLastChunk(true) //

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9abffa17/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index 9ec07de..6b43044 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -163,7 +163,9 @@ public class ProducerConsumerBatch extends 
AbstractRecordBatch {
   private void clearQueue() {
     RecordBatchDataWrapper wrapper;
     while ((wrapper = queue.poll()) != null) {
-      wrapper.batch.getContainer().clear();
+      if (wrapper.batch != null) {
+        wrapper.batch.getContainer().clear();
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9abffa17/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
index 51a1156..e213c51 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
@@ -37,26 +37,28 @@ import com.google.common.collect.Queues;
  * Encapsulates the future management of query submissions. This entails a 
potential race condition. Normal ordering is:
  * 1. Submit query to be executed. 2. Receive QueryHandle for buffer 
management 3. Start receiving results batches for
  * query.
- * 
+ *
  * However, 3 could potentially occur before 2. As such, we need to handle 
this case and then do a switcheroo.
- * 
+ *
  */
 public class QueryResultHandler {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(QueryResultHandler.class);
 
   private ConcurrentMap<QueryId, UserResultsListener> resultsListener = 
Maps.newConcurrentMap();
 
-  
+
   public RpcOutcomeListener<QueryId> getWrappedListener(UserResultsListener 
listener){
     return new SubmissionListener(listener);
   }
-  
+
   public void batchArrived(ConnectionThrottle throttle, ByteBuf pBody, ByteBuf 
dBody) throws RpcException {
     final QueryResult result = RpcBus.get(pBody, QueryResult.PARSER);
     final QueryResultBatch batch = new QueryResultBatch(result, dBody);
+    final boolean failed = (batch.getHeader().getQueryState() == 
QueryState.FAILED);
+
+    assert failed || batch.getHeader().getErrorCount() == 0 : "Error count for 
the query batch is non-zero but QueryState != FAILED";
+
     UserResultsListener l = resultsListener.get(result.getQueryId());
-    
-    boolean failed = batch.getHeader().getQueryState() == QueryState.FAILED;
     // logger.debug("For QueryId [{}], retrieved result listener {}", 
result.getQueryId(), l);
     if (l == null) {
       BufferingListener bl = new BufferingListener();
@@ -79,7 +81,7 @@ public class QueryResultHandler {
         l.submissionFailed(new RpcException(e));
       }
     }
-    
+
     if (
         (failed || result.getIsLastChunk())
         &&
@@ -95,8 +97,6 @@ public class QueryResultHandler {
     }
   }
 
-  
-  
   private class BufferingListener implements UserResultsListener {
 
     private ConcurrentLinkedQueue<QueryResultBatch> results = 
Queues.newConcurrentLinkedQueue();
@@ -104,6 +104,7 @@ public class QueryResultHandler {
     private volatile RpcException ex;
     private volatile UserResultsListener output;
     private volatile ConnectionThrottle throttle;
+
     public boolean transferTo(UserResultsListener l) {
       synchronized (this) {
         output = l;
@@ -120,12 +121,11 @@ public class QueryResultHandler {
       }
     }
 
-    
     @Override
     public void resultArrived(QueryResultBatch result, ConnectionThrottle 
throttle) {
       this.throttle = throttle;
       if(result.getHeader().getIsLastChunk()) finished = true;
-      
+
       synchronized (this) {
         if (output == null) {
           this.results.add(result);
@@ -146,12 +146,11 @@ public class QueryResultHandler {
         }
       }
     }
-    
+
     public boolean isFinished(){
       return finished;
     }
 
-
     @Override
     public void queryIdArrived(QueryId queryId) {
     }
@@ -200,4 +199,5 @@ public class QueryResultHandler {
     }
 
   }
+
 }

Reply via email to