This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d103ad87d8 [HUDI-7624] Fixing index tagging duration (#11035)
8d103ad87d8 is described below

commit 8d103ad87d8517cfdfcb0b29a67df1591f906de5
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Tue May 14 10:03:59 2024 -0700

    [HUDI-7624] Fixing index tagging duration (#11035)
    
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../org/apache/hudi/client/BaseHoodieWriteClient.java  |  3 ---
 .../java/org/apache/hudi/metrics/HoodieMetrics.java    | 18 ++++++++++++++++++
 .../apache/hudi/table/action/HoodieWriteMetadata.java  | 12 ++++++++++++
 .../hudi/table/action/commit/BaseWriteHelper.java      |  6 ------
 .../org/apache/hudi/metrics/TestHoodieMetrics.java     |  8 ++++++++
 .../org/apache/hudi/client/SparkRDDWriteClient.java    |  5 +++--
 .../action/commit/BaseSparkCommitActionExecutor.java   |  5 +++++
 .../SparkUpsertDeltaCommitActionExecutor.java          |  2 +-
 8 files changed, 47 insertions(+), 12 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 003836d0cac..544bce20f3d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -511,9 +511,6 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
    * @return Write Status
    */
   public O postWrite(HoodieWriteMetadata<O> result, String instantTime, 
HoodieTable hoodieTable) {
-    if (result.getIndexLookupDuration().isPresent()) {
-      metrics.updateIndexMetrics(getOperationType().name(), 
result.getIndexUpdateDuration().get().toMillis());
-    }
     if (result.isCommitted()) {
       // Perform post commit operations.
       if (result.getFinalizeDuration().isPresent()) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
index d8c60d5f660..d5020014036 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
@@ -71,6 +71,7 @@ public class HoodieMetrics {
   public static final String ARCHIVE_ACTION = "archive";
   public static final String FINALIZE_ACTION = "finalize";
   public static final String INDEX_ACTION = "index";
+  public static final String SOURCE_READ_AND_INDEX_ACTION = 
"source_read_and_index";
 
   private Metrics metrics;
   // Some timers
@@ -84,6 +85,7 @@ public class HoodieMetrics {
   public String finalizeTimerName = null;
   public String compactionTimerName = null;
   public String indexTimerName = null;
+  public String sourceReadAndIndexTimerName = null;
   private String conflictResolutionTimerName = null;
   private String conflictResolutionSuccessCounterName = null;
   private String conflictResolutionFailureCounterName = null;
@@ -101,6 +103,7 @@ public class HoodieMetrics {
   private Timer logCompactionTimer = null;
   private Timer clusteringTimer = null;
   private Timer indexTimer = null;
+  private Timer sourceReadAndIndexTimer = null;
   private Timer conflictResolutionTimer = null;
   private Counter conflictResolutionSuccessCounter = null;
   private Counter conflictResolutionFailureCounter = null;
@@ -122,6 +125,7 @@ public class HoodieMetrics {
       this.compactionTimerName = getMetricsName(TIMER_ACTION, 
HoodieTimeline.COMPACTION_ACTION);
       this.logCompactionTimerName = getMetricsName(TIMER_ACTION, 
HoodieTimeline.LOG_COMPACTION_ACTION);
       this.indexTimerName = getMetricsName(TIMER_ACTION, INDEX_ACTION);
+      this.sourceReadAndIndexTimerName = getMetricsName(TIMER_ACTION, 
SOURCE_READ_AND_INDEX_ACTION);
       this.conflictResolutionTimerName = getMetricsName(TIMER_ACTION, 
CONFLICT_RESOLUTION_STR);
       this.conflictResolutionSuccessCounterName = 
getMetricsName(COUNTER_ACTION, CONFLICT_RESOLUTION_STR + SUCCESS_EXTENSION);
       this.conflictResolutionFailureCounterName = 
getMetricsName(COUNTER_ACTION, CONFLICT_RESOLUTION_STR + FAILURE_EXTENSION);
@@ -208,6 +212,13 @@ public class HoodieMetrics {
     return indexTimer == null ? null : indexTimer.time();
   }
 
+  public Timer.Context getSourceReadAndIndexTimerCtx() {
+    if (config.isMetricsOn() && sourceReadAndIndexTimer == null) {
+      sourceReadAndIndexTimer = createTimer(sourceReadAndIndexTimerName);
+    }
+    return sourceReadAndIndexTimer == null ? null : 
sourceReadAndIndexTimer.time();
+  }
+
   public Timer.Context getConflictResolutionCtx() {
     if (config.isLockingMetricsEnabled() && conflictResolutionTimer == null) {
       conflictResolutionTimer = createTimer(conflictResolutionTimerName);
@@ -340,6 +351,13 @@ public class HoodieMetrics {
     }
   }
 
+  public void updateSourceReadAndIndexMetrics(final String action, final long 
durationInMs) {
+    if (config.isMetricsOn()) {
+      LOG.info(String.format("Sending %s metrics (%s.duration, %d)", 
SOURCE_READ_AND_INDEX_ACTION, action, durationInMs));
+      metrics.registerGauge(getMetricsName(SOURCE_READ_AND_INDEX_ACTION, 
String.format("%s.duration", action)), durationInMs);
+    }
+  }
+
   @VisibleForTesting
   public String getMetricsName(String action, String metric) {
     if (config == null) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/HoodieWriteMetadata.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/HoodieWriteMetadata.java
index d771a574e37..d67ca637603 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/HoodieWriteMetadata.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/HoodieWriteMetadata.java
@@ -34,6 +34,7 @@ public class HoodieWriteMetadata<O> {
 
   private O writeStatuses;
   private Option<Duration> indexLookupDuration = Option.empty();
+  private Option<Long> sourceReadAndIndexDurationMs = Option.empty();
 
   // Will be set when auto-commit happens
   private boolean isCommitted;
@@ -59,6 +60,9 @@ public class HoodieWriteMetadata<O> {
     if (indexLookupDuration.isPresent()) {
       newMetadataInstance.setIndexLookupDuration(indexLookupDuration.get());
     }
+    if (sourceReadAndIndexDurationMs.isPresent()) {
+      newMetadataInstance.setSourceReadAndIndexDurationMs(sourceReadAndIndexDurationMs.get());
+    }
     newMetadataInstance.setCommitted(isCommitted);
     newMetadataInstance.setCommitMetadata(commitMetadata);
     if (writeStats.isPresent()) {
@@ -132,6 +136,14 @@ public class HoodieWriteMetadata<O> {
     this.indexLookupDuration = Option.ofNullable(indexLookupDuration);
   }
 
+  public Option<Long> getSourceReadAndIndexDurationMs() {
+    return sourceReadAndIndexDurationMs;
+  }
+
+  public void setSourceReadAndIndexDurationMs(Long 
sourceReadAndIndexDurationMs) {
+    this.sourceReadAndIndexDurationMs = 
Option.of(sourceReadAndIndexDurationMs);
+  }
+
   public Map<String, List<String>> getPartitionToReplaceFileIds() {
     return partitionToReplaceFileIds.orElse(Collections.emptyMap());
   }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
index b5edc7878f9..ff47b636098 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
@@ -29,9 +29,6 @@ import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
-import java.time.Duration;
-import java.time.Instant;
-
 public abstract class BaseWriteHelper<T, I, K, O, R> extends 
ParallelismHelper<I> {
 
   protected BaseWriteHelper(SerializableFunctionUnchecked<I, Integer> 
partitionNumberExtractor) {
@@ -51,17 +48,14 @@ public abstract class BaseWriteHelper<T, I, K, O, R> 
extends ParallelismHelper<I
       I dedupedRecords =
           combineOnCondition(shouldCombine, inputRecords, 
configuredShuffleParallelism, table);
 
-      Instant lookupBegin = Instant.now();
       I taggedRecords = dedupedRecords;
       if (table.getIndex().requiresTagging(operationType)) {
         // perform index loop up to get existing location of records
         context.setJobStatus(this.getClass().getSimpleName(), "Tagging: " + 
table.getConfig().getTableName());
         taggedRecords = tag(dedupedRecords, context, table);
       }
-      Duration indexLookupDuration = Duration.between(lookupBegin, 
Instant.now());
 
       HoodieWriteMetadata<O> result = executor.execute(taggedRecords);
-      result.setIndexLookupDuration(indexLookupDuration);
       return result;
     } catch (Throwable e) {
       if (e instanceof HoodieUpsertException) {
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
index 73b9646d577..39cd0dc444f 100755
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
@@ -87,6 +87,14 @@ public class TestHoodieMetrics {
     long msec = 
(Long)metrics.getRegistry().getGauges().get(metricName).getValue();
     assertTrue(msec > 0);
 
+    // Source read and index metrics
+    timer = hoodieMetrics.getSourceReadAndIndexTimerCtx();
+    Thread.sleep(5); // Ensure timer duration is > 0
+    hoodieMetrics.updateSourceReadAndIndexMetrics("some_action", 
hoodieMetrics.getDurationInMs(timer.stop()));
+    metricName = hoodieMetrics.getMetricsName("source_read_and_index", 
"some_action.duration");
+    msec = (Long)metrics.getRegistry().getGauges().get(metricName).getValue();
+    assertTrue(msec > 0);
+
     // test index type
     metricName = hoodieMetrics.getMetricsName("index", "type");
     for (HoodieIndex.IndexType indexType: HoodieIndex.IndexType.values()) {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 4997faebe70..6cf866482f7 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -43,6 +43,7 @@ import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
 import org.apache.hudi.metrics.DistributedRegistry;
+import org.apache.hudi.metrics.HoodieMetrics;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
@@ -141,8 +142,8 @@ public class SparkRDDWriteClient<T> extends
     preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient());
     HoodieWriteMetadata<HoodieData<WriteStatus>> result = 
table.upsert(context, instantTime, HoodieJavaRDD.of(records));
     HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = 
result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
-    if (result.getIndexLookupDuration().isPresent()) {
-      metrics.updateIndexMetrics(LOOKUP_STR, 
result.getIndexLookupDuration().get().toMillis());
+    if (result.getSourceReadAndIndexDurationMs().isPresent()) {
+      metrics.updateSourceReadAndIndexMetrics(HoodieMetrics.DURATION_STR, 
result.getSourceReadAndIndexDurationMs().get());
     }
     return postWrite(resultRDD, instantTime, table);
   }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index c47ba72949c..32e4824b8b8 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -32,6 +32,7 @@ import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.collection.Pair;
@@ -158,9 +159,12 @@ public abstract class BaseSparkCommitActionExecutor<T> 
extends
     HoodieData<HoodieRecord<T>> inputRecordsWithClusteringUpdate = 
clusteringHandleUpdate(inputRecords);
 
     context.setJobStatus(this.getClass().getSimpleName(), "Building workload 
profile:" + config.getTableName());
+    HoodieTimer sourceReadAndIndexTimer = HoodieTimer.start(); // time taken from dedup -> tag location -> building workload profile
     WorkloadProfile workloadProfile =
         new WorkloadProfile(buildProfile(inputRecordsWithClusteringUpdate), 
operationType, table.getIndex().canIndexLogFiles());
     LOG.debug("Input workload profile :" + workloadProfile);
+    long sourceReadAndIndexDurationMs = sourceReadAndIndexTimer.endTimer();
+    LOG.info("Source read and index timer " + sourceReadAndIndexDurationMs);
 
     // partition using the insert partitioner
     final Partitioner partitioner = getPartitioner(workloadProfile);
@@ -170,6 +174,7 @@ public abstract class BaseSparkCommitActionExecutor<T> 
extends
     HoodieData<WriteStatus> writeStatuses = 
mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner);
     HoodieWriteMetadata<HoodieData<WriteStatus>> result = new 
HoodieWriteMetadata<>();
     updateIndexAndCommitIfNeeded(writeStatuses, result);
+    result.setSourceReadAndIndexDurationMs(sourceReadAndIndexDurationMs);
     return result;
   }
 
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitActionExecutor.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitActionExecutor.java
index 270ac864012..2976234245b 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitActionExecutor.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitActionExecutor.java
@@ -43,6 +43,6 @@ public class SparkUpsertDeltaCommitActionExecutor<T>
   @Override
   public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
     return HoodieWriteHelper.newInstance().write(instantTime, inputRecordsRDD, 
context, table,
-        config.shouldCombineBeforeUpsert(), 
config.getUpsertShuffleParallelism(),this, operationType);
+        config.shouldCombineBeforeUpsert(), 
config.getUpsertShuffleParallelism(), this, operationType);
   }
 }

Reply via email to