This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 8d103ad87d8 [HUDI-7624] Fixing index tagging duration (#11035)
8d103ad87d8 is described below
commit 8d103ad87d8517cfdfcb0b29a67df1591f906de5
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Tue May 14 10:03:59 2024 -0700
[HUDI-7624] Fixing index tagging duration (#11035)
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../org/apache/hudi/client/BaseHoodieWriteClient.java | 3 ---
.../java/org/apache/hudi/metrics/HoodieMetrics.java | 18 ++++++++++++++++++
.../apache/hudi/table/action/HoodieWriteMetadata.java | 12 ++++++++++++
.../hudi/table/action/commit/BaseWriteHelper.java | 6 ------
.../org/apache/hudi/metrics/TestHoodieMetrics.java | 8 ++++++++
.../org/apache/hudi/client/SparkRDDWriteClient.java | 5 +++--
.../action/commit/BaseSparkCommitActionExecutor.java | 5 +++++
.../SparkUpsertDeltaCommitActionExecutor.java | 2 +-
8 files changed, 47 insertions(+), 12 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 003836d0cac..544bce20f3d 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -511,9 +511,6 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
* @return Write Status
*/
public O postWrite(HoodieWriteMetadata<O> result, String instantTime,
HoodieTable hoodieTable) {
- if (result.getIndexLookupDuration().isPresent()) {
- metrics.updateIndexMetrics(getOperationType().name(),
result.getIndexUpdateDuration().get().toMillis());
- }
if (result.isCommitted()) {
// Perform post commit operations.
if (result.getFinalizeDuration().isPresent()) {
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
index d8c60d5f660..d5020014036 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
@@ -71,6 +71,7 @@ public class HoodieMetrics {
public static final String ARCHIVE_ACTION = "archive";
public static final String FINALIZE_ACTION = "finalize";
public static final String INDEX_ACTION = "index";
+ public static final String SOURCE_READ_AND_INDEX_ACTION =
"source_read_and_index";
private Metrics metrics;
// Some timers
@@ -84,6 +85,7 @@ public class HoodieMetrics {
public String finalizeTimerName = null;
public String compactionTimerName = null;
public String indexTimerName = null;
+ public String sourceReadAndIndexTimerName = null;
private String conflictResolutionTimerName = null;
private String conflictResolutionSuccessCounterName = null;
private String conflictResolutionFailureCounterName = null;
@@ -101,6 +103,7 @@ public class HoodieMetrics {
private Timer logCompactionTimer = null;
private Timer clusteringTimer = null;
private Timer indexTimer = null;
+ private Timer sourceReadAndIndexTimer = null;
private Timer conflictResolutionTimer = null;
private Counter conflictResolutionSuccessCounter = null;
private Counter conflictResolutionFailureCounter = null;
@@ -122,6 +125,7 @@ public class HoodieMetrics {
this.compactionTimerName = getMetricsName(TIMER_ACTION,
HoodieTimeline.COMPACTION_ACTION);
this.logCompactionTimerName = getMetricsName(TIMER_ACTION,
HoodieTimeline.LOG_COMPACTION_ACTION);
this.indexTimerName = getMetricsName(TIMER_ACTION, INDEX_ACTION);
+ this.sourceReadAndIndexTimerName = getMetricsName(TIMER_ACTION,
SOURCE_READ_AND_INDEX_ACTION);
this.conflictResolutionTimerName = getMetricsName(TIMER_ACTION,
CONFLICT_RESOLUTION_STR);
this.conflictResolutionSuccessCounterName =
getMetricsName(COUNTER_ACTION, CONFLICT_RESOLUTION_STR + SUCCESS_EXTENSION);
this.conflictResolutionFailureCounterName =
getMetricsName(COUNTER_ACTION, CONFLICT_RESOLUTION_STR + FAILURE_EXTENSION);
@@ -208,6 +212,13 @@ public class HoodieMetrics {
return indexTimer == null ? null : indexTimer.time();
}
+ public Timer.Context getSourceReadAndIndexTimerCtx() {
+ if (config.isMetricsOn() && sourceReadAndIndexTimer == null) {
+ sourceReadAndIndexTimer = createTimer(sourceReadAndIndexTimerName);
+ }
+ return sourceReadAndIndexTimer == null ? null :
sourceReadAndIndexTimer.time();
+ }
+
public Timer.Context getConflictResolutionCtx() {
if (config.isLockingMetricsEnabled() && conflictResolutionTimer == null) {
conflictResolutionTimer = createTimer(conflictResolutionTimerName);
@@ -340,6 +351,13 @@ public class HoodieMetrics {
}
}
+ public void updateSourceReadAndIndexMetrics(final String action, final long
durationInMs) {
+ if (config.isMetricsOn()) {
+ LOG.info(String.format("Sending %s metrics (%s.duration, %d)",
SOURCE_READ_AND_INDEX_ACTION, action, durationInMs));
+ metrics.registerGauge(getMetricsName(SOURCE_READ_AND_INDEX_ACTION,
String.format("%s.duration", action)), durationInMs);
+ }
+ }
+
@VisibleForTesting
public String getMetricsName(String action, String metric) {
if (config == null) {
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/HoodieWriteMetadata.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/HoodieWriteMetadata.java
index d771a574e37..d67ca637603 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/HoodieWriteMetadata.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/HoodieWriteMetadata.java
@@ -34,6 +34,7 @@ public class HoodieWriteMetadata<O> {
private O writeStatuses;
private Option<Duration> indexLookupDuration = Option.empty();
+ private Option<Long> sourceReadAndIndexDurationMs = Option.empty();
// Will be set when auto-commit happens
private boolean isCommitted;
@@ -59,6 +60,9 @@ public class HoodieWriteMetadata<O> {
if (indexLookupDuration.isPresent()) {
newMetadataInstance.setIndexLookupDuration(indexLookupDuration.get());
}
+ if (sourceReadAndIndexDurationMs.isPresent()) {
+
newMetadataInstance.setSourceReadAndIndexDurationMs(sourceReadAndIndexDurationMs.get());
+ }
newMetadataInstance.setCommitted(isCommitted);
newMetadataInstance.setCommitMetadata(commitMetadata);
if (writeStats.isPresent()) {
@@ -132,6 +136,14 @@ public class HoodieWriteMetadata<O> {
this.indexLookupDuration = Option.ofNullable(indexLookupDuration);
}
+ public Option<Long> getSourceReadAndIndexDurationMs() {
+ return sourceReadAndIndexDurationMs;
+ }
+
+ public void setSourceReadAndIndexDurationMs(Long
sourceReadAndIndexDurationMs) {
+ this.sourceReadAndIndexDurationMs =
Option.of(sourceReadAndIndexDurationMs);
+ }
+
public Map<String, List<String>> getPartitionToReplaceFileIds() {
return partitionToReplaceFileIds.orElse(Collections.emptyMap());
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
index b5edc7878f9..ff47b636098 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
@@ -29,9 +29,6 @@ import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
-import java.time.Duration;
-import java.time.Instant;
-
public abstract class BaseWriteHelper<T, I, K, O, R> extends
ParallelismHelper<I> {
protected BaseWriteHelper(SerializableFunctionUnchecked<I, Integer>
partitionNumberExtractor) {
@@ -51,17 +48,14 @@ public abstract class BaseWriteHelper<T, I, K, O, R>
extends ParallelismHelper<I
I dedupedRecords =
combineOnCondition(shouldCombine, inputRecords,
configuredShuffleParallelism, table);
- Instant lookupBegin = Instant.now();
I taggedRecords = dedupedRecords;
if (table.getIndex().requiresTagging(operationType)) {
// perform index lookup to get existing location of records
context.setJobStatus(this.getClass().getSimpleName(), "Tagging: " +
table.getConfig().getTableName());
taggedRecords = tag(dedupedRecords, context, table);
}
- Duration indexLookupDuration = Duration.between(lookupBegin,
Instant.now());
HoodieWriteMetadata<O> result = executor.execute(taggedRecords);
- result.setIndexLookupDuration(indexLookupDuration);
return result;
} catch (Throwable e) {
if (e instanceof HoodieUpsertException) {
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
index 73b9646d577..39cd0dc444f 100755
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
@@ -87,6 +87,14 @@ public class TestHoodieMetrics {
long msec =
(Long)metrics.getRegistry().getGauges().get(metricName).getValue();
assertTrue(msec > 0);
+ // Source read and index metrics
+ timer = hoodieMetrics.getSourceReadAndIndexTimerCtx();
+ Thread.sleep(5); // Ensure timer duration is > 0
+ hoodieMetrics.updateSourceReadAndIndexMetrics("some_action",
hoodieMetrics.getDurationInMs(timer.stop()));
+ metricName = hoodieMetrics.getMetricsName("source_read_and_index",
"some_action.duration");
+ msec = (Long)metrics.getRegistry().getGauges().get(metricName).getValue();
+ assertTrue(msec > 0);
+
// test index type
metricName = hoodieMetrics.getMetricsName("index", "type");
for (HoodieIndex.IndexType indexType: HoodieIndex.IndexType.values()) {
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 4997faebe70..6cf866482f7 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -43,6 +43,7 @@ import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.apache.hudi.metrics.DistributedRegistry;
+import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
@@ -141,8 +142,8 @@ public class SparkRDDWriteClient<T> extends
preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient());
HoodieWriteMetadata<HoodieData<WriteStatus>> result =
table.upsert(context, instantTime, HoodieJavaRDD.of(records));
HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD =
result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
- if (result.getIndexLookupDuration().isPresent()) {
- metrics.updateIndexMetrics(LOOKUP_STR,
result.getIndexLookupDuration().get().toMillis());
+ if (result.getSourceReadAndIndexDurationMs().isPresent()) {
+ metrics.updateSourceReadAndIndexMetrics(HoodieMetrics.DURATION_STR,
result.getSourceReadAndIndexDurationMs().get());
}
return postWrite(resultRDD, instantTime, table);
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index c47ba72949c..32e4824b8b8 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -32,6 +32,7 @@ import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.collection.Pair;
@@ -158,9 +159,12 @@ public abstract class BaseSparkCommitActionExecutor<T>
extends
HoodieData<HoodieRecord<T>> inputRecordsWithClusteringUpdate =
clusteringHandleUpdate(inputRecords);
context.setJobStatus(this.getClass().getSimpleName(), "Building workload
profile:" + config.getTableName());
+ HoodieTimer sourceReadAndIndexTimer = HoodieTimer.start(); // time taken
from dedup -> tag location -> building workload profile
WorkloadProfile workloadProfile =
new WorkloadProfile(buildProfile(inputRecordsWithClusteringUpdate),
operationType, table.getIndex().canIndexLogFiles());
LOG.debug("Input workload profile :" + workloadProfile);
+ long sourceReadAndIndexDurationMs = sourceReadAndIndexTimer.endTimer();
+ LOG.info("Source read and index timer " + sourceReadAndIndexDurationMs);
// partition using the insert partitioner
final Partitioner partitioner = getPartitioner(workloadProfile);
@@ -170,6 +174,7 @@ public abstract class BaseSparkCommitActionExecutor<T>
extends
HoodieData<WriteStatus> writeStatuses =
mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner);
HoodieWriteMetadata<HoodieData<WriteStatus>> result = new
HoodieWriteMetadata<>();
updateIndexAndCommitIfNeeded(writeStatuses, result);
+ result.setSourceReadAndIndexDurationMs(sourceReadAndIndexDurationMs);
return result;
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitActionExecutor.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitActionExecutor.java
index 270ac864012..2976234245b 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitActionExecutor.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitActionExecutor.java
@@ -43,6 +43,6 @@ public class SparkUpsertDeltaCommitActionExecutor<T>
@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
return HoodieWriteHelper.newInstance().write(instantTime, inputRecordsRDD,
context, table,
- config.shouldCombineBeforeUpsert(),
config.getUpsertShuffleParallelism(),this, operationType);
+ config.shouldCombineBeforeUpsert(),
config.getUpsertShuffleParallelism(), this, operationType);
}
}