This is an automated email from the ASF dual-hosted git repository. czweng pushed a commit to branch metrics-refactor-preview in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
commit 514ec30144122c34b8f5497addef9a3f0c698bf5 Author: tsreaper <[email protected]> AuthorDate: Fri Oct 20 17:25:41 2023 +0800 [core] Refactor metrics to remove Metrics singleton --- .../apache/paimon/metrics/AbstractMetricGroup.java | 203 --------------------- .../paimon/metrics/AbstractMetricRegistry.java | 50 +++++ .../org/apache/paimon/metrics/MetricGroup.java | 27 +-- .../org/apache/paimon/metrics/MetricGroupImpl.java | 122 +++++++++++++ ...ricMetricGroup.java => MetricRegistryImpl.java} | 27 +-- .../java/org/apache/paimon/metrics/Metrics.java | 63 ------- .../paimon/metrics/groups/BucketMetricGroup.java | 50 ----- .../apache/paimon/operation/FileStoreCommit.java | 2 +- .../paimon/operation/FileStoreCommitImpl.java | 4 +- .../metrics}/CommitMetrics.java | 61 +++---- .../commit => operation/metrics}/CommitStats.java | 2 +- .../apache/paimon/table/sink/InnerTableCommit.java | 4 + .../apache/paimon/table/sink/TableCommitImpl.java | 25 ++- ...cGroupTest.java => MetricRegistryImplTest.java} | 36 ++-- .../metrics/groups/BucketMetricGroupTest.java | 44 ----- .../metrics}/CommitMetricsTest.java | 59 +----- .../metrics}/CommitStatsTest.java | 4 +- 17 files changed, 252 insertions(+), 531 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/metrics/AbstractMetricGroup.java b/paimon-core/src/main/java/org/apache/paimon/metrics/AbstractMetricGroup.java deleted file mode 100644 index 41980ff3e..000000000 --- a/paimon-core/src/main/java/org/apache/paimon/metrics/AbstractMetricGroup.java +++ /dev/null @@ -1,203 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.metrics; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collections; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -/** - * Contains key functionality for adding metrics and carries metrics. - * - * <p>A MetricGroup can be {@link #close() closed}. Upon closing, the group de-register all metrics. - * - * <p>The {@link #close()} method and {@link #addMetric(String, Metric)} method should never be - * invoked by multiple threads at the same time, {@link #addMetric(String, Metric)} and {@link - * #getMetrics()} have multi-threads problems, like the reporter is reading the metrics map and the - * group is adding metrics to the map at the same time. - */ -public abstract class AbstractMetricGroup implements MetricGroup { - protected static final Logger LOG = LoggerFactory.getLogger(MetricGroup.class); - - // ------------------------------------------------------------------------ - - /** The map containing all tags and their associated values. */ - protected Map<String, String> tags; - - /** Flag indicating whether this group has been closed. */ - private boolean closed = false; - - private final ConcurrentHashMap<String, Metric> metrics = new ConcurrentHashMap<>(); - - // ------------------------------------------------------------------------ - - public AbstractMetricGroup(Map<String, String> tags) { - this.tags = tags; - Metrics.getInstance().addGroup(this); - } - - @Override - public Map<String, String> getAllTags() { - return tags; - } - - /** - * Returns the fully qualified metric name using the configured delimiter for the reporter with - * the given index, for example {@code "commit.metricName"}. - * - * @param metricName metric name - * @param delimiter delimiter to use - * @return fully qualified metric name - */ - public String getMetricIdentifier(String metricName, String delimiter) { - return String.join(delimiter, getGroupName(), metricName); - } - - /** - * Creates and registers a new {@link org.apache.paimon.metrics.Counter} or return the existing - * {@link org.apache.paimon.metrics.Counter}. - * - * @param name name of the counter - * @return the created or existing counter - */ - public Counter counter(String name) { - return counter(name, new SimpleCounter()); - } - - /** - * Registers a {@link org.apache.paimon.metrics.Counter}. - * - * @param name name of the counter - * @param counter counter to register - * @param <C> counter type - * @return the given counter - */ - public <C extends Counter> C counter(String name, C counter) { - return (C) addMetric(name, counter); - } - - /** - * Registers a new {@link org.apache.paimon.metrics.Gauge}. - * - * @param name name of the gauge - * @param gauge gauge to register - * @param <T> return type of the gauge - * @return the given gauge - */ - public <T, G extends Gauge<T>> G gauge(String name, G gauge) { - return (G) addMetric(name, gauge); - } - - /** - * Registers a new {@link Histogram} with Paimon. - * - * @param name name of the histogram - * @param histogram histogram to register - * @param <H> histogram type - * @return the registered histogram - */ - public <H extends Histogram> H histogram(String name, H histogram) { - return (H) addMetric(name, histogram); - } - - /** - * Adds the given metric to the group and registers it at the registry, if the group is not yet - * closed, and if no metric with the same name has been registered before. - * - * @param metricName the name to register the metric under - * @param metric the metric to register - */ - protected Metric addMetric(String metricName, Metric metric) { - if (metric == null) { - LOG.warn( - "Ignoring attempted registration of a metric due to being null for name {}.", - metricName); - return null; - } - // add the metric only if the group is still open - if (!isClosed()) { - switch (metric.getMetricType()) { - case COUNTER: - case GAUGE: - case HISTOGRAM: - // immediately put without a 'contains' check to optimize the common case - // (no collision), collisions are resolved later - Metric prior = metrics.put(metricName, metric); - - // check for collisions with other metric names - if (prior != null) { - // we had a collision. put back the original value - metrics.put(metricName, prior); - - // we warn here, rather than failing, because metrics are tools that - // should not fail the program when used incorrectly - LOG.warn( - "Name collision: Group already contains a Metric with the name '" - + metricName - + "'. The new added Metric will not be reported."); - } - break; - default: - LOG.warn( - "Cannot add unknown metric type {}. This indicates that the paimon " - + "does not support this metric type.", - metric.getClass().getName()); - } - } - return metrics.get(metricName); - } - - @Override - public Map<String, Metric> getMetrics() { - return Collections.unmodifiableMap(metrics); - } - - /** - * Returns the name for this group, meaning what kind of entity it represents, for example - * "commit". - * - * @return logical name for this group - */ - public abstract String getGroupName(); - - public void close() { - if (!closed) { - closed = true; - metrics.clear(); - Metrics.getInstance().removeGroup(this); - } - } - - public final boolean isClosed() { - return closed; - } - - @Override - public String toString() { - return "MetricGroup{" - + "groupName=" - + getGroupName() - + ", metrics=" - + String.join(",", metrics.keySet()) - + '}'; - } -} diff --git a/paimon-core/src/main/java/org/apache/paimon/metrics/AbstractMetricRegistry.java b/paimon-core/src/main/java/org/apache/paimon/metrics/AbstractMetricRegistry.java new file mode 100644 index 000000000..4389ac003 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/metrics/AbstractMetricRegistry.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.metrics; + +import java.util.LinkedHashMap; +import java.util.Map; + +/** Factory to create {@link MetricGroup}s. */ +public abstract class AbstractMetricRegistry { + + private static final String KEY_TABLE = "table"; + private static final String KEY_PARTITION = "partition"; + private static final String KEY_BUCKET = "bucket"; + + public MetricGroup tableMetricGroup(String groupName, String tableName) { + Map<String, String> variables = new LinkedHashMap<>(); + variables.put(KEY_TABLE, tableName); + + return createMetricGroup(groupName, variables); + } + + public MetricGroup bucketMetricGroup( + String groupName, String tableName, String partition, int bucket) { + Map<String, String> variables = new LinkedHashMap<>(); + variables.put(KEY_TABLE, tableName); + variables.put(KEY_PARTITION, partition); + variables.put(KEY_BUCKET, String.valueOf(bucket)); + + return createMetricGroup(groupName, variables); + } + + protected abstract MetricGroup createMetricGroup( + String groupName, Map<String, String> variables); +} diff --git a/paimon-core/src/main/java/org/apache/paimon/metrics/MetricGroup.java b/paimon-core/src/main/java/org/apache/paimon/metrics/MetricGroup.java index c980f0be7..53f27d7c4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/metrics/MetricGroup.java +++ b/paimon-core/src/main/java/org/apache/paimon/metrics/MetricGroup.java @@ -38,16 +38,6 @@ public interface MetricGroup { */ Counter counter(String name); - /** - * Registers a {@link org.apache.paimon.metrics.Counter} with Paimon. - * - * @param name name of the counter - * @param counter counter to register - * @param <C> counter type - * @return the given counter - */ - <C extends Counter> C counter(String name, C counter); - /** * Registers a new {@link org.apache.paimon.metrics.Gauge} with Paimon. * @@ -56,28 +46,19 @@ public interface MetricGroup { * @param <T> return type of the gauge * @return the given gauge */ - <T, G extends Gauge<T>> G gauge(String name, G gauge); + <T> Gauge<T> gauge(String name, Gauge<T> gauge); /** * Registers a new {@link Histogram} with Paimon. * * @param name name of the histogram - * @param histogram histogram to register - * @param <H> histogram type + * @param windowSize number of records this histogram keeps * @return the registered histogram */ - <H extends Histogram> H histogram(String name, H histogram); - - /** - * Returns the fully qualified metric name, for example {@code "myTable.bucket-1.metricName"}. - * - * @param metricName metric name - * @return fully qualified metric name - */ - String getMetricIdentifier(String metricName, String delimiter); + Histogram histogram(String name, int windowSize); /** Returns a map of all variables and their associated value. */ - Map<String, String> getAllTags(); + Map<String, String> getAllVariables(); /** * Returns the name for this group, meaning what kind of entity it represents, for example diff --git a/paimon-core/src/main/java/org/apache/paimon/metrics/MetricGroupImpl.java b/paimon-core/src/main/java/org/apache/paimon/metrics/MetricGroupImpl.java new file mode 100644 index 000000000..e68ab5200 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/metrics/MetricGroupImpl.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.metrics; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** Default implementation of {@link MetricGroup}. */ +public class MetricGroupImpl implements MetricGroup { + + private static final Logger LOG = LoggerFactory.getLogger(MetricGroupImpl.class); + + private final String groupName; + private final Map<String, String> variables; + private final Map<String, Metric> metrics; + + public MetricGroupImpl(String groupName) { + this(groupName, new HashMap<>()); + } + + public MetricGroupImpl(String groupName, Map<String, String> variables) { + this.groupName = groupName; + this.variables = variables; + this.metrics = new HashMap<>(); + } + + @Override + public String getGroupName() { + return groupName; + } + + @Override + public Map<String, String> getAllVariables() { + return Collections.unmodifiableMap(variables); + } + + @Override + public Counter counter(String name) { + return (Counter) addMetric(name, new SimpleCounter()); + } + + @SuppressWarnings("unchecked") + @Override + public <T> Gauge<T> gauge(String name, Gauge<T> gauge) { + return (Gauge<T>) addMetric(name, gauge); + } + + @Override + public Histogram histogram(String name, int windowSize) { + return (Histogram) addMetric(name, new DescriptiveStatisticsHistogram(windowSize)); + } + + /** + * Adds the given metric to the group and registers it at the registry, if the group is not yet + * closed, and if no metric with the same name has been registered before. + * + * @param metricName the name to register the metric under + * @param metric the metric to register + */ + private Metric addMetric(String metricName, Metric metric) { + if (metric == null) { + LOG.warn( + "Ignoring attempted registration of a metric due to being null for name {}.", + metricName); + return null; + } + // add the metric only if the group is still open + switch (metric.getMetricType()) { + case COUNTER: + case GAUGE: + case HISTOGRAM: + // immediately put without a 'contains' check to optimize the common case + // (no collision), collisions are resolved later + Metric prior = metrics.put(metricName, metric); + + // check for collisions with other metric names + if (prior != null) { + // we had a collision. put back the original value + metrics.put(metricName, prior); + + // we warn here, rather than failing, because metrics are tools that + // should not fail the program when used incorrectly + LOG.warn( + "Name collision: Group already contains a Metric with the name '" + + metricName + + "'. The new added Metric will not be reported."); + } + break; + default: + LOG.warn( + "Cannot add unknown metric type {}. This indicates that the paimon " + + "does not support this metric type.", + metric.getClass().getName()); + } + return metrics.get(metricName); + } + + @Override + public Map<String, Metric> getMetrics() { + return Collections.unmodifiableMap(metrics); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/metrics/groups/GenericMetricGroup.java b/paimon-core/src/main/java/org/apache/paimon/metrics/MetricRegistryImpl.java similarity index 52% rename from paimon-core/src/main/java/org/apache/paimon/metrics/groups/GenericMetricGroup.java rename to paimon-core/src/main/java/org/apache/paimon/metrics/MetricRegistryImpl.java index f1d5c3157..2e753a371 100644 --- a/paimon-core/src/main/java/org/apache/paimon/metrics/groups/GenericMetricGroup.java +++ b/paimon-core/src/main/java/org/apache/paimon/metrics/MetricRegistryImpl.java @@ -16,32 +16,15 @@ * limitations under the License. */ -package org.apache.paimon.metrics.groups; +package org.apache.paimon.metrics; -import org.apache.paimon.metrics.AbstractMetricGroup; - -import java.util.HashMap; import java.util.Map; -/** A simple named {@link org.apache.paimon.metrics.MetricGroup} that is untagged. */ -public class GenericMetricGroup extends AbstractMetricGroup { - - private final String groupName; - - GenericMetricGroup(final Map<String, String> tags, final String groupName) { - super(tags); - this.groupName = groupName; - } - - public static GenericMetricGroup createGenericMetricGroup( - final String tableName, final String groupName) { - Map<String, String> tags = new HashMap<>(); - tags.put("table", tableName); - return new GenericMetricGroup(tags, groupName); - } +/** Default implementation of {@link AbstractMetricRegistry}. */ +public class MetricRegistryImpl extends AbstractMetricRegistry { @Override - public String getGroupName() { - return groupName; + protected MetricGroup createMetricGroup(String groupName, Map<String, String> variables) { + return new MetricGroupImpl(groupName, variables); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/metrics/Metrics.java b/paimon-core/src/main/java/org/apache/paimon/metrics/Metrics.java deleted file mode 100644 index 379f3d6c6..000000000 --- a/paimon-core/src/main/java/org/apache/paimon/metrics/Metrics.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.metrics; - -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.stream.Collectors; - -/** Core of Paimon metrics system. */ -public class Metrics { - private static volatile Metrics instance = new Metrics(); - - /** - * The metrics groups. All the commit & compaction & scan metric groups are collected in this - * group container, there is no need to distinguish the groups by group name for reporters. - */ - private final ConcurrentLinkedQueue<MetricGroup> metricGroups = new ConcurrentLinkedQueue<>(); - - private Metrics() {} - - public static Metrics getInstance() { - return instance; - } - - /** - * Add a metric group. Which is called by metrics instances, like commit / compaction metrics - * instances. - */ - public void addGroup(AbstractMetricGroup group) { - metricGroups.add(group); - } - - /** Remove a metric group. Called when closing the corresponding instances, like committer. */ - public void removeGroup(AbstractMetricGroup group) { - metricGroups.remove(group); - } - - /** Get metric groups. */ - public ConcurrentLinkedQueue<MetricGroup> getMetricGroups() { - return metricGroups; - } - - public static String groupsInfo() { - return getInstance().getMetricGroups().stream() - .map(Object::toString) - .collect(Collectors.joining(", ", "[", "]")); - } -} diff --git a/paimon-core/src/main/java/org/apache/paimon/metrics/groups/BucketMetricGroup.java b/paimon-core/src/main/java/org/apache/paimon/metrics/groups/BucketMetricGroup.java deleted file mode 100644 index e31d7f385..000000000 --- a/paimon-core/src/main/java/org/apache/paimon/metrics/groups/BucketMetricGroup.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.metrics.groups; - -import org.apache.paimon.metrics.AbstractMetricGroup; - -import java.util.HashMap; -import java.util.Map; - -/** Special {@link org.apache.paimon.metrics.MetricGroup} representing a bucket. */ -public class BucketMetricGroup extends AbstractMetricGroup { - private final String groupName; - - // ------------------------------------------------------------------------ - - BucketMetricGroup(final Map<String, String> tags, final String groupName) { - super(tags); - this.groupName = groupName; - } - - public static BucketMetricGroup createBucketMetricGroup( - final String table, final int bucket, final String partition, final String groupName) { - Map<String, String> tags = new HashMap<>(); - tags.put("table", table); - tags.put("bucket", String.valueOf(bucket)); - tags.put("partition", partition); - return new BucketMetricGroup(tags, groupName); - } - - @Override - public String getGroupName() { - return this.groupName; - } -} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java index 31d22c37e..a1025d9f6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java @@ -20,7 +20,7 @@ package org.apache.paimon.operation; import org.apache.paimon.Snapshot; import org.apache.paimon.manifest.ManifestCommittable; -import org.apache.paimon.metrics.commit.CommitMetrics; +import org.apache.paimon.operation.metrics.CommitMetrics; import org.apache.paimon.table.sink.CommitMessage; import java.util.List; diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 77c22687c..b137798d3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -34,8 +34,8 @@ import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.ManifestFile; import org.apache.paimon.manifest.ManifestFileMeta; import org.apache.paimon.manifest.ManifestList; -import org.apache.paimon.metrics.commit.CommitMetrics; -import org.apache.paimon.metrics.commit.CommitStats; +import org.apache.paimon.operation.metrics.CommitMetrics; +import org.apache.paimon.operation.metrics.CommitStats; import org.apache.paimon.options.MemorySize; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; diff --git a/paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitMetrics.java b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitMetrics.java similarity index 78% rename from paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitMetrics.java rename to paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitMetrics.java index a216a94e2..4386a4290 100644 --- a/paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitMetrics.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitMetrics.java @@ -16,35 +16,32 @@ * limitations under the License. */ -package org.apache.paimon.metrics.commit; +package org.apache.paimon.operation.metrics; import org.apache.paimon.annotation.VisibleForTesting; -import org.apache.paimon.metrics.AbstractMetricGroup; -import org.apache.paimon.metrics.DescriptiveStatisticsHistogram; +import org.apache.paimon.metrics.AbstractMetricRegistry; import org.apache.paimon.metrics.Histogram; -import org.apache.paimon.metrics.groups.GenericMetricGroup; +import org.apache.paimon.metrics.MetricGroup; /** Metrics to measure a commit. */ public class CommitMetrics { + private static final int HISTOGRAM_WINDOW_SIZE = 10_000; - protected static final String GROUP_NAME = "commit"; + private static final String GROUP_NAME = "commit"; - private final AbstractMetricGroup genericMetricGroup; + private final MetricGroup metricGroup; - public CommitMetrics(String tableName) { - this.genericMetricGroup = - GenericMetricGroup.createGenericMetricGroup(tableName, GROUP_NAME); + public CommitMetrics(AbstractMetricRegistry registry, String tableName) { + this.metricGroup = registry.tableMetricGroup(GROUP_NAME, tableName); registerGenericCommitMetrics(); } @VisibleForTesting - public AbstractMetricGroup getMetricGroup() { - return genericMetricGroup; + public MetricGroup getMetricGroup() { + return metricGroup; } - private final Histogram durationHistogram = - new DescriptiveStatisticsHistogram(HISTOGRAM_WINDOW_SIZE); - + private Histogram durationHistogram; private CommitStats latestCommit; @VisibleForTesting static final String LAST_COMMIT_DURATION = "lastCommitDuration"; @@ -80,48 +77,48 @@ public class CommitMetrics { @VisibleForTesting static final String LAST_BUCKETS_WRITTEN = "lastBucketsWritten"; private void registerGenericCommitMetrics() { - genericMetricGroup.gauge( + metricGroup.gauge( LAST_COMMIT_DURATION, () -> latestCommit == null ? 0L : latestCommit.getDuration()); - genericMetricGroup.gauge( + metricGroup.gauge( LAST_COMMIT_ATTEMPTS, () -> latestCommit == null ? 0L : latestCommit.getAttempts()); - genericMetricGroup.gauge( + metricGroup.gauge( LAST_GENERATED_SNAPSHOTS, () -> latestCommit == null ? 0L : latestCommit.getGeneratedSnapshots()); - genericMetricGroup.gauge( + metricGroup.gauge( LAST_PARTITIONS_WRITTEN, () -> latestCommit == null ? 0L : latestCommit.getNumPartitionsWritten()); - genericMetricGroup.gauge( + metricGroup.gauge( LAST_BUCKETS_WRITTEN, () -> latestCommit == null ? 0L : latestCommit.getNumBucketsWritten()); - genericMetricGroup.histogram(COMMIT_DURATION, durationHistogram); - genericMetricGroup.gauge( + durationHistogram = metricGroup.histogram(COMMIT_DURATION, HISTOGRAM_WINDOW_SIZE); + metricGroup.gauge( LAST_TABLE_FILES_ADDED, () -> latestCommit == null ? 0L : latestCommit.getTableFilesAdded()); - genericMetricGroup.gauge( + metricGroup.gauge( LAST_TABLE_FILES_DELETED, () -> latestCommit == null ? 0L : latestCommit.getTableFilesDeleted()); - genericMetricGroup.gauge( + metricGroup.gauge( LAST_TABLE_FILES_APPENDED, () -> latestCommit == null ? 0L : latestCommit.getTableFilesAppended()); - genericMetricGroup.gauge( + metricGroup.gauge( LAST_TABLE_FILES_COMMIT_COMPACTED, () -> latestCommit == null ? 0L : latestCommit.getTableFilesCompacted()); - genericMetricGroup.gauge( + metricGroup.gauge( LAST_CHANGELOG_FILES_APPENDED, () -> latestCommit == null ? 0L : latestCommit.getChangelogFilesAppended()); - genericMetricGroup.gauge( + metricGroup.gauge( LAST_CHANGELOG_FILES_COMMIT_COMPACTED, () -> latestCommit == null ? 0L : latestCommit.getChangelogFilesCompacted()); - genericMetricGroup.gauge( + metricGroup.gauge( LAST_DELTA_RECORDS_APPENDED, () -> latestCommit == null ? 0L : latestCommit.getDeltaRecordsAppended()); - genericMetricGroup.gauge( + metricGroup.gauge( LAST_CHANGELOG_RECORDS_APPENDED, () -> latestCommit == null ? 0L : latestCommit.getChangelogRecordsAppended()); - genericMetricGroup.gauge( + metricGroup.gauge( LAST_DELTA_RECORDS_COMMIT_COMPACTED, () -> latestCommit == null ? 0L : latestCommit.getDeltaRecordsCompacted()); - genericMetricGroup.gauge( + metricGroup.gauge( LAST_CHANGELOG_RECORDS_COMMIT_COMPACTED, () -> latestCommit == null ? 0L : latestCommit.getChangelogRecordsCompacted()); } @@ -130,8 +127,4 @@ public class CommitMetrics { latestCommit = commitStats; durationHistogram.update(commitStats.getDuration()); } - - public void close() { - this.genericMetricGroup.close(); - } } diff --git a/paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitStats.java b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitStats.java similarity index 99% rename from paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitStats.java rename to paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitStats.java index 0a2772f63..a19d93508 100644 --- a/paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitStats.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitStats.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.paimon.metrics.commit; +package org.apache.paimon.operation.metrics; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.data.BinaryRow; diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java index 8a8033aed..6b14dd29e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java @@ -18,6 +18,8 @@ package org.apache.paimon.table.sink; +import org.apache.paimon.metrics.AbstractMetricRegistry; + import javax.annotation.Nullable; import java.util.Map; @@ -41,4 +43,6 @@ public interface InnerTableCommit extends StreamTableCommit, BatchTableCommit { * generated regardless of the configuration (No one trigger commit interface). */ InnerTableCommit ignoreEmptyCommit(boolean ignoreEmptyCommit); + + InnerTableCommit withMetricRegistry(AbstractMetricRegistry registry); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java index 19e95bb0c..9951112a9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java @@ -21,11 +21,12 @@ package org.apache.paimon.table.sink; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.consumer.ConsumerManager; import org.apache.paimon.manifest.ManifestCommittable; -import org.apache.paimon.metrics.commit.CommitMetrics; +import org.apache.paimon.metrics.AbstractMetricRegistry; import org.apache.paimon.operation.FileStoreCommit; import org.apache.paimon.operation.FileStoreExpire; import org.apache.paimon.operation.Lock; import org.apache.paimon.operation.PartitionExpire; +import org.apache.paimon.operation.metrics.CommitMetrics; import org.apache.paimon.tag.TagAutoCreation; import org.apache.paimon.utils.ExecutorThreadFactory; import org.apache.paimon.utils.IOUtils; @@ -70,13 +71,13 @@ public class TableCommitImpl implements InnerTableCommit { @Nullable private final Duration consumerExpireTime; private final ConsumerManager consumerManager; - @Nullable private Map<String, String> overwritePartition = null; + private final ExecutorService expireMainExecutor; + private final AtomicReference<Throwable> expireError; - private boolean batchCommitted = false; + private final String tableName; - private ExecutorService expireMainExecutor; - private AtomicReference<Throwable> expireError; - private final CommitMetrics commitMetrics; + @Nullable private Map<String, String> overwritePartition = null; + private boolean batchCommitted = false; public TableCommitImpl( FileStoreCommit commit, @@ -89,8 +90,7 @@ public class TableCommitImpl implements InnerTableCommit { ConsumerManager consumerManager, ExpireExecutionMode expireExecutionMode, String tableName) { - this.commitMetrics = new CommitMetrics(tableName); - commit.withLock(lock).withMetrics(commitMetrics); + commit.withLock(lock); if (expire != null) { expire.withLock(lock); } @@ -115,6 +115,8 @@ public class TableCommitImpl implements InnerTableCommit { new ExecutorThreadFactory( Thread.currentThread().getName() + "expire-main-thread")); this.expireError = new AtomicReference<>(null); + + this.tableName = tableName; } @Override @@ -129,6 +131,12 @@ public class TableCommitImpl implements InnerTableCommit { return this; } + @Override + public InnerTableCommit withMetricRegistry(AbstractMetricRegistry registry) { + commit.withMetrics(new CommitMetrics(registry, tableName)); + return this; + } + @Override public Set<Long> filterCommitted(Set<Long> commitIdentifiers) { return commit.filterCommitted(commitIdentifiers); @@ -265,7 +273,6 @@ public class TableCommitImpl implements InnerTableCommit { } IOUtils.closeQuietly(lock); expireMainExecutor.shutdownNow(); - commitMetrics.close(); } @Override diff --git a/paimon-core/src/test/java/org/apache/paimon/metrics/MetricGroupTest.java b/paimon-core/src/test/java/org/apache/paimon/metrics/MetricRegistryImplTest.java similarity index 61% rename from paimon-core/src/test/java/org/apache/paimon/metrics/MetricGroupTest.java rename to paimon-core/src/test/java/org/apache/paimon/metrics/MetricRegistryImplTest.java index 7db7fc03c..27f9e70ec 100644 --- a/paimon-core/src/test/java/org/apache/paimon/metrics/MetricGroupTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/metrics/MetricRegistryImplTest.java @@ -18,26 +18,26 @@ package org.apache.paimon.metrics; -import org.apache.paimon.metrics.groups.GenericMetricGroup; - import org.junit.jupiter.api.Test; import java.util.HashMap; import static org.assertj.core.api.Assertions.assertThat; -/** Tests for the {@link MetricGroup}. */ -public class MetricGroupTest { +/** Tests for the {@link MetricRegistryImpl}. */ +public class MetricRegistryImplTest { + @Test public void testGroupRegisterMetrics() { - GenericMetricGroup group = GenericMetricGroup.createGenericMetricGroup("myTable", "commit"); - assertThat(group.isClosed()).isFalse(); + MetricRegistryImpl registry = new MetricRegistryImpl(); + MetricGroup group = registry.tableMetricGroup("commit", "myTable"); + // these will fail is the registration is propagated group.counter("testcounter"); group.gauge("testgauge", () -> null); assertThat(group.getGroupName()).isEqualTo("commit"); - assertThat(group.getAllTags().size()).isEqualTo(1); - assertThat(group.getAllTags()) + assertThat(group.getAllVariables().size()).isEqualTo(1); + assertThat(group.getAllVariables()) .containsExactlyEntriesOf( new HashMap<String, String>() { { @@ -45,28 +45,16 @@ public class MetricGroupTest { } }); assertThat(group.getMetrics().size()).isEqualTo(2); - group.close(); - assertThat(group.isClosed()).isTrue(); } @Test public void testTolerateMetricNameCollisions() { final String name = "abctestname"; - GenericMetricGroup group = GenericMetricGroup.createGenericMetricGroup("myTable", "commit"); - - Counter counter1 = group.counter(name); + MetricRegistryImpl registry = new MetricRegistryImpl(); + MetricGroup group = registry.tableMetricGroup("commit", "myTable"); + Counter counter = group.counter(name); // return the old one with the metric name collision - assertThat(group.counter(name)).isSameAs(counter1); - group.close(); - } - - @Test - public void testAddAndRemoveMetricGroups() { - AbstractMetricGroup metricGroup = - GenericMetricGroup.createGenericMetricGroup("myTable", "commit"); - assertThat(Metrics.getInstance().getMetricGroups()).contains(metricGroup); - metricGroup.close(); - assertThat(Metrics.getInstance().getMetricGroups()).doesNotContain(metricGroup); + assertThat(group.counter(name)).isSameAs(counter); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/metrics/groups/BucketMetricGroupTest.java b/paimon-core/src/test/java/org/apache/paimon/metrics/groups/BucketMetricGroupTest.java deleted file mode 100644 index ba32302d9..000000000 --- a/paimon-core/src/test/java/org/apache/paimon/metrics/groups/BucketMetricGroupTest.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.metrics.groups; - -import org.junit.jupiter.api.Test; - -import static org.assertj.core.api.Assertions.assertThat; - -/** Tests for the {@link BucketMetricGroup}. */ -public class BucketMetricGroupTest { - // ------------------------------------------------------------------------ - // scope name tests - // ------------------------------------------------------------------------ - - @Test - public void testGenerateScopeDefault() { - BucketMetricGroup group = - BucketMetricGroup.createBucketMetricGroup("myTable", 1, "dt=1", "commit"); - - assertThat(group.getAllTags().size()).isEqualTo(3); - assertThat(group.getAllTags().get("table")).isEqualTo("myTable"); - assertThat(group.getAllTags().get("bucket")).isEqualTo("1"); - assertThat(group.getAllTags().get("partition")).isEqualTo("dt=1"); - assertThat(group.getMetricIdentifier("myMetric", ".")).isEqualTo("commit.myMetric"); - assertThat(group.getGroupName()).isEqualTo("commit"); - group.close(); - } -} diff --git a/paimon-core/src/test/java/org/apache/paimon/metrics/commit/CommitMetricsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CommitMetricsTest.java similarity index 82% rename from paimon-core/src/test/java/org/apache/paimon/metrics/commit/CommitMetricsTest.java rename to paimon-core/src/test/java/org/apache/paimon/operation/metrics/CommitMetricsTest.java index dead22c2e..476790a8c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/metrics/commit/CommitMetricsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CommitMetricsTest.java @@ -16,18 +16,15 @@ * limitations under the License. */ -package org.apache.paimon.metrics.commit; +package org.apache.paimon.operation.metrics; import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.metrics.Gauge; import org.apache.paimon.metrics.Histogram; import org.apache.paimon.metrics.Metric; -import org.apache.paimon.metrics.MetricGroup; -import org.apache.paimon.metrics.Metrics; +import org.apache.paimon.metrics.MetricRegistryImpl; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.util.ArrayList; @@ -40,58 +37,14 @@ import static org.assertj.core.api.Assertions.offset; /** Tests for {@link CommitMetrics}. */ public class CommitMetricsTest { - private static final String TABLE_NAME = "myTable"; - - private CommitMetrics commitMetrics; - - @BeforeEach - public void beforeEach() { - commitMetrics = getCommitMetrics(); - } - - @AfterEach - public void afterEach() { - commitMetrics.close(); - } - /** Tests the registration of the commit metrics. */ - @Test - public void testGenericMetricsRegistration() { - MetricGroup genericMetricGroup = commitMetrics.getMetricGroup(); - assertThat(Metrics.getInstance().getMetricGroups().size()) - .withFailMessage( - String.format( - "Please close the created metric groups %s in case of metrics resource leak.", - Metrics.groupsInfo())) - .isEqualTo(1); - assertThat(genericMetricGroup.getGroupName()).isEqualTo(CommitMetrics.GROUP_NAME); - Map<String, Metric> registeredMetrics = genericMetricGroup.getMetrics(); - assertThat(registeredMetrics.keySet()) - .containsExactlyInAnyOrder( - CommitMetrics.LAST_COMMIT_DURATION, - CommitMetrics.LAST_COMMIT_ATTEMPTS, - CommitMetrics.LAST_GENERATED_SNAPSHOTS, - CommitMetrics.LAST_PARTITIONS_WRITTEN, - CommitMetrics.LAST_BUCKETS_WRITTEN, - CommitMetrics.COMMIT_DURATION, - CommitMetrics.LAST_TABLE_FILES_ADDED, - CommitMetrics.LAST_TABLE_FILES_DELETED, - CommitMetrics.LAST_TABLE_FILES_APPENDED, - CommitMetrics.LAST_TABLE_FILES_COMMIT_COMPACTED, - CommitMetrics.LAST_CHANGELOG_FILES_APPENDED, - CommitMetrics.LAST_CHANGELOG_FILES_COMMIT_COMPACTED, - CommitMetrics.LAST_DELTA_RECORDS_APPENDED, - CommitMetrics.LAST_CHANGELOG_RECORDS_APPENDED, - CommitMetrics.LAST_DELTA_RECORDS_COMMIT_COMPACTED, - CommitMetrics.LAST_CHANGELOG_RECORDS_COMMIT_COMPACTED); - - reportOnce(commitMetrics); - assertThat(Metrics.getInstance().getMetricGroups().size()).isEqualTo(1); - } + private static final String TABLE_NAME = "myTable"; /** Tests that the metrics are updated properly. */ + @SuppressWarnings("unchecked") @Test public void testMetricsAreUpdated() { + CommitMetrics commitMetrics = getCommitMetrics(); Map<String, Metric> registeredGenericMetrics = commitMetrics.getMetricGroup().getMetrics(); // Check initial values @@ -281,6 +234,6 @@ public class CommitMetricsTest { } private CommitMetrics getCommitMetrics() { - return new CommitMetrics(TABLE_NAME); + return new CommitMetrics(new MetricRegistryImpl(), TABLE_NAME); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/metrics/commit/CommitStatsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CommitStatsTest.java similarity index 98% rename from paimon-core/src/test/java/org/apache/paimon/metrics/commit/CommitStatsTest.java rename to paimon-core/src/test/java/org/apache/paimon/operation/metrics/CommitStatsTest.java index 640dba08f..e4a2a7fd2 100644 --- a/paimon-core/src/test/java/org/apache/paimon/metrics/commit/CommitStatsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CommitStatsTest.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.paimon.metrics.commit; +package org.apache.paimon.operation.metrics; import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.ManifestEntry; @@ -32,7 +32,7 @@ import static org.apache.paimon.io.DataFileTestUtils.row; import static org.apache.paimon.manifest.ManifestFileMetaTestBase.makeEntry; import static org.assertj.core.api.Assertions.assertThat; -/** Tests for {@link CommitStats}. */ +/** Tests for {@link org.apache.paimon.operation.metrics.CommitStats}. */ public class CommitStatsTest { private static List<ManifestEntry> files = new ArrayList<>(); private static List<ManifestEntry> appendDataFiles = new ArrayList<>();
