Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/1045#discussion_r156192269 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java --- @@ -17,483 +17,216 @@ */ package org.apache.drill.exec.ops; +import java.io.IOException; +import java.util.Collection; import java.util.List; -import java.util.Map; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import com.google.common.annotations.VisibleForTesting; import org.apache.calcite.schema.SchemaPlus; import org.apache.drill.common.config.DrillConfig; -import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.common.exceptions.UserException; -import org.apache.drill.common.types.TypeProtos.MinorType; -import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.compile.CodeCompiler; -import org.apache.drill.exec.exception.OutOfMemoryException; +import org.apache.drill.exec.coord.ClusterCoordinator; +import org.apache.drill.exec.exception.ClassTransformationException; +import org.apache.drill.exec.expr.ClassGenerator; +import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; -import org.apache.drill.exec.expr.holders.ValueHolder; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.physical.base.PhysicalOperator; -import org.apache.drill.exec.planner.physical.PlannerSettings; -import org.apache.drill.exec.proto.BitControl.PlanFragment; -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; -import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; -import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; -import org.apache.drill.exec.proto.helper.QueryIdHelper; -import org.apache.drill.exec.rpc.RpcException; -import org.apache.drill.exec.rpc.RpcOutcomeListener; -import org.apache.drill.exec.rpc.UserClientConnection; -import org.apache.drill.exec.rpc.control.ControlTunnel; -import org.apache.drill.exec.rpc.control.WorkEventBus; -import org.apache.drill.exec.server.DrillbitContext; -import org.apache.drill.exec.server.options.FragmentOptionManager; -import org.apache.drill.exec.server.options.OptionList; -import org.apache.drill.exec.server.options.OptionManager; +import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry; +import org.apache.drill.exec.planner.PhysicalPlanReader; +import org.apache.drill.exec.proto.CoordinationProtos; +import org.apache.drill.exec.proto.ExecProtos; +import org.apache.drill.exec.rpc.control.Controller; import org.apache.drill.exec.server.options.OptionSet; -import org.apache.drill.exec.store.PartitionExplorer; -import org.apache.drill.exec.store.SchemaConfig; import org.apache.drill.exec.testing.ExecutionControls; -import org.apache.drill.exec.util.ImpersonationUtil; -import org.apache.drill.exec.work.batch.IncomingBuffers; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import io.netty.buffer.DrillBuf; +import org.apache.drill.exec.work.batch.IncomingBuffers; /** - * Contextual objects required for execution of a particular fragment. - * This is the implementation; use <tt>FragmentContextInterface</tt> - * in code to allow tests to use test-time implementations. + * Fragment context interface: separates implementation from definition. + * Allows unit testing by mocking or reimplementing services with + * test-time versions. The name is awkward, chosen to avoid renaming + * the implementation class which is used in many places in legacy code. + * New code should use this interface, and the names should eventually + * be swapped with {@link FragmentContextImpl} becoming + * <tt>FragmentContextImpl</tt> and this interface becoming + * {@link FragmentContextImpl}. */ -public class FragmentContext extends BaseFragmentContext implements AutoCloseable, UdfUtilities, FragmentContextInterface { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class); - - private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap(); - private final List<OperatorContextImpl> contexts = Lists.newLinkedList(); - - private final DrillbitContext context; - private final UserClientConnection connection; // is null if this context is for non-root fragment - private final QueryContext queryContext; // is null if this context is for non-root fragment - private final FragmentStats stats; - private final BufferAllocator allocator; - private final PlanFragment fragment; - private final ContextInformation contextInformation; - private IncomingBuffers buffers; - private final OptionManager fragmentOptions; - private final BufferManager bufferManager; - private ExecutorState executorState; - private final ExecutionControls executionControls; - - private final SendingAccountor sendingAccountor = new SendingAccountor(); - private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() { - @Override - public void accept(final RpcException e) { - fail(e); - } - - @Override - public void interrupt(final InterruptedException e) { - if (shouldContinue()) { - logger.error("Received an unexpected interrupt while waiting for the data send to complete.", e); - fail(e); - } - } - }; - - private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor); - private final AccountingUserConnection accountingUserConnection; - /** Stores constants and their holders by type */ - private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache; - +public interface FragmentContext extends UdfUtilities, AutoCloseable { /** - * Create a FragmentContext instance for non-root fragment. - * - * @param dbContext DrillbitContext. - * @param fragment Fragment implementation. - * @param funcRegistry FunctionImplementationRegistry. - * @throws ExecutionSetupException + * Wait for ack that all outgoing batches have been sent */ - public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment, - final FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException { - this(dbContext, fragment, null, null, funcRegistry); - } + void waitForSendComplete(); /** - * Create a FragmentContext instance for root fragment. - * - * @param dbContext DrillbitContext. - * @param fragment Fragment implementation. - * @param queryContext QueryContext. - * @param connection UserClientConnection. - * @param funcRegistry FunctionImplementationRegistry. - * @throws ExecutionSetupException + * Returns the UDF registry. + * @return the UDF registry */ - public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment, final QueryContext queryContext, - final UserClientConnection connection, final FunctionImplementationRegistry funcRegistry) - throws ExecutionSetupException { - super(funcRegistry); - this.context = dbContext; - this.queryContext = queryContext; - this.connection = connection; - this.accountingUserConnection = new AccountingUserConnection(connection, sendingAccountor, statusHandler); - this.fragment = fragment; - contextInformation = new ContextInformation(fragment.getCredentials(), fragment.getContext()); - - logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial()); - logger.debug("Fragment max allocation: {}", fragment.getMemMax()); - - final OptionList list; - if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) { - list = new OptionList(); - } else { - try { - list = dbContext.getLpPersistence().getMapper().readValue(fragment.getOptionsJson(), OptionList.class); - } catch (final Exception e) { - throw new ExecutionSetupException("Failure while reading plan options.", e); - } - } - fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list); - - executionControls = new ExecutionControls(fragmentOptions, dbContext.getEndpoint()); - - // Add the fragment context to the root allocator. - // The QueryManager will call the root allocator to recalculate all the memory limits for all the fragments - try { - allocator = context.getAllocator().newChildAllocator( - "frag:" + QueryIdHelper.getFragmentId(fragment.getHandle()), - fragment.getMemInitial(), - fragment.getMemMax()); - Preconditions.checkNotNull(allocator, "Unable to acuqire allocator"); - } catch (final OutOfMemoryException e) { - throw UserException.memoryError(e) - .addContext("Fragment", getHandle().getMajorFragmentId() + ":" + getHandle().getMinorFragmentId()) - .build(logger); - } catch(final Throwable e) { - throw new ExecutionSetupException("Failure while getting memory allocator for fragment.", e); - } - - stats = new FragmentStats(allocator, fragment.getAssignment()); - bufferManager = new BufferManagerImpl(this.allocator); - constantValueHolderCache = Maps.newHashMap(); - } + FunctionImplementationRegistry getFunctionRegistry(); /** - * TODO: Remove this constructor when removing the SimpleRootExec (DRILL-2097). This is kept only to avoid modifying - * the long list of test files. + * Returns a read-only version of the session options. + * @return the session options */ - public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, UserClientConnection connection, - FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException { - this(dbContext, fragment, null, connection, funcRegistry); - } + OptionSet getOptionSet(); - public OptionManager getOptions() { - return fragmentOptions; - } + PhysicalPlanReader getPlanReader(); - @Override - public OptionSet getOptionSet() { - return fragmentOptions; - } + ClusterCoordinator getClusterCoordinator(); - public void setBuffers(final IncomingBuffers buffers) { - Preconditions.checkArgument(this.buffers == null, "Can only set buffers once."); - this.buffers = buffers; - } + AccountingDataTunnel getDataTunnel(final CoordinationProtos.DrillbitEndpoint endpoint); - public void setExecutorState(final ExecutorState executorState) { - Preconditions.checkArgument(this.executorState == null, "ExecutorState can only be set once."); - this.executorState = executorState; - } + AccountingUserConnection getUserDataTunnel(); - public void fail(final Throwable cause) { - executorState.fail(cause); - } + void setBuffers(final IncomingBuffers buffers); --- End diff -- Network related.
---