Repository: storm Updated Branches: refs/heads/1.x-branch 38c7ff866 -> c5433b1c6
http://git-wip-us.apache.org/repos/asf/storm/blob/622b6d13/storm-core/src/jvm/org/apache/storm/Config.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java index 507614b..1f4a63e 100644 --- a/storm-core/src/jvm/org/apache/storm/Config.java +++ b/storm-core/src/jvm/org/apache/storm/Config.java @@ -1515,6 +1515,22 @@ public class Config extends HashMap<String, Object> { public static final String BACKPRESSURE_DISRUPTOR_LOW_WATERMARK="backpressure.disruptor.low.watermark"; /** + * A list of classes implementing IClusterMetricsConsumer (See storm.yaml.example for exact config format). + * Each listed class will be routed cluster related metrics data. + * Each listed class maps 1:1 to a ClusterMetricsConsumerExecutor and they're executed in Nimbus. + * Only consumers which run in leader Nimbus receive metrics data. + */ + @isListEntryCustom(entryValidatorClasses={ClusterMetricRegistryValidator.class}) + public static final String STORM_CLUSTER_METRICS_CONSUMER_REGISTER = "storm.cluster.metrics.consumer.register"; + + /** + * How often cluster metrics data is published to metrics consumer. + */ + @NotNull + @isPositiveNumber + public static final String STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS = "storm.cluster.metrics.consumer.publish.interval.secs"; + + /** * A list of users that are allowed to interact with the topology. To use this set * nimbus.authorizer to org.apache.storm.security.auth.authorizer.SimpleACLAuthorizer */ @@ -1709,7 +1725,6 @@ public class Config extends HashMap<String, Object> { * Each listed class will be routed all the metrics data generated by the storm metrics API. * Each listed class maps 1:1 to a system bolt named __metrics_ClassName#N, and it's parallelism is configurable. 
*/ - @isListEntryCustom(entryValidatorClasses={MetricRegistryValidator.class}) public static final String TOPOLOGY_METRICS_CONSUMER_REGISTER = "topology.metrics.consumer.register"; http://git-wip-us.apache.org/repos/asf/storm/blob/622b6d13/storm-core/src/jvm/org/apache/storm/metric/ClusterMetricsConsumerExecutor.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metric/ClusterMetricsConsumerExecutor.java b/storm-core/src/jvm/org/apache/storm/metric/ClusterMetricsConsumerExecutor.java new file mode 100644 index 0000000..9bf7481 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/metric/ClusterMetricsConsumerExecutor.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.metric; + +import com.google.common.util.concurrent.MoreExecutors; +import org.apache.storm.Config; +import org.apache.storm.metric.api.DataPoint; +import org.apache.storm.metric.api.IClusterMetricsConsumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +public class ClusterMetricsConsumerExecutor { + public static final Logger LOG = LoggerFactory.getLogger(ClusterMetricsConsumerExecutor.class); + + private IClusterMetricsConsumer metricsConsumer; + private String consumerClassName; + private Object registrationArgument; + + public ClusterMetricsConsumerExecutor(String consumerClassName, Object registrationArgument) { + this.consumerClassName = consumerClassName; + this.registrationArgument = registrationArgument; + } + + public void prepare() { + try { + metricsConsumer = (IClusterMetricsConsumer)Class.forName(consumerClassName).newInstance(); + } catch (Exception e) { + throw new RuntimeException("Could not instantiate a class listed in config under section " + + Config.STORM_CLUSTER_METRICS_CONSUMER_REGISTER + " with fully qualified name " + consumerClassName, e); + } + + metricsConsumer.prepare(registrationArgument); + } + + public void handleDataPoints(final IClusterMetricsConsumer.ClusterInfo clusterInfo, final Collection<DataPoint> dataPoints) { + try { + metricsConsumer.handleDataPoints(clusterInfo, dataPoints); + } catch (Throwable e) { + LOG.error("Error while handling cluster data points, consumer class: " + consumerClassName, e); + } + } + + public void handleDataPoints(final IClusterMetricsConsumer.SupervisorInfo supervisorInfo, final Collection<DataPoint> dataPoints) { + try { + metricsConsumer.handleDataPoints(supervisorInfo, dataPoints); + } catch (Throwable e) { + 
LOG.error("Error while handling cluster data points, consumer class: " + consumerClassName, e); + } + } + + public void cleanup() { + metricsConsumer.cleanup(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/622b6d13/storm-core/src/jvm/org/apache/storm/metric/LoggingClusterMetricsConsumer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metric/LoggingClusterMetricsConsumer.java b/storm-core/src/jvm/org/apache/storm/metric/LoggingClusterMetricsConsumer.java new file mode 100644 index 0000000..db47d84 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/metric/LoggingClusterMetricsConsumer.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.metric; + +import org.apache.storm.metric.api.DataPoint; +import org.apache.storm.metric.api.IClusterMetricsConsumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; + +/** + * Listens for cluster related metrics, dumps them to log + * + * To use, edit the storm.yaml config file: + * + * ```yaml + * storm.cluster.metrics.consumer.register: + * - class: "org.apache.storm.metric.LoggingClusterMetricsConsumer" + * ``` + * + */ +public class LoggingClusterMetricsConsumer implements IClusterMetricsConsumer { + public static final Logger LOG = LoggerFactory.getLogger(LoggingClusterMetricsConsumer.class); + + static private String padding = " "; + + @Override + public void prepare(Object registrationArgument) { + } + + @Override + public void handleDataPoints(ClusterInfo clusterInfo, Collection<DataPoint> dataPoints) { + StringBuilder sb = new StringBuilder(); + String header = String.format("%d\t%15s\t%40s\t", + clusterInfo.getTimestamp(), "<cluster>", "<cluster>"); + sb.append(header); + logDataPoints(dataPoints, sb, header); + } + + @Override + public void handleDataPoints(SupervisorInfo supervisorInfo, Collection<DataPoint> dataPoints) { + StringBuilder sb = new StringBuilder(); + String header = String.format("%d\t%15s\t%40s\t", + supervisorInfo.getTimestamp(), + supervisorInfo.getSrcSupervisorHost(), + supervisorInfo.getSrcSupervisorId()); + sb.append(header); + for (DataPoint p : dataPoints) { + sb.delete(header.length(), sb.length()); + sb.append(p.getName()) + .append(padding).delete(header.length()+23,sb.length()).append("\t") + .append(p.getValue()); + LOG.info(sb.toString()); + } + } + + @Override + public void cleanup() { + } + + private void logDataPoints(Collection<DataPoint> dataPoints, StringBuilder sb, String header) { + for (DataPoint p : dataPoints) { + sb.delete(header.length(), sb.length()); + sb.append(p.getName()) + .append(padding).delete(header.length()+23,sb.length()).append("\t") 
+ .append(p.getValue()); + LOG.info(sb.toString()); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/622b6d13/storm-core/src/jvm/org/apache/storm/metric/api/DataPoint.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metric/api/DataPoint.java b/storm-core/src/jvm/org/apache/storm/metric/api/DataPoint.java new file mode 100644 index 0000000..45e077a --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/metric/api/DataPoint.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.metric.api; + +public class DataPoint { + private String name; + private Object value; + + public DataPoint() {} + + public DataPoint(String name, Object value) { + this.name = name; + this.value = value; + } + + @Override + public String toString() { + return "[" + name + " = " + value + "]"; + } + + public String getName() { + return name; + } + + public Object getValue() { + return value; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/622b6d13/storm-core/src/jvm/org/apache/storm/metric/api/IClusterMetricsConsumer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metric/api/IClusterMetricsConsumer.java b/storm-core/src/jvm/org/apache/storm/metric/api/IClusterMetricsConsumer.java new file mode 100644 index 0000000..39d60f3 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/metric/api/IClusterMetricsConsumer.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.metric.api; + +import java.util.Collection; + +public interface IClusterMetricsConsumer { + class ClusterInfo { + private long timestamp; + + public ClusterInfo(long timestamp) { + this.timestamp = timestamp; + } + + public long getTimestamp() { + return timestamp; + } + } + + class SupervisorInfo { + private String srcSupervisorHost; + private String srcSupervisorId; + private long timestamp; + + public SupervisorInfo(String srcSupervisorHost, String srcSupervisorId, long timestamp) { + this.srcSupervisorHost = srcSupervisorHost; + this.srcSupervisorId = srcSupervisorId; + this.timestamp = timestamp; + } + + public String getSrcSupervisorHost() { + return srcSupervisorHost; + } + + public String getSrcSupervisorId() { + return srcSupervisorId; + } + + public long getTimestamp() { + return timestamp; + } + } + + void prepare(Object registrationArgument); + void handleDataPoints(ClusterInfo clusterInfo, Collection<DataPoint> dataPoints); + void handleDataPoints(SupervisorInfo supervisorInfo, Collection<DataPoint> dataPoints); + void cleanup(); +} http://git-wip-us.apache.org/repos/asf/storm/blob/622b6d13/storm-core/src/jvm/org/apache/storm/metric/api/IMetricsConsumer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metric/api/IMetricsConsumer.java b/storm-core/src/jvm/org/apache/storm/metric/api/IMetricsConsumer.java index f475659..fe9b7f9 100644 --- a/storm-core/src/jvm/org/apache/storm/metric/api/IMetricsConsumer.java +++ b/storm-core/src/jvm/org/apache/storm/metric/api/IMetricsConsumer.java @@ -40,6 +40,8 @@ public interface IMetricsConsumer { public long timestamp; public int updateIntervalSecs; } + + // We can't move this to outside without breaking backward compatibility. 
public static class DataPoint { public DataPoint() {} public DataPoint(String name, Object value) { http://git-wip-us.apache.org/repos/asf/storm/blob/622b6d13/storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java b/storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java index edff5cf..fb220dd 100644 --- a/storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java +++ b/storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java @@ -457,6 +457,22 @@ public class ConfigValidation { } } + public static class ClusterMetricRegistryValidator extends Validator { + + @Override + public void validateField(String name, Object o) { + if(o == null) { + return; + } + SimpleTypeValidator.validateField(name, Map.class, o); + if(!((Map) o).containsKey("class") ) { + throw new IllegalArgumentException( "Field " + name + " must have map entry with key: class"); + } + + SimpleTypeValidator.validateField(name, String.class, ((Map) o).get("class")); + } + } + public static class MetricRegistryValidator extends Validator { @Override
