Github user ilooner commented on a diff in the pull request: https://github.com/apache/drill/pull/1045#discussion_r156463711 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java --- @@ -17,483 +17,216 @@ */ package org.apache.drill.exec.ops; +import java.io.IOException; +import java.util.Collection; import java.util.List; -import java.util.Map; -import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import com.google.common.annotations.VisibleForTesting; import org.apache.calcite.schema.SchemaPlus; import org.apache.drill.common.config.DrillConfig; -import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.common.exceptions.UserException; -import org.apache.drill.common.types.TypeProtos.MinorType; -import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.compile.CodeCompiler; -import org.apache.drill.exec.exception.OutOfMemoryException; +import org.apache.drill.exec.coord.ClusterCoordinator; +import org.apache.drill.exec.exception.ClassTransformationException; +import org.apache.drill.exec.expr.ClassGenerator; +import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; -import org.apache.drill.exec.expr.holders.ValueHolder; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.physical.base.PhysicalOperator; -import org.apache.drill.exec.planner.physical.PlannerSettings; -import org.apache.drill.exec.proto.BitControl.PlanFragment; -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; -import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; -import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; -import org.apache.drill.exec.proto.helper.QueryIdHelper; -import org.apache.drill.exec.rpc.RpcException; -import org.apache.drill.exec.rpc.RpcOutcomeListener; -import org.apache.drill.exec.rpc.UserClientConnection; -import 
org.apache.drill.exec.rpc.control.ControlTunnel; -import org.apache.drill.exec.rpc.control.WorkEventBus; -import org.apache.drill.exec.server.DrillbitContext; -import org.apache.drill.exec.server.options.FragmentOptionManager; -import org.apache.drill.exec.server.options.OptionList; -import org.apache.drill.exec.server.options.OptionManager; +import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry; +import org.apache.drill.exec.planner.PhysicalPlanReader; +import org.apache.drill.exec.proto.CoordinationProtos; +import org.apache.drill.exec.proto.ExecProtos; +import org.apache.drill.exec.rpc.control.Controller; import org.apache.drill.exec.server.options.OptionSet; -import org.apache.drill.exec.store.PartitionExplorer; -import org.apache.drill.exec.store.SchemaConfig; import org.apache.drill.exec.testing.ExecutionControls; -import org.apache.drill.exec.util.ImpersonationUtil; -import org.apache.drill.exec.work.batch.IncomingBuffers; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import io.netty.buffer.DrillBuf; +import org.apache.drill.exec.work.batch.IncomingBuffers; /** - * Contextual objects required for execution of a particular fragment. - * This is the implementation; use <tt>FragmentContextInterface</tt> - * in code to allow tests to use test-time implementations. + * Fragment context interface: separates implementation from definition. + * Allows unit testing by mocking or reimplementing services with + * test-time versions. The name is awkward, chosen to avoid renaming + * the implementation class which is used in many places in legacy code. + * New code should use this interface, and the names should eventually + * be swapped with {@link FragmentContextImpl} becoming + * <tt>FragmentContextImpl</tt> and this interface becoming + * {@link FragmentContextImpl}. 
*/ -public class FragmentContext extends BaseFragmentContext implements AutoCloseable, UdfUtilities, FragmentContextInterface { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class); - - private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap(); - private final List<OperatorContextImpl> contexts = Lists.newLinkedList(); - - private final DrillbitContext context; - private final UserClientConnection connection; // is null if this context is for non-root fragment - private final QueryContext queryContext; // is null if this context is for non-root fragment - private final FragmentStats stats; - private final BufferAllocator allocator; - private final PlanFragment fragment; - private final ContextInformation contextInformation; - private IncomingBuffers buffers; - private final OptionManager fragmentOptions; - private final BufferManager bufferManager; - private ExecutorState executorState; - private final ExecutionControls executionControls; - - private final SendingAccountor sendingAccountor = new SendingAccountor(); - private final Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() { - @Override - public void accept(final RpcException e) { - fail(e); - } - - @Override - public void interrupt(final InterruptedException e) { - if (shouldContinue()) { - logger.error("Received an unexpected interrupt while waiting for the data send to complete.", e); - fail(e); - } - } - }; - - private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor); - private final AccountingUserConnection accountingUserConnection; - /** Stores constants and their holders by type */ - private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache; - +public interface FragmentContext extends UdfUtilities, AutoCloseable { /** - * Create a FragmentContext instance for non-root fragment. - * - * @param dbContext DrillbitContext. 
- * @param fragment Fragment implementation. - * @param funcRegistry FunctionImplementationRegistry. - * @throws ExecutionSetupException + * Wait for ack that all outgoing batches have been sent */ - public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment, - final FunctionImplementationRegistry funcRegistry) throws ExecutionSetupException { - this(dbContext, fragment, null, null, funcRegistry); - } + void waitForSendComplete(); --- End diff -- Thanks for the explanation. Sounds very reasonable to me. I will work on splitting things up into three interfaces. The FragmentContextImpl class could probably implement all three interfaces and be cast to the appropriate interface depending on where it is passed.
---