Github user paul-rogers commented on a diff in the pull request:
https://github.com/apache/drill/pull/1051#discussion_r159131327
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java ---
@@ -502,33 +438,82 @@ private void runFragment(List<PlanFragment>
fragmentsList) throws ExecutionSetup
}
}
+ assert rootFragment != null;
+
final FragmentRoot rootOperator;
try {
rootOperator =
drillbitContext.getPlanReader().readFragmentRoot(rootFragment.getFragmentJson());
} catch (IOException e) {
throw new ExecutionSetupException(String.format("Unable to parse
FragmentRoot from fragment: %s", rootFragment.getFragmentJson()));
}
queryRM.setCost(rootOperator.getCost());
- admit(null);
- drillbitContext.getWorkBus().addFragmentStatusListener(queryId,
queryManager.getFragmentStatusListener());
-
drillbitContext.getClusterCoordinator().addDrillbitStatusListener(queryManager.getDrillbitStatusListener());
- logger.debug("Submitting fragments to run.");
+ fragmentsRunner.setFragmentsInfo(planFragments, rootFragment,
rootOperator);
- // set up the root fragment first so we'll have incoming buffers
available.
- setupRootFragment(rootFragment, rootOperator);
+ startQueryProcessing();
+ }
- setupNonRootFragments(planFragments);
+ /**
+ * Enqueues the query and once enqueued, starts sending out query
fragments for further execution.
+ * Moves query to RUNNING state.
+ */
+ private void startQueryProcessing() {
+ enqueue();
+ runFragments();
+ queryStateProcessor.moveToState(QueryState.RUNNING, null);
+ }
- moveToState(QueryState.RUNNING, null);
- logger.debug("Fragments running.");
+ /**
+ * Move query to ENQUEUED state. Enqueues query if queueing is enabled.
+ * Foreman run will be blocked until query is enqueued.
+ * In case of failures (ex: queue timeout exception) will move query to
FAILED state.
+ */
+ private void enqueue() {
+ queryStateProcessor.moveToState(QueryState.ENQUEUED, null);
+
+ try {
+ queryRM.admit();
+ queryStateProcessor.moveToState(QueryState.STARTING, null);
+ } catch (QueueTimeoutException | QueryQueueException e) {
+ queryStateProcessor.moveToState(QueryState.FAILED, e);
+ } finally {
+ String queueName = queryRM.queueName();
+ queryManager.setQueueName(queueName == null ? "Unknown" : queueName);
--- End diff --
Alright. A better design is to have a state machine with events, each
state/event pair is tied to a function that checks/implements that transition.
The code now is a bit of a muddle. But, since doing that is just an
improvement, not a bug fix, I agree with considering such changes as a separate
JIRA.
---