yihua commented on code in PR #12105:
URL: https://github.com/apache/hudi/pull/12105#discussion_r1815625645
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -840,12 +840,17 @@ private Map<String, String>
reverseLookupSecondaryKeys(String partitionName, Lis
}
Set<String> keySet = new TreeSet<>(recordKeys);
+ Set<String> deletedRecordsFromLogs = new HashSet<>();
Map<String, HoodieRecord<HoodieMetadataPayload>> logRecordsMap = new
HashMap<>();
logRecordScanner.getRecords().forEach(record -> {
HoodieMetadataPayload payload = record.getData();
- String recordKey = payload.getRecordKeyFromSecondaryIndex();
- if (keySet.contains(recordKey)) {
- logRecordsMap.put(recordKey, record);
+ if (!payload.isDeleted()) { // process only valid records.
+ String recordKey = payload.getRecordKeyFromSecondaryIndex();
+ if (keySet.contains(recordKey)) {
+ logRecordsMap.put(recordKey, record);
+ }
+ } else {
+ deletedRecordsFromLogs.add(record.getRecordKey());
Review Comment:
Why do we need custom log merging logic here for secondary index only?
Should it be handled by `HoodieMetadataPayload` directly?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1240,35 +1243,36 @@ private static Stream<HoodieRecord>
getColumnStatsRecords(String partitionPath,
return HoodieMetadataPayload.createColumnStatsRecords(partitionPath,
columnRangeMetadataList, true);
}
-
List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadata =
- readColumnRangeMetadataFrom(filePartitionPath, datasetMetaClient,
columnsToIndex, false, Option.empty());
+ readColumnRangeMetadataFrom(partitionPath, fileName,
datasetMetaClient, columnsToIndex, fetchStatsForLogFiles, writerSchemaOpt,
maxBufferSize);
return HoodieMetadataPayload.createColumnStatsRecords(partitionPath,
columnRangeMetadata, false);
}
- private static List<HoodieColumnRangeMetadata<Comparable>>
readColumnRangeMetadataFrom(String filePath,
+ private static List<HoodieColumnRangeMetadata<Comparable>>
readColumnRangeMetadataFrom(String partitionPath,
+
String fileName,
HoodieTableMetaClient datasetMetaClient,
List<String> columnsToIndex,
-
boolean shouldReadColumnStatsForLogFiles,
-
Option<Schema> writerSchemaOpt) {
+
boolean fetchStatsForLogFiles,
+
Option<Schema> writerSchemaOpt,
+
int maxBufferSize) {
+ String partitionPathFileName = partitionPath.equals(EMPTY_PARTITION_NAME)
? fileName : partitionPath + "/" + fileName;
try {
- StoragePath fullFilePath = new
StoragePath(datasetMetaClient.getBasePath(), filePath);
- if (filePath.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+ StoragePath fullFilePath = new
StoragePath(datasetMetaClient.getBasePath(), partitionPathFileName);
+ if
(partitionPathFileName.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
return HoodieIOFactory.getIOFactory(datasetMetaClient.getStorage())
.getFileFormatUtils(HoodieFileFormat.PARQUET)
.readColumnStatsFromMetadata(datasetMetaClient.getStorage(),
fullFilePath, columnsToIndex);
- } else if (FSUtils.isLogFile(fullFilePath) &&
shouldReadColumnStatsForLogFiles) {
- LOG.warn("Reading log file: {}, to build column range metadata.",
fullFilePath);
- return getLogFileColumnRangeMetadata(fullFilePath.toString(),
datasetMetaClient, columnsToIndex, writerSchemaOpt);
+ } else if (fetchStatsForLogFiles && FSUtils.isLogFile(fileName)) {
Review Comment:
Remove `fetchStatsForLogFiles` so log file stats should always be fetched.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -953,7 +948,9 @@ public static HoodieData<HoodieRecord>
convertFilesToColumnStatsRecords(HoodieEn
HoodieTableMetaClient dataMetaClient,
boolean isColumnStatsIndexEnabled,
int
columnStatsIndexParallelism,
-
List<String> targetColumnsForColumnStatsIndex) {
+
List<String> targetColumnsForColumnStatsIndex,
+
boolean fetchStatsForLogFiles,
Review Comment:
This argument should not exist.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -840,12 +840,17 @@ private Map<String, String>
reverseLookupSecondaryKeys(String partitionName, Lis
}
Set<String> keySet = new TreeSet<>(recordKeys);
+ Set<String> deletedRecordsFromLogs = new HashSet<>();
Map<String, HoodieRecord<HoodieMetadataPayload>> logRecordsMap = new
HashMap<>();
logRecordScanner.getRecords().forEach(record -> {
HoodieMetadataPayload payload = record.getData();
- String recordKey = payload.getRecordKeyFromSecondaryIndex();
- if (keySet.contains(recordKey)) {
- logRecordsMap.put(recordKey, record);
+ if (!payload.isDeleted()) { // process only valid records.
+ String recordKey = payload.getRecordKeyFromSecondaryIndex();
+ if (keySet.contains(recordKey)) {
+ logRecordsMap.put(recordKey, record);
+ }
+ } else {
+ deletedRecordsFromLogs.add(record.getRecordKey());
Review Comment:
If not easy to fix, could we file a Jira ticket as a follow-up?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -840,12 +840,17 @@ private Map<String, String>
reverseLookupSecondaryKeys(String partitionName, Lis
}
Set<String> keySet = new TreeSet<>(recordKeys);
+ Set<String> deletedRecordsFromLogs = new HashSet<>();
Map<String, HoodieRecord<HoodieMetadataPayload>> logRecordsMap = new
HashMap<>();
logRecordScanner.getRecords().forEach(record -> {
HoodieMetadataPayload payload = record.getData();
- String recordKey = payload.getRecordKeyFromSecondaryIndex();
- if (keySet.contains(recordKey)) {
- logRecordsMap.put(recordKey, record);
+ if (!payload.isDeleted()) { // process only valid records.
Review Comment:
Does MDT compaction need to go through the same logic here?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -751,12 +750,8 @@ public static HoodieData<HoodieRecord>
convertMetadataToColumnStatsRecords(Hoodi
return engineContext.parallelize(deleteFileList, parallelism)
.flatMap(deleteFileInfoPair -> {
String partitionPath = deleteFileInfoPair.getLeft();
- String filePath = deleteFileInfoPair.getRight();
-
- if (filePath.endsWith(HoodieFileFormat.PARQUET.getFileExtension())
|| ExternalFilePathUtil.isExternallyCreatedFile(filePath)) {
- return getColumnStatsRecords(partitionPath, filePath,
dataMetaClient, columnsToIndex, true).iterator();
- }
- return Collections.emptyListIterator();
+ String fileName = deleteFileInfoPair.getRight();
+ return getColumnStatsRecords(partitionPath, fileName,
dataMetaClient, columnsToIndex, true).iterator();
Review Comment:
Looks like the "delete" is actually nullifying the column stats metadata
fields in the records, instead of deleting them from the table.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]