nsivabalan commented on code in PR #12525:
URL: https://github.com/apache/hudi/pull/12525#discussion_r1896330232
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1160,17 +1159,19 @@ private void
updateSecondaryIndexIfPresent(HoodieCommitMetadata commitMetadata,
private HoodieData<HoodieRecord>
getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, String
indexPartition, String instantTime) throws Exception {
List<Pair<String, Pair<String, List<String>>>> partitionFilePairs =
getPartitionFilePairs(commitMetadata);
+ if (partitionFilePairs.isEmpty()) {
+ return engineContext.emptyHoodieData();
+ }
// Build a list of keys that need to be removed. A 'delete' record will be
emitted into the respective FileGroup of
// the secondary index partition for each of these keys.
- List<String> keysToRemove =
HoodieTableMetadataUtil.getRecordKeysDeletedOrUpdated(engineContext,
commitMetadata, dataWriteConfig.getMetadataConfig(),
- dataMetaClient, instantTime);
+ HoodieData<String> keysToRemove =
getRecordKeysDeletedOrUpdated(engineContext, commitMetadata,
dataWriteConfig.getMetadataConfig(), dataMetaClient, instantTime);
Review Comment:
Should we optimize this to fetch only the keys that are part of
delete log blocks here?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1181,16 +1182,13 @@ private HoodieData<HoodieRecord>
getSecondaryIndexUpdates(HoodieCommitMetadata c
long targetPartitionSize = 100 * 1024 * 1024;
int parallelism = (int) Math.max(1, (totalWriteBytesForSecondaryIndex +
targetPartitionSize - 1) / targetPartitionSize);
- return readSecondaryKeysFromBaseFiles(
- engineContext,
- partitionFilePairs,
- parallelism,
- this.getClass().getSimpleName(),
- dataMetaClient,
- getEngineType(),
- indexDefinition)
- .union(deletedRecords)
- .distinctWithKey(HoodieRecord::getKey, parallelism);
+ // Load file system view for only the affected partitions on the driver.
+ // By loading on the driver one time, we avoid loading the same metadata
multiple times on the executors.
+ HoodieMetadataFileSystemView fsView = getMetadataView();
+
fsView.loadPartitions(partitionFilePairs.stream().map(Pair::getKey).collect(Collectors.toList()));
+ HoodieData<HoodieRecord> insertRecords =
+ readSecondaryKeysFromBaseFiles(engineContext, partitionFilePairs,
parallelism, this.getClass().getSimpleName(), dataMetaClient, getEngineType(),
indexDefinition, fsView);
Review Comment:
Do we have a tracking ticket for supporting inserts to log files? For
example, with a bucket index we should be able to create a secondary index.
Today we may rely on RLI, but eventually we need to support SI for any table and index type.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -2241,7 +2377,7 @@ public static HoodieData<HoodieRecord>
readSecondaryKeysFromBaseFiles(HoodieEngi
} else {
readerSchema = tableSchema;
}
- return createSecondaryIndexGenerator(metaClient, engineType,
logFilePaths, readerSchema, partition, dataFilePath, indexDefinition,
+ return createSecondaryIndexGenerator(metaClient, engineType, new
ArrayList<>(logFilePaths), readerSchema, partition, dataFilePath,
indexDefinition,
Review Comment:
I am not sure we need this fix if we are not getting rid of the separate
processing of entries to be deleted from the SI.
I am talking about
```
// Build a list of keys that need to be removed. A 'delete' record will be
emitted into the respective FileGroup of
// the secondary index partition for each of these keys.
HoodieData<String> keysToRemove =
getRecordKeysDeletedOrUpdated(engineContext, commitMetadata,
dataWriteConfig.getMetadataConfig(), dataMetaClient, instantTime);
HoodieIndexDefinition indexDefinition =
getIndexDefinition(indexPartition);
// Fetch the secondary keys that each of the record keys
('keysToRemove') maps to
HoodiePairData<String, String> recordKeySecondaryKeyMap =
metadata.getSecondaryKeys(keysToRemove,
indexDefinition.getIndexName(),
dataWriteConfig.getMetadataConfig().getSecondaryIndexParallelism());
HoodieData<HoodieRecord> deleteRecords = recordKeySecondaryKeyMap.map(
(recordKeyAndSecondaryKey) ->
HoodieMetadataPayload.createSecondaryIndexRecord(recordKeyAndSecondaryKey.getKey(),
recordKeyAndSecondaryKey.getValue(), indexDefinition.getIndexName(), true));
```
in HoodieBackedTableMetadataWriter.getSecondaryIndexUpdates.
Let's sync up.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -822,14 +823,17 @@ public static HoodieData<HoodieRecord>
convertMetadataToRecordIndexRecords(Hoodi
if
(writeStat.getPath().endsWith(baseFileFormat.getFileExtension())) {
return
BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath,
writeStat, writesFileIdEncoding, instantTime, storage);
} else if (FSUtils.isLogFile(fullFilePath)) {
- // for logs, we only need to process log files containing deletes
- if (writeStat.getNumDeletes() > 0) {
- Set<String> deletedRecordKeys =
getRecordKeys(fullFilePath.toString(), dataTableMetaClient,
- finalWriterSchemaOpt, maxBufferSize, instantTime, false,
true);
- return deletedRecordKeys.stream().map(recordKey ->
HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator();
- }
- // ignore log file data blocks.
- return new ArrayList<HoodieRecord>().iterator();
+ checkState(writeStat instanceof HoodieDeltaWriteStat, "Log file
should be associated with a delta write stat");
+ List<String> logFilePaths = ((HoodieDeltaWriteStat)
writeStat).getLogFiles().stream()
+ .map(logFile -> new StoragePath(new
StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPartitionPath()),
logFile).toString())
+ .collect(toList());
Review Comment:
Also, don't we need to group by file slice across all write stats?
For example, if more than one log file is added to the same file slice, will
the current record generation still be correct?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -855,6 +859,131 @@ public static HoodieData<HoodieRecord>
convertMetadataToRecordIndexRecords(Hoodi
}
}
+ /**
+ * Get the deleted keys from the merged log files. The logic is as below.
Suppose:
+ * <li>A = Set of keys that are valid (not deleted) in the previous log
files merged</li>
+ * <li>B = Set of keys that are valid in all log files including current log
file merged</li>
+ * <li>C = Set of keys that are deleted in the current log file</li>
+ * <li>Then, D = Set of deleted keys = C - (B - A)</li>
+ *
+ * @param dataTableMetaClient data table meta client
+ * @param instantTime timestamp of the commit
+ * @param engineType engine type (SPARK, FLINK, JAVA)
+ * @param logFilePaths list of log file paths for which records need
to be merged
+ * @param finalWriterSchemaOpt records schema
+ * @param fullFilePath full path of the current log file
+ * @return set of deleted keys
+ */
+ @VisibleForTesting
+ public static Set<String> getDeletedKeysFromMergedLogs(HoodieTableMetaClient
dataTableMetaClient,
+ String instantTime,
+ EngineType engineType,
+ List<String>
logFilePaths,
+ Option<Schema>
finalWriterSchemaOpt,
+ StoragePath
fullFilePath) {
+ // Separate out the current log file
+ List<String> logFilePathsWithoutCurrentLogFile = logFilePaths.stream()
+ .filter(logFilePath -> !logFilePath.equals(fullFilePath.toString()))
+ .collect(toList());
+ if (logFilePathsWithoutCurrentLogFile.isEmpty()) {
+ // Only current log file is present, so we can directly get the deleted
record keys from it and return the RLI records.
+ Map<String, HoodieRecord> currentLogRecords =
+ getLogRecords(Collections.singletonList(fullFilePath.toString()),
dataTableMetaClient, finalWriterSchemaOpt, instantTime, engineType);
+ return currentLogRecords.entrySet().stream()
+ .filter(entry -> isDeleteRecord(dataTableMetaClient,
finalWriterSchemaOpt, entry.getValue()))
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toSet());
+ }
+ // Fetch log records for all log files
+ Map<String, HoodieRecord> allLogRecords =
+ getLogRecords(logFilePaths, dataTableMetaClient, finalWriterSchemaOpt,
instantTime, engineType);
+
+ // Fetch log records for previous log files (excluding the current log
file)
+ Map<String, HoodieRecord> previousLogRecords =
+ getLogRecords(logFilePathsWithoutCurrentLogFile, dataTableMetaClient,
finalWriterSchemaOpt, instantTime, engineType);
+
+ // Fetch log records for the current log file
+ Map<String, HoodieRecord> currentLogRecords =
+ getLogRecords(Collections.singletonList(fullFilePath.toString()),
dataTableMetaClient, finalWriterSchemaOpt, instantTime, engineType);
+
+ // Calculate valid (non-deleted) keys
+ Set<String> validKeysForPreviousLogs =
previousLogRecords.entrySet().stream()
+ .filter(entry -> !isDeleteRecord(dataTableMetaClient,
finalWriterSchemaOpt, entry.getValue()))
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toSet());
+
+ Set<String> validKeysIncludingCurrentLogs =
allLogRecords.entrySet().stream()
+ .filter(entry -> !isDeleteRecord(dataTableMetaClient,
finalWriterSchemaOpt, entry.getValue()))
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toSet());
+
+ // Calculate deleted keys in the current log file
+ Set<String> deletedKeysInCurrentLog = currentLogRecords.entrySet().stream()
Review Comment:
I think we could simplify this:
1. Parse the previous log files to generate {recordKey -> isDeleted}.
2. Parse the entire file slice (including inflight) to generate {recordKey ->
isDeleted}.
We are interested in the record keys deleted by the current log file, so we
could do something like:
```
for every valid entry in (1)
if deleted in (2) => add to deletedRecordKeyList.
```
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -801,28 +801,47 @@ public int
getNumFileGroupsForPartition(MetadataPartitionType partition) {
}
@Override
- protected Map<String, String> getSecondaryKeysForRecordKeys(List<String>
recordKeys, String partitionName) {
+ protected HoodiePairData<String, String>
getSecondaryKeysForRecordKeys(HoodieData<String> recordKeys, String
partitionName, int batchSize) {
if (recordKeys.isEmpty()) {
- return Collections.emptyMap();
+ return getEngineContext().emptyHoodiePairData();
}
// Load the file slices for the partition. Each file slice is a shard
which saves a portion of the keys.
List<FileSlice> partitionFileSlices =
partitionFileSliceMap.computeIfAbsent(partitionName, k ->
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient,
metadataFileSystemView, partitionName));
if (partitionFileSlices.isEmpty()) {
- return Collections.emptyMap();
+ return getEngineContext().emptyHoodiePairData();
}
- // Parallel lookup keys from each file slice
- Map<String, String> reverseSecondaryKeyMap = new
HashMap<>(recordKeys.size());
- getEngineContext().setJobStatus(this.getClass().getSimpleName(), "Lookup
secondary keys from metadata table partition " + partitionName);
- List<Pair<String, String>> secondaryToRecordKeyPairList =
getEngineContext().flatMap(partitionFileSlices,
- (SerializableFunction<FileSlice, Stream<Pair<String, String>>>) v1 ->
reverseLookupSecondaryKeys(partitionName, recordKeys, v1)
- .entrySet().stream()
- .map(entry -> Pair.of(entry.getKey(),
entry.getValue())).collect(Collectors.toList()).stream(),
partitionFileSlices.size());
+ // Step 1: Batch record keys
+ HoodieData<List<String>> batchedRecordKeys = recordKeys.mapPartitions(iter
-> {
+ List<List<String>> batches = new ArrayList<>();
+ List<String> currentBatch = new ArrayList<>();
+
+ while (iter.hasNext()) {
Review Comment:
Why can't we count the keys first and then do repartition(count / batchSize)?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -822,14 +823,17 @@ public static HoodieData<HoodieRecord>
convertMetadataToRecordIndexRecords(Hoodi
if
(writeStat.getPath().endsWith(baseFileFormat.getFileExtension())) {
return
BaseFileRecordParsingUtils.generateRLIMetadataHoodieRecordsForBaseFile(basePath,
writeStat, writesFileIdEncoding, instantTime, storage);
} else if (FSUtils.isLogFile(fullFilePath)) {
- // for logs, we only need to process log files containing deletes
- if (writeStat.getNumDeletes() > 0) {
- Set<String> deletedRecordKeys =
getRecordKeys(fullFilePath.toString(), dataTableMetaClient,
- finalWriterSchemaOpt, maxBufferSize, instantTime, false,
true);
- return deletedRecordKeys.stream().map(recordKey ->
HoodieMetadataPayload.createRecordIndexDelete(recordKey)).collect(toList()).iterator();
- }
- // ignore log file data blocks.
- return new ArrayList<HoodieRecord>().iterator();
+ checkState(writeStat instanceof HoodieDeltaWriteStat, "Log file
should be associated with a delta write stat");
+ List<String> logFilePaths = ((HoodieDeltaWriteStat)
writeStat).getLogFiles().stream()
Review Comment:
We also need to fix lines 806 to 814 accordingly.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]