nsivabalan commented on a change in pull request #4848:
URL: https://github.com/apache/hudi/pull/4848#discussion_r810405559
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -320,7 +325,48 @@ private void updateWriteStatus(HoodieDeltaWriteStat stat,
AppendResult result) {
statuses.add(this.writeStatus);
}
- private void processAppendResult(AppendResult result) {
+ /**
+ * Get column statistics for the records part of this append handle.
+ *
+ * @param filePath - Log file that records are part of
+ * @param recordList - List of records appended to the log for which
column statistics is needed for
+ * @param columnRangeMap - Output map to accumulate the column statistics
for the records
+ */
+ private void getRecordsStats(final String filePath, List<IndexedRecord>
recordList,
+ Map<String,
HoodieColumnRangeMetadata<Comparable>> columnRangeMap) {
+ recordList.forEach(record -> accumulateColumnRanges(record,
writeSchemaWithMetaFields, filePath, columnRangeMap,
config.isConsistentLogicalTimestampEnabled()));
+ }
+
+ /**
+ * Accumulate column range statistics for the requested record.
+ *
+ * @param record - Record to get the column range statistics for
+ * @param schema - Schema for the record
+ * @param filePath - File that record belongs to
+ */
+ private static void accumulateColumnRanges(IndexedRecord record, Schema
schema, String filePath,
+ Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap,
boolean consistentLogicalTimestampEnabled) {
+ if (!(record instanceof GenericRecord)) {
+ throw new HoodieIOException("Record is not a generic type to get column
range metadata!");
+ }
+ schema.getFields().forEach(field -> {
+ final String fieldVal =
HoodieAvroUtils.getNestedFieldValAsString((GenericRecord) record, field.name(),
true, consistentLogicalTimestampEnabled);
+ final int fieldSize = fieldVal == null ? 0 : fieldVal.length();
+ final HoodieColumnRangeMetadata<Comparable> fieldRange = new
HoodieColumnRangeMetadata<>(
+ filePath,
+ field.name(),
+ fieldVal,
+ fieldVal,
+ fieldVal == null ? 1 : 0, // null count
+ fieldVal == null ? 0 : 1, // value count
+ fieldSize,
+ fieldSize
Review comment:
   In the case of Avro, are the total size and the total uncompressed size
going to be the same?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -330,78 +319,67 @@ public static void deleteMetadataTable(String basePath,
HoodieEngineContext cont
});
});
- return engineContext.map(deleteFileList, deleteFileInfo -> {
- return HoodieMetadataPayload.createBloomFilterMetadataRecord(
- deleteFileInfo.getLeft(), deleteFileInfo.getRight(), instantTime,
ByteBuffer.allocate(0), true);
- }, 1).stream().collect(Collectors.toList());
+ HoodieData<Pair<String, String>> deleteFileListRDD =
engineContext.parallelize(deleteFileList,
+ Math.max(deleteFileList.size(),
recordsGenerationParams.getBloomIndexParallelism()));
+ return deleteFileListRDD.map(deleteFileInfo ->
HoodieMetadataPayload.createBloomFilterMetadataRecord(
+ deleteFileInfo.getLeft(), deleteFileInfo.getRight(), instantTime,
StringUtils.EMPTY_STRING,
+ ByteBuffer.allocate(0), true));
}
/**
* Convert clean metadata to column stats index records.
*
- * @param cleanMetadata - Clean action metadata
- * @param engineContext - Engine context
- * @param datasetMetaClient - data table meta client
+ * @param cleanMetadata - Clean action metadata
+ * @param engineContext - Engine context
+ * @param recordsGenerationParams - Parameters for bloom filter record
generation
* @return List of column stats index records for the clean metadata
*/
- public static List<HoodieRecord>
convertMetadataToColumnStatsRecords(HoodieCleanMetadata cleanMetadata,
-
HoodieEngineContext engineContext,
-
HoodieTableMetaClient datasetMetaClient) {
+ public static HoodieData<HoodieRecord>
convertMetadataToColumnStatsRecords(HoodieCleanMetadata cleanMetadata,
+
HoodieEngineContext engineContext,
+
MetadataRecordsGenerationParams recordsGenerationParams) {
List<Pair<String, String>> deleteFileList = new ArrayList<>();
cleanMetadata.getPartitionMetadata().forEach((partition,
partitionMetadata) -> {
// Files deleted from a partition
List<String> deletedFiles = partitionMetadata.getDeletePathPatterns();
deletedFiles.forEach(entry -> deleteFileList.add(Pair.of(partition,
entry)));
});
- List<String> latestColumns = getLatestColumns(datasetMetaClient);
- return engineContext.flatMap(deleteFileList,
- deleteFileInfo -> {
- if
(deleteFileInfo.getRight().endsWith(HoodieFileFormat.PARQUET.getFileExtension()))
{
- return getColumnStats(deleteFileInfo.getKey(),
deleteFileInfo.getValue(), datasetMetaClient,
- latestColumns, true);
- }
- return Stream.empty();
- }, 1).stream().collect(Collectors.toList());
+ final List<String> columnsToIndex =
getColumnsToIndex(recordsGenerationParams.getDataMetaClient(),
recordsGenerationParams.isAllColumnStatsIndexEnabled());
Review comment:
   Is it possible to move the generation of `columnsToIndex` to some higher
layer and avoid computing it repeatedly?
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -330,78 +319,67 @@ public static void deleteMetadataTable(String basePath,
HoodieEngineContext cont
});
});
- return engineContext.map(deleteFileList, deleteFileInfo -> {
- return HoodieMetadataPayload.createBloomFilterMetadataRecord(
- deleteFileInfo.getLeft(), deleteFileInfo.getRight(), instantTime,
ByteBuffer.allocate(0), true);
- }, 1).stream().collect(Collectors.toList());
+ HoodieData<Pair<String, String>> deleteFileListRDD =
engineContext.parallelize(deleteFileList,
+ Math.max(deleteFileList.size(),
recordsGenerationParams.getBloomIndexParallelism()));
Review comment:
   Again: if using `min` makes sense here, please fix it in all the places
where this pattern appears.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -831,7 +828,7 @@ public static HoodieTableFileSystemView
getFileSystemView(HoodieTableMetaClient
* @param datasetMetaClient - Data table meta client
* @param isMetaIndexColumnStatsForAllColumns - Is column stats indexing
enabled for all columns
*/
- private static List<String> getLatestColumns(HoodieTableMetaClient
datasetMetaClient, boolean isMetaIndexColumnStatsForAllColumns) {
+ private static List<String> getColumnsToIndex(HoodieTableMetaClient
datasetMetaClient, boolean isMetaIndexColumnStatsForAllColumns) {
Review comment:
   A comment about L834: I feel we can't directly take in the
RecordKeyFieldProp as-is; it may not work for all key generators.
   Maybe we have to split it on "," and then set the columns to index.
   Can you think about whether there are any other places where we have this
dependency, and check that we have done the right thing there?
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##########
@@ -320,7 +325,48 @@ private void updateWriteStatus(HoodieDeltaWriteStat stat,
AppendResult result) {
statuses.add(this.writeStatus);
}
- private void processAppendResult(AppendResult result) {
+ /**
+ * Get column statistics for the records part of this append handle.
+ *
+ * @param filePath - Log file that records are part of
+ * @param recordList - List of records appended to the log for which
column statistics is needed for
+ * @param columnRangeMap - Output map to accumulate the column statistics
for the records
+ */
+ private void getRecordsStats(final String filePath, List<IndexedRecord>
recordList,
Review comment:
   Maybe we can name this "setRecordsStats".
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
##########
@@ -165,6 +165,12 @@
+ "used for pruning files during the index lookups. Only applies if "
+ ENABLE_METADATA_INDEX_COLUMN_STATS.key() + " is enabled.A");
+ public static final ConfigProperty<Integer> COLUMN_STATS_INDEX_PARALLELISM =
ConfigProperty
+ .key(METADATA_PREFIX + ".index.column.stats.parallelism")
+ .defaultValue(1)
Review comment:
   Why 1? Maybe we can make this 10.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -871,27 +879,39 @@ protected void bootstrapCommit(List<DirectoryInfo>
partitionInfoList, String cre
return HoodieMetadataPayload.createPartitionFilesRecord(
partitionInfo.getRelativePath().isEmpty() ? NON_PARTITIONED_NAME :
partitionInfo.getRelativePath(), Option.of(validFileNameToSizeMap),
Option.empty());
});
- partitionRecords = partitionRecords.union(fileListRecords);
+ filesPartitionRecords = filesPartitionRecords.union(fileListRecords);
Review comment:
   I did leave this comment on one of the previous patches, but can we
extract the partition path name deduction into a method and reuse it everywhere?
```
partitionInfo.getRelativePath().isEmpty() ? NON_PARTITIONED_NAME :
partitionInfo.getRelativePath()
```
   We already had some bugs around non-partitioned datasets, so I wanted to
keep this logic in one place.
also this one
```
String partition = partitionName.equals(EMPTY_PARTITION_NAME) ?
NON_PARTITIONED_NAME : partitionName;
```
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -187,94 +178,90 @@ public static void deleteMetadataTable(String basePath,
HoodieEngineContext cont
/**
* Convert commit action metadata to bloom filter records.
*
- * @param commitMetadata - Commit action metadata
- * @param dataMetaClient - Meta client for the data table
- * @param instantTime - Action instant time
- * @return List of metadata table records
+ * @param context - Engine context to use
+ * @param commitMetadata - Commit action metadata
+ * @param instantTime - Action instant time
+ * @param recordsGenerationParams - Parameters for bloom filter record
generation
+ * @return HoodieData of metadata table records
*/
- public static List<HoodieRecord>
convertMetadataToBloomFilterRecords(HoodieCommitMetadata commitMetadata,
-
HoodieTableMetaClient dataMetaClient,
- String
instantTime) {
- List<HoodieRecord> records = new LinkedList<>();
- commitMetadata.getPartitionToWriteStats().forEach((partitionStatName,
writeStats) -> {
- final String partition = partitionStatName.equals(EMPTY_PARTITION_NAME)
? NON_PARTITIONED_NAME : partitionStatName;
- Map<String, Long> newFiles = new HashMap<>(writeStats.size());
- writeStats.forEach(hoodieWriteStat -> {
- // No action for delta logs
- if (hoodieWriteStat instanceof HoodieDeltaWriteStat) {
- return;
- }
+ public static HoodieData<HoodieRecord> convertMetadataToBloomFilterRecords(
+ HoodieEngineContext context, HoodieCommitMetadata commitMetadata,
+ String instantTime, MetadataRecordsGenerationParams
recordsGenerationParams) {
+ final List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(entry -> entry.stream()).collect(Collectors.toList());
+ if (allWriteStats.isEmpty()) {
+ return context.emptyHoodieData();
+ }
- String pathWithPartition = hoodieWriteStat.getPath();
- if (pathWithPartition == null) {
- // Empty partition
- LOG.error("Failed to find path in write stat to update metadata
table " + hoodieWriteStat);
- return;
- }
- int offset = partition.equals(NON_PARTITIONED_NAME) ?
(pathWithPartition.startsWith("/") ? 1 : 0) :
- partition.length() + 1;
+ HoodieData<HoodieWriteStat> allWriteStatsRDD =
context.parallelize(allWriteStats,
+ Math.max(recordsGenerationParams.getBloomIndexParallelism(),
allWriteStats.size()));
Review comment:
   Shouldn't we be doing `min` here instead of `max`? If there are only 10
write stats, why would we parallelize across 100 (if the config is set to 100)?
   This is applicable everywhere we do this based on the column stats
parallelism and the bloom index parallelism.
##########
File path:
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -609,82 +578,124 @@ private static void
processRollbackMetadata(HoodieActiveTimeline metadataTableTi
}
/**
- * Convert rollback action metadata to bloom filter index records.
+ * Convert added and deleted files metadata to bloom filter index records.
*/
- private static List<HoodieRecord>
convertFilesToBloomFilterRecords(HoodieEngineContext engineContext,
-
HoodieTableMetaClient dataMetaClient,
-
Map<String, List<String>> partitionToDeletedFiles,
-
Map<String, Map<String, Long>> partitionToAppendedFiles,
- String
instantTime) {
- List<HoodieRecord> records = new LinkedList<>();
- partitionToDeletedFiles.forEach((partitionName, deletedFileList) ->
deletedFileList.forEach(deletedFile -> {
- if (!FSUtils.isBaseFile(new Path(deletedFile))) {
- return;
- }
+ public static HoodieData<HoodieRecord>
convertFilesToBloomFilterRecords(HoodieEngineContext engineContext,
+
Map<String, List<String>> partitionToDeletedFiles,
+
Map<String, Map<String, Long>> partitionToAppendedFiles,
+
MetadataRecordsGenerationParams recordsGenerationParams,
+
String instantTime) {
+ HoodieData<HoodieRecord> allRecordsRDD = engineContext.emptyHoodieData();
+
+ List<Pair<String, List<String>>> partitionToDeletedFilesList =
partitionToDeletedFiles.entrySet()
+ .stream().map(e -> Pair.of(e.getKey(),
e.getValue())).collect(Collectors.toList());
+ HoodieData<Pair<String, List<String>>> partitionToDeletedFilesRDD =
engineContext.parallelize(partitionToDeletedFilesList,
+ Math.max(partitionToDeletedFilesList.size(),
recordsGenerationParams.getBloomIndexParallelism()));
+
+ HoodieData<HoodieRecord> deletedFilesRecordsRDD =
partitionToDeletedFilesRDD.flatMap(partitionToDeletedFilesEntry -> {
+ final String partitionName = partitionToDeletedFilesEntry.getLeft();
+ final List<String> deletedFileList =
partitionToDeletedFilesEntry.getRight();
+ return deletedFileList.stream().flatMap(deletedFile -> {
+ if (!FSUtils.isBaseFile(new Path(deletedFile))) {
+ return Stream.empty();
+ }
- final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ?
NON_PARTITIONED_NAME : partitionName;
- records.add(HoodieMetadataPayload.createBloomFilterMetadataRecord(
- partition, deletedFile, instantTime, ByteBuffer.allocate(0), true));
- }));
+ final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ?
NON_PARTITIONED_NAME : partitionName;
+ return
Stream.<HoodieRecord>of(HoodieMetadataPayload.createBloomFilterMetadataRecord(
+ partition, deletedFile, instantTime, StringUtils.EMPTY_STRING,
ByteBuffer.allocate(0), true));
+ }).iterator();
+ });
+ allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD);
- partitionToAppendedFiles.forEach((partitionName, appendedFileMap) -> {
+ List<Pair<String, Map<String, Long>>> partitionToAppendedFilesList =
partitionToAppendedFiles.entrySet()
+ .stream().map(entry -> Pair.of(entry.getKey(),
entry.getValue())).collect(Collectors.toList());
+ HoodieData<Pair<String, Map<String, Long>>> partitionToAppendedFilesRDD =
engineContext.parallelize(partitionToAppendedFilesList,
+ Math.max(partitionToAppendedFiles.size(),
recordsGenerationParams.getBloomIndexParallelism()));
+
+ HoodieData<HoodieRecord> appendedFilesRecordsRDD =
partitionToAppendedFilesRDD.flatMap(partitionToAppendedFilesEntry -> {
+ final String partitionName = partitionToAppendedFilesEntry.getKey();
+ final Map<String, Long> appendedFileMap =
partitionToAppendedFilesEntry.getValue();
final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ?
NON_PARTITIONED_NAME : partitionName;
- appendedFileMap.forEach((appendedFile, length) -> {
+ return
appendedFileMap.entrySet().stream().flatMap(appendedFileLengthPairEntry -> {
+ final String appendedFile = appendedFileLengthPairEntry.getKey();
if (!FSUtils.isBaseFile(new Path(appendedFile))) {
- return;
+ return Stream.empty();
}
final String pathWithPartition = partitionName + "/" + appendedFile;
- final Path appendedFilePath = new Path(dataMetaClient.getBasePath(),
pathWithPartition);
- try {
- HoodieFileReader<IndexedRecord> fileReader =
-
HoodieFileReaderFactory.getFileReader(dataMetaClient.getHadoopConf(),
appendedFilePath);
+ final Path appendedFilePath = new
Path(recordsGenerationParams.getDataMetaClient().getBasePath(),
pathWithPartition);
+ try (HoodieFileReader<IndexedRecord> fileReader =
+
HoodieFileReaderFactory.getFileReader(recordsGenerationParams.getDataMetaClient().getHadoopConf(),
appendedFilePath)) {
final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
if (fileBloomFilter == null) {
LOG.error("Failed to read bloom filter for " + appendedFilePath);
- return;
+ return Stream.empty();
}
ByteBuffer bloomByteBuffer =
ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes());
HoodieRecord record =
HoodieMetadataPayload.createBloomFilterMetadataRecord(
- partition, appendedFile, instantTime, bloomByteBuffer, false);
- records.add(record);
- fileReader.close();
+ partition, appendedFile, instantTime,
recordsGenerationParams.getBloomFilterType(), bloomByteBuffer, false);
+ return Stream.of(record);
} catch (IOException e) {
LOG.error("Failed to get bloom filter for file: " +
appendedFilePath);
}
- });
+ return Stream.empty();
+ }).iterator();
});
- return records;
+ allRecordsRDD = allRecordsRDD.union(appendedFilesRecordsRDD);
+
+ return allRecordsRDD;
}
/**
- * Convert rollback action metadata to column stats index records.
+ * Convert added and deleted action metadata to column stats index records.
*/
- private static List<HoodieRecord>
convertFilesToColumnStatsRecords(HoodieEngineContext engineContext,
-
HoodieTableMetaClient datasetMetaClient,
-
Map<String, List<String>> partitionToDeletedFiles,
-
Map<String, Map<String, Long>> partitionToAppendedFiles,
- String
instantTime) {
- List<HoodieRecord> records = new LinkedList<>();
- List<String> latestColumns = getLatestColumns(datasetMetaClient);
- partitionToDeletedFiles.forEach((partitionName, deletedFileList) ->
deletedFileList.forEach(deletedFile -> {
+ public static HoodieData<HoodieRecord>
convertFilesToColumnStatsRecords(HoodieEngineContext engineContext,
+
Map<String, List<String>> partitionToDeletedFiles,
+
Map<String, Map<String, Long>> partitionToAppendedFiles,
+
MetadataRecordsGenerationParams recordsGenerationParams) {
+ HoodieData<HoodieRecord> allRecordsRDD = engineContext.emptyHoodieData();
+ final List<String> columnsToIndex =
getColumnsToIndex(recordsGenerationParams.getDataMetaClient(),
recordsGenerationParams.isAllColumnStatsIndexEnabled());
+
+ final List<Pair<String, List<String>>> partitionToDeletedFilesList =
partitionToDeletedFiles.entrySet()
+ .stream().map(e -> Pair.of(e.getKey(),
e.getValue())).collect(Collectors.toList());
+ final HoodieData<Pair<String, List<String>>> partitionToDeletedFilesRDD =
engineContext.parallelize(partitionToDeletedFilesList,
+ Math.max(partitionToDeletedFilesList.size(),
recordsGenerationParams.getColumnStatsIndexParallelism()));
+
+ HoodieData<HoodieRecord> deletedFilesRecordsRDD =
partitionToDeletedFilesRDD.flatMap(partitionToDeletedFilesEntry -> {
+ final String partitionName = partitionToDeletedFilesEntry.getLeft();
final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ?
NON_PARTITIONED_NAME : partitionName;
- if (deletedFile.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+ final List<String> deletedFileList =
partitionToDeletedFilesEntry.getRight();
+
+ return deletedFileList.stream().flatMap(deletedFile -> {
final String filePathWithPartition = partitionName + "/" + deletedFile;
- records.addAll(getColumnStats(partition, filePathWithPartition,
datasetMetaClient,
- latestColumns, true).collect(Collectors.toList()));
- }
- }));
-
- partitionToAppendedFiles.forEach((partitionName, appendedFileMap) ->
appendedFileMap.forEach(
- (appendedFile, size) -> {
- final String partition = partitionName.equals(EMPTY_PARTITION_NAME)
? NON_PARTITIONED_NAME : partitionName;
- if
(appendedFile.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
- final String filePathWithPartition = partitionName + "/" +
appendedFile;
- records.addAll(getColumnStats(partition, filePathWithPartition,
datasetMetaClient,
- latestColumns, false).collect(Collectors.toList()));
- }
- }));
- return records;
+ return getColumnStats(partition, filePathWithPartition,
recordsGenerationParams.getDataMetaClient(), columnsToIndex, true);
+ }).iterator();
+ });
+ allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD);
+
+ final List<Pair<String, Map<String, Long>>> partitionToAppendedFilesList =
partitionToAppendedFiles.entrySet()
+ .stream().map(entry -> Pair.of(entry.getKey(),
entry.getValue())).collect(Collectors.toList());
+ final HoodieData<Pair<String, Map<String, Long>>>
partitionToAppendedFilesRDD =
engineContext.parallelize(partitionToAppendedFilesList,
+ Math.max(partitionToAppendedFiles.size(),
recordsGenerationParams.getColumnStatsIndexParallelism()));
+
+ HoodieData<HoodieRecord> appendedFilesRecordsRDD =
partitionToAppendedFilesRDD.flatMap(partitionToAppendedFilesEntry -> {
+ final String partitionName = partitionToAppendedFilesEntry.getLeft();
+ final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ?
NON_PARTITIONED_NAME : partitionName;
+ final Map<String, Long> appendedFileMap =
partitionToAppendedFilesEntry.getRight();
+
+ return
appendedFileMap.entrySet().stream().flatMap(appendedFileNameLengthPair -> {
+ // TODO: HUDI-3374 Handle log files without delta write stat to get
records column stats
Review comment:
   Can we remove this comment?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]