This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit 221484bf8f5ce82d5b2b003d02f69553e6ecf516
Author: Paul Rogers <[email protected]>
AuthorDate: Wed Jan 1 21:52:56 2020 -0800

    DRILL-7507: Convert fragment interrupts to exceptions

    Modifies fragment interrupt handling to throw a specialized exception,
    rather than relying on the complex and cumbersome STOP iterator status.

    closes #1949
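The control-flow change is easiest to see side by side. The sketch below is illustrative only, not code from this patch; it contrasts the old shouldContinue()/STOP polling style, which every operator had to propagate by hand, with the new checkContinue() call that simply throws, using the ExecutorState API this commit extends.

```java
// Illustrative sketch, not part of the patch. Both methods poll for
// cancellation; only the propagation mechanism differs.
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.RecordBatch.IterOutcome;

class CancelStyles {

  // Old style: the operator returns the STOP sentinel, which every caller
  // up the operator tree must test for and forward by hand.
  IterOutcome oldStyle(FragmentContext context) {
    if (!context.getExecutorState().shouldContinue()) {
      return IterOutcome.STOP;
    }
    return IterOutcome.OK; // ... real work here ...
  }

  // New style: checkContinue() throws QueryCancelledException on
  // cancellation, which unwinds the operator tree like any other error,
  // so callers need no STOP plumbing at all.
  IterOutcome newStyle(FragmentContext context) {
    context.getExecutorState().checkContinue();
    return IterOutcome.OK; // ... real work here ...
  }
}
```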
---
 .../org/apache/drill/exec/ops/FragmentContext.java |  29 +++-
 .../drill/exec/ops/QueryCancelledException.java    |  32 ++++
 .../drill/exec/physical/impl/BaseRootExec.java     |   9 +-
 .../impl/aggregate/SpilledRecordbatch.java         |  15 +-
 .../impl/mergereceiver/MergingRecordBatch.java     | 173 ++++++++++-----
 .../OrderedPartitionRecordBatch.java               |  16 +-
 .../impl/partitionsender/PartitionerDecorator.java |   6 +-
 .../impl/producer/ProducerConsumerBatch.java       |  10 +-
 .../unorderedreceiver/UnorderedReceiverBatch.java  |   4 +-
 .../physical/impl/xsort/ExternalSortBatch.java     |   5 +-
 .../exec/physical/impl/xsort/MSortTemplate.java    |   4 +-
 .../drill/exec/record/AbstractRecordBatch.java     |  20 ++-
 .../drill/exec/work/fragment/FragmentExecutor.java |  10 ++
 .../drill/exec/physical/impl/SimpleRootExec.java   |   7 +
 .../unnest/TestUnnestWithLateralCorrectness.java   |   2 +-
 .../org/apache/drill/test/OperatorFixture.java     |  21 +--
 16 files changed, 207 insertions(+), 156 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 2107094..619f2d1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -213,15 +213,36 @@ public interface FragmentContext extends UdfUtilities, AutoCloseable {
   interface ExecutorState {
     /**
-     * Tells individual operations whether they should continue. In some cases, an external event (typically cancellation)
-     * will mean that the fragment should prematurely exit execution. Long running operations should check this every so
-     * often so that Drill is responsive to cancellation operations.
+     * Tells individual operations whether they should continue. In some cases,
+     * an external event (typically cancellation) will mean that the fragment
+     * should prematurely exit execution. Long running operations should check
+     * this every so often so that Drill is responsive to cancellation
+     * operations.
      *
-     * @return False if the action should terminate immediately, true if everything is okay.
+     * @return False if the action should terminate immediately, true if
+     *         everything is okay.
      */
     boolean shouldContinue();

     /**
+     * Check if an operation should continue. In some cases,
+     * an external event (typically cancellation) will mean that the fragment
+     * should prematurely exit execution. Long running operations should check
+     * this every so often so that Drill is responsive to cancellation
+     * operations.
+     * <p>
+     * Throws QueryCancelledException if the query (fragment) should stop.
+     * The fragment executor interprets this as an exception it, itself,
+     * requested, and will call the operator's close() method to release
+     * resources. Operators should not catch and handle this exception,
+     * and should only call this method when the operator holds no
+     * transient resources (such as local variables.)
+     *
+     * @throws QueryCancelledException if the query (fragment) should stop.
+     */
+    void checkContinue();
+
+    /**
      * Inform the executor if a exception occurs and fragment should be failed.
      *
      * @param t
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryCancelledException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryCancelledException.java
new file mode 100644
index 0000000..0d5a5a5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryCancelledException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+/**
+ * Indicates that an external source has cancelled the query.
+ * Thrown by the operator that detects the cancellation. Bubbles
+ * up the operator tree like other exceptions. However, this one
+ * tells the fragment executor that the exception is one that
+ * the fragment executor itself initiated and so should not
+ * be reported as an error.
+ */
+@SuppressWarnings("serial")
+public class QueryCancelledException extends RuntimeException {
+
+  // No need for messages; this exception is silently ignored.
+}
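Taken together, the new exception and the checkContinue() contract are meant to work roughly as follows. This is a simplified sketch condensed from the FragmentExecutor change further down in this patch, not a verbatim excerpt:

```java
// Sketch only: simplified from the FragmentExecutor change later in this
// patch. The executor treats QueryCancelledException as a stop that it
// requested itself, not as a query failure.
import org.apache.drill.exec.ops.QueryCancelledException;
import org.apache.drill.exec.physical.impl.RootExec;

class ExecutorSketch {
  void runFragment(RootExec root) {
    try {
      while (root.next()) {
        // root.next() calls ExecutorState.checkContinue(), which throws
        // QueryCancelledException once cancellation has been requested.
      }
    } catch (QueryCancelledException e) {
      // Ignore: this executor initiated the cancellation itself,
      // so it is not reported as an error.
    } finally {
      // Operators are still closed here, releasing their resources.
    }
  }
}
```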
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index 04ea6b6..24597ae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -32,9 +32,11 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 public abstract class BaseRootExec implements RootExec {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseRootExec.class);
+  private static final Logger logger = LoggerFactory.getLogger(BaseRootExec.class);

   public static final String ENABLE_BATCH_DUMP_CONFIG = "drill.exec.debug.dump_batches";

   protected OperatorStats stats;
@@ -85,11 +87,8 @@ public abstract class BaseRootExec implements RootExec {

   @Override
   public final boolean next() {
-    // Stats should have been initialized
     assert stats != null;
-    if (!fragmentContext.getExecutorState().shouldContinue()) {
-      return false;
-    }
+    fragmentContext.getExecutorState().checkContinue();
     try {
       stats.startProcessing();
       return innerNext();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
index 822a810..56adae2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
@@ -47,12 +47,12 @@ public class SpilledRecordbatch implements CloseableRecordBatch {
   private VectorContainer container;
   private InputStream spillStream;
   private int spilledBatches;
-  private FragmentContext context;
-  private BatchSchema schema;
-  private SpillSet spillSet;
-  private String spillFile;
+  private final FragmentContext context;
+  private final BatchSchema schema;
+  private final SpillSet spillSet;
+  private final String spillFile;
   VectorAccessibleSerializable vas;
-  private IterOutcome initialOutcome;
+  private final IterOutcome initialOutcome;

   // Represents last outcome of next(). If an Exception is thrown
   // during the method's execution a value IterOutcome.STOP will be assigned.
   private IterOutcome lastOutcome;
@@ -134,10 +134,7 @@ public class SpilledRecordbatch implements CloseableRecordBatch {

   @Override
   public IterOutcome next() {
-    if (!context.getExecutorState().shouldContinue()) {
-      lastOutcome = IterOutcome.STOP;
-      return lastOutcome;
-    }
+    context.getExecutorState().checkContinue();

     if ( spilledBatches <= 0 ) { // no more batches to read in this partition
       this.close();
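The next file, MergingRecordBatch, applies the same idea to a blocking receive: an interrupt during the wait now becomes a QueryCancelledException instead of a null/STOP sentinel. A distilled sketch of that pattern follows; the queue and method names here are hypothetical, not from the patch:

```java
// Hedged sketch of the interrupt-to-cancellation pattern used in the
// receiver diffs below. Names are hypothetical.
import java.util.concurrent.BlockingQueue;
import org.apache.drill.exec.ops.QueryCancelledException;

class BlockingWaitSketch {
  Object waitForBatch(BlockingQueue<Object> queue) {
    try {
      return queue.take();          // blocking wait for the next batch
    } catch (InterruptedException e) {
      // Preserve the interrupt flag for any caller that still wants it,
      // then convert the interrupt into a cancellation exception instead
      // of returning a null/STOP sentinel.
      Thread.currentThread().interrupt();
      throw new QueryCancelledException();
    }
  }
}
```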
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 420b61a..9443882 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -43,6 +43,7 @@ import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
 import org.apache.drill.exec.ops.ExchangeFragmentContext;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
+import org.apache.drill.exec.ops.QueryCancelledException;
 import org.apache.drill.exec.physical.MinorFragmentEndpoint;
 import org.apache.drill.exec.physical.config.MergingReceiverPOP;
 import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
@@ -151,7 +152,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
       // interruption and respond to it if it wants to.
       Thread.currentThread().interrupt();
-      return null;
+      throw new QueryCancelledException();
     } finally {
       stats.stopWait();
     }
@@ -194,97 +195,100 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     // set up each (non-empty) incoming record batch
     final List<RawFragmentBatch> rawBatches = Lists.newArrayList();
-    int p = 0;
-    for (@SuppressWarnings("unused") final RawFragmentBatchProvider provider : fragProviders) {
-      RawFragmentBatch rawBatch;
-      // check if there is a batch in temp holder before calling getNext(), as it may have been used when building schema
-      if (tempBatchHolder[p] != null) {
-        rawBatch = tempBatchHolder[p];
-        tempBatchHolder[p] = null;
-      } else {
-        try {
-          rawBatch = getNext(p);
-        } catch (final IOException e) {
-          context.getExecutorState().fail(e);
-          return IterOutcome.STOP;
+    try {
+      int p = 0;
+      for (@SuppressWarnings("unused") final RawFragmentBatchProvider provider : fragProviders) {
+        RawFragmentBatch rawBatch;
+        // check if there is a batch in temp holder before calling getNext(), as it may have been used when building schema
+        if (tempBatchHolder[p] != null) {
+          rawBatch = tempBatchHolder[p];
+          tempBatchHolder[p] = null;
+        } else {
+          try {
+            rawBatch = getNext(p);
+          } catch (final IOException e) {
+            context.getExecutorState().fail(e);
+            return IterOutcome.STOP;
+          }
         }
-      }
-      if (rawBatch == null && !context.getExecutorState().shouldContinue()) {
-        clearBatches(rawBatches);
-        return IterOutcome.STOP;
-      }
+        checkContinue();

-      // If rawBatch is null, go ahead and add it to the list. We will create dummy batches
-      // for all null batches later.
-      if (rawBatch == null) {
-        createDummyBatch = true;
-        rawBatches.add(rawBatch);
-        p++; // move to next sender
-        continue;
-      }
+        // If rawBatch is null, go ahead and add it to the list. We will create dummy batches
+        // for all null batches later.
+        if (rawBatch == null) {
+          checkContinue();
+          createDummyBatch = true;
+          rawBatches.add(rawBatch);
+          p++; // move to next sender
+          continue;
+        }

-      if (fieldList == null && rawBatch.getHeader().getDef().getFieldCount() != 0) {
-        // save the schema to fix up empty batches with no schema if needed.
-        fieldList = rawBatch.getHeader().getDef().getFieldList();
-      }
+        if (fieldList == null && rawBatch.getHeader().getDef().getFieldCount() != 0) {
+          // save the schema to fix up empty batches with no schema if needed.
+          fieldList = rawBatch.getHeader().getDef().getFieldList();
+        }

-      if (rawBatch.getHeader().getDef().getRecordCount() != 0) {
-        rawBatches.add(rawBatch);
-      } else {
-        // keep reading till we get a batch with record count > 0 or we have no more batches to read i.e. we get null
-        try {
-          while ((rawBatch = getNext(p)) != null && rawBatch.getHeader().getDef().getRecordCount() == 0) {
-            // Do nothing
-          }
-          if (rawBatch == null && !context.getExecutorState().shouldContinue()) {
+        if (rawBatch.getHeader().getDef().getRecordCount() != 0) {
+          rawBatches.add(rawBatch);
+        } else {
+          // keep reading till we get a batch with record count > 0 or we have no more batches to read i.e. we get null
+          try {
+            while ((rawBatch = getNext(p)) != null && rawBatch.getHeader().getDef().getRecordCount() == 0) {
+              // Do nothing
+            }
+            if (rawBatch == null) {
+              checkContinue();
+              createDummyBatch = true;
+            }
+          } catch (final IOException e) {
+            context.getExecutorState().fail(e);
             clearBatches(rawBatches);
             return IterOutcome.STOP;
           }
-        } catch (final IOException e) {
-          context.getExecutorState().fail(e);
-          clearBatches(rawBatches);
-          return IterOutcome.STOP;
-        }
-        if (rawBatch == null || rawBatch.getHeader().getDef().getFieldCount() == 0) {
-          createDummyBatch = true;
+          if (rawBatch == null || rawBatch.getHeader().getDef().getFieldCount() == 0) {
+            createDummyBatch = true;
+          }
+          // Even if rawBatch is null, go ahead and add it to the list.
+          // We will create dummy batches for all null batches later.
+          rawBatches.add(rawBatch);
         }
-        // Even if rawBatch is null, go ahead and add it to the list.
-        // We will create dummy batches for all null batches later.
-        rawBatches.add(rawBatch);
+        p++;
       }
-      p++;
-    }

-    // If no batch arrived with schema from any of the providers, just return NONE.
-    if (fieldList == null) {
-      return IterOutcome.NONE;
-    }
+      // If no batch arrived with schema from any of the providers, just return NONE.
+      if (fieldList == null) {
+        return IterOutcome.NONE;
+      }

-    // Go through and fix schema for empty batches.
-    if (createDummyBatch) {
-      // Create dummy record batch definition with 0 record count
-      UserBitShared.RecordBatchDef dummyDef = UserBitShared.RecordBatchDef.newBuilder()
-          // we cannot use/modify the original field list as that is used by
-          // valid record batch.
-          // create a copy of field list with valuecount = 0 for all fields.
-          // This is for dummy schema generation.
+      // Go through and fix schema for empty batches.
+      if (createDummyBatch) {
+        // Create dummy record batch definition with 0 record count
+        UserBitShared.RecordBatchDef dummyDef = UserBitShared.RecordBatchDef.newBuilder()
+            // we cannot use/modify the original field list as that is used by
+            // valid record batch.
+            // create a copy of field list with valuecount = 0 for all fields.
+            // This is for dummy schema generation.
+            .addAllField(createDummyFieldList(fieldList))
+            .setRecordCount(0)
+            .build();

-      // Create dummy header
-      BitData.FragmentRecordBatch dummyHeader = BitData.FragmentRecordBatch.newBuilder()
-          .setIsLastBatch(true)
-          .setDef(dummyDef)
-          .build();
+        // Create dummy header
+        BitData.FragmentRecordBatch dummyHeader = BitData.FragmentRecordBatch.newBuilder()
+            .setIsLastBatch(true)
+            .setDef(dummyDef)
+            .build();

-      for (int i = 0; i < p; i++) {
-        RawFragmentBatch rawBatch = rawBatches.get(i);
-        if (rawBatch == null || rawBatch.getHeader().getDef().getFieldCount() == 0) {
-          rawBatch = new RawFragmentBatch(dummyHeader, null, null);
-          rawBatches.set(i, rawBatch);
+        for (int i = 0; i < p; i++) {
+          RawFragmentBatch rawBatch = rawBatches.get(i);
+          if (rawBatch == null || rawBatch.getHeader().getDef().getFieldCount() == 0) {
+            rawBatch = new RawFragmentBatch(dummyHeader, null, null);
+            rawBatches.set(i, rawBatch);
+          }
         }
       }
+    } catch (Throwable t) {
+      clearBatches(rawBatches);
+      throw t;
     }

     // allocate the incoming record batch loaders
@@ -375,9 +379,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
         } else {
           batchLoaders[b].clear();
           batchLoaders[b] = null;
-          if (!context.getExecutorState().shouldContinue()) {
-            return IterOutcome.STOP;
-          }
+          checkContinue();
         }
       } catch (IOException | SchemaChangeException e) {
         context.getExecutorState().fail(e);
@@ -413,8 +415,8 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
         assert nextBatch != null || inputCounts[node.batchId] == outputCounts[node.batchId]
             : String.format("Stream %d input count: %d output count %d", node.batchId,
                 inputCounts[node.batchId], outputCounts[node.batchId]);
-        if (nextBatch == null && !context.getExecutorState().shouldContinue()) {
-          return IterOutcome.STOP;
+        if (nextBatch == null) {
+          checkContinue();
         }
       } catch (final IOException e) {
         context.getExecutorState().fail(e);
@@ -544,12 +546,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
       }
       final RawFragmentBatch batch = getNext(i);
       if (batch == null) {
-        if (!context.getExecutorState().shouldContinue()) {
-          state = BatchState.STOP;
-        } else {
-          state = BatchState.DONE;
-        }
-        break;
+        checkContinue();
       }
       if (batch.getHeader().getDef().getFieldCount() == 0) {
         i++;
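A side effect of throwing from the middle of a loop is that partially accumulated batches must still be released, which is why the gathering loop above gained a catch-all cleanup block. The shape of that idiom, reduced to a hypothetical helper:

```java
// Sketch of the cleanup idiom used above (names hypothetical): because
// checkContinue() can now throw out of the middle of the gathering loop,
// partially accumulated results must be released on any exit path.
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class CleanupSketch {
  static <T> List<T> gatherAll(Supplier<T> source, int n) {
    List<T> gathered = new ArrayList<>();
    try {
      for (int i = 0; i < n; i++) {
        gathered.add(source.get()); // may throw QueryCancelledException
      }
      return gathered;
    } catch (Throwable t) {
      gathered.clear();             // analogous to clearBatches(rawBatches) above
      throw t;                      // then let the exception keep unwinding
    }
  }
}
```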
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index c309e8c..d67ae42 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -52,6 +52,7 @@ import org.apache.drill.exec.expr.ValueVectorReadExpression;
 import org.apache.drill.exec.expr.ValueVectorWriteExpression;
 import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.QueryCancelledException;
 import org.apache.drill.exec.physical.config.OrderedPartitionSender;
 import org.apache.drill.exec.physical.impl.sort.SortBatch;
 import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder;
@@ -259,15 +260,12 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
    * @return True if the given timeout is expired. False when interrupted and
    *         the fragment status is not runnable.
    */
-  private boolean waitUntilTimeOut(final long timeout) {
+  private void waitUntilTimeOut(final long timeout) {
     while(true) {
       try {
         Thread.sleep(timeout);
-        return true;
       } catch (final InterruptedException e) {
-        if (!context.getExecutorState().shouldContinue()) {
-          return false;
-        }
+        throw new QueryCancelledException();
       }
     }
   }
@@ -307,18 +305,14 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart

     // TODO: this should be polling.
     if (val < fragmentsBeforeProceed) {
-      if (!waitUntilTimeOut(10)) {
-        return false;
-      }
+      waitUntilTimeOut(10);
     }
     for (int i = 0; i < 100 && finalTable == null; i++) {
       finalTable = tableMap.get(finalTableKey);
       if (finalTable != null) {
         break;
       }
-      if (!waitUntilTimeOut(10)) {
-        return false;
-      }
+      waitUntilTimeOut(10);
     }
     if (finalTable == null) {
       buildTable();
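The waitUntilTimeOut() change above also simplifies its callers: the polling loop no longer threads a boolean failure result outward. A compilable distillation of the idiom, with hypothetical names:

```java
// Sketch only: a sleep that turns an interrupt into cancellation, plus a
// bounded polling loop in the style of the finalTable lookup above.
import java.util.function.Supplier;
import org.apache.drill.exec.ops.QueryCancelledException;

final class PollSketch {
  static void sleepOrCancel(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      // The fragment executor interrupts the thread to cancel the query.
      throw new QueryCancelledException();
    }
  }

  static <T> T pollForValue(Supplier<T> lookup, int attempts) {
    for (int i = 0; i < attempts; i++) {
      T value = lookup.get();
      if (value != null) {
        return value;
      }
      sleepOrCancel(10); // throws instead of returning false
    }
    return null; // caller falls back, as buildTable() does above
  }
}
```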
Execution service down?", e); executionException = new ExecutionException(e); @@ -309,7 +311,7 @@ public final class PartitionerDecorator { private final GeneralExecuteIface iface; private final Partitioner partitioner; - private CountDownLatchInjection testCountDownLatch; + private final CountDownLatchInjection testCountDownLatch; private volatile ExecutionException exception; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java index 633920e..9d67b5d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java @@ -26,6 +26,7 @@ import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.QueryCancelledException; import org.apache.drill.exec.physical.config.ProducerConsumer; import org.apache.drill.exec.physical.impl.sort.RecordBatchData; import org.apache.drill.exec.record.AbstractRecordBatch; @@ -68,11 +69,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch<ProducerConsumer> wrapper = queue.take(); logger.debug("Got batch from queue"); } catch (final InterruptedException e) { - if (context.getExecutorState().shouldContinue()) { - context.getExecutorState().fail(e); - } - return IterOutcome.STOP; - // TODO InterruptedException + throw new QueryCancelledException(); } finally { stats.stopWait(); } @@ -155,7 +152,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch<ProducerConsumer> } } catch (final InterruptedException e) { logger.warn("Producer thread is interrupted.", e); - // TODO InterruptedException + throw new QueryCancelledException(); } finally { if (stop) { try { @@ -203,6 +200,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch<ProducerConsumer> } catch (final InterruptedException e) { logger.warn("Interrupted while waiting for producer to clean up first. 
I will try to clean up now...", e); // TODO we should retry to wait for the latch + throw new QueryCancelledException(); } finally { super.close(); clearQueue(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java index 8b3c2e9..f6304aa 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java @@ -188,9 +188,7 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch { if (batch == null) { lastOutcome = IterOutcome.NONE; batchLoader.zero(); - if (!context.getExecutorState().shouldContinue()) { - lastOutcome = IterOutcome.STOP; - } + context.getExecutorState().checkContinue(); return lastOutcome; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java index b4ebc1d..17a634d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java @@ -423,11 +423,8 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { // No records to sort sortState = SortState.DONE; return NONE; - } else if (!context.getExecutorState().shouldContinue()) { - // Interrupted - sortState = SortState.DONE; - return STOP; } else { + checkContinue(); // There is some data to be returned downstream. // We have to prepare output container diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java index 8a31e3e..e503c5d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java @@ -150,9 +150,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable { int totalCount = vector4.getTotalCount(); // check if we're cancelled/failed recently - if (!context.getExecutorState().shouldContinue()) { - return; - } + context.getExecutorState().checkContinue(); int outIndex = 0; Queue<Integer> newRunStarts = Queues.newLinkedBlockingQueue(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java index 1548ec5..94ee022 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java @@ -109,19 +109,15 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements } public final IterOutcome next(RecordBatch b) { - if(!context.getExecutorState().shouldContinue()) { - return IterOutcome.STOP; - } + checkContinue(); return next(0, b); } public final IterOutcome next(int inputIndex, RecordBatch b) { IterOutcome next; - stats.stopProcessing(); try { - if (!context.getExecutorState().shouldContinue()) { - return IterOutcome.STOP; - } + stats.stopProcessing(); + checkContinue(); next = b.next(); } finally { 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 1548ec5..94ee022 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -109,19 +109,15 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
   }

   public final IterOutcome next(RecordBatch b) {
-    if(!context.getExecutorState().shouldContinue()) {
-      return IterOutcome.STOP;
-    }
+    checkContinue();
     return next(0, b);
   }

   public final IterOutcome next(int inputIndex, RecordBatch b) {
     IterOutcome next;
-    stats.stopProcessing();
     try {
-      if (!context.getExecutorState().shouldContinue()) {
-        return IterOutcome.STOP;
-      }
+      stats.stopProcessing();
+      checkContinue();
       next = b.next();
     } finally {
       stats.startProcessing();
@@ -267,4 +263,14 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
   public boolean isRecordBatchStatsLoggingEnabled() {
     return batchStatsContext.isEnableBatchSzLogging();
   }
+
+  /**
+   * Checks if the query should continue. Throws a UserException if not.
+   * Operators should call this periodically to detect cancellation
+   * requests. The operator need not catch the exception: it will bubble
+   * up the operator tree and be handled like any other fatal error.
+   */
+  public void checkContinue() {
+    context.getExecutorState().checkContinue();
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 79884e8..161fa83 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -32,6 +32,7 @@ import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.QueryCancelledException;
 import org.apache.drill.exec.physical.base.FragmentRoot;
 import org.apache.drill.exec.physical.impl.ImplCreator;
 import org.apache.drill.exec.physical.impl.RootExec;
@@ -332,6 +333,8 @@ public class FragmentExecutor implements Runnable {
         }
       });

+    } catch (QueryCancelledException e) {
+      // Ignore: indicates query cancelled by this executor
     } catch (OutOfMemoryError | OutOfMemoryException e) {
       if (FailureUtils.isDirectMemoryOOM(e)) {
         fail(UserException.memoryError(e).build(logger));
@@ -528,6 +531,13 @@ public class FragmentExecutor implements Runnable {
     public Throwable getFailureCause(){
       return deferredException.getException();
     }
+
+    @Override
+    public void checkContinue() {
+      if (!shouldContinue()) {
+        throw new QueryCancelledException();
+      }
+    }
   }

   private class FragmentDrillbitStatusListener implements DrillbitStatusListener {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
index a176646..68cf18f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
@@ -23,6 +23,7 @@ import java.util.List;
 import org.apache.drill.common.DeferredException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.QueryCancelledException;
 import org.apache.drill.exec.ops.RootFragmentContext;
 import org.apache.drill.exec.physical.impl.ScreenCreator.ScreenRoot;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
@@ -74,6 +75,12 @@ public class SimpleRootExec implements RootExec, Iterable<ValueVector> {
       return ex.getException();
     }

+    @Override
+    public void checkContinue() {
+      if (!shouldContinue()) {
+        throw new QueryCancelledException();
+      }
+    }
   }

   public RootFragmentContext getContext() {
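On the test side, OperatorFixture's MockExecutorState (in the diff below) implements the new method as a no-op, so existing tests never see a cancellation. A test that wanted to exercise the new path could swap in a hypothetical double like this one, which is not part of the patch:

```java
// Hypothetical test double (not from this patch): unlike MockExecutorState
// below, it reports cancellation so a unit test can drive an operator's
// QueryCancelledException path.
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.QueryCancelledException;

class CancellingExecutorState implements FragmentContext.ExecutorState {
  @Override public boolean shouldContinue() { return false; }
  @Override public void checkContinue() { throw new QueryCancelledException(); }
  @Override public void fail(Throwable t) { }
  @Override public boolean isFailed() { return false; }
  @Override public Throwable getFailureCause() { return null; }
}
```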
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
index 8969265..b3e1d8e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/unnest/TestUnnestWithLateralCorrectness.java
@@ -57,7 +57,7 @@ import org.junit.experimental.categories.Category;
 import java.util.ArrayList;
 import java.util.List;

-import static junit.framework.TestCase.fail;
+import static org.junit.Assert.fail;

 @Category(OperatorTest.class)
 public class TestUnnestWithLateralCorrectness extends SubOperatorTest {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
index 1e2df25..b9f4b43 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
@@ -464,27 +464,22 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
     }
   }

-  public static class MockExecutorState implements FragmentContext.ExecutorState
-  {
+  public static class MockExecutorState
+      implements FragmentContext.ExecutorState {
     @Override
-    public boolean shouldContinue() {
-      return true;
-    }
+    public boolean shouldContinue() { return true; }

     @Override
-    public void fail(Throwable t) {
+    public void fail(Throwable t) { }

-    }
+    @Override
+    public boolean isFailed() { return false; }

     @Override
-    public boolean isFailed() {
-      return false;
-    }
+    public Throwable getFailureCause() { return null; }

     @Override
-    public Throwable getFailureCause() {
-      return null;
-    }
+    public void checkContinue() { }
   }

   public OperatorContext newOperatorContext(PhysicalOperator popConfig) {
