This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 35897ba2764 [HUDI-6313] Add metrics counters for compaction
requested/completed events. (#8759)
35897ba2764 is described below
commit 35897ba276480b398e35a8c518244e2b81af7d9c
Author: Amrish Lal <[email protected]>
AuthorDate: Tue Jun 20 15:39:13 2023 -0700
[HUDI-6313] Add metrics counters for compaction requested/completed events.
(#8759)
- Add metrics counters for compaction start/stop events so that we can keep
track of how many compactions were requested, how many finished, and how many
produced errors (inferred as number of starts - number of finishes).
---
.../org/apache/hudi/metrics/HoodieMetrics.java | 24 +++++++++-
.../compact/RunCompactionActionExecutor.java | 14 ++++++
.../table/action/compact/TestHoodieCompactor.java | 52 +++++++++++++++++++---
.../hudi/common/table/timeline/HoodieTimeline.java | 3 +-
4 files changed, 84 insertions(+), 9 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
index 4c849edcb6c..dac680a5c40 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
@@ -21,6 +21,7 @@ package org.apache.hudi.metrics;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -50,6 +51,8 @@ public class HoodieMetrics {
private String conflictResolutionTimerName = null;
private String conflictResolutionSuccessCounterName = null;
private String conflictResolutionFailureCounterName = null;
+ private String compactionRequestedCounterName = null;
+ private String compactionCompletedCounterName = null;
private HoodieWriteConfig config;
private String tableName;
private Timer rollbackTimer = null;
@@ -64,6 +67,8 @@ public class HoodieMetrics {
private Timer conflictResolutionTimer = null;
private Counter conflictResolutionSuccessCounter = null;
private Counter conflictResolutionFailureCounter = null;
+ private Counter compactionRequestedCounter = null;
+ private Counter compactionCompletedCounter = null;
public HoodieMetrics(HoodieWriteConfig config) {
this.config = config;
@@ -82,6 +87,8 @@ public class HoodieMetrics {
this.conflictResolutionTimerName = getMetricsName("timer",
"conflict_resolution");
this.conflictResolutionSuccessCounterName = getMetricsName("counter",
"conflict_resolution.success");
this.conflictResolutionFailureCounterName = getMetricsName("counter",
"conflict_resolution.failure");
+ this.compactionRequestedCounterName = getMetricsName("counter",
"compaction.requested");
+ this.compactionCompletedCounterName = getMetricsName("counter",
"compaction.completed");
}
}
@@ -270,7 +277,8 @@ public class HoodieMetrics {
}
}
- String getMetricsName(String action, String metric) {
+ @VisibleForTesting
+ public String getMetricsName(String action, String metric) {
return config == null ? null : String.format("%s.%s.%s",
config.getMetricReporterMetricsNamePrefix(), action, metric);
}
@@ -308,6 +316,20 @@ public class HoodieMetrics {
}
}
+ public void emitCompactionRequested() {
+ if (config.isMetricsOn()) {
+ compactionRequestedCounter = getCounter(compactionRequestedCounter,
compactionRequestedCounterName);
+ compactionRequestedCounter.inc();
+ }
+ }
+
+ public void emitCompactionCompleted() {
+ if (config.isMetricsOn()) {
+ compactionCompletedCounter = getCounter(compactionCompletedCounter,
compactionCompletedCounterName);
+ compactionCompletedCounter.inc();
+ }
+ }
+
private Counter getCounter(Counter counter, String name) {
if (counter == null) {
return metrics.getRegistry().counter(name);
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
index 5bd1894f26d..055cdb5910b 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
@@ -35,11 +35,15 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCompactionException;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
+import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.table.HoodieCompactionHandler;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.BaseActionExecutor;
import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.List;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
@@ -48,10 +52,14 @@ import static
org.apache.hudi.common.util.ValidationUtils.checkArgument;
public class RunCompactionActionExecutor<T> extends
BaseActionExecutor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>,
HoodieData<WriteStatus>, HoodieWriteMetadata<HoodieData<WriteStatus>>> {
+ private static final Logger LOG =
LoggerFactory.getLogger(RunCompactionActionExecutor.class);
+
private final HoodieCompactor compactor;
private final HoodieCompactionHandler compactionHandler;
private WriteOperationType operationType;
+ private final HoodieMetrics metrics;
+
public RunCompactionActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable table,
@@ -65,10 +73,14 @@ public class RunCompactionActionExecutor<T> extends
this.operationType = operationType;
checkArgument(operationType == WriteOperationType.COMPACT || operationType
== WriteOperationType.LOG_COMPACT,
"Only COMPACT and LOG_COMPACT is supported");
+ metrics = new HoodieMetrics(config);
}
@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
+ LOG.info("Compaction requested. Instant time: {}.", instantTime);
+ metrics.emitCompactionRequested();
+
HoodieTimeline pendingMajorOrMinorCompactionTimeline =
WriteOperationType.COMPACT.equals(operationType)
? table.getActiveTimeline().filterPendingCompactionTimeline()
: table.getActiveTimeline().filterPendingLogCompactionTimeline();
@@ -117,6 +129,8 @@ public class RunCompactionActionExecutor<T> extends
throw new HoodieCompactionException("Could not compact " +
config.getBasePath(), e);
}
+ LOG.info("Compaction completed. Instant time: {}.", instantTime);
+ metrics.emitCompactionCompleted();
return compactionMetadata;
}
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
index c0e62631664..2fe4e146ae5 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
@@ -40,14 +40,17 @@ import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieMemoryConfig;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.bloom.HoodieBloomIndex;
import org.apache.hudi.index.bloom.SparkHoodieBloomIndexHelper;
+import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieClientTestHarness;
+import com.codahale.metrics.Counter;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
@@ -56,6 +59,7 @@ import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.List;
+import java.util.SortedMap;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -89,9 +93,22 @@ public class TestHoodieCompactor extends
HoodieClientTestHarness {
private HoodieWriteConfig getConfig() {
return getConfigBuilder()
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build())
+ .withMetricsConfig(getMetricsConfig())
.build();
}
+ private static HoodieMetricsConfig getMetricsConfig() {
+ return
HoodieMetricsConfig.newBuilder().on(true).withReporterType("INMEMORY").build();
+ }
+
+ private long getCompactionMetricCount(String metric) {
+ HoodieMetrics metrics = writeClient.getMetrics();
+ String metricName = metrics.getMetricsName("counter", metric);
+ SortedMap<String, Counter> counters =
metrics.getMetrics().getRegistry().getCounters();
+
+ return counters.containsKey(metricName) ?
counters.get(metricName).getCount() : 0;
+ }
+
private HoodieWriteConfig.Builder getConfigBuilder() {
return
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2)
@@ -106,12 +123,18 @@ public class TestHoodieCompactor extends
HoodieClientTestHarness {
@Test
public void testCompactionOnCopyOnWriteFail() throws Exception {
metaClient = HoodieTestUtils.init(hadoopConf, basePath,
HoodieTableType.COPY_ON_WRITE);
- HoodieTable table = HoodieSparkTable.create(getConfig(), context,
metaClient);
- String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime();
- assertThrows(HoodieNotSupportedException.class, () -> {
- table.scheduleCompaction(context, compactionInstantTime, Option.empty());
- table.compact(context, compactionInstantTime);
- });
+ try (SparkRDDWriteClient writeClient = getHoodieWriteClient(getConfig());)
{
+ HoodieTable table = HoodieSparkTable.create(getConfig(), context,
metaClient);
+ String compactionInstantTime =
HoodieActiveTimeline.createNewInstantTime();
+ assertThrows(HoodieNotSupportedException.class, () -> {
+ table.scheduleCompaction(context, compactionInstantTime,
Option.empty());
+ table.compact(context, compactionInstantTime);
+ });
+
+ // Verify compaction.requested, compaction.completed metrics counts.
+ assertEquals(0,
getCompactionMetricCount(HoodieTimeline.REQUESTED_COMPACTION_SUFFIX));
+ assertEquals(0,
getCompactionMetricCount(HoodieTimeline.COMPLETED_COMPACTION_SUFFIX));
+ }
}
@Test
@@ -129,6 +152,10 @@ public class TestHoodieCompactor extends
HoodieClientTestHarness {
String compactionInstantTime =
HoodieActiveTimeline.createNewInstantTime();
Option<HoodieCompactionPlan> plan = table.scheduleCompaction(context,
compactionInstantTime, Option.empty());
assertFalse(plan.isPresent(), "If there is nothing to compact, result
will be empty");
+
+ // Verify compaction.requested, compaction.completed metrics counts.
+ assertEquals(0,
getCompactionMetricCount(HoodieTimeline.REQUESTED_COMPACTION_SUFFIX));
+ assertEquals(0,
getCompactionMetricCount(HoodieTimeline.COMPLETED_COMPACTION_SUFFIX));
}
}
@@ -148,7 +175,7 @@ public class TestHoodieCompactor extends
HoodieClientTestHarness {
newCommitTime = "102";
writeClient.startCommitWithTime(newCommitTime);
metaClient.getActiveTimeline().transitionRequestedToInflight(new
HoodieInstant(State.REQUESTED,
- HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime),
Option.empty());
+ HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime), Option.empty());
// create one compaction instance before exist inflight instance.
String compactionTime = "101";
@@ -161,6 +188,7 @@ public class TestHoodieCompactor extends
HoodieClientTestHarness {
// insert 100 records
HoodieWriteConfig config = getConfigBuilder()
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build())
+ .withMetricsConfig(getMetricsConfig())
.build();
try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) {
String newCommitTime = "100";
@@ -180,6 +208,10 @@ public class TestHoodieCompactor extends
HoodieClientTestHarness {
HoodieData<WriteStatus> result = compact(writeClient,
compactionInstantTime);
verifyCompaction(result);
+
+ // Verify compaction.requested, compaction.completed metrics counts.
+ assertEquals(1,
getCompactionMetricCount(HoodieTimeline.REQUESTED_COMPACTION_SUFFIX));
+ assertEquals(1,
getCompactionMetricCount(HoodieTimeline.COMPLETED_COMPACTION_SUFFIX));
}
}
@@ -190,7 +222,9 @@ public class TestHoodieCompactor extends
HoodieClientTestHarness {
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withMemoryConfig(HoodieMemoryConfig.newBuilder()
.withMaxMemoryMaxSize(1L, 1L).build()) // force spill
+ .withMetricsConfig(getMetricsConfig())
.build();
+
try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) {
String newCommitTime = "100";
writeClient.startCommitWithTime(newCommitTime);
@@ -210,6 +244,10 @@ public class TestHoodieCompactor extends
HoodieClientTestHarness {
HoodieData<WriteStatus> result = compact(writeClient, "10" + (i + 1));
verifyCompaction(result);
+
+ // Verify compaction.requested, compaction.completed metrics counts.
+ assertEquals(i / 2 + 1,
getCompactionMetricCount(HoodieTimeline.REQUESTED_COMPACTION_SUFFIX));
+ assertEquals(i / 2 + 1,
getCompactionMetricCount(HoodieTimeline.COMPLETED_COMPACTION_SUFFIX));
}
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
index 2cfcffb623d..72a6e910b76 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
@@ -56,11 +56,11 @@ public interface HoodieTimeline extends Serializable {
String COMPACTION_ACTION = "compaction";
String LOG_COMPACTION_ACTION = "logcompaction";
String REQUESTED_EXTENSION = ".requested";
+ String COMPLETED_EXTENSION = ".completed";
String RESTORE_ACTION = "restore";
String INDEXING_ACTION = "indexing";
// only for schema save
String SCHEMA_COMMIT_ACTION = "schemacommit";
-
String[] VALID_ACTIONS_IN_TIMELINE = {COMMIT_ACTION, DELTA_COMMIT_ACTION,
CLEAN_ACTION, SAVEPOINT_ACTION, RESTORE_ACTION, ROLLBACK_ACTION,
COMPACTION_ACTION, REPLACE_COMMIT_ACTION, INDEXING_ACTION};
@@ -81,6 +81,7 @@ public interface HoodieTimeline extends Serializable {
String REQUESTED_ROLLBACK_EXTENSION = "." + ROLLBACK_ACTION +
REQUESTED_EXTENSION;
String INFLIGHT_SAVEPOINT_EXTENSION = "." + SAVEPOINT_ACTION +
INFLIGHT_EXTENSION;
String REQUESTED_COMPACTION_SUFFIX = StringUtils.join(COMPACTION_ACTION,
REQUESTED_EXTENSION);
+ String COMPLETED_COMPACTION_SUFFIX = StringUtils.join(COMPACTION_ACTION,
COMPLETED_EXTENSION);
String REQUESTED_COMPACTION_EXTENSION = StringUtils.join(".",
REQUESTED_COMPACTION_SUFFIX);
String INFLIGHT_COMPACTION_EXTENSION = StringUtils.join(".",
COMPACTION_ACTION, INFLIGHT_EXTENSION);
String REQUESTED_RESTORE_EXTENSION = "." + RESTORE_ACTION +
REQUESTED_EXTENSION;