DRILL-4286: Graceful shutdown of drillbit closes #921
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/5f044f2a Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/5f044f2a Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/5f044f2a Branch: refs/heads/master Commit: 5f044f2a6d0cd34a3d4107ece4c0637469f89b40 Parents: d3f8da2 Author: dvjyothsna <jyothsnadonapati@Skatkam-598.local> Authored: Thu Aug 24 08:58:00 2017 -0700 Committer: Arina Ielchiieva <arina.yelchiy...@gmail.com> Committed: Wed Nov 29 12:22:00 2017 +0200 ---------------------------------------------------------------------- distribution/src/resources/drillbit.sh | 25 +- .../org/apache/drill/exec/ExecConstants.java | 13 + .../apache/drill/exec/client/DrillClient.java | 11 +- .../drill/exec/coord/ClusterCoordinator.java | 24 + .../coord/local/LocalClusterCoordinator.java | 56 ++- .../exec/coord/zk/ZKClusterCoordinator.java | 83 +++- .../exec/coord/zk/ZKRegistrationHandle.java | 14 +- .../org/apache/drill/exec/ops/QueryContext.java | 4 + .../org/apache/drill/exec/server/Drillbit.java | 69 ++- .../drill/exec/server/DrillbitContext.java | 31 +- .../drill/exec/server/DrillbitStateManager.java | 80 ++++ .../drill/exec/server/rest/DrillRestServer.java | 4 +- .../drill/exec/server/rest/DrillRoot.java | 479 ++++++++++++------- .../drill/exec/server/rest/WebServer.java | 10 +- .../drill/exec/service/ServiceEngine.java | 2 + .../drill/exec/store/sys/DrillbitIterator.java | 18 +- .../org/apache/drill/exec/work/WorkManager.java | 43 +- .../apache/drill/exec/work/foreman/Foreman.java | 14 +- .../src/main/resources/drill-module.conf | 10 + .../java-exec/src/main/resources/rest/index.ftl | 110 ++++- .../work/metadata/TestMetadataProvider.java | 2 +- .../org/apache/drill/test/ClusterFixture.java | 19 + .../apache/drill/test/TestGracefulShutdown.java | 250 ++++++++++ .../drill/exec/proto/CoordinationProtos.java | 214 ++++++++- .../exec/proto/SchemaCoordinationProtos.java | 7 + 
.../exec/proto/beans/DrillbitEndpoint.java | 54 +++ protocol/src/main/protobuf/Coordination.proto | 7 + 27 files changed, 1416 insertions(+), 237 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/distribution/src/resources/drillbit.sh ---------------------------------------------------------------------- diff --git a/distribution/src/resources/drillbit.sh b/distribution/src/resources/drillbit.sh index de7f21a..49b92ed 100755 --- a/distribution/src/resources/drillbit.sh +++ b/distribution/src/resources/drillbit.sh @@ -92,15 +92,18 @@ waitForProcessEnd() { pidKilled=$1 commandName=$2 + kill_drillbit=$3 processedAt=`date +%s` origcnt=${DRILL_STOP_TIMEOUT:-120} while kill -0 $pidKilled > /dev/null 2>&1; do echo -n "." sleep 1; - # if process persists more than $DRILL_STOP_TIMEOUT (default 120 sec) no mercy - if [ $(( `date +%s` - $processedAt )) -gt $origcnt ]; then - break; + if [ "$kill_drillbit" = true ] ; then + # if process persists more than $DRILL_STOP_TIMEOUT (default 120 sec) no mercy + if [ $(( `date +%s` - $processedAt )) -gt $origcnt ]; then + break; + fi fi done echo @@ -155,6 +158,7 @@ start_bit ( ) stop_bit ( ) { + kill_drillbit=$1 if [ -f $pid ]; then pidToKill=`cat $pid` # kill -0 == see if the PID exists @@ -162,7 +166,7 @@ stop_bit ( ) echo "Stopping $command" echo "`date` Terminating $command pid $pidToKill" >> "$DRILLBIT_LOG_PATH" kill $pidToKill > /dev/null 2>&1 - waitForProcessEnd $pidToKill $command + waitForProcessEnd $pidToKill $command $kill_drillbit retval=0 else retval=$? @@ -199,7 +203,18 @@ case $startStopStatus in ;; (stop) - stop_bit + kill_drillbit=true + stop_bit $kill_drillbit + exit $? + ;; + +# Shutdown the drillbit gracefully without disrupting the in-flight queries. +# In this case, if there are any long running queries the drillbit will take a +# little longer to shutdown. 
In case the user wishes to shut down immediately, +# they can issue stop instead of graceful_stop. +(graceful_stop) + kill_drillbit=false + stop_bit $kill_drillbit exit $? ;; http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index 89b4b48..52aa52d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -602,4 +602,17 @@ public final class ExecConstants { public static String bootDefaultFor(String name) { return OPTION_DEFAULTS_ROOT + name; } + /** + * Boot-time config option provided to modify duration of the grace period. + * Grace period is the amount of time where the drillbit accepts work after + * the shutdown request is triggered. The primary use of grace period is to + * avoid the race conditions caused by zookeeper delay in updating the state + * information of the drillbit that is shutting down. So, it is advisable + * to have a grace period that is at least twice the amount of zookeeper + * refresh time.
+ */ + public static final String GRACE_PERIOD = "drill.exec.grace_period_ms"; + + public static final String DRILL_PORT_HUNT = "drill.exec.port_hunt"; + } http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java index 84b34a7..248058f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java @@ -329,7 +329,10 @@ public class DrillClient implements Closeable, ConnectionThrottle { throw new RpcException("Failure setting up ZK for client.", e); } } - endpoints.addAll(clusterCoordinator.getAvailableEndpoints()); + // Gets the drillbit endpoints that are ONLINE and excludes the drillbits that are + // in QUIESCENT state. This avoids the clients connecting to drillbits that are + // shutting down thereby reducing the chances of query failures. + endpoints.addAll(clusterCoordinator.getOnlineEndPoints()); // Make sure we have at least one endpoint in the list checkState(!endpoints.isEmpty(), "No active Drillbit endpoint found from ZooKeeper. Check connection parameters?"); } @@ -418,7 +421,10 @@ public class DrillClient implements Closeable, ConnectionThrottle { retry--; try { Thread.sleep(this.reconnectDelay); - final ArrayList<DrillbitEndpoint> endpoints = new ArrayList<>(clusterCoordinator.getAvailableEndpoints()); + // Gets the drillbit endpoints that are ONLINE and excludes the drillbits that are + // in QUIESCENT state. This avoids the clients connecting to drillbits that are + // shutting down thereby reducing the chances of query failures.
+ final ArrayList<DrillbitEndpoint> endpoints = new ArrayList<>(clusterCoordinator.getOnlineEndPoints()); if (endpoints.isEmpty()) { continue; } @@ -434,6 +440,7 @@ public class DrillClient implements Closeable, ConnectionThrottle { private void connect(DrillbitEndpoint endpoint) throws RpcException { client.connect(endpoint, properties, getUserCredentials()); + logger.info("Foreman drillbit is" + endpoint.getAddress()); } public BufferAllocator getAllocator() { http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java index e758d6f..32b1633 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java @@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.drill.exec.coord.store.TransientStore; import org.apache.drill.exec.coord.store.TransientStoreConfig; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State; import org.apache.drill.exec.work.foreman.DrillbitStatusListener; /** @@ -60,7 +61,26 @@ public abstract class ClusterCoordinator implements AutoCloseable { */ public abstract Collection<DrillbitEndpoint> getAvailableEndpoints(); + /** + * Get a collection of ONLINE drillbit endpoints by excluding the drillbits + * that are in QUIESCENT state (drillbits that are shutting down). 
Primarily used by the planner + * to plan queries only on ONLINE drillbits and used by the client during initial connection + * phase to connect to a drillbit (foreman) + * @return A collection of ONLINE endpoints + */ + + public abstract Collection<DrillbitEndpoint> getOnlineEndPoints(); + + public abstract RegistrationHandle update(RegistrationHandle handle, State state); + public interface RegistrationHandle { + /** + * Get the drillbit endpoint associated with the registration handle + * @return drillbit endpoint + */ + public abstract DrillbitEndpoint getEndPoint(); + + public abstract void setEndPoint(DrillbitEndpoint endpoint); } public abstract DistributedSemaphore getSemaphore(String name, int maximumLeases); @@ -108,4 +128,8 @@ public abstract class ClusterCoordinator implements AutoCloseable { listeners.remove(listener); } + public boolean isDrillbitInState(DrillbitEndpoint endpoint, DrillbitEndpoint.State state) { + return (!endpoint.hasState() || endpoint.getState().equals(state)); + } + } http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java index 8c13c42..86bc606 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.coord.local; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; import java.util.Map; import java.util.UUID; @@ -33,6 +34,7 @@ import org.apache.drill.exec.coord.store.TransientStore; import org.apache.drill.exec.coord.store.TransientStoreConfig; 
import org.apache.drill.exec.coord.store.TransientStoreFactory; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State; import com.google.common.collect.Maps; @@ -69,9 +71,10 @@ public class LocalClusterCoordinator extends ClusterCoordinator { } @Override - public RegistrationHandle register(final DrillbitEndpoint data) { + public RegistrationHandle register( DrillbitEndpoint data) { logger.debug("Endpoint registered {}.", data); - final Handle h = new Handle(); + final Handle h = new Handle(data); + data = data.toBuilder().setState(State.ONLINE).build(); endpoints.put(h, data); return h; } @@ -85,13 +88,62 @@ public class LocalClusterCoordinator extends ClusterCoordinator { endpoints.remove(handle); } + /** + * Update drillbit endpoint state. Drillbit advertises its + * state. State information is used during planning and initial + * client connection phases. + */ + @Override + public RegistrationHandle update(RegistrationHandle handle, State state) { + DrillbitEndpoint endpoint = handle.getEndPoint(); + endpoint = endpoint.toBuilder().setState(state).build(); + handle.setEndPoint(endpoint); + endpoints.put(handle,endpoint); + return handle; + } + @Override public Collection<DrillbitEndpoint> getAvailableEndpoints() { return endpoints.values(); } + /** + * Get a collection of ONLINE Drillbit endpoints by excluding the drillbits + * that are in QUIESCENT state (drillbits shutting down). 
Primarily used by the planner + * to plan queries only on ONLINE drillbits and used by the client during initial connection + * phase to connect to a drillbit (foreman) + * @return A collection of ONLINE endpoints + */ + @Override + public Collection<DrillbitEndpoint> getOnlineEndPoints() { + Collection<DrillbitEndpoint> runningEndPoints = new ArrayList<>(); + for (DrillbitEndpoint endpoint: endpoints.values()){ + if(isDrillbitInState(endpoint, State.ONLINE)) { + runningEndPoints.add(endpoint); + } + } + return runningEndPoints; + } + private class Handle implements RegistrationHandle { private final UUID id = UUID.randomUUID(); + private DrillbitEndpoint drillbitEndpoint; + + /** + * Get the drillbit endpoint associated with the registration handle + * @return drillbit endpoint + */ + public DrillbitEndpoint getEndPoint() { + return drillbitEndpoint; + } + + public void setEndPoint(DrillbitEndpoint endpoint) { + this.drillbitEndpoint = endpoint; + } + + private Handle(DrillbitEndpoint data) { + drillbitEndpoint = data; + } @Override public int hashCode() { http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java index b14a151..472bc3d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKClusterCoordinator.java @@ -23,15 +23,16 @@ import static com.google.common.collect.Collections2.transform; import java.io.IOException; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; -import java.util.List; +import java.util.ArrayList; import java.util.Set; +import java.util.HashSet; +import 
java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; -import com.google.common.collect.Lists; +import org.apache.commons.collections.keyvalue.MultiKey; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -54,6 +55,7 @@ import org.apache.drill.exec.coord.store.TransientStore; import org.apache.drill.exec.coord.store.TransientStoreConfig; import org.apache.drill.exec.coord.store.TransientStoreFactory; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State; import com.google.common.base.Function; @@ -70,7 +72,10 @@ public class ZKClusterCoordinator extends ClusterCoordinator { private final CountDownLatch initialConnection = new CountDownLatch(1); private final TransientStoreFactory factory; private ServiceCache<DrillbitEndpoint> serviceCache; + private DrillbitEndpoint endpoint; + // endpointsMap maps a MultiKey (comprising the endpoint address and port) to Drillbit endpoints + private ConcurrentHashMap<MultiKey, DrillbitEndpoint> endpointsMap = new ConcurrentHashMap<MultiKey,DrillbitEndpoint>(); private static final Pattern ZK_COMPLEX_STRING = Pattern.compile("(^.*?)/(.*)/([^/]*)$"); public ZKClusterCoordinator(DrillConfig config) throws IOException{ @@ -169,9 +174,10 @@ public class ZKClusterCoordinator extends ClusterCoordinator { @Override public RegistrationHandle register(DrillbitEndpoint data) { try { + data = data.toBuilder().setState(State.ONLINE).build(); ServiceInstance<DrillbitEndpoint> serviceInstance = newServiceInstance(data); discovery.registerService(serviceInstance); - return new ZKRegistrationHandle(serviceInstance.getId()); + return new ZKRegistrationHandle(serviceInstance.getId(),data); } catch (Exception e) { throw
propagate(e); } @@ -200,11 +206,50 @@ public class ZKClusterCoordinator extends ClusterCoordinator { } } + /** + * Update drillbit endpoint state. Drillbit advertises its + * state in Zookeeper when a shutdown request of drillbit is + * triggered. State information is used during planning and + * initial client connection phases. + */ + public RegistrationHandle update(RegistrationHandle handle, State state) { + ZKRegistrationHandle h = (ZKRegistrationHandle) handle; + try { + endpoint = h.endpoint.toBuilder().setState(state).build(); + ServiceInstance<DrillbitEndpoint> serviceInstance = ServiceInstance.<DrillbitEndpoint>builder() + .name(serviceName) + .id(h.id) + .payload(endpoint).build(); + discovery.updateService(serviceInstance); + } catch (Exception e) { + propagate(e); + } + return handle; + } + @Override public Collection<DrillbitEndpoint> getAvailableEndpoints() { return this.endpoints; } + /* + * Get a collection of ONLINE Drillbit endpoints by excluding the drillbits + * that are in QUIESCENT state (drillbits shutting down). 
Primarily used by the planner + * to plan queries only on ONLINE drillbits and used by the client during initial connection + * phase to connect to a drillbit (foreman) + * @return A collection of ONLINE endpoints + */ + @Override + public Collection<DrillbitEndpoint> getOnlineEndPoints() { + Collection<DrillbitEndpoint> runningEndPoints = new ArrayList<>(); + for (DrillbitEndpoint endpoint: endpoints){ + if(isDrillbitInState(endpoint, State.ONLINE)) { + runningEndPoints.add(endpoint); + } + } + logger.debug("Online endpoints in ZK are" + runningEndPoints.toString()); + return runningEndPoints; + } @Override public DistributedSemaphore getSemaphore(String name, int maximumLeases) { @@ -219,6 +264,7 @@ public class ZKClusterCoordinator extends ClusterCoordinator { private synchronized void updateEndpoints() { try { + // All active bits in the Zookeeper Collection<DrillbitEndpoint> newDrillbitSet = transform(discovery.queryForInstances(serviceName), new Function<ServiceInstance<DrillbitEndpoint>, DrillbitEndpoint>() { @@ -229,27 +275,42 @@ public class ZKClusterCoordinator extends ClusterCoordinator { }); // set of newly dead bits : original bits - new set of active bits. - Set<DrillbitEndpoint> unregisteredBits = new HashSet<>(endpoints); - unregisteredBits.removeAll(newDrillbitSet); - + Set<DrillbitEndpoint> unregisteredBits = new HashSet<>(); // Set of newly live bits : new set of active bits - original bits. - Set<DrillbitEndpoint> registeredBits = new HashSet<>(newDrillbitSet); - registeredBits.removeAll(endpoints); + Set<DrillbitEndpoint> registeredBits = new HashSet<>(); - endpoints = newDrillbitSet; + // Updates the endpoints map if there is a change in state of the endpoint or with the addition + // of new drillbit endpoints. Registered endpoints is set to newly live drillbit endpoints. + for ( DrillbitEndpoint endpoint : newDrillbitSet) { + String endpointAddress = endpoint.getAddress(); + int endpointPort = endpoint.getUserPort(); + if (! 
endpointsMap.containsKey(new MultiKey(endpointAddress, endpointPort))) { + registeredBits.add(endpoint); + } + endpointsMap.put(new MultiKey(endpointAddress, endpointPort),endpoint); + } + // Remove all the endpoints that are newly dead + for ( MultiKey key: endpointsMap.keySet()) { + if(!newDrillbitSet.contains(endpointsMap.get(key))) { + unregisteredBits.add(endpointsMap.get(key)); + endpointsMap.remove(key); + } + } + endpoints = endpointsMap.values(); if (logger.isDebugEnabled()) { StringBuilder builder = new StringBuilder(); builder.append("Active drillbit set changed. Now includes "); builder.append(newDrillbitSet.size()); builder.append(" total bits. New active drillbits:\n"); - builder.append("Address | User Port | Control Port | Data Port | Version |\n"); + builder.append("Address | User Port | Control Port | Data Port | Version | State\n"); for (DrillbitEndpoint bit: newDrillbitSet) { builder.append(bit.getAddress()).append(" | "); builder.append(bit.getUserPort()).append(" | "); builder.append(bit.getControlPort()).append(" | "); builder.append(bit.getDataPort()).append(" | "); builder.append(bit.getVersion()).append(" |"); + builder.append(bit.getState()).append(" | "); builder.append('\n'); } logger.debug(builder.toString()); http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKRegistrationHandle.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKRegistrationHandle.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKRegistrationHandle.java index f0c465f..fca3296 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKRegistrationHandle.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZKRegistrationHandle.java @@ -18,15 +18,27 @@ package org.apache.drill.exec.coord.zk; import 
org.apache.drill.exec.coord.ClusterCoordinator.RegistrationHandle; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; public class ZKRegistrationHandle implements RegistrationHandle { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZKRegistrationHandle.class); public final String id; + public DrillbitEndpoint endpoint; - public ZKRegistrationHandle(String id) { + public DrillbitEndpoint getEndPoint() { + return endpoint; + } + + @Override + public void setEndPoint(DrillbitEndpoint endpoint) { + this.endpoint = endpoint; + } + + public ZKRegistrationHandle(String id, DrillbitEndpoint endpoint) { super(); this.id = id; + this.endpoint = endpoint; } } http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java index 125dfac..eb32bc6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java @@ -213,6 +213,10 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext, Schem return drillbitContext.getBits(); } + public Collection<DrillbitEndpoint> getOnlineEndpoints() { + return drillbitContext.getBits(); + } + public DrillConfig getConfig() { return drillbitContext.getConfig(); } http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java index a333ff2..4144da0 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java @@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.drill.common.AutoCloseables; import org.apache.drill.common.StackTrace; +import org.apache.drill.common.concurrent.ExtendedLatch; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.map.CaseInsensitiveMap; import org.apache.drill.common.scanner.ClassPathScanner; @@ -32,6 +33,7 @@ import org.apache.drill.exec.coord.ClusterCoordinator.RegistrationHandle; import org.apache.drill.exec.coord.zk.ZKClusterCoordinator; import org.apache.drill.exec.exception.DrillbitStartupException; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.server.DrillbitStateManager.DrillbitState; import org.apache.drill.exec.server.options.OptionDefinition; import org.apache.drill.exec.server.options.OptionValue; import org.apache.drill.exec.server.options.OptionValue.OptionScope; @@ -48,6 +50,7 @@ import org.apache.drill.exec.util.GuavaPatcher; import org.apache.drill.exec.work.WorkManager; import org.apache.zookeeper.Environment; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; @@ -77,6 +80,23 @@ public class Drillbit implements AutoCloseable { private final WorkManager manager; private final BootStrapContext context; private final WebServer webServer; + private final int gracePeriod; + private DrillbitStateManager stateManager; + private boolean quiescentMode; + private boolean forcefulShutdown = false; + + public void setQuiescentMode(boolean quiescentMode) { + this.quiescentMode = quiescentMode; + } + + public void setForcefulShutdown(boolean forcefulShutdown) { + this.forcefulShutdown = forcefulShutdown; + } + + public RegistrationHandle 
getRegistrationHandle() { + return registrationHandle; + } + private RegistrationHandle registrationHandle; private volatile StoragePluginRegistry storageRegistry; private final PersistentStoreProvider profileStoreProvider; @@ -110,13 +130,15 @@ public class Drillbit implements AutoCloseable { final CaseInsensitiveMap<OptionDefinition> definitions, final RemoteServiceSet serviceSet, final ScanResult classpathScan) throws Exception { + gracePeriod = config.getInt(ExecConstants.GRACE_PERIOD); final Stopwatch w = Stopwatch.createStarted(); logger.debug("Construction started."); - final boolean allowPortHunting = serviceSet != null; + boolean drillPortHunt = config.getBoolean(ExecConstants.DRILL_PORT_HUNT); + final boolean allowPortHunting = (serviceSet != null) || drillPortHunt; context = new BootStrapContext(config, definitions, classpathScan); manager = new WorkManager(context); - webServer = new WebServer(context, manager); + webServer = new WebServer(context, manager, this); boolean isDistributedMode = false; if (serviceSet != null) { coord = serviceSet.getCoordinator(); @@ -137,6 +159,7 @@ public class Drillbit implements AutoCloseable { engine = new ServiceEngine(manager, context, allowPortHunting, isDistributedMode); + stateManager = new DrillbitStateManager(DrillbitState.STARTUP); logger.info("Construction completed ({} ms).", w.elapsed(TimeUnit.MILLISECONDS)); } @@ -152,6 +175,7 @@ public class Drillbit implements AutoCloseable { final Stopwatch w = Stopwatch.createStarted(); logger.debug("Startup begun."); coord.start(10000); + stateManager.setState(DrillbitState.ONLINE); storeProvider.start(); if (profileStoreProvider != storeProvider) { profileStoreProvider.start(); @@ -176,18 +200,43 @@ public class Drillbit implements AutoCloseable { logger.info("Startup completed ({} ms).", w.elapsed(TimeUnit.MILLISECONDS)); } + /* + Wait uninterruptibly + */ + public void waitForGracePeriod() { + ExtendedLatch exitLatch = new ExtendedLatch(); + 
exitLatch.awaitUninterruptibly(gracePeriod); + } + + /* + + */ + public void shutdown() { + this.close(); + } + /* + The drillbit is moved into Quiescent state and the drillbit waits for grace period amount of time. + Then drillbit moves into draining state and waits for all the queries and fragments to complete. + */ @Override public synchronized void close() { - // avoid complaints about double closing - if (isClosed) { + if ( !stateManager.getState().equals(DrillbitState.ONLINE)) { return; } final Stopwatch w = Stopwatch.createStarted(); logger.debug("Shutdown begun."); - - // wait for anything that is running to complete - manager.waitToExit(); - + registrationHandle = coord.update(registrationHandle, State.QUIESCENT); + stateManager.setState(DrillbitState.GRACE); + waitForGracePeriod(); + stateManager.setState(DrillbitState.DRAINING); + // wait for all the in-flight queries to finish + manager.waitToExit(this, forcefulShutdown); + //safe to exit + registrationHandle = coord.update(registrationHandle, State.OFFLINE); + stateManager.setState(DrillbitState.OFFLINE); + if(quiescentMode == true) { + return; + } if (coord != null && registrationHandle != null) { coord.unregister(registrationHandle); } @@ -219,8 +268,8 @@ public class Drillbit implements AutoCloseable { logger.warn("Failure on close()", e); } - logger.info("Shutdown completed ({} ms).", w.elapsed(TimeUnit.MILLISECONDS)); - isClosed = true; + logger.info("Shutdown completed ({} ms).", w.elapsed(TimeUnit.MILLISECONDS) ); + stateManager.setState(DrillbitState.SHUTDOWN); } private void javaPropertiesToSystemOptions() { http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java index 
b8a8b1e..f65592b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java @@ -157,10 +157,39 @@ public class DrillbitContext implements AutoCloseable { return context.getConfig(); } - public Collection<DrillbitEndpoint> getBits() { + public Collection<DrillbitEndpoint> getAvailableBits() { return coord.getAvailableEndpoints(); } + public Collection<DrillbitEndpoint> getBits() { + return coord.getOnlineEndPoints(); + } + + public boolean isOnline(DrillbitEndpoint endpoint) { + return endpoint.getState().equals(DrillbitEndpoint.State.ONLINE); + } + + public boolean isForeman(DrillbitEndpoint endpoint) { + DrillbitEndpoint foreman = getEndpoint(); + if(endpoint.getAddress().equals(foreman.getAddress()) && + endpoint.getUserPort() == foreman.getUserPort()) { + return true; + } + return false; + } + + public boolean isForemanOnline() { + Collection<DrillbitEndpoint> dbs = getAvailableBits(); + for (DrillbitEndpoint db : dbs) { + if( isForeman(db)) { + if (isOnline(db)) { + return true; + } + } + } + return false; + } + public BufferAllocator getAllocator() { return context.getAllocator(); } http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitStateManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitStateManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitStateManager.java new file mode 100644 index 0000000..dfffce8 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitStateManager.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.server; +/* + State manager to manage the state of drillbit. + */ +public class DrillbitStateManager { + + public enum DrillbitState { + STARTUP, ONLINE, GRACE, DRAINING, OFFLINE, SHUTDOWN + } + + private DrillbitState currentState; + + public DrillbitStateManager(DrillbitState currentState) { + this.currentState = currentState; + } + + public DrillbitState getState() { + return currentState; + } + + public void setState(DrillbitState newState) { + switch (newState) { + case ONLINE: + if (currentState == DrillbitState.STARTUP) { + currentState = newState; + } else { + throw new IllegalStateException("Cannot set drillbit to" + newState + "from" + currentState); + } + break; + case GRACE: + if (currentState == DrillbitState.ONLINE) { + currentState = newState; + } else { + throw new IllegalStateException("Cannot set drillbit to" + newState + "from" + currentState); + } + break; + case DRAINING: + if (currentState == DrillbitState.GRACE) { + currentState = newState; + } else { + throw new IllegalStateException("Cannot set drillbit to" + newState + "from" + currentState); + } + break; + case OFFLINE: + if (currentState == DrillbitState.DRAINING) { + currentState = newState; + } else { + throw new IllegalStateException("Cannot set drillbit to" + 
newState + "from" + currentState); + } + break; + case SHUTDOWN: + if (currentState == DrillbitState.OFFLINE) { + currentState = newState; + } else { + throw new IllegalStateException("Cannot set drillbit to" + newState + "from" + currentState); + } + break; + } + } + +} + http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java index 1545847..89141d7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java @@ -37,6 +37,7 @@ import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.rpc.user.UserSession; +import org.apache.drill.exec.server.Drillbit; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.server.rest.WebUserConnection.AnonWebUserConnection; import org.apache.drill.exec.server.rest.auth.AuthDynamicFeature; @@ -73,7 +74,7 @@ import java.util.List; public class DrillRestServer extends ResourceConfig { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillRestServer.class); - public DrillRestServer(final WorkManager workManager, final ServletContext servletContext) { + public DrillRestServer(final WorkManager workManager, final ServletContext servletContext, final Drillbit drillbit) { register(DrillRoot.class); register(StatusResources.class); register(StorageResources.class); @@ -120,6 +121,7 @@ public class DrillRestServer extends ResourceConfig { register(new AbstractBinder() { @Override protected void configure() { + 
bind(drillbit).to(Drillbit.class); bind(workManager).to(WorkManager.class); bind(executor).to(EventExecutor.class); bind(workManager.getContext().getLpPersistence().getMapper()).to(ObjectMapper.class); http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java index 55bfca4..da1d2fb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java @@ -18,12 +18,16 @@ package org.apache.drill.exec.server.rest; import java.util.Collection; +import java.util.HashMap; +import java.util.Map; import javax.annotation.security.PermitAll; import javax.inject.Inject; import javax.ws.rs.GET; +import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; import javax.ws.rs.core.SecurityContext; import javax.xml.bind.annotation.XmlRootElement; @@ -33,6 +37,7 @@ import com.google.common.collect.Sets; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.server.Drillbit; import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.server.rest.DrillRestServer.UserAuthEnabled; @@ -55,9 +60,18 @@ import com.fasterxml.jackson.annotation.JsonCreator; public class DrillRoot { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillRoot.class); - @Inject UserAuthEnabled authEnabled; - @Inject WorkManager work; - @Inject SecurityContext sc; 
+ @Inject + UserAuthEnabled authEnabled; + @Inject + WorkManager work; + @Inject + SecurityContext sc; + @Inject + Drillbit drillbit; + + public enum ShutdownMode { + forcefulShutdown, gracefulShutdown, quiescent + } @GET @Produces(MediaType.TEXT_HTML) @@ -65,6 +79,90 @@ public class DrillRoot { return ViewableWithPermissions.create(authEnabled.get(), "/rest/index.ftl", sc, getClusterInfoJSON()); } + + @SuppressWarnings("resource") + @GET + @Path("/state") + @Produces(MediaType.APPLICATION_JSON) + public Response getDrillbitStatus() { + Collection<DrillbitInfo> drillbits = getClusterInfoJSON().getDrillbits(); + Map<String, String> drillStatusMap = new HashMap<String, String>(); + for (DrillbitInfo drillbit : drillbits) { + drillStatusMap.put(drillbit.getAddress() + "-" + drillbit.getUserPort(), drillbit.getState()); + } + Response response = setResponse(drillStatusMap); + return response; + } + + @SuppressWarnings("resource") + @GET + @Path("/gracePeriod") + @Produces(MediaType.APPLICATION_JSON) + public Map<String, Integer> getGracePeriod() { + + final DrillConfig config = work.getContext().getConfig(); + final int gracePeriod = config.getInt(ExecConstants.GRACE_PERIOD); + Map<String, Integer> gracePeriodMap = new HashMap<String, Integer>(); + gracePeriodMap.put("graceperiod", gracePeriod); + return gracePeriodMap; + } + + @SuppressWarnings("resource") + @GET + @Path("/portNum") + @Produces(MediaType.APPLICATION_JSON) + public Map<String, Integer> getPortNum() { + + final DrillConfig config = work.getContext().getConfig(); + final int port = config.getInt(ExecConstants.HTTP_PORT); + Map<String, Integer> portMap = new HashMap<String, Integer>(); + portMap.put("port", port); + return portMap; + } + + @SuppressWarnings("resource") + @GET + @Path("/queriesCount") + @Produces(MediaType.APPLICATION_JSON) + public Response getRemainingQueries() { + Map<String, Integer> queriesInfo = new HashMap<String, Integer>(); + queriesInfo = work.getRemainingQueries(); + Response 
response = setResponse(queriesInfo); + return response; + } + + @SuppressWarnings("resource") + @POST + @Path("/gracefulShutdown") + @Produces(MediaType.APPLICATION_JSON) + public Response shutdownDrillbit() throws Exception { + String resp = "Graceful Shutdown request is triggered"; + return shutdown(resp); + + } + + @SuppressWarnings("resource") + @POST + @Path("/shutdown") + @Produces(MediaType.APPLICATION_JSON) + public Response ShutdownForcefully() throws Exception { + drillbit.setForcefulShutdown(true); + String resp = "Forceful shutdown request is triggered"; + return shutdown(resp); + + } + + @SuppressWarnings("resource") + @POST + @Path("/quiescent") + @Produces(MediaType.APPLICATION_JSON) + public Response drillbitToQuiescentMode() throws Exception { + drillbit.setQuiescentMode(true); + String resp = "Request to put drillbit in Quiescent mode is triggered"; + return shutdown(resp); + } + + @SuppressWarnings("resource") @GET @Path("/cluster.json") @@ -79,8 +177,8 @@ public class DrillRoot { final DrillConfig config = dbContext.getConfig(); final boolean userEncryptionEnabled = - config.getBoolean(ExecConstants.USER_ENCRYPTION_SASL_ENABLED) || - config .getBoolean(ExecConstants.USER_SSL_ENABLED); + config.getBoolean(ExecConstants.USER_ENCRYPTION_SASL_ENABLED) || + config .getBoolean(ExecConstants.USER_SSL_ENABLED); final boolean bitEncryptionEnabled = config.getBoolean(ExecConstants.BIT_ENCRYPTION_SASL_ENABLED); // If the user is logged in and is admin user then show the admin user info // For all other cases the user info need-not or should-not be displayed @@ -94,7 +192,7 @@ public class DrillRoot { final boolean shouldShowUserInfo = isUserLoggedIn && ((DrillUserPrincipal)sc.getUserPrincipal()).isAdminUser(); - for (DrillbitEndpoint endpoint : work.getContext().getBits()) { + for (DrillbitEndpoint endpoint : work.getContext().getAvailableBits()) { final DrillbitInfo drillbit = new DrillbitInfo(endpoint, currentDrillbit.equals(endpoint), 
currentVersion.equals(endpoint.getVersion())); @@ -107,219 +205,254 @@ public class DrillRoot { " userLoggedIn " + isUserLoggedIn + " shouldShowUserInfo: " + shouldShowUserInfo ); return new ClusterInfo(drillbits, currentVersion, mismatchedVersions, - userEncryptionEnabled, bitEncryptionEnabled, processUser, processUserGroups, adminUsers, - adminUserGroups, shouldShowUserInfo, QueueInfo.build(dbContext.getResourceManager())); + userEncryptionEnabled, bitEncryptionEnabled, processUser, processUserGroups, adminUsers, + adminUserGroups, shouldShowUserInfo, QueueInfo.build(dbContext.getResourceManager())); } - /** - * Pretty-printing wrapper class around the ZK-based queue summary. - */ + public Response setResponse(Map entity) { + return Response.ok() + .entity(entity) + .header("Access-Control-Allow-Origin", "*") + .header("Access-Control-Allow-Methods", "GET, POST, DELETE, PUT") + .header("Access-Control-Allow-Credentials","true") + .allow("OPTIONS").build(); + } - @XmlRootElement - public static class QueueInfo { - private final ZKQueueInfo zkQueueInfo; + public Response shutdown(String resp) throws Exception { + Map<String, String> shutdownInfo = new HashMap<String, String>(); + new Thread(new Runnable() { + public void run() { + try { + drillbit.close(); + } catch (Exception e) { + logger.error("Request to shutdown drillbit failed", e); + } + } + }).start(); + shutdownInfo.put("response",resp); + Response response = setResponse(shutdownInfo); + return response; + } - public static QueueInfo build(ResourceManager rm) { - // Consider queues enabled only if the ZK-based queues are in use. +/** + * Pretty-printing wrapper class around the ZK-based queue summary. 
+ */ - ThrottledResourceManager throttledRM = null; - if (rm != null && rm instanceof DynamicResourceManager) { - DynamicResourceManager dynamicRM = (DynamicResourceManager) rm; - rm = dynamicRM.activeRM(); - } - if (rm != null && rm instanceof ThrottledResourceManager) { - throttledRM = (ThrottledResourceManager) rm; - } - if (throttledRM == null) { - return new QueueInfo(null); - } - QueryQueue queue = throttledRM.queue(); - if (queue == null || !(queue instanceof DistributedQueryQueue)) { - return new QueueInfo(null); - } +@XmlRootElement +public static class QueueInfo { + private final ZKQueueInfo zkQueueInfo; - return new QueueInfo(((DistributedQueryQueue) queue).getInfo()); - } + public static QueueInfo build(ResourceManager rm) { + + // Consider queues enabled only if the ZK-based queues are in use. - @JsonCreator - public QueueInfo(ZKQueueInfo queueInfo) { - zkQueueInfo = queueInfo; + ThrottledResourceManager throttledRM = null; + if (rm != null && rm instanceof DynamicResourceManager) { + DynamicResourceManager dynamicRM = (DynamicResourceManager) rm; + rm = dynamicRM.activeRM(); + } + if (rm != null && rm instanceof ThrottledResourceManager) { + throttledRM = (ThrottledResourceManager) rm; + } + if (throttledRM == null) { + return new QueueInfo(null); + } + QueryQueue queue = throttledRM.queue(); + if (queue == null || !(queue instanceof DistributedQueryQueue)) { + return new QueueInfo(null); } - public boolean isEnabled() { return zkQueueInfo != null; } + return new QueueInfo(((DistributedQueryQueue) queue).getInfo()); + } - public int smallQueueSize() { - return isEnabled() ? zkQueueInfo.smallQueueSize : 0; - } + @JsonCreator + public QueueInfo(ZKQueueInfo queueInfo) { + zkQueueInfo = queueInfo; + } - public int largeQueueSize() { - return isEnabled() ? zkQueueInfo.largeQueueSize : 0; - } + public boolean isEnabled() { return zkQueueInfo != null; } - public String threshold() { - return isEnabled() - ? 
Double.toString(zkQueueInfo.queueThreshold) - : "N/A"; - } + public int smallQueueSize() { + return isEnabled() ? zkQueueInfo.smallQueueSize : 0; + } - public String smallQueueMemory() { - return isEnabled() - ? toBytes(zkQueueInfo.memoryPerSmallQuery) - : "N/A"; - } + public int largeQueueSize() { + return isEnabled() ? zkQueueInfo.largeQueueSize : 0; + } - public String largeQueueMemory() { - return isEnabled() - ? toBytes(zkQueueInfo.memoryPerLargeQuery) - : "N/A"; - } + public String threshold() { + return isEnabled() + ? Double.toString(zkQueueInfo.queueThreshold) + : "N/A"; + } - public String totalMemory() { - return isEnabled() - ? toBytes(zkQueueInfo.memoryPerNode) - : "N/A"; - } + public String smallQueueMemory() { + return isEnabled() + ? toBytes(zkQueueInfo.memoryPerSmallQuery) + : "N/A"; + } - private final long ONE_MB = 1024 * 1024; + public String largeQueueMemory() { + return isEnabled() + ? toBytes(zkQueueInfo.memoryPerLargeQuery) + : "N/A"; + } - private String toBytes(long memory) { - if (memory < 10 * ONE_MB) { - return String.format("%,d bytes", memory); - } else { - return String.format("%,.0f MB", memory * 1.0D / ONE_MB); - } - } + public String totalMemory() { + return isEnabled() + ? 
toBytes(zkQueueInfo.memoryPerNode) + : "N/A"; } - @XmlRootElement - public static class ClusterInfo { - private final Collection<DrillbitInfo> drillbits; - private final String currentVersion; - private final Collection<String> mismatchedVersions; - private final boolean userEncryptionEnabled; - private final boolean bitEncryptionEnabled; - private final String adminUsers; - private final String adminUserGroups; - private final String processUser; - private final String processUserGroups; - private final boolean shouldShowUserInfo; - private final QueueInfo queueInfo; - - @JsonCreator - public ClusterInfo(Collection<DrillbitInfo> drillbits, - String currentVersion, - Collection<String> mismatchedVersions, - boolean userEncryption, - boolean bitEncryption, - String processUser, - String processUserGroups, - String adminUsers, - String adminUserGroups, - boolean shouldShowUserInfo, - QueueInfo queueInfo) { - this.drillbits = Sets.newTreeSet(drillbits); - this.currentVersion = currentVersion; - this.mismatchedVersions = Sets.newTreeSet(mismatchedVersions); - this.userEncryptionEnabled = userEncryption; - this.bitEncryptionEnabled = bitEncryption; - this.processUser = processUser; - this.processUserGroups = processUserGroups; - this.adminUsers = adminUsers; - this.adminUserGroups = adminUserGroups; - this.shouldShowUserInfo = shouldShowUserInfo; - this.queueInfo = queueInfo; - } + private final long ONE_MB = 1024 * 1024; - public Collection<DrillbitInfo> getDrillbits() { - return Sets.newTreeSet(drillbits); + private String toBytes(long memory) { + if (memory < 10 * ONE_MB) { + return String.format("%,d bytes", memory); + } else { + return String.format("%,.0f MB", memory * 1.0D / ONE_MB); } + } +} - public String getCurrentVersion() { - return currentVersion; - } +@XmlRootElement +public static class ClusterInfo { + private final Collection<DrillbitInfo> drillbits; + private final String currentVersion; + private final Collection<String> mismatchedVersions; + private 
final boolean userEncryptionEnabled; + private final boolean bitEncryptionEnabled; + private final String adminUsers; + private final String adminUserGroups; + private final String processUser; + private final String processUserGroups; + private final boolean shouldShowUserInfo; + private final QueueInfo queueInfo; + + @JsonCreator + public ClusterInfo(Collection<DrillbitInfo> drillbits, + String currentVersion, + Collection<String> mismatchedVersions, + boolean userEncryption, + boolean bitEncryption, + String processUser, + String processUserGroups, + String adminUsers, + String adminUserGroups, + boolean shouldShowUserInfo, + QueueInfo queueInfo) { + this.drillbits = Sets.newTreeSet(drillbits); + this.currentVersion = currentVersion; + this.mismatchedVersions = Sets.newTreeSet(mismatchedVersions); + this.userEncryptionEnabled = userEncryption; + this.bitEncryptionEnabled = bitEncryption; + this.processUser = processUser; + this.processUserGroups = processUserGroups; + this.adminUsers = adminUsers; + this.adminUserGroups = adminUserGroups; + this.shouldShowUserInfo = shouldShowUserInfo; + this.queueInfo = queueInfo; + } - public Collection<String> getMismatchedVersions() { - return Sets.newTreeSet(mismatchedVersions); - } + public Collection<DrillbitInfo> getDrillbits() { + return Sets.newTreeSet(drillbits); + } + + public String getCurrentVersion() { + return currentVersion; + } + + public Collection<String> getMismatchedVersions() { + return Sets.newTreeSet(mismatchedVersions); + } + + public boolean isUserEncryptionEnabled() { return userEncryptionEnabled; } - public boolean isUserEncryptionEnabled() { return userEncryptionEnabled; } + public boolean isBitEncryptionEnabled() { return bitEncryptionEnabled; } - public boolean isBitEncryptionEnabled() { return bitEncryptionEnabled; } + public String getProcessUser() { return processUser; } - public String getProcessUser() { return processUser; } + public String getProcessUserGroups() { return processUserGroups; } 
- public String getProcessUserGroups() { return processUserGroups; } + public String getAdminUsers() { return adminUsers; } - public String getAdminUsers() { return adminUsers; } + public String getAdminUserGroups() { return adminUserGroups; } - public String getAdminUserGroups() { return adminUserGroups; } + public boolean shouldShowUserInfo() { return shouldShowUserInfo; } - public boolean shouldShowUserInfo() { return shouldShowUserInfo; } + public QueueInfo queueInfo() { return queueInfo; } +} - public QueueInfo queueInfo() { return queueInfo; } +public static class DrillbitInfo implements Comparable<DrillbitInfo> { + private final String address; + private final String userPort; + private final String controlPort; + private final String dataPort; + private final String version; + private final boolean current; + private final boolean versionMatch; + private final String state; + + @JsonCreator + public DrillbitInfo(DrillbitEndpoint drillbit, boolean current, boolean versionMatch) { + this.address = drillbit.getAddress(); + this.userPort = String.valueOf(drillbit.getUserPort()); + this.controlPort = String.valueOf(drillbit.getControlPort()); + this.dataPort = String.valueOf(drillbit.getDataPort()); + this.version = Strings.isNullOrEmpty(drillbit.getVersion()) ? 
"Undefined" : drillbit.getVersion(); + this.current = current; + this.versionMatch = versionMatch; + this.state = String.valueOf(drillbit.getState()); } - public static class DrillbitInfo implements Comparable<DrillbitInfo> { - private final String address; - private final String userPort; - private final String controlPort; - private final String dataPort; - private final String version; - private final boolean current; - private final boolean versionMatch; - - @JsonCreator - public DrillbitInfo(DrillbitEndpoint drillbit, boolean current, boolean versionMatch) { - this.address = drillbit.getAddress(); - this.userPort = String.valueOf(drillbit.getUserPort()); - this.controlPort = String.valueOf(drillbit.getControlPort()); - this.dataPort = String.valueOf(drillbit.getDataPort()); - this.version = Strings.isNullOrEmpty(drillbit.getVersion()) ? "Undefined" : drillbit.getVersion(); - this.current = current; - this.versionMatch = versionMatch; - } + public String getAddress() { return address; } - public String getAddress() { return address; } + public String getUserPort() { return userPort; } - public String getUserPort() { return userPort; } + public String getControlPort() { return controlPort; } - public String getControlPort() { return controlPort; } + public String getDataPort() { return dataPort; } - public String getDataPort() { return dataPort; } + public String getVersion() { return version; } - public String getVersion() { return version; } + public boolean isCurrent() { return current; } - public boolean isCurrent() { return current; } + public boolean isVersionMatch() { return versionMatch; } - public boolean isVersionMatch() { return versionMatch; } + public String getState() { return state; } - /** - * Method used to sort Drillbits. Current Drillbit goes first. - * Then Drillbits with matching versions, after them Drillbits with mismatching versions. 
- * Matching Drillbits are sorted according address natural order, - * mismatching Drillbits are sorted according version, address natural order. - * - * @param drillbitToCompare Drillbit to compare against - * @return -1 if Drillbit should be before, 1 if after in list - */ - @Override - public int compareTo(DrillbitInfo drillbitToCompare) { - if (this.isCurrent()) { - return -1; - } + /** + * Method used to sort Drillbits. Current Drillbit goes first. + * Then Drillbits with matching versions, after them Drillbits with mismatching versions. + * Matching Drillbits are sorted according address natural order, + * mismatching Drillbits are sorted according version, address natural order. + * + * @param drillbitToCompare Drillbit to compare against + * @return -1 if Drillbit should be before, 1 if after in list + */ + @Override + public int compareTo(DrillbitInfo drillbitToCompare) { + if (this.isCurrent()) { + return -1; + } - if (drillbitToCompare.isCurrent()) { - return 1; - } + if (drillbitToCompare.isCurrent()) { + return 1; + } - if (this.isVersionMatch() == drillbitToCompare.isVersionMatch()) { - if (this.version.equals(drillbitToCompare.getVersion())) { - return this.address.compareTo(drillbitToCompare.getAddress()); + if (this.isVersionMatch() == drillbitToCompare.isVersionMatch()) { + if (this.version.equals(drillbitToCompare.getVersion())) { + { + if (this.address.equals(drillbitToCompare.getAddress())) { + return (this.controlPort.compareTo(drillbitToCompare.getControlPort())); + } + return (this.address.compareTo(drillbitToCompare.getAddress())); } - return this.version.compareTo(drillbitToCompare.getVersion()); } - return this.versionMatch ? -1 : 1; + return this.version.compareTo(drillbitToCompare.getVersion()); } + return this.versionMatch ? 
-1 : 1; } } +} http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java index 1ad2a09..f0e822f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java @@ -23,13 +23,14 @@ import com.codahale.metrics.servlets.ThreadDumpServlet; import com.google.common.collect.ImmutableSet; import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.drill.common.exceptions.DrillException; import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.exceptions.DrillException; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.ssl.SSLConfig; import org.apache.drill.exec.exception.DrillbitStartupException; import org.apache.drill.exec.rpc.security.plain.PlainFactory; import org.apache.drill.exec.server.BootStrapContext; +import org.apache.drill.exec.server.Drillbit; import org.apache.drill.exec.server.rest.auth.DrillRestLoginService; import org.apache.drill.exec.ssl.SSLConfigBuilder; import org.apache.drill.exec.work.WorkManager; @@ -106,6 +107,8 @@ public class WebServer implements AutoCloseable { private Server embeddedJetty; + private final Drillbit drillbit; + private int port; /** @@ -114,11 +117,12 @@ public class WebServer implements AutoCloseable { * @param context Bootstrap context. * @param workManager WorkManager instance. 
*/ - public WebServer(final BootStrapContext context, final WorkManager workManager) { + public WebServer(final BootStrapContext context, final WorkManager workManager, final Drillbit drillbit) { this.context = context; this.config = context.getConfig(); this.metrics = context.getMetrics(); this.workManager = workManager; + this.drillbit = drillbit; } private static final String BASE_STATIC_PATH = "/rest/static/"; @@ -193,7 +197,7 @@ public class WebServer implements AutoCloseable { servletContextHandler.setErrorHandler(errorHandler); servletContextHandler.setContextPath("/"); - final ServletHolder servletHolder = new ServletHolder(new ServletContainer(new DrillRestServer(workManager, servletContextHandler.getServletContext()))); + final ServletHolder servletHolder = new ServletHolder(new ServletContainer(new DrillRestServer(workManager, servletContextHandler.getServletContext(), drillbit))); servletHolder.setInitOrder(1); servletContextHandler.addServlet(servletHolder, "/*"); http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java index 29ae0f6..3efa054 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java @@ -35,6 +35,7 @@ import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.exception.DrillbitStartupException; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State; import org.apache.drill.exec.rpc.TransportCheck; import 
org.apache.drill.exec.rpc.control.Controller; import org.apache.drill.exec.rpc.control.ControllerImpl; @@ -102,6 +103,7 @@ public class ServiceEngine implements AutoCloseable { .setAddress(hostName) .setUserPort(userPort) .setVersion(DrillVersionInfo.getVersion()) + .setState(State.STARTUP) .build(); partialEndpoint = controller.start(partialEndpoint, allowPortHunting); http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java index 836d339..dc4e7c7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/DrillbitIterator.java @@ -29,7 +29,7 @@ public class DrillbitIterator implements Iterator<Object> { private DrillbitEndpoint current; public DrillbitIterator(FragmentContext c) { - this.endpoints = c.getDrillbitContext().getBits().iterator(); + this.endpoints = c.getDrillbitContext().getAvailableBits().iterator(); this.current = c.getIdentity(); } @@ -40,6 +40,7 @@ public class DrillbitIterator implements Iterator<Object> { public int data_port; public boolean current; public String version; + public String state; } @Override @@ -51,15 +52,28 @@ public class DrillbitIterator implements Iterator<Object> { public Object next() { DrillbitEndpoint ep = endpoints.next(); DrillbitInstance i = new DrillbitInstance(); - i.current = ep.equals(current); + i.current = isCurrent(ep); i.hostname = ep.getAddress(); i.user_port = ep.getUserPort(); i.control_port = ep.getControlPort(); i.data_port = ep.getDataPort(); i.version = ep.getVersion(); + i.state = ep.getState().toString(); return i; } + public boolean isCurrent(DrillbitEndpoint ep) { + 
+ String epAddress = ep.getAddress(); + int epPort = ep.getUserPort(); + String currentEpAddress = current.getAddress(); + int currentEpPort = current.getUserPort(); + if (currentEpAddress.equals(epAddress) && currentEpPort == epPort) { + return true; + } + return false; + } + @Override public void remove() { throw new UnsupportedOperationException(); http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java index 6e560a9..5d369de 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java @@ -37,6 +37,7 @@ import org.apache.drill.exec.rpc.control.Controller; import org.apache.drill.exec.rpc.control.WorkEventBus; import org.apache.drill.exec.rpc.data.DataConnectionCreator; import org.apache.drill.exec.server.BootStrapContext; +import org.apache.drill.exec.server.Drillbit; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.store.sys.PersistentStoreProvider; import org.apache.drill.exec.work.batch.ControlMessageHandler; @@ -46,6 +47,7 @@ import org.apache.drill.exec.work.fragment.FragmentExecutor; import org.apache.drill.exec.work.fragment.FragmentManager; import org.apache.drill.exec.work.user.UserWorker; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -77,6 +79,8 @@ public class WorkManager implements AutoCloseable { private final WorkEventBus workBus; private final Executor executor; private final StatusThread statusThread; + private long numOfRunningQueries; + private long numOfRunningFragments; /** * How often the StatusThread collects statistics about 
running fragments. @@ -165,32 +169,57 @@ public class WorkManager implements AutoCloseable { * * <p>This is intended to be used by {@link org.apache.drill.exec.server.Drillbit#close()}.</p> */ - public void waitToExit() { + public void waitToExit(Drillbit bit, boolean forcefulShutdown) { synchronized(this) { - if (queries.isEmpty() && runningFragments.isEmpty()) { + numOfRunningQueries = queries.size(); + numOfRunningFragments = runningFragments.size(); + if ( queries.isEmpty() && runningFragments.isEmpty()) { return; } - + logger.info("Draining " + queries +" queries and "+ runningFragments+" fragments."); exitLatch = new ExtendedLatch(); } - - // Wait for at most 5 seconds or until the latch is released. - exitLatch.awaitUninterruptibly(5000); + // Wait uninterruptibly until all the queries and running fragments on that drillbit goes down + // to zero + if( forcefulShutdown ) { + exitLatch.awaitUninterruptibly(5000); + } else { + exitLatch.awaitUninterruptibly(); + } } /** * If it is safe to exit, and the exitLatch is in use, signals it so that waitToExit() will - * unblock. + * unblock. Logs the number of pending fragments and queries that are running on that + * drillbit to track the progress of shutdown process. */ private void indicateIfSafeToExit() { synchronized(this) { if (exitLatch != null) { + logger.info("Waiting for "+ queries.size() +" queries to complete before shutting down"); + logger.info("Waiting for "+ runningFragments.size() +" running fragments to complete before shutting down"); + if(runningFragments.size() > numOfRunningFragments|| queries.size() > numOfRunningQueries) { + logger.info("New Fragments or queries are added while drillbit is Shutting down"); + } if (queries.isEmpty() && runningFragments.isEmpty()) { + // Both Queries and Running fragments are empty. + // So its safe for the drillbit to exit. exitLatch.countDown(); } } } } + /** + * Get the number of queries that are running on a drillbit. 
+ * Primarily used to monitor the number of running queries after a + * shutdown request is triggered. + */ + public synchronized Map<String, Integer> getRemainingQueries() { + Map<String, Integer> queriesInfo = new HashMap<String, Integer>(); + queriesInfo.put("queriesCount", queries.size()); + queriesInfo.put("fragmentsCount", runningFragments.size()); + return queriesInfo; + } /** * Narrowed interface to WorkManager that is made available to tasks it is managing. http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index a1d150e..8ce8639 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -253,7 +253,17 @@ public class Foreman implements Runnable { final Thread currentThread = Thread.currentThread(); final String originalName = currentThread.getName(); currentThread.setName(queryIdString + ":foreman"); - + try { + /* + Check if the foreman is ONLINE. If not, don't accept any new queries. 
+ */ + if (!drillbitContext.isForemanOnline()) { + throw new ForemanException("Query submission failed since Foreman is shutting down."); + } + } catch (ForemanException e) { + logger.debug("Failure while submitting query", e); + addToEventQueue(QueryState.FAILED, e); + } // track how long the query takes queryManager.markStartTime(); enqueuedQueries.dec(); @@ -559,7 +569,7 @@ public class Foreman implements Runnable { final SimpleParallelizer parallelizer = new SimpleParallelizer(queryContext); return parallelizer.getFragments( queryContext.getOptions().getOptionList(), queryContext.getCurrentEndpoint(), - queryId, queryContext.getActiveEndpoints(), rootFragment, + queryId, queryContext.getOnlineEndpoints(), rootFragment, initiatingClient.getSession(), queryContext.getQueryContextInfo()); } http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/resources/drill-module.conf ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index f5e85a3..c923e4f 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -369,6 +369,16 @@ drill.exec: { // planning and managing queries. Primarily for testing. cpus_per_node: 0, } + # Grace period is the amount of time where the drillbit accepts work after + # the shutdown request is triggered. The primary use of grace period is to + # avoid the race conditions caused by zookeeper delay in updating the state + # information of the drillbit that is shutting down. So, it is advisable + # to have a grace period that is at least twice the amount of zookeeper + # refresh time. + grace_period_ms : 0, + //port hunting for drillbits. Enabled only for testing purposes. 
+ port_hunt : false + } drill.jdbc: { http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/main/resources/rest/index.ftl ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/resources/rest/index.ftl b/exec/java-exec/src/main/resources/rest/index.ftl index 45dc1c9..74425d6 100644 --- a/exec/java-exec/src/main/resources/rest/index.ftl +++ b/exec/java-exec/src/main/resources/rest/index.ftl @@ -46,7 +46,7 @@ <div class="row"> <div class="col-md-12"> - <h3>Drillbits <span class="label label-primary">${model.getDrillbits()?size}</span></h3> + <h3>Drillbits <span class="label label-primary" id="size" >${model.getDrillbits()?size}</span></h3> <div class="table-responsive"> <table class="table table-hover"> <thead> @@ -57,19 +57,19 @@ <th>Control Port</th> <th>Data Port</th> <th>Version</th> + <th>Status</th> </tr> </thead> <tbody> <#assign i = 1> <#list model.getDrillbits() as drillbit> - <tr> + <tr id="row-${i}"> <td>${i}</td> - <td>${drillbit.getAddress()} - <#if drillbit.isCurrent()> + <td id="address" >${drillbit.getAddress()}<#if drillbit.isCurrent()> <span class="label label-info">Current</span> </#if> </td> - <td>${drillbit.getUserPort()}</td> + <td id="port" >${drillbit.getUserPort()}</td> <td>${drillbit.getControlPort()}</td> <td>${drillbit.getDataPort()}</td> <td> @@ -78,6 +78,11 @@ ${drillbit.getVersion()} </span> </td> + <td id="status" >${drillbit.getState()}</td> + <td> + <button type="button" id="shutdown" onClick="shutdown('${drillbit.getAddress()}',$(this));"> SHUTDOWN </button> + </td> + <td id="queriesCount"> </td> </tr> <#assign i = i + 1> </#list> @@ -179,6 +184,101 @@ </div> </div> </div> + <script charset="utf-8"> + var refreshTime = 2000; + var refresh = getRefreshTime(); + var portNum = 0; + var port = getPortNum(); + console.log(portNum); + var timeout; + var size = $("#size").html(); + + + function getPortNum() { + var port = $.ajax({ + type: 'GET', + url: '/portNum', 
+ dataType: "json", + complete: function(data) { + portNum = data.responseJSON["port"]; + } + }); + } + + function getRefreshTime() { + var refresh = $.ajax({ + type: 'GET', + url: '/gracePeriod', + dataType: "json", + complete: function(data) { + refreshTime = data.responseJSON["graceperiod"]; + refreshTime = refreshTime/3; + timeout = setTimeout(reloadStatus,refreshTime ); + } + }); + } + function reloadStatus () { + console.log(refreshTime); + var result = $.ajax({ + type: 'GET', + url: '/state', + dataType: "json", + complete: function(data) { + fillStatus(data,size); + } + }); + timeout = setTimeout(reloadStatus, refreshTime); + } + + function fillStatus(data,size) { + var status_map = (data.responseJSON); + for (i = 1; i <= size; i++) { + var address = $("#row-"+i).find("#address").contents().get(0).nodeValue; + address = address.trim(); + var port = $("#row-"+i).find("#port").html(); + var key = address+"-"+port; + + if (status_map[key] == null) { + $("#row-"+i).find("#status").text("OFFLINE"); + $("#row-"+i).find("#shutdown").prop('disabled',true).css('opacity',0.5); + $("#row-"+i).find("#queriesCount").text(""); + } + else { + if( status_map[key] == "ONLINE") { + $("#row-"+i).find("#status").text(status_map[key]); + } + else { + fillQueryCount(address,i); + $("#row-"+i).find("#status").text(status_map[key]); + } + } + } + } + function fillQueryCount(address,row_id) { + url = "http://"+address+":"+portNum+"/queriesCount"; + var result = $.ajax({ + type: 'GET', + url: url, + complete: function(data) { + queries = data.responseJSON["queriesCount"]; + fragments = data.responseJSON["fragmentsCount"]; + $("#row-"+row_id).find("#queriesCount").text(queries+" queries and "+fragments+" fragments remaining before shutting down"); + } + }); + } + function shutdown(address,button) { + url = "http://"+address+":"+portNum+"/gracefulShutdown"; + var result = $.ajax({ + type: 'POST', + url: url, + contentType : 'text/plain', + complete: function(data) { + 
alert(data.responseJSON["response"]); + button.prop('disabled',true).css('opacity',0.5); + } + }); + } + </script> </#macro> <@page_html/> http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java index c30cb09..37aa1db 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/metadata/TestMetadataProvider.java @@ -248,7 +248,7 @@ public class TestMetadataProvider extends BaseTestQuery { assertEquals(RequestStatus.OK, resp.getStatus()); List<ColumnMetadata> columns = resp.getColumnsList(); - assertEquals(92, columns.size()); + assertEquals(93, columns.size()); // too many records to verify the output. } http://git-wip-us.apache.org/repos/asf/drill/blob/5f044f2a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java index f0653f7..a4d62d4 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java @@ -404,6 +404,25 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable { } /** + * Shutdown the drillbit given the name of the drillbit. 
+ */ + public void closeDrillbit(final String drillbitName) throws Exception { + Exception ex = null; + for (Drillbit bit : drillbits()) { + if (bit.equals(bits.get(drillbitName))) { + try { + bit.close(); + } catch (Exception e) { + ex = ex == null ? e :ex; + } + } + } + if (ex != null) { + throw ex; + } + } + + /** * Close a resource, suppressing the exception, and keeping * only the first exception that may occur. We assume that only * the first is useful, any others are probably down-stream effects