Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1045#discussion_r156192365
  
    --- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java ---
    @@ -17,483 +17,216 @@
      */
     package org.apache.drill.exec.ops;
     
    +import java.io.IOException;
    +import java.util.Collection;
     import java.util.List;
    -import java.util.Map;
    -import java.util.concurrent.Executor;
    +import java.util.concurrent.ExecutorService;
     
    +import com.google.common.annotations.VisibleForTesting;
     import org.apache.calcite.schema.SchemaPlus;
     import org.apache.drill.common.config.DrillConfig;
    -import org.apache.drill.common.exceptions.ExecutionSetupException;
    -import org.apache.drill.common.exceptions.UserException;
    -import org.apache.drill.common.types.TypeProtos.MinorType;
    -import org.apache.drill.exec.ExecConstants;
     import org.apache.drill.exec.compile.CodeCompiler;
    -import org.apache.drill.exec.exception.OutOfMemoryException;
    +import org.apache.drill.exec.coord.ClusterCoordinator;
    +import org.apache.drill.exec.exception.ClassTransformationException;
    +import org.apache.drill.exec.expr.ClassGenerator;
    +import org.apache.drill.exec.expr.CodeGenerator;
     import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
    -import org.apache.drill.exec.expr.holders.ValueHolder;
     import org.apache.drill.exec.memory.BufferAllocator;
     import org.apache.drill.exec.physical.base.PhysicalOperator;
    -import org.apache.drill.exec.planner.physical.PlannerSettings;
    -import org.apache.drill.exec.proto.BitControl.PlanFragment;
    -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
    -import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
    -import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
    -import org.apache.drill.exec.proto.helper.QueryIdHelper;
    -import org.apache.drill.exec.rpc.RpcException;
    -import org.apache.drill.exec.rpc.RpcOutcomeListener;
    -import org.apache.drill.exec.rpc.UserClientConnection;
    -import org.apache.drill.exec.rpc.control.ControlTunnel;
    -import org.apache.drill.exec.rpc.control.WorkEventBus;
    -import org.apache.drill.exec.server.DrillbitContext;
    -import org.apache.drill.exec.server.options.FragmentOptionManager;
    -import org.apache.drill.exec.server.options.OptionList;
    -import org.apache.drill.exec.server.options.OptionManager;
    +import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
    +import org.apache.drill.exec.planner.PhysicalPlanReader;
    +import org.apache.drill.exec.proto.CoordinationProtos;
    +import org.apache.drill.exec.proto.ExecProtos;
    +import org.apache.drill.exec.rpc.control.Controller;
     import org.apache.drill.exec.server.options.OptionSet;
    -import org.apache.drill.exec.store.PartitionExplorer;
    -import org.apache.drill.exec.store.SchemaConfig;
     import org.apache.drill.exec.testing.ExecutionControls;
    -import org.apache.drill.exec.util.ImpersonationUtil;
    -import org.apache.drill.exec.work.batch.IncomingBuffers;
    -
    -import com.google.common.annotations.VisibleForTesting;
    -import com.google.common.base.Function;
    -import com.google.common.base.Preconditions;
    -import com.google.common.collect.Lists;
    -import com.google.common.collect.Maps;
     
     import io.netty.buffer.DrillBuf;
    +import org.apache.drill.exec.work.batch.IncomingBuffers;
     
     /**
    - * Contextual objects required for execution of a particular fragment.
    - * This is the implementation; use <tt>FragmentContextInterface</tt>
    - * in code to allow tests to use test-time implementations.
    + * Fragment context interface: separates implementation from definition.
    + * Allows unit testing by mocking or reimplementing services with
    + * test-time versions. The name is awkward, chosen to avoid renaming
    + * the implementation class which is used in many places in legacy code.
    + * New code should use this interface, and the names should eventually
    + * be swapped with {@link FragmentContextImpl} becoming
    + * <tt>FragmentContextImpl</tt> and this interface becoming
    + * {@link FragmentContextImpl}.
      */
     
    -public class FragmentContext extends BaseFragmentContext implements 
AutoCloseable, UdfUtilities, FragmentContextInterface {
    -  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(FragmentContext.class);
    -
    -  private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = 
Maps.newHashMap();
    -  private final List<OperatorContextImpl> contexts = Lists.newLinkedList();
    -
    -  private final DrillbitContext context;
    -  private final UserClientConnection connection; // is null if this 
context is for non-root fragment
    -  private final QueryContext queryContext; // is null if this context is 
for non-root fragment
    -  private final FragmentStats stats;
    -  private final BufferAllocator allocator;
    -  private final PlanFragment fragment;
    -  private final ContextInformation contextInformation;
    -  private IncomingBuffers buffers;
    -  private final OptionManager fragmentOptions;
    -  private final BufferManager bufferManager;
    -  private ExecutorState executorState;
    -  private final ExecutionControls executionControls;
    -
    -  private final SendingAccountor sendingAccountor = new SendingAccountor();
    -  private final Consumer<RpcException> exceptionConsumer = new 
Consumer<RpcException>() {
    -    @Override
    -    public void accept(final RpcException e) {
    -      fail(e);
    -    }
    -
    -    @Override
    -    public void interrupt(final InterruptedException e) {
    -      if (shouldContinue()) {
    -        logger.error("Received an unexpected interrupt while waiting for 
the data send to complete.", e);
    -        fail(e);
    -      }
    -    }
    -  };
    -
    -  private final RpcOutcomeListener<Ack> statusHandler = new 
StatusHandler(exceptionConsumer, sendingAccountor);
    -  private final AccountingUserConnection accountingUserConnection;
    -  /** Stores constants and their holders by type */
    -  private final Map<String, Map<MinorType, ValueHolder>> 
constantValueHolderCache;
    -
    +public interface FragmentContext extends UdfUtilities, AutoCloseable {
       /**
    -   * Create a FragmentContext instance for non-root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Wait for ack that all outgoing batches have been sent
        */
    -  public FragmentContext(final DrillbitContext dbContext, final 
PlanFragment fragment,
    -      final FunctionImplementationRegistry funcRegistry) throws 
ExecutionSetupException {
    -    this(dbContext, fragment, null, null, funcRegistry);
    -  }
    +  void waitForSendComplete();
     
       /**
    -   * Create a FragmentContext instance for root fragment.
    -   *
    -   * @param dbContext DrillbitContext.
    -   * @param fragment Fragment implementation.
    -   * @param queryContext QueryContext.
    -   * @param connection UserClientConnection.
    -   * @param funcRegistry FunctionImplementationRegistry.
    -   * @throws ExecutionSetupException
    +   * Returns the UDF registry.
    +   * @return the UDF registry
        */
    -  public FragmentContext(final DrillbitContext dbContext, final 
PlanFragment fragment, final QueryContext queryContext,
    -      final UserClientConnection connection, final 
FunctionImplementationRegistry funcRegistry)
    -    throws ExecutionSetupException {
    -    super(funcRegistry);
    -    this.context = dbContext;
    -    this.queryContext = queryContext;
    -    this.connection = connection;
    -    this.accountingUserConnection = new 
AccountingUserConnection(connection, sendingAccountor, statusHandler);
    -    this.fragment = fragment;
    -    contextInformation = new ContextInformation(fragment.getCredentials(), 
fragment.getContext());
    -
    -    logger.debug("Getting initial memory allocation of {}", 
fragment.getMemInitial());
    -    logger.debug("Fragment max allocation: {}", fragment.getMemMax());
    -
    -    final OptionList list;
    -    if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) 
{
    -      list = new OptionList();
    -    } else {
    -      try {
    -        list = 
dbContext.getLpPersistence().getMapper().readValue(fragment.getOptionsJson(), 
OptionList.class);
    -      } catch (final Exception e) {
    -        throw new ExecutionSetupException("Failure while reading plan 
options.", e);
    -      }
    -    }
    -    fragmentOptions = new 
FragmentOptionManager(context.getOptionManager(), list);
    -
    -    executionControls = new ExecutionControls(fragmentOptions, 
dbContext.getEndpoint());
    -
    -    // Add the fragment context to the root allocator.
    -    // The QueryManager will call the root allocator to recalculate all 
the memory limits for all the fragments
    -    try {
    -      allocator = context.getAllocator().newChildAllocator(
    -          "frag:" + QueryIdHelper.getFragmentId(fragment.getHandle()),
    -          fragment.getMemInitial(),
    -          fragment.getMemMax());
    -      Preconditions.checkNotNull(allocator, "Unable to acuqire allocator");
    -    } catch (final OutOfMemoryException e) {
    -      throw UserException.memoryError(e)
    -        .addContext("Fragment", getHandle().getMajorFragmentId() + ":" + 
getHandle().getMinorFragmentId())
    -        .build(logger);
    -    } catch(final Throwable e) {
    -      throw new ExecutionSetupException("Failure while getting memory 
allocator for fragment.", e);
    -    }
    -
    -    stats = new FragmentStats(allocator, fragment.getAssignment());
    -    bufferManager = new BufferManagerImpl(this.allocator);
    -    constantValueHolderCache = Maps.newHashMap();
    -  }
    +  FunctionImplementationRegistry getFunctionRegistry();
     
       /**
    -   * TODO: Remove this constructor when removing the SimpleRootExec 
(DRILL-2097). This is kept only to avoid modifying
    -   * the long list of test files.
    +   * Returns a read-only version of the session options.
    +   * @return the session options
        */
    -  public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, 
UserClientConnection connection,
    -      FunctionImplementationRegistry funcRegistry) throws 
ExecutionSetupException {
    -    this(dbContext, fragment, null, connection, funcRegistry);
    -  }
    +  OptionSet getOptionSet();
     
    -  public OptionManager getOptions() {
    -    return fragmentOptions;
    -  }
    +  PhysicalPlanReader getPlanReader();
     
    -  @Override
    -  public OptionSet getOptionSet() {
    -    return fragmentOptions;
    -  }
    +  ClusterCoordinator getClusterCoordinator();
     
    -  public void setBuffers(final IncomingBuffers buffers) {
    -    Preconditions.checkArgument(this.buffers == null, "Can only set 
buffers once.");
    -    this.buffers = buffers;
    -  }
    +  AccountingDataTunnel getDataTunnel(final 
CoordinationProtos.DrillbitEndpoint endpoint);
     
    -  public void setExecutorState(final ExecutorState executorState) {
    -    Preconditions.checkArgument(this.executorState == null, "ExecutorState 
can only be set once.");
    -    this.executorState = executorState;
    -  }
    +  AccountingUserConnection getUserDataTunnel();
     
    -  public void fail(final Throwable cause) {
    -    executorState.fail(cause);
    -  }
    +  void setBuffers(final IncomingBuffers buffers);
    +
    +  boolean isImpersonationEnabled();
     
       /**
    -   * Tells individual operations whether they should continue. In some 
cases, an external event (typically cancellation)
    -   * will mean that the fragment should prematurely exit execution. Long 
running operations should check this every so
    -   * often so that Drill is responsive to cancellation operations.
    +   * Generates code for a class given a {@link ClassGenerator},
    +   * and returns a single instance of the generated class. (Note
    +   * that the name is a misnomer, it would be better called
    +   * <tt>getImplementationInstance</tt>.)
        *
    -   * @return false if the action should terminate immediately, true if 
everything is okay.
    +   * @param cg the class generator
    +   * @return an instance of the generated class
        */
    -  @Override
    -  public boolean shouldContinue() {
    -    return executorState.shouldContinue();
    -  }
    -
    -  @Override
    -  public DrillbitContext getDrillbitContext() {
    -    return context;
    -  }
    +  <T> T getImplementationClass(final ClassGenerator<T> cg)
    +      throws ClassTransformationException, IOException;
     
       /**
    -   * This method is only used to construt InfoSchemaReader, it is for the 
reader to get full schema, so here we
    -   * are going to return a fully initialized schema tree.
    -   * @return root schema's plus
    +   * Generates code for a class given a {@link CodeGenerator},
    +   * and returns a single instance of the generated class. (Note
    +   * that the name is a misnomer, it would be better called
    +   * <tt>getImplementationInstance</tt>.)
    +   *
    +   * @param cg the code generator
    +   * @return an instance of the generated class
        */
    -  public SchemaPlus getFullRootSchema() {
    -    if (queryContext == null) {
    -      fail(new UnsupportedOperationException("Schema tree can only be 
created in root fragment. " +
    -          "This is a non-root fragment."));
    -      return null;
    -    }
    -
    -    final boolean isImpersonationEnabled = isImpersonationEnabled();
    -    // If impersonation is enabled, we want to view the schema as query 
user and suppress authorization errors. As for
    -    // InfoSchema purpose we want to show tables the user has permissions 
to list or query. If  impersonation is
    -    // disabled view the schema as Drillbit process user and throw 
authorization errors to client.
    -    SchemaConfig schemaConfig = SchemaConfig
    -        .newBuilder(
    -            isImpersonationEnabled ? queryContext.getQueryUserName() : 
ImpersonationUtil.getProcessUserName(),
    -            queryContext)
    -        .setIgnoreAuthErrors(isImpersonationEnabled)
    -        .build();
    -
    -    return queryContext.getFullRootSchema(schemaConfig);
    -  }
    +  <T> T getImplementationClass(final CodeGenerator<T> cg)
    +      throws ClassTransformationException, IOException;
     
       /**
    -   * Get this node's identity.
    -   * @return A DrillbitEndpoint object.
    +   * Generates code for a class given a {@link ClassGenerator}, and 
returns the
    +   * specified number of instances of the generated class. (Note that the 
name
    +   * is a misnomer, it would be better called
    +   * <tt>getImplementationInstances</tt>.)
    +   *
    +   * @param cg the class generator
    +   * @return list of instances of the generated class
        */
    -  public DrillbitEndpoint getIdentity() {
    -    return context.getEndpoint();
    -  }
    -
    -  public FragmentStats getStats() {
    -    return stats;
    -  }
    -
    -  @Override
    -  public ContextInformation getContextInformation() {
    -    return contextInformation;
    -  }
    -
    -  public DrillbitEndpoint getForemanEndpoint() {
    -    return fragment.getForeman();
    -  }
    +  <T> List<T> getImplementationClass(final ClassGenerator<T> cg, final int 
instanceCount)
    +      throws ClassTransformationException, IOException;
     
       /**
    -   * The FragmentHandle for this Fragment
    -   * @return FragmentHandle
    +   * Generates code for a class given a {@link CodeGenerator}, and returns 
the
    +   * specified number of instances of the generated class. (Note that the 
name
    +   * is a misnomer, it would be better called
    +   * <tt>getImplementationInstances</tt>.)
    +   *
    +   * @param cg the code generator
    +   * @return list of instances of the generated class
        */
    -  public FragmentHandle getHandle() {
    -    return fragment.getHandle();
    -  }
    -
    -  public String getFragIdString() {
    -    final FragmentHandle handle = getHandle();
    -    final String frag = handle != null ? handle.getMajorFragmentId() + ":" 
+ handle.getMinorFragmentId() : "0:0";
    -    return frag;
    -  }
    +  <T> List<T> getImplementationClass(final CodeGenerator<T> cg, final int 
instanceCount)
    +      throws ClassTransformationException, IOException;
     
       /**
    -   * Get this fragment's allocator.
    -   * @return the allocator
    +   * Return the set of execution controls used to inject faults into 
running
    +   * code for testing.
    +   *
    +   * @return the execution controls
        */
    -  @Deprecated
    -  public BufferAllocator getAllocator() {
    -    if (allocator == null) {
    -      logger.debug("Fragment: " + getFragIdString() + " Allocator is 
NULL");
    -    }
    -    return allocator;
    -  }
    +  ExecutionControls getExecutionControls();
     
    -  public BufferAllocator getNewChildAllocator(final String operatorName,
    -      final int operatorId,
    -      final long initialReservation,
    -      final long maximumReservation) throws OutOfMemoryException {
    -    return allocator.newChildAllocator(
    -        "op:" + QueryIdHelper.getFragmentId(fragment.getHandle()) + ":" + 
operatorId + ":" + operatorName,
    -        initialReservation,
    -        maximumReservation
    -        );
    -  }
    +  /**
    +   * Returns the Drill configuration for this run. Note that the config is
    +   * global and immutable.
    +   *
    +   * @return the Drill configuration
    +   */
    +  DrillConfig getConfig();
     
    -  public boolean isOverMemoryLimit() {
    -    return allocator.isOverLimit();
    -  }
    +  FragmentStats getStats();
     
    -  @Override
    -  protected CodeCompiler getCompiler() {
    -    return context.getCompiler();
    -  }
    +  CodeCompiler getCompiler();
     
    -  public AccountingUserConnection getUserDataTunnel() {
    -    Preconditions.checkState(connection != null, "Only Root fragment can 
get UserDataTunnel");
    -    return accountingUserConnection;
    -  }
    +  Collection<CoordinationProtos.DrillbitEndpoint> getBits();
     
    -  public ControlTunnel getControlTunnel(final DrillbitEndpoint endpoint) {
    -    return context.getController().getTunnel(endpoint);
    -  }
    +  CoordinationProtos.DrillbitEndpoint getForemanEndpoint();
     
    -  public AccountingDataTunnel getDataTunnel(final DrillbitEndpoint 
endpoint) {
    -    AccountingDataTunnel tunnel = tunnels.get(endpoint);
    -    if (tunnel == null) {
    -      tunnel = new 
AccountingDataTunnel(context.getDataConnectionsPool().getTunnel(endpoint), 
sendingAccountor, statusHandler);
    -      tunnels.put(endpoint, tunnel);
    -    }
    -    return tunnel;
    -  }
    +  CoordinationProtos.DrillbitEndpoint getEndpoint();
    --- End diff --
    
    Above three are network related.


---

Reply via email to