bhat-vinay commented on code in PR #10625:
URL: https://github.com/apache/hudi/pull/10625#discussion_r1505530807
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java:
##########
@@ -82,6 +88,9 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
   public final HoodieTimer timer = HoodieTimer.create();
   // Map of compacted/merged records
   private final ExternalSpillableMap<String, HoodieRecord> records;
+
+  private final ExternalSpillableMap<String, HashMap<String, HoodieRecord>> nonUniqueKeyRecords;
Review Comment:
`ExternalSpillableMap` needs to support a disk-based map (or we need the disk-backed multi-map that you pointed out). I have not really looked into the `ExternalSpillableMap` implementation yet.
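
One way to approximate a multi-map while `ExternalSpillableMap` stays a plain map is to spill a list of records per key. A minimal sketch, reusing the constructor arguments seen elsewhere in this PR; the wrapper class and its method names are hypothetical:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;

// Hypothetical wrapper that simulates a disk-backed multi-map by storing an
// ArrayList of records per key inside the existing ExternalSpillableMap.
class SpillableMultiMap {
  private final ExternalSpillableMap<String, ArrayList<HoodieRecord>> map;

  SpillableMultiMap(long maxMemorySizeInBytes, String spillableMapBasePath, Schema readerSchema,
                    ExternalSpillableMap.DiskMapType diskMapType, boolean isCompressionEnabled) throws IOException {
    // Mirrors the constructor usage in this PR; note the value-size estimate is
    // approximate, since the estimator sees a single record rather than the list.
    this.map = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath,
        new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(readerSchema),
        diskMapType, isCompressionEnabled);
  }

  // Appends a record under a (possibly non-unique) key; re-put so that an entry
  // already spilled to disk is rewritten with the grown list.
  void add(String key, HoodieRecord record) {
    ArrayList<HoodieRecord> values = map.getOrDefault(key, new ArrayList<>());
    values.add(record);
    map.put(key, values);
  }

  List<HoodieRecord> get(String key) {
    return map.getOrDefault(key, new ArrayList<>());
  }
}
```

With this shape, `processNextNonUniqueKeyRecord` could append to the per-key list instead of replacing the mapped value.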
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieFileSliceReader.java:
##########
@@ -63,15 +61,21 @@ public HoodieFileSliceReader(Option<HoodieFileReader> baseFileReader,
     }
     this.props = props;
     this.simpleKeyGenFieldsOpt = simpleKeyGenFieldsOpt;
-    this.records = scanner.getRecords();
   }
   private boolean hasNextInternal() {
     while (baseFileIterator.isPresent() && baseFileIterator.get().hasNext()) {
       try {
         HoodieRecord currentRecord = baseFileIterator.get().next().wrapIntoHoodieRecordPayloadWithParams(schema, props,
             simpleKeyGenFieldsOpt, scanner.isWithOperationField(), scanner.getPartitionNameOverride(), false, Option.empty());
-        Option<HoodieRecord> logRecord = removeLogRecord(currentRecord.getRecordKey());
+
+        if (!scanner.hasKey(currentRecord.getRecordKey())) {
Review Comment:
Will remove. This is leftover code from a previous iteration (subsumed by the changes below).
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1005,6 +1056,73 @@ private HoodieData<HoodieRecord> getFunctionalIndexUpdates(HoodieCommitMetadata
     return getFunctionalIndexRecords(partitionFileSlicePairs, indexDefinition, dataMetaClient, parallelism, readerSchema, hadoopConf);
   }
+  private void updateSecondaryIndexIfPresent(HoodieCommitMetadata commitMetadata, Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordMap, HoodieData<WriteStatus> writeStatus) {
+    dataMetaClient.getTableConfig().getMetadataPartitions()
+        .stream()
+        .filter(partition -> partition.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX))
+        .forEach(partition -> {
+          HoodieData<HoodieRecord> secondaryIndexRecords;
+          try {
+            secondaryIndexRecords = getSecondaryIndexUpdates(commitMetadata, writeStatus);
+          } catch (Exception e) {
+            throw new HoodieMetadataException("Failed to get secondary index updates for partition " + partition, e);
+          }
+          partitionToRecordMap.put(SECONDARY_INDEX, secondaryIndexRecords);
+        });
+  }
+
+  private HoodieData<HoodieRecord> getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, HoodieData<WriteStatus> writeStatus) throws Exception {
+    // Build a list of basefiles+delta-log-files for every partition that this commit touches
+    // {
+    //   {
+    //     "partition1", { {"baseFile11", {"logFile11", "logFile12"}}, {"baseFile12", {"logFile11"} } },
+    //   },
+    //   {
+    //     "partition2", { {"baseFile21", {"logFile21", "logFile22"}}, {"baseFile22", {"logFile21"} } }
+    //   }
+    // }
+    List<Pair<String, Pair<String, List<String>>>> partitionFilePairs = new ArrayList<>();
+    commitMetadata.getPartitionToWriteStats().forEach((dataPartition, writeStats) -> {
+      writeStats.forEach(writeStat -> {
+        if (writeStat instanceof HoodieDeltaWriteStat) {
+          partitionFilePairs.add(Pair.of(dataPartition, Pair.of(((HoodieDeltaWriteStat) writeStat).getBaseFile(), ((HoodieDeltaWriteStat) writeStat).getLogFiles())));
+        } else {
+          partitionFilePairs.add(Pair.of(dataPartition, Pair.of(writeStat.getPath(), new ArrayList<>())));
+        }
+      });
+    });
+
+    // Build a list of keys that need to be removed. A 'delete' record will be emitted into the respective FileGroup of
+    // the secondary index partition for each of these keys. For a commit which is deleting/updating a lot of records, this
+    // operation is going to be expensive (in CPU, memory and IO)
+    List<String> keysToRemove = new ArrayList<>();
+    writeStatus.collectAsList().forEach(status -> {
Review Comment:
Yes, the logic here will probably change if one uses the `WriteStatus` to hold the (old-secondary-key, new-secondary-key) pair. Hence I have not tried to optimise this yet.
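
For illustration, a sketch of how the delete-key computation might look if `WriteStatus` carried the key transitions; `getSecondaryKeyTransitions()` and its accessors are hypothetical additions, not existing Hudi APIs:

```java
// Hypothetical: derive secondary-index deletes from per-record key transitions
// recorded on WriteStatus, instead of re-reading base/log files. The
// getSecondaryKeyTransitions() API and the transition type are assumptions.
List<String> keysToRemove = new ArrayList<>();
writeStatus.collectAsList().forEach(status -> {
  status.getSecondaryKeyTransitions().forEach(transition -> {
    String oldKey = transition.getOldSecondaryKey();  // null for inserts
    String newKey = transition.getNewSecondaryKey();  // null for deletes
    // Emit a delete for the old mapping only when the secondary key actually
    // changed (update) or the record itself was deleted.
    if (oldKey != null && !oldKey.equals(newKey)) {
      keysToRemove.add(oldKey);
    }
  });
});
```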
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java:
##########
@@ -111,6 +123,16 @@ private HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List<String
     // Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
     this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(),
         new HoodieRecordSizeEstimator(readerSchema), diskMapType, isBitCaskDiskMapCompressionEnabled);
+    this.nonUniqueKeyRecords = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(),
+        new HoodieRecordSizeEstimator(readerSchema), diskMapType, isBitCaskDiskMapCompressionEnabled);
+
+    if (logFilePaths.size() > 0 && HoodieTableMetadata.isMetadataTableSecondaryIndexPartition(basePath, partitionName)) {
Review Comment:
Can this all be hidden inside a method (still in this layer)? There needs to be some way to determine whether the logs can have non-unique keys. The initial implementation had it one layer above (i.e. the callers instantiating `HoodieMergedLogRecordScanner` passed in the flag), but having it here looked cleaner.
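
For example, the check could be folded into a private helper in this layer; a sketch, where the method name is an assumption:

```java
// Sketch: hide the non-unique-key detection behind a helper so the constructor
// stays readable. The method name is hypothetical; the condition is the one
// already used in this PR.
private static boolean mayContainNonUniqueKeys(List<String> logFilePaths, String basePath, String partitionName) {
  // Only secondary-index partitions of the metadata table can emit multiple
  // log records that share the same record key.
  return !logFilePaths.isEmpty()
      && HoodieTableMetadata.isMetadataTableSecondaryIndexPartition(basePath, partitionName);
}
```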
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java:
##########
@@ -269,6 +289,68 @@ public <T> void processNextRecord(HoodieRecord<T> newRecord) throws IOException
     }
   }
+  private <T> void processNextNonUniqueKeyRecord(HoodieRecord<T> newRecord) throws IOException {
+    String key = newRecord.getRecordKey();
+    HoodieMetadataPayload newPayload = (HoodieMetadataPayload) newRecord.getData();
+
+    // The rules for merging the prevRecord and the latestRecord is noted below. Note that this only applies for SecondaryIndex
Review Comment:
`combineAndGet()` (which internally calls `preCombine()`) is an either-or operation, i.e. the caller should ensure that `prevRecord` and `newRecord` are similar. For the secondary index, this similarity depends on the payload of the record (`HoodieMetadataPayload`). Maybe I can add a new API in `HoodieRecordPayload`, implemented by `HoodieMetadataPayload`, to avoid exposing `HoodieMetadataPayload` to this layer?
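
A sketch of what such an API could look like; the method name `canCombineWith` and its default implementation are assumptions, and the existing methods of the interface are elided:

```java
import java.io.Serializable;

// Hypothetical addition to HoodieRecordPayload: lets the merge path ask whether
// two payloads are similar enough to be combined, without this layer
// referencing HoodieMetadataPayload directly.
public interface HoodieRecordPayload<T extends HoodieRecordPayload<T>> extends Serializable {
  // ... existing preCombine()/getInsertValue() contract ...

  // Defaults to true, preserving the current either-or semantics of
  // combineAndGet()/preCombine() for all existing payloads.
  default boolean canCombineWith(T other) {
    return true;
  }
}
```

`HoodieMetadataPayload` would then override `canCombineWith` with the secondary-index similarity rules, and `processNextNonUniqueKeyRecord` could stay payload-agnostic.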
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]