Repository: storm
Updated Branches:
  refs/heads/1.x-branch 38c7ff866 -> c5433b1c6


http://git-wip-us.apache.org/repos/asf/storm/blob/622b6d13/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/Config.java 
b/storm-core/src/jvm/org/apache/storm/Config.java
index 507614b..1f4a63e 100644
--- a/storm-core/src/jvm/org/apache/storm/Config.java
+++ b/storm-core/src/jvm/org/apache/storm/Config.java
@@ -1515,6 +1515,22 @@ public class Config extends HashMap<String, Object> {
     public static final String 
BACKPRESSURE_DISRUPTOR_LOW_WATERMARK="backpressure.disruptor.low.watermark";
 
     /**
+     * A list of classes implementing IClusterMetricsConsumer (See 
storm.yaml.example for exact config format).
+     * Each listed class will be routed cluster related metrics data.
+     * Each listed class maps 1:1 to a ClusterMetricsConsumerExecutor and 
they're executed in Nimbus.
+     * Only consumers which run in leader Nimbus receives metrics data.
+     */
+    
@isListEntryCustom(entryValidatorClasses={ClusterMetricRegistryValidator.class})
+    public static final String STORM_CLUSTER_METRICS_CONSUMER_REGISTER = 
"storm.cluster.metrics.consumer.register";
+
+    /**
+     * How often cluster metrics data is published to metrics consumer.
+     */
+    @NotNull
+    @isPositiveNumber
+    public static final String 
STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS = 
"storm.cluster.metrics.consumer.publish.interval.secs";
+
+    /**
      * A list of users that are allowed to interact with the topology.  To use 
this set
      * nimbus.authorizer to 
org.apache.storm.security.auth.authorizer.SimpleACLAuthorizer
      */
@@ -1709,7 +1725,6 @@ public class Config extends HashMap<String, Object> {
      * Each listed class will be routed all the metrics data generated by the 
storm metrics API.
      * Each listed class maps 1:1 to a system bolt named 
__metrics_ClassName#N, and it's parallelism is configurable.
      */
-
     @isListEntryCustom(entryValidatorClasses={MetricRegistryValidator.class})
     public static final String TOPOLOGY_METRICS_CONSUMER_REGISTER = 
"topology.metrics.consumer.register";
 

http://git-wip-us.apache.org/repos/asf/storm/blob/622b6d13/storm-core/src/jvm/org/apache/storm/metric/ClusterMetricsConsumerExecutor.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/metric/ClusterMetricsConsumerExecutor.java
 
b/storm-core/src/jvm/org/apache/storm/metric/ClusterMetricsConsumerExecutor.java
new file mode 100644
index 0000000..9bf7481
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/metric/ClusterMetricsConsumerExecutor.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metric;
+
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.storm.Config;
+import org.apache.storm.metric.api.DataPoint;
+import org.apache.storm.metric.api.IClusterMetricsConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+public class ClusterMetricsConsumerExecutor {
+    public static final Logger LOG = 
LoggerFactory.getLogger(ClusterMetricsConsumerExecutor.class);
+
+    private IClusterMetricsConsumer metricsConsumer;
+    private String consumerClassName;
+    private Object registrationArgument;
+
+    public ClusterMetricsConsumerExecutor(String consumerClassName, Object 
registrationArgument) {
+        this.consumerClassName = consumerClassName;
+        this.registrationArgument = registrationArgument;
+    }
+
+    public void prepare() {
+        try {
+            metricsConsumer = 
(IClusterMetricsConsumer)Class.forName(consumerClassName).newInstance();
+        } catch (Exception e) {
+            throw new RuntimeException("Could not instantiate a class listed 
in config under section " +
+                    Config.STORM_CLUSTER_METRICS_CONSUMER_REGISTER + " with 
fully qualified name " + consumerClassName, e);
+        }
+
+        metricsConsumer.prepare(registrationArgument);
+    }
+
+    public void handleDataPoints(final IClusterMetricsConsumer.ClusterInfo 
clusterInfo, final Collection<DataPoint> dataPoints) {
+        try {
+            metricsConsumer.handleDataPoints(clusterInfo, dataPoints);
+        } catch (Throwable e) {
+            LOG.error("Error while handling cluster data points, consumer 
class: " + consumerClassName, e);
+        }
+    }
+
+    public void handleDataPoints(final IClusterMetricsConsumer.SupervisorInfo 
supervisorInfo, final Collection<DataPoint> dataPoints) {
+        try {
+            metricsConsumer.handleDataPoints(supervisorInfo, dataPoints);
+        } catch (Throwable e) {
+            LOG.error("Error while handling cluster data points, consumer 
class: " + consumerClassName, e);
+        }
+    }
+
+    public void cleanup() {
+        metricsConsumer.cleanup();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/622b6d13/storm-core/src/jvm/org/apache/storm/metric/LoggingClusterMetricsConsumer.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/metric/LoggingClusterMetricsConsumer.java 
b/storm-core/src/jvm/org/apache/storm/metric/LoggingClusterMetricsConsumer.java
new file mode 100644
index 0000000..db47d84
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/metric/LoggingClusterMetricsConsumer.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metric;
+
+import org.apache.storm.metric.api.DataPoint;
+import org.apache.storm.metric.api.IClusterMetricsConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+
+/**
+ * Listens for cluster related metrics, dumps them to log
+ *
+ * To use, edit the storm.yaml config file:
+ *
+ * ```yaml
+ *   storm.cluster.metrics.register:
+ *     - class: "org.apache.storm.metrics.LoggingClusterMetricsConsumer"
+ * ```
+ *
+ */
+public class LoggingClusterMetricsConsumer implements IClusterMetricsConsumer {
+    public static final Logger LOG = 
LoggerFactory.getLogger(LoggingClusterMetricsConsumer.class);
+
+    static private String padding = "                       ";
+
+    @Override
+    public void prepare(Object registrationArgument) {
+    }
+
+    @Override
+    public void handleDataPoints(ClusterInfo clusterInfo, 
Collection<DataPoint> dataPoints) {
+        StringBuilder sb = new StringBuilder();
+        String header = String.format("%d\t%15s\t%40s\t",
+                clusterInfo.getTimestamp(), "<cluster>", "<cluster>");
+        sb.append(header);
+        logDataPoints(dataPoints, sb, header);
+    }
+
+    @Override
+    public void handleDataPoints(SupervisorInfo supervisorInfo, 
Collection<DataPoint> dataPoints) {
+        StringBuilder sb = new StringBuilder();
+        String header = String.format("%d\t%15s\t%40s\t",
+                supervisorInfo.getTimestamp(),
+                supervisorInfo.getSrcSupervisorHost(),
+                supervisorInfo.getSrcSupervisorId());
+        sb.append(header);
+        for (DataPoint p : dataPoints) {
+            sb.delete(header.length(), sb.length());
+            sb.append(p.getName())
+                    
.append(padding).delete(header.length()+23,sb.length()).append("\t")
+                    .append(p.getValue());
+            LOG.info(sb.toString());
+        }
+    }
+
+    @Override
+    public void cleanup() {
+    }
+
+    private void logDataPoints(Collection<DataPoint> dataPoints, StringBuilder 
sb, String header) {
+        for (DataPoint p : dataPoints) {
+            sb.delete(header.length(), sb.length());
+            sb.append(p.getName())
+                    
.append(padding).delete(header.length()+23,sb.length()).append("\t")
+                    .append(p.getValue());
+            LOG.info(sb.toString());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/622b6d13/storm-core/src/jvm/org/apache/storm/metric/api/DataPoint.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metric/api/DataPoint.java 
b/storm-core/src/jvm/org/apache/storm/metric/api/DataPoint.java
new file mode 100644
index 0000000..45e077a
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/metric/api/DataPoint.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metric.api;
+
+public class DataPoint {
+    private String name;
+    private Object value;
+
+    public DataPoint() {}
+
+    public DataPoint(String name, Object value) {
+        this.name = name;
+        this.value = value;
+    }
+
+    @Override
+    public String toString() {
+        return "[" + name + " = " + value + "]";
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public Object getValue() {
+        return value;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/622b6d13/storm-core/src/jvm/org/apache/storm/metric/api/IClusterMetricsConsumer.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/metric/api/IClusterMetricsConsumer.java 
b/storm-core/src/jvm/org/apache/storm/metric/api/IClusterMetricsConsumer.java
new file mode 100644
index 0000000..39d60f3
--- /dev/null
+++ 
b/storm-core/src/jvm/org/apache/storm/metric/api/IClusterMetricsConsumer.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metric.api;
+
+import java.util.Collection;
+
+public interface IClusterMetricsConsumer {
+    class ClusterInfo {
+        private long timestamp;
+
+        public ClusterInfo(long timestamp) {
+            this.timestamp = timestamp;
+        }
+
+        public long getTimestamp() {
+            return timestamp;
+        }
+    }
+
+    class SupervisorInfo {
+        private String srcSupervisorHost;
+        private String srcSupervisorId;
+        private long timestamp;
+
+        public SupervisorInfo(String srcSupervisorHost, String 
srcSupervisorId, long timestamp) {
+            this.srcSupervisorHost = srcSupervisorHost;
+            this.srcSupervisorId = srcSupervisorId;
+            this.timestamp = timestamp;
+        }
+
+        public String getSrcSupervisorHost() {
+            return srcSupervisorHost;
+        }
+
+        public String getSrcSupervisorId() {
+            return srcSupervisorId;
+        }
+
+        public long getTimestamp() {
+            return timestamp;
+        }
+    }
+
+    void prepare(Object registrationArgument);
+    void handleDataPoints(ClusterInfo clusterInfo, Collection<DataPoint> 
dataPoints);
+    void handleDataPoints(SupervisorInfo supervisorInfo, Collection<DataPoint> 
dataPoints);
+    void cleanup();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/622b6d13/storm-core/src/jvm/org/apache/storm/metric/api/IMetricsConsumer.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/metric/api/IMetricsConsumer.java 
b/storm-core/src/jvm/org/apache/storm/metric/api/IMetricsConsumer.java
index f475659..fe9b7f9 100644
--- a/storm-core/src/jvm/org/apache/storm/metric/api/IMetricsConsumer.java
+++ b/storm-core/src/jvm/org/apache/storm/metric/api/IMetricsConsumer.java
@@ -40,6 +40,8 @@ public interface IMetricsConsumer {
         public long timestamp;
         public int updateIntervalSecs; 
     }
+
+    // We can't move this to outside without breaking backward compatibility.
     public static class DataPoint {
         public DataPoint() {}
         public DataPoint(String name, Object value) {

http://git-wip-us.apache.org/repos/asf/storm/blob/622b6d13/storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java
----------------------------------------------------------------------
diff --git 
a/storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java 
b/storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java
index edff5cf..fb220dd 100644
--- a/storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java
+++ b/storm-core/src/jvm/org/apache/storm/validation/ConfigValidation.java
@@ -457,6 +457,22 @@ public class ConfigValidation {
         }
     }
 
+    public static class ClusterMetricRegistryValidator extends Validator {
+
+        @Override
+        public void validateField(String name, Object o) {
+            if(o == null) {
+                return;
+            }
+            SimpleTypeValidator.validateField(name, Map.class, o);
+            if(!((Map) o).containsKey("class") ) {
+                throw new IllegalArgumentException( "Field " + name + " must 
have map entry with key: class");
+            }
+
+            SimpleTypeValidator.validateField(name, String.class, ((Map) 
o).get("class"));
+        }
+    }
+
     public static class MetricRegistryValidator extends Validator {
 
         @Override

Reply via email to