http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java index 5a629ad..1dda41c 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java @@ -1,19 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.daemon.supervisor; @@ -72,212 +66,85 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback StormMetricsRegistry.registerMeter("supervisor:num-workers-killed-hb-null"); private static final Meter numForceKill = StormMetricsRegistry.registerMeter("supervisor:num-workers-force-kill"); + private static final long ONE_SEC_IN_NANO = TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS); + private final AtomicReference<LocalAssignment> newAssignment = new AtomicReference<>(); + private final AtomicReference<Set<TopoProfileAction>> profiling = new AtomicReference<>(new HashSet<>()); - static enum MachineState { - EMPTY, - RUNNING, - WAITING_FOR_WORKER_START, - KILL_AND_RELAUNCH, - KILL, - KILL_BLOB_UPDATE, - WAITING_FOR_BLOB_LOCALIZATION, - WAITING_FOR_BLOB_UPDATE; - } - - static class StaticState { - public final AsyncLocalizer localizer; - public final long hbTimeoutMs; - public final long firstHbTimeoutMs; - public final long killSleepMs; - public final long monitorFreqMs; - public final ContainerLauncher containerLauncher; - public final int port; - public final String host; - public final ISupervisor iSupervisor; - public final LocalState localState; - public final BlobChangingCallback changingCallback; - public final OnlyLatestExecutor<Integer> metricsExec; - public final WorkerMetricsProcessor metricsProcessor; - - StaticState(AsyncLocalizer localizer, long hbTimeoutMs, long firstHbTimeoutMs, - long killSleepMs, long monitorFreqMs, - ContainerLauncher containerLauncher, String host, int port, - ISupervisor iSupervisor, LocalState localState, - BlobChangingCallback changingCallback, - OnlyLatestExecutor<Integer> metricsExec, - WorkerMetricsProcessor metricsProcessor) { - this.localizer = localizer; - this.hbTimeoutMs = hbTimeoutMs; - this.firstHbTimeoutMs = firstHbTimeoutMs; - this.containerLauncher = containerLauncher; - this.killSleepMs = killSleepMs; - this.monitorFreqMs = monitorFreqMs; - this.host = host; - this.port = port; - this.iSupervisor = iSupervisor; - this.localState = localState; - this.changingCallback = changingCallback; - this.metricsExec = metricsExec; - this.metricsProcessor = metricsProcessor; - } - } + ; + private final BlockingQueue<BlobChanging> changingBlobs = new LinkedBlockingQueue<>(); + private final StaticState staticState; + private final IStormClusterState clusterState; + private final AtomicReference<Map<Long, LocalAssignment>> cachedCurrentAssignments; + private final OnlyLatestExecutor<Integer> metricsExec; + private volatile boolean done = false; + private volatile DynamicState dynamicState; - static class DynamicState { - public final MachineState state; - public final LocalAssignment newAssignment; - public final LocalAssignment currentAssignment; - public final Container container; - public final LocalAssignment pendingLocalization; - public final Future<Void> pendingDownload; - public final Set<TopoProfileAction> profileActions; - public final Set<TopoProfileAction> pendingStopProfileActions; - public final Set<BlobChanging> changingBlobs; - public final LocalAssignment pendingChangingBlobsAssignment; - public final Set<Future<Void>> pendingChangingBlobs; - - /** - * The last time that WAITING_FOR_WORKER_START, KILL, or KILL_AND_RELAUNCH were entered into. - */ - public final long startTime; - - public DynamicState(final LocalAssignment currentAssignment, Container container, final LocalAssignment newAssignment) { - this.currentAssignment = currentAssignment; - this.container = container; - if ((currentAssignment == null) ^ (container == null)) { - throw new IllegalArgumentException("Container and current assignment must both be null, or neither can be null"); - } - - if (currentAssignment == null) { - state = MachineState.EMPTY; - } else { - state = MachineState.RUNNING; - } - - this.startTime = System.currentTimeMillis(); - this.newAssignment = newAssignment; - this.pendingLocalization = null; - this.pendingDownload = null; - this.profileActions = Collections.emptySet(); - this.pendingStopProfileActions = Collections.emptySet(); - this.changingBlobs = Collections.emptySet(); - this.pendingChangingBlobsAssignment = null; - this.pendingChangingBlobs = Collections.emptySet(); - } + public Slot(AsyncLocalizer localizer, Map<String, Object> conf, + ContainerLauncher containerLauncher, String host, + int port, LocalState localState, + IStormClusterState clusterState, + ISupervisor iSupervisor, + AtomicReference<Map<Long, LocalAssignment>> cachedCurrentAssignments, + OnlyLatestExecutor<Integer> metricsExec, + WorkerMetricsProcessor metricsProcessor) throws Exception { + super("SLOT_" + port); + this.metricsExec = metricsExec; - public DynamicState(final MachineState state, final LocalAssignment newAssignment, - final Container container, final LocalAssignment currentAssignment, - final LocalAssignment pendingLocalization, final long startTime, - final Future<Void> pendingDownload, final Set<TopoProfileAction> profileActions, - final Set<TopoProfileAction> pendingStopProfileActions, - final Set<BlobChanging> changingBlobs, - final Set<Future<Void>> pendingChangingBlobs, final LocalAssignment pendingChaningBlobsAssignment) { - assert pendingChangingBlobs != null; - assert !(pendingChangingBlobs.isEmpty() ^ (pendingChaningBlobsAssignment == null)); - this.state = state; - this.newAssignment = newAssignment; - this.currentAssignment = currentAssignment; - this.container = container; - this.pendingLocalization = pendingLocalization; - this.startTime = startTime; - this.pendingDownload = pendingDownload; - this.profileActions = profileActions; - this.pendingStopProfileActions = pendingStopProfileActions; - this.changingBlobs = changingBlobs; - this.pendingChangingBlobs = pendingChangingBlobs; - this.pendingChangingBlobsAssignment = pendingChaningBlobsAssignment; + this.cachedCurrentAssignments = cachedCurrentAssignments; + this.clusterState = clusterState; + Map<Integer, LocalAssignment> assignments = localState.getLocalAssignmentsMap(); + LocalAssignment currentAssignment = null; + if (assignments != null) { + currentAssignment = assignments.get(port); } - - public String toString() { - StringBuffer sb = new StringBuffer(); - sb.append(state); - sb.append(" msInState: "); - sb.append(Time.currentTimeMillis() - startTime); - if (container != null) { - sb.append(" "); - sb.append(container); + Container container = null; + if (currentAssignment != null) { + try { + // For now we do not make a transaction when removing a topology assignment from local, an overdue + // assignment may be left on local disk. + // So we should check if the local disk assignment is valid when initializing: + // if topology files does not exist, the worker[possibly alive] will be reassigned if it is timed-out; + // if topology files exist but the topology id is invalid, just let Supervisor make a sync; + // if topology files exist and topology files is valid, recover the container. + if (ClientSupervisorUtils.doRequiredTopoFilesExist(conf, currentAssignment.get_topology_id())) { + container = containerLauncher.recoverContainer(port, currentAssignment, localState); + } else { + // Make the assignment null to let slot clean up the disk assignment. + currentAssignment = null; + } + } catch (ContainerRecoveryException e) { + //We could not recover container will be null. } - return sb.toString(); - } - - /** - * Set the new assignment for the state. This should never be called from within the state machine. - * It is an input from outside. - * @param newAssignment the new assignment to set - * @return the updated DynamicState. - */ - public DynamicState withNewAssignment(LocalAssignment newAssignment) { - return new DynamicState(this.state, newAssignment, - this.container, this.currentAssignment, - this.pendingLocalization, this.startTime, - this.pendingDownload, this.profileActions, - this.pendingStopProfileActions, this.changingBlobs, - this.pendingChangingBlobs, this.pendingChangingBlobsAssignment); - } - - public DynamicState withPendingLocalization(LocalAssignment pendingLocalization, Future<Void> pendingDownload) { - return new DynamicState(this.state, this.newAssignment, - this.container, this.currentAssignment, - pendingLocalization, this.startTime, - pendingDownload, this.profileActions, - this.pendingStopProfileActions, this.changingBlobs, - this.pendingChangingBlobs, this.pendingChangingBlobsAssignment); - } - - public DynamicState withPendingLocalization(Future<Void> pendingDownload) { - return withPendingLocalization(this.pendingLocalization, pendingDownload); - } - - public DynamicState withState(final MachineState state) { - long newStartTime = Time.currentTimeMillis(); - return new DynamicState(state, this.newAssignment, - this.container, this.currentAssignment, - this.pendingLocalization, newStartTime, - this.pendingDownload, this.profileActions, - this.pendingStopProfileActions, this.changingBlobs, - this.pendingChangingBlobs, this.pendingChangingBlobsAssignment); - } - - public DynamicState withCurrentAssignment(final Container container, final LocalAssignment currentAssignment) { - return new DynamicState(this.state, this.newAssignment, - container, currentAssignment, - this.pendingLocalization, this.startTime, - this.pendingDownload, this.profileActions, - this.pendingStopProfileActions, this.changingBlobs, - this.pendingChangingBlobs, this.pendingChangingBlobsAssignment); - } - - public DynamicState withProfileActions(Set<TopoProfileAction> profileActions, Set<TopoProfileAction> pendingStopProfileActions) { - return new DynamicState(this.state, this.newAssignment, - this.container, this.currentAssignment, - this.pendingLocalization, this.startTime, - this.pendingDownload, profileActions, - pendingStopProfileActions, this.changingBlobs, - this.pendingChangingBlobs, this.pendingChangingBlobsAssignment); } - public DynamicState withChangingBlobs(Set<BlobChanging> changingBlobs) { - if (changingBlobs == this.changingBlobs) { - return this; - } - return new DynamicState(this.state, this.newAssignment, - this.container, this.currentAssignment, - this.pendingLocalization, this.startTime, - this.pendingDownload, profileActions, - this.pendingStopProfileActions, changingBlobs, - this.pendingChangingBlobs, this.pendingChangingBlobsAssignment); + LocalAssignment newAssignment = currentAssignment; + if (currentAssignment != null && container == null) { + currentAssignment = null; + //Assigned something but it is not running } - public DynamicState withPendingChangingBlobs(Set<Future<Void>> pendingChangingBlobs, - LocalAssignment pendingChangingBlobsAssignment) { - return new DynamicState(this.state, this.newAssignment, - this.container, this.currentAssignment, - this.pendingLocalization, this.startTime, - this.pendingDownload, profileActions, - this.pendingStopProfileActions, this.changingBlobs, - pendingChangingBlobs, - pendingChangingBlobsAssignment); + dynamicState = new DynamicState(currentAssignment, container, newAssignment); + staticState = new StaticState(localizer, + ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS)) * 1000, + ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_WORKER_START_TIMEOUT_SECS)) * 1000, + ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS)) * 1000, + ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_MONITOR_FREQUENCY_SECS)) * 1000, + containerLauncher, + host, + port, + iSupervisor, + localState, + this, + metricsExec, metricsProcessor); + this.newAssignment.set(dynamicState.newAssignment); + if (MachineState.RUNNING == dynamicState.state) { + //We are running so we should recover the blobs. + staticState.localizer.recoverRunningTopology(currentAssignment, port, this); + saveNewAssignment(currentAssignment); } - }; + LOG.info("SLOT {}:{} Starting in state {} - assignment {}", staticState.host, staticState.port, dynamicState.state, + dynamicState.currentAssignment); + } //In some cases the new LocalAssignment may be equivalent to the old, but // It is not equal. In those cases we want to update the current assignment to @@ -291,55 +158,6 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback return dynamicState; } - static class TopoProfileAction { - public final String topoId; - public final ProfileRequest request; - - public TopoProfileAction(String topoId, ProfileRequest request) { - this.topoId = topoId; - this.request = request; - } - - @Override - public int hashCode() { - return (37 * topoId.hashCode()) + request.hashCode(); - } - - @Override - public boolean equals(Object other) { - if (!(other instanceof TopoProfileAction)) { - return false; - } - TopoProfileAction o = (TopoProfileAction) other; - return topoId.equals(o.topoId) && request.equals(o.request); - } - - @Override - public String toString() { - return "{ " + topoId + ": " + request + " }"; - } - } - - /** - * Holds the information about a blob that is changing. - */ - static class BlobChanging { - private final LocalAssignment assignment; - private final LocallyCachedBlob blob; - private final GoodToGo.GoodToGoLatch latch; - - public BlobChanging(LocalAssignment assignment, LocallyCachedBlob blob, GoodToGo.GoodToGoLatch latch) { - this.assignment = assignment; - this.blob = blob; - this.latch = latch; - } - - @Override - public String toString() { - return "BLOB CHANGING " + blob + " " + assignment; - } - } - @VisibleForTesting static boolean forSameTopology(LocalAssignment a, LocalAssignment b) { if (a == null && b == null) { @@ -378,7 +196,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback } return false; } - + static DynamicState stateMachineStep(DynamicState dynamicState, StaticState staticState) throws Exception { LOG.debug("STATE {}", dynamicState.state); switch (dynamicState.state) { @@ -399,10 +217,10 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback case WAITING_FOR_BLOB_UPDATE: return handleWaitingForBlobUpdate(dynamicState, staticState); default: - throw new IllegalStateException("Code not ready to handle a state of "+dynamicState.state); + throw new IllegalStateException("Code not ready to handle a state of " + dynamicState.state); } } - + /** * Prepare for a new assignment by downloading new required blobs, or going to empty if there is nothing to download. * PRECONDITION: The slot should be empty @@ -412,34 +230,34 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback * @throws IOException on any error */ static DynamicState prepareForNewAssignmentNoWorkersRunning(DynamicState dynamicState, StaticState staticState) throws IOException { - assert(dynamicState.container == null); - + assert (dynamicState.container == null); + if (dynamicState.newAssignment == null) { return dynamicState.withState(MachineState.EMPTY); } Future<Void> pendingDownload = staticState.localizer.requestDownloadTopologyBlobs(dynamicState.newAssignment, - staticState.port, staticState.changingCallback); + staticState.port, staticState.changingCallback); return dynamicState.withPendingLocalization(dynamicState.newAssignment, pendingDownload) - .withState(MachineState.WAITING_FOR_BLOB_LOCALIZATION); + .withState(MachineState.WAITING_FOR_BLOB_LOCALIZATION); } - + /** * Kill the current container and start downloading what the new assignment needs, if there is a new assignment. * PRECONDITION: container != null * @param dynamicState current state * @param staticState static data * @return the next state - * @throws Exception + * @throws Exception */ static DynamicState killContainerForChangedAssignment(DynamicState dynamicState, StaticState staticState) throws Exception { - assert(dynamicState.container != null); + assert (dynamicState.container != null); staticState.iSupervisor.killedWorker(staticState.port); dynamicState.container.kill(); Future<Void> pendingDownload = null; if (dynamicState.newAssignment != null) { pendingDownload = staticState.localizer.requestDownloadTopologyBlobs(dynamicState.newAssignment, staticState.port, - staticState.changingCallback); + staticState.changingCallback); } dynamicState = drainAllChangingBlobs(dynamicState); Time.sleep(staticState.killSleepMs); @@ -454,7 +272,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback * @return the next state */ private static DynamicState killContainerForChangedBlobs(DynamicState dynamicState, StaticState staticState) throws Exception { - assert(dynamicState.container != null); + assert (dynamicState.container != null); staticState.iSupervisor.killedWorker(staticState.port); dynamicState.container.kill(); @@ -469,20 +287,20 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback * @param dynamicState current state * @param staticState static data * @return the next state - * @throws Exception + * @throws Exception */ static DynamicState killAndRelaunchContainer(DynamicState dynamicState, StaticState staticState) throws Exception { - assert(dynamicState.container != null); - + assert (dynamicState.container != null); + dynamicState.container.kill(); Time.sleep(staticState.killSleepMs); - + //any stop profile actions that hadn't timed out yet, we should restart after the worker is running again. HashSet<TopoProfileAction> mod = new HashSet<>(dynamicState.profileActions); mod.addAll(dynamicState.pendingStopProfileActions); return dynamicState.withState(MachineState.KILL_AND_RELAUNCH).withProfileActions(mod, Collections.emptySet()); } - + /** * Clean up a container. * PRECONDITION: All of the processes have died. @@ -491,11 +309,12 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback * @param nextState the next MachineState to go to. * @return the next state. */ - static DynamicState cleanupCurrentContainer(DynamicState dynamicState, StaticState staticState, MachineState nextState) throws Exception { - assert(dynamicState.container != null); - assert(dynamicState.currentAssignment != null); - assert(dynamicState.container.areAllProcessesDead()); - + static DynamicState cleanupCurrentContainer(DynamicState dynamicState, StaticState staticState, MachineState nextState) throws + Exception { + assert (dynamicState.container != null); + assert (dynamicState.currentAssignment != null); + assert (dynamicState.container.areAllProcessesDead()); + dynamicState.container.cleanUp(); staticState.localizer.releaseSlotFor(dynamicState.currentAssignment, staticState.port); DynamicState ret = dynamicState.withCurrentAssignment(null, null); @@ -544,13 +363,13 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback } //Otherwise they will just be replaced - for (BlobChanging rc: dynamicState.changingBlobs) { + for (BlobChanging rc : dynamicState.changingBlobs) { futures.add(rc.latch.countDown()); } LOG.debug("found changing blobs {} moving them to pending...", dynamicState.changingBlobs); return dynamicState.withChangingBlobs(Collections.emptySet()) - .withPendingChangingBlobs(futures, assignment); + .withPendingChangingBlobs(futures, assignment); } /** @@ -567,7 +386,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback } HashSet<BlobChanging> savedBlobs = new HashSet<>(dynamicState.changingBlobs.size()); - for (BlobChanging rc: dynamicState.changingBlobs) { + for (BlobChanging rc : dynamicState.changingBlobs) { if (forSameTopology(assignment, rc.assignment)) { savedBlobs.add(rc); } else { @@ -587,10 +406,10 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback * @throws Exception on any error */ static DynamicState handleWaitingForBlobLocalization(DynamicState dynamicState, StaticState staticState) throws Exception { - assert(dynamicState.pendingLocalization != null); - assert(dynamicState.pendingDownload != null); - assert(dynamicState.container == null); - + assert (dynamicState.pendingLocalization != null); + assert (dynamicState.pendingDownload != null); + assert (dynamicState.container == null); + //Ignore changes to scheduling while downloading the topology blobs // We don't support canceling the download through the future yet, // so to keep everything in sync, just wait @@ -608,19 +427,20 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback //Scheduling changed staticState.localizer.releaseSlotFor(dynamicState.pendingLocalization, staticState.port); return prepareForNewAssignmentNoWorkersRunning(dynamicState.withPendingChangingBlobs(Collections.emptySet(), null), - staticState); + staticState); } if (!dynamicState.pendingChangingBlobs.isEmpty()) { LOG.info("There are pending changes, waiting for them to finish before launching container..."); //We cannot launch the container yet the resources may still be updating return dynamicState.withState(MachineState.WAITING_FOR_BLOB_UPDATE) - .withPendingLocalization(null, null); + .withPendingLocalization(null, null); } dynamicState = updateAssignmentIfNeeded(dynamicState); numWorkersLaunched.mark(); - Container c = staticState.containerLauncher.launchContainer(staticState.port, dynamicState.pendingLocalization, staticState.localState); + Container c = + staticState.containerLauncher.launchContainer(staticState.port, dynamicState.pendingLocalization, staticState.localState); return dynamicState .withCurrentAssignment(c, dynamicState.pendingLocalization).withState(MachineState.WAITING_FOR_WORKER_START) .withPendingLocalization(null, null); @@ -643,8 +463,6 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback } } - private static final long ONE_SEC_IN_NANO = TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS); - /** * State Transitions for WAITING_FOR_BLOB_UPDATE state. * @@ -667,14 +485,14 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback //We were rescheduled while waiting for the resources to be updated, // but the container is already not running. LOG.info("SLOT {}: Assignment Changed from {} to {}", staticState.port, - dynamicState.currentAssignment, dynamicState.newAssignment); + dynamicState.currentAssignment, dynamicState.newAssignment); if (dynamicState.currentAssignment != null) { staticState.localizer.releaseSlotFor(dynamicState.currentAssignment, staticState.port); } staticState.localizer.releaseSlotFor(dynamicState.pendingChangingBlobsAssignment, staticState.port); return prepareForNewAssignmentNoWorkersRunning(dynamicState.withCurrentAssignment(null, null) - .withPendingChangingBlobs(Collections.emptySet(), null), - staticState); + .withPendingChangingBlobs(Collections.emptySet(), null), + staticState); } dynamicState = filterChangingBlobsFor(dynamicState, dynamicState.pendingChangingBlobsAssignment); @@ -685,7 +503,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback //We only have a set amount of time we can wait for before looping around again long start = Time.nanoTime(); try { - for (Future<Void> pending: dynamicState.pendingChangingBlobs) { + for (Future<Void> pending : dynamicState.pendingChangingBlobs) { long now = Time.nanoTime(); long timeLeft = ONE_SEC_IN_NANO - (now - start); if (timeLeft <= 0) { @@ -695,7 +513,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback } //All done we can launch the worker now Container c = staticState.containerLauncher.launchContainer(staticState.port, dynamicState.pendingChangingBlobsAssignment, - staticState.localState); + staticState.localState); return dynamicState .withCurrentAssignment(c, dynamicState.pendingChangingBlobsAssignment).withState(MachineState.WAITING_FOR_WORKER_START) .withPendingChangingBlobs(Collections.emptySet(), null); @@ -714,13 +532,14 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback * @throws Exception on any error */ static DynamicState handleKill(DynamicState dynamicState, StaticState staticState) throws Exception { - assert(dynamicState.container != null); - assert(dynamicState.currentAssignment != null); + assert (dynamicState.container != null); + assert (dynamicState.currentAssignment != null); if (dynamicState.container.areAllProcessesDead()) { LOG.info("SLOT {} all processes are dead...", staticState.port); return cleanupCurrentContainer(dynamicState, staticState, - dynamicState.pendingLocalization == null ? MachineState.EMPTY : MachineState.WAITING_FOR_BLOB_LOCALIZATION); + dynamicState.pendingLocalization == + null ? MachineState.EMPTY : MachineState.WAITING_FOR_BLOB_LOCALIZATION); } LOG.warn("SLOT {} force kill and wait...", staticState.port); @@ -740,9 +559,9 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback * @throws Exception on any error */ static DynamicState handleKillAndRelaunch(DynamicState dynamicState, StaticState staticState) throws Exception { - assert(dynamicState.container != null); - assert(dynamicState.currentAssignment != null); - + assert (dynamicState.container != null); + assert (dynamicState.currentAssignment != null); + if (dynamicState.container.areAllProcessesDead()) { if (equivalent(dynamicState.newAssignment, dynamicState.currentAssignment)) { dynamicState.container.cleanUpForRestart(); @@ -773,8 +592,8 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback * @throws Exception on any error */ private static DynamicState handleKillBlobUpdate(DynamicState dynamicState, StaticState staticState) throws Exception { - assert(dynamicState.container != null); - assert(dynamicState.currentAssignment != null); + assert (dynamicState.container != null); + assert (dynamicState.currentAssignment != null); //Release things that don't need to wait for us dynamicState = filterChangingBlobsFor(dynamicState, dynamicState.currentAssignment); @@ -807,9 +626,9 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback * @throws Exception on any error */ static DynamicState handleWaitingForWorkerStart(DynamicState dynamicState, StaticState staticState) throws Exception { - assert(dynamicState.container != null); - assert(dynamicState.currentAssignment != null); - + assert (dynamicState.container != null); + assert (dynamicState.currentAssignment != null); + LSWorkerHeartbeat hb = dynamicState.container.readHeartbeat(); if (hb != null) { long hbAgeMs = (Time.currentTimeSecs() - hb.get_time_secs()) * 1000; @@ -817,18 +636,19 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback return dynamicState.withState(MachineState.RUNNING); } } - + if (!equivalent(dynamicState.newAssignment, dynamicState.currentAssignment)) { //We were rescheduled while waiting for the worker to come up LOG.info("SLOT {}: Assignment Changed from {} to {}", staticState.port, dynamicState.currentAssignment, - dynamicState.newAssignment); + dynamicState.newAssignment); return killContainerForChangedAssignment(dynamicState, staticState); } dynamicState = updateAssignmentIfNeeded(dynamicState); long timeDiffms = (Time.currentTimeMillis() - dynamicState.startTime); if (timeDiffms > staticState.firstHbTimeoutMs) { - LOG.warn("SLOT {}: Container {} failed to launch in {} ms.", staticState.port, dynamicState.container, staticState.firstHbTimeoutMs); + LOG.warn("SLOT {}: Container {} failed to launch in {} ms.", staticState.port, dynamicState.container, + staticState.firstHbTimeoutMs); return killAndRelaunchContainer(dynamicState, staticState); } @@ -850,12 +670,12 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback * @throws Exception on any error */ static DynamicState handleRunning(DynamicState dynamicState, StaticState staticState) throws Exception { - assert(dynamicState.container != null); - assert(dynamicState.currentAssignment != null); - + assert (dynamicState.container != null); + assert (dynamicState.currentAssignment != null); + if (!equivalent(dynamicState.newAssignment, dynamicState.currentAssignment)) { LOG.info("SLOT {}: Assignment Changed from {} to {}", staticState.port, dynamicState.currentAssignment, - dynamicState.newAssignment); + dynamicState.newAssignment); //Scheduling changed while running... return killContainerForChangedAssignment(dynamicState, staticState); } @@ -878,7 +698,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback LOG.warn("SLOT {}: violated memory limits", staticState.port); return killAndRelaunchContainer(dynamicState, staticState); } - + LSWorkerHeartbeat hb = dynamicState.container.readHeartbeat(); if (hb == null) { numWorkersKilledHBNull.mark(); @@ -887,14 +707,14 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback // worker that never came up. return killAndRelaunchContainer(dynamicState, staticState); } - + long timeDiffMs = (Time.currentTimeSecs() - hb.get_time_secs()) * 1000; if (timeDiffMs > staticState.hbTimeoutMs) { numWorkersKilledHBTimeout.mark(); LOG.warn("SLOT {}: HB is too old {} > {}", staticState.port, timeDiffMs, staticState.hbTimeoutMs); return killAndRelaunchContainer(dynamicState, staticState); } - + //The worker is up and running check for profiling requests if (!dynamicState.profileActions.isEmpty()) { HashSet<TopoProfileAction> mod = new HashSet<>(dynamicState.profileActions); @@ -943,114 +763,37 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback } dynamicState = dynamicState.withProfileActions(mod, modPending); } - - dynamicState.container.processMetrics(staticState.metricsExec, staticState.metricsProcessor); - - Time.sleep(staticState.monitorFreqMs); - return dynamicState; - } - - static DynamicState handleEmpty(DynamicState dynamicState, StaticState staticState) throws InterruptedException, IOException { - assert dynamicState.changingBlobs.isEmpty(); - assert dynamicState.pendingChangingBlobsAssignment == null; - if (!equivalent(dynamicState.newAssignment, dynamicState.currentAssignment)) { - return prepareForNewAssignmentNoWorkersRunning(dynamicState, staticState); - } - dynamicState = updateAssignmentIfNeeded(dynamicState); - - //Both assignments are null, just wait - if (dynamicState.profileActions != null && !dynamicState.profileActions.isEmpty()) { - //Nothing is scheduled here so throw away all of the profileActions - LOG.warn("Dropping {} no topology is running", dynamicState.profileActions); - dynamicState = dynamicState.withProfileActions(Collections.emptySet(), Collections.emptySet()); - } - //Drop the change notifications we are not running anything right now - dynamicState = drainAllChangingBlobs(dynamicState); - Time.sleep(1000); - return dynamicState; - } - - private final AtomicReference<LocalAssignment> newAssignment = new AtomicReference<>(); - private final AtomicReference<Set<TopoProfileAction>> profiling = new AtomicReference<>(new HashSet<>()); - private final BlockingQueue<BlobChanging> changingBlobs = new LinkedBlockingQueue<>(); - private final StaticState staticState; - private final IStormClusterState clusterState; - private volatile boolean done = false; - private volatile DynamicState dynamicState; - private final AtomicReference<Map<Long, LocalAssignment>> cachedCurrentAssignments; - private final OnlyLatestExecutor<Integer> metricsExec; - - public Slot(AsyncLocalizer localizer, Map<String, Object> conf, - ContainerLauncher containerLauncher, String host, - int port, LocalState localState, - IStormClusterState clusterState, - ISupervisor iSupervisor, - AtomicReference<Map<Long, LocalAssignment>> cachedCurrentAssignments, - OnlyLatestExecutor<Integer> metricsExec, - WorkerMetricsProcessor metricsProcessor) throws Exception { - super("SLOT_"+port); - this.metricsExec = metricsExec; - - this.cachedCurrentAssignments = cachedCurrentAssignments; - this.clusterState = clusterState; - Map<Integer, LocalAssignment> assignments = localState.getLocalAssignmentsMap(); - LocalAssignment currentAssignment = null; - if (assignments != null) { - currentAssignment = assignments.get(port); - } - Container container = null; - if (currentAssignment != null) { - try { - // For now we do not make a transaction when removing a topology assignment from local, an overdue - // assignment may be left on local disk. - // So we should check if the local disk assignment is valid when initializing: - // if topology files does not exist, the worker[possibly alive] will be reassigned if it is timed-out; - // if topology files exist but the topology id is invalid, just let Supervisor make a sync; - // if topology files exist and topology files is valid, recover the container. - if (ClientSupervisorUtils.doRequiredTopoFilesExist(conf, currentAssignment.get_topology_id())) { - container = containerLauncher.recoverContainer(port, currentAssignment, localState); - } else { - // Make the assignment null to let slot clean up the disk assignment. - currentAssignment = null; - } - } catch (ContainerRecoveryException e) { - //We could not recover container will be null. - } - } - - LocalAssignment newAssignment = currentAssignment; - if (currentAssignment != null && container == null) { - currentAssignment = null; - //Assigned something but it is not running + + dynamicState.container.processMetrics(staticState.metricsExec, staticState.metricsProcessor); + + Time.sleep(staticState.monitorFreqMs); + return dynamicState; + } + + static DynamicState handleEmpty(DynamicState dynamicState, StaticState staticState) throws InterruptedException, IOException { + assert dynamicState.changingBlobs.isEmpty(); + assert dynamicState.pendingChangingBlobsAssignment == null; + if (!equivalent(dynamicState.newAssignment, dynamicState.currentAssignment)) { + return prepareForNewAssignmentNoWorkersRunning(dynamicState, staticState); } - - dynamicState = new DynamicState(currentAssignment, container, newAssignment); - staticState = new StaticState(localizer, - ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS)) * 1000, - ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_WORKER_START_TIMEOUT_SECS)) * 1000, - ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS)) * 1000, - ObjectReader.getInt(conf.get(DaemonConfig.SUPERVISOR_MONITOR_FREQUENCY_SECS)) * 1000, - containerLauncher, - host, - port, - iSupervisor, - localState, - this, - metricsExec, metricsProcessor); - this.newAssignment.set(dynamicState.newAssignment); - if (MachineState.RUNNING == dynamicState.state) { - //We are running so we should recover the blobs. - staticState.localizer.recoverRunningTopology(currentAssignment, port, this); - saveNewAssignment(currentAssignment); + dynamicState = updateAssignmentIfNeeded(dynamicState); + + //Both assignments are null, just wait + if (dynamicState.profileActions != null && !dynamicState.profileActions.isEmpty()) { + //Nothing is scheduled here so throw away all of the profileActions + LOG.warn("Dropping {} no topology is running", dynamicState.profileActions); + dynamicState = dynamicState.withProfileActions(Collections.emptySet(), Collections.emptySet()); } - LOG.info("SLOT {}:{} Starting in state {} - assignment {}", staticState.host, staticState.port, dynamicState.state, - dynamicState.currentAssignment); + //Drop the change notifications we are not running anything right now + dynamicState = drainAllChangingBlobs(dynamicState); + Time.sleep(1000); + return dynamicState; } - + public MachineState getMachineState() { return dynamicState.state; } - + /** * Set a new assignment asynchronously. * @param newAssignment the new assignment for this slot to run, null to run nothing @@ -1072,7 +815,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback public void addProfilerActions(Set<TopoProfileAction> actions) { if (actions != null) { - while(true) { + while (true) { Set<TopoProfileAction> orig = profiling.get(); Set<TopoProfileAction> newActions = new HashSet<>(orig); newActions.addAll(actions); @@ -1082,7 +825,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback } } } - + public String getWorkerId() { String workerId = null; Container c = dynamicState.container; @@ -1091,9 +834,9 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback } return workerId; } - + private void saveNewAssignment(LocalAssignment assignment) { - synchronized(staticState.localState) { + synchronized (staticState.localState) { Map<Integer, LocalAssignment> assignments = staticState.localState.getLocalAssignmentsMap(); if (assignments == null) { assignments = new HashMap<>(); @@ -1118,10 +861,10 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback } } while (!cachedCurrentAssignments.compareAndSet(orig, update)); } - + public void run() { try { - while(!done) { + while (!done) { Set<TopoProfileAction> origProfileActions = new HashSet<>(profiling.get()); Set<TopoProfileAction> removed = new HashSet<>(origProfileActions); @@ -1132,7 +875,7 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback Iterator<BlobChanging> it = changingResourcesToHandle.iterator(); //Remove/Clean up changed requests that are not for us - while(it.hasNext()) { + while (it.hasNext()) { BlobChanging rc = it.next(); if (!forSameTopology(rc.assignment, dynamicState.currentAssignment) && !forSameTopology(rc.assignment, dynamicState.newAssignment)) { @@ -1144,16 +887,17 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback DynamicState nextState = stateMachineStep(dynamicState.withNewAssignment(newAssignment.get()) - .withProfileActions(origProfileActions, dynamicState.pendingStopProfileActions) - .withChangingBlobs(changingResourcesToHandle), staticState); + .withProfileActions(origProfileActions, dynamicState.pendingStopProfileActions) + .withChangingBlobs(changingResourcesToHandle), staticState); if (LOG.isDebugEnabled() || dynamicState.state != nextState.state) { LOG.info("STATE {} -> {}", dynamicState, nextState); } //Save the current state for recovery if ((nextState.currentAssignment != null && !nextState.currentAssignment.equals(dynamicState.currentAssignment)) || - (dynamicState.currentAssignment != null && !dynamicState.currentAssignment.equals(nextState.currentAssignment))) { - LOG.info("SLOT {}: Changing current assignment from {} to {}", staticState.port, dynamicState.currentAssignment, nextState.currentAssignment); + (dynamicState.currentAssignment != null && !dynamicState.currentAssignment.equals(nextState.currentAssignment))) { + LOG.info("SLOT {}: Changing current assignment from {} to {}", staticState.port, dynamicState.currentAssignment, + nextState.currentAssignment); saveNewAssignment(nextState.currentAssignment); } @@ -1167,11 +911,11 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback saveNewAssignment(nextState.newAssignment); nextState = nextState.withCurrentAssignment(nextState.container, nextState.newAssignment); } - + // clean up the profiler actions that are not being processed removed.removeAll(dynamicState.profileActions); removed.removeAll(dynamicState.pendingStopProfileActions); - for (TopoProfileAction action: removed) { + for (TopoProfileAction action : removed) { try { clusterState.deleteTopologyProfileRequests(action.topoId, action.request); } catch (Exception e) { @@ -1200,4 +944,259 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback this.interrupt(); this.join(); } + + static enum MachineState { + EMPTY, + RUNNING, + WAITING_FOR_WORKER_START, + KILL_AND_RELAUNCH, + KILL, + KILL_BLOB_UPDATE, + WAITING_FOR_BLOB_LOCALIZATION, + WAITING_FOR_BLOB_UPDATE; + } + + static class StaticState { + public final AsyncLocalizer localizer; + public final long hbTimeoutMs; + public final long firstHbTimeoutMs; + public final long killSleepMs; + public final long monitorFreqMs; + public final ContainerLauncher containerLauncher; + public final int port; + public final String host; + public final ISupervisor iSupervisor; + public final LocalState localState; + public final BlobChangingCallback changingCallback; + public final OnlyLatestExecutor<Integer> metricsExec; + public final WorkerMetricsProcessor metricsProcessor; + + StaticState(AsyncLocalizer localizer, long hbTimeoutMs, long firstHbTimeoutMs, + long killSleepMs, long monitorFreqMs, + ContainerLauncher containerLauncher, String host, int port, + ISupervisor iSupervisor, LocalState localState, + BlobChangingCallback changingCallback, + OnlyLatestExecutor<Integer> metricsExec, + WorkerMetricsProcessor metricsProcessor) { + this.localizer = localizer; + this.hbTimeoutMs = hbTimeoutMs; + this.firstHbTimeoutMs = firstHbTimeoutMs; + this.containerLauncher = containerLauncher; + this.killSleepMs = killSleepMs; + this.monitorFreqMs = monitorFreqMs; + this.host = host; + this.port = port; + this.iSupervisor = iSupervisor; + this.localState = localState; + this.changingCallback = changingCallback; + this.metricsExec = metricsExec; + this.metricsProcessor = metricsProcessor; + } + } + + static class DynamicState { + public final MachineState state; + public final LocalAssignment newAssignment; + public final LocalAssignment currentAssignment; + public final Container container; + public final LocalAssignment pendingLocalization; + public final Future<Void> pendingDownload; + public final Set<TopoProfileAction> profileActions; + public final Set<TopoProfileAction> pendingStopProfileActions; + public final Set<BlobChanging> changingBlobs; + public final LocalAssignment pendingChangingBlobsAssignment; + public final Set<Future<Void>> pendingChangingBlobs; + + /** + * The last time that WAITING_FOR_WORKER_START, KILL, or KILL_AND_RELAUNCH were entered into. + */ + public final long startTime; + + public DynamicState(final LocalAssignment currentAssignment, Container container, final LocalAssignment newAssignment) { + this.currentAssignment = currentAssignment; + this.container = container; + if ((currentAssignment == null) ^ (container == null)) { + throw new IllegalArgumentException("Container and current assignment must both be null, or neither can be null"); + } + + if (currentAssignment == null) { + state = MachineState.EMPTY; + } else { + state = MachineState.RUNNING; + } + + this.startTime = System.currentTimeMillis(); + this.newAssignment = newAssignment; + this.pendingLocalization = null; + this.pendingDownload = null; + this.profileActions = Collections.emptySet(); + this.pendingStopProfileActions = Collections.emptySet(); + this.changingBlobs = Collections.emptySet(); + this.pendingChangingBlobsAssignment = null; + this.pendingChangingBlobs = Collections.emptySet(); + } + + public DynamicState(final MachineState state, final LocalAssignment newAssignment, + final Container container, final LocalAssignment currentAssignment, + final LocalAssignment pendingLocalization, final long startTime, + final Future<Void> pendingDownload, final Set<TopoProfileAction> profileActions, + final Set<TopoProfileAction> pendingStopProfileActions, + final Set<BlobChanging> changingBlobs, + final Set<Future<Void>> pendingChangingBlobs, final LocalAssignment pendingChaningBlobsAssignment) { + assert pendingChangingBlobs != null; + assert !(pendingChangingBlobs.isEmpty() ^ (pendingChaningBlobsAssignment == null)); + this.state = state; + this.newAssignment = newAssignment; + this.currentAssignment = currentAssignment; + this.container = container; + this.pendingLocalization = pendingLocalization; + this.startTime = startTime; + this.pendingDownload = pendingDownload; + this.profileActions = profileActions; + this.pendingStopProfileActions = pendingStopProfileActions; + this.changingBlobs = changingBlobs; + this.pendingChangingBlobs = pendingChangingBlobs; + this.pendingChangingBlobsAssignment = pendingChaningBlobsAssignment; + } + + public String toString() { + StringBuffer sb = new StringBuffer(); + sb.append(state); + sb.append(" msInState: "); + sb.append(Time.currentTimeMillis() - startTime); + if (container != null) { + sb.append(" "); + sb.append(container); + } + return sb.toString(); + } + + /** + * Set the new assignment for the state. This should never be called from within the state machine. + * It is an input from outside. + * @param newAssignment the new assignment to set + * @return the updated DynamicState. + */ + public DynamicState withNewAssignment(LocalAssignment newAssignment) { + return new DynamicState(this.state, newAssignment, + this.container, this.currentAssignment, + this.pendingLocalization, this.startTime, + this.pendingDownload, this.profileActions, + this.pendingStopProfileActions, this.changingBlobs, + this.pendingChangingBlobs, this.pendingChangingBlobsAssignment); + } + + public DynamicState withPendingLocalization(LocalAssignment pendingLocalization, Future<Void> pendingDownload) { + return new DynamicState(this.state, this.newAssignment, + this.container, this.currentAssignment, + pendingLocalization, this.startTime, + pendingDownload, this.profileActions, + this.pendingStopProfileActions, this.changingBlobs, + this.pendingChangingBlobs, this.pendingChangingBlobsAssignment); + } + + public DynamicState withPendingLocalization(Future<Void> pendingDownload) { + return withPendingLocalization(this.pendingLocalization, pendingDownload); + } + + public DynamicState withState(final MachineState state) { + long newStartTime = Time.currentTimeMillis(); + return new DynamicState(state, this.newAssignment, + this.container, this.currentAssignment, + this.pendingLocalization, newStartTime, + this.pendingDownload, this.profileActions, + this.pendingStopProfileActions, this.changingBlobs, + this.pendingChangingBlobs, this.pendingChangingBlobsAssignment); + } + + public DynamicState withCurrentAssignment(final Container container, final LocalAssignment currentAssignment) { + return new DynamicState(this.state, this.newAssignment, + container, currentAssignment, + this.pendingLocalization, this.startTime, + this.pendingDownload, this.profileActions, + this.pendingStopProfileActions, this.changingBlobs, + this.pendingChangingBlobs, this.pendingChangingBlobsAssignment); + } + + public DynamicState withProfileActions(Set<TopoProfileAction> profileActions, Set<TopoProfileAction> pendingStopProfileActions) { + return new DynamicState(this.state, this.newAssignment, + this.container, this.currentAssignment, + this.pendingLocalization, this.startTime, + this.pendingDownload, profileActions, + pendingStopProfileActions, this.changingBlobs, + this.pendingChangingBlobs, this.pendingChangingBlobsAssignment); + } + + public DynamicState withChangingBlobs(Set<BlobChanging> changingBlobs) { + if (changingBlobs == this.changingBlobs) { + return this; + } + return new DynamicState(this.state, this.newAssignment, + this.container, this.currentAssignment, + this.pendingLocalization, this.startTime, + this.pendingDownload, profileActions, + this.pendingStopProfileActions, changingBlobs, + this.pendingChangingBlobs, this.pendingChangingBlobsAssignment); + } + + public DynamicState withPendingChangingBlobs(Set<Future<Void>> pendingChangingBlobs, + LocalAssignment pendingChangingBlobsAssignment) { + return new DynamicState(this.state, this.newAssignment, + this.container, this.currentAssignment, + this.pendingLocalization, this.startTime, + this.pendingDownload, profileActions, + this.pendingStopProfileActions, this.changingBlobs, + pendingChangingBlobs, + pendingChangingBlobsAssignment); + } + } + + static class TopoProfileAction { + public final String topoId; + public final ProfileRequest request; + + public TopoProfileAction(String topoId, ProfileRequest request) { + this.topoId = topoId; + this.request = request; + } + + @Override + public int hashCode() { + return (37 * topoId.hashCode()) + request.hashCode(); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof TopoProfileAction)) { + return false; + } + TopoProfileAction o = (TopoProfileAction) other; + return topoId.equals(o.topoId) && request.equals(o.request); + } + + @Override + public String toString() { + return "{ " + topoId + ": " + request + " }"; + } + } + + /** + * Holds the information about a blob that is changing. + */ + static class BlobChanging { + private final LocalAssignment assignment; + private final LocallyCachedBlob blob; + private final GoodToGo.GoodToGoLatch latch; + + public BlobChanging(LocalAssignment assignment, LocallyCachedBlob blob, GoodToGo.GoodToGoLatch latch) { + this.assignment = assignment; + this.blob = blob; + this.latch = latch; + } + + @Override + public String toString() { + return "BLOB CHANGING " + blob + " " + assignment; + } + } }
http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java index 1332a21..0730957 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java @@ -1,32 +1,26 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.daemon.supervisor; -import org.apache.storm.DaemonConfig; -import org.apache.storm.scheduler.ISupervisor; -import org.apache.storm.utils.Utils; -import org.apache.storm.utils.LocalState; +package org.apache.storm.daemon.supervisor; import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Collection; import java.util.Map; +import org.apache.storm.DaemonConfig; +import org.apache.storm.scheduler.ISupervisor; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Utils; public class StandaloneSupervisor implements ISupervisor { private String supervisorId; @@ -79,7 +73,7 @@ public class StandaloneSupervisor implements ISupervisor { } - public String generateSupervisorId(){ + public String generateSupervisorId() { String extraPart = ""; try { extraPart = "-" + InetAddress.getLocalHost().getHostAddress(); http://git-wip-us.apache.org/repos/asf/storm/blob/f1c0bcbe/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java ---------------------------------------------------------------------- diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java index 4e5b861..c445383 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Supervisor.java @@ -8,7 +8,7 @@ * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,6 +18,7 @@ package org.apache.storm.daemon.supervisor; +import com.google.common.annotations.VisibleForTesting; import java.io.File; import java.io.IOException; import java.net.BindException; @@ -31,8 +32,6 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; - -import com.google.common.annotations.VisibleForTesting; import org.apache.commons.io.FileUtils; import org.apache.storm.Config; import org.apache.storm.DaemonConfig; @@ -66,8 +65,6 @@ import org.apache.storm.security.auth.ReqContext; import org.apache.storm.security.auth.ThriftConnectionType; import org.apache.storm.security.auth.ThriftServer; import org.apache.storm.utils.ConfigUtils; -import org.apache.storm.utils.ServerConfigUtils; -import org.apache.storm.utils.ObjectReader; import org.apache.storm.utils.LocalState; import org.apache.storm.utils.ObjectReader; import org.apache.storm.utils.ServerConfigUtils; @@ -76,7 +73,6 @@ import org.apache.storm.utils.Utils; import org.apache.storm.utils.VersionInfo; import org.apache.thrift.TException; import org.apache.thrift.TProcessor; -import org.apache.zookeeper.data.ACL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,7 +81,6 @@ public class Supervisor implements DaemonCommon, AutoCloseable { private final Map<String, Object> conf; private final IContext sharedContext; private final IAuthorizer authorizationHandler; - private volatile boolean active; private final ISupervisor iSupervisor; private final Utils.UptimeComputer upTime; private final String stormVersion; @@ -105,12 +100,13 @@ public class Supervisor implements DaemonCommon, AutoCloseable { // to really make this work well. private final ExecutorService heartbeatExecutor; private final AsyncLocalizer asyncLocalizer; + private volatile boolean active; private EventManager eventManager; private ReadClusterState readState; private ThriftServer thriftServer; //used for local cluster heartbeating private Nimbus.Iface localNimbus; - + private Supervisor(ISupervisor iSupervisor) throws IOException, IllegalAccessException, InstantiationException, ClassNotFoundException { this(Utils.readStormConfig(), null, iSupervisor); @@ -118,13 +114,14 @@ public class Supervisor implements DaemonCommon, AutoCloseable { /** * Constructor for supervisor daemon. - * @param conf config + * + * @param conf config * @param sharedContext {@link IContext} - * @param iSupervisor {@link ISupervisor} + * @param iSupervisor {@link ISupervisor} * @throws IOException */ public Supervisor(Map<String, Object> conf, IContext sharedContext, ISupervisor iSupervisor) - throws IOException, IllegalAccessException, ClassNotFoundException, InstantiationException{ + throws IOException, IllegalAccessException, ClassNotFoundException, InstantiationException { this.conf = conf; this.iSupervisor = iSupervisor; this.active = true; @@ -133,17 +130,17 @@ public class Supervisor implements DaemonCommon, AutoCloseable { this.sharedContext = sharedContext; this.heartbeatExecutor = Executors.newFixedThreadPool(1); this.authorizationHandler = StormCommon.mkAuthorizationHandler( - (String) conf.get(DaemonConfig.SUPERVISOR_AUTHORIZER), conf); + (String) conf.get(DaemonConfig.SUPERVISOR_AUTHORIZER), conf); if (authorizationHandler == null && conf.get(DaemonConfig.NIMBUS_AUTHORIZER) != null) { throw new IllegalStateException("It looks like authorization is turned on for nimbus but not for the " - + "supervisor...."); + + "supervisor...."); } - + iSupervisor.prepare(conf, ServerConfigUtils.supervisorIsupervisorDir(conf)); try { this.stormClusterState = ClusterUtils.mkStormClusterState(conf, - new ClusterStateContext(DaemonType.SUPERVISOR, conf)); + new ClusterStateContext(DaemonType.SUPERVISOR, conf)); } catch (Exception e) { LOG.error("supervisor can't create stormClusterState"); throw Utils.wrapInRuntime(e); @@ -175,6 +172,18 @@ public class Supervisor implements DaemonCommon, AutoCloseable { } /** + * supervisor daemon enter entrance. + * + * @param args + */ + public static void main(String[] args) throws Exception { + Utils.setupDefaultUncaughtExceptionHandler(); + @SuppressWarnings("resource") + Supervisor instance = new Supervisor(new StandaloneSupervisor()); + instance.launchDaemon(); + } + + /** * Get the executor service that is supposed to be used for heart-beats. */ public ExecutorService getHeartbeatExecutor() { @@ -184,7 +193,7 @@ public class Supervisor implements DaemonCommon, AutoCloseable { public String getId() { return supervisorId; } - + IContext getSharedContext() { return sharedContext; } @@ -232,11 +241,11 @@ public class Supervisor implements DaemonCommon, AutoCloseable { public AtomicReference<Map<Long, LocalAssignment>> getCurrAssignment() { return currAssignment; } - + AsyncLocalizer getAsyncLocalizer() { return asyncLocalizer; } - + EventManager getEventManger() { return eventManager; } @@ -245,14 +254,14 @@ public class Supervisor implements DaemonCommon, AutoCloseable { return this; } + public Nimbus.Iface getLocalNimbus() { + return this.localNimbus; + } + public void setLocalNimbus(Nimbus.Iface nimbus) { this.localNimbus = nimbus; } - public Nimbus.Iface getLocalNimbus() { - return this.localNimbus; - } - /** * Launch the supervisor. */ @@ -276,7 +285,7 @@ public class Supervisor implements DaemonCommon, AutoCloseable { // This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up // to date even if callbacks don't all work exactly right eventTimer.scheduleRecurring(0, 10, - new EventManagerPushCallback(new SynchronizeAssignments(this, null, readState), eventManager)); + new EventManagerPushCallback(new SynchronizeAssignments(this, null, readState), eventManager)); // supervisor health check eventTimer.scheduleRecurring(30, 30, new SupervisorHealthCheck(this)); @@ -323,7 +332,7 @@ public class Supervisor implements DaemonCommon, AutoCloseable { @VisibleForTesting public void checkAuthorization(String topoName, Map<String, Object> topoConf, String operation, ReqContext context) - throws AuthorizationException { + throws AuthorizationException { IAuthorizer aclHandler = authorizationHandler; if (context == null) { context = ReqContext.context(); @@ -337,19 +346,19 @@ public class Supervisor implements DaemonCommon, AutoCloseable { if (context.isImpersonating()) { LOG.warn("principal: {} is trying to impersonate principal: {}", context.realPrincipal(), - context.principal()); + context.principal()); throw new AuthorizationException("Supervisor does not support impersonation"); } if (aclHandler != null) { if (!aclHandler.permit(context, operation, checkConf)) { ThriftAccessLogger.logAccess(context.requestID(), context.remoteAddress(), context.principal(), - operation, topoName, "access-denied"); - throw new AuthorizationException( operation + (topoName != null ? " on topology " + topoName : "") + - " is not authorized"); + operation, topoName, "access-denied"); + throw new AuthorizationException(operation + (topoName != null ? " on topology " + topoName : "") + + " is not authorized"); } else { ThriftAccessLogger.logAccess(context.requestID(), context.remoteAddress(), context.principal(), - operation, topoName, "access-granted"); + operation, topoName, "access-granted"); } } } @@ -366,58 +375,59 @@ public class Supervisor implements DaemonCommon, AutoCloseable { } TProcessor processor = new org.apache.storm.generated.Supervisor.Processor( - new org.apache.storm.generated.Supervisor.Iface() { - @Override - public void sendSupervisorAssignments(SupervisorAssignments assignments) - throws AuthorizationException, TException { - checkAuthorization("sendSupervisorAssignments"); - LOG.info("Got an assignments from master, will start to sync with assignments: {}", assignments); - SynchronizeAssignments syn = new SynchronizeAssignments(getSupervisor(), assignments, - getReadClusterState()); - getEventManger().add(syn); - } + new org.apache.storm.generated.Supervisor.Iface() { + @Override + public void sendSupervisorAssignments(SupervisorAssignments assignments) + throws AuthorizationException, TException { + checkAuthorization("sendSupervisorAssignments"); + LOG.info("Got an assignments from master, will start to sync with assignments: {}", assignments); + SynchronizeAssignments syn = new SynchronizeAssignments(getSupervisor(), assignments, + getReadClusterState()); + getEventManger().add(syn); + } - @Override - public Assignment getLocalAssignmentForStorm(String id) - throws NotAliveException, AuthorizationException, TException { - Map<String, Object> topoConf = null; - try { - topoConf = ConfigUtils.readSupervisorStormConf(conf, id); - } catch (IOException e) { - LOG.warn("Topology config is not localized yet..."); - } - checkAuthorization(id, topoConf, "getLocalAssignmentForStorm"); - Assignment assignment = getStormClusterState().assignmentInfo(id, null); - if (null == assignment) { - throw new NotAliveException("No local assignment assigned for storm: " - + id - + " for node: " - + getHostName()); - } - return assignment; + @Override + public Assignment getLocalAssignmentForStorm(String id) + throws NotAliveException, AuthorizationException, TException { + Map<String, Object> topoConf = null; + try { + topoConf = ConfigUtils.readSupervisorStormConf(conf, id); + } catch (IOException e) { + LOG.warn("Topology config is not localized yet..."); } + checkAuthorization(id, topoConf, "getLocalAssignmentForStorm"); + Assignment assignment = getStormClusterState().assignmentInfo(id, null); + if (null == assignment) { + throw new NotAliveException("No local assignment assigned for storm: " + + id + + " for node: " + + getHostName()); + } + return assignment; + } - @Override - public void sendSupervisorWorkerHeartbeat(SupervisorWorkerHeartbeat heartbeat) - throws AuthorizationException, NotAliveException, TException { - // do nothing except validate heartbeat for now. - String id = heartbeat.get_storm_id(); - Map<String, Object> topoConf = null; - try { - topoConf = ConfigUtils.readSupervisorStormConf(conf, id); - } catch (IOException e) { - LOG.warn("Topology config is not localized yet..."); - throw new NotAliveException(id + " does not appear to be alive, you should probably exit"); - } - checkAuthorization(id, topoConf, "sendSupervisorWorkerHeartbeat"); + @Override + public void sendSupervisorWorkerHeartbeat(SupervisorWorkerHeartbeat heartbeat) + throws AuthorizationException, NotAliveException, TException { + // do nothing except validate heartbeat for now. + String id = heartbeat.get_storm_id(); + Map<String, Object> topoConf = null; + try { + topoConf = ConfigUtils.readSupervisorStormConf(conf, id); + } catch (IOException e) { + LOG.warn("Topology config is not localized yet..."); + throw new NotAliveException(id + " does not appear to be alive, you should probably exit"); } - }); + checkAuthorization(id, topoConf, "sendSupervisorWorkerHeartbeat"); + } + }); this.thriftServer = new ThriftServer(conf, processor, ThriftConnectionType.SUPERVISOR); this.thriftServer.serve(); } /** * Used for local cluster assignments distribution. + * * @param assignments {@link SupervisorAssignments} */ public void sendSupervisorAssignments(SupervisorAssignments assignments) { @@ -438,7 +448,7 @@ public class Supervisor implements DaemonCommon, AutoCloseable { } }); } - + @Override public void close() { try { @@ -455,14 +465,14 @@ public class Supervisor implements DaemonCommon, AutoCloseable { } asyncLocalizer.close(); getStormClusterState().disconnect(); - if(thriftServer != null) { + if (thriftServer != null) { this.thriftServer.stop(); } } catch (Exception e) { LOG.error("Error Shutting down", e); } } - + void killWorkers(Collection<String> workerIds, ContainerLauncher launcher) throws InterruptedException, IOException { HashSet<Killable> containers = new HashSet<>(); for (String workerId : workerIds) { @@ -482,14 +492,14 @@ public class Supervisor implements DaemonCommon, AutoCloseable { if (!containers.isEmpty()) { Time.sleepSecs(shutdownSleepSecs); } - for (Killable k: containers) { + for (Killable k : containers) { try { k.forceKill(); long start = Time.currentTimeMillis(); while (!k.areAllProcessesDead()) { if ((Time.currentTimeMillis() - start) > 10_000) { - throw new RuntimeException("Giving up on killing " + k - + " after " + (Time.currentTimeMillis() - start) + " ms"); + throw new RuntimeException("Giving up on killing " + k + + " after " + (Time.currentTimeMillis() - start) + " ms"); } Time.sleep(100); k.forceKill(); @@ -507,7 +517,7 @@ public class Supervisor implements DaemonCommon, AutoCloseable { } else { try { ContainerLauncher launcher = ContainerLauncher.make(getConf(), getId(), getThriftServerPort(), - getSharedContext()); + getSharedContext()); killWorkers(SupervisorUtils.supervisorWorkerIds(conf), launcher); } catch (Exception e) { throw Utils.wrapInRuntime(e); @@ -522,20 +532,8 @@ public class Supervisor implements DaemonCommon, AutoCloseable { } return heartbeatTimer.isTimerWaiting() - && workerHeartbeatTimer.isTimerWaiting() - && eventTimer.isTimerWaiting() - && eventManager.waiting(); - } - - /** - * supervisor daemon enter entrance. - * - * @param args - */ - public static void main(String[] args) throws Exception { - Utils.setupDefaultUncaughtExceptionHandler(); - @SuppressWarnings("resource") - Supervisor instance = new Supervisor(new StandaloneSupervisor()); - instance.launchDaemon(); + && workerHeartbeatTimer.isTimerWaiting() + && eventTimer.isTimerWaiting() + && eventManager.waiting(); } }
