This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a898dfd4152 [HUDI-3727] Add metrics for async indexer (#9559)
a898dfd4152 is described below
commit a898dfd415202bee85442382502b311626fb65da
Author: Sagar Sumit <[email protected]>
AuthorDate: Thu Aug 31 03:04:01 2023 +0530
[HUDI-3727] Add metrics for async indexer (#9559)
---
.../apache/hudi/metadata/HoodieMetadataWriteUtils.java | 1 -
.../hudi/table/action/index/RunIndexActionExecutor.java | 16 +++++++++++++++-
.../org/apache/hudi/metadata/HoodieMetadataMetrics.java | 3 ++-
3 files changed, 17 insertions(+), 3 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
index 2078896987d..e73f6fb7bc3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
@@ -68,7 +68,6 @@ public class HoodieMetadataWriteUtils {
// eventually depend on the number of file groups selected for each
partition (See estimateFileGroupCount function)
private static final long MDT_MAX_HFILE_SIZE_BYTES = 10 * 1024 * 1024 *
1024L; // 10GB
-
/**
* Create a {@code HoodieWriteConfig} to use for the Metadata Table. This
is used by async
* indexer only.
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
index 9b91167899c..461c525a1d5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
@@ -27,6 +27,7 @@ import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -35,11 +36,13 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.hudi.metadata.HoodieMetadataMetrics;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.table.HoodieTable;
@@ -90,6 +93,8 @@ public class RunIndexActionExecutor<T, I, K, O> extends
BaseActionExecutor<T, I,
private static final int MAX_CONCURRENT_INDEXING = 1;
private static final int TIMELINE_RELOAD_INTERVAL_MILLIS = 5000;
+ private final Option<HoodieMetadataMetrics> metrics;
+
// we use this to update the latest instant in data timeline that has been
indexed in metadata table
// this needs to be volatile as it can be updated in the IndexingCheckTask
spawned by this executor
// assumption is that only one indexer can execute at a time
@@ -100,6 +105,11 @@ public class RunIndexActionExecutor<T, I, K, O> extends
BaseActionExecutor<T, I,
public RunIndexActionExecutor(HoodieEngineContext context, HoodieWriteConfig
config, HoodieTable<T, I, K, O> table, String instantTime) {
super(context, config, table, instantTime);
this.txnManager = new TransactionManager(config,
table.getMetaClient().getFs());
+ if (config.getMetadataConfig().enableMetrics()) {
+ this.metrics = Option.of(new
HoodieMetadataMetrics(Registry.getRegistry("HoodieIndexer")));
+ } else {
+ this.metrics = Option.empty();
+ }
}
@Override
@@ -143,7 +153,9 @@ public class RunIndexActionExecutor<T, I, K, O> extends
BaseActionExecutor<T, I,
// this will only build index upto base instant as generated by the
plan, we will be doing catchup later
String indexUptoInstant =
indexPartitionInfos.get(0).getIndexUptoInstant();
LOG.info("Starting Index Building with base instant: " +
indexUptoInstant);
+ HoodieTimer timer = HoodieTimer.start();
metadataWriter.buildMetadataPartitions(context, indexPartitionInfos);
+ metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
// get remaining instants to catchup
List<HoodieInstant> instantsToCatchup =
getInstantsToCatchup(indexUptoInstant);
@@ -167,7 +179,7 @@ public class RunIndexActionExecutor<T, I, K, O> extends
BaseActionExecutor<T, I,
.collect(Collectors.toList());
} catch (Exception e) {
throw new HoodieMetadataException("Failed to index partition " +
Arrays.toString(indexPartitionInfos.stream()
- .map(entry ->
entry.getMetadataPartitionPath()).collect(Collectors.toList()).toArray()));
+ .map(entry ->
entry.getMetadataPartitionPath()).collect(Collectors.toList()).toArray()));
}
} else {
String indexUptoInstant = fileIndexPartitionInfo.getIndexUptoInstant();
@@ -275,7 +287,9 @@ public class RunIndexActionExecutor<T, I, K, O> extends
BaseActionExecutor<T, I,
new IndexingCatchupTask(metadataWriter, instantsToIndex,
metadataCompletedTimestamps, table.getMetaClient(), metadataMetaClient));
try {
LOG.info("Starting index catchup task");
+ HoodieTimer timer = HoodieTimer.start();
indexingCatchupTaskFuture.get(config.getIndexingCheckTimeoutSeconds(),
TimeUnit.SECONDS);
+ metrics.ifPresent(m ->
m.updateMetrics(HoodieMetadataMetrics.ASYNC_INDEXER_CATCHUP_TIME,
timer.endTimer()));
} catch (Exception e) {
indexingCatchupTaskFuture.cancel(true);
throw new HoodieIndexException(String.format("Index catchup failed.
Current indexed instant = %s. Aborting!", currentCaughtupInstant), e);
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java
index 521b55efaed..ca9bf7b0834 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java
@@ -69,6 +69,7 @@ public class HoodieMetadataMetrics implements Serializable {
public static final String SKIP_TABLE_SERVICES = "skip_table_services";
public static final String TABLE_SERVICE_EXECUTION_STATUS =
"table_service_execution_status";
public static final String TABLE_SERVICE_EXECUTION_DURATION =
"table_service_execution_duration";
+ public static final String ASYNC_INDEXER_CATCHUP_TIME =
"async_indexer_catchup_time";
private static final Logger LOG =
LoggerFactory.getLogger(HoodieMetadataMetrics.class);
@@ -126,7 +127,7 @@ public class HoodieMetadataMetrics implements Serializable {
return stats;
}
- protected void updateMetrics(String action, long durationInMs) {
+ public void updateMetrics(String action, long durationInMs) {
if (metricsRegistry == null) {
return;
}