This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit ffaf143b44ea713416b3a5ebf9cbc476812f0838 Author: Sivabalan Narayanan <[email protected]> AuthorDate: Wed May 15 07:07:14 2024 -0700 [HUDI-7624] Fixing index tagging duration (#11035) Co-authored-by: Y Ethan Guo <[email protected]> --- .../apache/hudi/client/BaseHoodieWriteClient.java | 3 --- .../java/org/apache/hudi/metrics/HoodieMetrics.java | 20 ++++++++++++++++++++ .../hudi/table/action/HoodieWriteMetadata.java | 12 ++++++++++++ .../hudi/table/action/commit/BaseWriteHelper.java | 6 ------ .../org/apache/hudi/metrics/TestHoodieMetrics.java | 8 ++++++++ .../org/apache/hudi/client/SparkRDDWriteClient.java | 5 +++-- .../action/commit/BaseSparkCommitActionExecutor.java | 5 +++++ .../SparkUpsertDeltaCommitActionExecutor.java | 2 +- 8 files changed, 49 insertions(+), 12 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index f089a6b89d4..b9da3387654 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -517,9 +517,6 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient * @return Write Status */ public O postWrite(HoodieWriteMetadata<O> result, String instantTime, HoodieTable hoodieTable) { - if (result.getIndexLookupDuration().isPresent()) { - metrics.updateIndexMetrics(getOperationType().name(), result.getIndexUpdateDuration().get().toMillis()); - } if (result.isCommitted()) { // Perform post commit operations. if (result.getFinalizeDuration().isPresent()) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java index 82dca3c43bb..5edfa7fd4d7 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java @@ -55,6 +55,9 @@ public class HoodieMetrics { public static final String TOTAL_RECORDS_DELETED = "totalRecordsDeleted"; public static final String TOTAL_CORRUPTED_LOG_BLOCKS_STR = "totalCorruptedLogBlocks"; public static final String TOTAL_ROLLBACK_LOG_BLOCKS_STR = "totalRollbackLogBlocks"; + public static final String TIMER_ACTION = "timer"; + public static final String DURATION_STR = "duration"; + public static final String SOURCE_READ_AND_INDEX_ACTION = "source_read_and_index"; private Metrics metrics; // Some timers @@ -67,6 +70,7 @@ public class HoodieMetrics { public String finalizeTimerName = null; public String compactionTimerName = null; public String indexTimerName = null; + public String sourceReadAndIndexTimerName = null; private String conflictResolutionTimerName = null; private String conflictResolutionSuccessCounterName = null; private String conflictResolutionFailureCounterName = null; @@ -83,6 +87,7 @@ public class HoodieMetrics { private Timer logCompactionTimer = null; private Timer clusteringTimer = null; private Timer indexTimer = null; + private Timer sourceReadAndIndexTimer = null; private Timer conflictResolutionTimer = null; private Counter conflictResolutionSuccessCounter = null; private Counter conflictResolutionFailureCounter = null; @@ -103,6 +108,7 @@ public class HoodieMetrics { this.compactionTimerName = getMetricsName("timer", HoodieTimeline.COMPACTION_ACTION); this.logCompactionTimerName = getMetricsName("timer", HoodieTimeline.LOG_COMPACTION_ACTION); this.indexTimerName = getMetricsName("timer", "index"); + this.sourceReadAndIndexTimerName = getMetricsName(TIMER_ACTION, SOURCE_READ_AND_INDEX_ACTION); this.conflictResolutionTimerName = getMetricsName("timer", "conflict_resolution"); this.conflictResolutionSuccessCounterName = getMetricsName("counter", "conflict_resolution.success"); this.conflictResolutionFailureCounterName = getMetricsName("counter", "conflict_resolution.failure"); @@ -182,6 +188,13 @@ public class HoodieMetrics { return indexTimer == null ? null : indexTimer.time(); } + public Timer.Context getSourceReadAndIndexTimerCtx() { + if (config.isMetricsOn() && sourceReadAndIndexTimer == null) { + sourceReadAndIndexTimer = createTimer(sourceReadAndIndexTimerName); + } + return sourceReadAndIndexTimer == null ? null : sourceReadAndIndexTimer.time(); + } + public Timer.Context getConflictResolutionCtx() { if (config.isLockingMetricsEnabled() && conflictResolutionTimer == null) { conflictResolutionTimer = createTimer(conflictResolutionTimerName); @@ -302,6 +315,13 @@ public class HoodieMetrics { } } + public void updateSourceReadAndIndexMetrics(final String action, final long durationInMs) { + if (config.isMetricsOn()) { + LOG.info(String.format("Sending %s metrics (%s.duration, %d)", SOURCE_READ_AND_INDEX_ACTION, action, durationInMs)); + metrics.registerGauge(getMetricsName(SOURCE_READ_AND_INDEX_ACTION, String.format("%s.duration", action)), durationInMs); + } + } + @VisibleForTesting public String getMetricsName(String action, String metric) { if (config == null) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/HoodieWriteMetadata.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/HoodieWriteMetadata.java index d771a574e37..d67ca637603 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/HoodieWriteMetadata.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/HoodieWriteMetadata.java @@ -34,6 +34,7 @@ public class HoodieWriteMetadata<O> { private O writeStatuses; private Option<Duration> indexLookupDuration = Option.empty(); + private Option<Long> sourceReadAndIndexDurationMs = Option.empty(); // Will be set when auto-commit happens private boolean isCommitted; @@ -59,6 +60,9 @@ public class HoodieWriteMetadata<O> { if (indexLookupDuration.isPresent()) { newMetadataInstance.setIndexLookupDuration(indexLookupDuration.get()); } + if (sourceReadAndIndexDurationMs.isPresent()) { + newMetadataInstance.setSourceReadAndIndexDurationMs(sourceReadAndIndexDurationMs.get()); + } newMetadataInstance.setCommitted(isCommitted); newMetadataInstance.setCommitMetadata(commitMetadata); if (writeStats.isPresent()) { @@ -132,6 +136,14 @@ public class HoodieWriteMetadata<O> { this.indexLookupDuration = Option.ofNullable(indexLookupDuration); } + public Option<Long> getSourceReadAndIndexDurationMs() { + return sourceReadAndIndexDurationMs; + } + + public void setSourceReadAndIndexDurationMs(Long sourceReadAndIndexDurationMs) { + this.sourceReadAndIndexDurationMs = Option.of(sourceReadAndIndexDurationMs); + } + public Map<String, List<String>> getPartitionToReplaceFileIds() { return partitionToReplaceFileIds.orElse(Collections.emptyMap()); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java index b5edc7878f9..ff47b636098 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java @@ -29,9 +29,6 @@ import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; -import java.time.Duration; -import java.time.Instant; - public abstract class BaseWriteHelper<T, I, K, O, R> extends ParallelismHelper<I> { protected BaseWriteHelper(SerializableFunctionUnchecked<I, Integer> partitionNumberExtractor) { @@ -51,17 +48,14 @@ public abstract class BaseWriteHelper<T, I, K, O, R> extends ParallelismHelper<I I dedupedRecords = combineOnCondition(shouldCombine, inputRecords, configuredShuffleParallelism, table); - Instant lookupBegin = Instant.now(); I taggedRecords = dedupedRecords; if (table.getIndex().requiresTagging(operationType)) { // perform index loop up to get existing location of records context.setJobStatus(this.getClass().getSimpleName(), "Tagging: " + table.getConfig().getTableName()); taggedRecords = tag(dedupedRecords, context, table); } - Duration indexLookupDuration = Duration.between(lookupBegin, Instant.now()); HoodieWriteMetadata<O> result = executor.execute(taggedRecords); - result.setIndexLookupDuration(indexLookupDuration); return result; } catch (Throwable e) { if (e instanceof HoodieUpsertException) { diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java index 73b9646d577..39cd0dc444f 100755 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java @@ -87,6 +87,14 @@ public class TestHoodieMetrics { long msec = (Long)metrics.getRegistry().getGauges().get(metricName).getValue(); assertTrue(msec > 0); + // Source read and index metrics + timer = hoodieMetrics.getSourceReadAndIndexTimerCtx(); + Thread.sleep(5); // Ensure timer duration is > 0 + hoodieMetrics.updateSourceReadAndIndexMetrics("some_action", hoodieMetrics.getDurationInMs(timer.stop())); + metricName = hoodieMetrics.getMetricsName("source_read_and_index", "some_action.duration"); + msec = (Long)metrics.getRegistry().getGauges().get(metricName).getValue(); + assertTrue(msec > 0); + // test index type metricName = hoodieMetrics.getMetricsName("index", "type"); for (HoodieIndex.IndexType indexType: HoodieIndex.IndexType.values()) { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index a438df4e047..bbdd34835ad 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -45,6 +45,7 @@ import org.apache.hudi.metadata.HoodieTableMetadataWriter; import org.apache.hudi.metadata.MetadataPartitionType; import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter; import org.apache.hudi.metrics.DistributedRegistry; +import org.apache.hudi.metrics.HoodieMetrics; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; @@ -155,8 +156,8 @@ public class SparkRDDWriteClient<T> extends preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient()); HoodieWriteMetadata<HoodieData<WriteStatus>> result = table.upsert(context, instantTime, HoodieJavaRDD.of(records)); HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses())); - if (result.getIndexLookupDuration().isPresent()) { - metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis()); + if (result.getSourceReadAndIndexDurationMs().isPresent()) { + metrics.updateSourceReadAndIndexMetrics(HoodieMetrics.DURATION_STR, result.getSourceReadAndIndexDurationMs().get()); } return postWrite(resultRDD, instantTime, table); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java index 30e3cb533b1..129ace5f8d1 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java @@ -34,6 +34,7 @@ import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.CommitUtils; +import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.collection.Pair; @@ -162,9 +163,12 @@ public abstract class BaseSparkCommitActionExecutor<T> extends HoodieData<HoodieRecord<T>> inputRecordsWithClusteringUpdate = clusteringHandleUpdate(inputRecords); context.setJobStatus(this.getClass().getSimpleName(), "Building workload profile:" + config.getTableName()); + HoodieTimer sourceReadAndIndexTimer = HoodieTimer.start(); // time taken from dedup -> tag location -> building workload profile WorkloadProfile workloadProfile = new WorkloadProfile(buildProfile(inputRecordsWithClusteringUpdate), operationType, table.getIndex().canIndexLogFiles()); LOG.debug("Input workload profile :" + workloadProfile); + long sourceReadAndIndexDurationMs = sourceReadAndIndexTimer.endTimer(); + LOG.info("Source read and index timer " + sourceReadAndIndexDurationMs); // partition using the insert partitioner final Partitioner partitioner = getPartitioner(workloadProfile); @@ -174,6 +178,7 @@ public abstract class BaseSparkCommitActionExecutor<T> extends HoodieData<WriteStatus> writeStatuses = mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner); HoodieWriteMetadata<HoodieData<WriteStatus>> result = new HoodieWriteMetadata<>(); updateIndexAndCommitIfNeeded(writeStatuses, result); + result.setSourceReadAndIndexDurationMs(sourceReadAndIndexDurationMs); return result; } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitActionExecutor.java index 270ac864012..2976234245b 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitActionExecutor.java @@ -43,6 +43,6 @@ public class SparkUpsertDeltaCommitActionExecutor<T> @Override public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() { return HoodieWriteHelper.newInstance().write(instantTime, inputRecordsRDD, context, table, - config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(),this, operationType); + config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(), this, operationType); } }
