http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc58c693/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index 7a0e501..3e1393c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -21,24 +21,28 @@ import io.netty.buffer.ByteBuf; import java.io.Closeable; import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import net.hydromatic.optiq.tools.ValidationException; - import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.logical.LogicalPlan; import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.coord.DistributedSemaphore; import org.apache.drill.exec.coord.DistributedSemaphore.DistributedLease; -import org.apache.drill.exec.exception.FragmentSetupException; import org.apache.drill.exec.exception.OptimizerException; +import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.opt.BasicOptimizer; import org.apache.drill.exec.physical.PhysicalPlan; +import org.apache.drill.exec.physical.base.FragmentRoot; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.config.ExternalSort; -import org.apache.drill.exec.physical.impl.SendingAccountor; import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch; import org.apache.drill.exec.planner.fragment.Fragment; import org.apache.drill.exec.planner.fragment.MakeFragmentsVisitor; @@ -47,39 +51,58 @@ import org.apache.drill.exec.planner.fragment.SimpleParallelizer; import org.apache.drill.exec.planner.fragment.StatsCollector; import org.apache.drill.exec.planner.sql.DirectPlan; import org.apache.drill.exec.planner.sql.DrillSqlWorker; +import org.apache.drill.exec.proto.BitControl.InitializeFragments; +import org.apache.drill.exec.proto.BitControl.PlanFragment; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; import org.apache.drill.exec.proto.UserBitShared.DrillPBError; import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.proto.UserBitShared.QueryResult; import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState; -import org.apache.drill.exec.proto.UserProtos.RequestResults; import org.apache.drill.exec.proto.UserProtos.RunQuery; import org.apache.drill.exec.proto.helper.QueryIdHelper; import org.apache.drill.exec.rpc.BaseRpcOutcomeListener; import org.apache.drill.exec.rpc.RpcException; import org.apache.drill.exec.rpc.RpcOutcomeListener; +import org.apache.drill.exec.rpc.control.Controller; import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection; import org.apache.drill.exec.server.DrillbitContext; -import org.apache.drill.exec.util.AtomicState; import org.apache.drill.exec.util.Pointer; +import org.apache.drill.exec.work.EndpointListener; import org.apache.drill.exec.work.ErrorHelper; import org.apache.drill.exec.work.QueryWorkUnit; import org.apache.drill.exec.work.WorkManager.WorkerBee; -import org.eigenbase.sql.parser.SqlParseException; +import org.apache.drill.exec.work.batch.IncomingBuffers; +import org.apache.drill.exec.work.fragment.FragmentExecutor; +import org.apache.drill.exec.work.fragment.RootFragmentManager; + +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; /** * Foreman manages all queries where this is the driving/root node. + * + * The flow is as follows: + * - Foreman is submitted as a runnable. + * - Runnable does query planning. + * - PENDING > RUNNING + * - Runnable sends out starting fragments + * - Status listener are activated + * - Foreman listens for state move messages. + * */ -public class Foreman implements Runnable, Closeable, Comparable<Object>{ +public class Foreman implements Runnable, Closeable, Comparable<Object> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Foreman.class); private QueryId queryId; private RunQuery queryRequest; private QueryContext context; - private QueryManager fragmentManager; + private QueryManager queryManager; private WorkerBee bee; private UserClientConnection initiatingClient; - private final AtomicState<QueryState> state; + private volatile QueryState state; + private final DistributedSemaphore smallSemaphore; private final DistributedSemaphore largeSemaphore; private final long queueThreshold; @@ -87,11 +110,18 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ private volatile DistributedLease lease; private final boolean queuingEnabled; + private FragmentExecutor rootRunner; + private final CountDownLatch acceptExternalEvents = new CountDownLatch(1); + private final StateListener stateListener = new StateListener(); + private final ResponseSendListener responseListener = new ResponseSendListener(); + public Foreman(WorkerBee bee, DrillbitContext dContext, UserClientConnection connection, QueryId queryId, RunQuery queryRequest) { this.queryId = queryId; this.queryRequest = queryRequest; this.context = new QueryContext(connection.getSession(), queryId, dContext); + + // set up queuing this.queuingEnabled = context.getOptions().getOption(ExecConstants.ENABLE_QUEUE_KEY).bool_val; if (queuingEnabled) { int smallQueue = context.getOptions().getOption(ExecConstants.SMALL_QUEUE_KEY).num_val.intValue(); @@ -106,81 +136,32 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ this.queueThreshold = 0; this.queueTimeout = 0; } + // end queuing setup. this.initiatingClient = connection; - this.fragmentManager = new QueryManager(queryId, queryRequest, bee.getContext().getPersistentStoreProvider(), new ForemanManagerListener(), dContext.getController(), this); + this.queryManager = new QueryManager(queryId, queryRequest, bee.getContext().getPersistentStoreProvider(), + stateListener, this); this.bee = bee; - this.state = new AtomicState<QueryState>(QueryState.PENDING) { - @Override - protected QueryState getStateFromNumber(int i) { - return QueryState.valueOf(i); - } - }; - this.fragmentManager.getStatus().updateQueryStateInStore(); + recordNewState(QueryState.PENDING); } public QueryContext getContext() { return context; } - private boolean isFinished() { - switch(state.getState()) { - case PENDING: - case RUNNING: - return false; - default: - return true; - } - - } - - private void fail(String message, Throwable t) { - if(isFinished()) { - logger.error("Received a failure message query finished of: {}", message, t); - } - if (!state.updateState(QueryState.RUNNING, QueryState.FAILED)) { - if (!state.updateState(QueryState.PENDING, QueryState.FAILED)) { - logger.warn("Tried to update query state to FAILED, but was not RUNNING"); - } - } - - DrillPBError error = ErrorHelper.logAndConvertError(context.getCurrentEndpoint(), message, t, logger); - QueryResult result = QueryResult // - .newBuilder() // - .addError(error) // - .setIsLastChunk(true) // - .setQueryState(QueryState.FAILED) // - .setQueryId(queryId) // - .build(); - cleanupAndSendResult(result); - } - public void cancel() { - if (isFinished()) { - return; - } - state.updateState(QueryState.RUNNING, QueryState.CANCELED); - - // cancel remote fragments. - fragmentManager.cancel(); + stateListener.moveToState(QueryState.CANCELED, null); } - void cleanupAndSendResult(QueryResult result) { + private void cleanup(QueryResult result) { bee.retireForeman(this); - initiatingClient.sendResult(new ResponseSendListener(), new QueryWritableBatch(result), true); - state.updateState(state.getState(), result.getQueryState()); - - this.fragmentManager.getStatus().updateQueryStateInStore(); - } - - private class ResponseSendListener extends BaseRpcOutcomeListener<Ack> { - @Override - public void failed(RpcException ex) { - logger.info( - "Failure while trying communicate query result to initating client. This would happen if a client is disconnected before response notice can be sent.", - ex); + context.getWorkBus().removeFragmentStatusListener(queryId); + context.getClusterCoordinator().removeDrillbitStatusListener(queryManager); + if(result != null){ + initiatingClient.sendResult(responseListener, new QueryWritableBatch(result), true); } + releaseLease(); } /** @@ -190,7 +171,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ final String originalThread = Thread.currentThread().getName(); Thread.currentThread().setName(QueryIdHelper.getQueryId(queryId) + ":foreman"); - fragmentManager.getStatus().setStartTime(System.currentTimeMillis()); + getStatus().markStart(); // convert a run query request into action try { switch (queryRequest.getType()) { @@ -204,16 +185,21 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ runSQL(queryRequest.getPlan()); break; default: - throw new UnsupportedOperationException(); + throw new IllegalStateException(); } + } catch (ForemanException e) { + moveToState(QueryState.FAILED, e); + } catch (AssertionError | Exception ex) { - fail("Failure while setting up Foreman.", ex); + moveToState(QueryState.FAILED, new ForemanException("Unexpected exception during fragment initialization.", ex)); + } catch (OutOfMemoryError e) { System.out.println("Out of memory, exiting."); + e.printStackTrace(); System.out.flush(); System.exit(-1); + } finally { - releaseLease(); Thread.currentThread().setName(originalThread); } } @@ -224,45 +210,62 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ lease.close(); } catch (Exception e) { logger.warn("Failure while releasing lease.", e); - }; + } + ; } } - private void parseAndRunLogicalPlan(String json) { + private void parseAndRunLogicalPlan(String json) throws ExecutionSetupException { + LogicalPlan logicalPlan; try { - LogicalPlan logicalPlan = context.getPlanReader().readLogicalPlan(json); + logicalPlan = context.getPlanReader().readLogicalPlan(json); + } catch (IOException e) { + throw new ForemanException("Failure parsing logical plan.", e); + } - if (logicalPlan.getProperties().resultMode == ResultMode.LOGICAL) { - fail("Failure running plan. You requested a result mode of LOGICAL and submitted a logical plan. In this case you're output mode must be PHYSICAL or EXEC.", new Exception()); - } - if (logger.isDebugEnabled()) { - logger.debug("Logical {}", logicalPlan.unparse(context.getConfig())); - } - PhysicalPlan physicalPlan = convert(logicalPlan); + if (logicalPlan.getProperties().resultMode == ResultMode.LOGICAL) { + throw new ForemanException( + "Failure running plan. You requested a result mode of LOGICAL and submitted a logical plan. In this case you're output mode must be PHYSICAL or EXEC."); + } - if (logicalPlan.getProperties().resultMode == ResultMode.PHYSICAL) { - returnPhysical(physicalPlan); - return; - } + log(logicalPlan); + + PhysicalPlan physicalPlan = convert(logicalPlan); + + if (logicalPlan.getProperties().resultMode == ResultMode.PHYSICAL) { + returnPhysical(physicalPlan); + return; + } - if (logger.isDebugEnabled()) { - logger.debug("Physical {}", context.getConfig().getMapper().writeValueAsString(physicalPlan)); + log(physicalPlan); + + runPhysicalPlan(physicalPlan); + } + + private void log(LogicalPlan plan) { + if (logger.isDebugEnabled()) { + logger.debug("Logical {}", plan.unparse(context.getConfig())); + } + } + + private void log(PhysicalPlan plan) { + if (logger.isDebugEnabled()) { + try { + String planText = context.getConfig().getMapper().writeValueAsString(plan); + logger.debug("Physical {}", planText); + } catch (IOException e) { + logger.warn("Error while attempting to log physical plan.", e); } - runPhysicalPlan(physicalPlan); - } catch (IOException e) { - fail("Failure while parsing logical plan.", e); - } catch (OptimizerException e) { - fail("Failure while converting logical plan to physical plan.", e); } } - private void returnPhysical(PhysicalPlan plan) { + private void returnPhysical(PhysicalPlan plan) throws ExecutionSetupException { String jsonPlan = plan.unparse(context.getConfig().getMapper().writer()); runPhysicalPlan(DirectPlan.createDirectPlan(context, new PhysicalFromLogicalExplain(jsonPlan))); } - private class PhysicalFromLogicalExplain{ + public static class PhysicalFromLogicalExplain { public String json; public PhysicalFromLogicalExplain(String json) { @@ -272,54 +275,53 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ } - class SingleListener implements RpcOutcomeListener<Ack>{ - - final SendingAccountor acct; - - public SingleListener() { - acct = new SendingAccountor(); - acct.increment(); - acct.increment(); - } - - @Override - public void failed(RpcException ex) { - acct.decrement(); - fail("Failure while sending single result.", ex); - } - - @Override - public void success(Ack value, ByteBuf buffer) { - acct.decrement(); - } - - } - - private void parseAndRunPhysicalPlan(String json) { + private void parseAndRunPhysicalPlan(String json) throws ExecutionSetupException { try { PhysicalPlan plan = context.getPlanReader().readPhysicalPlan(json); runPhysicalPlan(plan); } catch (IOException e) { - fail("Failure while parsing physical plan.", e); + throw new ForemanSetupException("Failure while parsing physical plan.", e); } } - private void runPhysicalPlan(PhysicalPlan plan) { + private void runPhysicalPlan(PhysicalPlan plan) throws ExecutionSetupException { - if(plan.getProperties().resultMode != ResultMode.EXEC) { - fail(String.format("Failure running plan. You requested a result mode of %s and a physical plan can only be output as EXEC", plan.getProperties().resultMode), new Exception()); - } - PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next(); + validatePlan(plan); + setupSortMemoryAllocations(plan); + acquireQuerySemaphore(plan); - MakeFragmentsVisitor makeFragmentsVisitor = new MakeFragmentsVisitor(); - Fragment rootFragment; - try { - rootFragment = rootOperator.accept(makeFragmentsVisitor, null); - } catch (FragmentSetupException e) { - fail("Failure while fragmenting query.", e); - return; + final QueryWorkUnit work = getQueryWorkUnit(plan); + + this.context.getWorkBus().setFragmentStatusListener(work.getRootFragment().getHandle().getQueryId(), queryManager); + this.context.getClusterCoordinator().addDrillbitStatusListener(queryManager); + + logger.debug("Submitting fragments to run."); + + final PlanFragment rootPlanFragment = work.getRootFragment(); + assert queryId == rootPlanFragment.getHandle().getQueryId(); + + queryManager.setup(rootPlanFragment.getHandle(), context.getCurrentEndpoint(), work.getFragments().size()); + + // set up the root fragment first so we'll have incoming buffers available. + setupRootFragment(rootPlanFragment, initiatingClient, work.getRootOperator()); + + setupNonRootFragments(work.getFragments()); + bee.getContext().getAllocator().resetFragmentLimits(); + + moveToState(QueryState.RUNNING, null); + logger.debug("Fragments running."); + + } + + private void validatePlan(PhysicalPlan plan) throws ForemanSetupException{ + if (plan.getProperties().resultMode != ResultMode.EXEC) { + throw new ForemanSetupException(String.format( + "Failure running plan. You requested a result mode of %s and a physical plan can only be output as EXEC", + plan.getProperties().resultMode)); } + } + private void setupSortMemoryAllocations(PhysicalPlan plan){ int sortCount = 0; for (PhysicalOperator op : plan.getSortedOperators()) { if (op instanceof ExternalSort) { @@ -329,83 +331,193 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ if (sortCount > 0) { long maxWidthPerNode = context.getOptions().getOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY).num_val; - long maxAllocPerNode = Math.min(DrillConfig.getMaxDirectMemory(), context.getConfig().getLong(ExecConstants.TOP_LEVEL_MAX_ALLOC)); - maxAllocPerNode = Math.min(maxAllocPerNode, context.getOptions().getOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY).num_val); + long maxAllocPerNode = Math.min(DrillConfig.getMaxDirectMemory(), + context.getConfig().getLong(ExecConstants.TOP_LEVEL_MAX_ALLOC)); + maxAllocPerNode = Math.min(maxAllocPerNode, + context.getOptions().getOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY).num_val); long maxSortAlloc = maxAllocPerNode / (sortCount * maxWidthPerNode); logger.debug("Max sort alloc: {}", maxSortAlloc); for (PhysicalOperator op : plan.getSortedOperators()) { - if (op instanceof ExternalSort) { - ((ExternalSort)op).setMaxAllocation(maxSortAlloc); + if (op instanceof ExternalSort) { + ((ExternalSort) op).setMaxAllocation(maxSortAlloc); } } } + } - PlanningSet planningSet = StatsCollector.collectStats(rootFragment); - SimpleParallelizer parallelizer = new SimpleParallelizer(context); + private void acquireQuerySemaphore(PhysicalPlan plan) throws ForemanSetupException { - try { + double size = 0; + for (PhysicalOperator ops : plan.getSortedOperators()) { + size += ops.getCost(); + } - double size = 0; - for (PhysicalOperator ops : plan.getSortedOperators()) { - size += ops.getCost(); - } - if (queuingEnabled) { + if (queuingEnabled) { + try { if (size > this.queueThreshold) { this.lease = largeSemaphore.acquire(this.queueTimeout, TimeUnit.MILLISECONDS); } else { this.lease = smallSemaphore.acquire(this.queueTimeout, TimeUnit.MILLISECONDS); } + } catch (Exception e) { + throw new ForemanSetupException("Unable to acquire slot for query.", e); + } + } + } + + private QueryWorkUnit getQueryWorkUnit(PhysicalPlan plan) throws ExecutionSetupException { + PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next(); + MakeFragmentsVisitor makeFragmentsVisitor = new MakeFragmentsVisitor(); + Fragment rootFragment = rootOperator.accept(makeFragmentsVisitor, null); + PlanningSet planningSet = StatsCollector.collectStats(rootFragment); + SimpleParallelizer parallelizer = new SimpleParallelizer(context); + + + return parallelizer.getFragments(context.getOptions().getOptionList(), context.getCurrentEndpoint(), + queryId, context.getActiveEndpoints(), context.getPlanReader(), rootFragment, planningSet, + initiatingClient.getSession()); + } + + /** + * Tells the foreman to move to a new state. Note that + * @param state + * @return + */ + private synchronized boolean moveToState(QueryState newState, Exception exception){ + logger.debug("State change requested. {} --> {}", state, newState); + outside: switch(state) { + + case PENDING: + // since we're moving out of pending, we can now start accepting other changes in state. + // This guarantees that the first state change is driven by the original thread. + acceptExternalEvents.countDown(); + + if(newState == QueryState.RUNNING){ + recordNewState(QueryState.RUNNING); + return true; + } + + // fall through to running behavior. + // + case RUNNING: { + + switch(newState){ + + case CANCELED: { + assert exception == null; + recordNewState(QueryState.CANCELED); + cancelExecutingFragments(); + QueryResult result = QueryResult.newBuilder() // + .setQueryId(queryId) // + .setQueryState(QueryState.CANCELED) // + .setIsLastChunk(true) // + .build(); + + cleanup(result); + return true; } - QueryWorkUnit work = parallelizer.getFragments(context.getOptions().getOptionList(), context.getCurrentEndpoint(), - queryId, context.getActiveEndpoints(), context.getPlanReader(), rootFragment, planningSet, initiatingClient.getSession()); + case COMPLETED: { + assert exception == null; + recordNewState(QueryState.COMPLETED); +// QueryResult result = QueryResult // +// .newBuilder() // +// .setIsLastChunk(true) // +// .setQueryState(QueryState.COMPLETED) // +// .setQueryId(queryId) // +// .build(); + cleanup(null); + return true; + } - this.context.getWorkBus().setFragmentStatusListener(work.getRootFragment().getHandle().getQueryId(), fragmentManager); - this.context.getClusterCoordinator().addDrillbitStatusListener(fragmentManager); - int totalFragments = 1 + work.getFragments().size();; - fragmentManager.getStatus().setTotalFragments(totalFragments); + case FAILED: + assert exception != null; + recordNewState(QueryState.FAILED); + cancelExecutingFragments(); + DrillPBError error = ErrorHelper.logAndConvertError(context.getCurrentEndpoint(), "Query failed.", exception, logger); + QueryResult result = QueryResult // + .newBuilder() // + .addError(error) // + .setIsLastChunk(true) // + .setQueryState(QueryState.FAILED) // + .setQueryId(queryId) // + .build(); + cleanup(result); + return true; + default: + break outside; - logger.debug("Submitting fragments to run."); - fragmentManager.runFragments(bee, work.getRootFragment(), work.getRootOperator(), initiatingClient, work.getFragments()); + } + } + + case CANCELED: + case COMPLETED: + case FAILED: { + // no op. + logger.info("Dropping request to move to {} state as query is already at {} state (which is terminal).", newState, state, exception); + return false; + } - logger.debug("Fragments running."); - state.updateState(QueryState.PENDING, QueryState.RUNNING); - fragmentManager.getStatus().updateQueryStateInStore(); - } catch (Exception e) { - fail("Failure while setting up query.", e); } + throw new IllegalStateException(String.format("Failure trying to change states: %s --> %s", state.name(), newState.name())); } - private void runSQL(String sql) { - try{ - DrillSqlWorker sqlWorker = new DrillSqlWorker(context); - Pointer<String> textPlan = new Pointer<>(); - PhysicalPlan plan = sqlWorker.getPlan(sql, textPlan); - fragmentManager.getStatus().setPlanText(textPlan.value); - runPhysicalPlan(plan); - } catch (SqlParseException ex) { - fail("Failure while parsing sql : " + ex.getMessage(), ex); - } catch (ValidationException ex) { - fail("Failure while validating sql : " + ex.getMessage(), ex); - } catch(Exception e) { - fail("Failure while running sql.", e); + private void cancelExecutingFragments(){ + + // Stop all framgents with a currently active status. + List<FragmentData> fragments = getStatus().getFragmentData(); + Collections.sort(fragments, new Comparator<FragmentData>() { + @Override + public int compare(FragmentData o1, FragmentData o2) { + return o2.getHandle().getMajorFragmentId() - o1.getHandle().getMajorFragmentId(); + } + }); + for(FragmentData data: fragments){ + FragmentHandle handle = data.getStatus().getHandle(); + switch(data.getStatus().getProfile().getState()){ + case SENDING: + case AWAITING_ALLOCATION: + case RUNNING: + if(data.isLocal()){ + rootRunner.cancel(); + }else{ + bee.getContext().getController().getTunnel(data.getEndpoint()).cancelFragment(new CancelListener(data.getEndpoint(), handle), handle); + } + break; + default: + break; + } } + + } + + private QueryStatus getStatus(){ + return queryManager.getStatus(); + } + + private void recordNewState(QueryState newState){ + this.state = newState; + getStatus().updateQueryStateInStore(newState); + } + + private void runSQL(String sql) throws ExecutionSetupException { + DrillSqlWorker sqlWorker = new DrillSqlWorker(context); + Pointer<String> textPlan = new Pointer<>(); + PhysicalPlan plan = sqlWorker.getPlan(sql, textPlan); + getStatus().setPlanText(textPlan.value); + runPhysicalPlan(plan); } private PhysicalPlan convert(LogicalPlan plan) throws OptimizerException { if (logger.isDebugEnabled()) { logger.debug("Converting logical plan {}.", plan.toJsonStringSafe(context.getConfig())); } - return new BasicOptimizer(DrillConfig.create(), context, initiatingClient).optimize(new BasicOptimizer.BasicOptimizationContext(context), plan); + return new BasicOptimizer(DrillConfig.create(), context, initiatingClient).optimize( + new BasicOptimizer.BasicOptimizationContext(context), plan); } - public QueryResult getResult(UserClientConnection connection, RequestResults req) { - throw new UnsupportedOperationException(); - } - - public QueryId getQueryId() { return queryId; } @@ -414,29 +526,163 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ public void close() throws IOException { } - public QueryState getQueryState() { - return this.state.getState(); + public QueryStatus getQueryStatus() { + return this.queryManager.getStatus(); } - public QueryStatus getQueryStatus() { - return this.fragmentManager.getStatus(); + private void setupRootFragment(PlanFragment rootFragment, UserClientConnection rootClient, FragmentRoot rootOperator) throws ExecutionSetupException { + FragmentContext rootContext = new FragmentContext(bee.getContext(), rootFragment, rootClient, bee.getContext() + .getFunctionImplementationRegistry()); + + IncomingBuffers buffers = new IncomingBuffers(rootOperator, rootContext); + + rootContext.setBuffers(buffers); + + // add fragment to local node. + queryManager.addFragmentStatusTracker(rootFragment, true); + + this.rootRunner = new FragmentExecutor(rootContext, bee, rootOperator, queryManager.getRootStatusHandler(rootContext, rootFragment)); + RootFragmentManager fragmentManager = new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner); + + if (buffers.isDone()) { + // if we don't have to wait for any incoming data, start the fragment runner. + bee.addFragmentRunner(fragmentManager.getRunnable()); + } else { + // if we do, record the fragment manager in the workBus. + bee.getContext().getWorkBus().setFragmentManager(fragmentManager); + } } + private void setupNonRootFragments(Collection<PlanFragment> fragments) throws ForemanException{ + Multimap<DrillbitEndpoint, PlanFragment> leafFragmentMap = ArrayListMultimap.create(); + Multimap<DrillbitEndpoint, PlanFragment> intFragmentMap = ArrayListMultimap.create(); + + // record all fragments for status purposes. + for (PlanFragment f : fragments) { +// logger.debug("Tracking intermediate remote node {} with data {}", f.getAssignment(), f.getFragmentJson()); + queryManager.addFragmentStatusTracker(f, false); + if (f.getLeafFragment()) { + leafFragmentMap.put(f.getAssignment(), f); + } else { + intFragmentMap.put(f.getAssignment(), f); + } + } + + CountDownLatch latch = new CountDownLatch(intFragmentMap.keySet().size()); - class ForemanManagerListener{ - void fail(String message, Throwable t) { - ForemanManagerListener.this.fail(message, t); + // send remote intermediate fragments + for (DrillbitEndpoint ep : intFragmentMap.keySet()) { + sendRemoteFragments(ep, intFragmentMap.get(ep), latch); } - void cleanupAndSendResult(QueryResult result) { - Foreman.this.cleanupAndSendResult(result); + // wait for send complete + try { + latch.await(); + } catch (InterruptedException e) { + throw new ForemanException("Interrupted while waiting to complete send of remote fragments.", e); + } + + // send remote (leaf) fragments. + for (DrillbitEndpoint ep : leafFragmentMap.keySet()) { + sendRemoteFragments(ep, leafFragmentMap.get(ep), null); + } + } + + public RpcOutcomeListener<Ack> getSubmitListener(DrillbitEndpoint endpoint, InitializeFragments value, CountDownLatch latch){ + return new FragmentSubmitListener(endpoint, value, latch); + } + + private void sendRemoteFragments(DrillbitEndpoint assignment, Collection<PlanFragment> fragments, CountDownLatch latch){ + Controller controller = bee.getContext().getController(); + InitializeFragments.Builder fb = InitializeFragments.newBuilder(); + for(PlanFragment f : fragments){ + fb.addFragment(f); + } + InitializeFragments initFrags = fb.build(); + + logger.debug("Sending remote fragments to node {} with data {}", assignment, initFrags); + FragmentSubmitListener listener = new FragmentSubmitListener(assignment, initFrags, latch); + controller.getTunnel(assignment).sendFragments(listener, initFrags); + } + + public QueryState getState(){ + return state; + } + + private class FragmentSubmitListener extends EndpointListener<Ack, InitializeFragments>{ + + private CountDownLatch latch; + + public FragmentSubmitListener(DrillbitEndpoint endpoint, InitializeFragments value, CountDownLatch latch) { + super(endpoint, value); + this.latch = latch; + } + + @Override + public void success(Ack ack, ByteBuf byteBuf) { + if (latch != null) { + latch.countDown(); + } + } + + @Override + public void failed(RpcException ex) { + logger.debug("Failure while sending fragment. Stopping query.", ex); + moveToState(QueryState.FAILED, ex); } } + + public class StateListener { + public boolean moveToState(QueryState newState, Exception ex){ + try{ + acceptExternalEvents.await(); + }catch(InterruptedException e){ + logger.warn("Interrupted while waiting to move state.", e); + return false; + } + + return Foreman.this.moveToState(newState, ex); + } + } + + @Override public int compareTo(Object o) { return hashCode() - o.hashCode(); } + private class ResponseSendListener extends BaseRpcOutcomeListener<Ack> { + @Override + public void failed(RpcException ex) { + logger + .info( + "Failure while trying communicate query result to initating client. This would happen if a client is disconnected before response notice can be sent.", + ex); + moveToState(QueryState.FAILED, ex); + } + } + + + private class CancelListener extends EndpointListener<Ack, FragmentHandle>{ + + public CancelListener(DrillbitEndpoint endpoint, FragmentHandle handle) { + super(endpoint, handle); + } + + @Override + public void failed(RpcException ex) { + logger.error("Failure while attempting to cancel fragment {} on endpoint {}.", value, endpoint, ex); + } + + @Override + public void success(Ack value, ByteBuf buf) { + if(!value.getOk()){ + logger.warn("Remote node {} responded negative on cancellation request for fragment {}.", endpoint, value); + } + // do nothing. + } + + } }
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc58c693/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ForemanException.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ForemanException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ForemanException.java new file mode 100644 index 0000000..32a99ad --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ForemanException.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.foreman; + +import java.lang.reflect.InvocationTargetException; + +import org.apache.drill.common.exceptions.ExecutionSetupException; + +public class ForemanException extends ExecutionSetupException { + private static final long serialVersionUID = -6943409010231014085L; + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ForemanException.class); + + public static ForemanException fromThrowable(String message, Throwable cause) { + Throwable t = cause instanceof InvocationTargetException + ? ((InvocationTargetException)cause).getTargetException() : cause; + if (t instanceof ForemanException) { + return ((ForemanException) t); + } + return new ForemanException(message, t); + } + + public ForemanException() { + super(); + } + + public ForemanException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } + + public ForemanException(String message, Throwable cause) { + super(message, cause); + } + + public ForemanException(String message) { + super(message); + } + + public ForemanException(Throwable cause) { + super(cause); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc58c693/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ForemanSetupException.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ForemanSetupException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ForemanSetupException.java new file mode 100644 index 0000000..2083753 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ForemanSetupException.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.foreman; + + +public class ForemanSetupException extends ForemanException { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ForemanSetupException.class); + + public ForemanSetupException() { + super(); + } + + public ForemanSetupException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } + + public ForemanSetupException(String message, Throwable cause) { + super(message, cause); + } + + public ForemanSetupException(String message) { + super(message); + } + + public ForemanSetupException(Throwable cause) { + super(cause); + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc58c693/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java index 0f007ee..d4c87d4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java @@ -17,204 +17,70 @@ */ package org.apache.drill.exec.work.foreman; -import io.netty.buffer.ByteBuf; - -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; import java.util.List; import java.util.Set; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.drill.common.exceptions.DrillException; -import org.apache.drill.common.exceptions.DrillRuntimeException; -import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.coord.ClusterCoordinator; import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.physical.base.FragmentRoot; import org.apache.drill.exec.proto.BitControl.FragmentStatus; -import org.apache.drill.exec.proto.BitControl.InitializeFragments; import org.apache.drill.exec.proto.BitControl.PlanFragment; -import org.apache.drill.exec.proto.CoordinationProtos; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; -import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; -import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.proto.UserBitShared.QueryId; -import org.apache.drill.exec.proto.UserBitShared.QueryResult; import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState; import org.apache.drill.exec.proto.UserProtos.RunQuery; -import org.apache.drill.exec.rpc.RpcException; -import org.apache.drill.exec.rpc.RpcOutcomeListener; -import org.apache.drill.exec.rpc.control.Controller; -import org.apache.drill.exec.rpc.control.WorkEventBus; -import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection; +import org.apache.drill.exec.proto.helper.QueryIdHelper; +import org.apache.drill.exec.rpc.RemoteRpcException; import org.apache.drill.exec.store.sys.PStoreProvider; -import org.apache.drill.exec.work.EndpointListener; -import org.apache.drill.exec.work.ErrorHelper; -import org.apache.drill.exec.work.WorkManager.WorkerBee; -import org.apache.drill.exec.work.batch.IncomingBuffers; -import org.apache.drill.exec.work.foreman.Foreman.ForemanManagerListener; +import org.apache.drill.exec.work.foreman.Foreman.StateListener; import org.apache.drill.exec.work.fragment.AbstractStatusReporter; -import org.apache.drill.exec.work.fragment.FragmentExecutor; -import org.apache.drill.exec.work.fragment.RootFragmentManager; -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; + /** - * Each Foreman holds its own fragment manager. This manages the events associated with execution of a particular query across all fragments. + * Each Foreman holds its own QueryManager. This manages the events associated with execution of a particular query across all fragments. */ public class QueryManager implements FragmentStatusListener, DrillbitStatusListener{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryManager.class); + private final Set<DrillbitEndpoint> includedBits; private final QueryStatus status; - private final Controller controller; - private ForemanManagerListener foremanManagerListener; - private AtomicInteger remainingFragmentCount; - private AtomicInteger failedFragmentCount; - private WorkEventBus workBus; - private ClusterCoordinator coord; - private QueryId queryId; - private FragmentExecutor rootRunner; - private RunQuery query; - private volatile boolean running = false; - private volatile boolean cancelled = false; - private volatile boolean stopped = false; + private final StateListener stateListener; + private final AtomicInteger remainingFragmentCount; + private final QueryId queryId; - public QueryManager(QueryId id, RunQuery query, PStoreProvider pStoreProvider, ForemanManagerListener foremanManagerListener, Controller controller, Foreman foreman) { + public QueryManager(QueryId id, RunQuery query, PStoreProvider pStoreProvider, StateListener stateListener, Foreman foreman) { super(); - this.foremanManagerListener = foremanManagerListener; - this.query = query; + this.stateListener = stateListener; this.queryId = id; - this.controller = controller; this.remainingFragmentCount = new AtomicInteger(0); - this.failedFragmentCount = new AtomicInteger(0); this.status = new QueryStatus(query, id, pStoreProvider, foreman); + this.includedBits = Sets.newHashSet(); } public QueryStatus getStatus(){ return status; } - public void addTextPlan(String textPlan){ - - } - - public void runFragments(WorkerBee bee, PlanFragment rootFragment, FragmentRoot rootOperator, - UserClientConnection rootClient, List<PlanFragment> nonRootFragments) throws ExecutionSetupException{ - logger.debug("Setting up fragment runs."); - remainingFragmentCount.set(nonRootFragments.size() + 1); - assert queryId == rootFragment.getHandle().getQueryId(); - workBus = bee.getContext().getWorkBus(); - coord = bee.getContext().getClusterCoordinator(); - - // set up the root fragment first so we'll have incoming buffers available. - { - logger.debug("Setting up root context."); - FragmentContext rootContext = new FragmentContext(bee.getContext(), rootFragment, rootClient, bee.getContext().getFunctionImplementationRegistry()); - logger.debug("Setting up incoming buffers"); - IncomingBuffers buffers = new IncomingBuffers(rootOperator, rootContext); - logger.debug("Setting buffers on root context."); - rootContext.setBuffers(buffers); - // add fragment to local node. - status.add(new FragmentData(rootFragment.getHandle(), rootFragment.getAssignment(), true)); - logger.debug("Fragment added to local node."); - rootRunner = new FragmentExecutor(rootContext, bee, rootOperator, new RootStatusHandler(rootContext, rootFragment)); - RootFragmentManager fragmentManager = new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner); - - if(buffers.isDone()){ - // if we don't have to wait for any incoming data, start the fragment runner. - bee.addFragmentRunner(fragmentManager.getRunnable()); - }else{ - // if we do, record the fragment manager in the workBus. - workBus.setFragmentManager(fragmentManager); - } - } - - Multimap<DrillbitEndpoint, PlanFragment> leafFragmentMap = ArrayListMultimap.create(); - Multimap<DrillbitEndpoint, PlanFragment> intFragmentMap = ArrayListMultimap.create(); - - // record all fragments for status purposes. - for (PlanFragment f : nonRootFragments) { - logger.debug("Tracking intermediate remote node {} with data {}", f.getAssignment(), f.getFragmentJson()); - status.add(new FragmentData(f.getHandle(), f.getAssignment(), false)); - if (f.getLeafFragment()) { - leafFragmentMap.put(f.getAssignment(), f); - } else { - intFragmentMap.put(f.getAssignment(), f); - } - } - - CountDownLatch latch = new CountDownLatch(intFragmentMap.keySet().size()); - - // send remote intermediate fragments - for (DrillbitEndpoint ep : intFragmentMap.keySet()) { - sendRemoteFragments(ep, intFragmentMap.get(ep), latch); - } - - // wait for send complete - try { - latch.await(); - } catch (InterruptedException e) { - throw new ExecutionSetupException(e); - } - - // send remote (leaf) fragments. - for (DrillbitEndpoint ep : leafFragmentMap.keySet()) { - sendRemoteFragments(ep, leafFragmentMap.get(ep), null); - } - - bee.getContext().getAllocator().resetFragmentLimits(); - - logger.debug("Fragment runs setup is complete."); - running = true; - if (cancelled && !stopped) { - stopQuery(); - QueryResult result = QueryResult.newBuilder().setQueryId(queryId).setQueryState(QueryState.CANCELED).setIsLastChunk(true).build(); - foremanManagerListener.cleanupAndSendResult(result); - } - } - - private void sendRemoteFragments(DrillbitEndpoint assignment, Collection<PlanFragment> fragments, CountDownLatch latch){ - InitializeFragments.Builder fb = InitializeFragments.newBuilder(); - for(PlanFragment f : fragments){ - fb.addFragment(f); - } - InitializeFragments initFrags = fb.build(); - - logger.debug("Sending remote fragments to node {} with data {}", assignment, initFrags); - FragmentSubmitListener listener = new FragmentSubmitListener(assignment, initFrags, latch); - controller.getTunnel(assignment).sendFragments(listener, initFrags); - } - @Override public void drillbitRegistered(Set<DrillbitEndpoint> registeredDrillbits) { } @Override public void drillbitUnregistered(Set<DrillbitEndpoint> unregisteredDrillbits) { - List<FragmentData> fragments = status.getFragmentData(); - - for (FragmentData fragment : fragments) { - if (unregisteredDrillbits.contains(fragment.getEndpoint())) { - logger.warn("Drillbit {} for major{}:minor{} is not responding. Stop query {}", - fragment.getEndpoint(), - fragment.getHandle().getMajorFragmentId(), - fragment.getHandle().getMinorFragmentId(), - fragment.getHandle().getQueryId()); - - UserBitShared.DrillPBError error = ErrorHelper.logAndConvertError(fragment.getEndpoint(), "Failure while running fragment.", - new DrillRuntimeException(String.format("Drillbit %s not responding", fragment.getEndpoint())), logger); - failWithError(error); - break; + for(DrillbitEndpoint ep : unregisteredDrillbits){ + if(this.includedBits.contains(ep)){ + logger.warn("Drillbit {} no longer registered in cluster. Canceling query {}", ep.getAddress() + ep.getControlPort(), QueryIdHelper.getQueryId(queryId)); + this.stateListener.moveToState(QueryState.FAILED, new ForemanException("One more more nodes lost connectivity during query. Identified node was " + ep.getAddress())); } } } + @Override public void statusUpdate(FragmentStatus status) { + logger.debug("New fragment status was provided to Foreman of {}", status); switch(status.getProfile().getState()){ case AWAITING_ALLOCATION: @@ -224,7 +90,7 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe // we don't care about cancellation messages since we're the only entity that should drive cancellations. break; case FAILED: - fail(status); + stateListener.moveToState(QueryState.FAILED, new RemoteRpcException(status.getProfile().getError())); break; case FINISHED: finished(status); @@ -242,139 +108,47 @@ public class QueryManager implements FragmentStatusListener, DrillbitStatusListe } private void finished(FragmentStatus status){ - int remaining = remainingFragmentCount.decrementAndGet(); - if(remaining == 0){ - logger.info("Outcome status: {}", this.status); - QueryResult result = QueryResult.newBuilder() // - .setQueryState(QueryState.COMPLETED) // - .setQueryId(queryId) // - .build(); - this.status.setEndTime(System.currentTimeMillis()); - foremanManagerListener.cleanupAndSendResult(result); - workBus.removeFragmentStatusListener(queryId); - coord.removeDrillbitStatusListener(this); - } this.status.incrementFinishedFragments(); + int remaining = remainingFragmentCount.decrementAndGet(); updateFragmentStatus(status); - } - private void fail(FragmentStatus status){ - updateFragmentStatus(status); - int failed = this.failedFragmentCount.incrementAndGet(); - if (failed == 1) { // only first failed fragment need notify foreman (?) - failWithError(status.getProfile().getError()); + if(remaining == 0){ + stateListener.moveToState(QueryState.COMPLETED, null); } } - private void failWithError(UserBitShared.DrillPBError error) { - stopQuery(); - QueryResult result = QueryResult.newBuilder().setQueryId(queryId).setQueryState(QueryState.FAILED).addError(error).setIsLastChunk(true).build(); - this.status.setEndTime(System.currentTimeMillis()); - foremanManagerListener.cleanupAndSendResult(result); - } - + public void setup(FragmentHandle rootFragmentHandle, DrillbitEndpoint localIdentity, int countOfNonRootFragments){ + remainingFragmentCount.set(countOfNonRootFragments + 1); + status.add(new FragmentData(rootFragmentHandle, localIdentity, true)); + this.status.setTotalFragments(countOfNonRootFragments + 1); - private void stopQuery(){ - // Stop all queries with a currently active status. List<FragmentData> fragments = status.getFragmentData(); - Collections.sort(fragments, new Comparator<FragmentData>() { - @Override - public int compare(FragmentData o1, FragmentData o2) { - return o2.getHandle().getMajorFragmentId() - o1.getHandle().getMajorFragmentId(); - } - }); - for(FragmentData data: fragments){ - FragmentHandle handle = data.getStatus().getHandle(); - switch(data.getStatus().getProfile().getState()){ - case SENDING: - case AWAITING_ALLOCATION: - case RUNNING: - if(data.isLocal()){ - rootRunner.cancel(); - }else{ - controller.getTunnel(data.getEndpoint()).cancelFragment(new CancelListener(data.getEndpoint(), handle), handle); - } - break; - default: - break; - } - } - - workBus.removeFragmentStatusListener(queryId); - coord.removeDrillbitStatusListener(this); - - stopped = true; - } - - public void cancel(){ - cancelled = true; - if (running) { - stopQuery(); - stopped = true; - QueryResult result = QueryResult.newBuilder().setQueryId(queryId).setQueryState(QueryState.CANCELED).setIsLastChunk(true).build(); - foremanManagerListener.cleanupAndSendResult(result); + for (FragmentData fragment : fragments) { + this.includedBits.add(fragment.getEndpoint()); } } - private class CancelListener extends EndpointListener<Ack, FragmentHandle>{ - - public CancelListener(DrillbitEndpoint endpoint, FragmentHandle handle) { - super(endpoint, handle); - } - - @Override - public void failed(RpcException ex) { - logger.error("Failure while attempting to cancel fragment {} on endpoint {}.", value, endpoint, ex); - } - - @Override - public void success(Ack value, ByteBuf buf) { - if(!value.getOk()){ - logger.warn("Remote node {} responded negative on cancellation request for fragment {}.", endpoint, value); - } - // do nothing. - } - + public void addFragmentStatusTracker(PlanFragment fragment, boolean isRoot){ + addFragmentStatusTracker(fragment.getHandle(), fragment.getAssignment(), isRoot); } - public RpcOutcomeListener<Ack> getSubmitListener(DrillbitEndpoint endpoint, InitializeFragments value, CountDownLatch latch){ - return new FragmentSubmitListener(endpoint, value, latch); + public void addFragmentStatusTracker(FragmentHandle handle, DrillbitEndpoint node, boolean isRoot){ + status.add(new FragmentData(handle, node, isRoot)); } - private class FragmentSubmitListener extends EndpointListener<Ack, InitializeFragments>{ - - private CountDownLatch latch; - - public FragmentSubmitListener(DrillbitEndpoint endpoint, InitializeFragments value, CountDownLatch latch) { - super(endpoint, value); - this.latch = latch; - } - - @Override - public void success(Ack ack, ByteBuf byteBuf) { - if (latch != null) { - latch.countDown(); - } - } - - @Override - public void failed(RpcException ex) { - logger.debug("Failure while sending fragment. Stopping query.", ex); - UserBitShared.DrillPBError error = ErrorHelper.logAndConvertError(endpoint, "Failure while sending fragment.", ex, logger); - failWithError(error); - } - + public RootStatusReporter getRootStatusHandler(FragmentContext context, PlanFragment fragment){ + return new RootStatusReporter(context, fragment); } - private class RootStatusHandler extends AbstractStatusReporter{ + class RootStatusReporter extends AbstractStatusReporter{ - private RootStatusHandler(FragmentContext context, PlanFragment fragment){ + private RootStatusReporter(FragmentContext context, PlanFragment fragment){ super(context); } @Override protected void statusChange(FragmentHandle handle, FragmentStatus status) { - QueryManager.this.statusUpdate(status); + statusUpdate(status); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc58c693/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java index 8da0ffb..4e18da6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java @@ -89,8 +89,8 @@ public class QueryStatus { this.planText = planText; } - public void setStartTime(long startTime) { - this.startTime = startTime; + public void markStart() { + this.startTime = System.currentTimeMillis(); } public void setEndTime(long endTime) { @@ -125,8 +125,7 @@ public class QueryStatus { fragmentDataMap.get(majorFragmentId).get(minorFragmentId).setStatus(fragmentStatus); } - public synchronized void updateQueryStateInStore() { - QueryState queryState = foreman.getQueryState(); + synchronized QueryState updateQueryStateInStore(QueryState queryState) { switch (queryState) { case PENDING: case RUNNING: @@ -140,12 +139,13 @@ public class QueryStatus { }catch(Exception e){ logger.warn("Failure while trying to delete the estore profile for this query.", e); } - //profileEStore.put(queryId, getAsProfile(false)); // Change the state in EStore to complete/cancel/fail. - // profileEStore.delete(queryId); // delete the ephemeral query profile. + profilePStore.put(queryId, getAsProfile()); break; default: + throw new IllegalStateException(); } + return queryState; } @Override @@ -212,7 +212,7 @@ public class QueryStatus { public QueryInfo getAsInfo() { return QueryInfo.newBuilder() // .setQuery(query.getPlan()) - .setState(foreman.getQueryState()) + .setState(foreman.getState()) .setForeman(foreman.getContext().getCurrentEndpoint()) .setStart(startTime) .build(); @@ -243,7 +243,7 @@ public class QueryStatus { } } - b.setState(foreman.getQueryState()); + b.setState(foreman.getState()); b.setForeman(foreman.getContext().getCurrentEndpoint()); b.setStart(startTime); b.setEnd(endTime); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc58c693/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RootStatusReporter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RootStatusReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RootStatusReporter.java deleted file mode 100644 index 8c98296..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RootStatusReporter.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.work.foreman; - -import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.proto.BitControl.FragmentStatus; -import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; -import org.apache.drill.exec.work.fragment.AbstractStatusReporter; - -public class RootStatusReporter extends AbstractStatusReporter{ - - QueryManager runningFragmentManager; - - private RootStatusReporter(FragmentContext context){ - super(context); - } - - @Override - protected void statusChange(FragmentHandle handle, FragmentStatus status) { - runningFragmentManager.statusUpdate(status); - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc58c693/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java index 9f08e97..f76dfcd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java @@ -200,11 +200,10 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid @Override public void drillbitUnregistered(Set<CoordinationProtos.DrillbitEndpoint> unregisteredDrillbits) { - if (unregisteredDrillbits.contains(FragmentExecutor.this.context.getForemanDrillbitEndPoint())) { - logger.warn("Forman : {} seems not responding or not work properly. Cancel this fragment {}:{}", - FragmentExecutor.this.context.getForemanDrillbitEndPoint(), - FragmentExecutor.this.context.getHandle().getMajorFragmentId(), - FragmentExecutor.this.context.getHandle().getMinorFragmentId()); + if (unregisteredDrillbits.contains(FragmentExecutor.this.context.getForemanEndpoint())) { + logger.warn("Forman : {} no longer active. Cancelling fragment {}.", + FragmentExecutor.this.context.getForemanEndpoint().getAddress(), + QueryIdHelper.getQueryIdentifier(context.getHandle())); FragmentExecutor.this.cancel(); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc58c693/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java index 9798701..312f96a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java @@ -32,6 +32,7 @@ import org.apache.drill.exec.rpc.RemoteConnection; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.work.WorkManager.WorkerBee; import org.apache.drill.exec.work.batch.IncomingBuffers; +import org.apache.drill.exec.work.foreman.ForemanException; /** * This managers determines when to run a non-root fragment node. @@ -47,7 +48,7 @@ public class NonRootFragmentManager implements FragmentManager { private final FragmentContext context; private List<RemoteConnection> connections = new CopyOnWriteArrayList<>(); - public NonRootFragmentManager(PlanFragment fragment, WorkerBee bee) throws FragmentSetupException{ + public NonRootFragmentManager(PlanFragment fragment, WorkerBee bee) throws ExecutionSetupException { try { this.fragment = fragment; DrillbitContext context = bee.getContext(); @@ -58,7 +59,7 @@ public class NonRootFragmentManager implements FragmentManager { this.context.setBuffers(buffers); this.runnerListener = new NonRootStatusReporter(this.context, context.getController().getTunnel(fragment.getForeman())); - } catch (ExecutionSetupException | IOException e) { + } catch (ForemanException | IOException e) { throw new FragmentSetupException("Failure while decoding fragment.", e); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc58c693/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java index 30402b7..854f474 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java @@ -17,23 +17,16 @@ */ package org.apache.drill.exec.work.user; -import java.util.Random; -import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; -import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; import org.apache.drill.exec.proto.UserBitShared.QueryId; -import org.apache.drill.exec.proto.UserBitShared.QueryResult; -import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState; -import org.apache.drill.exec.proto.UserProtos.RequestResults; import org.apache.drill.exec.proto.UserProtos.RunQuery; import org.apache.drill.exec.rpc.Acks; import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection; import org.apache.drill.exec.server.options.OptionManager; -import org.apache.drill.exec.store.SchemaFactory; import org.apache.drill.exec.work.WorkManager.WorkerBee; import org.apache.drill.exec.work.foreman.Foreman; -import org.apache.drill.exec.work.fragment.FragmentExecutor; public class UserWorker{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserWorker.class); @@ -46,7 +39,7 @@ public class UserWorker{ } public QueryId submitWork(UserClientConnection connection, RunQuery query) { - Random r = new Random(); + ThreadLocalRandom r = ThreadLocalRandom.current(); // create a new queryid where the first four bytes are a growing time (each new value comes earlier in sequence). Last 12 bytes are random. long time = (int) (System.currentTimeMillis()/1000); @@ -58,14 +51,6 @@ public class UserWorker{ return id; } - public QueryResult getResult(UserClientConnection connection, RequestResults req) { - Foreman foreman = bee.getForemanForQueryId(req.getQueryId()); - if (foreman == null) { - return QueryResult.newBuilder().setQueryState(QueryState.UNKNOWN_QUERY).build(); - } - return foreman.getResult(connection, req); - } - public Ack cancelQuery(QueryId query) { Foreman foreman = bee.getForemanForQueryId(query); if(foreman != null) { @@ -74,18 +59,6 @@ public class UserWorker{ return Acks.OK; } - public Ack cancelFragment(FragmentHandle handle) { - FragmentExecutor runner = bee.getFragmentRunner(handle); - if (runner != null) { - runner.cancel(); - } - return Acks.OK; - } - - public SchemaFactory getSchemaFactory() { - return bee.getContext().getSchemaFactory(); - } - public OptionManager getSystemOptions() { return bee.getContext().getOptionManager(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc58c693/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java index ce47578..9a32ff9 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java @@ -29,6 +29,7 @@ import org.apache.drill.exec.planner.PhysicalPlanReader; import org.apache.drill.exec.planner.fragment.Fragment; import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair; import org.apache.drill.exec.planner.fragment.MakeFragmentsVisitor; +import org.apache.drill.exec.work.foreman.ForemanSetupException; import org.junit.BeforeClass; import com.google.common.base.Charsets; @@ -53,7 +54,7 @@ public abstract class PopUnitTestBase extends ExecTest{ return i; } - public Fragment getRootFragment(PhysicalPlanReader reader, String file) throws FragmentSetupException, IOException { + public Fragment getRootFragment(PhysicalPlanReader reader, String file) throws FragmentSetupException, IOException, ForemanSetupException { MakeFragmentsVisitor f = new MakeFragmentsVisitor(); PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile(file), Charsets.UTF_8)); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fc58c693/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmenter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmenter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmenter.java index ec8bd94..6491df6 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmenter.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmenter.java @@ -27,6 +27,7 @@ import org.apache.drill.exec.exception.FragmentSetupException; import org.apache.drill.exec.planner.PhysicalPlanReader; import org.apache.drill.exec.planner.fragment.Fragment; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.work.foreman.ForemanSetupException; import org.junit.Test; public class TestFragmenter extends PopUnitTestBase { @@ -34,7 +35,7 @@ public class TestFragmenter extends PopUnitTestBase { @Test - public void ensureOneFragment() throws FragmentSetupException, IOException { + public void ensureOneFragment() throws FragmentSetupException, IOException, ForemanSetupException { PhysicalPlanReader ppr = new PhysicalPlanReader(CONFIG, CONFIG.getMapper(), DrillbitEndpoint.getDefaultInstance()); Fragment b = getRootFragment(ppr, "/physical_test1.json"); assertEquals(1, getFragmentCount(b)); @@ -43,7 +44,7 @@ public class TestFragmenter extends PopUnitTestBase { } @Test - public void ensureThreeFragments() throws FragmentSetupException, IOException { + public void ensureThreeFragments() throws FragmentSetupException, IOException, ForemanSetupException { PhysicalPlanReader ppr = new PhysicalPlanReader(CONFIG, CONFIG.getMapper(), DrillbitEndpoint.getDefaultInstance()); Fragment b = getRootFragment(ppr, "/physical_double_exchange.json"); logger.debug("Fragment Node {}", b);