This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 5cdfacba1bd [HUDI-5496] Avoid unnecessary file system parsing to initialize metadata table for a new data table (#7841)
5cdfacba1bd is described below
commit 5cdfacba1bddfddb7b2b6a75a3ff0e3b6766392b
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Fri Feb 3 17:59:18 2023 -0800
    [HUDI-5496] Avoid unnecessary file system parsing to initialize metadata table for a new data table (#7841)

    - Optimizing instantiation of the metadata table for a fresh table by avoiding file listing
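
For readers skimming the diff, here is a minimal, self-contained Java sketch of the optimization's control flow. All names are illustrative stand-ins loosely mirroring identifiers in the diff below (the sentinel value mirrors Hudi's SOLO_COMMIT_TIMESTAMP constant); this is not the actual implementation:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class FreshTableBootstrapSketch {
      // Sentinel instant for a table with no prior commits (illustrative value).
      static final String SOLO_COMMIT_TIMESTAMP = "00000000000000";

      // Returns partition -> files mappings used to seed the metadata table.
      static Map<String, List<String>> bootstrapRecords(String createInstantTime) {
        if (SOLO_COMMIT_TIMESTAMP.equals(createInstantTime)) {
          // Fresh table: nothing exists on storage yet, so skip the listing entirely.
          return Collections.emptyMap();
        }
        // Existing table: fall back to a (placeholder) full file-system listing.
        Map<String, List<String>> partitionToFiles = new HashMap<>();
        partitionToFiles.put("2023/02/03", List.of("f1.parquet", "f2.parquet"));
        return partitionToFiles;
      }

      public static void main(String[] args) {
        System.out.println(bootstrapRecords(SOLO_COMMIT_TIMESTAMP)); // {} -> listing skipped
        System.out.println(bootstrapRecords("20230203175918000"));   // placeholder listing
      }
    }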
---
.../apache/hudi/client/BaseHoodieWriteClient.java | 2 +
.../metadata/HoodieBackedTableMetadataWriter.java | 64 ++++++++++++----------
.../SparkHoodieBackedTableMetadataWriter.java | 1 +
.../internal/HoodieDataSourceInternalWriter.java | 4 +-
4 files changed, 39 insertions(+), 32 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index c3260914bd5..17956479762 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -519,6 +519,8 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
    */
   protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata) {
     try {
+      context.setJobStatus(this.getClass().getSimpleName(), "Cleaning up marker directories for commit " + instantTime + " in table "
+          + config.getTableName());
       // Delete the marker directory for the instant.
       WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
           .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index a8356ff9c71..5e8367e2095 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -1086,39 +1086,45 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta
     Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap = new HashMap<>();
-    List<DirectoryInfo> partitionInfoList = listAllPartitions(dataMetaClient);
-    Map<String, Map<String, Long>> partitionToFilesMap = partitionInfoList.stream()
-        .map(p -> {
-          String partitionName = HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
-          return Pair.of(partitionName, p.getFileNameToSizeMap());
-        })
-        .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
-
-    int totalDataFilesCount = partitionToFilesMap.values().stream().mapToInt(Map::size).sum();
-    List<String> partitions = new ArrayList<>(partitionToFilesMap.keySet());
-
-    if (partitionTypes.contains(MetadataPartitionType.FILES)) {
-      // Record which saves the list of all partitions
-      HoodieRecord allPartitionRecord = HoodieMetadataPayload.createPartitionListRecord(partitions);
-      HoodieData<HoodieRecord> filesPartitionRecords = getFilesPartitionRecords(createInstantTime, partitionInfoList, allPartitionRecord);
-      ValidationUtils.checkState(filesPartitionRecords.count() == (partitions.size() + 1));
-      partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecords);
-    }
+    // Skip file system listing to populate metadata records if it's a fresh table;
+    // listing is applicable only if the table already has N commits and metadata is enabled at a later point in time.
+    if (createInstantTime.equals(SOLO_COMMIT_TIMESTAMP)) { // SOLO_COMMIT_TIMESTAMP will be the initial commit time in the MDT for a fresh table.
+      // If not, the last completed commit in the data table will be chosen as the initial commit time.
+      LOG.info("Triggering an empty commit to initialize the metadata table");
+    } else {
+      List<DirectoryInfo> partitionInfoList = listAllPartitions(dataMetaClient);
+      Map<String, Map<String, Long>> partitionToFilesMap = partitionInfoList.stream()
+          .map(p -> {
+            String partitionName = HoodieTableMetadataUtil.getPartitionIdentifier(p.getRelativePath());
+            return Pair.of(partitionName, p.getFileNameToSizeMap());
+          })
+          .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+
+      int totalDataFilesCount = partitionToFilesMap.values().stream().mapToInt(Map::size).sum();
+      List<String> partitions = new ArrayList<>(partitionToFilesMap.keySet());
+
+      if (partitionTypes.contains(MetadataPartitionType.FILES)) {
+        // Record which saves the list of all partitions
+        HoodieRecord allPartitionRecord = HoodieMetadataPayload.createPartitionListRecord(partitions);
+        HoodieData<HoodieRecord> filesPartitionRecords = getFilesPartitionRecords(createInstantTime, partitionInfoList, allPartitionRecord);
+        ValidationUtils.checkState(filesPartitionRecords.count() == (partitions.size() + 1));
+        partitionToRecordsMap.put(MetadataPartitionType.FILES, filesPartitionRecords);
+      }
-    if (partitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS) && totalDataFilesCount > 0) {
-      final HoodieData<HoodieRecord> recordsRDD = HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
-          engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams(), createInstantTime);
-      partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, recordsRDD);
-    }
+      if (partitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS) && totalDataFilesCount > 0) {
+        final HoodieData<HoodieRecord> recordsRDD = HoodieTableMetadataUtil.convertFilesToBloomFilterRecords(
+            engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams(), createInstantTime);
+        partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, recordsRDD);
+      }
-    if (partitionTypes.contains(MetadataPartitionType.COLUMN_STATS) && totalDataFilesCount > 0) {
-      final HoodieData<HoodieRecord> recordsRDD = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
-          engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams());
-      partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, recordsRDD);
+      if (partitionTypes.contains(MetadataPartitionType.COLUMN_STATS) && totalDataFilesCount > 0) {
+        final HoodieData<HoodieRecord> recordsRDD = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
+            engineContext, Collections.emptyMap(), partitionToFilesMap, getRecordsGenerationParams());
+        partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS, recordsRDD);
+      }
+      LOG.info("Committing " + partitions.size() + " partitions and " + totalDataFilesCount + " files to metadata");
     }
-    LOG.info("Committing " + partitions.size() + " partitions and " + totalDataFilesCount + " files to metadata");
-
commit(createInstantTime, partitionToRecordsMap, false);
}
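
A note on the ValidationUtils check retained in the hunk above: the FILES partition holds one record per data partition plus a single record enumerating all partitions (the "Record which saves the list of all partitions"), hence the expected count of partitions.size() + 1. A hypothetical, self-contained sketch of that bookkeeping (not Hudi code):

    import java.util.ArrayList;
    import java.util.List;

    public class FilesPartitionInvariantSketch {
      public static void main(String[] args) {
        List<String> partitions = List.of("2023/02/01", "2023/02/02", "2023/02/03");

        List<String> filesPartitionRecords = new ArrayList<>();
        // One record that lists every partition...
        filesPartitionRecords.add("__all_partitions__ -> " + partitions);
        // ...plus one record per partition mapping file names to sizes.
        for (String p : partitions) {
          filesPartitionRecords.add(p + " -> {fileName -> size}");
        }

        // Mirrors: checkState(filesPartitionRecords.count() == partitions.size() + 1)
        System.out.println(filesPartitionRecords.size() == partitions.size() + 1); // true
      }
    }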
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 81526c25bcc..23537f6f798 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -133,6 +133,7 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
     HoodieData<HoodieRecord> preppedRecords = prepRecords(partitionRecordsMap);
     JavaRDD<HoodieRecord> preppedRecordRDD = HoodieJavaRDD.getJavaRDD(preppedRecords);
+    engineContext.setJobStatus(this.getClass().getName(), "Committing " + instantTime + " to metadata table " + metadataWriteConfig.getTableName());
     try (SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, metadataWriteConfig)) {
       // rollback partially failed writes if any.
       if (writeClient.rollbackFailedWrites()) {
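
Both setJobStatus additions in this commit (here and in BaseHoodieWriteClient above) are observability tweaks: they attach a human-readable description to the Spark jobs triggered by the subsequent action so they are identifiable in the Spark UI. A hypothetical sketch of the same idea using the plain Spark API rather than Hudi's engine context:

    import org.apache.spark.api.java.JavaSparkContext;

    public class JobStatusSketch {
      // Label the job group before triggering an action; illustrative, not Hudi code.
      static void commitWithStatus(JavaSparkContext jsc, String instantTime, String tableName) {
        jsc.setJobGroup(JobStatusSketch.class.getName(),
            "Committing " + instantTime + " to metadata table " + tableName);
        // ... trigger the Spark action that performs the commit ...
      }
    }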
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java
index c4b21483e8f..11f5d5030b4 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java
+++ b/hudi-spark-datasource/hudi-spark2/src/main/java/org/apache/hudi/internal/HoodieDataSourceInternalWriter.java
@@ -34,7 +34,6 @@ import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
 import org.apache.spark.sql.types.StructType;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -51,7 +50,6 @@ public class HoodieDataSourceInternalWriter implements DataSourceWriter {
   private final DataSourceInternalWriterHelper dataSourceInternalWriterHelper;
   private final boolean populateMetaFields;
   private final Boolean arePartitionRecordsSorted;
-  private Map<String, String> extraMetadataMap = new HashMap<>();
   public HoodieDataSourceInternalWriter(String instantTime, HoodieWriteConfig writeConfig, StructType structType,
                                         SparkSession sparkSession, Configuration configuration, DataSourceOptions dataSourceOptions,
@@ -61,7 +59,7 @@ public class HoodieDataSourceInternalWriter implements DataSourceWriter {
     this.structType = structType;
     this.populateMetaFields = populateMetaFields;
     this.arePartitionRecordsSorted = arePartitionRecordsSorted;
-    this.extraMetadataMap = DataSourceUtils.getExtraMetadata(dataSourceOptions.asMap());
+    Map<String, String> extraMetadataMap = DataSourceUtils.getExtraMetadata(dataSourceOptions.asMap());
     this.dataSourceInternalWriterHelper = new DataSourceInternalWriterHelper(instantTime, writeConfig, structType,
         sparkSession, configuration, extraMetadataMap);
}
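
The last hunk is a small cleanup: extraMetadataMap was only read inside the constructor, so it is narrowed from a mutable field to a constructor-local variable (which also lets the HashMap import go). A minimal sketch of the same pattern, with placeholder types rather than Hudi classes:

    import java.util.Map;

    public class LocalOverFieldSketch {
      // Stand-in for DataSourceInternalWriterHelper, the only consumer of the map.
      static class Helper {
        Helper(Map<String, String> extraMetadata) { /* ... */ }
      }

      private final Helper helper;

      public LocalOverFieldSketch(Map<String, String> options) {
        // Before: the map was stored in a field but never read after construction.
        // After: a local variable suffices, shrinking the object's state.
        Map<String, String> extraMetadata = Map.copyOf(options);
        this.helper = new Helper(extraMetadata);
      }
    }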