[ 
https://issues.apache.org/jira/browse/DRILL-4132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15228779#comment-15228779
 ] 

ASF GitHub Bot commented on DRILL-4132:
---------------------------------------

Github user yufeldman commented on a diff in the pull request:

    https://github.com/apache/drill/pull/368#discussion_r58753681
  
    --- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizerMultiPlans.java
 ---
    @@ -0,0 +1,222 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.planner.fragment;
    +
    +import java.util.Collection;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +import org.apache.drill.common.exceptions.ExecutionSetupException;
    +import org.apache.drill.common.util.DrillStringUtils;
    +import org.apache.drill.exec.ops.QueryContext;
    +import org.apache.drill.exec.physical.base.Exchange;
    +import org.apache.drill.exec.physical.base.FragmentRoot;
    +import org.apache.drill.exec.physical.base.PhysicalOperator;
    +import org.apache.drill.exec.planner.PhysicalPlanReader;
    +import 
org.apache.drill.exec.planner.fragment.Materializer.IndexedFragmentNode;
    +import org.apache.drill.exec.proto.BitControl.PlanFragment;
    +import org.apache.drill.exec.proto.BitControl.QueryContextInformation;
    +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
    +import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
    +import org.apache.drill.exec.proto.UserBitShared.QueryId;
    +import org.apache.drill.exec.rpc.user.UserSession;
    +import org.apache.drill.exec.server.options.OptionList;
    +import org.apache.drill.exec.work.QueryWorkUnit;
    +import org.apache.drill.exec.work.foreman.ForemanSetupException;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Lists;
    +
    +/**
    + * SimpleParallelizerMultiPlans class is an extension to SimpleParallelizer
    + * to help with getting PlanFragments for split plan.
    + * Split plan is essentially ability to create multiple physical plans 
from a single logical plan
    + * to be able to run them separately.
    + * Moving functionality specific to splitting the plan to this class
    + * allows not to pollute parent class with non-authentic functionality
    + *
    + */
    +public class SimpleParallelizerMultiPlans extends SimpleParallelizer {
    +
    +  public SimpleParallelizerMultiPlans(QueryContext context) {
    +    super(context);
    +  }
    +
    +  /**
    +   * Create multiple physical plans from original query planning, it will 
allow execute them eventually independently
    +   * @param options
    +   * @param foremanNode
    +   * @param queryId
    +   * @param activeEndpoints
    +   * @param reader
    +   * @param rootFragment
    +   * @param session
    +   * @param queryContextInfo
    +   * @return
    +   * @throws ExecutionSetupException
    +   */
    +  public List<QueryWorkUnit> getSplitFragments(OptionList options, 
DrillbitEndpoint foremanNode, QueryId queryId,
    +      Collection<DrillbitEndpoint> activeEndpoints, PhysicalPlanReader 
reader, Fragment rootFragment,
    +      UserSession session, QueryContextInformation queryContextInfo) 
throws ExecutionSetupException {
    +
    +    final PlanningSet planningSet = getFragmentsHelper(activeEndpoints, 
rootFragment);
    +
    +    return generateWorkUnits(
    +        options, foremanNode, queryId, reader, rootFragment, planningSet, 
session, queryContextInfo);
    +  }
    +
    +  /**
    +   * Split plan into multiple plans based on parallelization
    +   * Ideally it is applicable only to plans with two major fragments: 
Screen and UnionExchange
    +   * But there could be cases where we can remove even multiple exchanges 
like in case of "order by"
    +   * End goal is to get single major fragment: Screen with chain that ends 
up with a single minor fragment
    +   * from Leaf Exchange. This way each plan can run independently without 
any exchange involvement
    +   * @param options
    +   * @param foremanNode - not really applicable
    +   * @param queryId
    +   * @param reader
    +   * @param rootNode
    +   * @param planningSet
    +   * @param session
    +   * @param queryContextInfo
    +   * @return
    +   * @throws ExecutionSetupException
    +   */
    +  private List<QueryWorkUnit> generateWorkUnits(OptionList options, 
DrillbitEndpoint foremanNode, QueryId queryId,
    +      PhysicalPlanReader reader, Fragment rootNode, PlanningSet 
planningSet,
    +      UserSession session, QueryContextInformation queryContextInfo) 
throws ExecutionSetupException {
    +
    +    // now we generate all the individual plan fragments and associated 
assignments. Note, we need all endpoints
    +    // assigned before we can materialize, so we start a new loop here 
rather than utilizing the previous one.
    +
    +    List<QueryWorkUnit> workUnits = Lists.newArrayList();
    +    int plansCount = 0;
    +    DrillbitEndpoint[] endPoints = null;
    +    long initialAllocation = 0;
    +    long maxAllocation = 0;
    +
    +    final Iterator<Wrapper> iter = planningSet.iterator();
    +    while (iter.hasNext()) {
    +      Wrapper wrapper = iter.next();
    +      Fragment node = wrapper.getNode();
    +      boolean isLeafFragment = node.getReceivingExchangePairs().size() == 
0;
    +      final PhysicalOperator physicalOperatorRoot = node.getRoot();
    +      // get all the needed info from leaf fragment
    +      if ( (physicalOperatorRoot instanceof Exchange) &&  isLeafFragment) {
    +        // need to get info about
    +        // number of minor fragments
    +        // assignedEndPoints
    +        // allocation
    +        plansCount = wrapper.getWidth();
    +        initialAllocation = (wrapper.getInitialAllocation() != 0 ) ? 
wrapper.getInitialAllocation()/plansCount : 0;
    +        maxAllocation = (wrapper.getMaxAllocation() != 0 ) ? 
wrapper.getMaxAllocation()/plansCount : 0;
    +        endPoints = new DrillbitEndpoint[plansCount];
    +        for (int mfId = 0; mfId < plansCount; mfId++) {
    +          endPoints[mfId] = wrapper.getAssignedEndpoint(mfId);
    +        }
    +      }
    +    }
    +    if ( plansCount == 0 ) {
    +      // no exchange, return list of single QueryWorkUnit
    +      workUnits.add(generateWorkUnit(options, foremanNode, queryId, 
reader, rootNode, planningSet, session, queryContextInfo));
    +      return workUnits;
    +    }
    +
    +    for (Wrapper wrapper : planningSet) {
    +      Fragment node = wrapper.getNode();
    +      final PhysicalOperator physicalOperatorRoot = node.getRoot();
    +      if ( physicalOperatorRoot instanceof Exchange ) {
    +        // get to 0 MajorFragment
    +        continue;
    +      }
    +      boolean isRootNode = rootNode == node;
    +
    +      if (isRootNode && wrapper.getWidth() != 1) {
    +        throw new ForemanSetupException(String.format("Failure while 
trying to setup fragment. " +
    +                "The root fragment must always have parallelization one. 
In the current case, the width was set to %d.",
    +                wrapper.getWidth()));
    +      }
    +      // this fragment is always leaf, as we are removing all the exchanges
    +      boolean isLeafFragment = true;
    +
    +      // Create a minorFragment for each major fragment.
    +      for (int minorFragmentId = 0; minorFragmentId < plansCount; 
minorFragmentId++) {
    +        // those fragments should be empty
    +        List<PlanFragment> fragments = Lists.newArrayList();
    +
    +        PlanFragment rootFragment = null;
    +        FragmentRoot rootOperator = null;
    +
    +        IndexedFragmentNode iNode = new 
IndexedFragmentNode(minorFragmentId, wrapper);
    +        wrapper.resetAllocation();
    +        // two visitors here
    +        // 1. To remove exchange
    +        // 2. To reset operator IDs as exchanges were removed
    +        PhysicalOperator op = 
physicalOperatorRoot.accept(ExchangeManipulatorMaterializerVisitor.INSTANCE, 
iNode).
    --- End diff --
    
    Why would ExchangeRemoverVisitor not drop any exchange? It drops the exchange in 
every case. The decision on whether to drop or not is made before that visitor is called.


> Ability to submit simple type of physical plan directly to EndPoint DrillBit 
> for execution
> ------------------------------------------------------------------------------------------
>
>                 Key: DRILL-4132
>                 URL: https://issues.apache.org/jira/browse/DRILL-4132
>             Project: Apache Drill
>          Issue Type: New Feature
>          Components: Execution - Flow, Execution - RPC, Query Planning & 
> Optimization
>            Reporter: Yuliya Feldman
>            Assignee: Yuliya Feldman
>
> Today Drill query execution is optimistic and stateful (at least due to data 
> exchanges) - if any of the stages of query execution fails, the whole query fails. 
> If a query is just a simple scan, filter push down and project where no data 
> exchange happens between DrillBits, there is no need to fail the whole query when 
> one DrillBit fails, as the minor fragments running on that DrillBit can be rerun 
> on another DrillBit. There are probably multiple ways to achieve this. This 
> JIRA is to open discussion on: 
> 1. agreement that we need to support the above use case 
> 2. means of achieving it.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to