GitHub user paul-rogers commented on a diff in the pull request:
https://github.com/apache/drill/pull/1121#discussion_r169203509
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java
---
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.protocol;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+
+/**
+ * State machine that drives the operator executable. Converts
+ * between the iterator protocol and the operator executable protocol.
+ * Implemented as a separate class in anticipation of eventually
+ * changing the record batch (iterator) protocol.
+ */
+
+public class OperatorDriver {
+ public enum State {
+
+ /**
+ * Before the first call to next().
+ */
+
+ START,
+
+ /**
+ * The first call to next() has been made and schema (only)
+ * was returned. On the subsequent call to next(), return any
+ * data that might have accompanied that first batch.
+ */
+
+ SCHEMA,
+
+ /**
+ * The second call to next() has been made and there is more
+ * data to deliver on subsequent calls.
+ */
+
+ RUN,
+
+ /**
+ * No more data to deliver.
+ */
+
+ END,
+
+ /**
+ * An error occurred. Operation was cancelled.
+ */
+
+ FAILED,
+
+ /**
+ * close() called and resources are released.
+ */
+
+ CLOSED }
+
+ private OperatorDriver.State state = State.START;
+
+ /**
+ * Operator context. The driver "owns" the context and is responsible
+ * for closing it.
+ */
+
+ private final OperatorContext opContext;
+ private final OperatorExec operatorExec;
+ private final BatchAccessor batchAccessor;
+ private int schemaVersion;
+
+ public OperatorDriver(OperatorContext opContext, OperatorExec opExec) {
+ this.opContext = opContext;
+ this.operatorExec = opExec;
+ batchAccessor = operatorExec.batchAccessor();
+ }
+
+ /**
+ * Get the next batch. Performs initialization on the first call.
+ * @return the iteration outcome to send downstream
+ */
+
+ public IterOutcome next() {
+ try {
+ switch (state) {
+ case START:
+ return start();
+ case RUN:
+ return doNext();
+ default:
+ OperatorRecordBatch.logger.debug("Extra call to next() in state "
+ + state + ": " + operatorLabel());
+ return IterOutcome.NONE;
+ }
+ } catch (UserException e) {
+ cancelSilently();
+ state = State.FAILED;
+ throw e;
+ } catch (Throwable t) {
+ cancelSilently();
+ state = State.FAILED;
+ throw UserException.executionError(t)
+ .addContext("Exception thrown from", operatorLabel())
+ .build(OperatorRecordBatch.logger);
+ }
+ }
+
+ /**
+ * Cancels the operator before reaching EOF.
+ */
+
+ public void cancel() {
+ try {
+ switch (state) {
+ case START:
+ case RUN:
+ cancelSilently();
+ break;
+ default:
+ break;
+ }
+ } finally {
+ state = State.FAILED;
--- End diff --
Added a `CANCELED` state. But nothing ever reads that state. The point of
`FAILED` is just to avoid confusion when `next()` is called after a failure or
cancellation.
We definitely *do not* want cancellation to move to the `CLOSED` state.
Doing so is a bug that exists in several operators. If `cancel()` closes the
operator, then the operator is closed twice: once when the downstream operator
says it wants no more rows, and a second time when the fragment executor does
the real close.
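To make the double close concrete, here is a minimal, hypothetical sketch. `BuggyOperator` and `releaseCount` are illustrative names, not Drill APIs, and resource release is modeled as a simple counter. Because `cancel()` delegates to `close()`, the fragment executor's later `close()` releases the same resources a second time.

```java
// Hypothetical illustration of the double-close bug; not Drill code.
public class DoubleCloseDemo {

  /** Stand-in for an operator that holds resources (e.g. buffers). */
  static class BuggyOperator implements AutoCloseable {
    private int releaseCount;

    /** Buggy: cancellation releases resources by calling close() itself. */
    void cancel() {
      close();
    }

    /** Releases resources with no guard against being called again. */
    @Override
    public void close() {
      releaseCount++;
    }

    int releaseCount() {
      return releaseCount;
    }
  }

  public static void main(String[] args) {
    BuggyOperator op = new BuggyOperator();
    op.cancel(); // downstream operator says it wants no more rows
    op.close();  // fragment executor does the real close
    System.out.println("resources released " + op.releaseCount() + " times");
  }
}
```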
So the `FAILED` state marks that (a) we won't return any more rows, but
(b) `close()` has not yet been called. The new `CANCELED` state has the
same semantics.
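The semantics above can be sketched as a small state machine. This is a hypothetical simplification, not Drill's `OperatorDriver`. `Outcome` stands in for `IterOutcome`, and the operator-level cancel work (`cancelSilently()` in the diff) is omitted. Resource release is again a counter. `cancel()` only stops row delivery, while `close()` alone releases resources, and does so at most once. `FAILED` would be handled exactly like `CANCELED`.

```java
// Hypothetical sketch of the cancel/close split described above; not Drill's OperatorDriver.
public class CancelStateDemo {

  enum State { START, RUN, END, CANCELED, FAILED, CLOSED }

  /** Simplified stand-in for Drill's IterOutcome. */
  enum Outcome { OK, NONE }

  static class DriverSketch {
    private State state = State.START;
    private int releaseCount;
    private int batchesLeft;

    DriverSketch(int batches) {
      this.batchesLeft = batches;
    }

    Outcome next() {
      switch (state) {
        case START:
        case RUN:
          if (batchesLeft == 0) {
            state = State.END;
            return Outcome.NONE;
          }
          batchesLeft--;
          state = State.RUN;
          return Outcome.OK;
        default:
          // END, CANCELED, FAILED, CLOSED: never return more rows.
          return Outcome.NONE;
      }
    }

    /** Stops delivering rows but does NOT release resources. */
    void cancel() {
      if (state == State.START || state == State.RUN) {
        state = State.CANCELED;
      }
    }

    /** Releases resources exactly once, whatever the prior state. */
    void close() {
      if (state == State.CLOSED) {
        return;
      }
      releaseCount++;
      state = State.CLOSED;
    }

    State state() { return state; }

    int releaseCount() { return releaseCount; }
  }

  public static void main(String[] args) {
    DriverSketch driver = new DriverSketch(3);
    driver.next();   // delivers one batch
    driver.cancel(); // downstream wants no more rows
    System.out.println(driver.next() + " " + driver.state() + " " + driver.releaseCount());
    driver.close();  // fragment executor's real close
    driver.close();  // a stray second close is a no-op
    System.out.println(driver.state() + " " + driver.releaseCount());
  }
}
```

Running `main` should print `NONE CANCELED 0` and then `CLOSED 1`. The guard in `close()` is an extra defensive choice in this sketch. The essential point from the comment is that `cancel()` never moves to `CLOSED`.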
---