This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit ffaf143b44ea713416b3a5ebf9cbc476812f0838
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Wed May 15 07:07:14 2024 -0700

    [HUDI-7624] Fixing index tagging duration (#11035)
    
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../apache/hudi/client/BaseHoodieWriteClient.java    |  3 ---
 .../java/org/apache/hudi/metrics/HoodieMetrics.java  | 20 ++++++++++++++++++++
 .../hudi/table/action/HoodieWriteMetadata.java       | 12 ++++++++++++
 .../hudi/table/action/commit/BaseWriteHelper.java    |  6 ------
 .../org/apache/hudi/metrics/TestHoodieMetrics.java   |  8 ++++++++
 .../org/apache/hudi/client/SparkRDDWriteClient.java  |  5 +++--
 .../action/commit/BaseSparkCommitActionExecutor.java |  5 +++++
 .../SparkUpsertDeltaCommitActionExecutor.java        |  2 +-
 8 files changed, 49 insertions(+), 12 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index f089a6b89d4..b9da3387654 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -517,9 +517,6 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
    * @return Write Status
    */
   public O postWrite(HoodieWriteMetadata<O> result, String instantTime, 
HoodieTable hoodieTable) {
-    if (result.getIndexLookupDuration().isPresent()) {
-      metrics.updateIndexMetrics(getOperationType().name(), 
result.getIndexUpdateDuration().get().toMillis());
-    }
     if (result.isCommitted()) {
       // Perform post commit operations.
       if (result.getFinalizeDuration().isPresent()) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
index 82dca3c43bb..5edfa7fd4d7 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
@@ -55,6 +55,9 @@ public class HoodieMetrics {
   public static final String TOTAL_RECORDS_DELETED = "totalRecordsDeleted";
   public static final String TOTAL_CORRUPTED_LOG_BLOCKS_STR = 
"totalCorruptedLogBlocks";
   public static final String TOTAL_ROLLBACK_LOG_BLOCKS_STR = 
"totalRollbackLogBlocks";
+  public static final String TIMER_ACTION = "timer";
+  public static final String DURATION_STR = "duration";
+  public static final String SOURCE_READ_AND_INDEX_ACTION = 
"source_read_and_index";
 
   private Metrics metrics;
   // Some timers
@@ -67,6 +70,7 @@ public class HoodieMetrics {
   public String finalizeTimerName = null;
   public String compactionTimerName = null;
   public String indexTimerName = null;
+  public String sourceReadAndIndexTimerName = null;
   private String conflictResolutionTimerName = null;
   private String conflictResolutionSuccessCounterName = null;
   private String conflictResolutionFailureCounterName = null;
@@ -83,6 +87,7 @@ public class HoodieMetrics {
   private Timer logCompactionTimer = null;
   private Timer clusteringTimer = null;
   private Timer indexTimer = null;
+  private Timer sourceReadAndIndexTimer = null;
   private Timer conflictResolutionTimer = null;
   private Counter conflictResolutionSuccessCounter = null;
   private Counter conflictResolutionFailureCounter = null;
@@ -103,6 +108,7 @@ public class HoodieMetrics {
       this.compactionTimerName = getMetricsName("timer", 
HoodieTimeline.COMPACTION_ACTION);
       this.logCompactionTimerName = getMetricsName("timer", 
HoodieTimeline.LOG_COMPACTION_ACTION);
       this.indexTimerName = getMetricsName("timer", "index");
+      this.sourceReadAndIndexTimerName = getMetricsName(TIMER_ACTION, 
SOURCE_READ_AND_INDEX_ACTION);
       this.conflictResolutionTimerName = getMetricsName("timer", 
"conflict_resolution");
       this.conflictResolutionSuccessCounterName = getMetricsName("counter", 
"conflict_resolution.success");
       this.conflictResolutionFailureCounterName = getMetricsName("counter", 
"conflict_resolution.failure");
@@ -182,6 +188,13 @@ public class HoodieMetrics {
     return indexTimer == null ? null : indexTimer.time();
   }
 
+  public Timer.Context getSourceReadAndIndexTimerCtx() {
+    if (config.isMetricsOn() && sourceReadAndIndexTimer == null) {
+      sourceReadAndIndexTimer = createTimer(sourceReadAndIndexTimerName);
+    }
+    return sourceReadAndIndexTimer == null ? null : 
sourceReadAndIndexTimer.time();
+  }
+
   public Timer.Context getConflictResolutionCtx() {
     if (config.isLockingMetricsEnabled() && conflictResolutionTimer == null) {
       conflictResolutionTimer = createTimer(conflictResolutionTimerName);
@@ -302,6 +315,13 @@ public class HoodieMetrics {
     }
   }
 
+  public void updateSourceReadAndIndexMetrics(final String action, final long 
durationInMs) {
+    if (config.isMetricsOn()) {
+      LOG.info(String.format("Sending %s metrics (%s.duration, %d)", 
SOURCE_READ_AND_INDEX_ACTION, action, durationInMs));
+      metrics.registerGauge(getMetricsName(SOURCE_READ_AND_INDEX_ACTION, 
String.format("%s.duration", action)), durationInMs);
+    }
+  }
+
   @VisibleForTesting
   public String getMetricsName(String action, String metric) {
     if (config == null) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/HoodieWriteMetadata.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/HoodieWriteMetadata.java
index d771a574e37..d67ca637603 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/HoodieWriteMetadata.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/HoodieWriteMetadata.java
@@ -34,6 +34,7 @@ public class HoodieWriteMetadata<O> {
 
   private O writeStatuses;
   private Option<Duration> indexLookupDuration = Option.empty();
+  private Option<Long> sourceReadAndIndexDurationMs = Option.empty();
 
   // Will be set when auto-commit happens
   private boolean isCommitted;
@@ -59,6 +60,9 @@ public class HoodieWriteMetadata<O> {
     if (indexLookupDuration.isPresent()) {
       newMetadataInstance.setIndexLookupDuration(indexLookupDuration.get());
     }
+    if (sourceReadAndIndexDurationMs.isPresent()) {
+      
newMetadataInstance.setSourceReadAndIndexDurationMs(sourceReadAndIndexDurationMs.get());
+    }
     newMetadataInstance.setCommitted(isCommitted);
     newMetadataInstance.setCommitMetadata(commitMetadata);
     if (writeStats.isPresent()) {
@@ -132,6 +136,14 @@ public class HoodieWriteMetadata<O> {
     this.indexLookupDuration = Option.ofNullable(indexLookupDuration);
   }
 
+  public Option<Long> getSourceReadAndIndexDurationMs() {
+    return sourceReadAndIndexDurationMs;
+  }
+
+  public void setSourceReadAndIndexDurationMs(Long 
sourceReadAndIndexDurationMs) {
+    this.sourceReadAndIndexDurationMs = 
Option.of(sourceReadAndIndexDurationMs);
+  }
+
   public Map<String, List<String>> getPartitionToReplaceFileIds() {
     return partitionToReplaceFileIds.orElse(Collections.emptyMap());
   }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
index b5edc7878f9..ff47b636098 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
@@ -29,9 +29,6 @@ import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
-import java.time.Duration;
-import java.time.Instant;
-
 public abstract class BaseWriteHelper<T, I, K, O, R> extends 
ParallelismHelper<I> {
 
   protected BaseWriteHelper(SerializableFunctionUnchecked<I, Integer> 
partitionNumberExtractor) {
@@ -51,17 +48,14 @@ public abstract class BaseWriteHelper<T, I, K, O, R> extends ParallelismHelper<I
       I dedupedRecords =
           combineOnCondition(shouldCombine, inputRecords, 
configuredShuffleParallelism, table);
 
-      Instant lookupBegin = Instant.now();
       I taggedRecords = dedupedRecords;
       if (table.getIndex().requiresTagging(operationType)) {
         // perform index loop up to get existing location of records
         context.setJobStatus(this.getClass().getSimpleName(), "Tagging: " + 
table.getConfig().getTableName());
         taggedRecords = tag(dedupedRecords, context, table);
       }
-      Duration indexLookupDuration = Duration.between(lookupBegin, 
Instant.now());
 
       HoodieWriteMetadata<O> result = executor.execute(taggedRecords);
-      result.setIndexLookupDuration(indexLookupDuration);
       return result;
     } catch (Throwable e) {
       if (e instanceof HoodieUpsertException) {
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
index 73b9646d577..39cd0dc444f 100755
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
@@ -87,6 +87,14 @@ public class TestHoodieMetrics {
     long msec = 
(Long)metrics.getRegistry().getGauges().get(metricName).getValue();
     assertTrue(msec > 0);
 
+    // Source read and index metrics
+    timer = hoodieMetrics.getSourceReadAndIndexTimerCtx();
+    Thread.sleep(5); // Ensure timer duration is > 0
+    hoodieMetrics.updateSourceReadAndIndexMetrics("some_action", 
hoodieMetrics.getDurationInMs(timer.stop()));
+    metricName = hoodieMetrics.getMetricsName("source_read_and_index", 
"some_action.duration");
+    msec = (Long)metrics.getRegistry().getGauges().get(metricName).getValue();
+    assertTrue(msec > 0);
+
     // test index type
     metricName = hoodieMetrics.getMetricsName("index", "type");
     for (HoodieIndex.IndexType indexType: HoodieIndex.IndexType.values()) {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index a438df4e047..bbdd34835ad 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -45,6 +45,7 @@ import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
 import org.apache.hudi.metrics.DistributedRegistry;
+import org.apache.hudi.metrics.HoodieMetrics;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
@@ -155,8 +156,8 @@ public class SparkRDDWriteClient<T> extends
     preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient());
     HoodieWriteMetadata<HoodieData<WriteStatus>> result = 
table.upsert(context, instantTime, HoodieJavaRDD.of(records));
     HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = 
result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
-    if (result.getIndexLookupDuration().isPresent()) {
-      metrics.updateIndexMetrics(LOOKUP_STR, 
result.getIndexLookupDuration().get().toMillis());
+    if (result.getSourceReadAndIndexDurationMs().isPresent()) {
+      metrics.updateSourceReadAndIndexMetrics(HoodieMetrics.DURATION_STR, 
result.getSourceReadAndIndexDurationMs().get());
     }
     return postWrite(resultRDD, instantTime, table);
   }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index 30e3cb533b1..129ace5f8d1 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -34,6 +34,7 @@ import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.collection.Pair;
@@ -162,9 +163,12 @@ public abstract class BaseSparkCommitActionExecutor<T> extends
     HoodieData<HoodieRecord<T>> inputRecordsWithClusteringUpdate = 
clusteringHandleUpdate(inputRecords);
 
     context.setJobStatus(this.getClass().getSimpleName(), "Building workload 
profile:" + config.getTableName());
+    HoodieTimer sourceReadAndIndexTimer = HoodieTimer.start(); // time taken 
from dedup -> tag location -> building workload profile
     WorkloadProfile workloadProfile =
             new 
WorkloadProfile(buildProfile(inputRecordsWithClusteringUpdate), operationType, 
table.getIndex().canIndexLogFiles());
     LOG.debug("Input workload profile :" + workloadProfile);
+    long sourceReadAndIndexDurationMs = sourceReadAndIndexTimer.endTimer();
+    LOG.info("Source read and index timer " + sourceReadAndIndexDurationMs);
 
     // partition using the insert partitioner
     final Partitioner partitioner = getPartitioner(workloadProfile);
@@ -174,6 +178,7 @@ public abstract class BaseSparkCommitActionExecutor<T> extends
     HoodieData<WriteStatus> writeStatuses = 
mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner);
     HoodieWriteMetadata<HoodieData<WriteStatus>> result = new 
HoodieWriteMetadata<>();
     updateIndexAndCommitIfNeeded(writeStatuses, result);
+    result.setSourceReadAndIndexDurationMs(sourceReadAndIndexDurationMs);
     return result;
   }
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitActionExecutor.java
index 270ac864012..2976234245b 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitActionExecutor.java
@@ -43,6 +43,6 @@ public class SparkUpsertDeltaCommitActionExecutor<T>
   @Override
   public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
     return HoodieWriteHelper.newInstance().write(instantTime, inputRecordsRDD, 
context, table,
-        config.shouldCombineBeforeUpsert(), 
config.getUpsertShuffleParallelism(),this, operationType);
+        config.shouldCombineBeforeUpsert(), 
config.getUpsertShuffleParallelism(), this, operationType);
   }
 }

Reply via email to