This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 66c2c8661 [core] Refactor metrics to remove Metrics singleton (#2169)
66c2c8661 is described below
commit 66c2c86610daecc6516d3cf3fcbd33e10a411d82
Author: tsreaper <[email protected]>
AuthorDate: Wed Oct 25 14:09:00 2023 +0800
[core] Refactor metrics to remove Metrics singleton (#2169)
This closes #2169.
---
.../apache/paimon/metrics/AbstractMetricGroup.java | 203 ---------------------
.../org/apache/paimon/metrics/MetricGroup.java | 27 +--
.../org/apache/paimon/metrics/MetricGroupImpl.java | 122 +++++++++++++
.../org/apache/paimon/metrics/MetricRegistry.java | 50 +++++
...ricMetricGroup.java => MetricRegistryImpl.java} | 27 +--
.../java/org/apache/paimon/metrics/Metrics.java | 63 -------
.../paimon/metrics/groups/BucketMetricGroup.java | 50 -----
.../apache/paimon/operation/FileStoreCommit.java | 2 +-
.../paimon/operation/FileStoreCommitImpl.java | 4 +-
.../metrics}/CommitMetrics.java | 61 +++----
.../commit => operation/metrics}/CommitStats.java | 2 +-
.../apache/paimon/table/sink/InnerTableCommit.java | 4 +
.../apache/paimon/table/sink/TableCommitImpl.java | 25 ++-
...cGroupTest.java => MetricRegistryImplTest.java} | 36 ++--
.../metrics/groups/BucketMetricGroupTest.java | 44 -----
.../metrics}/CommitMetricsTest.java | 59 +-----
.../metrics}/CommitStatsTest.java | 4 +-
17 files changed, 252 insertions(+), 531 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/metrics/AbstractMetricGroup.java
b/paimon-core/src/main/java/org/apache/paimon/metrics/AbstractMetricGroup.java
deleted file mode 100644
index 41980ff3e..000000000
---
a/paimon-core/src/main/java/org/apache/paimon/metrics/AbstractMetricGroup.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.metrics;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Contains key functionality for adding metrics and carries metrics.
- *
- * <p>A MetricGroup can be {@link #close() closed}. Upon closing, the group
de-register all metrics.
- *
- * <p>The {@link #close()} method and {@link #addMetric(String, Metric)}
method should never be
- * invoked by multiple threads at the same time, {@link #addMetric(String,
Metric)} and {@link
- * #getMetrics()} have multi-threads problems, like the reporter is reading
the metrics map and the
- * group is adding metrics to the map at the same time.
- */
-public abstract class AbstractMetricGroup implements MetricGroup {
- protected static final Logger LOG =
LoggerFactory.getLogger(MetricGroup.class);
-
- // ------------------------------------------------------------------------
-
- /** The map containing all tags and their associated values. */
- protected Map<String, String> tags;
-
- /** Flag indicating whether this group has been closed. */
- private boolean closed = false;
-
- private final ConcurrentHashMap<String, Metric> metrics = new
ConcurrentHashMap<>();
-
- // ------------------------------------------------------------------------
-
- public AbstractMetricGroup(Map<String, String> tags) {
- this.tags = tags;
- Metrics.getInstance().addGroup(this);
- }
-
- @Override
- public Map<String, String> getAllTags() {
- return tags;
- }
-
- /**
- * Returns the fully qualified metric name using the configured delimiter
for the reporter with
- * the given index, for example {@code "commit.metricName"}.
- *
- * @param metricName metric name
- * @param delimiter delimiter to use
- * @return fully qualified metric name
- */
- public String getMetricIdentifier(String metricName, String delimiter) {
- return String.join(delimiter, getGroupName(), metricName);
- }
-
- /**
- * Creates and registers a new {@link org.apache.paimon.metrics.Counter}
or return the existing
- * {@link org.apache.paimon.metrics.Counter}.
- *
- * @param name name of the counter
- * @return the created or existing counter
- */
- public Counter counter(String name) {
- return counter(name, new SimpleCounter());
- }
-
- /**
- * Registers a {@link org.apache.paimon.metrics.Counter}.
- *
- * @param name name of the counter
- * @param counter counter to register
- * @param <C> counter type
- * @return the given counter
- */
- public <C extends Counter> C counter(String name, C counter) {
- return (C) addMetric(name, counter);
- }
-
- /**
- * Registers a new {@link org.apache.paimon.metrics.Gauge}.
- *
- * @param name name of the gauge
- * @param gauge gauge to register
- * @param <T> return type of the gauge
- * @return the given gauge
- */
- public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
- return (G) addMetric(name, gauge);
- }
-
- /**
- * Registers a new {@link Histogram} with Paimon.
- *
- * @param name name of the histogram
- * @param histogram histogram to register
- * @param <H> histogram type
- * @return the registered histogram
- */
- public <H extends Histogram> H histogram(String name, H histogram) {
- return (H) addMetric(name, histogram);
- }
-
- /**
- * Adds the given metric to the group and registers it at the registry, if
the group is not yet
- * closed, and if no metric with the same name has been registered before.
- *
- * @param metricName the name to register the metric under
- * @param metric the metric to register
- */
- protected Metric addMetric(String metricName, Metric metric) {
- if (metric == null) {
- LOG.warn(
- "Ignoring attempted registration of a metric due to being
null for name {}.",
- metricName);
- return null;
- }
- // add the metric only if the group is still open
- if (!isClosed()) {
- switch (metric.getMetricType()) {
- case COUNTER:
- case GAUGE:
- case HISTOGRAM:
- // immediately put without a 'contains' check to optimize
the common case
- // (no collision), collisions are resolved later
- Metric prior = metrics.put(metricName, metric);
-
- // check for collisions with other metric names
- if (prior != null) {
- // we had a collision. put back the original value
- metrics.put(metricName, prior);
-
- // we warn here, rather than failing, because metrics
are tools that
- // should not fail the program when used incorrectly
- LOG.warn(
- "Name collision: Group already contains a
Metric with the name '"
- + metricName
- + "'. The new added Metric will not be
reported.");
- }
- break;
- default:
- LOG.warn(
- "Cannot add unknown metric type {}. This indicates
that the paimon "
- + "does not support this metric type.",
- metric.getClass().getName());
- }
- }
- return metrics.get(metricName);
- }
-
- @Override
- public Map<String, Metric> getMetrics() {
- return Collections.unmodifiableMap(metrics);
- }
-
- /**
- * Returns the name for this group, meaning what kind of entity it
represents, for example
- * "commit".
- *
- * @return logical name for this group
- */
- public abstract String getGroupName();
-
- public void close() {
- if (!closed) {
- closed = true;
- metrics.clear();
- Metrics.getInstance().removeGroup(this);
- }
- }
-
- public final boolean isClosed() {
- return closed;
- }
-
- @Override
- public String toString() {
- return "MetricGroup{"
- + "groupName="
- + getGroupName()
- + ", metrics="
- + String.join(",", metrics.keySet())
- + '}';
- }
-}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/metrics/MetricGroup.java
b/paimon-core/src/main/java/org/apache/paimon/metrics/MetricGroup.java
index c980f0be7..53f27d7c4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/metrics/MetricGroup.java
+++ b/paimon-core/src/main/java/org/apache/paimon/metrics/MetricGroup.java
@@ -38,16 +38,6 @@ public interface MetricGroup {
*/
Counter counter(String name);
- /**
- * Registers a {@link org.apache.paimon.metrics.Counter} with Paimon.
- *
- * @param name name of the counter
- * @param counter counter to register
- * @param <C> counter type
- * @return the given counter
- */
- <C extends Counter> C counter(String name, C counter);
-
/**
* Registers a new {@link org.apache.paimon.metrics.Gauge} with Paimon.
*
@@ -56,28 +46,19 @@ public interface MetricGroup {
* @param <T> return type of the gauge
* @return the given gauge
*/
- <T, G extends Gauge<T>> G gauge(String name, G gauge);
+ <T> Gauge<T> gauge(String name, Gauge<T> gauge);
/**
* Registers a new {@link Histogram} with Paimon.
*
* @param name name of the histogram
- * @param histogram histogram to register
- * @param <H> histogram type
+ * @param windowSize number of records this histogram keeps
* @return the registered histogram
*/
- <H extends Histogram> H histogram(String name, H histogram);
-
- /**
- * Returns the fully qualified metric name, for example {@code
"myTable.bucket-1.metricName"}.
- *
- * @param metricName metric name
- * @return fully qualified metric name
- */
- String getMetricIdentifier(String metricName, String delimiter);
+ Histogram histogram(String name, int windowSize);
/** Returns a map of all variables and their associated value. */
- Map<String, String> getAllTags();
+ Map<String, String> getAllVariables();
/**
* Returns the name for this group, meaning what kind of entity it
represents, for example
diff --git
a/paimon-core/src/main/java/org/apache/paimon/metrics/MetricGroupImpl.java
b/paimon-core/src/main/java/org/apache/paimon/metrics/MetricGroupImpl.java
new file mode 100644
index 000000000..17c0802c0
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/metrics/MetricGroupImpl.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.metrics;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Default implementation of {@link MetricGroup}. */
+public class MetricGroupImpl implements MetricGroup {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(MetricGroupImpl.class);
+
+ private final String groupName;
+ private final Map<String, String> variables;
+ private final Map<String, Metric> metrics;
+
+ public MetricGroupImpl(String groupName) {
+ this(groupName, new HashMap<>());
+ }
+
+ public MetricGroupImpl(String groupName, Map<String, String> variables) {
+ this.groupName = groupName;
+ this.variables = variables;
+ this.metrics = new HashMap<>();
+ }
+
+ @Override
+ public String getGroupName() {
+ return groupName;
+ }
+
+ @Override
+ public Map<String, String> getAllVariables() {
+ return Collections.unmodifiableMap(variables);
+ }
+
+ @Override
+ public Counter counter(String name) {
+ return (Counter) addMetric(name, new SimpleCounter());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T> Gauge<T> gauge(String name, Gauge<T> gauge) {
+ return (Gauge<T>) addMetric(name, gauge);
+ }
+
+ @Override
+ public Histogram histogram(String name, int windowSize) {
+ return (Histogram) addMetric(name, new
DescriptiveStatisticsHistogram(windowSize));
+ }
+
+ /**
+ * Adds the given metric to the group and registers it at the registry, if
the group is not yet
+ * closed, and if no metric with the same name has been registered before.
+ *
+ * @param metricName the name to register the metric under
+ * @param metric the metric to register
+ */
+ private Metric addMetric(String metricName, Metric metric) {
+ if (metric == null) {
+ LOG.warn(
+ "Ignoring attempted registration of a metric due to being
null for name {}.",
+ metricName);
+ return null;
+ }
+
+ switch (metric.getMetricType()) {
+ case COUNTER:
+ case GAUGE:
+ case HISTOGRAM:
+ // immediately put without a 'contains' check to optimize the
common case
+ // (no collision), collisions are resolved later
+ Metric prior = metrics.put(metricName, metric);
+
+ // check for collisions with other metric names
+ if (prior != null) {
+ // we had a collision. put back the original value
+ metrics.put(metricName, prior);
+
+ // we warn here, rather than failing, because metrics are
tools that
+ // should not fail the program when used incorrectly
+ LOG.warn(
+ "Name collision: Group already contains a Metric
with the name '"
+ + metricName
+ + "'. The new added Metric will not be
reported.");
+ }
+ break;
+ default:
+ LOG.warn(
+ "Cannot add unknown metric type {}. This indicates
that the paimon "
+ + "does not support this metric type.",
+ metric.getClass().getName());
+ }
+ return metrics.get(metricName);
+ }
+
+ @Override
+ public Map<String, Metric> getMetrics() {
+ return Collections.unmodifiableMap(metrics);
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/metrics/MetricRegistry.java
b/paimon-core/src/main/java/org/apache/paimon/metrics/MetricRegistry.java
new file mode 100644
index 000000000..60f0e2a07
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/metrics/MetricRegistry.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.metrics;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/** Factory to create {@link MetricGroup}s. */
+public abstract class MetricRegistry {
+
+ private static final String KEY_TABLE = "table";
+ private static final String KEY_PARTITION = "partition";
+ private static final String KEY_BUCKET = "bucket";
+
+ public MetricGroup tableMetricGroup(String groupName, String tableName) {
+ Map<String, String> variables = new LinkedHashMap<>();
+ variables.put(KEY_TABLE, tableName);
+
+ return createMetricGroup(groupName, variables);
+ }
+
+ public MetricGroup bucketMetricGroup(
+ String groupName, String tableName, String partition, int bucket) {
+ Map<String, String> variables = new LinkedHashMap<>();
+ variables.put(KEY_TABLE, tableName);
+ variables.put(KEY_PARTITION, partition);
+ variables.put(KEY_BUCKET, String.valueOf(bucket));
+
+ return createMetricGroup(groupName, variables);
+ }
+
+ protected abstract MetricGroup createMetricGroup(
+ String groupName, Map<String, String> variables);
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/metrics/groups/GenericMetricGroup.java
b/paimon-core/src/main/java/org/apache/paimon/metrics/MetricRegistryImpl.java
similarity index 51%
rename from
paimon-core/src/main/java/org/apache/paimon/metrics/groups/GenericMetricGroup.java
rename to
paimon-core/src/main/java/org/apache/paimon/metrics/MetricRegistryImpl.java
index f1d5c3157..b815364a8 100644
---
a/paimon-core/src/main/java/org/apache/paimon/metrics/groups/GenericMetricGroup.java
+++
b/paimon-core/src/main/java/org/apache/paimon/metrics/MetricRegistryImpl.java
@@ -16,32 +16,15 @@
* limitations under the License.
*/
-package org.apache.paimon.metrics.groups;
+package org.apache.paimon.metrics;
-import org.apache.paimon.metrics.AbstractMetricGroup;
-
-import java.util.HashMap;
import java.util.Map;
-/** A simple named {@link org.apache.paimon.metrics.MetricGroup} that is
untagged. */
-public class GenericMetricGroup extends AbstractMetricGroup {
-
- private final String groupName;
-
- GenericMetricGroup(final Map<String, String> tags, final String groupName)
{
- super(tags);
- this.groupName = groupName;
- }
-
- public static GenericMetricGroup createGenericMetricGroup(
- final String tableName, final String groupName) {
- Map<String, String> tags = new HashMap<>();
- tags.put("table", tableName);
- return new GenericMetricGroup(tags, groupName);
- }
+/** Default implementation of {@link MetricRegistry}. */
+public class MetricRegistryImpl extends MetricRegistry {
@Override
- public String getGroupName() {
- return groupName;
+ protected MetricGroup createMetricGroup(String groupName, Map<String,
String> variables) {
+ return new MetricGroupImpl(groupName, variables);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/metrics/Metrics.java
b/paimon-core/src/main/java/org/apache/paimon/metrics/Metrics.java
deleted file mode 100644
index 379f3d6c6..000000000
--- a/paimon-core/src/main/java/org/apache/paimon/metrics/Metrics.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.metrics;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.stream.Collectors;
-
-/** Core of Paimon metrics system. */
-public class Metrics {
- private static volatile Metrics instance = new Metrics();
-
- /**
- * The metrics groups. All the commit & compaction & scan metric groups
are collected in this
- * group container, there is no need to distinguish the groups by group
name for reporters.
- */
- private final ConcurrentLinkedQueue<MetricGroup> metricGroups = new
ConcurrentLinkedQueue<>();
-
- private Metrics() {}
-
- public static Metrics getInstance() {
- return instance;
- }
-
- /**
- * Add a metric group. Which is called by metrics instances, like commit /
compaction metrics
- * instances.
- */
- public void addGroup(AbstractMetricGroup group) {
- metricGroups.add(group);
- }
-
- /** Remove a metric group. Called when closing the corresponding
instances, like committer. */
- public void removeGroup(AbstractMetricGroup group) {
- metricGroups.remove(group);
- }
-
- /** Get metric groups. */
- public ConcurrentLinkedQueue<MetricGroup> getMetricGroups() {
- return metricGroups;
- }
-
- public static String groupsInfo() {
- return getInstance().getMetricGroups().stream()
- .map(Object::toString)
- .collect(Collectors.joining(", ", "[", "]"));
- }
-}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/metrics/groups/BucketMetricGroup.java
b/paimon-core/src/main/java/org/apache/paimon/metrics/groups/BucketMetricGroup.java
deleted file mode 100644
index e31d7f385..000000000
---
a/paimon-core/src/main/java/org/apache/paimon/metrics/groups/BucketMetricGroup.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.metrics.groups;
-
-import org.apache.paimon.metrics.AbstractMetricGroup;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/** Special {@link org.apache.paimon.metrics.MetricGroup} representing a
bucket. */
-public class BucketMetricGroup extends AbstractMetricGroup {
- private final String groupName;
-
- // ------------------------------------------------------------------------
-
- BucketMetricGroup(final Map<String, String> tags, final String groupName) {
- super(tags);
- this.groupName = groupName;
- }
-
- public static BucketMetricGroup createBucketMetricGroup(
- final String table, final int bucket, final String partition,
final String groupName) {
- Map<String, String> tags = new HashMap<>();
- tags.put("table", table);
- tags.put("bucket", String.valueOf(bucket));
- tags.put("partition", partition);
- return new BucketMetricGroup(tags, groupName);
- }
-
- @Override
- public String getGroupName() {
- return this.groupName;
- }
-}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
index 31d22c37e..a1025d9f6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
@@ -20,7 +20,7 @@ package org.apache.paimon.operation;
import org.apache.paimon.Snapshot;
import org.apache.paimon.manifest.ManifestCommittable;
-import org.apache.paimon.metrics.commit.CommitMetrics;
+import org.apache.paimon.operation.metrics.CommitMetrics;
import org.apache.paimon.table.sink.CommitMessage;
import java.util.List;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 77c22687c..b137798d3 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -34,8 +34,8 @@ import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
-import org.apache.paimon.metrics.commit.CommitMetrics;
-import org.apache.paimon.metrics.commit.CommitStats;
+import org.apache.paimon.operation.metrics.CommitMetrics;
+import org.apache.paimon.operation.metrics.CommitStats;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitMetrics.java
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitMetrics.java
similarity index 78%
rename from
paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitMetrics.java
rename to
paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitMetrics.java
index a216a94e2..8e408ac8e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitMetrics.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitMetrics.java
@@ -16,35 +16,32 @@
* limitations under the License.
*/
-package org.apache.paimon.metrics.commit;
+package org.apache.paimon.operation.metrics;
import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.metrics.AbstractMetricGroup;
-import org.apache.paimon.metrics.DescriptiveStatisticsHistogram;
import org.apache.paimon.metrics.Histogram;
-import org.apache.paimon.metrics.groups.GenericMetricGroup;
+import org.apache.paimon.metrics.MetricGroup;
+import org.apache.paimon.metrics.MetricRegistry;
/** Metrics to measure a commit. */
public class CommitMetrics {
+
private static final int HISTOGRAM_WINDOW_SIZE = 10_000;
- protected static final String GROUP_NAME = "commit";
+ private static final String GROUP_NAME = "commit";
- private final AbstractMetricGroup genericMetricGroup;
+ private final MetricGroup metricGroup;
- public CommitMetrics(String tableName) {
- this.genericMetricGroup =
- GenericMetricGroup.createGenericMetricGroup(tableName,
GROUP_NAME);
+ public CommitMetrics(MetricRegistry registry, String tableName) {
+ this.metricGroup = registry.tableMetricGroup(GROUP_NAME, tableName);
registerGenericCommitMetrics();
}
@VisibleForTesting
- public AbstractMetricGroup getMetricGroup() {
- return genericMetricGroup;
+ public MetricGroup getMetricGroup() {
+ return metricGroup;
}
- private final Histogram durationHistogram =
- new DescriptiveStatisticsHistogram(HISTOGRAM_WINDOW_SIZE);
-
+ private Histogram durationHistogram;
private CommitStats latestCommit;
@VisibleForTesting static final String LAST_COMMIT_DURATION =
"lastCommitDuration";
@@ -80,48 +77,48 @@ public class CommitMetrics {
@VisibleForTesting static final String LAST_BUCKETS_WRITTEN =
"lastBucketsWritten";
private void registerGenericCommitMetrics() {
- genericMetricGroup.gauge(
+ metricGroup.gauge(
LAST_COMMIT_DURATION, () -> latestCommit == null ? 0L :
latestCommit.getDuration());
- genericMetricGroup.gauge(
+ metricGroup.gauge(
LAST_COMMIT_ATTEMPTS, () -> latestCommit == null ? 0L :
latestCommit.getAttempts());
- genericMetricGroup.gauge(
+ metricGroup.gauge(
LAST_GENERATED_SNAPSHOTS,
() -> latestCommit == null ? 0L :
latestCommit.getGeneratedSnapshots());
- genericMetricGroup.gauge(
+ metricGroup.gauge(
LAST_PARTITIONS_WRITTEN,
() -> latestCommit == null ? 0L :
latestCommit.getNumPartitionsWritten());
- genericMetricGroup.gauge(
+ metricGroup.gauge(
LAST_BUCKETS_WRITTEN,
() -> latestCommit == null ? 0L :
latestCommit.getNumBucketsWritten());
- genericMetricGroup.histogram(COMMIT_DURATION, durationHistogram);
- genericMetricGroup.gauge(
+ durationHistogram = metricGroup.histogram(COMMIT_DURATION,
HISTOGRAM_WINDOW_SIZE);
+ metricGroup.gauge(
LAST_TABLE_FILES_ADDED,
() -> latestCommit == null ? 0L :
latestCommit.getTableFilesAdded());
- genericMetricGroup.gauge(
+ metricGroup.gauge(
LAST_TABLE_FILES_DELETED,
() -> latestCommit == null ? 0L :
latestCommit.getTableFilesDeleted());
- genericMetricGroup.gauge(
+ metricGroup.gauge(
LAST_TABLE_FILES_APPENDED,
() -> latestCommit == null ? 0L :
latestCommit.getTableFilesAppended());
- genericMetricGroup.gauge(
+ metricGroup.gauge(
LAST_TABLE_FILES_COMMIT_COMPACTED,
() -> latestCommit == null ? 0L :
latestCommit.getTableFilesCompacted());
- genericMetricGroup.gauge(
+ metricGroup.gauge(
LAST_CHANGELOG_FILES_APPENDED,
() -> latestCommit == null ? 0L :
latestCommit.getChangelogFilesAppended());
- genericMetricGroup.gauge(
+ metricGroup.gauge(
LAST_CHANGELOG_FILES_COMMIT_COMPACTED,
() -> latestCommit == null ? 0L :
latestCommit.getChangelogFilesCompacted());
- genericMetricGroup.gauge(
+ metricGroup.gauge(
LAST_DELTA_RECORDS_APPENDED,
() -> latestCommit == null ? 0L :
latestCommit.getDeltaRecordsAppended());
- genericMetricGroup.gauge(
+ metricGroup.gauge(
LAST_CHANGELOG_RECORDS_APPENDED,
() -> latestCommit == null ? 0L :
latestCommit.getChangelogRecordsAppended());
- genericMetricGroup.gauge(
+ metricGroup.gauge(
LAST_DELTA_RECORDS_COMMIT_COMPACTED,
() -> latestCommit == null ? 0L :
latestCommit.getDeltaRecordsCompacted());
- genericMetricGroup.gauge(
+ metricGroup.gauge(
LAST_CHANGELOG_RECORDS_COMMIT_COMPACTED,
() -> latestCommit == null ? 0L :
latestCommit.getChangelogRecordsCompacted());
}
@@ -130,8 +127,4 @@ public class CommitMetrics {
latestCommit = commitStats;
durationHistogram.update(commitStats.getDuration());
}
-
- public void close() {
- this.genericMetricGroup.close();
- }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitStats.java
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitStats.java
similarity index 99%
rename from
paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitStats.java
rename to
paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitStats.java
index 0a2772f63..a19d93508 100644
---
a/paimon-core/src/main/java/org/apache/paimon/metrics/commit/CommitStats.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/CommitStats.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.metrics.commit;
+package org.apache.paimon.operation.metrics;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java
index 8a8033aed..8355575d8 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java
@@ -18,6 +18,8 @@
package org.apache.paimon.table.sink;
+import org.apache.paimon.metrics.MetricRegistry;
+
import javax.annotation.Nullable;
import java.util.Map;
@@ -41,4 +43,6 @@ public interface InnerTableCommit extends StreamTableCommit,
BatchTableCommit {
* generated regardless of the configuration (No one trigger commit
interface).
*/
InnerTableCommit ignoreEmptyCommit(boolean ignoreEmptyCommit);
+
+ InnerTableCommit withMetricRegistry(MetricRegistry registry);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index 19e95bb0c..3d6dc98d7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -21,11 +21,12 @@ package org.apache.paimon.table.sink;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.manifest.ManifestCommittable;
-import org.apache.paimon.metrics.commit.CommitMetrics;
+import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.operation.FileStoreExpire;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.operation.PartitionExpire;
+import org.apache.paimon.operation.metrics.CommitMetrics;
import org.apache.paimon.tag.TagAutoCreation;
import org.apache.paimon.utils.ExecutorThreadFactory;
import org.apache.paimon.utils.IOUtils;
@@ -70,13 +71,13 @@ public class TableCommitImpl implements InnerTableCommit {
@Nullable private final Duration consumerExpireTime;
private final ConsumerManager consumerManager;
- @Nullable private Map<String, String> overwritePartition = null;
+ private final ExecutorService expireMainExecutor;
+ private final AtomicReference<Throwable> expireError;
- private boolean batchCommitted = false;
+ private final String tableName;
- private ExecutorService expireMainExecutor;
- private AtomicReference<Throwable> expireError;
- private final CommitMetrics commitMetrics;
+ @Nullable private Map<String, String> overwritePartition = null;
+ private boolean batchCommitted = false;
public TableCommitImpl(
FileStoreCommit commit,
@@ -89,8 +90,7 @@ public class TableCommitImpl implements InnerTableCommit {
ConsumerManager consumerManager,
ExpireExecutionMode expireExecutionMode,
String tableName) {
- this.commitMetrics = new CommitMetrics(tableName);
- commit.withLock(lock).withMetrics(commitMetrics);
+ commit.withLock(lock);
if (expire != null) {
expire.withLock(lock);
}
@@ -115,6 +115,8 @@ public class TableCommitImpl implements InnerTableCommit {
new ExecutorThreadFactory(
Thread.currentThread().getName() +
"expire-main-thread"));
this.expireError = new AtomicReference<>(null);
+
+ this.tableName = tableName;
}
@Override
@@ -129,6 +131,12 @@ public class TableCommitImpl implements InnerTableCommit {
return this;
}
+ @Override
+ public InnerTableCommit withMetricRegistry(MetricRegistry registry) {
+ commit.withMetrics(new CommitMetrics(registry, tableName));
+ return this;
+ }
+
@Override
public Set<Long> filterCommitted(Set<Long> commitIdentifiers) {
return commit.filterCommitted(commitIdentifiers);
@@ -265,7 +273,6 @@ public class TableCommitImpl implements InnerTableCommit {
}
IOUtils.closeQuietly(lock);
expireMainExecutor.shutdownNow();
- commitMetrics.close();
}
@Override
diff --git
a/paimon-core/src/test/java/org/apache/paimon/metrics/MetricGroupTest.java
b/paimon-core/src/test/java/org/apache/paimon/metrics/MetricRegistryImplTest.java
similarity index 61%
rename from
paimon-core/src/test/java/org/apache/paimon/metrics/MetricGroupTest.java
rename to
paimon-core/src/test/java/org/apache/paimon/metrics/MetricRegistryImplTest.java
index 7db7fc03c..27f9e70ec 100644
--- a/paimon-core/src/test/java/org/apache/paimon/metrics/MetricGroupTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/metrics/MetricRegistryImplTest.java
@@ -18,26 +18,26 @@
package org.apache.paimon.metrics;
-import org.apache.paimon.metrics.groups.GenericMetricGroup;
-
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import static org.assertj.core.api.Assertions.assertThat;
-/** Tests for the {@link MetricGroup}. */
-public class MetricGroupTest {
+/** Tests for the {@link MetricRegistryImpl}. */
+public class MetricRegistryImplTest {
+
@Test
public void testGroupRegisterMetrics() {
- GenericMetricGroup group =
GenericMetricGroup.createGenericMetricGroup("myTable", "commit");
- assertThat(group.isClosed()).isFalse();
+ MetricRegistryImpl registry = new MetricRegistryImpl();
+ MetricGroup group = registry.tableMetricGroup("commit", "myTable");
+
// these will fail is the registration is propagated
group.counter("testcounter");
group.gauge("testgauge", () -> null);
assertThat(group.getGroupName()).isEqualTo("commit");
- assertThat(group.getAllTags().size()).isEqualTo(1);
- assertThat(group.getAllTags())
+ assertThat(group.getAllVariables().size()).isEqualTo(1);
+ assertThat(group.getAllVariables())
.containsExactlyEntriesOf(
new HashMap<String, String>() {
{
@@ -45,28 +45,16 @@ public class MetricGroupTest {
}
});
assertThat(group.getMetrics().size()).isEqualTo(2);
- group.close();
- assertThat(group.isClosed()).isTrue();
}
@Test
public void testTolerateMetricNameCollisions() {
final String name = "abctestname";
- GenericMetricGroup group =
GenericMetricGroup.createGenericMetricGroup("myTable", "commit");
-
- Counter counter1 = group.counter(name);
+ MetricRegistryImpl registry = new MetricRegistryImpl();
+ MetricGroup group = registry.tableMetricGroup("commit", "myTable");
+ Counter counter = group.counter(name);
// return the old one with the metric name collision
- assertThat(group.counter(name)).isSameAs(counter1);
- group.close();
- }
-
- @Test
- public void testAddAndRemoveMetricGroups() {
- AbstractMetricGroup metricGroup =
- GenericMetricGroup.createGenericMetricGroup("myTable",
"commit");
-
assertThat(Metrics.getInstance().getMetricGroups()).contains(metricGroup);
- metricGroup.close();
-
assertThat(Metrics.getInstance().getMetricGroups()).doesNotContain(metricGroup);
+ assertThat(group.counter(name)).isSameAs(counter);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/metrics/groups/BucketMetricGroupTest.java
b/paimon-core/src/test/java/org/apache/paimon/metrics/groups/BucketMetricGroupTest.java
deleted file mode 100644
index ba32302d9..000000000
---
a/paimon-core/src/test/java/org/apache/paimon/metrics/groups/BucketMetricGroupTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.metrics.groups;
-
-import org.junit.jupiter.api.Test;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Tests for the {@link BucketMetricGroup}. */
-public class BucketMetricGroupTest {
- // ------------------------------------------------------------------------
- // scope name tests
- // ------------------------------------------------------------------------
-
- @Test
- public void testGenerateScopeDefault() {
- BucketMetricGroup group =
- BucketMetricGroup.createBucketMetricGroup("myTable", 1,
"dt=1", "commit");
-
- assertThat(group.getAllTags().size()).isEqualTo(3);
- assertThat(group.getAllTags().get("table")).isEqualTo("myTable");
- assertThat(group.getAllTags().get("bucket")).isEqualTo("1");
- assertThat(group.getAllTags().get("partition")).isEqualTo("dt=1");
- assertThat(group.getMetricIdentifier("myMetric",
".")).isEqualTo("commit.myMetric");
- assertThat(group.getGroupName()).isEqualTo("commit");
- group.close();
- }
-}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/metrics/commit/CommitMetricsTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CommitMetricsTest.java
similarity index 82%
rename from
paimon-core/src/test/java/org/apache/paimon/metrics/commit/CommitMetricsTest.java
rename to
paimon-core/src/test/java/org/apache/paimon/operation/metrics/CommitMetricsTest.java
index dead22c2e..476790a8c 100644
---
a/paimon-core/src/test/java/org/apache/paimon/metrics/commit/CommitMetricsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CommitMetricsTest.java
@@ -16,18 +16,15 @@
* limitations under the License.
*/
-package org.apache.paimon.metrics.commit;
+package org.apache.paimon.operation.metrics;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.metrics.Gauge;
import org.apache.paimon.metrics.Histogram;
import org.apache.paimon.metrics.Metric;
-import org.apache.paimon.metrics.MetricGroup;
-import org.apache.paimon.metrics.Metrics;
+import org.apache.paimon.metrics.MetricRegistryImpl;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
@@ -40,58 +37,14 @@ import static org.assertj.core.api.Assertions.offset;
/** Tests for {@link CommitMetrics}. */
public class CommitMetricsTest {
- private static final String TABLE_NAME = "myTable";
-
- private CommitMetrics commitMetrics;
-
- @BeforeEach
- public void beforeEach() {
- commitMetrics = getCommitMetrics();
- }
-
- @AfterEach
- public void afterEach() {
- commitMetrics.close();
- }
- /** Tests the registration of the commit metrics. */
- @Test
- public void testGenericMetricsRegistration() {
- MetricGroup genericMetricGroup = commitMetrics.getMetricGroup();
- assertThat(Metrics.getInstance().getMetricGroups().size())
- .withFailMessage(
- String.format(
- "Please close the created metric groups %s in
case of metrics resource leak.",
- Metrics.groupsInfo()))
- .isEqualTo(1);
-
assertThat(genericMetricGroup.getGroupName()).isEqualTo(CommitMetrics.GROUP_NAME);
- Map<String, Metric> registeredMetrics =
genericMetricGroup.getMetrics();
- assertThat(registeredMetrics.keySet())
- .containsExactlyInAnyOrder(
- CommitMetrics.LAST_COMMIT_DURATION,
- CommitMetrics.LAST_COMMIT_ATTEMPTS,
- CommitMetrics.LAST_GENERATED_SNAPSHOTS,
- CommitMetrics.LAST_PARTITIONS_WRITTEN,
- CommitMetrics.LAST_BUCKETS_WRITTEN,
- CommitMetrics.COMMIT_DURATION,
- CommitMetrics.LAST_TABLE_FILES_ADDED,
- CommitMetrics.LAST_TABLE_FILES_DELETED,
- CommitMetrics.LAST_TABLE_FILES_APPENDED,
- CommitMetrics.LAST_TABLE_FILES_COMMIT_COMPACTED,
- CommitMetrics.LAST_CHANGELOG_FILES_APPENDED,
- CommitMetrics.LAST_CHANGELOG_FILES_COMMIT_COMPACTED,
- CommitMetrics.LAST_DELTA_RECORDS_APPENDED,
- CommitMetrics.LAST_CHANGELOG_RECORDS_APPENDED,
- CommitMetrics.LAST_DELTA_RECORDS_COMMIT_COMPACTED,
- CommitMetrics.LAST_CHANGELOG_RECORDS_COMMIT_COMPACTED);
-
- reportOnce(commitMetrics);
-
assertThat(Metrics.getInstance().getMetricGroups().size()).isEqualTo(1);
- }
+ private static final String TABLE_NAME = "myTable";
/** Tests that the metrics are updated properly. */
+ @SuppressWarnings("unchecked")
@Test
public void testMetricsAreUpdated() {
+ CommitMetrics commitMetrics = getCommitMetrics();
Map<String, Metric> registeredGenericMetrics =
commitMetrics.getMetricGroup().getMetrics();
// Check initial values
@@ -281,6 +234,6 @@ public class CommitMetricsTest {
}
private CommitMetrics getCommitMetrics() {
- return new CommitMetrics(TABLE_NAME);
+ return new CommitMetrics(new MetricRegistryImpl(), TABLE_NAME);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/metrics/commit/CommitStatsTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CommitStatsTest.java
similarity index 98%
rename from
paimon-core/src/test/java/org/apache/paimon/metrics/commit/CommitStatsTest.java
rename to
paimon-core/src/test/java/org/apache/paimon/operation/metrics/CommitStatsTest.java
index 640dba08f..e4a2a7fd2 100644
---
a/paimon-core/src/test/java/org/apache/paimon/metrics/commit/CommitStatsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/CommitStatsTest.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.metrics.commit;
+package org.apache.paimon.operation.metrics;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
@@ -32,7 +32,7 @@ import static org.apache.paimon.io.DataFileTestUtils.row;
import static org.apache.paimon.manifest.ManifestFileMetaTestBase.makeEntry;
import static org.assertj.core.api.Assertions.assertThat;
-/** Tests for {@link CommitStats}. */
+/** Tests for {@link org.apache.paimon.operation.metrics.CommitStats}. */
public class CommitStatsTest {
private static List<ManifestEntry> files = new ArrayList<>();
private static List<ManifestEntry> appendDataFiles = new ArrayList<>();