This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 780e10676d6ee8b0e7b690ddbd7732b5e8c955f0 Author: Krishen <[email protected]> AuthorDate: Tue Mar 5 08:41:39 2024 -0800 [HUDI-7337] Implement MetricsReporter that reports metrics to M3 (#10565) --------- Co-authored-by: Krishen Bhan <“[email protected]”> --- hudi-client/hudi-client-common/pom.xml | 10 ++ .../org/apache/hudi/config/HoodieWriteConfig.java | 28 ++++ .../hudi/config/metrics/HoodieMetricsM3Config.java | 126 ++++++++++++++++++ .../hudi/metadata/HoodieMetadataWriteUtils.java | 10 ++ .../hudi/metrics/MetricsReporterFactory.java | 4 + .../apache/hudi/metrics/MetricsReporterType.java | 2 +- .../apache/hudi/metrics/m3/M3MetricsReporter.java | 120 +++++++++++++++++ .../hudi/metrics/m3/M3ScopeReporterAdaptor.java | 145 +++++++++++++++++++++ .../org/apache/hudi/metrics/m3/TestM3Metrics.java | 92 +++++++++++++ packaging/hudi-flink-bundle/pom.xml | 6 + packaging/hudi-integ-test-bundle/pom.xml | 6 + packaging/hudi-kafka-connect-bundle/pom.xml | 6 + packaging/hudi-spark-bundle/pom.xml | 7 + packaging/hudi-utilities-bundle/pom.xml | 6 + packaging/hudi-utilities-slim-bundle/pom.xml | 6 + pom.xml | 12 +- 16 files changed, 584 insertions(+), 2 deletions(-) diff --git a/hudi-client/hudi-client-common/pom.xml b/hudi-client/hudi-client-common/pom.xml index 47b2741bd9d..6caccd0b0a6 100644 --- a/hudi-client/hudi-client-common/pom.xml +++ b/hudi-client/hudi-client-common/pom.xml @@ -120,6 +120,16 @@ <groupId>io.prometheus</groupId> <artifactId>simpleclient_pushgateway</artifactId> </dependency> + <dependency> + <groupId>com.uber.m3</groupId> + <artifactId>tally-m3</artifactId> + <version>${tally.version}</version> + </dependency> + <dependency> + <groupId>com.uber.m3</groupId> + <artifactId>tally-core</artifactId> + <version>${tally.version}</version> + </dependency> <!-- Lock --> <dependency> diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 99915fca25a..3220ef22c2f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -62,6 +62,7 @@ import org.apache.hudi.config.metrics.HoodieMetricsConfig; import org.apache.hudi.config.metrics.HoodieMetricsDatadogConfig; import org.apache.hudi.config.metrics.HoodieMetricsGraphiteConfig; import org.apache.hudi.config.metrics.HoodieMetricsJmxConfig; +import org.apache.hudi.config.metrics.HoodieMetricsM3Config; import org.apache.hudi.config.metrics.HoodieMetricsPrometheusConfig; import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode; @@ -2178,6 +2179,26 @@ public class HoodieWriteConfig extends HoodieConfig { return getInt(HoodieMetricsGraphiteConfig.GRAPHITE_REPORT_PERIOD_IN_SECONDS); } + public String getM3ServerHost() { + return getString(HoodieMetricsM3Config.M3_SERVER_HOST_NAME); + } + + public int getM3ServerPort() { + return getInt(HoodieMetricsM3Config.M3_SERVER_PORT_NUM); + } + + public String getM3Tags() { + return getString(HoodieMetricsM3Config.M3_TAGS); + } + + public String getM3Env() { + return getString(HoodieMetricsM3Config.M3_ENV); + } + + public String getM3Service() { + return getString(HoodieMetricsM3Config.M3_SERVICE); + } + public String getJmxHost() { return getString(HoodieMetricsJmxConfig.JMX_HOST_NAME); } @@ -2633,6 +2654,7 @@ public class HoodieWriteConfig extends HoodieConfig { private boolean isPreCommitValidationConfigSet = false; private boolean isMetricsJmxConfigSet = false; private boolean isMetricsGraphiteConfigSet = false; + private boolean isMetricsM3ConfigSet = false; private boolean isLayoutConfigSet = false; public Builder withEngineType(EngineType engineType) { @@ -2867,6 +2889,12 @@ public class HoodieWriteConfig extends HoodieConfig { return this; } + public Builder withMetricsM3Config(HoodieMetricsM3Config metricsM3Config) { + writeConfig.getProps().putAll(metricsM3Config.getProps()); + isMetricsM3ConfigSet = true; + return this; + } + public Builder withPreCommitValidatorConfig(HoodiePreCommitValidatorConfig validatorConfig) { writeConfig.getProps().putAll(validatorConfig.getProps()); isPreCommitValidationConfigSet = true; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsM3Config.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsM3Config.java new file mode 100644 index 00000000000..cc675eebfbb --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsM3Config.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.config.metrics; + +import static org.apache.hudi.config.metrics.HoodieMetricsConfig.METRIC_PREFIX; + +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.Properties; +import org.apache.hudi.common.config.ConfigClassProperty; +import org.apache.hudi.common.config.ConfigGroups; +import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.config.HoodieConfig; + +/** + * Configs for M3 reporter type. + * <p> + * {@link org.apache.hudi.metrics.MetricsReporterType#M3} + */ +@ConfigClassProperty(name = "Metrics Configurations for M3", + groupName = ConfigGroups.Names.METRICS, + description = "Enables reporting on Hudi metrics using M3. " + + " Hudi publishes metrics on every commit, clean, rollback etc.") +public class HoodieMetricsM3Config extends HoodieConfig { + + public static final String M3_PREFIX = METRIC_PREFIX + ".m3"; + + public static final ConfigProperty<String> M3_SERVER_HOST_NAME = ConfigProperty + .key(M3_PREFIX + ".host") + .defaultValue("localhost") + .withDocumentation("M3 host to connect to."); + + public static final ConfigProperty<Integer> M3_SERVER_PORT_NUM = ConfigProperty + .key(M3_PREFIX + ".port") + .defaultValue(9052) + .withDocumentation("M3 port to connect to."); + + public static final ConfigProperty<String> M3_TAGS = ConfigProperty + .key(M3_PREFIX + ".tags") + .defaultValue("") + .withDocumentation("Optional M3 tags applied to all metrics."); + + public static final ConfigProperty<String> M3_ENV = ConfigProperty + .key(M3_PREFIX + ".env") + .defaultValue("production") + .withDocumentation("M3 tag to label the environment (defaults to 'production'), " + + "applied to all metrics."); + + public static final ConfigProperty<String> M3_SERVICE = ConfigProperty + .key(M3_PREFIX + ".service") + .defaultValue("hoodie") + .withDocumentation("M3 tag to label the service name (defaults to 'hoodie'), " + + "applied to all metrics."); + + private HoodieMetricsM3Config() { + super(); + } + + public static HoodieMetricsM3Config.Builder newBuilder() { + return new HoodieMetricsM3Config.Builder(); + } + + public static class Builder { + + private final HoodieMetricsM3Config hoodieMetricsM3Config = new HoodieMetricsM3Config(); + + public HoodieMetricsM3Config.Builder fromFile(File propertiesFile) throws IOException { + try (FileReader reader = new FileReader(propertiesFile)) { + this.hoodieMetricsM3Config.getProps().load(reader); + return this; + } + } + + public HoodieMetricsM3Config.Builder fromProperties(Properties props) { + this.hoodieMetricsM3Config.getProps().putAll(props); + return this; + } + + public HoodieMetricsM3Config.Builder toM3Host(String host) { + hoodieMetricsM3Config.setValue(M3_SERVER_HOST_NAME, host); + return this; + } + + public HoodieMetricsM3Config.Builder onM3Port(int port) { + hoodieMetricsM3Config.setValue(M3_SERVER_PORT_NUM, String.valueOf(port)); + return this; + } + + public HoodieMetricsM3Config.Builder useM3Tags(String tags) { + hoodieMetricsM3Config.setValue(M3_TAGS, tags); + return this; + } + + public HoodieMetricsM3Config.Builder useM3Env(String env) { + hoodieMetricsM3Config.setValue(M3_ENV, env); + return this; + } + + public HoodieMetricsM3Config.Builder useM3Service(String service) { + hoodieMetricsM3Config.setValue(M3_SERVICE, service); + return this; + } + + public HoodieMetricsM3Config build() { + hoodieMetricsM3Config.setDefaults(HoodieMetricsM3Config.class.getName()); + return hoodieMetricsM3Config; + } + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java index 243b74b9199..76fffd5d0df 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java @@ -37,6 +37,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.metrics.HoodieMetricsConfig; import org.apache.hudi.config.metrics.HoodieMetricsGraphiteConfig; import org.apache.hudi.config.metrics.HoodieMetricsJmxConfig; +import org.apache.hudi.config.metrics.HoodieMetricsM3Config; import org.apache.hudi.config.metrics.HoodieMetricsPrometheusConfig; import org.apache.hudi.config.metrics.HoodieMetricsDatadogConfig; import org.apache.hudi.exception.HoodieMetadataException; @@ -183,6 +184,15 @@ public class HoodieMetadataWriteUtils { .withPushgatewayPortNum(writeConfig.getPushGatewayPort()).build(); builder.withProperties(prometheusConfig.getProps()); break; + case M3: + HoodieMetricsM3Config m3Config = HoodieMetricsM3Config.newBuilder() + .onM3Port(writeConfig.getM3ServerPort()) + .toM3Host(writeConfig.getM3ServerHost()) + .useM3Tags(writeConfig.getM3Tags()) + .useM3Service(writeConfig.getM3Service()) + .useM3Env(writeConfig.getM3Env()).build(); + builder.withProperties(m3Config.getProps()); + break; case DATADOG: HoodieMetricsDatadogConfig.Builder datadogConfig = HoodieMetricsDatadogConfig.newBuilder() .withDatadogApiKey(writeConfig.getDatadogApiKey()) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java index 27034735a04..0d20337fa5c 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterFactory.java @@ -27,6 +27,7 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.metrics.cloudwatch.CloudWatchMetricsReporter; import org.apache.hudi.metrics.custom.CustomizableMetricsReporter; import org.apache.hudi.metrics.datadog.DatadogMetricsReporter; +import org.apache.hudi.metrics.m3.M3MetricsReporter; import org.apache.hudi.metrics.prometheus.PrometheusReporter; import org.apache.hudi.metrics.prometheus.PushGatewayMetricsReporter; @@ -89,6 +90,9 @@ public class MetricsReporterFactory { case CLOUDWATCH: reporter = new CloudWatchMetricsReporter(config, registry); break; + case M3: + reporter = new M3MetricsReporter(config, registry); + break; default: LOG.error("Reporter type[" + type + "] is not supported."); break; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterType.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterType.java index 3c860015928..6d05e443e6b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterType.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/MetricsReporterType.java @@ -22,5 +22,5 @@ package org.apache.hudi.metrics; * Types of the reporter supported, hudi also supports user defined reporter. */ public enum MetricsReporterType { - GRAPHITE, INMEMORY, JMX, DATADOG, CONSOLE, PROMETHEUS_PUSHGATEWAY, PROMETHEUS, CLOUDWATCH + GRAPHITE, INMEMORY, JMX, DATADOG, CONSOLE, PROMETHEUS_PUSHGATEWAY, PROMETHEUS, CLOUDWATCH, M3 } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/m3/M3MetricsReporter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/m3/M3MetricsReporter.java new file mode 100644 index 00000000000..a658476ef75 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/m3/M3MetricsReporter.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metrics.m3; + +import com.codahale.metrics.MetricRegistry; +import com.uber.m3.tally.m3.M3Reporter; +import com.uber.m3.util.Duration; +import com.uber.m3.util.ImmutableMap; +import com.uber.m3.tally.RootScopeBuilder; +import com.uber.m3.tally.Scope; +import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.metrics.MetricsReporter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation of M3 Metrics reporter, which can report metrics to a https://m3db.io/ service + */ +public class M3MetricsReporter extends MetricsReporter { + + private static final Logger LOG = LoggerFactory.getLogger(M3MetricsReporter.class); + private final HoodieWriteConfig config; + private final MetricRegistry registry; + private final ImmutableMap<String, String> tags; + + public M3MetricsReporter(HoodieWriteConfig config, MetricRegistry registry) { + this.config = config; + this.registry = registry; + + ImmutableMap.Builder tagBuilder = new ImmutableMap.Builder<>(); + tagBuilder.putAll(parseOptionalTags(config.getM3Tags())); + tagBuilder.put("service", config.getM3Service()); + tagBuilder.put("env", config.getM3Env()); + this.tags = tagBuilder.build(); + LOG.info(String.format("Building M3 Reporter with M3 tags mapping: %s", tags)); + } + + private static Map parseOptionalTags(String tagValueString) { + Map parsedTags = new HashMap(); + if (!tagValueString.isEmpty()) { + Arrays.stream(tagValueString.split(",")).forEach((tagValuePair) -> { + String[] parsedTagValuePair = Arrays.stream(tagValuePair.split("=")) + .map((tagOrValue) -> tagOrValue.trim()).filter((tagOrValue) -> !tagOrValue.isEmpty()) + .toArray(String[]::new); + if (parsedTagValuePair.length != 2) { + throw new RuntimeException(String.format( + "M3 Reporter tags cannot be initialized with tags [%s] due to not being in format `tag=value, . . .`.", + tagValuePair)); + } + parsedTags.put(parsedTagValuePair[0], parsedTagValuePair[1]); + }); + } + return parsedTags; + } + + @Override + public void start() {} + + @Override + public void report() { + /* + Although com.uber.m3.tally.Scope supports automatically submitting metrics in an interval + via a background task, it does not seem to support + - an API for explicitly flushing/emitting all metrics + - Taking in an external com.codahale.metrics.MetricRegistry metrics registry and automatically + adding any new counters/gauges whenever they are added to the registry + Due to this, this implementation emits metrics by creating a Scope, adding all metrics from + the HUDI metircs registry as counters/gauges to the scope, and then closing the Scope. Since + closing this Scope will implicitly flush all M3 metrics, the reporting intervals + are configured to be Integer.MAX_VALUE. + */ + synchronized (this) { + try (Scope scope = new RootScopeBuilder() + .reporter(new M3Reporter.Builder( + new InetSocketAddress(config.getM3ServerHost(), config.getM3ServerPort())) + .includeHost(true).commonTags(tags) + .build()) + .reportEvery(Duration.ofSeconds(Integer.MAX_VALUE)) + .tagged(tags)) { + + M3ScopeReporterAdaptor scopeReporter = new M3ScopeReporterAdaptor(registry, scope); + scopeReporter.start(Integer.MAX_VALUE, TimeUnit.SECONDS); + scopeReporter.report(); + scopeReporter.stop(); + } catch (Exception e) { + LOG.error(String.format("Error reporting metrics to M3: %s", e)); + } + } + } + + @Override + public void stop() {} +} + + + + + + diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/m3/M3ScopeReporterAdaptor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/m3/M3ScopeReporterAdaptor.java new file mode 100644 index 00000000000..ae66914400b --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/m3/M3ScopeReporterAdaptor.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metrics.m3; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Meter; +import com.codahale.metrics.Metered; +import com.codahale.metrics.MetricFilter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.ScheduledReporter; +import com.codahale.metrics.Snapshot; +import com.codahale.metrics.Timer; +import com.uber.m3.tally.Scope; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.SortedMap; +import java.util.concurrent.TimeUnit; +import org.apache.hudi.common.util.collection.Pair; + +/** + * Implementation of com.codahale.metrics.ScheduledReporter, to emit metrics from + * com.codahale.metrics.MetricRegistry to M3 + */ +public class M3ScopeReporterAdaptor extends ScheduledReporter { + private final Scope scope; + + protected M3ScopeReporterAdaptor(MetricRegistry registry, Scope scope) { + super(registry, "hudi-m3-reporter", MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.SECONDS); + this.scope = scope; + } + + @Override + public void start(long period, TimeUnit unit) { + } + + @Override + public void stop() { + } + + @Override + public void report(SortedMap<String, Gauge> gauges, SortedMap<String, Counter> counters, + SortedMap<String, Histogram> histograms, SortedMap<String, Meter> meters, + SortedMap<String, Timer> timers) { + /* + When reporting, process each com.codahale.metrics metric and add counters & gauges to + the passed-in com.uber.m3.tally.Scope with the same name and value. This is needed + for the Scope to register these metrics + */ + report(scope, + gauges, + counters, + histograms, + meters, + timers); + } + + private void report(Scope scope, + Map<String, Gauge> gauges, + Map<String, Counter> counters, + Map<String, Histogram> histograms, + Map<String, Meter> meters, + Map<String, Timer> timers) { + + for (Entry<String, Gauge> entry : gauges.entrySet()) { + scope.gauge(entry.getKey()).update( + ((Number) entry.getValue().getValue()).doubleValue()); + } + + for (Entry<String, Counter> entry : counters.entrySet()) { + scope.counter(entry.getKey()).inc( + ((Number) entry.getValue().getCount()).longValue()); + } + + for (Entry<String, Histogram> entry : histograms.entrySet()) { + scope.gauge(MetricRegistry.name(entry.getKey(), "count")).update( + entry.getValue().getCount()); + reportSnapshot(entry.getKey(), entry.getValue().getSnapshot()); + } + + for (Entry<String, Meter> entry : meters.entrySet()) { + reportMetered(entry.getKey(), entry.getValue()); + } + + for (Entry<String, Timer> entry : timers.entrySet()) { + reportTimer(entry.getKey(), entry.getValue()); + } + } + + private void reportMetered(String name, Metered meter) { + scope.counter(MetricRegistry.name(name, "count")).inc(meter.getCount()); + List<Pair<String, Double>> meterGauges = Arrays.asList( + Pair.of("m1_rate", meter.getOneMinuteRate()), + Pair.of("m5_rate", meter.getFiveMinuteRate()), + Pair.of("m15_rate", meter.getFifteenMinuteRate()), + Pair.of("mean_rate", meter.getMeanRate()) + ); + for (Pair<String, Double> pair : meterGauges) { + scope.gauge(MetricRegistry.name(name, pair.getLeft())).update(pair.getRight()); + } + } + + private void reportSnapshot(String name, Snapshot snapshot) { + List<Pair<String, Number>> snapshotGauges = Arrays.asList( + Pair.of("max", snapshot.getMax()), + Pair.of("mean", snapshot.getMean()), + Pair.of("min", snapshot.getMin()), + Pair.of("stddev", snapshot.getStdDev()), + Pair.of("p50", snapshot.getMedian()), + Pair.of("p75", snapshot.get75thPercentile()), + Pair.of("p95", snapshot.get95thPercentile()), + Pair.of("p98", snapshot.get98thPercentile()), + Pair.of("p99", snapshot.get99thPercentile()), + Pair.of("p999", snapshot.get999thPercentile()) + ); + for (Pair<String, Number> pair : snapshotGauges) { + scope.gauge(MetricRegistry.name(name, pair.getLeft())).update(pair.getRight().doubleValue()); + } + } + + private void reportTimer(String name, Timer timer) { + reportMetered(name, timer); + reportSnapshot(name, timer.getSnapshot()); + } + +} diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/m3/TestM3Metrics.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/m3/TestM3Metrics.java new file mode 100644 index 00000000000..e7299d706b8 --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/m3/TestM3Metrics.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metrics.m3; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.when; + +import java.util.UUID; +import org.apache.hudi.common.testutils.NetworkTestUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.metrics.HoodieMetrics; +import org.apache.hudi.metrics.Metrics; +import org.apache.hudi.metrics.MetricsReporterType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class TestM3Metrics { + + @Mock + HoodieWriteConfig config; + HoodieMetrics hoodieMetrics; + Metrics metrics; + + @BeforeEach + public void start() { + when(config.isMetricsOn()).thenReturn(true); + when(config.getMetricsReporterType()).thenReturn(MetricsReporterType.M3); + when(config.getBasePath()).thenReturn("s3://test" + UUID.randomUUID()); + } + + @Test + public void testRegisterGauge() { + when(config.getM3ServerHost()).thenReturn("localhost"); + when(config.getM3ServerPort()).thenReturn(NetworkTestUtils.nextFreePort()); + when(config.getTableName()).thenReturn("raw_table"); + when(config.getM3Env()).thenReturn("dev"); + when(config.getM3Service()).thenReturn("hoodie"); + when(config.getM3Tags()).thenReturn("tag1=value1,tag2=value2"); + when(config.getMetricReporterMetricsNamePrefix()).thenReturn(""); + hoodieMetrics = new HoodieMetrics(config); + metrics = hoodieMetrics.getMetrics(); + metrics.registerGauge("metric1", 123L); + assertEquals("123", metrics.getRegistry().getGauges().get("metric1").getValue().toString()); + metrics.shutdown(); + } + + @Test + public void testEmptyM3Tags() { + when(config.getM3ServerHost()).thenReturn("localhost"); + when(config.getM3ServerPort()).thenReturn(NetworkTestUtils.nextFreePort()); + when(config.getTableName()).thenReturn("raw_table"); + when(config.getM3Env()).thenReturn("dev"); + when(config.getM3Service()).thenReturn("hoodie"); + when(config.getM3Tags()).thenReturn(""); + when(config.getMetricReporterMetricsNamePrefix()).thenReturn(""); + hoodieMetrics = new HoodieMetrics(config); + metrics = hoodieMetrics.getMetrics(); + metrics.registerGauge("metric1", 123L); + assertEquals("123", metrics.getRegistry().getGauges().get("metric1").getValue().toString()); + metrics.shutdown(); + } + + @Test + public void testInvalidM3Tags() { + when(config.getTableName()).thenReturn("raw_table"); + when(config.getMetricReporterMetricsNamePrefix()).thenReturn(""); + assertThrows(RuntimeException.class, () -> { + hoodieMetrics = new HoodieMetrics(config); + }); + } +} diff --git a/packaging/hudi-flink-bundle/pom.xml b/packaging/hudi-flink-bundle/pom.xml index 8fc4ff869c1..71d5abc7008 100644 --- a/packaging/hudi-flink-bundle/pom.xml +++ b/packaging/hudi-flink-bundle/pom.xml @@ -127,6 +127,8 @@ <include>io.prometheus:simpleclient_dropwizard</include> <include>io.prometheus:simpleclient_pushgateway</include> <include>io.prometheus:simpleclient_common</include> + <include>com.uber.m3:tally-m3</include> + <include>com.uber.m3:tally-core</include> <!-- Used for HUDI TimelineService --> <include>org.eclipse.jetty:*</include> @@ -210,6 +212,10 @@ <pattern>org.openjdk.jol.</pattern> <shadedPattern>org.apache.hudi.org.openjdk.jol.</shadedPattern> </relocation> + <relocation> + <pattern>com.uber.m3.</pattern> + <shadedPattern>org.apache.hudi.com.uber.m3.</shadedPattern> + </relocation> </relocations> <filters> <filter> diff --git a/packaging/hudi-integ-test-bundle/pom.xml b/packaging/hudi-integ-test-bundle/pom.xml index 01825a1ab99..678519701dd 100644 --- a/packaging/hudi-integ-test-bundle/pom.xml +++ b/packaging/hudi-integ-test-bundle/pom.xml @@ -164,6 +164,8 @@ <include>io.prometheus:simpleclient_dropwizard</include> <include>io.prometheus:simpleclient_pushgateway</include> <include>io.prometheus:simpleclient_common</include> + <include>com.uber.m3:tally-m3</include> + <include>com.uber.m3:tally-core</include> <include>org.openjdk.jol:jol-core</include> </includes> </artifactSet> @@ -272,6 +274,10 @@ <pattern>org.eclipse.jetty.</pattern> <shadedPattern>org.apache.hudi.org.eclipse.jetty.</shadedPattern> </relocation> + <relocation> + <pattern>com.uber.m3.</pattern> + <shadedPattern>org.apache.hudi.com.uber.m3.</shadedPattern> + </relocation> </relocations> <filters> <filter> diff --git a/packaging/hudi-kafka-connect-bundle/pom.xml b/packaging/hudi-kafka-connect-bundle/pom.xml index 4ec205c564c..f3400823b97 100644 --- a/packaging/hudi-kafka-connect-bundle/pom.xml +++ b/packaging/hudi-kafka-connect-bundle/pom.xml @@ -124,6 +124,8 @@ <include>io.prometheus:simpleclient_dropwizard</include> <include>io.prometheus:simpleclient_pushgateway</include> <include>io.prometheus:simpleclient_common</include> + <include>com.uber.m3:tally-m3</include> + <include>com.uber.m3:tally-core</include> <include>com.google.protobuf:protobuf-java</include> <include>org.scala-lang:*</include> @@ -181,6 +183,10 @@ <pattern>com.fasterxml.jackson.</pattern> <shadedPattern>org.apache.hudi.com.fasterxml.jackson.</shadedPattern> </relocation> + <relocation> + <pattern>com.uber.m3.</pattern> + <shadedPattern>org.apache.hudi.com.uber.m3.</shadedPattern> + </relocation> </relocations> <filters> <filter> diff --git a/packaging/hudi-spark-bundle/pom.xml b/packaging/hudi-spark-bundle/pom.xml index 8e336fb47af..0f7384b775e 100644 --- a/packaging/hudi-spark-bundle/pom.xml +++ b/packaging/hudi-spark-bundle/pom.xml @@ -112,6 +112,9 @@ <include>io.prometheus:simpleclient_dropwizard</include> <include>io.prometheus:simpleclient_pushgateway</include> <include>io.prometheus:simpleclient_common</include> + <include>com.uber.m3:tally-m3</include> + <include>com.uber.m3:tally-core</include> + <include>com.yammer.metrics:metrics-core</include> <include>org.apache.hive:hive-common</include> @@ -201,6 +204,10 @@ <pattern>org.roaringbitmap.</pattern> <shadedPattern>org.apache.hudi.org.roaringbitmap.</shadedPattern> </relocation> + <relocation> + <pattern>com.uber.m3.</pattern> + <shadedPattern>org.apache.hudi.com.uber.m3.</shadedPattern> + </relocation> </relocations> <filters> <filter> diff --git a/packaging/hudi-utilities-bundle/pom.xml b/packaging/hudi-utilities-bundle/pom.xml index daa5abef154..c22122fc698 100644 --- a/packaging/hudi-utilities-bundle/pom.xml +++ b/packaging/hudi-utilities-bundle/pom.xml @@ -141,6 +141,8 @@ <include>io.prometheus:simpleclient_dropwizard</include> <include>io.prometheus:simpleclient_pushgateway</include> <include>io.prometheus:simpleclient_common</include> + <include>com.uber.m3:tally-m3</include> + <include>com.uber.m3:tally-core</include> <include>org.apache.spark:spark-streaming-kafka-0-10_${scala.binary.version}</include> <include>org.apache.spark:spark-token-provider-kafka-0-10_${scala.binary.version}</include> <include>org.apache.kafka:kafka_${scala.binary.version}</include> @@ -237,6 +239,10 @@ <pattern>org.roaringbitmap.</pattern> <shadedPattern>org.apache.hudi.org.roaringbitmap.</shadedPattern> </relocation> + <relocation> + <pattern>com.uber.m3.</pattern> + <shadedPattern>org.apache.hudi.com.uber.m3.</shadedPattern> + </relocation> </relocations> <filters> <filter> diff --git a/packaging/hudi-utilities-slim-bundle/pom.xml b/packaging/hudi-utilities-slim-bundle/pom.xml index 21bea614efb..49fc8237afe 100644 --- a/packaging/hudi-utilities-slim-bundle/pom.xml +++ b/packaging/hudi-utilities-slim-bundle/pom.xml @@ -127,6 +127,8 @@ <include>io.prometheus:simpleclient_dropwizard</include> <include>io.prometheus:simpleclient_pushgateway</include> <include>io.prometheus:simpleclient_common</include> + <include>com.uber.m3:tally-m3</include> + <include>com.uber.m3:tally-core</include> <include>org.apache.spark:spark-streaming-kafka-0-10_${scala.binary.version}</include> <include>org.apache.spark:spark-token-provider-kafka-0-10_${scala.binary.version}</include> <include>org.apache.kafka:kafka_${scala.binary.version}</include> @@ -196,6 +198,10 @@ <pattern>com.google.protobuf.</pattern> <shadedPattern>org.apache.hudi.com.google.protobuf.</shadedPattern> </relocation> + <relocation> + <pattern>com.uber.m3.</pattern> + <shadedPattern>org.apache.hudi.com.uber.m3.</shadedPattern> + </relocation> </relocations> <filters> <filter> diff --git a/pom.xml b/pom.xml index 9158d65a890..d6c1bbae706 100644 --- a/pom.xml +++ b/pom.xml @@ -130,6 +130,7 @@ <orc.flink.version>1.5.6</orc.flink.version> <roaringbitmap.version>0.9.47</roaringbitmap.version> <airlift.version>0.25</airlift.version> + <tally.version>0.13.0</tally.version> <prometheus.version>0.8.0</prometheus.version> <aws.sdk.httpclient.version>4.5.13</aws.sdk.httpclient.version> <aws.sdk.httpcore.version>4.4.13</aws.sdk.httpcore.version> @@ -1110,7 +1111,6 @@ <artifactId>metrics-jmx</artifactId> <version>${metrics.version}</version> </dependency> - <dependency> <groupId>io.prometheus</groupId> <artifactId>simpleclient</artifactId> @@ -1131,6 +1131,16 @@ <artifactId>simpleclient_pushgateway</artifactId> <version>${prometheus.version}</version> </dependency> + <dependency> + <groupId>com.uber.m3</groupId> + <artifactId>tally-m3</artifactId> + <version>${tally.version}</version> + </dependency> + <dependency> + <groupId>com.uber.m3</groupId> + <artifactId>tally-core</artifactId> + <version>${tally.version}</version> + </dependency> <dependency> <groupId>com.beust</groupId>
