http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/HttpProbe.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/HttpProbe.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/HttpProbe.java new file mode 100644 index 0000000..9c14ca7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/HttpProbe.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.server.servicemonitor; + +import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URL; + +public class HttpProbe extends Probe { + protected static final Logger log = LoggerFactory.getLogger(HttpProbe.class); + + private final URL url; + private final int timeout; + private final int min, max; + + + public HttpProbe(URL url, int timeout, int min, int max, Configuration conf) throws IOException { + super("Http probe of " + url + " [" + min + "-" + max + "]", conf); + this.url = url; + this.timeout = timeout; + this.min = min; + this.max = max; + } + + public static HttpURLConnection getConnection(URL url, int timeout) throws IOException { + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setInstanceFollowRedirects(true); + connection.setConnectTimeout(timeout); + return connection; + } + + @Override + public ProbeStatus ping(boolean livePing) { + ProbeStatus status = new ProbeStatus(); + HttpURLConnection connection = null; + try { + if (log.isDebugEnabled()) { + // LOG.debug("Fetching " + url + " with timeout " + timeout); + } + connection = getConnection(url, this.timeout); + int rc = connection.getResponseCode(); + if (rc < min || rc > max) { + String error = "Probe " + url + " error code: " + rc; + log.info(error); + status.fail(this, + new IOException(error)); + } else { + status.succeed(this); + } + } catch (IOException e) { + String error = "Probe " + url + " failed: " + e; + log.info(error, e); + status.fail(this, + new IOException(error, e)); + } finally { + if (connection != null) { + connection.disconnect(); + } + } + return status; + } + +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/LogEntryBuilder.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/LogEntryBuilder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/LogEntryBuilder.java new file mode 100644 index 0000000..a1ad44f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/LogEntryBuilder.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Assembles one log line from bare text elements and {@code name="value"}
 * pairs, separating consecutive elements with {@code ", "}.
 */
public class LogEntryBuilder {

  /** The line assembled so far. */
  private final StringBuilder line = new StringBuilder();

  /** Start with an empty line. */
  public LogEntryBuilder() {
  }

  /**
   * Start with a single text element.
   * @param text first element
   */
  public LogEntryBuilder(String text) {
    elt(text);
  }

  /**
   * Start with a single name/value pair.
   * @param name pair name
   * @param value pair value; may be null
   */
  public LogEntryBuilder(String name, Object value) {
    appendPair(name, value);
  }

  /**
   * Append a text element, preceded by a separator when the line is not empty.
   * @param text element to append
   * @return this builder
   */
  public LogEntryBuilder elt(String text) {
    separate();
    line.append(text);
    return this;
  }

  /**
   * Append a name/value pair, preceded by a separator when the line is not empty.
   * @param name pair name
   * @param value pair value; a null is written as an unquoted {@code null}
   * @return this builder
   */
  public LogEntryBuilder elt(String name, Object value) {
    separate();
    appendPair(name, value);
    return this;
  }

  /** Write the separator unless nothing has been written yet. */
  private void separate() {
    if (line.length() != 0) {
      line.append(", ");
    }
  }

  /** Write {@code name=} followed by the quoted value, or {@code null}. */
  private void appendPair(String name, Object value) {
    line.append(name).append('=');
    if (value == null) {
      line.append("null");
      return;
    }
    line.append('"').append(value.toString()).append('"');
  }

  /** @return the line assembled so far */
  @Override
  public String toString() {
    return line.toString();
  }

}
/**
 * Configuration key names and default values used by the service monitor.
 */
public interface MonitorKeys {

  // ---------------------------------------------------------------- general

  /** Prefix shared by every monitor configuration key: {@value} */
  String MONITOR_KEY_PREFIX = "service.monitor.";

  /** Key for the classname of the reporter: {@value} */
  String MONITOR_REPORTER = MONITOR_KEY_PREFIX + "report.classname";

  /** Key for the interval in milliseconds between health reports: {@value} */
  String MONITOR_REPORT_INTERVAL = MONITOR_KEY_PREFIX + "report.interval";

  /**
   * Key for the time in milliseconds between one probing cycle ending and the
   * next one beginning: {@value}
   */
  String MONITOR_PROBE_INTERVAL = MONITOR_KEY_PREFIX + "probe.interval";

  /**
   * Key for how long in milliseconds the probing loop may be blocked before
   * that counts as a liveness failure: {@value}
   */
  String MONITOR_PROBE_TIMEOUT = MONITOR_KEY_PREFIX + "probe.timeout";

  /**
   * Key for the bootstrap timeout: {@value}.
   * NOTE(review): presumably milliseconds for the whole bootstrap phase, as
   * suggested by {@link #BOOTSTRAP_TIMEOUT_DEFAULT}; confirm with the consumer.
   */
  String MONITOR_BOOTSTRAP_TIMEOUT = MONITOR_KEY_PREFIX + "bootstrap.timeout";

  /** Key for whether the monitor depends on DFS being live: {@value} */
  String MONITOR_DEPENDENCY_DFSLIVE = MONITOR_KEY_PREFIX + "dependency.dfslive";

  /** Default timeout for the entire bootstrap phase: {@value} */
  int BOOTSTRAP_TIMEOUT_DEFAULT = 60000;

  /** Default report interval when the key is absent: {@value} */
  int REPORT_INTERVAL_DEFAULT = 10000;

  /** Default probe interval when the key is absent: {@value} */
  int PROBE_INTERVAL_DEFAULT = 10000;

  /** Default probe timeout when the key is absent: {@value} */
  int PROBE_TIMEOUT_DEFAULT = 60000;

  // ------------------------------------------------------------- port probe

  /** Key for the port probe enabled/disabled flag: {@value} */
  String PORT_PROBE_ENABLED = MONITOR_KEY_PREFIX + "portprobe.enabled";

  /** Key for the port to attempt a TCP connection to: {@value} */
  String PORT_PROBE_PORT = MONITOR_KEY_PREFIX + "portprobe.port";

  /** Key for the host to attempt a TCP connection to: {@value} */
  String PORT_PROBE_HOST = MONITOR_KEY_PREFIX + "portprobe.host";

  /** Key for the timeout of the connection attempt: {@value} */
  String PORT_PROBE_CONNECT_TIMEOUT = MONITOR_KEY_PREFIX + "portprobe.connect.timeout";

  /**
   * Key for the port probe bootstrap timeout: how long in milliseconds port
   * probing may take to connect before the failure to connect is considered
   * a liveness failure. {@value}
   */
  String PORT_PROBE_BOOTSTRAP_TIMEOUT = MONITOR_KEY_PREFIX + "portprobe.bootstrap.timeout";

  /** Default bootstrap timeout for port probes: {@value} */
  int PORT_PROBE_BOOTSTRAP_TIMEOUT_DEFAULT = 60000;

  /** Default timeout for port probe connection attempts: {@value} */
  int PORT_PROBE_CONNECT_TIMEOUT_DEFAULT = 1000;

  /** Default port for probes: {@value} */
  int DEFAULT_PROBE_PORT = 8020;

  /** Default host for probes: {@value} */
  String DEFAULT_PROBE_HOST = "localhost";

  // --------------------------------------------------------------- LS probe

  /** Key for the LS probe enabled/disabled flag: {@value} */
  String LS_PROBE_ENABLED = MONITOR_KEY_PREFIX + "lsprobe.enabled";

  /** Key for the path used by the LS operation: {@value} */
  String LS_PROBE_PATH = MONITOR_KEY_PREFIX + "lsprobe.path";

  /** Default path for the LS operation: {@value} */
  String LS_PROBE_DEFAULT = "/";

  /** Key for the LS probe bootstrap timeout: {@value} */
  String LS_PROBE_BOOTSTRAP_TIMEOUT = MONITOR_KEY_PREFIX + "lsprobe.bootstrap.timeout";

  /** Default LS probe bootstrap timeout; same as the port probe default: {@value} */
  int LS_PROBE_BOOTSTRAP_TIMEOUT_DEFAULT = PORT_PROBE_BOOTSTRAP_TIMEOUT_DEFAULT;

  // -------------------------------------------------------------- web probe

  /** Key for the web probe enabled/disabled flag: {@value} */
  String WEB_PROBE_ENABLED = MONITOR_KEY_PREFIX + "webprobe.enabled";

  /** Key for the URL to probe: {@value} */
  String WEB_PROBE_URL = MONITOR_KEY_PREFIX + "webprobe.url";

  /** Default URL for the web probe: {@value} */
  String WEB_PROBE_DEFAULT_URL = "http://localhost:50070/";

  /**
   * Key for the minimum response code: {@value}.
   * NOTE(review): presumably the lower bound of the codes treated as success;
   * confirm against the code that builds the HTTP probe.
   */
  String WEB_PROBE_MIN = MONITOR_KEY_PREFIX + "webprobe.min";

  /**
   * Key for the maximum response code: {@value}.
   * NOTE(review): presumably the upper bound of the codes treated as success;
   * confirm against the code that builds the HTTP probe.
   */
  String WEB_PROBE_MAX = MONITOR_KEY_PREFIX + "webprobe.max";

  /** Key for the timeout of the web probe connection attempt: {@value} */
  String WEB_PROBE_CONNECT_TIMEOUT = MONITOR_KEY_PREFIX + "webprobe.connect.timeout";

  /**
   * Default HTTP response code expected from the far end for the endpoint to
   * be considered live: {@value}
   */
  int WEB_PROBE_DEFAULT_CODE = 200;

  /** Key for the web probe bootstrap timeout: {@value} */
  String WEB_PROBE_BOOTSTRAP_TIMEOUT = MONITOR_KEY_PREFIX + "webprobe.bootstrap.timeout";

  /** Default web probe bootstrap timeout; same as the port probe default: {@value} */
  int WEB_PROBE_BOOTSTRAP_TIMEOUT_DEFAULT = PORT_PROBE_BOOTSTRAP_TIMEOUT_DEFAULT;

  // --------------------------------------------------------------- JT probe

  /** Key for the JT probe enabled/disabled flag: {@value} */
  String JT_PROBE_ENABLED = MONITOR_KEY_PREFIX + "jtprobe.enabled";

  /** Key for the JT probe bootstrap timeout: {@value} */
  String JT_PROBE_BOOTSTRAP_TIMEOUT = MONITOR_KEY_PREFIX + "jtprobe.bootstrap.timeout";

  /** Default JT probe bootstrap timeout; same as the port probe default: {@value} */
  int JT_PROBE_BOOTSTRAP_TIMEOUT_DEFAULT = PORT_PROBE_BOOTSTRAP_TIMEOUT_DEFAULT;

  // -------------------------------------------------------------- PID probe

  /** Key for the PID probe enabled/disabled flag: {@value} */
  String PID_PROBE_ENABLED = MONITOR_KEY_PREFIX + "pidprobe.enabled";

  /**
   * Key for the PID probe's pidfile setting: {@value}.
   * NOTE(review): presumably the path of the file holding the process id; confirm.
   */
  String PID_PROBE_PIDFILE = MONITOR_KEY_PREFIX + "pidprobe.pidfile";

}
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.servicemonitor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Formatter; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.TreeSet; + +/** + * Various utils to work with the monitor + */ +public final class MonitorUtils { + protected static final Logger log = LoggerFactory.getLogger(MonitorUtils.class); + + private MonitorUtils() { + } + + public static String toPlural(int val) { + return val != 1 ? 
"s" : ""; + } + + /** + * Convert the arguments -including dropping any empty strings that creep in + * @param args arguments + * @return a list view with no empty strings + */ + public static List<String> prepareArgs(String[] args) { + List<String> argsList = new ArrayList<String>(args.length); + StringBuilder argsStr = new StringBuilder("Arguments: ["); + for (String arg : args) { + argsStr.append('"').append(arg).append("\" "); + if (!arg.isEmpty()) { + argsList.add(arg); + } + } + argsStr.append(']'); + log.debug(argsStr.toString()); + return argsList; + } + + /** + * Convert milliseconds to human time -the exact format is unspecified + * @param milliseconds a time in milliseconds + * @return a time that is converted to human intervals + */ + public static String millisToHumanTime(long milliseconds) { + StringBuilder sb = new StringBuilder(); + // Send all output to the Appendable object sb + Formatter formatter = new Formatter(sb, Locale.US); + + long s = Math.abs(milliseconds / 1000); + long m = Math.abs(milliseconds % 1000); + if (milliseconds > 0) { + formatter.format("%d.%03ds", s, m); + } else if (milliseconds == 0) { + formatter.format("0"); + } else { + formatter.format("-%d.%03ds", s, m); + } + return sb.toString(); + } + + public static InetSocketAddress getURIAddress(URI uri) { + String host = uri.getHost(); + int port = uri.getPort(); + return new InetSocketAddress(host, port); + } + + + /** + * Get the localhost -may be null + * @return the localhost if known + */ + public static InetAddress getLocalHost() { + InetAddress localHost; + try { + localHost = InetAddress.getLocalHost(); + } catch (UnknownHostException e) { + localHost = null; + } + return localHost; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/PortProbe.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/PortProbe.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/PortProbe.java new file mode 100644 index 0000000..b1ff792 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/PortProbe.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.server.servicemonitor; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; + +/** + * Probe for a port being open + */ +public class PortProbe extends Probe { + protected static final Logger log = LoggerFactory.getLogger(PortProbe.class); + private final String host; + private final int port; + private final int timeout; + + public PortProbe(String host, int port, int timeout, String name, Configuration conf) + throws IOException { + super("Port probe " + name + " " + host + ":" + port + " for " + timeout + "ms", + conf); + this.host = host; + this.port = port; + this.timeout = timeout; + } + + public static PortProbe createPortProbe(Configuration conf, + String hostname, + int port) throws IOException { + PortProbe portProbe = new PortProbe(hostname, + port, + conf.getInt( + PORT_PROBE_CONNECT_TIMEOUT, + PORT_PROBE_CONNECT_TIMEOUT_DEFAULT), + "", + conf); + + return portProbe; + } + + @Override + public void init() throws IOException { + if (port >= 65536) { + throw new IOException("Port is out of range: " + port); + } + InetAddress target; + if (host != null) { + log.debug("looking up host " + host); + target = InetAddress.getByName(host); + } else { + log.debug("Host is null, retrieving localhost address"); + target = InetAddress.getLocalHost(); + } + log.info("Checking " + target + ":" + port); + } + + /** + * Try to connect to the (host,port); a failure to connect within + * the specified timeout is a failure + * @param livePing is the ping live: true for live; false for boot time + * @return the outcome + */ + @Override + public ProbeStatus ping(boolean livePing) { + ProbeStatus status = new ProbeStatus(); + InetSocketAddress sockAddr = new InetSocketAddress(host, port); + Socket socket = new Socket(); + try { + if 
(log.isDebugEnabled()) { + log.debug("Connecting to " + sockAddr.toString() + " connection-timeout=" + + MonitorUtils.millisToHumanTime(timeout)); + } + socket.connect(sockAddr, timeout); + status.succeed(this); + } catch (IOException e) { + String error = "Probe " + sockAddr + " failed: " + e; + log.debug(error, e); + status.fail(this, + new IOException(error, e)); + } finally { + IOUtils.closeSocket(socket); + } + return status; + + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/Probe.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/Probe.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/Probe.java new file mode 100644 index 0000000..be4b5ef --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/Probe.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.servicemonitor; + +import org.apache.hadoop.conf.Configuration; + +import java.io.IOException; + +/** + * Base class of all probes. + */ +public abstract class Probe implements MonitorKeys { + + protected final Configuration conf; + private String name; + + // ======================================================= + /* + * These fields are all used by the probe loops + * to maintain state. Please Leave them alone. + */ + public int successCount; + public int failureCount; + public long bootstrapStarted; + public long bootstrapFinished; + private boolean booted = false; + + // ======================================================= + + /** + * Create a probe of a specific name + * + * @param name probe name + * @param conf configuration being stored. + */ + public Probe(String name, Configuration conf) { + this.name = name; + this.conf = conf; + } + + + protected void setName(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + + @Override + public String toString() { + return getName() + + " {" + + "successCount=" + successCount + + ", failureCount=" + failureCount + + '}'; + } + + /** + * perform any prelaunch initialization + */ + public void init() throws IOException { + + } + + /** + * Ping the endpoint. All exceptions must be caught and included in the + * (failure) status. 
+ * + * @param livePing is the ping live: true for live; false for boot time + * @return the status + */ + public abstract ProbeStatus ping(boolean livePing); + + public void beginBootstrap() { + bootstrapStarted = System.currentTimeMillis(); + } + + public void endBootstrap() { + setBooted(true); + bootstrapFinished = System.currentTimeMillis(); + } + + public boolean isBooted() { + return booted; + } + + public void setBooted(boolean booted) { + this.booted = booted; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbeFailedException.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbeFailedException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbeFailedException.java new file mode 100644 index 0000000..f09b848 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbeFailedException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.servicemonitor; + +/** + * An exception to raise on a probe failure + */ +public class ProbeFailedException extends Exception { + + public final ProbeStatus status; + + public ProbeFailedException(String text, ProbeStatus status) { + super((text == null ? "Probe Failed" : (text + ": ")) + status, status.getThrown()); + this.status = status; + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbeInterruptedException.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbeInterruptedException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbeInterruptedException.java new file mode 100644 index 0000000..5a02f46 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbeInterruptedException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
/**
 * Raised when the probe loop detects that it has been requested to stop.
 */
public class ProbeInterruptedException extends Exception {

  /** Fixed message carried by every instance. */
  private static final String MESSAGE = "Probe Interrupted";

  /** Create the exception with its fixed message and no cause. */
  public ProbeInterruptedException() {
    super(MESSAGE);
  }
}
/**
 * Probe phases. Each phase has a display name and an index giving the order
 * in which the phases happen; any phase can go to terminating directly.
 */
public enum ProbePhase {
  INIT("Initializing", 0),
  DEPENDENCY_CHECKING("Dependencies", 1),
  BOOTSTRAPPING("Bootstrapping", 2),
  LIVE("Live", 3),
  TERMINATING("Terminating", 4);

  /**
   * How many phases are there?
   */
  public static final int PHASE_COUNT = values().length;

  private final String label;
  private final int position;

  ProbePhase(String label, int position) {
    this.label = label;
    this.position = position;
  }

  /** @return the display name of the phase */
  public String getName() {
    return label;
  }

  /** @return the position of the phase in the sequence, starting at 0 */
  public int getIndex() {
    return position;
  }

  /** @return the display name of the phase */
  @Override
  public String toString() {
    return label;
  }
}
+ */ + + package org.apache.slider.server.servicemonitor; + +/** + * This interface is for use by the Poll Workers to send events to the reporters. + * + * It is up to the reporters what to do with the specific events. + */ +public interface ProbeReportHandler { + + /** + * The probe process has changed state. + * @param probePhase the new process phase + */ + void probeProcessStateChange(ProbePhase probePhase); + + /** + * Report a probe outcome + * @param phase the current phase of probing + * @param status the probe status + */ + void probeResult(ProbePhase phase, ProbeStatus status); + + /** + * A probe has failed + * @param exception the exception describing the failure + */ + void probeFailure(ProbeFailedException exception); + + /** + * A probe has just booted + * @param status probe status + */ + void probeBooted(ProbeStatus status); + + /** + * Reporting is about to start; the reporting loop calls this from its startReporting() method + * and returns the result to its own caller. + * @param name name of the monitor + * @param description text description of the monitor and its settings + * @return NOTE(review): the meaning of a false result is not defined here -confirm with implementations + */ + boolean commence(String name, String description); + + /** + * Reporting is stopping; the reporting loop calls this when it is closed. + */ + void unregister(); + + /** + * A heartbeat event should be raised + * @param status the probe status + */ + void heartbeat(ProbeStatus status); + + /** + * A probe has timed out + * @param currentPhase the current execution phase + * @param probe the probe that timed out + * @param lastStatus the last status that was successfully received -which is implicitly + * not the status of the timed out probe + * @param currentTime the current time + */ + void probeTimedOut(ProbePhase currentPhase, + Probe probe, + ProbeStatus lastStatus, + long currentTime); + + /** + * Event to say that the live probe cycle completed so the entire + * system can be considered functional. 
+ */ + void liveProbeCycleCompleted(); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbeStatus.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbeStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbeStatus.java new file mode 100644 index 0000000..653f479 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbeStatus.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.servicemonitor; + +import java.io.Serializable; +import java.util.Date; + +/** + * Status message of a probe. 
This is designed to be sent over the wire, though the exception + * Had better be unserializable at the far end if that is to work. + */ +public final class ProbeStatus implements Serializable { + + private long timestamp; + private String timestampText; + private boolean success; + private boolean realOutcome; + private String message; + private Throwable thrown; + private transient Probe originator; + private ProbePhase probePhase; + + public ProbeStatus() { + } + + public ProbeStatus(long timestamp, String message, Throwable thrown) { + this.success = false; + this.message = message; + this.thrown = thrown; + setTimestamp(timestamp); + } + + public ProbeStatus(long timestamp, String message) { + this.success = true; + setTimestamp(timestamp); + this.message = message; + this.thrown = null; + } + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + timestampText = new Date(timestamp).toString(); + } + + public boolean isSuccess() { + return success; + } + + /** + * Set both the success and the real outcome bits to the same value + * @param success the new value + */ + public void setSuccess(boolean success) { + this.success = success; + realOutcome = success; + } + + public String getTimestampText() { + return timestampText; + } + + public boolean getRealOutcome() { + return realOutcome; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + public Throwable getThrown() { + return thrown; + } + + public void setThrown(Throwable thrown) { + this.thrown = thrown; + } + + public ProbePhase getProbePhase() { + return probePhase; + } + + public void setProbePhase(ProbePhase probePhase) { + this.probePhase = probePhase; + } + + /** + * Get the probe that generated this result. 
May be null + * @return a possibly null reference to a probe + */ + public Probe getOriginator() { + return originator; + } + + /** + * The probe has succeeded -capture the current timestamp, set + * success to true, and record any other data needed. + * @param probe probe + */ + public void succeed(Probe probe) { + finish(probe, true, probe.getName(), null); + } + + /** + * A probe has failed either because the test returned false, or an exception + * was thrown. The {@link #success} field is set to false, any exception + * thrown is recorded. + * @param probe probe that failed + * @param thrown an exception that was thrown. + */ + public void fail(Probe probe, Throwable thrown) { + finish(probe, false, "Failure in " + probe, thrown); + } + + public void finish(Probe probe, boolean succeeded, String text, Throwable thrown) { + setTimestamp(System.currentTimeMillis()); + setSuccess(succeeded); + originator = probe; + message = text; + this.thrown = thrown; + } + + @Override + public String toString() { + LogEntryBuilder builder = new LogEntryBuilder("Probe Status"); + builder.elt("time", timestampText) + .elt("phase", probePhase) + .elt("outcome", (success ? "success" : "failure")); + + if (success != realOutcome) { + builder.elt("originaloutcome", (realOutcome ? 
"success" : "failure")); + } + builder.elt("message", message); + if (thrown != null) { + builder.elt("exception", thrown); + } + + return builder.toString(); + } + + public boolean inPhase(ProbePhase phase) { + return getProbePhase().equals(phase); + } + + /** + * Flip the success bit on while the real outcome bit is kept false + */ + public void markAsSuccessful() { + success = true; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbeWorker.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbeWorker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbeWorker.java new file mode 100644 index 0000000..f64ec8d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ProbeWorker.java @@ -0,0 +1,446 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.servicemonitor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * This is the entry point to do work. A list of probes is taken in, in order of + * booting. Once live they go to the live probes list. + * + * The dependency probes are a set of probes for dependent services, all of which + * must be live before boot probes commence. + * + * The boot probes are executed and are allowed to fail; failure is interpreted as "not yet live" + * + * Once all boot probes are live, the live list is used for probes; these must not fail. + * + * There is no timeout on dependency probe bootstrap time, because of the notion that + * restarting this service will have no effect on the dependencies. 
+ */ + +public class ProbeWorker implements Runnable { + protected static final Logger log = LoggerFactory.getLogger(ProbeWorker.class); + + public static final String FAILED_TO_BOOT = "Monitored service failed to bootstrap after "; + public static final String FAILURE_OF_A_LIVE_PROBE_DURING_BOOTSTRAPPING = "Failure of a live probe during bootstrapping"; + private final List<Probe> monitorProbes; + private final List<Probe> dependencyProbes; + public final int interval; + protected volatile ProbeStatus lastStatus; + protected volatile ProbeStatus lastFailingBootstrapProbe; + protected volatile Probe currentProbe; + private volatile boolean mustExit; + private final int bootstrapTimeout; + private long bootstrapEndtime; + + private ProbeReportHandler reportHandler; + private volatile ProbePhase probePhase = ProbePhase.INIT; + + /** + * Create a probe worker + * @param monitorProbes list of probes that must boot and then go live -after which + * they must stay live. + * @param dependencyProbes the list of dependency probes that must all succeed before + * any attempt to probe the direct probe list is performed. Once the + * dependency phase has completed, these probes are never checked again. + * @param interval probe interval in milliseconds. + * @param bootstrapTimeout timeout for bootstrap in milliseconds + */ + public ProbeWorker(List<Probe> monitorProbes, List<Probe> dependencyProbes, int interval, int bootstrapTimeout) { + this.monitorProbes = monitorProbes; + this.dependencyProbes = dependencyProbes != null ? 
dependencyProbes : new ArrayList<Probe>(0); + this.interval = interval; + lastStatus = new ProbeStatus(now(), + "Initial status"); + lastStatus.setProbePhase(ProbePhase.INIT); + this.bootstrapTimeout = bootstrapTimeout; + } + + public void init() throws IOException { + for (Probe probe : monitorProbes) { + probe.init(); + } + for (Probe probe : dependencyProbes) { + probe.init(); + } + } + + public void setReportHandler(ProbeReportHandler reportHandler) { + this.reportHandler = reportHandler; + } + + public void setMustExit() { + this.mustExit = true; + } + + public ProbeStatus getLastStatus() { + return lastStatus; + } + + public synchronized Probe getCurrentProbe() { + return currentProbe; + } + + public ProbePhase getProbePhase() { + return probePhase; + } + + /** + * Enter the new process state, and report it to the report handler. + * This is synchronized just to make sure there isn't more than one + * invocation at the same time. + * @param status the new process status + */ + private synchronized void enterProbePhase(ProbePhase status) { + this.probePhase = status; + if (reportHandler != null) { + reportHandler.probeProcessStateChange(status); + } + } + + /** + * Report the probe status to the listener -setting the probe phase field + * before doing so. + * The value is also stored in the {@link #lastStatus} field. + * As in {@link #enterProbePhase(ProbePhase)}, a missing report handler is tolerated: + * the status is still recorded, it is just not relayed. + * @param status the new status + */ + private void reportProbeStatus(ProbeStatus status) { + ProbePhase phase = getProbePhase(); + status.setProbePhase(phase); + lastStatus = status; + if (reportHandler != null) { + reportHandler.probeResult(phase, status); + } + } + + /** + * Ping one probe. 
Logs the operation at debug level; sets the field <code>currentProbe</code> + * to the probe for the duration of the operation -this is used when identifying the + * cause of a hung reporting loop + * @param probe probe to ping + * @param live flag to indicate whether or not the operation is live or bootstrapping + * @return the status of the ping + * @throws ProbeInterruptedException if the probe has been told to exit + */ + private ProbeStatus ping(Probe probe, boolean live) throws ProbeInterruptedException { + if (log.isDebugEnabled()) { + log.debug("Executing " + probe); + } + checkForExitRequest(); + currentProbe = probe; + try { + return probe.ping(live); + } finally { + currentProbe = null; + } + } + + /** + * Check for an exit request -and convert it to an exception if made + * @throws ProbeInterruptedException iff {@link #mustExit} is true + */ + private void checkForExitRequest() throws ProbeInterruptedException { + if (mustExit) { + throw new ProbeInterruptedException(); + } + } + + /** + * Check the dependencies. + * The moment a failing test is reached the call returns without + * any reporting. + * + * All successful probes are reported, so as to keep the heartbeats happy. + * + * @return the status of the last dependency check. If this is a success + * then every probe passed. It is null if there are no dependency probes. + */ + private ProbeStatus checkDependencyProbes() throws ProbeInterruptedException { + ProbeStatus status = null; + for (Probe dependency : dependencyProbes) { + //ping them, making clear they are not to run any bootstrap logic + status = ping(dependency, true); + + if (!status.isSuccess()) { + //the first failure means the rest of the list can be skipped + break; + } + reportProbeStatus(status); + } + //return the last status + return status; + } + + /** + * Run through all the dependency probes and report their outcomes as successes (even if they fail) + * @return true iff all the probes have succeeded. + * @throws ProbeInterruptedException if the process was interrupted. 
+ */ + public boolean checkAndReportDependencyProbes() throws ProbeInterruptedException { + ProbeStatus status; + status = checkDependencyProbes(); + if (status != null && !status.isSuccess()) { + //during dependency checking, a failure is still reported as a success + status.markAsSuccessful(); + reportProbeStatus(status); + //then return without checking anything else + return false; + } + //all dependencies are done. + return true; + } + + /** + * Begin bootstrapping by telling each probe that they have started. + * This sets the timeouts up, as well as permits any other set-up actions + * to begin. + */ + private void beginBootstrapProbes() { + synchronized (this) { + bootstrapEndtime = now() + bootstrapTimeout; + } + for (Probe probe : monitorProbes) { + probe.beginBootstrap(); + } + } + + private long now() { + return System.currentTimeMillis(); + } + + + /** + * Check the bootstrap probe list. All successful probes get reported. + * A failing probe that has not yet booted is reported with its success bit flipped on; + * a failing probe that had already booted is reported unedited and then + * turned into a {@link ProbeFailedException}. + * If the bootstrap period has timed out, that is also turned into a {@link ProbeFailedException} + * @return true iff every probe succeeded on this pass + * @throws ProbeInterruptedException interrupts + * @throws ProbeFailedException on a boot timeout, or the failure of a probe that had booted + */ + private boolean checkBootstrapProbes() throws ProbeInterruptedException, ProbeFailedException { + verifyBootstrapHasNotTimedOut(); + + boolean probeFailed = false; + //now run through all the bootstrap probes + for (Probe probe : monitorProbes) { + //ping them + ProbeStatus status = ping(probe, false); + if (!status.isSuccess()) { + probeFailed = true; + lastFailingBootstrapProbe = status; + probe.failureCount++; + if (log.isDebugEnabled()) { + log.debug("Booting probe failed: " + status); + } + //at this point check to see if the timeout has occurred -and if so, force in the last probe status. 
+ + //this is a failure but not a timeout + //during boot, a failure of a probe that hasn't booted is still reported as a success + if (!probe.isBooted()) { + //so the success bit is flipped + status.markAsSuccessful(); + reportProbeStatus(status); + } else { + //the probe had booted but then it switched to failing + + //update the status unedited + reportProbeStatus(status); + //then fail + throw raiseProbeFailure(status, FAILURE_OF_A_LIVE_PROBE_DURING_BOOTSTRAPPING); + } + } else { + //this probe is working + if (!probe.isBooted()) { + //if it is new, mark it as live + if (log.isDebugEnabled()) { + log.debug("Booting probe is now live: " + probe); + } + probe.endBootstrap(); + //tell the report handler, if there is one, that another probe has booted + if (reportHandler != null) { + reportHandler.probeBooted(status); + } + } + //push out its status + reportProbeStatus(status); + probe.successCount++; + } + } + return !probeFailed; + } + + + public int getBootstrapTimeout() { + return bootstrapTimeout; + } + + /** + * This checks that bootstrap operations have not timed out + * @throws ProbeFailedException if the bootstrap has failed + */ + public void verifyBootstrapHasNotTimedOut() throws ProbeFailedException { + //first step -look for a timeout + if (isBootstrapTimeExceeded()) { + String text = FAILED_TO_BOOT + + MonitorUtils.millisToHumanTime(bootstrapTimeout); + + ProbeStatus status; + if (lastFailingBootstrapProbe != null) { + status = lastFailingBootstrapProbe; + status.setSuccess(false); + } else { + status = new ProbeStatus(); + status.finish(null, false, text, null); + } + + throw raiseProbeFailure(status, + text); + } + } + + /** + * predicate that gets current time and checks for its time being exceeded. + * @return true iff the current time is > the end time + */ + public synchronized boolean isBootstrapTimeExceeded() { + return now() > bootstrapEndtime; + } + + /** + * run through all the bootstrap probes and see if they are live. 
+ * @return true iff all boot probes succeeded + * @throws ProbeInterruptedException the probe interruption flags + * @throws ProbeFailedException if a probe failed. + */ + public boolean checkAndReportBootstrapProbes() throws ProbeInterruptedException, + ProbeFailedException { + if (bootstrapTimeout <= 0) { + //there is no period of grace for bootstrapping probes, so return true saying + //this phase is complete + return true; + } + //now the bootstrapping probes + return checkBootstrapProbes(); + } + + + /** + * run through all the live probes, pinging and reporting them. + * A single probe failure is turned into an exception + * @throws ProbeFailedException a probe failed + * @throws ProbeInterruptedException the probe process was explicitly interrupted + */ + protected void checkAndReportLiveProbes() throws ProbeFailedException, ProbeInterruptedException { + ProbeStatus status = null; + //go through the live list + if (log.isDebugEnabled()) { + log.debug("Checking live probes"); + } + for (Probe probe : monitorProbes) { + status = ping(probe, true); + reportProbeStatus(status); + if (!status.isSuccess()) { + throw raiseProbeFailure(status, "Failure of probe in \"live\" monitor"); + } + probe.successCount++; + } + //here all is well, so notify the reporter, if there is one + if (reportHandler != null) { + reportHandler.liveProbeCycleCompleted(); + } + } + + /** + * Run the set of probes relevant for this phase of the probe lifecycle. 
+ * @throws ProbeFailedException a probe failed + * @throws ProbeInterruptedException the probe process was explicitly interrupted + */ + protected void executeProbePhases() throws ProbeFailedException, ProbeInterruptedException { + switch (probePhase) { + case INIT: + enterProbePhase(ProbePhase.DEPENDENCY_CHECKING); + //fall through straight into the dependency check + case DEPENDENCY_CHECKING: + if (checkAndReportDependencyProbes()) { + enterProbePhase(ProbePhase.BOOTSTRAPPING); + beginBootstrapProbes(); + } + break; + case BOOTSTRAPPING: + if (checkAndReportBootstrapProbes()) { + enterProbePhase(ProbePhase.LIVE); + } + break; + case LIVE: + checkAndReportLiveProbes(); + break; + + case TERMINATING: + default: + //do nothing. + break; + } + } + + + /** + * Raise a probe failure; injecting the phase into the status result first + * + * @param status ping result + * @param text optional text -null or "" means "none" + * @return an exception ready to throw + */ + private ProbeFailedException raiseProbeFailure(ProbeStatus status, String text) { + status.setProbePhase(probePhase); + log.info("Probe failed: " + status); + return new ProbeFailedException(text, status); + } + + /** + * The worker loop: sleep for the probe interval, run the probes of the current phase, + * and repeat until told to exit or interrupted. The TERMINATING phase is entered on the way out. + * If the loop ended because the thread was interrupted, the interrupt flag is restored + * after that final phase change has been reported. + */ + @Override + public void run() { + int size = monitorProbes.size(); + log.info("Probe Worker Starting; " + size + " probe" + MonitorUtils.toPlural(size) + ":"); + enterProbePhase(ProbePhase.DEPENDENCY_CHECKING); + for (Probe probe : monitorProbes) { + log.info(probe.getName()); + } + boolean wasInterrupted = false; + while (!mustExit) { + try { + Thread.sleep(interval); + executeProbePhases(); + } catch (ProbeFailedException e) { + //relay to the inner loop handler + probeFailed(e); + } catch (InterruptedException interrupted) { + //remember the interrupt so that it can be restored once the exit has been reported + wasInterrupted = true; + break; + } catch (ProbeInterruptedException e) { + //exit raised. 
+ //this will be true, just making extra-sure + break; + } + } + log.info("Probe Worker Exiting"); + enterProbePhase(ProbePhase.TERMINATING); + if (wasInterrupted) { + //hand the interrupt back to whoever owns this thread + Thread.currentThread().interrupt(); + } + } + + + /** + * Relay a probe failure to the report handler, if there is one + * @param e the failure + */ + protected void probeFailed(ProbeFailedException e) { + if (reportHandler != null) { + reportHandler.probeFailure(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ReportingLoop.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ReportingLoop.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ReportingLoop.java new file mode 100644 index 0000000..096838d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/ReportingLoop.java @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.slider.server.servicemonitor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; + +/** + * This is the monitor service + */ +public final class ReportingLoop implements Runnable, ProbeReportHandler, MonitorKeys, Closeable { + protected static final Logger log = LoggerFactory.getLogger(ReportingLoop.class); + private final ProbeWorker worker; + private final Thread workerThread; + private final int reportInterval; + private final int probeTimeout; + private final int bootstrapTimeout; + private ProbeReportHandler reporter; + private final String name; + private volatile boolean mustExit; + + public ReportingLoop(String name, + ProbeReportHandler reporter, + List<Probe> probes, + List<Probe> dependencyProbes, + int probeInterval, + int reportInterval, + int probeTimeout, + int bootstrapTimeout) throws IOException { + this(name, + reporter, + new ProbeWorker(probes, dependencyProbes, probeInterval, bootstrapTimeout), + reportInterval, + probeTimeout); + } + + /** + * Create a new reporting loop -and bond the worker's ProbeReportHandler + * to us + * @param name name of the monitor; used in the worker thread name and the description + * @param reporter the handler to which events are relayed + * @param worker the probe worker; it is initialized here but its thread is not started + * @param reportInterval interval between reporting cycles, in milliseconds + * @param probeTimeout timeout of a live probe in milliseconds; a negative value means "not set" + * @throws IOException if the worker failed to initialize + */ + public ReportingLoop(String name, + ProbeReportHandler reporter, + ProbeWorker worker, + int reportInterval, + int probeTimeout) throws IOException { + this.name = name; + this.reporter = reporter; + this.reportInterval = reportInterval; + this.probeTimeout = probeTimeout; + this.worker = worker; + this.bootstrapTimeout = worker.getBootstrapTimeout(); + worker.setReportHandler(this); + workerThread = new Thread(worker, "probe thread - " + name); + worker.init(); + } + + public int getBootstrapTimeout() { + return bootstrapTimeout; + } + + public ReportingLoop withReporter(ProbeReportHandler reporter) { + assert this.reporter == null : "attempting to reassign reporter "; + assert reporter != null : 
"new reporter is null"; + this.reporter = reporter; + return this; + } + + /** + * Start the monitoring: build a description of the monitor settings, log it, and + * tell the reporter that reporting is commencing. + * + * @return the value returned by the reporter's commence() call + */ + public boolean startReporting() { + String description = "Service Monitor for " + name + ", probe-interval= " + + MonitorUtils.millisToHumanTime(worker.interval) + + ", report-interval=" + MonitorUtils.millisToHumanTime(reportInterval) + + ", probe-timeout=" + timeoutToStr(probeTimeout) + + ", bootstrap-timeout=" + timeoutToStr(bootstrapTimeout); + log.info("Starting reporting" + + " to " + reporter + + ": " + description); + return reporter.commence(name, description); + } + + private String timeoutToStr(int timeout) { + return timeout >= 0 ? MonitorUtils.millisToHumanTime(timeout) : "not set"; + } + + private void startWorker() { + log.info("Starting reporting worker thread "); + workerThread.setDaemon(true); + workerThread.start(); + } + + + /** + * This exits the process cleanly + */ + @Override + public void close() { + log.info("Stopping reporting"); + mustExit = true; + if (worker != null) { + worker.setMustExit(); + workerThread.interrupt(); + } + if (reporter != null) { + reporter.unregister(); + } + } + + @Override + public void probeFailure(ProbeFailedException exception) { + reporter.probeFailure(exception); + } + + @Override + public void probeProcessStateChange(ProbePhase probePhase) { + reporter.probeProcessStateChange(probePhase); + } + + @Override + public void probeBooted(ProbeStatus status) { + reporter.probeBooted(status); + } + + private long now() { + return System.currentTimeMillis(); + } + + @Override + public void probeResult(ProbePhase phase, ProbeStatus status) { + reporter.probeResult(phase, status); + } + + @Override + public boolean commence(String n, String description) { + return true; + } + + @Override + public void unregister() { + } + + @Override + public void heartbeat(ProbeStatus status) { + } + + @Override + public void 
probeTimedOut(ProbePhase currentPhase, Probe probe, ProbeStatus lastStatus, + long currentTime) { + } + + @Override + public void liveProbeCycleCompleted() { + //delegate to the reporter + reporter.liveProbeCycleCompleted(); + } + + /** + * The reporting loop: on every cycle either a heartbeat or a probe timeout is sent + * to the reporter, after which the loop sleeps for the report interval. + * The loop ends when an exit has been requested or the thread is interrupted; + * in the latter case the interrupt flag is restored before returning. + */ + void reportingLoop() { + + while (!mustExit) { + try { + ProbeStatus workerStatus = worker.getLastStatus(); + long now = now(); + long lastStatusIssued = workerStatus.getTimestamp(); + long timeSinceLastStatusIssued = now - lastStatusIssued; + //two actions can occur here: a heartbeat is issued or a timeout reported. + //this flag decides which + boolean heartbeat; + + //based on phase, decide whether to heartbeat or timeout + ProbePhase probePhase = worker.getProbePhase(); + switch (probePhase) { + case DEPENDENCY_CHECKING: + //no timeouts in dependency phase + heartbeat = true; + break; + + case BOOTSTRAPPING: + //the timeout here is fairly straightforward: heartbeats are + //raised while the worker hasn't timed out + heartbeat = bootstrapTimeout < 0 || timeSinceLastStatusIssued < bootstrapTimeout; + + break; + + case LIVE: + //use the probe timeout interval between the current time + //and the time the last status event was received. + //a negative probe timeout means "not set", so there is nothing to time out against + heartbeat = probeTimeout < 0 || timeSinceLastStatusIssued < probeTimeout; + break; + + case INIT: + case TERMINATING: + default: + //send a heartbeat, because this isn't the time to be failing + heartbeat = true; + } + if (heartbeat) { + //a heartbeat is sent to the reporter + reporter.heartbeat(workerStatus); + } else { + //no response from the worker -it is hung. 
+ reporter.probeTimedOut(probePhase, + worker.getCurrentProbe(), + workerStatus, + now + ); + } + + //now sleep + Thread.sleep(reportInterval); + + } catch (InterruptedException e) { + //interrupted -always exit the loop, restoring the flag for the owner of the thread + Thread.currentThread().interrupt(); + break; + } + } + //this point is reached if and only if a clean exit was requested or something failed. + } + + /** + * This can be run in a separate thread, or it can be run directly from the caller. 
+ * Test runs do the latter, HAM runs multiple reporting threads. + */ + @Override + public void run() { + try { + startWorker(); + reportingLoop(); + } catch (RuntimeException e) { + log.warn("Failure in the reporting loop: " + e, e); + //rethrow so that inline code can pick it up (e.g. test runs) + throw e; + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/YarnApplicationProbe.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/YarnApplicationProbe.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/YarnApplicationProbe.java new file mode 100644 index 0000000..adf613c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/servicemonitor/YarnApplicationProbe.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.servicemonitor; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.slider.client.SliderYarnClientImpl; +import org.apache.slider.core.exceptions.UnknownApplicationInstanceException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; + +/** + * Probe for YARN application + */ +public class YarnApplicationProbe extends Probe { + protected static final Logger log = LoggerFactory.getLogger( + YarnApplicationProbe.class); + + /** + * Yarn client service + */ + private SliderYarnClientImpl yarnClient; + private final String clustername; + private final String username; + + public YarnApplicationProbe(String clustername, + SliderYarnClientImpl yarnClient, + String name, + Configuration conf, String username) + throws IOException { + super("Port probe " + name + " " + clustername, + conf); + this.clustername = clustername; + this.yarnClient = yarnClient; + this.username = username; + } + + + @Override + public void init() throws IOException { + + log.info("Checking " + clustername ); + } + + /** + * Try to connect to the (host,port); a failure to connect within + * the specified timeout is a failure + * @param livePing is the ping live: true for live; false for boot time + * @return the outcome + */ + @Override + public ProbeStatus ping(boolean livePing) { + + ProbeStatus status = new ProbeStatus(); + try { + + List<ApplicationReport> instances = + yarnClient.listDeployedInstances(username); + ApplicationReport instance = + yarnClient.findClusterInInstanceList(instances, clustername); + if (null == instance) { + throw UnknownApplicationInstanceException.unknownInstance(clustername); + } + + status.succeed(this); + } catch (Exception e) { + status.fail(this, e); + } + return status; + + } 
+} http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/security/AbstractSecurityStoreGenerator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/security/AbstractSecurityStoreGenerator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/security/AbstractSecurityStoreGenerator.java new file mode 100644 index 0000000..11d3aa1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/security/AbstractSecurityStoreGenerator.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.slider.server.services.security; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.alias.CredentialProvider; +import org.apache.hadoop.security.alias.CredentialProviderFactory; +import org.apache.slider.common.SliderKeys; +import org.apache.slider.core.conf.MapOperations; +import org.apache.slider.core.exceptions.SliderException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * + */ +public abstract class AbstractSecurityStoreGenerator implements + SecurityStoreGenerator { + private static final Logger LOG = + LoggerFactory.getLogger(AbstractSecurityStoreGenerator.class); + + protected CertificateManager certificateMgr; + + public AbstractSecurityStoreGenerator(CertificateManager certificateMgr) { + this.certificateMgr = certificateMgr; + } + + protected String getStorePassword(Map<String, List<String>> credentials, + MapOperations compOps, String role) + throws SliderException, IOException { + String password = getPassword(compOps); + if (password == null) { + // need to leverage credential provider + String alias = getAlias(compOps); + LOG.debug("Alias {} found for role {}", alias, role); + if (alias == null) { + throw new SliderException("No store password or credential provider " + + "alias found"); + } + if (credentials.isEmpty()) { + LOG.info("Credentials can not be retrieved for store generation since " + + "no CP paths are configured"); + } + synchronized (this) { + for (Map.Entry<String, List<String>> cred : credentials.entrySet()) { + String provider = cred.getKey(); + Configuration c = new Configuration(); + c.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, provider); + LOG.debug("Configured provider {}", provider); + CredentialProvider cp = + CredentialProviderFactory.getProviders(c).get(0); + LOG.debug("Aliases: {}", cp.getAliases()); + char[] credential = c.getPassword(alias); + if 
(credential != null) { + LOG.info("Credential found for role {}", role); + return String.valueOf(credential); + } + } + } + + if (password == null) { + LOG.info("No store credential found for alias {}. " + + "Generation of store for {} is not possible.", alias, role); + + } + } + + return password; + + } + + @Override + public boolean isStoreRequested(MapOperations compOps) { + return compOps.getOptionBool(SliderKeys.COMP_STORES_REQUIRED_KEY, false); + } + + abstract String getPassword(MapOperations compOps); + + abstract String getAlias(MapOperations compOps); +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org