nsivabalan commented on code in PR #9223:
URL: https://github.com/apache/hudi/pull/9223#discussion_r1271335512
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -848,64 +851,49 @@ public static HoodieData<HoodieRecord> convertFilesToBloomFilterRecords(HoodieEn
       Map<String, Map<String, Long>> partitionToAppendedFiles,
       MetadataRecordsGenerationParams recordsGenerationParams,
       String instantTime) {
-    HoodieData<HoodieRecord> allRecordsRDD = engineContext.emptyHoodieData();
-
-    List<Pair<String, List<String>>> partitionToDeletedFilesList = partitionToDeletedFiles.entrySet()
-        .stream().map(e -> Pair.of(e.getKey(), e.getValue())).collect(Collectors.toList());
-    int parallelism = Math.max(Math.min(partitionToDeletedFilesList.size(), recordsGenerationParams.getBloomIndexParallelism()), 1);
-    HoodieData<Pair<String, List<String>>> partitionToDeletedFilesRDD = engineContext.parallelize(partitionToDeletedFilesList, parallelism);
-
-    HoodieData<HoodieRecord> deletedFilesRecordsRDD = partitionToDeletedFilesRDD.flatMap(partitionToDeletedFilesPair -> {
-      final String partitionName = partitionToDeletedFilesPair.getLeft();
-      final List<String> deletedFileList = partitionToDeletedFilesPair.getRight();
-      return deletedFileList.stream().flatMap(deletedFile -> {
-        if (!FSUtils.isBaseFile(new Path(deletedFile))) {
-          return Stream.empty();
-        }
-
-        final String partition = getPartitionIdentifier(partitionName);
-        return Stream.<HoodieRecord>of(HoodieMetadataPayload.createBloomFilterMetadataRecord(
-            partition, deletedFile, instantTime, StringUtils.EMPTY_STRING, ByteBuffer.allocate(0), true));
-      }).iterator();
-    });
-    allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD);
+    // Total number of files which are added or deleted
+    final int totalFiles = partitionToDeletedFiles.values().stream().mapToInt(List::size).sum()
+        + partitionToAppendedFiles.values().stream().mapToInt(Map::size).sum();
+
+    // Create the tuple (partition, filename, isDeleted) to handle both deletes and appends
+    final List<Tuple3<String, String, Boolean>> partitionFileFlagTupleList = new ArrayList<>(totalFiles);
+    partitionToDeletedFiles.entrySet().stream()
+        .flatMap(entry -> entry.getValue().stream().map(deletedFile -> new Tuple3<>(entry.getKey(), deletedFile, true)))
Review Comment:
Should we filter out the log files preemptively right here, rather than in L873?
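(A rough sketch of what that preemptive filter could look like, reusing the identifiers from the hunk above; untested:)

    partitionToDeletedFiles.entrySet().stream()
        // hypothetical: drop log files while building the tuple list, so the
        // later flatMap never sees them
        .flatMap(entry -> entry.getValue().stream()
            .filter(deletedFile -> FSUtils.isBaseFile(new Path(deletedFile)))
            .map(deletedFile -> new Tuple3<>(entry.getKey(), deletedFile, true)))
        .collect(Collectors.toCollection(() -> partitionFileFlagTupleList));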
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -915,65 +903,60 @@ public static HoodieData<HoodieRecord> convertFilesToColumnStatsRecords(HoodieEn
       Map<String, List<String>> partitionToDeletedFiles,
       Map<String, Map<String, Long>> partitionToAppendedFiles,
       MetadataRecordsGenerationParams recordsGenerationParams) {
-    HoodieData<HoodieRecord> allRecordsRDD = engineContext.emptyHoodieData();
+    // Find the columns to index
     HoodieTableMetaClient dataTableMetaClient = recordsGenerationParams.getDataMetaClient();
-
     final List<String> columnsToIndex = getColumnsToIndex(recordsGenerationParams,
         Lazy.lazily(() -> tryResolveSchemaForTable(dataTableMetaClient)));
-
     if (columnsToIndex.isEmpty()) {
       // In case there are no columns to index, bail
       return engineContext.emptyHoodieData();
     }
-    final List<Pair<String, List<String>>> partitionToDeletedFilesList = partitionToDeletedFiles.entrySet().stream()
-        .map(e -> Pair.of(e.getKey(), e.getValue()))
-        .collect(Collectors.toList());
-
-    int deletedFilesTargetParallelism = Math.max(Math.min(partitionToDeletedFilesList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
-    final HoodieData<Pair<String, List<String>>> partitionToDeletedFilesRDD =
-        engineContext.parallelize(partitionToDeletedFilesList, deletedFilesTargetParallelism);
-
-    HoodieData<HoodieRecord> deletedFilesRecordsRDD = partitionToDeletedFilesRDD.flatMap(partitionToDeletedFilesPair -> {
-      final String partitionPath = partitionToDeletedFilesPair.getLeft();
-      final String partitionId = getPartitionIdentifier(partitionPath);
-      final List<String> deletedFileList = partitionToDeletedFilesPair.getRight();
-
-      return deletedFileList.stream().flatMap(deletedFile -> {
-        final String filePathWithPartition = partitionPath + "/" + deletedFile;
-        return getColumnStatsRecords(partitionId, filePathWithPartition, dataTableMetaClient, columnsToIndex, true);
-      }).iterator();
-    });
-
-    allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD);
-
-    final List<Pair<String, Map<String, Long>>> partitionToAppendedFilesList = partitionToAppendedFiles.entrySet().stream()
-        .map(entry -> Pair.of(entry.getKey(), entry.getValue()))
-        .collect(Collectors.toList());
-
-    int appendedFilesTargetParallelism = Math.max(Math.min(partitionToAppendedFilesList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
-    final HoodieData<Pair<String, Map<String, Long>>> partitionToAppendedFilesRDD =
-        engineContext.parallelize(partitionToAppendedFilesList, appendedFilesTargetParallelism);
-
-    HoodieData<HoodieRecord> appendedFilesRecordsRDD = partitionToAppendedFilesRDD.flatMap(partitionToAppendedFilesPair -> {
-      final String partitionPath = partitionToAppendedFilesPair.getLeft();
-      final String partitionId = getPartitionIdentifier(partitionPath);
-      final Map<String, Long> appendedFileMap = partitionToAppendedFilesPair.getRight();
+    LOG.info(String.format("Indexing %d columns for column stats index", columnsToIndex.size()));
+
+    // Total number of files which are added or deleted
+    final int totalFiles = partitionToDeletedFiles.values().stream().mapToInt(List::size).sum()
+        + partitionToAppendedFiles.values().stream().mapToInt(Map::size).sum();
+
+    // Create the tuple (partition, filename, isDeleted) to handle both deletes and appends
+    final List<Tuple3<String, String, Boolean>> partitionFileFlagTupleList = new ArrayList<>(totalFiles);
+    partitionToDeletedFiles.entrySet().stream()
+        .flatMap(entry -> entry.getValue().stream().map(deletedFile -> new Tuple3<>(entry.getKey(), deletedFile, true)))
+        .collect(Collectors.toCollection(() -> partitionFileFlagTupleList));
+    partitionToAppendedFiles.entrySet().stream()
+        .flatMap(entry -> entry.getValue().keySet().stream().map(addedFile -> new Tuple3<>(entry.getKey(), addedFile, false)))
+        .collect(Collectors.toCollection(() -> partitionFileFlagTupleList));
+
+    // Create records MDT
+    int parallelism = Math.max(Math.min(partitionFileFlagTupleList.size(), recordsGenerationParams.getBloomIndexParallelism()), 1);
Review Comment:
This should be getColumnStatsIndexParallelism, not getBloomIndexParallelism, since this is the column stats path.
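(i.e., presumably something like the following sketch, not verified against the final revision:)

    // size the column-stats path by its own parallelism config
    int parallelism = Math.max(Math.min(partitionFileFlagTupleList.size(),
        recordsGenerationParams.getColumnStatsIndexParallelism()), 1);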
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -848,64 +851,49 @@ public static HoodieData<HoodieRecord> convertFilesToBloomFilterRecords(HoodieEn
       Map<String, Map<String, Long>> partitionToAppendedFiles,
       MetadataRecordsGenerationParams recordsGenerationParams,
       String instantTime) {
-    HoodieData<HoodieRecord> allRecordsRDD = engineContext.emptyHoodieData();
-
-    List<Pair<String, List<String>>> partitionToDeletedFilesList = partitionToDeletedFiles.entrySet()
-        .stream().map(e -> Pair.of(e.getKey(), e.getValue())).collect(Collectors.toList());
-    int parallelism = Math.max(Math.min(partitionToDeletedFilesList.size(), recordsGenerationParams.getBloomIndexParallelism()), 1);
-    HoodieData<Pair<String, List<String>>> partitionToDeletedFilesRDD = engineContext.parallelize(partitionToDeletedFilesList, parallelism);
-
-    HoodieData<HoodieRecord> deletedFilesRecordsRDD = partitionToDeletedFilesRDD.flatMap(partitionToDeletedFilesPair -> {
-      final String partitionName = partitionToDeletedFilesPair.getLeft();
-      final List<String> deletedFileList = partitionToDeletedFilesPair.getRight();
-      return deletedFileList.stream().flatMap(deletedFile -> {
-        if (!FSUtils.isBaseFile(new Path(deletedFile))) {
-          return Stream.empty();
-        }
-
-        final String partition = getPartitionIdentifier(partitionName);
-        return Stream.<HoodieRecord>of(HoodieMetadataPayload.createBloomFilterMetadataRecord(
-            partition, deletedFile, instantTime, StringUtils.EMPTY_STRING, ByteBuffer.allocate(0), true));
-      }).iterator();
-    });
-    allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD);
+    // Total number of files which are added or deleted
+    final int totalFiles = partitionToDeletedFiles.values().stream().mapToInt(List::size).sum()
+        + partitionToAppendedFiles.values().stream().mapToInt(Map::size).sum();
+
+    // Create the tuple (partition, filename, isDeleted) to handle both deletes and appends
+    final List<Tuple3<String, String, Boolean>> partitionFileFlagTupleList = new ArrayList<>(totalFiles);
+    partitionToDeletedFiles.entrySet().stream()
+        .flatMap(entry -> entry.getValue().stream().map(deletedFile -> new Tuple3<>(entry.getKey(), deletedFile, true)))
+        .collect(Collectors.toCollection(() -> partitionFileFlagTupleList));
+    partitionToAppendedFiles.entrySet().stream()
+        .flatMap(entry -> entry.getValue().keySet().stream().map(addedFile -> new Tuple3<>(entry.getKey(), addedFile, false)))
+        .collect(Collectors.toCollection(() -> partitionFileFlagTupleList));
+
+    // Create records MDT
+    int parallelism = Math.max(Math.min(partitionFileFlagTupleList.size(), recordsGenerationParams.getBloomIndexParallelism()), 1);
+    return engineContext.parallelize(partitionFileFlagTupleList, parallelism).flatMap(partitionFileFlagTuple -> {
+      final String partitionName = partitionFileFlagTuple._1();
+      final String filename = partitionFileFlagTuple._2();
+      final boolean isDeleted = partitionFileFlagTuple._3();
+      if (!FSUtils.isBaseFile(new Path(filename))) {
+        LOG.warn(String.format("Ignoring file %s as it is not a base file", filename));
+        return Stream.<HoodieRecord>empty().iterator();
+      }
-    List<Pair<String, Map<String, Long>>> partitionToAppendedFilesList = partitionToAppendedFiles.entrySet()
-        .stream().map(entry -> Pair.of(entry.getKey(), entry.getValue())).collect(Collectors.toList());
-    parallelism = Math.max(Math.min(partitionToAppendedFilesList.size(), recordsGenerationParams.getBloomIndexParallelism()), 1);
-    HoodieData<Pair<String, Map<String, Long>>> partitionToAppendedFilesRDD = engineContext.parallelize(partitionToAppendedFilesList, parallelism);
+      // Read the bloom filter from the base file if the file is being added
+      ByteBuffer bloomFilterBuffer = ByteBuffer.allocate(0);
+      if (!isDeleted) {
+        final String pathWithPartition = partitionName + "/" + filename;
+        final Path addedFilePath = new Path(recordsGenerationParams.getDataMetaClient().getBasePath(), pathWithPartition);
+        bloomFilterBuffer = readBloomFilter(recordsGenerationParams.getDataMetaClient().getHadoopConf(), addedFilePath);
+
+        // If reading the bloom filter failed then do not add a record for this file
+        if (bloomFilterBuffer == null) {
+          LOG.error("Failed to read bloom filter from " + addedFilePath);
+          return Stream.<HoodieRecord>empty().iterator();
Review Comment:
Can you file a follow-up ticket on this? We need to think through this failure scenario, e.g., would an index lookup based on the MDT bloom filter still return correct results if some files are missing their bloom filter?
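(For the ticket, one stricter alternative is sketched below; whether failing the commit beats silently skipping the file is exactly the trade-off to decide. The choice of HoodieMetadataException is just an assumption here:)

    // hypothetical: abort the metadata commit instead of skipping the file,
    // so the MDT bloom index can never go partially stale
    if (bloomFilterBuffer == null) {
      throw new HoodieMetadataException("Failed to read bloom filter from " + addedFilePath);
    }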
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -915,65 +903,60 @@ public static HoodieData<HoodieRecord> convertFilesToColumnStatsRecords(HoodieEn
       Map<String, List<String>> partitionToDeletedFiles,
       Map<String, Map<String, Long>> partitionToAppendedFiles,
       MetadataRecordsGenerationParams recordsGenerationParams) {
-    HoodieData<HoodieRecord> allRecordsRDD = engineContext.emptyHoodieData();
+    // Find the columns to index
     HoodieTableMetaClient dataTableMetaClient = recordsGenerationParams.getDataMetaClient();
-
     final List<String> columnsToIndex = getColumnsToIndex(recordsGenerationParams,
         Lazy.lazily(() -> tryResolveSchemaForTable(dataTableMetaClient)));
-
     if (columnsToIndex.isEmpty()) {
       // In case there are no columns to index, bail
       return engineContext.emptyHoodieData();
     }
-    final List<Pair<String, List<String>>> partitionToDeletedFilesList = partitionToDeletedFiles.entrySet().stream()
-        .map(e -> Pair.of(e.getKey(), e.getValue()))
-        .collect(Collectors.toList());
-
-    int deletedFilesTargetParallelism = Math.max(Math.min(partitionToDeletedFilesList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
-    final HoodieData<Pair<String, List<String>>> partitionToDeletedFilesRDD =
-        engineContext.parallelize(partitionToDeletedFilesList, deletedFilesTargetParallelism);
-
-    HoodieData<HoodieRecord> deletedFilesRecordsRDD = partitionToDeletedFilesRDD.flatMap(partitionToDeletedFilesPair -> {
-      final String partitionPath = partitionToDeletedFilesPair.getLeft();
-      final String partitionId = getPartitionIdentifier(partitionPath);
-      final List<String> deletedFileList = partitionToDeletedFilesPair.getRight();
-
-      return deletedFileList.stream().flatMap(deletedFile -> {
-        final String filePathWithPartition = partitionPath + "/" + deletedFile;
-        return getColumnStatsRecords(partitionId, filePathWithPartition, dataTableMetaClient, columnsToIndex, true);
-      }).iterator();
-    });
-
-    allRecordsRDD = allRecordsRDD.union(deletedFilesRecordsRDD);
-
-    final List<Pair<String, Map<String, Long>>> partitionToAppendedFilesList = partitionToAppendedFiles.entrySet().stream()
-        .map(entry -> Pair.of(entry.getKey(), entry.getValue()))
-        .collect(Collectors.toList());
-
-    int appendedFilesTargetParallelism = Math.max(Math.min(partitionToAppendedFilesList.size(), recordsGenerationParams.getColumnStatsIndexParallelism()), 1);
-    final HoodieData<Pair<String, Map<String, Long>>> partitionToAppendedFilesRDD =
-        engineContext.parallelize(partitionToAppendedFilesList, appendedFilesTargetParallelism);
-
-    HoodieData<HoodieRecord> appendedFilesRecordsRDD = partitionToAppendedFilesRDD.flatMap(partitionToAppendedFilesPair -> {
-      final String partitionPath = partitionToAppendedFilesPair.getLeft();
-      final String partitionId = getPartitionIdentifier(partitionPath);
-      final Map<String, Long> appendedFileMap = partitionToAppendedFilesPair.getRight();
+    LOG.info(String.format("Indexing %d columns for column stats index", columnsToIndex.size()));
+
+    // Total number of files which are added or deleted
+    final int totalFiles = partitionToDeletedFiles.values().stream().mapToInt(List::size).sum()
+        + partitionToAppendedFiles.values().stream().mapToInt(Map::size).sum();
+
+    // Create the tuple (partition, filename, isDeleted) to handle both deletes and appends
+    final List<Tuple3<String, String, Boolean>> partitionFileFlagTupleList = new ArrayList<>(totalFiles);
+    partitionToDeletedFiles.entrySet().stream()
+        .flatMap(entry -> entry.getValue().stream().map(deletedFile -> new Tuple3<>(entry.getKey(), deletedFile, true)))
+        .collect(Collectors.toCollection(() -> partitionFileFlagTupleList));
+    partitionToAppendedFiles.entrySet().stream()
+        .flatMap(entry -> entry.getValue().keySet().stream().map(addedFile -> new Tuple3<>(entry.getKey(), addedFile, false)))
+        .collect(Collectors.toCollection(() -> partitionFileFlagTupleList));
+
+    // Create records MDT
+    int parallelism = Math.max(Math.min(partitionFileFlagTupleList.size(), recordsGenerationParams.getBloomIndexParallelism()), 1);
+    return engineContext.parallelize(partitionFileFlagTupleList, parallelism).flatMap(partitionFileFlagTuple -> {
+      final String partitionName = partitionFileFlagTuple._1();
+      final String filename = partitionFileFlagTuple._2();
+      final boolean isDeleted = partitionFileFlagTuple._3();
+      if (!FSUtils.isBaseFile(new Path(filename))) {
+        LOG.warn(String.format("Ignoring file %s as it is not a base file", filename));
+        return Stream.<HoodieRecord>empty().iterator();
+      }
+      if (!FSUtils.isBaseFile(new Path(filename)) || !filename.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
Review Comment:
If we are not adding any log files to col stats or the bloom filter, should we avoid routing log file info from AppendHandle all the way to the MDT writer in the first place, and just send the parquet file info only?
Or at least filter them out in L 925 to 929.
wdyt?
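(If we go the filter-out route, roughly something like this sketch against the tuple-list construction above; untested:)

    partitionToAppendedFiles.entrySet().stream()
        // hypothetical: keep only parquet base files so no log-file entries
        // ever reach the flatMap
        .flatMap(entry -> entry.getValue().keySet().stream()
            .filter(addedFile -> addedFile.endsWith(HoodieFileFormat.PARQUET.getFileExtension()))
            .map(addedFile -> new Tuple3<>(entry.getKey(), addedFile, false)))
        .collect(Collectors.toCollection(() -> partitionFileFlagTupleList));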