This is an automated email from the ASF dual-hosted git repository.

ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
     new e28401f  STORM-3479 HB timeout configurable on a topology level
     new 822a468  Merge pull request #3121 from dandsager1/STORM-3479
e28401f is described below

commit e28401fe1838a7e4d2edcd2b636b0ee6710b9435
Author: dandsager <david.andsa...@verizonmedia.com>
AuthorDate: Thu Sep 5 15:10:17 2019 -0500

    STORM-3479 HB timeout configurable on a topology level
---
 conf/defaults.yaml                                 |  2 +
 storm-client/src/jvm/org/apache/storm/Config.java  | 18 +++++++
 .../main/java/org/apache/storm/DaemonConfig.java   |  4 +-
 .../org/apache/storm/daemon/nimbus/Nimbus.java     | 62 ++++++++++++++++++----
 .../apache/storm/daemon/supervisor/Container.java  |  8 +++
 .../org/apache/storm/daemon/supervisor/Slot.java   | 37 +++++++++++--
 .../storm/daemon/supervisor/SupervisorUtils.java   |  8 ---
 .../storm/daemon/logviewer/utils/WorkerLogs.java   | 39 +++++++++++++-
 8 files changed, 152 insertions(+), 26 deletions(-)

diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index e555ac8..3e9f8fc 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -171,6 +171,8 @@ supervisor.monitor.frequency.secs: 3
 supervisor.heartbeat.frequency.secs: 5
 #max timeout for a node worker heartbeats when master gains leadership
 supervisor.worker.heartbeats.max.timeout.secs: 600
+#For topology configurable heartbeat timeout, maximum allowed heartbeat timeout.
+worker.max.timeout.secs: 600
 supervisor.enable: true
 supervisor.supervisors: []
 supervisor.supervisors.commands: []
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index b46c112..a54faf6 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -1042,12 +1042,30 @@ public class Config extends HashMap<String, Object> {
     public static final String STORM_THRIFT_TRANSPORT_PLUGIN = "storm.thrift.transport";
     /**
      * How long a worker can go without heartbeating before the supervisor tries to restart the worker process.
+     * Can be overridden by {@link #TOPOLOGY_WORKER_TIMEOUT_SECS}, if set.
      */
     @IsInteger
     @IsPositiveNumber
     @NotNull
     public static final String SUPERVISOR_WORKER_TIMEOUT_SECS = "supervisor.worker.timeout.secs";
     /**
+     * Enforce maximum on {@link #TOPOLOGY_WORKER_TIMEOUT_SECS}.
+     */
+    @IsInteger
+    @IsPositiveNumber
+    @NotNull
+    public static final String WORKER_MAX_TIMEOUT_SECS = "worker.max.timeout.secs";
+    /**
+     * Topology configurable worker heartbeat timeout before the supervisor tries to restart the worker process.
+     * Maximum value constrained by {@link #WORKER_MAX_TIMEOUT_SECS}.
+     * When topology timeout is greater, the following configs are effectively overridden:
+     * {@link #SUPERVISOR_WORKER_TIMEOUT_SECS}, SUPERVISOR_WORKER_START_TIMEOUT_SECS, NIMBUS_TASK_TIMEOUT_SECS and NIMBUS_TASK_LAUNCH_SECS.
+     */
+    @IsInteger
+    @IsPositiveNumber
+    @NotNull
+    public static final String TOPOLOGY_WORKER_TIMEOUT_SECS = "topology.worker.timeout.secs";
+    /**
      * How many seconds to allow for graceful worker shutdown when killing workers before resorting to force kill.
      * If a worker fails to shut down gracefully within this delay, it will either suicide or be forcibly killed by the supervisor.
      */
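For context, a minimal sketch of how a topology could opt into the new timeout at submission time; the topology name, the 300-second value, and the omitted spout/bolt wiring are illustrative and not part of this commit:

    import org.apache.storm.Config;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.topology.TopologyBuilder;

    public class SlowWorkerTopology {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            // ... set up spouts and bolts here ...

            Config conf = new Config();
            // Request a 300s worker heartbeat timeout for this topology only.
            // Nimbus caps the value at worker.max.timeout.secs (600 in defaults.yaml).
            conf.put(Config.TOPOLOGY_WORKER_TIMEOUT_SECS, 300);

            StormSubmitter.submitTopology("slow-worker-topology", conf, builder.createTopology());
        }
    }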
diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
index def49ae..28076f4 100644
--- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
+++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -186,12 +186,12 @@
     /**
      * How long without heartbeating a task can go before nimbus will consider the task dead and reassign it to another location.
+     * Can be exceeded when {@link Config#TOPOLOGY_WORKER_TIMEOUT_SECS} is set.
      */
     @IsInteger
     @IsPositiveNumber
     public static final String NIMBUS_TASK_TIMEOUT_SECS = "nimbus.task.timeout.secs";
-
     /**
      * How often nimbus should wake up to check heartbeats and do reassignments. Note that if a machine ever goes down Nimbus will
      * immediately wake up and take action. This parameter is for checking for failures when there's no explicit event like that occurring.
@@ -234,6 +234,7 @@ public class DaemonConfig implements Validated {
      *
      * <p>A separate timeout exists for launch because there can be quite a bit of overhead
      * to launching new JVM's and configuring them.</p>
+     * Can be exceeded when {@link Config#TOPOLOGY_WORKER_TIMEOUT_SECS} is set.
      */
     @IsInteger
     @IsPositiveNumber
@@ -794,6 +795,7 @@ public class DaemonConfig implements Validated {
      * How long a worker can go without heartbeating during the initial launch before the supervisor tries to restart the worker process.
      * This value override supervisor.worker.timeout.secs during launch because there is additional overhead to starting and configuring the
      * JVM on launch.
+     * Can be exceeded when {@link Config#TOPOLOGY_WORKER_TIMEOUT_SECS} is set.
      */
     @IsInteger
     @IsPositiveNumber
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 45add73..69b2bb1 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -1113,6 +1113,17 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
         ret.put(Config.TOPOLOGY_ACKER_EXECUTORS, mergedConf.get(Config.TOPOLOGY_ACKER_EXECUTORS));
         ret.put(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS, mergedConf.get(Config.TOPOLOGY_EVENTLOGGER_EXECUTORS));
         ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, mergedConf.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM));
+
+        if (mergedConf.containsKey(Config.TOPOLOGY_WORKER_TIMEOUT_SECS)) {
+            int workerTimeoutSecs = (Integer) ObjectReader.getInt(mergedConf.get(Config.TOPOLOGY_WORKER_TIMEOUT_SECS));
+            int workerMaxTimeoutSecs = (Integer) ObjectReader.getInt(mergedConf.get(Config.WORKER_MAX_TIMEOUT_SECS));
+            if (workerTimeoutSecs > workerMaxTimeoutSecs) {
+                ret.put(Config.TOPOLOGY_WORKER_TIMEOUT_SECS, workerMaxTimeoutSecs);
+                String topoId = (String) mergedConf.get(Config.STORM_ID);
+                LOG.warn("Topology {} topology.worker.timeout.secs is too large. Reducing from {} to {}",
+                         topoId, workerTimeoutSecs, workerMaxTimeoutSecs);
+            }
+        }
         return ret;
     }
 
@@ -1862,8 +1873,7 @@
         IStormClusterState state = stormClusterState;
         Map<List<Integer>, Map<String, Object>> executorBeats = StatsUtil.convertExecutorBeats(
             state.executorBeats(topoId, existingAssignment.get_executor_node_port()));
-        heartbeatsCache.updateFromZkHeartbeat(topoId, executorBeats, allExecutors,
-                                              ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS)));
+        heartbeatsCache.updateFromZkHeartbeat(topoId, executorBeats, allExecutors, getTopologyHeartbeatTimeoutSecs(topoId));
     }
 
     /**
@@ -1880,17 +1890,21 @@
                 updateHeartbeatsFromZkHeartbeat(topoId, topologyToExecutors.get(topoId), entry.getValue());
             } else {
                 LOG.debug("Timing out old heartbeats for {}", topoId);
-                heartbeatsCache.timeoutOldHeartbeats(topoId, ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS)));
+                heartbeatsCache.timeoutOldHeartbeats(topoId, getTopologyHeartbeatTimeoutSecs(topoId));
             }
         }
     }
 
-    private void updateCachedHeartbeatsFromWorker(SupervisorWorkerHeartbeat workerHeartbeat) {
-        heartbeatsCache.updateHeartbeat(workerHeartbeat, ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS)));
+    private void updateCachedHeartbeatsFromWorker(SupervisorWorkerHeartbeat workerHeartbeat, int heartbeatTimeoutSecs) {
+        heartbeatsCache.updateHeartbeat(workerHeartbeat, heartbeatTimeoutSecs);
     }
 
     private void updateCachedHeartbeatsFromSupervisor(SupervisorWorkerHeartbeats workerHeartbeats) {
-        workerHeartbeats.get_worker_heartbeats().forEach(this::updateCachedHeartbeatsFromWorker);
+        for (SupervisorWorkerHeartbeat hb : workerHeartbeats.get_worker_heartbeats()) {
+            String topoId = hb.get_storm_id();
+            int heartbeatTimeoutSecs = getTopologyHeartbeatTimeoutSecs(topoId);
+            updateCachedHeartbeatsFromWorker(hb, heartbeatTimeoutSecs);
+        }
         if (!heartbeatsReadyFlag.get() && !Strings.isNullOrEmpty(workerHeartbeats.get_supervisor_id())) {
             heartbeatsRecoveryStrategy.reportNodeId(workerHeartbeats.get_supervisor_id());
         }
@@ -1927,8 +1941,7 @@
     }
 
     private Set<List<Integer>> aliveExecutors(String topoId, Set<List<Integer>> allExecutors, Assignment assignment) {
-        return heartbeatsCache.getAliveExecutors(topoId, allExecutors, assignment,
-                                                 ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_LAUNCH_SECS)));
+        return heartbeatsCache.getAliveExecutors(topoId, allExecutors, assignment, getTopologyLaunchHeartbeatTimeoutSec(topoId));
     }
 
     private List<List<Integer>> computeExecutors(String topoId, StormBase base, Map<String, Object> topoConf,
@@ -2508,6 +2521,35 @@
         base.set_principal((String) topoConf.get(Config.TOPOLOGY_SUBMITTER_PRINCIPAL));
     }
 
+    // Topology may set custom heartbeat timeout.
+    private int getTopologyHeartbeatTimeoutSecs(Map<String, Object> topoConf) {
+        int defaultNimbusTimeout = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS));
+        if (topoConf.containsKey(Config.TOPOLOGY_WORKER_TIMEOUT_SECS)) {
+            int topoTimeout = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_WORKER_TIMEOUT_SECS));
+            topoTimeout = Math.max(topoTimeout, defaultNimbusTimeout);
+            return topoTimeout;
+        }
+
+        return defaultNimbusTimeout;
+    }
+
+    private int getTopologyHeartbeatTimeoutSecs(String topoId) {
+        try {
+            Map<String, Object> topoConf = tryReadTopoConf(topoId, topoCache);
+            return getTopologyHeartbeatTimeoutSecs(topoConf);
+        } catch (Exception e) {
+            // contain any exception
+            LOG.warn("Exception when getting heartbeat timeout.", e.getMessage());
+            return ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_TIMEOUT_SECS));
+        }
+    }
+
+    private int getTopologyLaunchHeartbeatTimeoutSec(String topoId) {
+        int nimbusLaunchTimeout = ObjectReader.getInt(conf.get(DaemonConfig.NIMBUS_TASK_LAUNCH_SECS));
+        int topoHeartbeatTimeoutSecs = getTopologyHeartbeatTimeoutSecs(topoId);
+        return Math.max(nimbusLaunchTimeout, topoHeartbeatTimeoutSecs);
+    }
+
     private void startTopology(String topoName, String topoId, TopologyStatus initStatus, String owner,
                                String principal, Map<String, Object> topoConf, StormTopology stormTopology)
         throws KeyNotFoundException, AuthorizationException, IOException, InvalidTopologyException {
@@ -4705,11 +4747,11 @@
             String id = hb.get_storm_id();
             try {
                 Map<String, Object> topoConf = tryReadTopoConf(id, topoCache);
-                topoConf = Utils.merge(conf, topoConf);
                 String topoName = (String) topoConf.get(Config.TOPOLOGY_NAME);
                 checkAuthorization(topoName, topoConf, "sendSupervisorWorkerHeartbeat");
                 if (isLeader()) {
-                    updateCachedHeartbeatsFromWorker(hb);
+                    int heartbeatTimeoutSecs = getTopologyHeartbeatTimeoutSecs(topoConf);
+                    updateCachedHeartbeatsFromWorker(hb, heartbeatTimeoutSecs);
                 }
             } catch (Exception e) {
                 LOG.warn("Send HB exception. (topology id='{}')", id, e);
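Taken together, the Nimbus-side behavior is: the requested value is capped at worker.max.timeout.secs when the topology config is normalized, and the heartbeat checks then never use less than the nimbus daemon defaults. A standalone sketch of that arithmetic (illustrative names and values, not the actual Nimbus code):

    /** Illustration only: effective Nimbus-side heartbeat timeout under this change. */
    public class EffectiveTimeoutSketch {
        static int effectiveNimbusTimeoutSecs(Integer topologyWorkerTimeoutSecs,
                                              int workerMaxTimeoutSecs,
                                              int nimbusTaskTimeoutSecs) {
            if (topologyWorkerTimeoutSecs == null) {
                return nimbusTaskTimeoutSecs;                          // no topology override
            }
            // normalizeConf caps the requested value at worker.max.timeout.secs ...
            int capped = Math.min(topologyWorkerTimeoutSecs, workerMaxTimeoutSecs);
            // ... and getTopologyHeartbeatTimeoutSecs never goes below nimbus.task.timeout.secs.
            return Math.max(capped, nimbusTaskTimeoutSecs);
        }

        public static void main(String[] args) {
            System.out.println(effectiveNimbusTimeoutSecs(1200, 600, 30)); // 600 (capped)
            System.out.println(effectiveNimbusTimeoutSecs(120, 600, 30));  // 120 (override honored)
            System.out.println(effectiveNimbusTimeoutSecs(null, 600, 30)); // 30  (daemon default)
        }
    }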
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
index a25faad..21ff8ec 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Container.java
@@ -49,6 +49,7 @@ import org.apache.storm.metricstore.MetricException;
 import org.apache.storm.metricstore.WorkerMetricsProcessor;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.ServerConfigUtils;
 import org.apache.storm.utils.ServerUtils;
 import org.apache.storm.utils.Utils;
@@ -418,6 +419,13 @@ public abstract class Container implements Killable {
         }
         data.put(DaemonConfig.LOGS_USERS, logsUsers.toArray());
 
+        if (topoConf.get(Config.TOPOLOGY_WORKER_TIMEOUT_SECS) != null) {
+            int topoTimeout = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_WORKER_TIMEOUT_SECS));
+            int defaultWorkerTimeout = ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
+            topoTimeout = Math.max(topoTimeout, defaultWorkerTimeout);
+            data.put(Config.TOPOLOGY_WORKER_TIMEOUT_SECS, topoTimeout);
+        }
+
         File file = ServerConfigUtils.getLogMetaDataFile(conf, topologyId, port);
         Yaml yaml = new Yaml();
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
index c8e6f19..7575a91 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java
@@ -667,7 +667,8 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
             LSWorkerHeartbeat hb = dynamicState.container.readHeartbeat();
             if (hb != null) {
                 long hbAgeMs = (Time.currentTimeSecs() - hb.get_time_secs()) * 1000;
-                if (hbAgeMs <= staticState.hbTimeoutMs) {
+                long hbTimeoutMs = getHbTimeoutMs(staticState, dynamicState);
+                if (hbAgeMs <= hbTimeoutMs) {
                     return dynamicState.withState(MachineState.RUNNING);
                 }
             }
@@ -681,9 +682,10 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
         dynamicState = updateAssignmentIfNeeded(dynamicState);
 
         long timeDiffms = (Time.currentTimeMillis() - dynamicState.startTime);
-        if (timeDiffms > staticState.firstHbTimeoutMs) {
+        long hbFirstTimeoutMs = getFirstHbTimeoutMs(staticState, dynamicState);
+        if (timeDiffms > hbFirstTimeoutMs) {
             LOG.warn("SLOT {}: Container {} failed to launch in {} ms.", staticState.port, dynamicState.container,
-                     staticState.firstHbTimeoutMs);
+                     hbFirstTimeoutMs);
             return killContainerFor(KillReason.HB_TIMEOUT, dynamicState, staticState);
         }
 
@@ -745,8 +747,9 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
         }
 
         long timeDiffMs = (Time.currentTimeSecs() - hb.get_time_secs()) * 1000;
-        if (timeDiffMs > staticState.hbTimeoutMs) {
-            LOG.warn("SLOT {}: HB is too old {} > {}", staticState.port, timeDiffMs, staticState.hbTimeoutMs);
+        long hbTimeoutMs = getHbTimeoutMs(staticState, dynamicState);
+        if (timeDiffMs > hbTimeoutMs) {
+            LOG.warn("SLOT {}: HB is too old {} > {}", staticState.port, timeDiffMs, hbTimeoutMs);
             return killContainerFor(KillReason.HB_TIMEOUT, dynamicState, staticState);
         }
 
@@ -833,6 +836,30 @@ public class Slot extends Thread implements AutoCloseable, BlobChangingCallback
         return dynamicState.state;
     }
 
+    /*
+     * Get worker heartbeat timeout in ms. Use topology specified timeout if provided.
+     */
+    private static long getHbTimeoutMs(StaticState staticState, DynamicState dynamicState) {
+        long hbTimeoutMs = staticState.hbTimeoutMs;
+        Map<String, Object> topoConf = dynamicState.container.topoConf;
+
+        if (topoConf != null && topoConf.containsKey(Config.TOPOLOGY_WORKER_TIMEOUT_SECS)) {
+            long topoHbTimeoutMs = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_WORKER_TIMEOUT_SECS)) * 1000;
+            topoHbTimeoutMs = Math.max(topoHbTimeoutMs, hbTimeoutMs);
+            hbTimeoutMs = topoHbTimeoutMs;
+        }
+
+        return hbTimeoutMs;
+    }
+
+    /*
+     * Get worker heartbeat timeout when waiting for the worker to start.
+     * If a topology specific timeout is set, ensure the first heartbeat timeout is >= the topology specific timeout.
+     */
+    private static long getFirstHbTimeoutMs(StaticState staticState, DynamicState dynamicState) {
+        return Math.max(getHbTimeoutMs(staticState, dynamicState), staticState.firstHbTimeoutMs);
+    }
+
     /**
      * Set a new assignment asynchronously.
      * @param newAssignment the new assignment for this slot to run, null to run nothing
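On the supervisor side the topology value acts as a floor rather than a cap: both the steady-state heartbeat timeout and the launch timeout become the larger of the topology setting and the slot's existing defaults. A worked example under assumed defaults (supervisor.worker.timeout.secs = 30, supervisor.worker.start.timeout.secs = 120, topology.worker.timeout.secs = 300); illustration only, not the actual Slot code:

    /** Illustration only: the Slot-side floors, mirroring getHbTimeoutMs/getFirstHbTimeoutMs. */
    public class SlotTimeoutSketch {
        public static void main(String[] args) {
            long hbTimeoutMs = 30 * 1000L;        // staticState.hbTimeoutMs (supervisor.worker.timeout.secs)
            long firstHbTimeoutMs = 120 * 1000L;  // staticState.firstHbTimeoutMs (supervisor.worker.start.timeout.secs)
            long topoTimeoutMs = 300 * 1000L;     // topology.worker.timeout.secs from the worker's topoConf

            long effectiveHbTimeoutMs = Math.max(topoTimeoutMs, hbTimeoutMs);                  // 300000
            long effectiveFirstHbTimeoutMs = Math.max(effectiveHbTimeoutMs, firstHbTimeoutMs); // 300000
            System.out.println(effectiveHbTimeoutMs + " " + effectiveFirstHbTimeoutMs);
        }
    }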
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
index a0d0397..5e4ce3f 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
@@ -117,10 +117,6 @@ public class SupervisorUtils {
         return _instance.readWorkerHeartbeatImpl(conf, workerId);
     }
 
-    public static boolean isWorkerHbTimedOut(int now, LSWorkerHeartbeat whb, Map<String, Object> conf) {
-        return _instance.isWorkerHbTimedOutImpl(now, whb, conf);
-    }
-
     public Map<String, LSWorkerHeartbeat> readWorkerHeartbeatsImpl(Map<String, Object> conf) {
         Map<String, LSWorkerHeartbeat> workerHeartbeats = new HashMap<>();
 
@@ -143,8 +139,4 @@ public class SupervisorUtils {
             return null;
         }
     }
-
-    private boolean isWorkerHbTimedOutImpl(int now, LSWorkerHeartbeat whb, Map<String, Object> conf) {
-        return (now - whb.get_time_secs()) > ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
-    }
 }
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java
index 0d5b14a..e57b626 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java
@@ -25,6 +25,7 @@ import static org.apache.storm.Config.TOPOLOGY_SUBMITTER_USER;
 
 import com.codahale.metrics.Meter;
 import com.google.common.collect.Lists;
+import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -40,11 +41,15 @@ import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import org.apache.storm.Config;
 import org.apache.storm.daemon.supervisor.ClientSupervisorUtils;
 import org.apache.storm.daemon.supervisor.SupervisorUtils;
 import org.apache.storm.daemon.utils.PathUtil;
+import org.apache.storm.generated.LSWorkerHeartbeat;
 import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.utils.LruMap;
 import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.ServerConfigUtils;
 import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Utils;
 import org.jooq.lambda.Unchecked;
@@ -64,6 +69,7 @@ public class WorkerLogs {
     private final Map<String, Object> stormConf;
     private final Path logRootDir;
     private final DirectoryCleaner directoryCleaner;
+    private final LruMap<String, Integer> mapTopologyIdToHeartbeatTimeout;
 
     /**
      * Constructor.
@@ -77,6 +83,7 @@ public class WorkerLogs {
         this.logRootDir = logRootDir.toAbsolutePath().normalize();
         this.numSetPermissionsExceptions = metricsRegistry.registerMeter(ExceptionMeterNames.NUM_SET_PERMISSION_EXCEPTIONS);
         this.directoryCleaner = new DirectoryCleaner(metricsRegistry);
+        this.mapTopologyIdToHeartbeatTimeout = new LruMap<>(200);
     }
 
     /**
@@ -189,12 +196,40 @@
      */
    public Set<String> getAliveIds(int nowSecs) throws IOException {
         return SupervisorUtils.readWorkerHeartbeats(stormConf).entrySet().stream()
-                .filter(entry -> Objects.nonNull(entry.getValue())
-                        && !SupervisorUtils.isWorkerHbTimedOut(nowSecs, entry.getValue(), stormConf))
+                .filter(entry -> Objects.nonNull(entry.getValue()) && !isTimedOut(nowSecs, entry))
                 .map(Map.Entry::getKey)
                 .collect(toCollection(TreeSet::new));
     }
 
+    private boolean isTimedOut(int nowSecs, Map.Entry<String, LSWorkerHeartbeat> entry) {
+        LSWorkerHeartbeat hb = entry.getValue();
+        int workerLogTimeout = getTopologyTimeout(hb);
+        return (nowSecs - hb.get_time_secs()) >= workerLogTimeout;
+    }
+
+    private int getTopologyTimeout(LSWorkerHeartbeat hb) {
+        String topoId = hb.get_topology_id();
+        Integer cachedTimeout = mapTopologyIdToHeartbeatTimeout.get(topoId);
+        if (cachedTimeout != null) {
+            return cachedTimeout;
+        } else {
+            int timeout = getWorkerLogTimeout(stormConf, topoId, hb.get_port());
+            mapTopologyIdToHeartbeatTimeout.put(topoId, timeout);
+            return timeout;
+        }
+    }
+
+    private int getWorkerLogTimeout(Map<String, Object> conf, String topologyId, int port) {
+        int defaultWorkerLogTimeout = ObjectReader.getInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
+        File file = ServerConfigUtils.getLogMetaDataFile(conf, topologyId, port);
+        Map<String, Object> map = (Map<String, Object>) Utils.readYamlFile(file.getAbsolutePath());
+        if (map == null) {
+            return defaultWorkerLogTimeout;
+        }
+
+        return (Integer) map.getOrDefault(Config.TOPOLOGY_WORKER_TIMEOUT_SECS, defaultWorkerLogTimeout);
+    }
+
     /**
      * Finds directories for specific worker ids that can be cleaned up.
      *
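The logviewer change relies on the per-worker log metadata file the supervisor already writes: Container adds the (floored) timeout to that map, and WorkerLogs.getWorkerLogTimeout reads it back with a fallback to supervisor.worker.timeout.secs, so logs for slow-heartbeat topologies are not cleaned up early. A minimal sketch of that round trip using SnakeYAML; the file name and the values are illustrative, not the actual Storm file layout:

    import java.io.FileReader;
    import java.io.FileWriter;
    import java.util.HashMap;
    import java.util.Map;
    import org.yaml.snakeyaml.Yaml;

    public class LogMetadataRoundTrip {
        public static void main(String[] args) throws Exception {
            // Supervisor side (Container's log metadata write): persist the effective timeout.
            Map<String, Object> data = new HashMap<>();
            data.put("topology.worker.timeout.secs", 300);    // other metadata entries omitted
            try (FileWriter out = new FileWriter("worker.yaml")) {
                new Yaml().dump(data, out);
            }

            // Logviewer side (WorkerLogs.getWorkerLogTimeout): fall back to the supervisor default.
            int defaultTimeout = 30;                          // supervisor.worker.timeout.secs
            Map<String, Object> map;
            try (FileReader in = new FileReader("worker.yaml")) {
                map = new Yaml().load(in);
            }
            int timeout = (Integer) map.getOrDefault("topology.worker.timeout.secs", defaultTimeout);
            System.out.println(timeout);                      // 300
        }
    }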