http://git-wip-us.apache.org/repos/asf/drill/blob/03435183/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStateProcessor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStateProcessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStateProcessor.java new file mode 100644 index 0000000..2443139 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStateProcessor.java @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.foreman; + +import com.codahale.metrics.Counter; +import org.apache.drill.common.EventProcessor; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.metrics.DrillMetrics; +import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.work.foreman.Foreman.ForemanResult; + +/** + * Is responsible for query transition from one state to another, + * incrementing / decrementing query statuses counters. + */ +public class QueryStateProcessor implements AutoCloseable { + + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryStateProcessor.class); + + private static final Counter planningQueries = DrillMetrics.getRegistry().counter("drill.queries.planning"); + private static final Counter enqueuedQueries = DrillMetrics.getRegistry().counter("drill.queries.enqueued"); + private static final Counter runningQueries = DrillMetrics.getRegistry().counter("drill.queries.running"); + private static final Counter completedQueries = DrillMetrics.getRegistry().counter("drill.queries.completed"); + private static final Counter succeededQueries = DrillMetrics.getRegistry().counter("drill.queries.succeeded"); + private static final Counter failedQueries = DrillMetrics.getRegistry().counter("drill.queries.failed"); + private static final Counter canceledQueries = DrillMetrics.getRegistry().counter("drill.queries.canceled"); + + private final StateSwitch stateSwitch = new StateSwitch(); + + private final String queryIdString; + private final QueryManager queryManager; + private final DrillbitContext drillbitContext; + private final ForemanResult foremanResult; + + private volatile QueryState state; + + public QueryStateProcessor(String queryIdString, QueryManager queryManager, DrillbitContext drillbitContext, ForemanResult foremanResult) { + this.queryIdString = queryIdString; + this.queryManager = queryManager; + this.drillbitContext = drillbitContext; + this.foremanResult = foremanResult; + // initial query state is PREPARING + this.state = QueryState.PREPARING; + } + + /** + * @return current query state + */ + public QueryState getState() { + return state; + } + + /** + * Moves one query state to another, will fail when requested query state transition is not allowed. + * + * @param newState new query state + * @param exception exception if failure occurred + */ + public synchronized void moveToState(QueryState newState, Exception exception) { + logger.debug(queryIdString + ": State change requested {} --> {}", state, newState); + + switch (state) { + case PREPARING: + preparing(newState, exception); + return; + case PLANNING: + planning(newState, exception); + return; + case ENQUEUED: + enqueued(newState, exception); + return; + case STARTING: + starting(newState, exception); + return; + case RUNNING: + running(newState, exception); + return; + case CANCELLATION_REQUESTED: + cancellationRequested(newState, exception); + return; + case CANCELED: + case COMPLETED: + case FAILED: + logger.warn("Dropping request to move to {} state as query is already at {} state (which is terminal).", newState, state); + return; + } + + throw new IllegalStateException(String.format("Failure trying to change states: %s --> %s", state.name(), newState.name())); + } + + /** + * Directly moves query from one state to another and updates ephemeral query store. + * + * @param newState new query state + */ + public void recordNewState(final QueryState newState) { + state = newState; + queryManager.updateEphemeralState(newState); + } + + /** + * Cancel the query. Asynchronous -- it may take some time for all remote fragments to be terminated. + * For preparing, planning and enqueued states we cancel immediately since these states are done locally. + * + * Note this can be called from outside of run() on another thread, or after run() completes + */ + public void cancel() { + switch (state) { + case PREPARING: + case PLANNING: + case ENQUEUED: + moveToState(QueryState.CANCELLATION_REQUESTED, null); + return; + + case STARTING: + case RUNNING: + addToEventQueue(QueryState.CANCELLATION_REQUESTED, null); + return; + + case CANCELLATION_REQUESTED: + case CANCELED: + case COMPLETED: + case FAILED: + // nothing to do + return; + + default: + throw new IllegalStateException("Unable to cancel the query. Unexpected query state -> " + state); + } + } + + /** + * Tells the foreman to move to a new state.<br> + * This will be added to the end of the event queue and will be processed once the foreman is ready + * to accept external events. + * + * @param newState the state to move to + * @param exception if not null, the exception that drove this state transition (usually a failure) + */ + public void addToEventQueue(final QueryState newState, final Exception exception) { + stateSwitch.addEvent(newState, exception); + } + + /** + * Starts processing all events that were enqueued while all fragments were sending out. + */ + public void startProcessingEvents() { + try { + stateSwitch.start(); + } catch (Exception ex) { + moveToState(QueryState.FAILED, ex); + } + } + + /** + * On close set proper increment / decrement counters depending on final query state. + */ + @Override + public void close() { + queryManager.markEndTime(); + + switch (state) { + case FAILED: + failedQueries.inc(); + break; + case CANCELED: + canceledQueries.inc(); + break; + case COMPLETED: + succeededQueries.inc(); + break; + } + + runningQueries.dec(); + completedQueries.inc(); + } + + + private void preparing(final QueryState newState, final Exception exception) { + switch (newState) { + case PLANNING: + queryManager.markStartTime(); + runningQueries.inc(); + + recordNewState(newState); + planningQueries.inc(); + return; + case CANCELLATION_REQUESTED: + wrapUpCancellation(); + return; + } + checkCommonStates(newState, exception); + } + + private void planning(final QueryState newState, final Exception exception) { + planningQueries.dec(); + queryManager.markPlanningEndTime(); + switch (newState) { + case ENQUEUED: + recordNewState(newState); + enqueuedQueries.inc(); + return; + case CANCELLATION_REQUESTED: + wrapUpCancellation(); + return; + } + checkCommonStates(newState, exception); + } + + private void enqueued(final QueryState newState, final Exception exception) { + enqueuedQueries.dec(); + queryManager.markQueueWaitEndTime(); + switch (newState) { + case STARTING: + recordNewState(newState); + return; + case CANCELLATION_REQUESTED: + wrapUpCancellation(); + return; + } + checkCommonStates(newState, exception); + } + + private void starting(final QueryState newState, final Exception exception) { + switch (newState) { + case RUNNING: + recordNewState(QueryState.RUNNING); + return; + case COMPLETED: + wrapUpCompletion(); + case CANCELLATION_REQUESTED: + // since during starting state fragments are sent to the remote nodes, + // we don't want to cancel until they all are sent out + addToEventQueue(QueryState.CANCELLATION_REQUESTED, null); + return; + } + + checkCommonStates(newState, exception); + } + + private void running(final QueryState newState, final Exception exception) { + /* + * For cases that cancel executing fragments, we have to record the new + * state first, because the cancellation of the local root fragment will + * cause this to be called recursively. + */ + switch (newState) { + case CANCELLATION_REQUESTED: { + assert exception == null; + recordNewState(QueryState.CANCELLATION_REQUESTED); + queryManager.cancelExecutingFragments(drillbitContext); + foremanResult.setCompleted(QueryState.CANCELED); + /* + * We don't close the foremanResult until we've gotten + * acknowledgments, which happens below in the case for current state + * == CANCELLATION_REQUESTED. + */ + return; + } + + case COMPLETED: { + wrapUpCompletion(); + return; + } + } + checkCommonStates(newState, exception); + } + + private void cancellationRequested(final QueryState newState, final Exception exception) { + switch (newState) { + case FAILED: + if (drillbitContext.getConfig().getBoolean(ExecConstants.RETURN_ERROR_FOR_FAILURE_IN_CANCELLED_FRAGMENTS)) { + assert exception != null; + recordNewState(QueryState.FAILED); + foremanResult.setForceFailure(exception); + } + + // proceed + + case CANCELED: + case COMPLETED: + /* + * These amount to a completion of the cancellation requests' cleanup; + * now we can clean up and send the result. + */ + foremanResult.close(); + return; + } + + throw new IllegalStateException(String.format("Failure trying to change states: %s --> %s", state.name(), newState.name())); + } + + private void wrapUpCancellation() { + recordNewState(QueryState.CANCELLATION_REQUESTED); + foremanResult.setCompleted(QueryState.CANCELED); + } + + private void wrapUpCompletion() { + recordNewState(QueryState.COMPLETED); + foremanResult.setCompleted(QueryState.COMPLETED); + foremanResult.close(); + } + + private void checkCommonStates(final QueryState newState, final Exception exception) { + switch (newState) { + case FAILED: + assert exception != null; + recordNewState(QueryState.FAILED); + queryManager.cancelExecutingFragments(drillbitContext); + foremanResult.setFailed(exception); + foremanResult.close(); + return; + } + + throw new IllegalStateException(String.format("Failure trying to change states: %s --> %s", state.name(), newState.name())); + } + + private class StateEvent { + final QueryState newState; + final Exception exception; + + StateEvent(final QueryState newState, final Exception exception) { + this.newState = newState; + this.exception = exception; + } + } + + private class StateSwitch extends EventProcessor<StateEvent> { + public void addEvent(final QueryState newState, final Exception exception) { + sendEvent(new StateEvent(newState, exception)); + } + + @Override + protected void processEvent(final StateEvent event) { + moveToState(event.newState, event.exception); + } + } + +}
http://git-wip-us.apache.org/repos/asf/drill/blob/03435183/exec/java-exec/src/main/resources/drill-module.conf ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index a0cf643..cb66ca3 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -306,7 +306,7 @@ drill.exec: { size: 2, // Maximum wait time in the queue before the query times out and // fails. - timeout: 5000 // 5 seconds + timeout_ms: 5000 // 5 seconds } } memory: { http://git-wip-us.apache.org/repos/asf/drill/blob/03435183/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java index 956cfc4..ec101d8 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import org.apache.commons.math3.util.Pair; +import org.apache.drill.exec.work.foreman.FragmentsRunner; import org.apache.drill.test.BaseTestQuery; import org.apache.drill.test.QueryTestUtil; import org.apache.drill.SingleRowListener; @@ -757,7 +758,7 @@ public class TestDrillbitResilience extends DrillTest { final String exceptionDesc = "send-fragments"; final Class<? extends Throwable> exceptionClass = ForemanException.class; final String controls = Controls.newBuilder() - .addException(Foreman.class, exceptionDesc, exceptionClass) + .addException(FragmentsRunner.class, exceptionDesc, exceptionClass) .build(); assertFailsWithException(controls, exceptionClass, exceptionDesc); http://git-wip-us.apache.org/repos/asf/drill/blob/03435183/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java ---------------------------------------------------------------------- diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java index 51cdab7..edc401c 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java @@ -10377,6 +10377,22 @@ public final class UserBitShared { * </pre> */ ENQUEUED(6, 6), + /** + * <code>PREPARING = 7;</code> + * + * <pre> + * query is at preparation stage, foreman is initializing + * </pre> + */ + PREPARING(7, 7), + /** + * <code>PLANNING = 8;</code> + * + * <pre> + * query is at planning stage (includes logical or / and physical planning) + * </pre> + */ + PLANNING(8, 8), ; /** @@ -10427,6 +10443,22 @@ public final class UserBitShared { * </pre> */ public static final int ENQUEUED_VALUE = 6; + /** + * <code>PREPARING = 7;</code> + * + * <pre> + * query is at preparation stage, foreman is initializing + * </pre> + */ + public static final int PREPARING_VALUE = 7; + /** + * <code>PLANNING = 8;</code> + * + * <pre> + * query is at planning stage (includes logical or / and physical planning) + * </pre> + */ + public static final int PLANNING_VALUE = 8; public final int getNumber() { return value; } @@ -10440,6 +10472,8 @@ public final class UserBitShared { case 4: return FAILED; case 5: return CANCELLATION_REQUESTED; case 6: return ENQUEUED; + case 7: return PREPARING; + case 8: return PLANNING; default: return null; } } @@ -23942,92 +23976,93 @@ public final class UserBitShared { "ield\022\023\n\013value_count\030\004 \001(\005\022\027\n\017var_byte_le" + "ngth\030\005 \001(\005\022\025\n\rbuffer_length\030\007 \001(\005\"7\n\nNod" + "eStatus\022\017\n\007node_id\030\001 \001(\005\022\030\n\020memory_footp" + - "rint\030\002 \001(\003\"\225\002\n\013QueryResult\0228\n\013query_stat" + + "rint\030\002 \001(\003\"\263\002\n\013QueryResult\0228\n\013query_stat" + "e\030\001 \001(\0162#.exec.shared.QueryResult.QueryS", "tate\022&\n\010query_id\030\002 \001(\0132\024.exec.shared.Que" + "ryId\022(\n\005error\030\003 \003(\0132\031.exec.shared.DrillP" + - "BError\"z\n\nQueryState\022\014\n\010STARTING\020\000\022\013\n\007RU" + - "NNING\020\001\022\r\n\tCOMPLETED\020\002\022\014\n\010CANCELED\020\003\022\n\n\006" + - "FAILED\020\004\022\032\n\026CANCELLATION_REQUESTED\020\005\022\014\n\010" + - "ENQUEUED\020\006\"p\n\tQueryData\022&\n\010query_id\030\001 \001(" + - "\0132\024.exec.shared.QueryId\022\021\n\trow_count\030\002 \001" + - "(\005\022(\n\003def\030\003 \001(\0132\033.exec.shared.RecordBatc" + - "hDef\"\330\001\n\tQueryInfo\022\r\n\005query\030\001 \001(\t\022\r\n\005sta" + - "rt\030\002 \001(\003\0222\n\005state\030\003 \001(\0162#.exec.shared.Qu", - "eryResult.QueryState\022\017\n\004user\030\004 \001(\t:\001-\022\'\n" + - "\007foreman\030\005 \001(\0132\026.exec.DrillbitEndpoint\022\024" + - "\n\014options_json\030\006 \001(\t\022\022\n\ntotal_cost\030\007 \001(\001" + - "\022\025\n\nqueue_name\030\010 \001(\t:\001-\"\242\004\n\014QueryProfile" + - "\022 \n\002id\030\001 \001(\0132\024.exec.shared.QueryId\022$\n\004ty" + - "pe\030\002 \001(\0162\026.exec.shared.QueryType\022\r\n\005star" + - "t\030\003 \001(\003\022\013\n\003end\030\004 \001(\003\022\r\n\005query\030\005 \001(\t\022\014\n\004p" + - "lan\030\006 \001(\t\022\'\n\007foreman\030\007 \001(\0132\026.exec.Drillb" + - "itEndpoint\0222\n\005state\030\010 \001(\0162#.exec.shared." + - "QueryResult.QueryState\022\027\n\017total_fragment", - "s\030\t \001(\005\022\032\n\022finished_fragments\030\n \001(\005\022;\n\020f" + - "ragment_profile\030\013 \003(\0132!.exec.shared.Majo" + - "rFragmentProfile\022\017\n\004user\030\014 \001(\t:\001-\022\r\n\005err" + - "or\030\r \001(\t\022\024\n\014verboseError\030\016 \001(\t\022\020\n\010error_" + - "id\030\017 \001(\t\022\022\n\nerror_node\030\020 \001(\t\022\024\n\014options_" + - "json\030\021 \001(\t\022\017\n\007planEnd\030\022 \001(\003\022\024\n\014queueWait" + - "End\030\023 \001(\003\022\022\n\ntotal_cost\030\024 \001(\001\022\025\n\nqueue_n" + - "ame\030\025 \001(\t:\001-\"t\n\024MajorFragmentProfile\022\031\n\021" + - "major_fragment_id\030\001 \001(\005\022A\n\026minor_fragmen" + - "t_profile\030\002 \003(\0132!.exec.shared.MinorFragm", - "entProfile\"\350\002\n\024MinorFragmentProfile\022)\n\005s" + - "tate\030\001 \001(\0162\032.exec.shared.FragmentState\022(" + - "\n\005error\030\002 \001(\0132\031.exec.shared.DrillPBError" + - "\022\031\n\021minor_fragment_id\030\003 \001(\005\0226\n\020operator_" + - "profile\030\004 \003(\0132\034.exec.shared.OperatorProf" + - "ile\022\022\n\nstart_time\030\005 \001(\003\022\020\n\010end_time\030\006 \001(" + - "\003\022\023\n\013memory_used\030\007 \001(\003\022\027\n\017max_memory_use" + - "d\030\010 \001(\003\022(\n\010endpoint\030\t \001(\0132\026.exec.Drillbi" + - "tEndpoint\022\023\n\013last_update\030\n \001(\003\022\025\n\rlast_p" + - "rogress\030\013 \001(\003\"\377\001\n\017OperatorProfile\0221\n\rinp", - "ut_profile\030\001 \003(\0132\032.exec.shared.StreamPro" + - "file\022\023\n\013operator_id\030\003 \001(\005\022\025\n\roperator_ty" + - "pe\030\004 \001(\005\022\023\n\013setup_nanos\030\005 \001(\003\022\025\n\rprocess" + - "_nanos\030\006 \001(\003\022#\n\033peak_local_memory_alloca" + - "ted\030\007 \001(\003\022(\n\006metric\030\010 \003(\0132\030.exec.shared." + - "MetricValue\022\022\n\nwait_nanos\030\t \001(\003\"B\n\rStrea" + - "mProfile\022\017\n\007records\030\001 \001(\003\022\017\n\007batches\030\002 \001" + - "(\003\022\017\n\007schemas\030\003 \001(\003\"J\n\013MetricValue\022\021\n\tme" + - "tric_id\030\001 \001(\005\022\022\n\nlong_value\030\002 \001(\003\022\024\n\014dou" + - "ble_value\030\003 \001(\001\")\n\010Registry\022\035\n\003jar\030\001 \003(\013", - "2\020.exec.shared.Jar\"/\n\003Jar\022\014\n\004name\030\001 \001(\t\022" + - "\032\n\022function_signature\030\002 \003(\t\"W\n\013SaslMessa" + - "ge\022\021\n\tmechanism\030\001 \001(\t\022\014\n\004data\030\002 \001(\014\022\'\n\006s" + - "tatus\030\003 \001(\0162\027.exec.shared.SaslStatus*5\n\n" + - "RpcChannel\022\017\n\013BIT_CONTROL\020\000\022\014\n\010BIT_DATA\020" + - "\001\022\010\n\004USER\020\002*V\n\tQueryType\022\007\n\003SQL\020\001\022\013\n\007LOG" + - "ICAL\020\002\022\014\n\010PHYSICAL\020\003\022\r\n\tEXECUTION\020\004\022\026\n\022P" + - "REPARED_STATEMENT\020\005*\207\001\n\rFragmentState\022\013\n" + - "\007SENDING\020\000\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007R" + - "UNNING\020\002\022\014\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n", - "\006FAILED\020\005\022\032\n\026CANCELLATION_REQUESTED\020\006*\360\005" + - "\n\020CoreOperatorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n" + - "\020BROADCAST_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_" + - "AGGREGATE\020\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN" + - "\020\005\022\031\n\025HASH_PARTITION_SENDER\020\006\022\t\n\005LIMIT\020\007" + - "\022\024\n\020MERGING_RECEIVER\020\010\022\034\n\030ORDERED_PARTIT" + - "ION_SENDER\020\t\022\013\n\007PROJECT\020\n\022\026\n\022UNORDERED_R" + - "ECEIVER\020\013\022\020\n\014RANGE_SENDER\020\014\022\n\n\006SCREEN\020\r\022" + - "\034\n\030SELECTION_VECTOR_REMOVER\020\016\022\027\n\023STREAMI" + - "NG_AGGREGATE\020\017\022\016\n\nTOP_N_SORT\020\020\022\021\n\rEXTERN", - "AL_SORT\020\021\022\t\n\005TRACE\020\022\022\t\n\005UNION\020\023\022\014\n\010OLD_S" + - "ORT\020\024\022\032\n\026PARQUET_ROW_GROUP_SCAN\020\025\022\021\n\rHIV" + - "E_SUB_SCAN\020\026\022\025\n\021SYSTEM_TABLE_SCAN\020\027\022\021\n\rM" + - "OCK_SUB_SCAN\020\030\022\022\n\016PARQUET_WRITER\020\031\022\023\n\017DI" + - "RECT_SUB_SCAN\020\032\022\017\n\013TEXT_WRITER\020\033\022\021\n\rTEXT" + - "_SUB_SCAN\020\034\022\021\n\rJSON_SUB_SCAN\020\035\022\030\n\024INFO_S" + - "CHEMA_SUB_SCAN\020\036\022\023\n\017COMPLEX_TO_JSON\020\037\022\025\n" + - "\021PRODUCER_CONSUMER\020 \022\022\n\016HBASE_SUB_SCAN\020!" + - "\022\n\n\006WINDOW\020\"\022\024\n\020NESTED_LOOP_JOIN\020#\022\021\n\rAV" + - "RO_SUB_SCAN\020$\022\021\n\rPCAP_SUB_SCAN\020%*g\n\nSasl", - "Status\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSASL_START\020\001" + - "\022\024\n\020SASL_IN_PROGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003" + - "\022\017\n\013SASL_FAILED\020\004B.\n\033org.apache.drill.ex" + - "ec.protoB\rUserBitSharedH\001" + "BError\"\227\001\n\nQueryState\022\014\n\010STARTING\020\000\022\013\n\007R" + + "UNNING\020\001\022\r\n\tCOMPLETED\020\002\022\014\n\010CANCELED\020\003\022\n\n" + + "\006FAILED\020\004\022\032\n\026CANCELLATION_REQUESTED\020\005\022\014\n" + + "\010ENQUEUED\020\006\022\r\n\tPREPARING\020\007\022\014\n\010PLANNING\020\010" + + "\"p\n\tQueryData\022&\n\010query_id\030\001 \001(\0132\024.exec.s" + + "hared.QueryId\022\021\n\trow_count\030\002 \001(\005\022(\n\003def\030" + + "\003 \001(\0132\033.exec.shared.RecordBatchDef\"\330\001\n\tQ" + + "ueryInfo\022\r\n\005query\030\001 \001(\t\022\r\n\005start\030\002 \001(\003\0222", + "\n\005state\030\003 \001(\0162#.exec.shared.QueryResult." + + "QueryState\022\017\n\004user\030\004 \001(\t:\001-\022\'\n\007foreman\030\005" + + " \001(\0132\026.exec.DrillbitEndpoint\022\024\n\014options_" + + "json\030\006 \001(\t\022\022\n\ntotal_cost\030\007 \001(\001\022\025\n\nqueue_" + + "name\030\010 \001(\t:\001-\"\242\004\n\014QueryProfile\022 \n\002id\030\001 \001" + + "(\0132\024.exec.shared.QueryId\022$\n\004type\030\002 \001(\0162\026" + + ".exec.shared.QueryType\022\r\n\005start\030\003 \001(\003\022\013\n" + + "\003end\030\004 \001(\003\022\r\n\005query\030\005 \001(\t\022\014\n\004plan\030\006 \001(\t\022" + + "\'\n\007foreman\030\007 \001(\0132\026.exec.DrillbitEndpoint" + + "\0222\n\005state\030\010 \001(\0162#.exec.shared.QueryResul", + "t.QueryState\022\027\n\017total_fragments\030\t \001(\005\022\032\n" + + "\022finished_fragments\030\n \001(\005\022;\n\020fragment_pr" + + "ofile\030\013 \003(\0132!.exec.shared.MajorFragmentP" + + "rofile\022\017\n\004user\030\014 \001(\t:\001-\022\r\n\005error\030\r \001(\t\022\024" + + "\n\014verboseError\030\016 \001(\t\022\020\n\010error_id\030\017 \001(\t\022\022" + + "\n\nerror_node\030\020 \001(\t\022\024\n\014options_json\030\021 \001(\t" + + "\022\017\n\007planEnd\030\022 \001(\003\022\024\n\014queueWaitEnd\030\023 \001(\003\022" + + "\022\n\ntotal_cost\030\024 \001(\001\022\025\n\nqueue_name\030\025 \001(\t:" + + "\001-\"t\n\024MajorFragmentProfile\022\031\n\021major_frag" + + "ment_id\030\001 \001(\005\022A\n\026minor_fragment_profile\030", + "\002 \003(\0132!.exec.shared.MinorFragmentProfile" + + "\"\350\002\n\024MinorFragmentProfile\022)\n\005state\030\001 \001(\016" + + "2\032.exec.shared.FragmentState\022(\n\005error\030\002 " + + "\001(\0132\031.exec.shared.DrillPBError\022\031\n\021minor_" + + "fragment_id\030\003 \001(\005\0226\n\020operator_profile\030\004 " + + "\003(\0132\034.exec.shared.OperatorProfile\022\022\n\nsta" + + "rt_time\030\005 \001(\003\022\020\n\010end_time\030\006 \001(\003\022\023\n\013memor" + + "y_used\030\007 \001(\003\022\027\n\017max_memory_used\030\010 \001(\003\022(\n" + + "\010endpoint\030\t \001(\0132\026.exec.DrillbitEndpoint\022" + + "\023\n\013last_update\030\n \001(\003\022\025\n\rlast_progress\030\013 ", + "\001(\003\"\377\001\n\017OperatorProfile\0221\n\rinput_profile" + + "\030\001 \003(\0132\032.exec.shared.StreamProfile\022\023\n\013op" + + "erator_id\030\003 \001(\005\022\025\n\roperator_type\030\004 \001(\005\022\023" + + "\n\013setup_nanos\030\005 \001(\003\022\025\n\rprocess_nanos\030\006 \001" + + "(\003\022#\n\033peak_local_memory_allocated\030\007 \001(\003\022" + + "(\n\006metric\030\010 \003(\0132\030.exec.shared.MetricValu" + + "e\022\022\n\nwait_nanos\030\t \001(\003\"B\n\rStreamProfile\022\017" + + "\n\007records\030\001 \001(\003\022\017\n\007batches\030\002 \001(\003\022\017\n\007sche" + + "mas\030\003 \001(\003\"J\n\013MetricValue\022\021\n\tmetric_id\030\001 " + + "\001(\005\022\022\n\nlong_value\030\002 \001(\003\022\024\n\014double_value\030", + "\003 \001(\001\")\n\010Registry\022\035\n\003jar\030\001 \003(\0132\020.exec.sh" + + "ared.Jar\"/\n\003Jar\022\014\n\004name\030\001 \001(\t\022\032\n\022functio" + + "n_signature\030\002 \003(\t\"W\n\013SaslMessage\022\021\n\tmech" + + "anism\030\001 \001(\t\022\014\n\004data\030\002 \001(\014\022\'\n\006status\030\003 \001(" + + "\0162\027.exec.shared.SaslStatus*5\n\nRpcChannel" + + "\022\017\n\013BIT_CONTROL\020\000\022\014\n\010BIT_DATA\020\001\022\010\n\004USER\020" + + "\002*V\n\tQueryType\022\007\n\003SQL\020\001\022\013\n\007LOGICAL\020\002\022\014\n\010" + + "PHYSICAL\020\003\022\r\n\tEXECUTION\020\004\022\026\n\022PREPARED_ST" + + "ATEMENT\020\005*\207\001\n\rFragmentState\022\013\n\007SENDING\020\000" + + "\022\027\n\023AWAITING_ALLOCATION\020\001\022\013\n\007RUNNING\020\002\022\014", + "\n\010FINISHED\020\003\022\r\n\tCANCELLED\020\004\022\n\n\006FAILED\020\005\022" + + "\032\n\026CANCELLATION_REQUESTED\020\006*\360\005\n\020CoreOper" + + "atorType\022\021\n\rSINGLE_SENDER\020\000\022\024\n\020BROADCAST" + + "_SENDER\020\001\022\n\n\006FILTER\020\002\022\022\n\016HASH_AGGREGATE\020" + + "\003\022\r\n\tHASH_JOIN\020\004\022\016\n\nMERGE_JOIN\020\005\022\031\n\025HASH" + + "_PARTITION_SENDER\020\006\022\t\n\005LIMIT\020\007\022\024\n\020MERGIN" + + "G_RECEIVER\020\010\022\034\n\030ORDERED_PARTITION_SENDER" + + "\020\t\022\013\n\007PROJECT\020\n\022\026\n\022UNORDERED_RECEIVER\020\013\022" + + "\020\n\014RANGE_SENDER\020\014\022\n\n\006SCREEN\020\r\022\034\n\030SELECTI" + + "ON_VECTOR_REMOVER\020\016\022\027\n\023STREAMING_AGGREGA", + "TE\020\017\022\016\n\nTOP_N_SORT\020\020\022\021\n\rEXTERNAL_SORT\020\021\022" + + "\t\n\005TRACE\020\022\022\t\n\005UNION\020\023\022\014\n\010OLD_SORT\020\024\022\032\n\026P" + + "ARQUET_ROW_GROUP_SCAN\020\025\022\021\n\rHIVE_SUB_SCAN" + + "\020\026\022\025\n\021SYSTEM_TABLE_SCAN\020\027\022\021\n\rMOCK_SUB_SC" + + "AN\020\030\022\022\n\016PARQUET_WRITER\020\031\022\023\n\017DIRECT_SUB_S" + + "CAN\020\032\022\017\n\013TEXT_WRITER\020\033\022\021\n\rTEXT_SUB_SCAN\020" + + "\034\022\021\n\rJSON_SUB_SCAN\020\035\022\030\n\024INFO_SCHEMA_SUB_" + + "SCAN\020\036\022\023\n\017COMPLEX_TO_JSON\020\037\022\025\n\021PRODUCER_" + + "CONSUMER\020 \022\022\n\016HBASE_SUB_SCAN\020!\022\n\n\006WINDOW" + + "\020\"\022\024\n\020NESTED_LOOP_JOIN\020#\022\021\n\rAVRO_SUB_SCA", + "N\020$\022\021\n\rPCAP_SUB_SCAN\020%*g\n\nSaslStatus\022\020\n\014" + + "SASL_UNKNOWN\020\000\022\016\n\nSASL_START\020\001\022\024\n\020SASL_I" + + "N_PROGRESS\020\002\022\020\n\014SASL_SUCCESS\020\003\022\017\n\013SASL_F" + + "AILED\020\004B.\n\033org.apache.drill.exec.protoB\r" + + "UserBitSharedH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { http://git-wip-us.apache.org/repos/asf/drill/blob/03435183/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryResult.java ---------------------------------------------------------------------- diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryResult.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryResult.java index 7b2a273..a53dc42 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryResult.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/QueryResult.java @@ -43,7 +43,9 @@ public final class QueryResult implements Externalizable, Message<QueryResult>, CANCELED(3), FAILED(4), CANCELLATION_REQUESTED(5), - ENQUEUED(6); + ENQUEUED(6), + PREPARING(7), + PLANNING(8); public final int number; @@ -68,6 +70,8 @@ public final class QueryResult implements Externalizable, Message<QueryResult>, case 4: return FAILED; case 5: return CANCELLATION_REQUESTED; case 6: return ENQUEUED; + case 7: return PREPARING; + case 8: return PLANNING; default: return null; } } http://git-wip-us.apache.org/repos/asf/drill/blob/03435183/protocol/src/main/protobuf/UserBitShared.proto ---------------------------------------------------------------------- diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto index 086b98a..205611b 100644 --- a/protocol/src/main/protobuf/UserBitShared.proto +++ b/protocol/src/main/protobuf/UserBitShared.proto @@ -167,6 +167,8 @@ message QueryResult { FAILED = 4; CANCELLATION_REQUESTED = 5; // cancellation has been requested, and is being processed ENQUEUED = 6; // query has been enqueued. this is pre-starting. + PREPARING = 7; // query is at preparation stage, foreman is initializing + PLANNING = 8; // query is at planning stage (includes logical or / and physical planning) } optional QueryState query_state = 1;
