DRILL-1118: In case of error, always set QueryState to FAILED + Added a null check in o.a.d.exec.physical.impl.producer.ProducerConsumerBatch.clearQueue()
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/9abffa17 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/9abffa17 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/9abffa17 Branch: refs/heads/master Commit: 9abffa177d017e2b781c48003401196a596987be Parents: d22f325 Author: Aditya Kishore <adi...@maprtech.com> Authored: Tue Jul 8 16:51:04 2014 -0700 Committer: Aditya Kishore <adi...@maprtech.com> Committed: Wed Jul 9 11:55:46 2014 -0700 ---------------------------------------------------------------------- .../apache/drill/exec/client/DrillClient.java | 3 --- .../drill/exec/physical/impl/ScreenCreator.java | 2 ++ .../impl/producer/ProducerConsumerBatch.java | 4 ++- .../drill/exec/rpc/user/QueryResultHandler.java | 26 ++++++++++---------- 4 files changed, 18 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9abffa17/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java index 090a760..3a9d015 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java @@ -289,9 +289,6 @@ public class DrillClient implements Closeable, ConnectionThrottle{ @Override public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) { // logger.debug("Result arrived. Is Last Chunk: {}. Full Result: {}", result.getHeader().getIsLastChunk(), result); - if (result.getHeader().getErrorCount() > 0) { - fail(new Exception(result.getHeader().getError(0).getMessage())); - } results.add(result); if(result.getHeader().getIsLastChunk()){ future.set(results); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9abffa17/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java index 146d72d..853969d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java @@ -32,6 +32,7 @@ import org.apache.drill.exec.physical.impl.materialize.RecordMaterializer; import org.apache.drill.exec.physical.impl.materialize.VectorRecordMaterializer; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; import org.apache.drill.exec.proto.UserBitShared.QueryResult; +import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState; import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch.IterOutcome; @@ -101,6 +102,7 @@ public class ScreenCreator implements RootCreator<Screen>{ QueryResult header = QueryResult.newBuilder() // .setQueryId(context.getHandle().getQueryId()) // .setRowCount(0) // + .setQueryState(QueryState.FAILED) .addError(ErrorHelper.logAndConvertError(context.getIdentity(), "Screen received stop request sent.", context.getFailureCause(), logger)) .setDef(RecordBatchDef.getDefaultInstance()) // .setIsLastChunk(true) // http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9abffa17/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java index 9ec07de..6b43044 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java @@ -163,7 +163,9 @@ public class ProducerConsumerBatch extends AbstractRecordBatch { private void clearQueue() { RecordBatchDataWrapper wrapper; while ((wrapper = queue.poll()) != null) { - wrapper.batch.getContainer().clear(); + if (wrapper.batch != null) { + wrapper.batch.getContainer().clear(); + } } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/9abffa17/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java index 51a1156..e213c51 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java @@ -37,26 +37,28 @@ import com.google.common.collect.Queues; * Encapsulates the future management of query submissions. This entails a potential race condition. Normal ordering is: * 1. Submit query to be executed. 2. Receive QueryHandle for buffer management 3. Start receiving results batches for * query. - * + * * However, 3 could potentially occur before 2. As such, we need to handle this case and then do a switcheroo. - * + * */ public class QueryResultHandler { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryResultHandler.class); private ConcurrentMap<QueryId, UserResultsListener> resultsListener = Maps.newConcurrentMap(); - + public RpcOutcomeListener<QueryId> getWrappedListener(UserResultsListener listener){ return new SubmissionListener(listener); } - + public void batchArrived(ConnectionThrottle throttle, ByteBuf pBody, ByteBuf dBody) throws RpcException { final QueryResult result = RpcBus.get(pBody, QueryResult.PARSER); final QueryResultBatch batch = new QueryResultBatch(result, dBody); + final boolean failed = (batch.getHeader().getQueryState() == QueryState.FAILED); + + assert failed || batch.getHeader().getErrorCount() == 0 : "Error count for the query batch is non-zero but QueryState != FAILED"; + UserResultsListener l = resultsListener.get(result.getQueryId()); - - boolean failed = batch.getHeader().getQueryState() == QueryState.FAILED; // logger.debug("For QueryId [{}], retrieved result listener {}", result.getQueryId(), l); if (l == null) { BufferingListener bl = new BufferingListener(); @@ -79,7 +81,7 @@ public class QueryResultHandler { l.submissionFailed(new RpcException(e)); } } - + if ( (failed || result.getIsLastChunk()) && @@ -95,8 +97,6 @@ public class QueryResultHandler { } } - - private class BufferingListener implements UserResultsListener { private ConcurrentLinkedQueue<QueryResultBatch> results = Queues.newConcurrentLinkedQueue(); @@ -104,6 +104,7 @@ public class QueryResultHandler { private volatile RpcException ex; private volatile UserResultsListener output; private volatile ConnectionThrottle throttle; + public boolean transferTo(UserResultsListener l) { synchronized (this) { output = l; @@ -120,12 +121,11 @@ public class QueryResultHandler { } } - @Override public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) { this.throttle = throttle; if(result.getHeader().getIsLastChunk()) finished = true; - + synchronized (this) { if (output == null) { this.results.add(result); @@ -146,12 +146,11 @@ public class QueryResultHandler { } } } - + public boolean isFinished(){ return finished; } - @Override public void queryIdArrived(QueryId queryId) { } @@ -200,4 +199,5 @@ public class QueryResultHandler { } } + }