Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156191898
  
    --- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java ---
    @@ -17,483 +17,216 @@
      */
     package org.apache.drill.exec.ops;
     
    +import java.io.IOException;
    +import java.util.Collection;
     import java.util.List;
    -import java.util.Map;
    -import java.util.concurrent.Executor;
    +import java.util.concurrent.ExecutorService;
     
    +import com.google.common.annotations.VisibleForTesting;
     import org.apache.calcite.schema.SchemaPlus;
     import org.apache.drill.common.config.DrillConfig;
    -import org.apache.drill.common.exceptions.ExecutionSetupException;
    -import org.apache.drill.common.exceptions.UserException;
    -import org.apache.drill.common.types.TypeProtos.MinorType;
    -import org.apache.drill.exec.ExecConstants;
     import org.apache.drill.exec.compile.CodeCompiler;
    -import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.coord.ClusterCoordinator;
    +import org.apache.drill.exec.exception.ClassTransformationException;
    +import org.apache.drill.exec.expr.ClassGenerator;
    +import org.apache.drill.exec.expr.CodeGenerator;
     import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
    -import org.apache.drill.exec.expr.holders.ValueHolder;
     import org.apache.drill.exec.memory.BufferAllocator;
     import org.apache.drill.exec.physical.base.PhysicalOperator;
    -import org.apache.drill.exec.planner.physical.PlannerSettings;
    -import org.apache.drill.exec.proto.BitControl.PlanFragment;
    -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
    -import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
    -import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
    -import org.apache.drill.exec.proto.helper.QueryIdHelper;
    -import org.apache.drill.exec.rpc.RpcException;
    -import org.apache.drill.exec.rpc.RpcOutcomeListener;
    -import org.apache.drill.exec.rpc.UserClientConnection;
    -import org.apache.drill.exec.rpc.control.ControlTunnel;
    -import org.apache.drill.exec.rpc.control.WorkEventBus;
    -import org.apache.drill.exec.server.DrillbitContext;
    -import org.apache.drill.exec.server.options.FragmentOptionManager;
    -import org.apache.drill.exec.server.options.OptionList;
    -import org.apache.drill.exec.server.options.OptionManager;
    +import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
    +import org.apache.drill.exec.planner.PhysicalPlanReader;
    +import org.apache.drill.exec.proto.CoordinationProtos;
    +import org.apache.drill.exec.proto.ExecProtos;
    +import org.apache.drill.exec.rpc.control.Controller;
     import org.apache.drill.exec.server.options.OptionSet;
    -import org.apache.drill.exec.store.PartitionExplorer;
    -import org.apache.drill.exec.store.SchemaConfig;
     import org.apache.drill.exec.testing.ExecutionControls;
    -import org.apache.drill.exec.util.ImpersonationUtil;
    -import org.apache.drill.exec.work.batch.IncomingBuffers;
    -
    -import com.google.common.annotations.VisibleForTesting;
    -import com.google.common.base.Function;
    -import com.google.common.base.Preconditions;
    -import com.google.common.collect.Lists;
    -import com.google.common.collect.Maps;
     
     import io.netty.buffer.DrillBuf;
    +import org.apache.drill.exec.work.batch.IncomingBuffers;
     
     /**
    - * Contextual objects required for execution of a particular fragment.
    - * This is the implementation; use <tt>FragmentContextInterface</tt>
    - * in code to allow tests to use test-time implementations.
    + * Fragment context interface: separates implementation from definition.
    + * Allows unit testing by mocking or reimplementing services with
    + * test-time versions. The name is awkward, chosen to avoid renaming
    + * the implementation class which is used in many places in legacy code.
    + * New code should use this interface, and the names should eventually
    + * be swapped with {@link FragmentContextImpl} becoming
    + * <tt>FragmentContextImpl</tt> and this interface becoming
    + * {@link FragmentContextImpl}.
      */
     
    -public class FragmentContext extends BaseFragmentContext implements 
AutoCloseable, UdfUtilities, FragmentContextInterface {
    -  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(FragmentContext.class);
    -
    -  private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = 
Maps.newHashMap();
    -  private final List<OperatorContextImpl> contexts = Lists.newLinkedList();
    -
    -  private final DrillbitContext context;
    -  private final UserClientConnection connection; // is null if this 
context is for non-root fragment
    -  private final QueryContext queryContext; // is null if this context is 
for non-root fragment
    -  private final FragmentStats stats;
    -  private final BufferAllocator allocator;
    -  private final PlanFragment fragment;
    -  private final ContextInformation contextInformation;
    -  private IncomingBuffers buffers;
    -  private final OptionManager fragmentOptions;
    -  private final BufferManager bufferManager;
    -  private ExecutorState executorState;
    -  private final ExecutionControls executionControls;
    -
    -  private final SendingAccountor sendingAccountor = new SendingAccountor();
    -  private final Consumer<RpcException> exceptionConsumer = new 
Consumer<RpcException>() {
    -    @Override
    -    public void accept(final RpcException e) {
    -      fail(e);
    -    }
    -
    -    @Override
    -    public void interrupt(final InterruptedException e) {
    -      if (shouldContinue()) {
    -        logger.error("Received an unexpected interrupt while waiting for 
the data send to complete.", e);
    -        fail(e);
    -      }
    -    }
    -  };
    -
    -  private final RpcOutcomeListener<Ack> statusHandler = new 
StatusHandler(exceptionConsumer, sendingAccountor);
    -  private final AccountingUserConnection accountingUserConnection;
    -  /** Stores constants and their holders by type */
    -  private final Map<String, Map<MinorType, ValueHolder>> 
constantValueHolderCache;
    -
    +public interface FragmentContext extends UdfUtilities, AutoCloseable {
       /**
    -   * Create a FragmentContext instance for non-root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Wait for ack that all outgoing batches have been sent
        */
    -  public FragmentContext(final DrillbitContext dbContext, final 
PlanFragment fragment,
    -      final FunctionImplementationRegistry funcRegistry) throws 
ExecutionSetupException {
    -    this(dbContext, fragment, null, null, funcRegistry);
    -  }
    +  void waitForSendComplete();
     
       /**
    -   * Create a FragmentContext instance for root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param queryContext QueryContext.
    -   * @param connection UserClientConnection.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Returns the UDF registry.
    +   * @return the UDF registry
        */
    -  public FragmentContext(final DrillbitContext dbContext, final 
PlanFragment fragment, final QueryContext queryContext,
    -      final UserClientConnection connection, final 
FunctionImplementationRegistry funcRegistry)
    -    throws ExecutionSetupException {
    -    super(funcRegistry);
    -    this.context = dbContext;
    -    this.queryContext = queryContext;
    -    this.connection = connection;
    -    this.accountingUserConnection = new 
AccountingUserConnection(connection, sendingAccountor, statusHandler);
    -    this.fragment = fragment;
    -    contextInformation = new ContextInformation(fragment.getCredentials(), 
fragment.getContext());
    -
    -    logger.debug("Getting initial memory allocation of {}", 
fragment.getMemInitial());
    -    logger.debug("Fragment max allocation: {}", fragment.getMemMax());
    -
    -    final OptionList list;
    -    if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) 
{
    -      list = new OptionList();
    -    } else {
    -      try {
    -        list = 
dbContext.getLpPersistence().getMapper().readValue(fragment.getOptionsJson(), 
OptionList.class);
    -      } catch (final Exception e) {
    -        throw new ExecutionSetupException("Failure while reading plan 
options.", e);
    -      }
    -    }
    -    fragmentOptions = new 
FragmentOptionManager(context.getOptionManager(), list);
    -
    -    executionControls = new ExecutionControls(fragmentOptions, 
dbContext.getEndpoint());
    -
    -    // Add the fragment context to the root allocator.
    -    // The QueryManager will call the root allocator to recalculate all 
the memory limits for all the fragments
    -    try {
    -      allocator = context.getAllocator().newChildAllocator(
    -          "frag:" + QueryIdHelper.getFragmentId(fragment.getHandle()),
    -          fragment.getMemInitial(),
    -          fragment.getMemMax());
    -      Preconditions.checkNotNull(allocator, "Unable to acuqire allocator");
    -    } catch (final OutOfMemoryException e) {
    -      throw UserException.memoryError(e)
    -        .addContext("Fragment", getHandle().getMajorFragmentId() + ":" + 
getHandle().getMinorFragmentId())
    -        .build(logger);
    -    } catch(final Throwable e) {
    -      throw new ExecutionSetupException("Failure while getting memory 
allocator for fragment.", e);
    -    }
    -
    -    stats = new FragmentStats(allocator, fragment.getAssignment());
    -    bufferManager = new BufferManagerImpl(this.allocator);
    -    constantValueHolderCache = Maps.newHashMap();
    -  }
    +  FunctionImplementationRegistry getFunctionRegistry();
     
       /**
    -   * TODO: Remove this constructor when removing the SimpleRootExec 
(DRILL-2097). This is kept only to avoid modifying
    -   * the long list of test files.
    +   * Returns a read-only version of the session options.
    +   * @return the session options
        */
    -  public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, 
UserClientConnection connection,
    -      FunctionImplementationRegistry funcRegistry) throws 
ExecutionSetupException {
    -    this(dbContext, fragment, null, connection, funcRegistry);
    -  }
    +  OptionSet getOptionSet();
     
    -  public OptionManager getOptions() {
    -    return fragmentOptions;
    -  }
    +  PhysicalPlanReader getPlanReader();
    --- End diff --
    
    Only in the network version. Actually, this probably should not be part 
of the fragment context itself, but rather part of the mechanism that boots up 
a fragment. (Once the fragment is running, we won't read the plan again.)


---

Reply via email to