Repository: samza Updated Branches: refs/heads/master 9d6831bd1 -> baf9faa1c
SAMZA-850 : Yarn Job Validation Tool Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/baf9faa1 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/baf9faa1 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/baf9faa1 Branch: refs/heads/master Commit: baf9faa1cc6170a89ea336326f85b2678772eccd Parents: 9d6831b Author: Xinyu Liu <[email protected]> Authored: Tue Mar 29 16:30:44 2016 -0700 Committer: Navina Ramesh <[email protected]> Committed: Tue Mar 29 16:30:44 2016 -0700 ---------------------------------------------------------------------- checkstyle/import-control.xml | 1 + .../apache/samza/metrics/MetricsAccessor.java | 54 ++++++ .../MetricsValidationFailureException.java | 32 ++++ .../apache/samza/metrics/MetricsValidator.java | 48 +++++ .../org/apache/samza/job/model/JobModel.java | 14 ++ .../samza/metrics/JmxMetricsAccessor.java | 93 +++++++++ .../java/org/apache/samza/metrics/JmxUtil.java | 59 ++++++ .../samza/metrics/reporter/JmxReporter.scala | 26 +-- .../samza/metrics/TestJmxMetricsAccessor.java | 93 +++++++++ samza-shell/src/main/bash/validate-yarn-job.sh | 21 +++ .../samza/validation/YarnJobValidationTool.java | 189 +++++++++++++++++++ .../samza/validation/MockMetricsValidator.java | 50 +++++ .../validation/TestYarnJobValidationTool.java | 142 ++++++++++++++ 13 files changed, 797 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/baf9faa1/checkstyle/import-control.xml ---------------------------------------------------------------------- diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 53cb8b4..b5bd365 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -104,6 +104,7 @@ <subpackage name="metrics"> <allow pkg="org.apache.samza.config" /> <allow pkg="org.apache.samza.util" /> + <allow pkg="org.apache.samza.container" /> 
</subpackage> <subpackage name="task"> http://git-wip-us.apache.org/repos/asf/samza/blob/baf9faa1/samza-api/src/main/java/org/apache/samza/metrics/MetricsAccessor.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/metrics/MetricsAccessor.java b/samza-api/src/main/java/org/apache/samza/metrics/MetricsAccessor.java new file mode 100644 index 0000000..8bd75cd --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/metrics/MetricsAccessor.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.metrics; + +import java.util.Map; + + +/** + * A MetricsAccessor allows users to retrieve metric values, based on group name and metric name, + * though specific metrics system, such as JMX. + */ +public interface MetricsAccessor { + /** + * Get the values of a counter + * @param group Group for the counter, e.g. org.apache.samza.container.SamzaContainerMetrics + * @param counter Name of the counter, e.g. commit-calls + * @return A map of counter values, keyed by type, e.g. 
{"samza-container-0": 100L} + */ + Map<String, Long> getCounterValues(String group, String counter); + + /** + * Get the values of a gauge + * @param group Group for the gauge, e.g. org.apache.samza.container.SamzaContainerMetrics + * @param gauge Name of the gauge, e.g. event-loop-utilization + * @param <T> Type of the gauge value, e.g. Double + * @return A map of gauge values, keyed by type, e.g. {"samza-container-0": 0.8} + */ + <T> Map<String, T> getGaugeValues(String group, String gauge); + + /** + * Get the values of a timer + * @param group Group for the timer, e.g. org.apache.samza.container.SamzaContainerMetrics + * @param timer Name of the timer, e.g. choose-ns + * @return A map of timer values, keyed by type, e.g. {"samza-container-0": 10.5} + */ + Map<String, Double> getTimerValues(String group, String timer); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/baf9faa1/samza-api/src/main/java/org/apache/samza/metrics/MetricsValidationFailureException.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/metrics/MetricsValidationFailureException.java b/samza-api/src/main/java/org/apache/samza/metrics/MetricsValidationFailureException.java new file mode 100644 index 0000000..00c96f3 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/metrics/MetricsValidationFailureException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.metrics; + +/** + * Thrown when the metrics validation fails. See {@link org.apache.samza.metrics.MetricsValidator}. + */ +public class MetricsValidationFailureException extends Exception { + public MetricsValidationFailureException(String message) { + super(message); + } + public MetricsValidationFailureException(String message, Throwable cause) { + super(message, cause); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/baf9faa1/samza-api/src/main/java/org/apache/samza/metrics/MetricsValidator.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/metrics/MetricsValidator.java b/samza-api/src/main/java/org/apache/samza/metrics/MetricsValidator.java new file mode 100644 index 0000000..27e7a1c --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/metrics/MetricsValidator.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.metrics; + +import org.apache.samza.config.Config; + + +/** + * A MetricsValidator reads the job's metrics values by using the {@link org.apache.samza.metrics.MetricsAccessor}, + * and validates them. + */ +public interface MetricsValidator { + /** + * Initialize with config. + * @param config Job config + */ + void init(Config config); + + /** + * Validate the metrics values of a job + * @param accessor Accessor to get the metrics values through a specific metrics system, e.g. JMX. + * @throws MetricsValidationFailureException Exception when the validation fails. + */ + void validate(MetricsAccessor accessor) throws MetricsValidationFailureException; + + /** + * Complete validation. Final checks can be performed here. + * @throws MetricsValidationFailureException Exception when the validation fails. 
+ */ + void complete() throws MetricsValidationFailureException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/baf9faa1/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java index 9445a30..dbd6dcc 100644 --- a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java +++ b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java @@ -103,6 +103,20 @@ public class JobModel { return mappings.get(key); } + public Map<Integer, String> getAllContainerToHostValues(String key) { + if (localityManager == null) { + return Collections.EMPTY_MAP; + } + Map<Integer, String> allValues = new HashMap<>(); + for (Map.Entry<Integer, Map<String, String>> entry : localityManager.readContainerLocality().entrySet()) { + String value = entry.getValue().get(key); + if (value != null) { + allValues.put(entry.getKey(), value); + } + } + return allValues; + } + private void populateContainerLocalityMappings() { Map<Integer, Map<String, String>> allMappings = localityManager.readContainerLocality(); for (Integer containerId: containers.keySet()) { http://git-wip-us.apache.org/repos/asf/samza/blob/baf9faa1/samza-core/src/main/java/org/apache/samza/metrics/JmxMetricsAccessor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/metrics/JmxMetricsAccessor.java b/samza-core/src/main/java/org/apache/samza/metrics/JmxMetricsAccessor.java new file mode 100644 index 0000000..1581d12 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/metrics/JmxMetricsAccessor.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.metrics; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import javax.management.MBeanServerConnection; +import javax.management.ObjectName; +import javax.management.remote.JMXConnector; +import javax.management.remote.JMXConnectorFactory; +import javax.management.remote.JMXServiceURL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * JMX metrics accessor. + * It connects to a container JMX URL, and gets metric values by querying the MBeans. 
+ */ +public class JmxMetricsAccessor implements MetricsAccessor { + private static final Logger log = LoggerFactory.getLogger(JmxMetricsAccessor.class); + + private final String url; + private JMXConnector jmxc; + + public JmxMetricsAccessor(String url) { + this.url = url; + } + + public void connect() throws IOException { + JMXServiceURL jmxUrl = new JMXServiceURL(url); + jmxc = JMXConnectorFactory.connect(jmxUrl, null); + } + + public void close() throws IOException { + jmxc.close(); + } + + private <T> Map<String, T> getMetricValues(String group, String metric, String attribute) { + try { + StringBuilder nameBuilder = new StringBuilder(); + nameBuilder.append(JmxUtil.makeNameJmxSafe(group)); + nameBuilder.append(":type=*,name="); + nameBuilder.append(JmxUtil.makeNameJmxSafe(metric)); + ObjectName query = new ObjectName(nameBuilder.toString()); + Map<String, T> values = new HashMap<>(); + MBeanServerConnection conn = jmxc.getMBeanServerConnection(); + for (ObjectName objName : conn.queryNames(query, null)) { + String type = objName.getKeyProperty("type"); + T val = (T) conn.getAttribute(objName, attribute); + values.put(type, val); + } + return values; + } catch (Exception e) { + log.error(e.getMessage(), e); + return Collections.EMPTY_MAP; + } + } + + @Override + public Map<String, Long> getCounterValues(String group, String counter) { + return getMetricValues(group, counter, "Count"); + } + + @Override + public <T> Map<String, T> getGaugeValues(String group, String gauge) { + return getMetricValues(group, gauge, "Value"); + } + + @Override + public Map<String, Double> getTimerValues(String group, String timer) { + return getMetricValues(group, timer, "AverageTime"); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/baf9faa1/samza-core/src/main/java/org/apache/samza/metrics/JmxUtil.java ---------------------------------------------------------------------- diff --git 
a/samza-core/src/main/java/org/apache/samza/metrics/JmxUtil.java b/samza-core/src/main/java/org/apache/samza/metrics/JmxUtil.java new file mode 100644 index 0000000..6080f7c --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/metrics/JmxUtil.java @@ -0,0 +1,59 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/ + +package org.apache.samza.metrics; + +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility class to create JMX related objects + */ +public class JmxUtil { + private static final Logger log = LoggerFactory.getLogger(JmxUtil.class); + + public static ObjectName getObjectName(String group, String name, String t) throws MalformedObjectNameException { + StringBuilder nameBuilder = new StringBuilder(); + nameBuilder.append(makeNameJmxSafe(group)); + nameBuilder.append(":type="); + nameBuilder.append(makeNameJmxSafe(t)); + nameBuilder.append(",name="); + nameBuilder.append(makeNameJmxSafe(name)); + ObjectName objName = new ObjectName(nameBuilder.toString()); + log.debug("Resolved name for " + group + ", " + name + ", " + t + " to: " + objName); + return objName; + } + + /* + * JMX only has ObjectName.quote, which is pretty nasty looking. This + * function escapes without quoting, using the rules outlined in: + * http://docs.oracle.com/javase/1.5.0/docs/api/javax/management/ObjectName.html + */ + public static String makeNameJmxSafe(String str) { + return str + .replace(",", "_") + .replace("=", "_") + .replace(":", "_") + .replace("\"", "_") + .replace("*", "_") + .replace("?", "_"); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/baf9faa1/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala index e966102..63123ff 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala @@ -33,6 +33,7 @@ import org.apache.samza.metrics.ReadableMetricsRegistry import 
org.apache.samza.metrics.ReadableMetricsRegistryListener import scala.collection.JavaConversions._ import org.apache.samza.metrics.MetricsVisitor +import org.apache.samza.metrics.JmxUtil._ class JmxReporter(server: MBeanServer) extends MetricsReporter with Logging { var sources = Map[ReadableMetricsRegistry, String]() @@ -84,31 +85,6 @@ class JmxReporter(server: MBeanServer) extends MetricsReporter with Logging { } } - def getObjectName(group: String, name: String, t: String) = { - val nameBuilder = new StringBuilder - nameBuilder.append(makeNameJmxSafe(group)) - nameBuilder.append(":type=") - nameBuilder.append(makeNameJmxSafe(t)) - nameBuilder.append(",name=") - nameBuilder.append(makeNameJmxSafe(name)) - val objName = new ObjectName(nameBuilder.toString) - debug("Resolved name for %s, %s, %s to: %s" format (group, name, t, objName)) - objName - } - - /* - * JMX only has ObjectName.quote, which is pretty nasty looking. This - * function escapes without quoting, using the rules outlined in: - * http://docs.oracle.com/javase/1.5.0/docs/api/javax/management/ObjectName.html - */ - def makeNameJmxSafe(str: String) = str - .replace(",", "_") - .replace("=", "_") - .replace(":", "_") - .replace("\"", "_") - .replace("*", "_") - .replace("?", "_") - def registerBean(bean: MetricMBean) { if (!server.isRegistered(bean.objectName)) { debug("Registering MBean for %s." 
format bean.objectName) http://git-wip-us.apache.org/repos/asf/samza/blob/baf9faa1/samza-core/src/test/java/org/apache/samza/metrics/TestJmxMetricsAccessor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/metrics/TestJmxMetricsAccessor.java b/samza-core/src/test/java/org/apache/samza/metrics/TestJmxMetricsAccessor.java new file mode 100644 index 0000000..5de2cc7 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/metrics/TestJmxMetricsAccessor.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.metrics; + +import java.lang.reflect.Field; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import javax.management.MBeanServerConnection; +import javax.management.ObjectName; +import javax.management.QueryExp; +import javax.management.remote.JMXConnector; +import org.apache.samza.container.SamzaContainerMetrics; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.*; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestJmxMetricsAccessor { + private JmxMetricsAccessor jmxMetricsAccessor; + private Set<ObjectName> objectNames; + private MBeanServerConnection conn; + + @Before + public void setup() throws Exception { + jmxMetricsAccessor = new JmxMetricsAccessor("dummyurl"); + JMXConnector jmxc = mock(JMXConnector.class); + conn = mock(MBeanServerConnection.class); + when(jmxc.getMBeanServerConnection()).thenReturn(conn); + objectNames = new HashSet<>(); + when(conn.queryNames(any(ObjectName.class), any(QueryExp.class))).thenReturn(objectNames); + Field jmxcField = JmxMetricsAccessor.class.getDeclaredField("jmxc"); + jmxcField.setAccessible(true); + jmxcField.set(jmxMetricsAccessor, jmxc); + } + + @Test + public void testGetCounterValues() throws Exception { + ObjectName counterObject = JmxUtil.getObjectName(SamzaContainerMetrics.class.getName(), "commit-calls", "samza-container-0"); + objectNames.add(counterObject); + Long commitCalls = 100L; + when(conn.getAttribute(counterObject, "Count")).thenReturn(commitCalls); + + Map<String, Long> result = jmxMetricsAccessor.getCounterValues(SamzaContainerMetrics.class.getName(), + "commit-calls"); + assertTrue(result.size() == 1); + assertTrue(result.get("samza-container-0").equals(commitCalls)); + } + + @Test + public void testGetGaugeValues() throws Exception { + ObjectName gaugeObject = 
JmxUtil.getObjectName(SamzaContainerMetrics.class.getName(), "event-loop-utilization", "samza-container-1"); + objectNames.add(gaugeObject); + Double loopUtil = 0.8; + when(conn.getAttribute(gaugeObject, "Value")).thenReturn(loopUtil); + + Map<String, Double> result = jmxMetricsAccessor.getGaugeValues(SamzaContainerMetrics.class.getName(), "event-loop-utilization"); + assertTrue(result.size() == 1); + assertTrue(result.get("samza-container-1").equals(loopUtil)); + } + + @Test + public void testGetTimerValues() throws Exception { + ObjectName timerObject = JmxUtil.getObjectName(SamzaContainerMetrics.class.getName(), "choose-ns", "samza-container-2"); + objectNames.add(timerObject); + Double time = 42.42; + when(conn.getAttribute(timerObject, "AverageTime")).thenReturn(time); + + Map<String, Double> result = jmxMetricsAccessor.getTimerValues(SamzaContainerMetrics.class.getName(), "choose-ns"); + assertTrue(result.size() == 1); + assertTrue(result.get("samza-container-2").equals(time)); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/baf9faa1/samza-shell/src/main/bash/validate-yarn-job.sh ---------------------------------------------------------------------- diff --git a/samza-shell/src/main/bash/validate-yarn-job.sh b/samza-shell/src/main/bash/validate-yarn-job.sh new file mode 100644 index 0000000..8273a32 --- /dev/null +++ b/samza-shell/src/main/bash/validate-yarn-job.sh @@ -0,0 +1,21 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[[ $JAVA_OPTS != *-Dlog4j.configuration* ]] && export JAVA_OPTS="$JAVA_OPTS -Dlog4j.configuration=file:$(dirname $0)/log4j-console.xml" + +exec $(dirname $0)/run-class.sh org.apache.samza.validation.YarnJobValidationTool "$@" \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/baf9faa1/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java new file mode 100644 index 0000000..70f1e4f --- /dev/null +++ b/samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.validation; + +import java.util.Map; +import joptsimple.OptionParser; +import joptsimple.OptionSet; +import joptsimple.OptionSpec; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ContainerReport; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; +import org.apache.samza.container.SamzaContainerMetrics; +import org.apache.samza.coordinator.JobCoordinator; +import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping; +import org.apache.samza.job.yarn.ClientHelper; +import org.apache.samza.metrics.JmxMetricsAccessor; +import org.apache.samza.metrics.MetricsValidator; +import org.apache.samza.util.hadoop.HttpFileSystem; +import org.apache.samza.util.CommandLine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Command-line tool for validating the status of a Yarn job. + * It checks the job has been successfully submitted to the Yarn cluster, the status of + * the application attempt is running and the running container count matches the expectation. + * It also supports an optional MetricsValidator plugin through arguments so job metrics can + * be validated too using JMX. This tool can be used, for example, as an automated validation + * step after starting a job. + * + * When running this tool, please provide the configuration URI of the job. 
For example: + * + * deploy/samza/bin/validate-yarn-job.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/wikipedia-feed.properties [--metrics-validator=com.foo.bar.SomeMetricsValidator] + * + * The tool prints out the validation result in each step and throws an exception when the + * validation fails. + */ +public class YarnJobValidationTool { + private static final Logger log = LoggerFactory.getLogger(YarnJobValidationTool.class); + + private final JobConfig config; + private final YarnClient client; + private final String jobName; + private final MetricsValidator validator; + + public YarnJobValidationTool(JobConfig config, YarnClient client, MetricsValidator validator) { + this.config = config; + this.client = client; + String name = this.config.getName().get(); + String jobId = this.config.getJobId().nonEmpty()? this.config.getJobId().get() : "1"; + this.jobName = name + "_" + jobId; + this.validator = validator; + } + + public void run() { + ApplicationId appId; + ApplicationAttemptId attemptId; + + try { + log.info("Start validating job " + this.jobName); + + appId = validateAppId(); + attemptId = validateRunningAttemptId(appId); + validateContainerCount(attemptId); + if(validator != null) { + validateJmxMetrics(); + } + + log.info("End of validation"); + } catch (Exception e) { + log.error(e.getMessage(), e); + System.exit(1); + } + } + + public ApplicationId validateAppId() throws Exception { + // fetch only the last created application with the job name and id + // i.e. get the application with max appId + ApplicationId appId = null; + for(ApplicationReport applicationReport : this.client.getApplications()) { + if(applicationReport.getName().equals(this.jobName)) { + ApplicationId id = applicationReport.getApplicationId(); + if(appId == null || appId.compareTo(id) < 0) { + appId = id; + } + } + } + if (appId != null) { + log.info("Job lookup success. 
ApplicationId " + appId.toString()); + return appId; + } else { + throw new SamzaException("Job lookup failure " + this.jobName); + } + } + + public ApplicationAttemptId validateRunningAttemptId(ApplicationId appId) throws Exception { + ApplicationAttemptId attemptId = this.client.getApplicationReport(appId).getCurrentApplicationAttemptId(); + ApplicationAttemptReport attemptReport = this.client.getApplicationAttemptReport(attemptId); + if (attemptReport.getYarnApplicationAttemptState() == YarnApplicationAttemptState.RUNNING) { + log.info("Job is running. AttempId " + attemptId.toString()); + return attemptId; + } else { + throw new SamzaException("Job not running " + this.jobName); + } + } + + public int validateContainerCount(ApplicationAttemptId attemptId) throws Exception { + int runningContainerCount = 0; + for(ContainerReport containerReport : this.client.getContainers(attemptId)) { + if(containerReport.getContainerState() == ContainerState.RUNNING) { + ++runningContainerCount; + } + } + // expected containers to be the configured job containers plus the AppMaster container + int containerExpected = this.config.getContainerCount() + 1; + + if (runningContainerCount == containerExpected) { + log.info("Container count matches. " + runningContainerCount + " containers are running."); + return runningContainerCount; + } else { + throw new SamzaException("Container count does not match. 
" + runningContainerCount + " containers are running, while " + containerExpected + " is expected."); + } + } + + public void validateJmxMetrics() throws Exception { + JobCoordinator jobCoordinator = JobCoordinator.apply(config); + validator.init(config); + Map<Integer, String> jmxUrls = jobCoordinator.jobModel().getAllContainerToHostValues(SetContainerHostMapping.JMX_TUNNELING_URL_KEY); + for (Map.Entry<Integer, String> entry : jmxUrls.entrySet()) { + Integer containerId = entry.getKey(); + String jmxUrl = entry.getValue(); + log.info("validate container " + containerId + " metrics with JMX: " + jmxUrl); + JmxMetricsAccessor jmxMetrics = new JmxMetricsAccessor(jmxUrl); + jmxMetrics.connect(); + validator.validate(jmxMetrics); + jmxMetrics.close(); + log.info("validate container " + containerId + " successfully"); + } + validator.complete(); + } + + public static void main(String [] args) throws Exception { + CommandLine cmdline = new CommandLine(); + OptionParser parser = cmdline.parser(); + OptionSpec<String> validatorOpt = parser.accepts("metrics-validator", "The metrics validator class.") + .withOptionalArg() + .ofType(String.class).describedAs("com.foo.bar.ClassName"); + OptionSet options = cmdline.parser().parse(args); + Config config = cmdline.loadConfig(options); + MetricsValidator validator = null; + if (options.has(validatorOpt)) { + String validatorClass = options.valueOf(validatorOpt); + validator = (MetricsValidator) Class.forName(validatorClass).newInstance(); + } + + YarnConfiguration hadoopConfig = new YarnConfiguration(); + hadoopConfig.set("fs.http.impl", HttpFileSystem.class.getName()); + hadoopConfig.set("fs.https.impl", HttpFileSystem.class.getName()); + ClientHelper clientHelper = new ClientHelper(hadoopConfig); + + new YarnJobValidationTool(new JobConfig(config), clientHelper.yarnClient(), validator).run(); + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/samza/blob/baf9faa1/samza-yarn/src/test/java/org/apache/samza/validation/MockMetricsValidator.java ---------------------------------------------------------------------- diff --git a/samza-yarn/src/test/java/org/apache/samza/validation/MockMetricsValidator.java b/samza-yarn/src/test/java/org/apache/samza/validation/MockMetricsValidator.java new file mode 100644 index 0000000..c3cf935 --- /dev/null +++ b/samza-yarn/src/test/java/org/apache/samza/validation/MockMetricsValidator.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.samza.validation;
+
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.container.SamzaContainerMetrics;
+import org.apache.samza.metrics.MetricsAccessor;
+import org.apache.samza.metrics.MetricsValidationFailureException;
+import org.apache.samza.metrics.MetricsValidator;
+
+
+/**
+ * Test-only {@link MetricsValidator}. It reads the "commit-calls" counter values
+ * registered under the {@link SamzaContainerMetrics} class name and fails when
+ * there are none or when any of them is not strictly positive.
+ */
+public class MockMetricsValidator implements MetricsValidator {
+
+  /** No setup is needed for the mock. */
+  @Override
+  public void init(Config config) {
+  }
+
+  /**
+   * Validates the "commit-calls" counters exposed by the accessor.
+   *
+   * @param accessor source of the counter values
+   * @throws MetricsValidationFailureException if no counter value is returned, or
+   *         if any returned value is less than or equal to zero
+   */
+  @Override
+  public void validate(MetricsAccessor accessor) throws MetricsValidationFailureException {
+    String metricsGroup = SamzaContainerMetrics.class.getName();
+    Map<String, Long> commitCallCounters = accessor.getCounterValues(metricsGroup, "commit-calls");
+    if (commitCallCounters.isEmpty()) {
+      throw new MetricsValidationFailureException("no value");
+    }
+    for (Long commitCallCount : commitCallCounters.values()) {
+      if (commitCallCount <= 0) {
+        throw new MetricsValidationFailureException("commit call <= 0");
+      }
+    }
+  }
+
+  /** Nothing to finalize for the mock. */
+  @Override
+  public void complete() {
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/baf9faa1/samza-yarn/src/test/java/org/apache/samza/validation/TestYarnJobValidationTool.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/java/org/apache/samza/validation/TestYarnJobValidationTool.java b/samza-yarn/src/test/java/org/apache/samza/validation/TestYarnJobValidationTool.java
new file mode 100644
index 0000000..7a8d291
--- /dev/null
+++ b/samza-yarn/src/test/java/org/apache/samza/validation/TestYarnJobValidationTool.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.validation;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerReport;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.samza.container.SamzaContainerMetrics;
+import org.apache.samza.metrics.JmxMetricsAccessor;
+import org.apache.samza.metrics.MetricsValidationFailureException;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static org.junit.Assert.*;
+
+/**
+ * Unit tests for {@link YarnJobValidationTool}, driven by a mocked {@link YarnClient}
+ * and a {@link MockMetricsValidator}.
+ */
+public class TestYarnJobValidationTool {
+  private YarnClient client;
+  private YarnJobValidationTool tool;
+  private String jobName = "test";
+  private int jobId = 1;
+  private ApplicationId appId;
+  ApplicationAttemptId attemptId;
+  private int containerCount = 9;
+  private Config config;
+  private MockMetricsValidator validator = new MockMetricsValidator();
+
+  @Rule
+  public final ExpectedException exception = ExpectedException.none();
+
+  /** Builds the job config, the mocked YARN client and ids, and the tool under test. */
+  @Before
+  public void setup() throws Exception {
+    Map<String, String> jobProperties = new HashMap<>();
+    jobProperties.put("job.name", jobName);
+    jobProperties.put("job.id", String.valueOf(jobId));
+    jobProperties.put("yarn.container.count", String.valueOf(containerCount));
+    config = new MapConfig(jobProperties);
+
+    client = mock(YarnClient.class);
+    tool = new YarnJobValidationTool(new JobConfig(config), client, validator);
+
+    appId = mock(ApplicationId.class);
+    when(appId.getId()).thenReturn(1111);
+
+    attemptId = mock(ApplicationAttemptId.class);
+    when(attemptId.getApplicationId()).thenReturn(appId);
+    when(attemptId.getAttemptId()).thenReturn(2222);
+  }
+
+  /** The app id is returned when the report name is jobName_jobId; otherwise the lookup fails. */
+  @Test
+  public void testValidateAppId() throws Exception {
+    ApplicationReport report = mock(ApplicationReport.class);
+    when(report.getName()).thenReturn(jobName + "_" + jobId);
+    when(report.getApplicationId()).thenReturn(appId);
+    when(client.getApplications()).thenReturn(Collections.singletonList(report));
+    assertEquals(appId, tool.validateAppId());
+
+    // a report with a non-matching name must make the lookup fail
+    when(report.getName()).thenReturn("dummy");
+    exception.expect(SamzaException.class);
+    tool.validateAppId();
+  }
+
+  /** A RUNNING attempt is returned; a FAILED attempt is rejected. */
+  @Test
+  public void testValidateRunningAttemptId() throws Exception {
+    ApplicationReport report = mock(ApplicationReport.class);
+    when(client.getApplicationReport(appId)).thenReturn(report);
+    when(report.getCurrentApplicationAttemptId()).thenReturn(attemptId);
+
+    ApplicationAttemptReport currentAttempt = mock(ApplicationAttemptReport.class);
+    when(currentAttempt.getYarnApplicationAttemptState()).thenReturn(YarnApplicationAttemptState.RUNNING);
+    when(currentAttempt.getApplicationAttemptId()).thenReturn(attemptId);
+    when(client.getApplicationAttemptReport(attemptId)).thenReturn(currentAttempt);
+    assertEquals(attemptId, tool.validateRunningAttemptId(appId));
+
+    when(currentAttempt.getYarnApplicationAttemptState()).thenReturn(YarnApplicationAttemptState.FAILED);
+    exception.expect(SamzaException.class);
+    tool.validateRunningAttemptId(appId);
+  }
+
+  /** containerCount + 1 running containers pass; one fewer is rejected. */
+  @Test
+  public void testValidateContainerCount() throws Exception {
+    int expectedRunning = containerCount + 1;
+    List<ContainerReport> runningReports = new ArrayList<>();
+    for (int created = 0; created < expectedRunning; created++) {
+      ContainerReport running = mock(ContainerReport.class);
+      when(running.getContainerState()).thenReturn(ContainerState.RUNNING);
+      runningReports.add(running);
+    }
+    when(client.getContainers(attemptId)).thenReturn(runningReports);
+    assertEquals(expectedRunning, tool.validateContainerCount(attemptId));
+
+    // dropping one report leaves the running count one short of the expected count
+    runningReports.remove(0);
+    exception.expect(SamzaException.class);
+    tool.validateContainerCount(attemptId);
+  }
+
+  /** A positive commit-calls value passes the mock validator; a negative one fails it. */
+  @Test
+  public void testValidateJmxMetrics() throws MetricsValidationFailureException {
+    JmxMetricsAccessor accessor = mock(JmxMetricsAccessor.class);
+    Map<String, Long> commitCalls = new HashMap<>();
+    commitCalls.put("samza-container-0", 100L);
+    when(accessor.getCounterValues(SamzaContainerMetrics.class.getName(), "commit-calls")).thenReturn(commitCalls);
+    validator.validate(accessor);
+
+    commitCalls.put("samza-container-0", -1L);
+    // the mock validator will fail if the commit-calls are less than or equal to 0
+    exception.expect(MetricsValidationFailureException.class);
+    validator.validate(accessor);
+  }
+}
