Repository: incubator-gobblin Updated Branches: refs/heads/master a080ad843 -> 0dfa2bd91
Graphite prefix in configuration Graphite prefix in configuration Review fix in graphite prefix configuration Merge remote-tracking branch 'origin/master' into du Closes #1883 from wikp/prefix-graphite Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/0dfa2bd9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/0dfa2bd9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/0dfa2bd9 Branch: refs/heads/master Commit: 0dfa2bd911792d78eec74b4a5036e8c169d2e9ab Parents: a080ad8 Author: Piotr Wikiel <[email protected]> Authored: Sat Jul 29 01:08:24 2017 -0700 Committer: Abhishek Tiwari <[email protected]> Committed: Sat Jul 29 01:08:24 2017 -0700 ---------------------------------------------------------------------- .../gobblin/configuration/ConfigurationKeys.java | 4 ++++ .../reporter/ConfiguredScheduledReporter.java | 19 ++++++++++++++++++- .../java/gobblin/metrics/GobblinMetrics.java | 9 ++++----- .../metrics/graphite/GraphiteEventReporter.java | 18 +++++++++++++----- 4 files changed, 39 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0dfa2bd9/gobblin-api/src/main/java/gobblin/configuration/ConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/gobblin/configuration/ConfigurationKeys.java index e119d43..bb54b5d 100644 --- a/gobblin-api/src/main/java/gobblin/configuration/ConfigurationKeys.java +++ b/gobblin-api/src/main/java/gobblin/configuration/ConfigurationKeys.java @@ -641,6 +641,10 @@ public class ConfigurationKeys { public static final String DEFAULT_METRICS_REPORTING_GRAPHITE_EVENTS_VALUE_AS_KEY = Boolean.toString(false); public static final String METRICS_REPORTING_GRAPHITE_SENDING_TYPE = METRICS_CONFIGURATIONS_PREFIX + "reporting.graphite.sending.type"; + public static final String METRICS_REPORTING_GRAPHITE_PREFIX = METRICS_CONFIGURATIONS_PREFIX + + "reporting.graphite.prefix"; + public static final String DEFAULT_METRICS_REPORTING_GRAPHITE_PREFIX = ""; + public static final String DEFAULT_METRICS_REPORTING_GRAPHITE_SENDING_TYPE = "TCP"; //InfluxDB-based reporting http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0dfa2bd9/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/gobblin/metrics/reporter/ConfiguredScheduledReporter.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/gobblin/metrics/reporter/ConfiguredScheduledReporter.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/gobblin/metrics/reporter/ConfiguredScheduledReporter.java index bf1afa9..7b076ff 100644 --- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/gobblin/metrics/reporter/ConfiguredScheduledReporter.java +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/gobblin/metrics/reporter/ConfiguredScheduledReporter.java @@ -23,6 +23,8 @@ import java.util.Map; import java.util.SortedMap; import java.util.concurrent.TimeUnit; +import com.google.common.base.Strings; +import gobblin.configuration.ConfigurationKeys; import lombok.Getter; import org.slf4j.Logger; @@ -72,6 +74,7 @@ public abstract class ConfiguredScheduledReporter extends ScheduledReporter { protected final ImmutableMap<String, String> tags; protected final Closer closer; protected final String metricContextName; + protected final String metricsPrefix; protected static final Joiner JOINER = Joiner.on('.').skipNulls(); @@ -84,6 +87,7 @@ public abstract class ConfiguredScheduledReporter extends ScheduledReporter { this.tags = ImmutableMap.copyOf(builder.tags); this.closer = Closer.create(); this.metricContextName = builder.metricContextName; + this.metricsPrefix = builder.metricsPrefix; } /** @@ -97,6 +101,7 @@ public abstract class ConfiguredScheduledReporter extends ScheduledReporter { protected TimeUnit durationUnit; protected Map<String, String> tags; protected String metricContextName; + protected String metricsPrefix; protected Builder() { this.name = "ConfiguredScheduledReporter"; @@ -174,6 +179,16 @@ public abstract class ConfiguredScheduledReporter extends ScheduledReporter { } /** + * Sets metrics prefix independent from the context (useful for grouping metrics in Graphite or other metric-store) + * @param metricsPrefix + * @return + */ + public T withMetricsPrefix(String metricsPrefix) { + this.metricsPrefix = metricsPrefix; + return self(); + } + + /** * Add the name of the base metrics context as prefix to the metric keys * * @param metricContextName name of the metrics context @@ -219,7 +234,9 @@ public abstract class ConfiguredScheduledReporter extends ScheduledReporter { if (metricContextName == null || (currentContextName.indexOf(metricContextName) > -1)) { return currentContextName; } - return JOINER.join(metricContextName, tags.get("taskId"), tags.get("forkBranchName"), tags.get("class")); + + return JOINER.join(Strings.emptyToNull(metricsPrefix), + metricContextName, tags.get("taskId"), tags.get("forkBranchName"), tags.get("class")); } @Override http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0dfa2bd9/gobblin-metrics-libs/gobblin-metrics/src/main/java/gobblin/metrics/GobblinMetrics.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics/src/main/java/gobblin/metrics/GobblinMetrics.java b/gobblin-metrics-libs/gobblin-metrics/src/main/java/gobblin/metrics/GobblinMetrics.java index 93ac85b..98bb100 100644 --- a/gobblin-metrics-libs/gobblin-metrics/src/main/java/gobblin/metrics/GobblinMetrics.java +++ b/gobblin-metrics-libs/gobblin-metrics/src/main/java/gobblin/metrics/GobblinMetrics.java @@ -23,7 +23,6 @@ import java.net.URI; import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; @@ -39,18 +38,14 @@ import com.codahale.metrics.JmxReporter; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; -import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Splitter; import com.google.common.base.Strings; -import com.google.common.base.Supplier; import com.google.common.collect.Lists; import com.google.common.io.Closer; import com.typesafe.config.Config; -import javax.annotation.Nullable; - import gobblin.configuration.ConfigurationKeys; import gobblin.configuration.State; import gobblin.metrics.graphite.GraphiteConnectionType; @@ -559,6 +554,8 @@ public class GobblinMetrics { GraphiteConnectionType connectionType; String type = properties.getProperty(ConfigurationKeys.METRICS_REPORTING_GRAPHITE_SENDING_TYPE, ConfigurationKeys.DEFAULT_METRICS_REPORTING_GRAPHITE_SENDING_TYPE).toUpperCase(); + String prefix = properties.getProperty(ConfigurationKeys.METRICS_REPORTING_GRAPHITE_PREFIX, + ConfigurationKeys.DEFAULT_METRICS_REPORTING_GRAPHITE_PREFIX); try { connectionType = GraphiteConnectionType.valueOf(type); } catch (IllegalArgumentException exception) { @@ -572,6 +569,7 @@ public class GobblinMetrics { GraphiteReporter.Factory.newBuilder().withConnectionType(connectionType) .withConnection(hostname, port).withMetricContextName( this.metricContext.getName()) //contains the current job id + .withMetricsPrefix(prefix) .build(properties); } catch (IOException e) { LOGGER.error("Failed to create Graphite metrics reporter. Will not report metrics to Graphite.", e); @@ -590,6 +588,7 @@ public class GobblinMetrics { GraphiteEventReporter.Factory.forContext(RootMetricContext.get()) .withConnectionType(connectionType) .withConnection(hostname, eventsPort) + .withPrefix(prefix) .withEmitValueAsKey(emitValueAsKey) .build(); this.codahaleScheduledReporters.add(this.codahaleReportersCloser.register(eventReporter)); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0dfa2bd9/gobblin-modules/gobblin-metrics-graphite/src/main/java/gobblin/metrics/graphite/GraphiteEventReporter.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-metrics-graphite/src/main/java/gobblin/metrics/graphite/GraphiteEventReporter.java b/gobblin-modules/gobblin-metrics-graphite/src/main/java/gobblin/metrics/graphite/GraphiteEventReporter.java index 677d0b2..97590fb 100644 --- a/gobblin-modules/gobblin-metrics-graphite/src/main/java/gobblin/metrics/graphite/GraphiteEventReporter.java +++ b/gobblin-modules/gobblin-metrics-graphite/src/main/java/gobblin/metrics/graphite/GraphiteEventReporter.java @@ -54,6 +54,7 @@ public class GraphiteEventReporter extends EventReporter { private static final String EMTPY_VALUE = "0"; private static final Logger LOGGER = LoggerFactory.getLogger(GraphiteEventReporter.class); + private String prefix; public GraphiteEventReporter(Builder<?> builder) throws IOException { super(builder); @@ -64,6 +65,7 @@ public class GraphiteEventReporter extends EventReporter { this.closer.register(new GraphitePusher(builder.hostname, builder.port, builder.connectionType)); } this.emitValueAsKey = builder.emitValueAsKey; + this.prefix = builder.prefix; } @Override @@ -99,19 +101,19 @@ public class GraphiteEventReporter extends EventReporter { long timestamp = event.getTimestamp() / 1000l; MultiPartEvent multipartEvent = MultiPartEvent.getEvent(metadata.get(EventSubmitter.EVENT_TYPE)); if (multipartEvent == null) { - graphitePusher.push(name, EMTPY_VALUE, timestamp); + graphitePusher.push(JOINER.join(prefix, name), EMTPY_VALUE, timestamp); } else { for (String field : multipartEvent.getMetadataFields()) { String value = metadata.get(field); if (value == null) { - graphitePusher.push(JOINER.join(name, field), EMTPY_VALUE, timestamp); + graphitePusher.push(JOINER.join(prefix, name, field), EMTPY_VALUE, timestamp); } else { if (emitAsKey(field)) { // metric value is emitted as part of the keys - graphitePusher.push(JOINER.join(name, field, value), EMTPY_VALUE, timestamp); + graphitePusher.push(JOINER.join(prefix, name, field, value), EMTPY_VALUE, timestamp); } else { - graphitePusher.push(JOINER.join(name, field), convertValue(field, value), timestamp); + graphitePusher.push(JOINER.join(prefix, name, field), convertValue(field, value), timestamp); } } } @@ -125,7 +127,7 @@ public class GraphiteEventReporter extends EventReporter { /** * Non-numeric event values may be emitted as part of the key by applying them to the end of the key if - * {@link ConfigurationKeys#METRICS_REPORTING_GRAPHITE_EVENT_VALUE_AS_KEY} is set. Thus such events can be still + * {@link ConfigurationKeys#METRICS_REPORTING_GRAPHITE_EVENTS_VALUE_AS_KEY} is set. Thus such events can be still * reported even when the backend doesn't accept text values through Graphite * * @param field name of the metric's metadata fields @@ -184,6 +186,7 @@ public class GraphiteEventReporter extends EventReporter { protected GraphiteConnectionType connectionType; protected Optional<GraphitePusher> graphitePusher; protected boolean emitValueAsKey; + protected String prefix; protected Builder(MetricContext context) { super(context); @@ -208,6 +211,11 @@ public class GraphiteEventReporter extends EventReporter { return self(); } + public T withPrefix(String prefix) { + this.prefix = prefix; + return self(); + } + /** * Set {@link gobblin.metrics.graphite.GraphiteConnectionType} to use. */
