vinothchandar commented on code in PR #12313:
URL: https://github.com/apache/hudi/pull/12313#discussion_r1859219114
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1311,6 +1446,53 @@ public static
List<HoodieColumnRangeMetadata<Comparable>> getLogFileColumnRangeM
return Collections.emptyList();
}
+ @VisibleForTesting
+ public static Set<String> getDeletedRecordKeys(String filePath,
HoodieTableMetaClient datasetMetaClient,
+ Option<Schema>
writerSchemaOpt, int maxBufferSize,
+ String latestCommitTimestamp)
throws IOException {
+ if (writerSchemaOpt.isPresent()) {
+ // read log file records without merging
+ Set<String> deletedKeys = new HashSet<>();
+ HoodieUnMergedLogRecordScanner scanner =
HoodieUnMergedLogRecordScanner.newBuilder()
Review Comment:
The code duplication here needs to be fixed.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -759,6 +769,131 @@ public static HoodieData<HoodieRecord>
convertMetadataToColumnStatsRecords(Hoodi
});
}
+ static HoodieData<HoodieRecord>
convertMetadataToRecordIndexRecords(HoodieEngineContext engineContext,
+
HoodieCommitMetadata commitMetadata,
+
HoodieMetadataConfig metadataConfig,
+
HoodieTableMetaClient dataTableMetaClient,
+ int
writesFileIdEncoding,
+ String
instantTime) {
+
+ List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList());
+
+ if (allWriteStats.isEmpty()) {
+ return engineContext.emptyHoodieData();
+ }
+
+ try {
+ int parallelism = Math.max(Math.min(allWriteStats.size(),
metadataConfig.getRecordIndexMaxParallelism()), 1);
+ String basePath = dataTableMetaClient.getBasePath().toString();
+ // we might need to set some additional variables if we need to process
log files.
+ boolean anyLogFilesWithDeletes =
allWriteStats.stream().anyMatch(writeStat -> {
+ String fileName = FSUtils.getFileName(writeStat.getPath(),
writeStat.getPartitionPath());
+ return FSUtils.isLogFile(fileName) && writeStat.getNumDeletes() > 0;
+ });
+ Option<Schema> writerSchemaOpt = Option.empty();
+ if (anyLogFilesWithDeletes) { // if we have a log file w/ deletes.
+ writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
+ }
+ int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
+ StorageConfiguration storageConfiguration =
dataTableMetaClient.getStorageConf();
+ Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
+ HoodieData<HoodieRecord> recordIndexRecords =
engineContext.parallelize(allWriteStats, parallelism)
+ .flatMap(writeStat -> {
+ HoodieStorage storage = HoodieStorageUtils.getStorage(new
StoragePath(writeStat.getPath()), storageConfiguration);
+ // handle base files
+ if
(writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+ return
BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath,
writeStat, writesFileIdEncoding, instantTime, storage);
+ } else {
+ // for logs, we only need to process log files containing deletes
+ if (writeStat.getNumDeletes() > 0) {
+ StoragePath fullFilePath = new
StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath());
+ Set<String> deletedRecordKeys =
getDeletedRecordKeys(fullFilePath.toString(), dataTableMetaClient,
+ finalWriterSchemaOpt, maxBufferSize, instantTime);
+ return deletedRecordKeys.stream().map(recordKey ->
HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator();
+ }
+ // ignore log file data blocks.
+ return new ArrayList<HoodieRecord>().iterator();
+ }
+ });
+
+ // there are chances that same record key from data table has 2 entries
(1 delete from older partition and 1 insert to newer partition)
+ // lets do reduce by key to ignore the deleted entry.
+ return reduceByKeys(recordIndexRecords, parallelism);
+ } catch (Exception e) {
+ throw new HoodieException("Failed to generate RLI records for metadata
table", e);
+ }
+ }
+
+ /**
+ * There are chances that same record key from data table has 2 entries (1
delete from older partition and 1 insert to newer partition)
+ * So, this method performs reduce by key to ignore the deleted entry.
+ * @param recordIndexRecords hoodie records after rli index lookup.
+ * @param parallelism parallelism to use.
+ * @return
+ */
+ private static HoodieData<HoodieRecord>
reduceByKeys(HoodieData<HoodieRecord> recordIndexRecords, int parallelism) {
Review Comment:
Please make this method package-private (instead of `private`) and add a unit test (UT) for it.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -759,6 +769,120 @@ public static HoodieData<HoodieRecord>
convertMetadataToColumnStatsRecords(Hoodi
});
}
+ static HoodieData<HoodieRecord>
convertMetadataToRecordIndexRecords(HoodieEngineContext engineContext,
+
HoodieCommitMetadata commitMetadata,
+
HoodieMetadataConfig metadataConfig,
+
HoodieTableMetaClient dataTableMetaClient,
+ int
writesFileIdEncoding,
Review Comment:
Let's file a code cleanup JIRA for 1.1 for these.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -759,6 +769,131 @@ public static HoodieData<HoodieRecord>
convertMetadataToColumnStatsRecords(Hoodi
});
}
+ static HoodieData<HoodieRecord>
convertMetadataToRecordIndexRecords(HoodieEngineContext engineContext,
+
HoodieCommitMetadata commitMetadata,
+
HoodieMetadataConfig metadataConfig,
+
HoodieTableMetaClient dataTableMetaClient,
+ int
writesFileIdEncoding,
+ String
instantTime) {
+
+ List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList());
+
+ if (allWriteStats.isEmpty()) {
+ return engineContext.emptyHoodieData();
+ }
+
+ try {
+ int parallelism = Math.max(Math.min(allWriteStats.size(),
metadataConfig.getRecordIndexMaxParallelism()), 1);
+ String basePath = dataTableMetaClient.getBasePath().toString();
+ // we might need to set some additional variables if we need to process
log files.
+ boolean anyLogFilesWithDeletes =
allWriteStats.stream().anyMatch(writeStat -> {
+ String fileName = FSUtils.getFileName(writeStat.getPath(),
writeStat.getPartitionPath());
+ return FSUtils.isLogFile(fileName) && writeStat.getNumDeletes() > 0;
+ });
+ Option<Schema> writerSchemaOpt = Option.empty();
+ if (anyLogFilesWithDeletes) { // if we have a log file w/ deletes.
+ writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
+ }
+ int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
+ StorageConfiguration storageConfiguration =
dataTableMetaClient.getStorageConf();
+ Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
+ HoodieData<HoodieRecord> recordIndexRecords =
engineContext.parallelize(allWriteStats, parallelism)
+ .flatMap(writeStat -> {
+ HoodieStorage storage = HoodieStorageUtils.getStorage(new
StoragePath(writeStat.getPath()), storageConfiguration);
+ // handle base files
+ if
(writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+ return
BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath,
writeStat, writesFileIdEncoding, instantTime, storage);
+ } else {
+ // for logs, we only need to process log files containing deletes
+ if (writeStat.getNumDeletes() > 0) {
+ StoragePath fullFilePath = new
StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath());
+ Set<String> deletedRecordKeys =
getDeletedRecordKeys(fullFilePath.toString(), dataTableMetaClient,
+ finalWriterSchemaOpt, maxBufferSize, instantTime);
+ return deletedRecordKeys.stream().map(recordKey ->
HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator();
+ }
+ // ignore log file data blocks.
+ return new ArrayList<HoodieRecord>().iterator();
+ }
+ });
+
+ // there are chances that same record key from data table has 2 entries
(1 delete from older partition and 1 insert to newer partition)
+ // lets do reduce by key to ignore the deleted entry.
+ return reduceByKeys(recordIndexRecords, parallelism);
+ } catch (Exception e) {
+ throw new HoodieException("Failed to generate RLI records for metadata
table", e);
+ }
+ }
+
+ /**
+ * There are chances that same record key from data table has 2 entries (1
delete from older partition and 1 insert to newer partition)
+ * So, this method performs reduce by key to ignore the deleted entry.
+ * @param recordIndexRecords hoodie records after rli index lookup.
+ * @param parallelism parallelism to use.
+ * @return
+ */
+ private static HoodieData<HoodieRecord>
reduceByKeys(HoodieData<HoodieRecord> recordIndexRecords, int parallelism) {
+ return recordIndexRecords.mapToPair(
+ (SerializablePairFunction<HoodieRecord, HoodieKey, HoodieRecord>)
t -> Pair.of(t.getKey(), t))
+ .reduceByKey((SerializableBiFunction<HoodieRecord, HoodieRecord,
HoodieRecord>) (record1, record2) -> {
+ boolean isRecord1Deleted = record1.getData() instanceof
EmptyHoodieRecordPayload;
+ boolean isRecord2Deleted = record2.getData() instanceof
EmptyHoodieRecordPayload;
+ if (isRecord1Deleted && !isRecord2Deleted) {
+ return record2;
+ } else if (!isRecord1Deleted && isRecord2Deleted) {
+ return record1;
+ } else {
+ throw new HoodieIOException("Two HoodieRecord updates to RLI is
seen for same record key " + record2.getRecordKey() + ", record 1 : "
Review Comment:
I remember leaving a comment here that we should handle the `true`/`true` case
(both `isRecord1Deleted` and `isRecord2Deleted` are true) - by not throwing an
error? - for logs with redundant deletes, i.e., when we just encode a streaming
delete for the same key without checking for its existence first.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1311,6 +1446,53 @@ public static
List<HoodieColumnRangeMetadata<Comparable>> getLogFileColumnRangeM
return Collections.emptyList();
}
+ @VisibleForTesting
+ public static Set<String> getDeletedRecordKeys(String filePath,
HoodieTableMetaClient datasetMetaClient,
+ Option<Schema>
writerSchemaOpt, int maxBufferSize,
+ String latestCommitTimestamp)
throws IOException {
+ if (writerSchemaOpt.isPresent()) {
+ // read log file records without merging
+ Set<String> deletedKeys = new HashSet<>();
+ HoodieUnMergedLogRecordScanner scanner =
HoodieUnMergedLogRecordScanner.newBuilder()
+ .withStorage(datasetMetaClient.getStorage())
+ .withBasePath(datasetMetaClient.getBasePath())
+ .withLogFilePaths(Collections.singletonList(filePath))
+ .withBufferSize(maxBufferSize)
+ .withLatestInstantTime(latestCommitTimestamp)
+ .withReaderSchema(writerSchemaOpt.get())
+ .withTableMetaClient(datasetMetaClient)
+ .withLogRecordScannerCallbackForDeletedKeys(deletedKey ->
deletedKeys.add(deletedKey.getRecordKey()))
+ .build();
+ scanner.scan();
+ return deletedKeys;
+ }
+ return Collections.emptySet();
+ }
+
+ @VisibleForTesting
+ public static Set<String> getRecordKeys(String filePath,
HoodieTableMetaClient datasetMetaClient,
+ Option<Schema>
writerSchemaOpt, int maxBufferSize,
Review Comment:
formatting
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -759,6 +769,131 @@ public static HoodieData<HoodieRecord>
convertMetadataToColumnStatsRecords(Hoodi
});
}
+ static HoodieData<HoodieRecord>
convertMetadataToRecordIndexRecords(HoodieEngineContext engineContext,
+
HoodieCommitMetadata commitMetadata,
+
HoodieMetadataConfig metadataConfig,
+
HoodieTableMetaClient dataTableMetaClient,
+ int
writesFileIdEncoding,
+ String
instantTime) {
+
+ List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList());
+
+ if (allWriteStats.isEmpty()) {
+ return engineContext.emptyHoodieData();
+ }
+
+ try {
+ int parallelism = Math.max(Math.min(allWriteStats.size(),
metadataConfig.getRecordIndexMaxParallelism()), 1);
+ String basePath = dataTableMetaClient.getBasePath().toString();
+ // we might need to set some additional variables if we need to process
log files.
+ boolean anyLogFilesWithDeletes =
allWriteStats.stream().anyMatch(writeStat -> {
Review Comment:
Can we avoid duplicating these code blocks?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -759,6 +769,131 @@ public static HoodieData<HoodieRecord>
convertMetadataToColumnStatsRecords(Hoodi
});
}
+ static HoodieData<HoodieRecord>
convertMetadataToRecordIndexRecords(HoodieEngineContext engineContext,
+
HoodieCommitMetadata commitMetadata,
+
HoodieMetadataConfig metadataConfig,
+
HoodieTableMetaClient dataTableMetaClient,
+ int
writesFileIdEncoding,
+ String
instantTime) {
+
+ List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList());
+
+ if (allWriteStats.isEmpty()) {
+ return engineContext.emptyHoodieData();
+ }
+
+ try {
+ int parallelism = Math.max(Math.min(allWriteStats.size(),
metadataConfig.getRecordIndexMaxParallelism()), 1);
+ String basePath = dataTableMetaClient.getBasePath().toString();
+ // we might need to set some additional variables if we need to process
log files.
+ boolean anyLogFilesWithDeletes =
allWriteStats.stream().anyMatch(writeStat -> {
+ String fileName = FSUtils.getFileName(writeStat.getPath(),
writeStat.getPartitionPath());
+ return FSUtils.isLogFile(fileName) && writeStat.getNumDeletes() > 0;
+ });
+ Option<Schema> writerSchemaOpt = Option.empty();
+ if (anyLogFilesWithDeletes) { // if we have a log file w/ deletes.
+ writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
+ }
+ int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
+ StorageConfiguration storageConfiguration =
dataTableMetaClient.getStorageConf();
+ Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
+ HoodieData<HoodieRecord> recordIndexRecords =
engineContext.parallelize(allWriteStats, parallelism)
+ .flatMap(writeStat -> {
+ HoodieStorage storage = HoodieStorageUtils.getStorage(new
StoragePath(writeStat.getPath()), storageConfiguration);
+ // handle base files
+ if
(writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
Review Comment:
This can't be specific to Parquet; the base-file check should not hard-code the Parquet file extension.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -759,6 +769,131 @@ public static HoodieData<HoodieRecord>
convertMetadataToColumnStatsRecords(Hoodi
});
}
+ static HoodieData<HoodieRecord>
convertMetadataToRecordIndexRecords(HoodieEngineContext engineContext,
+
HoodieCommitMetadata commitMetadata,
+
HoodieMetadataConfig metadataConfig,
+
HoodieTableMetaClient dataTableMetaClient,
+ int
writesFileIdEncoding,
+ String
instantTime) {
+
+ List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList());
+
+ if (allWriteStats.isEmpty()) {
+ return engineContext.emptyHoodieData();
+ }
+
+ try {
+ int parallelism = Math.max(Math.min(allWriteStats.size(),
metadataConfig.getRecordIndexMaxParallelism()), 1);
+ String basePath = dataTableMetaClient.getBasePath().toString();
+ // we might need to set some additional variables if we need to process
log files.
+ boolean anyLogFilesWithDeletes =
allWriteStats.stream().anyMatch(writeStat -> {
+ String fileName = FSUtils.getFileName(writeStat.getPath(),
writeStat.getPartitionPath());
+ return FSUtils.isLogFile(fileName) && writeStat.getNumDeletes() > 0;
+ });
+ Option<Schema> writerSchemaOpt = Option.empty();
+ if (anyLogFilesWithDeletes) { // if we have a log file w/ deletes.
+ writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
+ }
+ int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
+ StorageConfiguration storageConfiguration =
dataTableMetaClient.getStorageConf();
+ Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
+ HoodieData<HoodieRecord> recordIndexRecords =
engineContext.parallelize(allWriteStats, parallelism)
+ .flatMap(writeStat -> {
+ HoodieStorage storage = HoodieStorageUtils.getStorage(new
StoragePath(writeStat.getPath()), storageConfiguration);
+ // handle base files
+ if
(writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+ return
BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath,
writeStat, writesFileIdEncoding, instantTime, storage);
+ } else {
+ // for logs, we only need to process log files containing deletes
Review Comment:
Please throw an error if there are inserts.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -759,6 +769,131 @@ public static HoodieData<HoodieRecord>
convertMetadataToColumnStatsRecords(Hoodi
});
}
+ static HoodieData<HoodieRecord>
convertMetadataToRecordIndexRecords(HoodieEngineContext engineContext,
+
HoodieCommitMetadata commitMetadata,
+
HoodieMetadataConfig metadataConfig,
+
HoodieTableMetaClient dataTableMetaClient,
+ int
writesFileIdEncoding,
+ String
instantTime) {
+
+ List<HoodieWriteStat> allWriteStats =
commitMetadata.getPartitionToWriteStats().values().stream()
+ .flatMap(Collection::stream).collect(Collectors.toList());
+
+ if (allWriteStats.isEmpty()) {
+ return engineContext.emptyHoodieData();
+ }
+
+ try {
+ int parallelism = Math.max(Math.min(allWriteStats.size(),
metadataConfig.getRecordIndexMaxParallelism()), 1);
+ String basePath = dataTableMetaClient.getBasePath().toString();
+ // we might need to set some additional variables if we need to process
log files.
+ boolean anyLogFilesWithDeletes =
allWriteStats.stream().anyMatch(writeStat -> {
+ String fileName = FSUtils.getFileName(writeStat.getPath(),
writeStat.getPartitionPath());
+ return FSUtils.isLogFile(fileName) && writeStat.getNumDeletes() > 0;
+ });
+ Option<Schema> writerSchemaOpt = Option.empty();
+ if (anyLogFilesWithDeletes) { // if we have a log file w/ deletes.
+ writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
+ }
+ int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
+ StorageConfiguration storageConfiguration =
dataTableMetaClient.getStorageConf();
+ Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
+ HoodieData<HoodieRecord> recordIndexRecords =
engineContext.parallelize(allWriteStats, parallelism)
+ .flatMap(writeStat -> {
+ HoodieStorage storage = HoodieStorageUtils.getStorage(new
StoragePath(writeStat.getPath()), storageConfiguration);
+ // handle base files
+ if
(writeStat.getPath().endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+ return
BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath,
writeStat, writesFileIdEncoding, instantTime, storage);
+ } else {
+ // for logs, we only need to process log files containing deletes
+ if (writeStat.getNumDeletes() > 0) {
+ StoragePath fullFilePath = new
StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath());
+ Set<String> deletedRecordKeys =
getDeletedRecordKeys(fullFilePath.toString(), dataTableMetaClient,
+ finalWriterSchemaOpt, maxBufferSize, instantTime);
+ return deletedRecordKeys.stream().map(recordKey ->
HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator();
+ }
+ // ignore log file data blocks.
+ return new ArrayList<HoodieRecord>().iterator();
+ }
+ });
+
+ // there are chances that same record key from data table has 2 entries
(1 delete from older partition and 1 insert to newer partition)
+ // lets do reduce by key to ignore the deleted entry.
+ return reduceByKeys(recordIndexRecords, parallelism);
+ } catch (Exception e) {
+ throw new HoodieException("Failed to generate RLI records for metadata
table", e);
+ }
+ }
+
+ /**
+ * There are chances that same record key from data table has 2 entries (1
delete from older partition and 1 insert to newer partition)
+ * So, this method performs reduce by key to ignore the deleted entry.
+ * @param recordIndexRecords hoodie records after rli index lookup.
+ * @param parallelism parallelism to use.
+ * @return
+ */
+ private static HoodieData<HoodieRecord>
reduceByKeys(HoodieData<HoodieRecord> recordIndexRecords, int parallelism) {
+ return recordIndexRecords.mapToPair(
+ (SerializablePairFunction<HoodieRecord, HoodieKey, HoodieRecord>)
t -> Pair.of(t.getKey(), t))
+ .reduceByKey((SerializableBiFunction<HoodieRecord, HoodieRecord,
HoodieRecord>) (record1, record2) -> {
+ boolean isRecord1Deleted = record1.getData() instanceof
EmptyHoodieRecordPayload;
+ boolean isRecord2Deleted = record2.getData() instanceof
EmptyHoodieRecordPayload;
+ if (isRecord1Deleted && !isRecord2Deleted) {
+ return record2;
+ } else if (!isRecord1Deleted && isRecord2Deleted) {
+ return record1;
+ } else {
+ throw new HoodieIOException("Two HoodieRecord updates to RLI is
seen for same record key " + record2.getRecordKey() + ", record 1 : "
+ + record1.getData().toString() + ", record 2 : " +
record2.getData().toString());
+ }
+ }, parallelism).values();
+ }
+
+ static List<String> getRecordKeysDeletedOrUpdated(HoodieEngineContext
engineContext,
+
HoodieCommitMetadata commitMetadata,
Review Comment:
formatting
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]