This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 35897ba2764 [HUDI-6313] Add metrics counters for compaction 
requested/completed events. (#8759)
35897ba2764 is described below

commit 35897ba276480b398e35a8c518244e2b81af7d9c
Author: Amrish Lal <[email protected]>
AuthorDate: Tue Jun 20 15:39:13 2023 -0700

    [HUDI-6313] Add metrics counters for compaction requested/completed events. 
(#8759)
    
    - Add metrics counters for compaction start/stop events so that we can keep 
track of how many compactions were requested, how many finished, and how many 
produced error (interfered as number of starts - number of finished).
---
 .../org/apache/hudi/metrics/HoodieMetrics.java     | 24 +++++++++-
 .../compact/RunCompactionActionExecutor.java       | 14 ++++++
 .../table/action/compact/TestHoodieCompactor.java  | 52 +++++++++++++++++++---
 .../hudi/common/table/timeline/HoodieTimeline.java |  3 +-
 4 files changed, 84 insertions(+), 9 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
index 4c849edcb6c..dac680a5c40 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
@@ -21,6 +21,7 @@ package org.apache.hudi.metrics;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 
@@ -50,6 +51,8 @@ public class HoodieMetrics {
   private String conflictResolutionTimerName = null;
   private String conflictResolutionSuccessCounterName = null;
   private String conflictResolutionFailureCounterName = null;
+  private String compactionRequestedCounterName = null;
+  private String compactionCompletedCounterName = null;
   private HoodieWriteConfig config;
   private String tableName;
   private Timer rollbackTimer = null;
@@ -64,6 +67,8 @@ public class HoodieMetrics {
   private Timer conflictResolutionTimer = null;
   private Counter conflictResolutionSuccessCounter = null;
   private Counter conflictResolutionFailureCounter = null;
+  private Counter compactionRequestedCounter = null;
+  private Counter compactionCompletedCounter = null;
 
   public HoodieMetrics(HoodieWriteConfig config) {
     this.config = config;
@@ -82,6 +87,8 @@ public class HoodieMetrics {
       this.conflictResolutionTimerName = getMetricsName("timer", 
"conflict_resolution");
       this.conflictResolutionSuccessCounterName = getMetricsName("counter", 
"conflict_resolution.success");
       this.conflictResolutionFailureCounterName = getMetricsName("counter", 
"conflict_resolution.failure");
+      this.compactionRequestedCounterName = getMetricsName("counter", 
"compaction.requested");
+      this.compactionCompletedCounterName = getMetricsName("counter", 
"compaction.completed");
     }
   }
 
@@ -270,7 +277,8 @@ public class HoodieMetrics {
     }
   }
 
-  String getMetricsName(String action, String metric) {
+  @VisibleForTesting
+  public String getMetricsName(String action, String metric) {
     return config == null ? null : String.format("%s.%s.%s", 
config.getMetricReporterMetricsNamePrefix(), action, metric);
   }
 
@@ -308,6 +316,20 @@ public class HoodieMetrics {
     }
   }
 
+  public void emitCompactionRequested() {
+    if (config.isMetricsOn()) {
+      compactionRequestedCounter = getCounter(compactionRequestedCounter, 
compactionRequestedCounterName);
+      compactionRequestedCounter.inc();
+    }
+  }
+
+  public void emitCompactionCompleted() {
+    if (config.isMetricsOn()) {
+      compactionCompletedCounter = getCounter(compactionCompletedCounter, 
compactionCompletedCounterName);
+      compactionCompletedCounter.inc();
+    }
+  }
+
   private Counter getCounter(Counter counter, String name) {
     if (counter == null) {
       return metrics.getRegistry().counter(name);
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
index 5bd1894f26d..055cdb5910b 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
@@ -35,11 +35,15 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCompactionException;
 import org.apache.hudi.internal.schema.utils.SerDeHelper;
+import org.apache.hudi.metrics.HoodieMetrics;
 import org.apache.hudi.table.HoodieCompactionHandler;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.BaseActionExecutor;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.List;
 
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
@@ -48,10 +52,14 @@ import static 
org.apache.hudi.common.util.ValidationUtils.checkArgument;
 public class RunCompactionActionExecutor<T> extends
     BaseActionExecutor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, 
HoodieData<WriteStatus>, HoodieWriteMetadata<HoodieData<WriteStatus>>> {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(RunCompactionActionExecutor.class);
+
   private final HoodieCompactor compactor;
   private final HoodieCompactionHandler compactionHandler;
   private WriteOperationType operationType;
 
+  private final HoodieMetrics metrics;
+
   public RunCompactionActionExecutor(HoodieEngineContext context,
                                      HoodieWriteConfig config,
                                      HoodieTable table,
@@ -65,10 +73,14 @@ public class RunCompactionActionExecutor<T> extends
     this.operationType = operationType;
     checkArgument(operationType == WriteOperationType.COMPACT || operationType 
== WriteOperationType.LOG_COMPACT,
         "Only COMPACT and LOG_COMPACT is supported");
+    metrics = new HoodieMetrics(config);
   }
 
   @Override
   public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
+    LOG.info("Compaction requested. Instant time: {}.", instantTime);
+    metrics.emitCompactionRequested();
+
     HoodieTimeline pendingMajorOrMinorCompactionTimeline = 
WriteOperationType.COMPACT.equals(operationType)
         ? table.getActiveTimeline().filterPendingCompactionTimeline()
         : table.getActiveTimeline().filterPendingLogCompactionTimeline();
@@ -117,6 +129,8 @@ public class RunCompactionActionExecutor<T> extends
       throw new HoodieCompactionException("Could not compact " + 
config.getBasePath(), e);
     }
 
+    LOG.info("Compaction completed. Instant time: {}.", instantTime);
+    metrics.emitCompactionCompleted();
     return compactionMetadata;
   }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
index c0e62631664..2fe4e146ae5 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
@@ -40,14 +40,17 @@ import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieMemoryConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.config.metrics.HoodieMetricsConfig;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.bloom.HoodieBloomIndex;
 import org.apache.hudi.index.bloom.SparkHoodieBloomIndexHelper;
+import org.apache.hudi.metrics.HoodieMetrics;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
 
+import com.codahale.metrics.Counter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.AfterEach;
@@ -56,6 +59,7 @@ import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.SortedMap;
 import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -89,9 +93,22 @@ public class TestHoodieCompactor extends 
HoodieClientTestHarness {
   private HoodieWriteConfig getConfig() {
     return getConfigBuilder()
         
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build())
+        .withMetricsConfig(getMetricsConfig())
         .build();
   }
 
+  private static HoodieMetricsConfig getMetricsConfig() {
+    return 
HoodieMetricsConfig.newBuilder().on(true).withReporterType("INMEMORY").build();
+  }
+
+  private long getCompactionMetricCount(String metric) {
+    HoodieMetrics metrics = writeClient.getMetrics();
+    String metricName = metrics.getMetricsName("counter", metric);
+    SortedMap<String, Counter> counters = 
metrics.getMetrics().getRegistry().getCounters();
+
+    return counters.containsKey(metricName) ? 
counters.get(metricName).getCount() : 0;
+  }
+
   private HoodieWriteConfig.Builder getConfigBuilder() {
     return 
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
         .withParallelism(2, 2)
@@ -106,12 +123,18 @@ public class TestHoodieCompactor extends 
HoodieClientTestHarness {
   @Test
   public void testCompactionOnCopyOnWriteFail() throws Exception {
     metaClient = HoodieTestUtils.init(hadoopConf, basePath, 
HoodieTableType.COPY_ON_WRITE);
-    HoodieTable table = HoodieSparkTable.create(getConfig(), context, 
metaClient);
-    String compactionInstantTime = HoodieActiveTimeline.createNewInstantTime();
-    assertThrows(HoodieNotSupportedException.class, () -> {
-      table.scheduleCompaction(context, compactionInstantTime, Option.empty());
-      table.compact(context, compactionInstantTime);
-    });
+    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(getConfig());) 
{
+      HoodieTable table = HoodieSparkTable.create(getConfig(), context, 
metaClient);
+      String compactionInstantTime = 
HoodieActiveTimeline.createNewInstantTime();
+      assertThrows(HoodieNotSupportedException.class, () -> {
+        table.scheduleCompaction(context, compactionInstantTime, 
Option.empty());
+        table.compact(context, compactionInstantTime);
+      });
+
+      // Verify compaction.requested, compaction.completed metrics counts.
+      assertEquals(0, 
getCompactionMetricCount(HoodieTimeline.REQUESTED_COMPACTION_SUFFIX));
+      assertEquals(0, 
getCompactionMetricCount(HoodieTimeline.COMPLETED_COMPACTION_SUFFIX));
+    }
   }
 
   @Test
@@ -129,6 +152,10 @@ public class TestHoodieCompactor extends 
HoodieClientTestHarness {
       String compactionInstantTime = 
HoodieActiveTimeline.createNewInstantTime();
       Option<HoodieCompactionPlan> plan = table.scheduleCompaction(context, 
compactionInstantTime, Option.empty());
       assertFalse(plan.isPresent(), "If there is nothing to compact, result 
will be empty");
+
+      // Verify compaction.requested, compaction.completed metrics counts.
+      assertEquals(0, 
getCompactionMetricCount(HoodieTimeline.REQUESTED_COMPACTION_SUFFIX));
+      assertEquals(0, 
getCompactionMetricCount(HoodieTimeline.COMPLETED_COMPACTION_SUFFIX));
     }
   }
 
@@ -148,7 +175,7 @@ public class TestHoodieCompactor extends 
HoodieClientTestHarness {
       newCommitTime = "102";
       writeClient.startCommitWithTime(newCommitTime);
       metaClient.getActiveTimeline().transitionRequestedToInflight(new 
HoodieInstant(State.REQUESTED,
-              HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime), 
Option.empty());
+          HoodieTimeline.DELTA_COMMIT_ACTION, newCommitTime), Option.empty());
 
       // create one compaction instance before exist inflight instance.
       String compactionTime = "101";
@@ -161,6 +188,7 @@ public class TestHoodieCompactor extends 
HoodieClientTestHarness {
     // insert 100 records
     HoodieWriteConfig config = getConfigBuilder()
         
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build())
+        .withMetricsConfig(getMetricsConfig())
         .build();
     try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) {
       String newCommitTime = "100";
@@ -180,6 +208,10 @@ public class TestHoodieCompactor extends 
HoodieClientTestHarness {
       HoodieData<WriteStatus> result = compact(writeClient, 
compactionInstantTime);
 
       verifyCompaction(result);
+
+      // Verify compaction.requested, compaction.completed metrics counts.
+      assertEquals(1, 
getCompactionMetricCount(HoodieTimeline.REQUESTED_COMPACTION_SUFFIX));
+      assertEquals(1, 
getCompactionMetricCount(HoodieTimeline.COMPLETED_COMPACTION_SUFFIX));
     }
   }
 
@@ -190,7 +222,9 @@ public class TestHoodieCompactor extends 
HoodieClientTestHarness {
         
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build())
         .withMemoryConfig(HoodieMemoryConfig.newBuilder()
             .withMaxMemoryMaxSize(1L, 1L).build()) // force spill
+        .withMetricsConfig(getMetricsConfig())
         .build();
+
     try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config)) {
       String newCommitTime = "100";
       writeClient.startCommitWithTime(newCommitTime);
@@ -210,6 +244,10 @@ public class TestHoodieCompactor extends 
HoodieClientTestHarness {
         HoodieData<WriteStatus> result = compact(writeClient, "10" + (i + 1));
 
         verifyCompaction(result);
+
+        // Verify compaction.requested, compaction.completed metrics counts.
+        assertEquals(i / 2 + 1, 
getCompactionMetricCount(HoodieTimeline.REQUESTED_COMPACTION_SUFFIX));
+        assertEquals(i / 2 + 1, 
getCompactionMetricCount(HoodieTimeline.COMPLETED_COMPACTION_SUFFIX));
       }
     }
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
index 2cfcffb623d..72a6e910b76 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
@@ -56,11 +56,11 @@ public interface HoodieTimeline extends Serializable {
   String COMPACTION_ACTION = "compaction";
   String LOG_COMPACTION_ACTION = "logcompaction";
   String REQUESTED_EXTENSION = ".requested";
+  String COMPLETED_EXTENSION = ".completed";
   String RESTORE_ACTION = "restore";
   String INDEXING_ACTION = "indexing";
   // only for schema save
   String SCHEMA_COMMIT_ACTION = "schemacommit";
-
   String[] VALID_ACTIONS_IN_TIMELINE = {COMMIT_ACTION, DELTA_COMMIT_ACTION,
       CLEAN_ACTION, SAVEPOINT_ACTION, RESTORE_ACTION, ROLLBACK_ACTION,
       COMPACTION_ACTION, REPLACE_COMMIT_ACTION, INDEXING_ACTION};
@@ -81,6 +81,7 @@ public interface HoodieTimeline extends Serializable {
   String REQUESTED_ROLLBACK_EXTENSION = "." + ROLLBACK_ACTION + 
REQUESTED_EXTENSION;
   String INFLIGHT_SAVEPOINT_EXTENSION = "." + SAVEPOINT_ACTION + 
INFLIGHT_EXTENSION;
   String REQUESTED_COMPACTION_SUFFIX = StringUtils.join(COMPACTION_ACTION, 
REQUESTED_EXTENSION);
+  String COMPLETED_COMPACTION_SUFFIX = StringUtils.join(COMPACTION_ACTION, 
COMPLETED_EXTENSION);
   String REQUESTED_COMPACTION_EXTENSION = StringUtils.join(".", 
REQUESTED_COMPACTION_SUFFIX);
   String INFLIGHT_COMPACTION_EXTENSION = StringUtils.join(".", 
COMPACTION_ACTION, INFLIGHT_EXTENSION);
   String REQUESTED_RESTORE_EXTENSION = "." + RESTORE_ACTION + 
REQUESTED_EXTENSION;

Reply via email to