Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1051#discussion_r159131327
  
    --- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java ---
    @@ -502,33 +438,82 @@ private void runFragment(List<PlanFragment> 
fragmentsList) throws ExecutionSetup
           }
         }
     
    +    assert rootFragment != null;
    +
         final FragmentRoot rootOperator;
         try {
           rootOperator = 
drillbitContext.getPlanReader().readFragmentRoot(rootFragment.getFragmentJson());
         } catch (IOException e) {
           throw new ExecutionSetupException(String.format("Unable to parse 
FragmentRoot from fragment: %s", rootFragment.getFragmentJson()));
         }
         queryRM.setCost(rootOperator.getCost());
    -    admit(null);
    -    drillbitContext.getWorkBus().addFragmentStatusListener(queryId, 
queryManager.getFragmentStatusListener());
    -    
drillbitContext.getClusterCoordinator().addDrillbitStatusListener(queryManager.getDrillbitStatusListener());
     
    -    logger.debug("Submitting fragments to run.");
    +    fragmentsRunner.setFragmentsInfo(planFragments, rootFragment, 
rootOperator);
     
    -    // set up the root fragment first so we'll have incoming buffers 
available.
    -    setupRootFragment(rootFragment, rootOperator);
    +    startQueryProcessing();
    +  }
     
    -    setupNonRootFragments(planFragments);
    +  /**
    +   * Enqueues the query and once enqueued, starts sending out query 
fragments for further execution.
    +   * Moves query to RUNNING state.
    +   */
    +  private void startQueryProcessing() {
    +    enqueue();
    +    runFragments();
    +    queryStateProcessor.moveToState(QueryState.RUNNING, null);
    +  }
     
    -    moveToState(QueryState.RUNNING, null);
    -    logger.debug("Fragments running.");
    +  /**
    +   * Move query to ENQUEUED state. Enqueues query if queueing is enabled.
    +   * Foreman run will be blocked until query is enqueued.
    +   * In case of failures (ex: queue timeout exception) will move query to 
FAILED state.
    +   */
    +  private void enqueue() {
    +    queryStateProcessor.moveToState(QueryState.ENQUEUED, null);
    +
    +    try {
    +      queryRM.admit();
    +      queryStateProcessor.moveToState(QueryState.STARTING, null);
    +    } catch (QueueTimeoutException | QueryQueueException e) {
    +      queryStateProcessor.moveToState(QueryState.FAILED, e);
    +    } finally {
    +      String queueName = queryRM.queueName();
    +      queryManager.setQueueName(queueName == null ? "Unknown" : queueName);
    --- End diff --
    
    Alright. A better design is to have a state machine with events, each 
state/event pair is tied to a function that checks/implements that transition. 
The code now is a bit of a muddle. But, since doing that is just an 
improvement, not a bug fix, I agree with considering such changes as a separate 
JIRA.


---

Reply via email to