Github user ppadma commented on a diff in the pull request:
https://github.com/apache/drill/pull/1121#discussion_r168337843
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/protocol/OperatorDriver.java
---
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.protocol;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+
+/**
+ * State machine that drives the operator executable. Converts
+ * between the iterator protocol and the operator executable protocol.
+ * Implemented as a separate class in anticipation of eventually
+ * changing the record batch (iterator) protocol.
+ */
+
+public class OperatorDriver {
+ public enum State { START, SCHEMA, RUN, END, FAILED, CLOSED }
+
+ private OperatorDriver.State state = State.START;
+
+ /**
+ * Operator context. The driver "owns" the context and is responsible
+ * for closing it.
+ */
+
+ private final OperatorContext opContext;
+ private final OperatorExec operatorExec;
+ private final BatchAccessor batchAccessor;
+ private int schemaVersion;
+
+ public OperatorDriver(OperatorContext opServicees, OperatorExec opExec) {
+ this.opContext = opServicees;
+ this.operatorExec = opExec;
+ batchAccessor = operatorExec.batchAccessor();
+ }
+
+ /**
+ * Get the next batch. Performs initialization on the first call.
+ * @return the iteration outcome to send downstream
+ */
+
+ public IterOutcome next() {
+ try {
+ switch (state) {
+ case START:
+ return start();
+ case RUN:
+ return doNext();
+ default:
+ OperatorRecordBatch.logger.debug("Extra call to next() in state "
+ state + ": " + operatorLabel());
+ return IterOutcome.NONE;
+ }
+ } catch (UserException e) {
+ cancelSilently();
+ state = State.FAILED;
+ throw e;
+ } catch (Throwable t) {
+ cancelSilently();
+ state = State.FAILED;
+ throw UserException.executionError(t)
+ .addContext("Exception thrown from", operatorLabel())
+ .build(OperatorRecordBatch.logger);
+ }
+ }
+
+ /**
+ * Cancels the operator before reaching EOF.
+ */
+
+ public void cancel() {
+ try {
+ switch (state) {
+ case START:
+ case RUN:
+ cancelSilently();
+ break;
+ default:
+ break;
+ }
+ } finally {
+ state = State.FAILED;
+ }
+ }
+
+ /**
+ * Start the operator executor. Bind it to the various contexts.
+ * Then start the executor and fetch the first schema.
+ * @return result of the first batch, which should contain
+ * only a schema, or EOF
+ */
+
+ private IterOutcome start() {
--- End diff --
would be good if we can capture what exceptions each of these functions
might throw.
---