Repository: eagle Updated Branches: refs/heads/master 97ae1da52 -> 6e0fc410d
[EAGLE-928] Refine system metric schema design and fix system metric collector https://issues.apache.org/jira/browse/EAGLE-928 * Support new stream `SYSTEM_METRIC_STREAM`: <stream> <streamId>SYSTEM_METRIC_STREAM</streamId> <description>System Metrics Stream including CPU, Network, Disk, etc.</description> <columns> <column> <name>host</name> <type>string</type> </column> <column> <name>timestamp</name> <type>long</type> </column> <column> <name>metric</name> <type>string</type> </column> <column> <name>group</name> <type>string</type> </column> <column> <name>site</name> <type>string</type> </column> <column> <name>device</name> <type>string</type> </column> <column> <name>value</name> <type>double</type> <defaultValue>0.0</defaultValue> </column> </columns> </stream> * Sample Metric Event { 'timestamp': 1487918913569, 'metric': 'system.nic.transmitdrop', 'site': 'sandbox', 'value': 7724.0, 'host': 'sandbox.hortonworks.com', 'device': 'eth0' } * Add `system_metric_collector.py` * Support persisting collected system metrics for query * Refactor MetricSchemaEntity and MetricDescriptor Author: Hao Chen <[email protected]> Closes #842 from haoch/FixSystemMetricCollector. 
Project: http://git-wip-us.apache.org/repos/asf/eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/6e0fc410 Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/6e0fc410 Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/6e0fc410 Branch: refs/heads/master Commit: 6e0fc410d4d148eba238c86d3bb7b2be507f6d82 Parents: 97ae1da Author: Hao Chen <[email protected]> Authored: Tue Feb 28 14:33:55 2017 +0800 Committer: Hao Chen <[email protected]> Committed: Tue Feb 28 14:33:55 2017 +0800 ---------------------------------------------------------------------- .../environment/builder/ApplicationBuilder.java | 256 ++++++++-------- .../environment/builder/MetricDefinition.java | 201 ------------- .../environment/builder/MetricDescriptor.java | 297 +++++++++++++++++++ .../app/environment/impl/StormEnvironment.java | 11 +- .../app/messaging/MetricSchemaGenerator.java | 19 +- .../app/messaging/MetricStreamPersist.java | 38 +-- .../apache/eagle/app/utils/AppConfigUtils.java | 30 ++ .../metadata/model/MetricSchemaEntity.java | 7 +- .../hadoop_jmx_collector/metric_collector.py | 15 + .../system_metric_collector.py | 24 +- .../system_metric_config-sample.json | 11 +- .../eagle/metric/HadoopMetricMonitorApp.java | 38 ++- ...le.metric.HadoopMetricMonitorAppProdiver.xml | 42 ++- .../metric/HadoopMetricMonitorAppDebug.java | 3 + .../src/test/resources/application.conf | 18 +- .../app/dev/public/js/ctrls/metricCtrl.js | 2 +- 16 files changed, 611 insertions(+), 401 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/ApplicationBuilder.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/ApplicationBuilder.java 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/ApplicationBuilder.java index 95cf491..88f0886 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/ApplicationBuilder.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/ApplicationBuilder.java @@ -1,127 +1,131 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.eagle.app.environment.builder; - - -import backtype.storm.generated.StormTopology; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.tuple.Fields; -import com.google.common.base.Preconditions; -import com.typesafe.config.Config; -import org.apache.eagle.app.environment.impl.StormEnvironment; -import org.apache.eagle.app.messaging.MetricSchemaGenerator; -import org.apache.eagle.app.messaging.MetricStreamPersist; -import org.apache.eagle.app.messaging.StormStreamSource; - -import java.util.concurrent.atomic.AtomicInteger; - -/** - * Storm Application Builder DSL. 
- */ -public class ApplicationBuilder { - private final StormEnvironment environment; - private final Config appConfig; - private final TopologyBuilder topologyBuilder; - private final AtomicInteger identifier; - - public ApplicationBuilder(Config appConfig, StormEnvironment environment) { - this.appConfig = appConfig; - this.environment = environment; - this.identifier = new AtomicInteger(0); - this.topologyBuilder = new TopologyBuilder(); - } - - public class BuilderContext { - public StormTopology toTopology() { - return topologyBuilder.createTopology(); - } - } - - public abstract class InitializedStream extends BuilderContext { - private String id; - - InitializedStream(String id) { - Preconditions.checkNotNull(id); - this.id = id; - } - - String getId() { - return this.id; - } - - /** - * Persist source data stream as metric. - */ - public BuilderContext saveAsMetric(MetricDefinition metricDefinition) { - String metricDataID = generateId("MetricDataSink"); - String metricSchemaID = generateId("MetricSchemaGenerator"); - topologyBuilder.setBolt(metricDataID, new MetricStreamPersist(metricDefinition, appConfig)).shuffleGrouping(getId()); - topologyBuilder.setBolt(metricSchemaID, new MetricSchemaGenerator(metricDefinition,appConfig)).fieldsGrouping(metricDataID,new Fields(MetricStreamPersist.METRIC_NAME_FIELD)); - return this; - } - - public TransformedStream transformBy(TransformFunction function) { - String componentId = generateId(function.getName()); - topologyBuilder.setBolt(componentId, new TransformFunctionBolt(function)).shuffleGrouping(getId()); - return new TransformedStream(componentId); - } - } - - public class SourcedStream extends InitializedStream { - private final Config appConfig; - private final StormStreamSource streamSource; - - private SourcedStream(SourcedStream withSourcedStream) { - this(withSourcedStream.getId(), withSourcedStream.appConfig, withSourcedStream.streamSource); - } - - private SourcedStream(String componentId, Config 
appConfig, StormStreamSource streamSource) { - super(componentId); - this.appConfig = appConfig; - this.streamSource = streamSource; - topologyBuilder.setSpout(componentId, streamSource); - } - } - - public class TransformedStream extends InitializedStream { - public TransformedStream(String id) { - super(id); - throw new IllegalStateException("TODO: Not implemented yet"); - } - } - - public TopologyBuilder getTopologyBuilder() { - return this.topologyBuilder; - } - - public StormTopology createTopology() { - return topologyBuilder.createTopology(); - } - - - public SourcedStream fromStream(String streamId) { - return new SourcedStream(generateId("SourcedStream-" + streamId), this.appConfig, environment.getStreamSource(streamId, this.appConfig)); - } - - public SourcedStream fromStream(SourcedStream sourcedStream) { - return new SourcedStream(sourcedStream); - } - - private String generateId(String prefix) { - return String.format("%s_%s", prefix, this.identifier.getAndIncrement()); - } +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.eagle.app.environment.builder; + + +import backtype.storm.generated.StormTopology; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.tuple.Fields; +import com.google.common.base.Preconditions; +import com.typesafe.config.Config; +import org.apache.eagle.app.environment.impl.StormEnvironment; +import org.apache.eagle.app.messaging.MetricSchemaGenerator; +import org.apache.eagle.app.messaging.MetricStreamPersist; +import org.apache.eagle.app.messaging.StormStreamSource; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Storm Application Builder DSL. + */ +public class ApplicationBuilder { + private final StormEnvironment environment; + private final Config appConfig; + private final TopologyBuilder topologyBuilder; + private final AtomicInteger identifier; + + public ApplicationBuilder(Config appConfig, StormEnvironment environment) { + this.appConfig = appConfig; + this.environment = environment; + this.identifier = new AtomicInteger(0); + this.topologyBuilder = new TopologyBuilder(); + } + + public class BuilderContext { + public StormTopology toTopology() { + return topologyBuilder.createTopology(); + } + + public SourcedStream fromStream(String streamId) { + return ApplicationBuilder.this.fromStream(streamId); + } + } + + public abstract class InitializedStream extends BuilderContext { + private String id; + + InitializedStream(String id) { + Preconditions.checkNotNull(id); + this.id = id; + } + + String getId() { + return this.id; + } + + /** + * Persist source data stream as metric. 
+ */ + public BuilderContext saveAsMetric(MetricDescriptor metricDescriptor) { + String metricDataID = generateId("MetricDataSink"); + String metricSchemaID = generateId("MetricSchemaGenerator"); + topologyBuilder.setBolt(metricDataID, new MetricStreamPersist(metricDescriptor, appConfig)).shuffleGrouping(getId()); + topologyBuilder.setBolt(metricSchemaID, new MetricSchemaGenerator(metricDescriptor,appConfig)).fieldsGrouping(metricDataID,new Fields(MetricStreamPersist.METRIC_NAME_FIELD)); + return this; + } + + public TransformedStream transformBy(TransformFunction function) { + String componentId = generateId(function.getName()); + topologyBuilder.setBolt(componentId, new TransformFunctionBolt(function)).shuffleGrouping(getId()); + return new TransformedStream(componentId); + } + } + + public class SourcedStream extends InitializedStream { + private final Config appConfig; + private final StormStreamSource streamSource; + + private SourcedStream(SourcedStream withSourcedStream) { + this(withSourcedStream.getId(), withSourcedStream.appConfig, withSourcedStream.streamSource); + } + + private SourcedStream(String componentId, Config appConfig, StormStreamSource streamSource) { + super(componentId); + this.appConfig = appConfig; + this.streamSource = streamSource; + topologyBuilder.setSpout(componentId, streamSource); + } + } + + public class TransformedStream extends InitializedStream { + public TransformedStream(String id) { + super(id); + throw new IllegalStateException("TODO: Not implemented yet"); + } + } + + public TopologyBuilder getTopologyBuilder() { + return this.topologyBuilder; + } + + public StormTopology createTopology() { + return topologyBuilder.createTopology(); + } + + + public SourcedStream fromStream(String streamId) { + return new SourcedStream(generateId("SourcedStream-" + streamId), this.appConfig, environment.getStreamSource(streamId, this.appConfig)); + } + + public SourcedStream fromStream(SourcedStream sourcedStream) { + return new 
SourcedStream(sourcedStream); + } + + private String generateId(String prefix) { + return String.format("%s_%s", prefix, this.identifier.getAndIncrement()); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDefinition.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDefinition.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDefinition.java deleted file mode 100644 index 639d27f..0000000 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDefinition.java +++ /dev/null @@ -1,201 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.eagle.app.environment.builder; - - -import java.io.Serializable; -import java.util.Arrays; -import java.util.Calendar; -import java.util.List; -import java.util.Map; - -public class MetricDefinition implements Serializable { - - /** - * Support simple and complex name format, by default using "metric" field. - */ - private NameSelector nameSelector = new FieldNameSelector("metric"); - - /** - * Support event/system time, by default using system time. - */ - private TimestampSelector timestampSelector = new SystemTimestampSelector(); - - /** - * Metric dimension field name. - */ - private List<String> dimensionFields; - - /** - * Metric granularity. - */ - private int granularity = Calendar.MINUTE; - - private String metricType = "DEFAULT"; - - /** - * Metric value field name. - */ - private String valueField = "value"; - - public NameSelector getNameSelector() { - return nameSelector; - } - - public void setNameSelector(NameSelector nameSelector) { - this.nameSelector = nameSelector; - } - - public String getValueField() { - return valueField; - } - - public void setValueField(String valueField) { - this.valueField = valueField; - } - - public List<String> getDimensionFields() { - return dimensionFields; - } - - public void setDimensionFields(List<String> dimensionFields) { - this.dimensionFields = dimensionFields; - } - - public TimestampSelector getTimestampSelector() { - return timestampSelector; - } - - public void setTimestampSelector(TimestampSelector timestampSelector) { - this.timestampSelector = timestampSelector; - } - - public int getGranularity() { - return granularity; - } - - public void setGranularity(int granularity) { - this.granularity = granularity; - } - - public String getMetricType() { - return metricType; - } - - public void setMetricType(String metricType) { - this.metricType = metricType; - } - - - @FunctionalInterface - public interface NameSelector extends Serializable { - String getMetricName(Map event); - } - - 
@FunctionalInterface - public interface TimestampSelector extends Serializable { - Long getTimestamp(Map event); - } - - public MetricDefinition namedBy(NameSelector nameSelector) { - this.setNameSelector(nameSelector); - return this; - } - - /** - * @see java.util.Calendar - */ - public MetricDefinition granularity(int granularity) { - this.setGranularity(granularity); - return this; - } - - public MetricDefinition namedByField(String nameField) { - this.setNameSelector(new FieldNameSelector(nameField)); - return this; - } - - public static MetricDefinition metricType(String metricType) { - MetricDefinition metricDefinition = new MetricDefinition(); - metricDefinition.setMetricType(metricType); - return metricDefinition; - } - - public MetricDefinition eventTimeByField(String timestampField) { - this.setTimestampSelector(new EventTimestampSelector(timestampField)); - return this; - } - - public MetricDefinition dimensionFields(String... dimensionFields) { - this.setDimensionFields(Arrays.asList(dimensionFields)); - return this; - } - - public MetricDefinition valueField(String valueField) { - this.setValueField(valueField); - return this; - } - - public class EventTimestampSelector implements TimestampSelector { - private final String timestampField; - - EventTimestampSelector(String timestampField) { - this.timestampField = timestampField; - } - - @Override - public Long getTimestamp(Map event) { - if (event.containsKey(timestampField)) { - Object timestampValue = event.get(timestampField); - if (timestampValue instanceof Integer) { - return Long.valueOf((Integer) timestampValue); - } - if (timestampValue instanceof String) { - return Long.valueOf((String) timestampValue); - } else { - return (Long) timestampValue; - } - } else { - throw new IllegalArgumentException("Timestamp field '" + timestampField + "' not exists"); - } - } - } - - public static class SystemTimestampSelector implements TimestampSelector { - @Override - public Long getTimestamp(Map event) { - 
return System.currentTimeMillis(); - } - } - - public static class FieldNameSelector implements NameSelector { - private final String fieldName; - - FieldNameSelector(String fieldName) { - this.fieldName = fieldName; - } - - @Override - public String getMetricName(Map event) { - if (event.containsKey(fieldName)) { - return (String) event.get(fieldName); - } else { - throw new IllegalArgumentException("Metric name field '" + fieldName + "' not exists: " + event); - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDescriptor.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDescriptor.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDescriptor.java new file mode 100644 index 0000000..e79e4d7 --- /dev/null +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/builder/MetricDescriptor.java @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.app.environment.builder; + + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Calendar; +import java.util.List; +import java.util.Map; + +public class MetricDescriptor implements Serializable { + + /** + * Support simple and complex name format, by default using "metric" field. + */ + private MetricNameSelector metricNameSelector = new FieldMetricNameSelector("metric"); + + public MetricGroupSelector getMetricGroupSelector() { + return metricGroupSelector; + } + + public void setMetricGroupSelector(MetricGroupSelector metricGroupSelector) { + this.metricGroupSelector = metricGroupSelector; + } + + + private static final String DEFAULT_METRIC_GROUP_NAME = "Default"; + + private MetricGroupSelector metricGroupSelector = new FixedMetricGroupSelector(DEFAULT_METRIC_GROUP_NAME); + private SiteIdSelector siteIdSelector = new FieldSiteIdSelector("site"); + + /** + * Support event/system time, by default using system time. + */ + private TimestampSelector timestampSelector = new SystemTimestampSelector(); + + /** + * Metric dimension field name. + */ + private List<String> dimensionFields; + + /** + * Metric granularity. + */ + private int granularity = Calendar.MINUTE; + + /** + * Metric value field name. 
+ */ + private String valueField = "value"; + + public MetricNameSelector getMetricNameSelector() { + return metricNameSelector; + } + + public void setMetricNameSelector(MetricNameSelector metricNameSelector) { + this.metricNameSelector = metricNameSelector; + } + + public String getValueField() { + return valueField; + } + + public void setValueField(String valueField) { + this.valueField = valueField; + } + + public List<String> getDimensionFields() { + return dimensionFields; + } + + public void setDimensionFields(List<String> dimensionFields) { + this.dimensionFields = dimensionFields; + } + + public TimestampSelector getTimestampSelector() { + return timestampSelector; + } + + public void setTimestampSelector(TimestampSelector timestampSelector) { + this.timestampSelector = timestampSelector; + } + + public int getGranularity() { + return granularity; + } + + public void setGranularity(int granularity) { + this.granularity = granularity; + } + + public SiteIdSelector getSiteIdSelector() { + return siteIdSelector; + } + + public void setSiteIdSelector(SiteIdSelector siteIdSelector) { + this.siteIdSelector = siteIdSelector; + } + + + @FunctionalInterface + public interface MetricNameSelector extends Serializable { + String getMetricName(Map event); + } + + @FunctionalInterface + public interface MetricGroupSelector extends Serializable { + String getMetricGroup(Map event); + } + + public static class FixedMetricGroupSelector implements MetricGroupSelector { + private final String groupName; + + private FixedMetricGroupSelector(String groupName) { + this.groupName = groupName; + } + + @Override + public String getMetricGroup(Map event) { + return groupName; + } + } + + @FunctionalInterface + public interface TimestampSelector extends Serializable { + Long getTimestamp(Map event); + } + + @FunctionalInterface + public interface SiteIdSelector extends Serializable { + String getSiteId(Map event); + } + + public class FixedSiteIdSelector implements SiteIdSelector { 
+ private final String siteId; + + private FixedSiteIdSelector(String siteId) { + this.siteId = siteId; + } + + @Override + public String getSiteId(Map event) { + return this.siteId; + } + } + + private class FieldSiteIdSelector implements SiteIdSelector { + private final String siteIdFieldName; + + public FieldSiteIdSelector(String siteIdFieldName) { + this.siteIdFieldName = siteIdFieldName; + } + + @Override + public String getSiteId(Map event) { + return (String) event.getOrDefault(this.siteIdFieldName, "UNKNOWN"); + } + } + + public MetricDescriptor namedBy(MetricNameSelector metricNameSelector) { + this.setMetricNameSelector(metricNameSelector); + return this; + } + + public MetricDescriptor siteAs(SiteIdSelector siteIdSelector) { + this.setSiteIdSelector(siteIdSelector); + return this; + } + + public MetricDescriptor siteAs(String siteId) { + this.setSiteIdSelector(new FixedSiteIdSelector(siteId)); + return this; + } + + public MetricDescriptor siteByField(String fieldName) { + this.setMetricNameSelector(new FieldMetricNameSelector(fieldName)); + return this; + } + + /** + * @see java.util.Calendar + */ + public MetricDescriptor granularity(int granularity) { + this.setGranularity(granularity); + return this; + } + + public MetricDescriptor namedByField(String nameField) { + this.setMetricNameSelector(new FieldMetricNameSelector(nameField)); + return this; + } + + public static MetricDescriptor metricGroupAs(String metricGroupName) { + return metricGroupAs(new FixedMetricGroupSelector(metricGroupName)); + } + + public static MetricDescriptor metricGroupAs(MetricGroupSelector groupSelector) { + MetricDescriptor metricDescriptor = new MetricDescriptor(); + metricDescriptor.setMetricGroupSelector(groupSelector); + return metricDescriptor; + } + + public static MetricDescriptor metricGroupByField(String fieldName, String defaultGroupName) { + MetricDescriptor metricDescriptor = new MetricDescriptor(); + 
metricDescriptor.setMetricGroupSelector((MetricGroupSelector) event -> { + if (event.containsKey(fieldName)) { + return (String) event.get(fieldName); + } else { + return defaultGroupName; + } + }); + return metricDescriptor; + } + + public static MetricDescriptor metricGroupByField(String fieldName) { + return metricGroupByField(fieldName, DEFAULT_METRIC_GROUP_NAME); + } + + public MetricDescriptor eventTimeByField(String timestampField) { + this.setTimestampSelector(new EventTimestampSelector(timestampField)); + return this; + } + + public MetricDescriptor dimensionFields(String... dimensionFields) { + this.setDimensionFields(Arrays.asList(dimensionFields)); + return this; + } + + public MetricDescriptor valueField(String valueField) { + this.setValueField(valueField); + return this; + } + + public class EventTimestampSelector implements TimestampSelector { + private final String timestampField; + + EventTimestampSelector(String timestampField) { + this.timestampField = timestampField; + } + + @Override + public Long getTimestamp(Map event) { + if (event.containsKey(timestampField)) { + Object timestampValue = event.get(timestampField); + if (timestampValue instanceof Integer) { + return Long.valueOf((Integer) timestampValue); + } + if (timestampValue instanceof String) { + return Long.valueOf((String) timestampValue); + } else { + return (Long) timestampValue; + } + } else { + throw new IllegalArgumentException("Timestamp field '" + timestampField + "' not exists"); + } + } + } + + public static class SystemTimestampSelector implements TimestampSelector { + @Override + public Long getTimestamp(Map event) { + return System.currentTimeMillis(); + } + } + + public static class FieldMetricNameSelector implements MetricNameSelector { + private final String fieldName; + + FieldMetricNameSelector(String fieldName) { + this.fieldName = fieldName; + } + + @Override + public String getMetricName(Map event) { + if (event.containsKey(fieldName)) { + return (String) 
event.get(fieldName); + } else { + throw new IllegalArgumentException("Metric name field '" + fieldName + "' not exists: " + event); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java index 942a0ac..6827eef 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java @@ -18,12 +18,11 @@ package org.apache.eagle.app.environment.impl; import org.apache.eagle.app.environment.AbstractEnvironment; import org.apache.eagle.app.environment.builder.ApplicationBuilder; -import org.apache.eagle.app.environment.builder.MetricDefinition; +import org.apache.eagle.app.environment.builder.MetricDescriptor; import org.apache.eagle.app.environment.builder.TransformFunction; import org.apache.eagle.app.environment.builder.TransformFunctionBolt; import org.apache.eagle.app.messaging.*; import com.typesafe.config.Config; -import org.apache.eagle.metadata.model.StreamSourceConfig; /** * Storm Execution Environment Context. 
@@ -44,16 +43,16 @@ public class StormEnvironment extends AbstractEnvironment { return (StormStreamSource) stream().getSource(streamId,config); } - public MetricStreamPersist getMetricPersist(MetricDefinition metricDefinition, Config config) { - return new MetricStreamPersist(metricDefinition, config); + public MetricStreamPersist getMetricPersist(MetricDescriptor metricDescriptor, Config config) { + return new MetricStreamPersist(metricDescriptor, config); } public EntityStreamPersist getEntityPersist(Config config) { return new EntityStreamPersist(config); } - public MetricSchemaGenerator getMetricSchemaGenerator(MetricDefinition metricDefinition, Config config) { - return new MetricSchemaGenerator(metricDefinition, config); + public MetricSchemaGenerator getMetricSchemaGenerator(MetricDescriptor metricDescriptor, Config config) { + return new MetricSchemaGenerator(metricDescriptor, config); } public TransformFunctionBolt getTransformer(TransformFunction function) { http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricSchemaGenerator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricSchemaGenerator.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricSchemaGenerator.java index bb29cea..90e6481 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricSchemaGenerator.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricSchemaGenerator.java @@ -24,7 +24,7 @@ import backtype.storm.tuple.Tuple; import com.typesafe.config.Config; import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; import org.apache.eagle.metadata.model.MetricSchemaEntity; -import org.apache.eagle.app.environment.builder.MetricDefinition; +import 
org.apache.eagle.app.environment.builder.MetricDescriptor; import org.apache.eagle.service.client.EagleServiceClientException; import org.apache.eagle.service.client.IEagleServiceClient; import org.apache.eagle.service.client.impl.EagleServiceClientImpl; @@ -40,14 +40,14 @@ public class MetricSchemaGenerator extends BaseRichBolt { public static final String GENERIC_METRIC_VALUE_NAME = "value"; private final HashSet<String> metricNameCache = new HashSet<>(MAX_CACHE_LENGTH); - private final MetricDefinition metricDefinition; + private final MetricDescriptor metricDescriptor; private final Config config; private OutputCollector collector; private IEagleServiceClient client; - public MetricSchemaGenerator(MetricDefinition metricDefinition, Config config) { - this.metricDefinition = metricDefinition; + public MetricSchemaGenerator(MetricDescriptor metricDescriptor, Config config) { + this.metricDescriptor = metricDescriptor; this.config = config; } @@ -63,7 +63,7 @@ public class MetricSchemaGenerator extends BaseRichBolt { String metricName = input.getStringByField(MetricStreamPersist.METRIC_NAME_FIELD); synchronized (metricNameCache) { if (!metricNameCache.contains(metricName)) { - createMetricSchemaEntity(metricName, this.metricDefinition); + createMetricSchemaEntity(metricName, (Map) input.getValueByField(MetricStreamPersist.METRIC_EVENT_FIELD),this.metricDescriptor); metricNameCache.add(metricName); } if (metricNameCache.size() > MAX_CACHE_LENGTH) { @@ -93,14 +93,15 @@ public class MetricSchemaGenerator extends BaseRichBolt { } } - private void createMetricSchemaEntity(String metricName, MetricDefinition metricDefinition) throws IOException, EagleServiceClientException { + private void createMetricSchemaEntity(String metricName, Map event, MetricDescriptor metricDescriptor) throws IOException, EagleServiceClientException { MetricSchemaEntity schemaEntity = new MetricSchemaEntity(); Map<String, String> schemaTags = new HashMap<>(); schemaEntity.setTags(schemaTags); + 
schemaTags.put(MetricSchemaEntity.METRIC_SITE_TAG, metricDescriptor.getSiteIdSelector().getSiteId(event)); schemaTags.put(MetricSchemaEntity.METRIC_NAME_TAG, metricName); - schemaTags.put(MetricSchemaEntity.METRIC_TYPE_TAG, metricDefinition.getMetricType()); - schemaEntity.setGranularityByField(metricDefinition.getGranularity()); - schemaEntity.setDimensionFields(metricDefinition.getDimensionFields()); + schemaTags.put(MetricSchemaEntity.METRIC_GROUP_TAG, metricDescriptor.getMetricGroupSelector().getMetricGroup(event)); + schemaEntity.setGranularityByField(metricDescriptor.getGranularity()); + schemaEntity.setDimensionFields(metricDescriptor.getDimensionFields()); schemaEntity.setMetricFields(Collections.singletonList(GENERIC_METRIC_VALUE_NAME)); schemaEntity.setModifiedTimestamp(System.currentTimeMillis()); GenericServiceAPIResponseEntity<String> response = this.client.create(Collections.singletonList(schemaEntity)); http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java index ba99911..c9b43e5 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/MetricStreamPersist.java @@ -24,7 +24,7 @@ import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import com.google.common.base.Preconditions; import com.typesafe.config.Config; -import org.apache.eagle.app.environment.builder.MetricDefinition; +import org.apache.eagle.app.environment.builder.MetricDescriptor; import 
org.apache.eagle.app.utils.StreamConvertHelper; import org.apache.eagle.common.DateTimeUtil; import org.apache.eagle.log.entity.GenericMetricEntity; @@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -44,6 +45,7 @@ import java.util.Map; public class MetricStreamPersist extends BaseRichBolt { private static final Logger LOG = LoggerFactory.getLogger(MetricStreamPersist.class); public static final String METRIC_NAME_FIELD = "metricName"; + public static final String METRIC_EVENT_FIELD = "metricEvent"; private final Config config; private final MetricMapper mapper; @@ -52,9 +54,9 @@ public class MetricStreamPersist extends BaseRichBolt { private OutputCollector collector; private BatchSender batchSender; - public MetricStreamPersist(MetricDefinition metricDefinition, Config config) { + public MetricStreamPersist(MetricDescriptor metricDescriptor, Config config) { this.config = config; - this.mapper = new StructuredMetricMapper(metricDefinition); + this.mapper = new StructuredMetricMapper(metricDescriptor); this.batchSize = config.hasPath("service.batchSize") ? 
config.getInt("service.batchSize") : 1; } @@ -76,8 +78,10 @@ public class MetricStreamPersist extends BaseRichBolt { @Override public void execute(Tuple input) { GenericMetricEntity metricEntity = null; + Map event = null; try { - metricEntity = this.mapper.map(StreamConvertHelper.tupleToEvent(input).f1()); + event = StreamConvertHelper.tupleToEvent(input).f1(); + metricEntity = this.mapper.map(event); if (batchSize <= 1) { GenericServiceAPIResponseEntity<String> response = this.client.create(Collections.singletonList(metricEntity)); if (!response.isSuccess()) { @@ -91,8 +95,8 @@ public class MetricStreamPersist extends BaseRichBolt { LOG.error(ex.getMessage(), ex); collector.reportError(ex); } finally { - if (metricEntity != null) { - collector.emit(Collections.singletonList(metricEntity.getPrefix())); + if (metricEntity != null && event != null) { + collector.emit(Arrays.asList(metricEntity.getPrefix(), event)); } collector.ack(input); } @@ -100,7 +104,7 @@ public class MetricStreamPersist extends BaseRichBolt { @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields(METRIC_NAME_FIELD)); + declarer.declare(new Fields(METRIC_NAME_FIELD, METRIC_EVENT_FIELD)); } @Override @@ -120,35 +124,35 @@ public class MetricStreamPersist extends BaseRichBolt { } public class StructuredMetricMapper implements MetricMapper { - private final MetricDefinition metricDefinition; + private final MetricDescriptor metricDescriptor; - private StructuredMetricMapper(MetricDefinition metricDefinition) { - this.metricDefinition = metricDefinition; + private StructuredMetricMapper(MetricDescriptor metricDescriptor) { + this.metricDescriptor = metricDescriptor; } @Override public GenericMetricEntity map(Map event) { - String metricName = metricDefinition.getNameSelector().getMetricName(event); + String metricName = metricDescriptor.getMetricNameSelector().getMetricName(event); Preconditions.checkNotNull(metricName, "Metric name is null"); - 
Long timestamp = metricDefinition.getTimestampSelector().getTimestamp(event); + Long timestamp = metricDescriptor.getTimestampSelector().getTimestamp(event); Preconditions.checkNotNull(timestamp, "Timestamp is null"); Map<String, String> tags = new HashMap<>(); - for (String dimensionField : metricDefinition.getDimensionFields()) { + for (String dimensionField : metricDescriptor.getDimensionFields()) { Preconditions.checkNotNull(dimensionField, "Dimension field name is null"); tags.put(dimensionField, (String) event.get(dimensionField)); } double[] values; - if (event.containsKey(metricDefinition.getValueField())) { - values = new double[] {(double) event.get(metricDefinition.getValueField())}; + if (event.containsKey(metricDescriptor.getValueField())) { + values = new double[] {(double) event.get(metricDescriptor.getValueField())}; } else { - LOG.warn("Event has no value field '{}': {}, use 0 by default", metricDefinition.getValueField(), event); + LOG.warn("Event has no value field '{}': {}, use 0 by default", metricDescriptor.getValueField(), event); values = new double[] {0}; } GenericMetricEntity entity = new GenericMetricEntity(); entity.setPrefix(metricName); - entity.setTimestamp(DateTimeUtil.roundDown(metricDefinition.getGranularity(), timestamp)); + entity.setTimestamp(DateTimeUtil.roundDown(metricDescriptor.getGranularity(), timestamp)); entity.setTags(tags); entity.setValue(values); return entity; http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/AppConfigUtils.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/AppConfigUtils.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/AppConfigUtils.java new file mode 100644 index 0000000..7bad0bd --- /dev/null +++ 
b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/utils/AppConfigUtils.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.app.utils; + +import com.typesafe.config.Config; + +public class AppConfigUtils { + public static String getSiteId(Config config) { + return config.getString("siteId"); + } + + public static String getAppId(Config config) { + return config.getString("appId"); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/MetricSchemaEntity.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/MetricSchemaEntity.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/MetricSchemaEntity.java index f18932c..3bd5825 100644 --- a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/MetricSchemaEntity.java +++ 
b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/MetricSchemaEntity.java @@ -30,11 +30,12 @@ import java.util.List; @Service(MetricSchemaEntity.METRIC_SCHEMA_SERVICE) @JsonIgnoreProperties(ignoreUnknown = true) @TimeSeries(false) -@Tags({"metricName","metricType"}) +@Tags({"site","site","group"}) public class MetricSchemaEntity extends TaggedLogAPIEntity { static final String METRIC_SCHEMA_SERVICE = "MetricSchemaService"; - public static final String METRIC_NAME_TAG = "metricName"; - public static final String METRIC_TYPE_TAG = "metricType"; + public static final String METRIC_NAME_TAG = "name"; + public static final String METRIC_SITE_TAG = "site"; + public static final String METRIC_GROUP_TAG = "group"; @Column("a") private List<String> dimensionFields; http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-external/hadoop_jmx_collector/metric_collector.py ---------------------------------------------------------------------- diff --git a/eagle-external/hadoop_jmx_collector/metric_collector.py b/eagle-external/hadoop_jmx_collector/metric_collector.py index bf1d4df..c3fdb43 100644 --- a/eagle-external/hadoop_jmx_collector/metric_collector.py +++ b/eagle-external/hadoop_jmx_collector/metric_collector.py @@ -210,6 +210,7 @@ class KafkaMetricSender(MetricSender): self.default_topic = None if kafka_config.has_key("default_topic"): self.default_topic = kafka_config["default_topic"].encode('utf-8') + logging.info("Using default topic: %s" % self.default_topic) self.component_topic_mapping = {} if kafka_config.has_key("component_topic_mapping"): self.component_topic_mapping = kafka_config["component_topic_mapping"] @@ -266,12 +267,18 @@ class MetricCollector(threading.Thread): filters = [] config = None closed = False + collected_event_count = 0 + ignored_event_count = 0 + emit_event_count = 0 def __init__(self, config=None): threading.Thread.__init__(self) self.config = None self.sender = None self.fqdn = 
socket.getfqdn() + self.ignored_event_count = 0 + self.collected_event_count = 0 + self.emit_event_count = 0 def init(self, config): self.config = config @@ -296,6 +303,7 @@ class MetricCollector(threading.Thread): def collect(self, msg, type='float'): try: + self.collected_event_count = self.collected_event_count + 1 if not msg.has_key("timestamp"): msg["timestamp"] = int(round(time.time() * 1000)) if msg.has_key("value") and type == 'float': @@ -304,21 +312,28 @@ class MetricCollector(threading.Thread): msg["value"] = str(msg["value"]) if not msg.has_key("host") or len(msg["host"]) == 0: raise Exception("host is null: " + str(msg)) + if not msg.has_key("site"): msg["site"] = self.config["env"]["site"] + if len(self.filters) == 0: + self.emit_event_count = self.emit_event_count + 1 self.sender.send(msg) return else: for filter in self.filters: if filter.filter_metric(msg): + self.emit_event_count = self.emit_event_count + 1 self.sender.send(msg) return + self.ignored_event_count = self.ignored_event_count + 1 except Exception as e: logging.error("Failed to emit metric: %s" % msg) logging.exception(e) def close(self): + logging.info("Collected %s events (emitted: %s, ignored: %s)" + % (self.collected_event_count, self.emit_event_count, self.ignored_event_count)) self.sender.close() self.closed = True http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-external/hadoop_jmx_collector/system_metric_collector.py ---------------------------------------------------------------------- diff --git a/eagle-external/hadoop_jmx_collector/system_metric_collector.py b/eagle-external/hadoop_jmx_collector/system_metric_collector.py index e0ffecc..4c95a6c 100644 --- a/eagle-external/hadoop_jmx_collector/system_metric_collector.py +++ b/eagle-external/hadoop_jmx_collector/system_metric_collector.py @@ -17,8 +17,7 @@ # from metric_collector import MetricCollector, Runner -import logging, socket, string, os, re, time - +import logging, socket, string, os, re, time, json 
class SystemMetricCollector(MetricCollector): METRIC_PREFIX = "system" @@ -86,7 +85,6 @@ class SystemMetricCollector(MetricCollector): """ - cpu_metric = self.new_metric() cpu_info = os.popen('cat /proc/stat').readlines() dimensions = ["cpu", "user", "nice", "system", "idle", "wait", "irq", "softirq", "steal", "guest"] @@ -109,6 +107,7 @@ class SystemMetricCollector(MetricCollector): metric_event = dict() for i in range(1, demens): metric_event[dimensions[i]] = int(items[i]) + cpu_metric = self.new_metric("system.cpu") cpu_metric['timestamp'] = int(round(time.time() * 1000)) cpu_metric['metric'] = self.METRIC_PREFIX + "." + 'cpu.' + dimensions[i] cpu_metric['device'] = items[0] @@ -123,6 +122,7 @@ class SystemMetricCollector(MetricCollector): total_cpu_usage += per_cpu_usage # system.cpu.usage + cpu_metric = self.new_metric("system.cpu") cpu_metric['timestamp'] = int(round(time.time() * 1000)) cpu_metric['metric'] = self.METRIC_PREFIX + "." + 'cpu.' + "usage" cpu_metric['device'] = items[0] @@ -141,6 +141,7 @@ class SystemMetricCollector(MetricCollector): result = re.split("\s+", cpu_stat_pre.rstrip()) pre_total_cpu_usage = int(result[0]) pre_total_cpu = int(result[1]) + cpu_metric['timestamp'] = int(round(time.time() * 1000)) cpu_metric['metric'] = self.METRIC_PREFIX + "." + 'cpu.' 
+ "totalusage" cpu_metric['device'] = "cpu" @@ -153,7 +154,7 @@ class SystemMetricCollector(MetricCollector): # ==================================== def collect_uptime_metric(self): - metric = self.new_metric() + metric = self.new_metric("system.os") demension = ["uptime.day", "idletime.day"] output = os.popen('cat /proc/uptime').readlines() @@ -170,7 +171,7 @@ class SystemMetricCollector(MetricCollector): # ==================================== def collect_memory_metric(self): - event = self.new_metric() + event = self.new_metric("system.memory") event["host"] = self.fqdn output = os.popen('cat /proc/meminfo').readlines() mem_info = dict() @@ -208,7 +209,7 @@ class SystemMetricCollector(MetricCollector): items = re.split("\s+", item.rstrip()) demens = min(len(demension), len(items)) for i in range(demens): - event = self.new_metric() + event = self.new_metric("system.cpu") event["timestamp"] = int(round(time.time() * 1000)) event["metric"] = self.METRIC_PREFIX + "." + demension[i] event["value"] = items[i] @@ -223,7 +224,7 @@ class SystemMetricCollector(MetricCollector): output = os.popen('sudo ipmitool sdr | grep Temp | grep CPU').readlines() for item in output: items = re.split("^(CPU\d+)\sTemp\.\s+\|\s+(\d+|\d+\.\d+)\s", item.rstrip()) - event = self.new_metric() + event = self.new_metric("System.CPU") event["timestamp"] = int(round(time.time() * 1000)) event["metric"] = DATA_TYPE + "." + 'cpu.temp' event["value"] = items[2] @@ -247,7 +248,7 @@ class SystemMetricCollector(MetricCollector): filtered_items = items[1:5] + items[9:13] for i in range(len(demension)): - kafka_dict = self.new_metric() + kafka_dict = self.new_metric("system.network") kafka_dict["timestamp"] = int(round(time.time() * 1000)) kafka_dict['metric'] = self.METRIC_PREFIX + "." + 'nic.' + demension[i] kafka_dict["value"] = filtered_items[i] @@ -270,7 +271,7 @@ class SystemMetricCollector(MetricCollector): continue lineitems = re.split("\s+", line) metric = 'smartdisk.' 
+ lineitems[1] - kafka_dict = self.new_metric() + kafka_dict = self.new_metric("system.disk") kafka_dict['metric'] = DATA_TYPE + "." + metric.lower() kafka_dict["timestamp"] = int(round(time.time() * 1000)) kafka_dict["value"] = lineitems[-1] @@ -308,7 +309,7 @@ class SystemMetricCollector(MetricCollector): for key, metrics in iostat_dict.iteritems(): for i in range(len(metrics)): metric = 'disk.' + demension[i] - kafka_dict = self.new_metric() + kafka_dict = self.new_metric("system.disk") kafka_dict['metric'] = DATA_TYPE + "." + metric.lower() kafka_dict["timestamp"] = int(round(time.time() * 1000)) kafka_dict["value"] = metrics[i] @@ -326,9 +327,10 @@ class SystemMetricCollector(MetricCollector): event["device"] = device self.collect(event) - def new_metric(self): + def new_metric(self, group): metric = dict() metric["host"] = self.fqdn + metric["group"] = group return metric http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-external/hadoop_jmx_collector/system_metric_config-sample.json ---------------------------------------------------------------------- diff --git a/eagle-external/hadoop_jmx_collector/system_metric_config-sample.json b/eagle-external/hadoop_jmx_collector/system_metric_config-sample.json index 6fcd43b..27ad5bb 100644 --- a/eagle-external/hadoop_jmx_collector/system_metric_config-sample.json +++ b/eagle-external/hadoop_jmx_collector/system_metric_config-sample.json @@ -1,20 +1,21 @@ { "env": { "site": "sandbox", - "log_file": "/tmp/hadoop-jmx-collector.log", "cpu_stat_file": "/tmp/eagle_cpu_usage_state" }, - "input": [ - ], "filter": { + "bean_group_filter": ["hadoop","java.lang","java.nio"], + "metric_name_filter": [ + "system.*" + ] }, "output": { "kafka": { - "debug": false, + "debug": true, "default_topic": "system_metric_sandbox", "broker_list": [ "sandbox.hortonworks.com:6667" ] } } -} \ No newline at end of file +} 
http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-metric/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java ---------------------------------------------------------------------- diff --git a/eagle-metric/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java b/eagle-metric/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java index 304e500..0c49d82 100644 --- a/eagle-metric/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java +++ b/eagle-metric/eagle-hadoop-metric/src/main/java/org/apache/eagle/metric/HadoopMetricMonitorApp.java @@ -19,8 +19,10 @@ package org.apache.eagle.metric; import backtype.storm.generated.StormTopology; import com.typesafe.config.Config; import org.apache.eagle.app.StormApplication; -import org.apache.eagle.app.environment.builder.MetricDefinition; +import org.apache.eagle.app.environment.builder.MetricDescriptor; +import org.apache.eagle.app.environment.builder.MetricDescriptor.MetricGroupSelector; import org.apache.eagle.app.environment.impl.StormEnvironment; +import org.apache.eagle.app.utils.AppConfigUtils; import java.util.Calendar; @@ -28,14 +30,30 @@ public class HadoopMetricMonitorApp extends StormApplication { @Override public StormTopology execute(Config config, StormEnvironment environment) { return environment.newApp(config) - .fromStream("HADOOP_JMX_METRIC_STREAM") - .saveAsMetric(MetricDefinition - .metricType("HADOOP_JMX_METRICS") - .namedByField("metric") - .eventTimeByField("timestamp") - .dimensionFields("host","component","site") - .granularity(Calendar.MINUTE) - .valueField("value")) - .toTopology(); + .fromStream("HADOOP_JMX_METRIC_STREAM") + .saveAsMetric( + MetricDescriptor.metricGroupAs((MetricGroupSelector) event -> { + if (event.containsKey("component")) { + return String.format("hadoop.%s", ((String) event.get("component")).toLowerCase()); + } else { + return 
"hadoop.metrics"; + } + }) + .siteAs(AppConfigUtils.getSiteId(config)) + .namedByField("metric") + .eventTimeByField("timestamp") + .dimensionFields("host", "component", "site") + .granularity(Calendar.MINUTE) + .valueField("value")) + .fromStream("SYSTEM_METRIC_STREAM") + .saveAsMetric(MetricDescriptor.metricGroupByField("group") + .siteAs(AppConfigUtils.getSiteId(config)) + .namedByField("metric") + .eventTimeByField("timestamp") + .dimensionFields("host", "group", "site", "device") + .granularity(Calendar.MINUTE) + .valueField("value") + ) + .toTopology(); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-metric/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml ---------------------------------------------------------------------- diff --git a/eagle-metric/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml b/eagle-metric/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml index 073c900..51a9257 100644 --- a/eagle-metric/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml +++ b/eagle-metric/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml @@ -20,7 +20,6 @@ <type>HADOOP_METRIC_MONITOR</type> <name>Hadoop Metrics Monitor</name> <configuration> - <!-- data fromStream configurations --> <property> <name>dataSinkConfig.HADOOP_JMX_METRIC_STREAM.topic</name> <displayName>JMX Metric Kafka Topic</displayName> @@ -29,6 +28,12 @@ <required>true</required> </property> <property> + <name>dataSinkConfig.SYSTEM_METRIC_STREAM.topic</name> + <displayName>System Metric Kafka Topic</displayName> + <value>system_metric_${siteId}</value> + <description>System JMX metric kafka topic name for stream: 
SYSTEM_METRIC_STREAM</description> + </property> + <property> <name>dataSinkConfig.HADOOP_JMX_RESOURCE_STREAM.topic</name> <displayName>JMX Resource Kafka Topic</displayName> <value>hadoop_jmx_resource_${siteId}</value> @@ -134,6 +139,41 @@ </columns> </stream> <stream> + <streamId>SYSTEM_METRIC_STREAM</streamId> + <description>System Metrics Stream including CPU, Network, Disk, etc.</description> + <columns> + <column> + <name>host</name> + <type>string</type> + </column> + <column> + <name>timestamp</name> + <type>long</type> + </column> + <column> + <name>metric</name> + <type>string</type> + </column> + <column> + <name>group</name> + <type>string</type> + </column> + <column> + <name>site</name> + <type>string</type> + </column> + <column> + <name>device</name> + <type>string</type> + </column> + <column> + <name>value</name> + <type>double</type> + <defaultValue>0.0</defaultValue> + </column> + </columns> + </stream> + <stream> <streamId>HADOOP_JMX_RESOURCE_STREAM</streamId> <description>Hadoop JMX Resource Stream including name node, resource manager, etc.</description> <columns> http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-metric/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppDebug.java ---------------------------------------------------------------------- diff --git a/eagle-metric/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppDebug.java b/eagle-metric/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppDebug.java index e4589b7..e6f1463 100644 --- a/eagle-metric/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppDebug.java +++ b/eagle-metric/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/HadoopMetricMonitorAppDebug.java @@ -16,6 +16,9 @@ */ package org.apache.eagle.metric; +import org.junit.Ignore; + +@Ignore public class HadoopMetricMonitorAppDebug { public static void main(String[] args) { new 
HadoopMetricMonitorApp().run(args); http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-metric/eagle-hadoop-metric/src/test/resources/application.conf ---------------------------------------------------------------------- diff --git a/eagle-metric/eagle-hadoop-metric/src/test/resources/application.conf b/eagle-metric/eagle-hadoop-metric/src/test/resources/application.conf index 4d74666..c864bf4 100644 --- a/eagle-metric/eagle-hadoop-metric/src/test/resources/application.conf +++ b/eagle-metric/eagle-hadoop-metric/src/test/resources/application.conf @@ -32,18 +32,14 @@ "mode" : "LOCAL", "siteId" : "testsite", "dataSourceConfig": { - "topic" : "hadoop_jmx_metric", + "HADOOP_JMX_METRIC_STREAM": { + "topic": "hadoop_jmx_metric_sandbox", + } + "SYSTEM_METRIC_STREAM": { + "topic": "system_metric_sandbox", + } + // "topic" : "hadoop_jmx_metric_sandbox", "zkConnection" : "localhost:2181", "txZkServers" : "localhost:2181" } - "dataSinkConfig": { - "topic" : "hadoop_jmx_metric", - "brokerList" : "localhost:6667", - "serializerClass" : "kafka.serializer.StringEncoder", - "keySerializerClass" : "kafka.serializer.StringEncoder" - "producerType" : "async", - "numBatchMessages" : "4096", - "maxQueueBufferMs" : "5000", - "requestRequiredAcks" : "0" - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/6e0fc410/eagle-server/src/main/webapp/app/dev/public/js/ctrls/metricCtrl.js ---------------------------------------------------------------------- diff --git a/eagle-server/src/main/webapp/app/dev/public/js/ctrls/metricCtrl.js b/eagle-server/src/main/webapp/app/dev/public/js/ctrls/metricCtrl.js index e0f0f15..b310738 100644 --- a/eagle-server/src/main/webapp/app/dev/public/js/ctrls/metricCtrl.js +++ b/eagle-server/src/main/webapp/app/dev/public/js/ctrls/metricCtrl.js @@ -40,7 +40,7 @@ $scope.metricList = [$scope.metricName]; CompatibleEntity.groups({ query: 'MetricSchemaService', - groups: 'metricName', + groups: 'name', fields: 'count', 
limit: 9999, })._promise.then(function (res) {
