bhat-vinay commented on code in PR #10625:
URL: https://github.com/apache/hudi/pull/10625#discussion_r1499861590
##########
hudi-common/src/main/java/org/apache/hudi/keygen/constant/KeyGeneratorOptions.java:
##########
@@ -54,6 +54,13 @@ public class KeyGeneratorOptions extends HoodieConfig {
+ "Actual value will be obtained by invoking .toString() on the field value. Nested fields can be specified using\n"
+ "the dot notation eg: `a.b.c`");
+  public static final ConfigProperty<String> SECONDARYKEY_FIELD_NAME = ConfigProperty
Review Comment:
One of the current limitations: only one secondary index is supported per table. Will remove this
limitation once the functionality is working end-to-end. Thinking of using the same approach as the
functional index (a different metadata-table partition for each secondary index, driven by a config
file/JSON), roughly as sketched below.
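
For illustration only, a minimal sketch of the "one metadata-table partition per secondary index" idea; the class, naming scheme, and prefix below are assumptions, not existing Hudi APIs:

```java
// Hypothetical sketch: map each secondary index to its own MDT partition path,
// analogous to how functional indexes get their own partitions.
import java.util.LinkedHashMap;
import java.util.Map;

public class SecondaryIndexRegistry {
  // indexName -> metadata-table partition path, e.g. "idx_city" -> "secondary_index_idx_city"
  private final Map<String, String> indexToPartition = new LinkedHashMap<>();

  public void register(String indexName) {
    indexToPartition.put(indexName, "secondary_index_" + indexName);
  }

  public String partitionFor(String indexName) {
    return indexToPartition.get(indexName);
  }
}
```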
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -360,6 +415,112 @@ private Map<String, HoodieRecord<HoodieMetadataPayload>> lookupKeysFromFileSlice
     }
   }
+  /**
+   * Lookup list of keys from a single file slice.
+   *
+   * @param partitionName Name of the partition
+   * @param secondaryKeys The list of secondary keys to lookup
+   * @param fileSlice The file slice to read
+   * @return A {@code Map} of secondary-key to list of {@code HoodieRecord} for the secondary-keys which were found in the file slice
+   */
+  private Map<String, List<HoodieRecord<HoodieMetadataPayload>>> lookupSecondaryKeysFromFileSlice(String partitionName, List<String> secondaryKeys, FileSlice fileSlice) {
+    Map<String, HashMap<String, HoodieRecord>> logRecordsMap = new HashMap<>();
+
+    Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers = getOrCreateReaders(partitionName, fileSlice, Option.of(true));
+    try {
+      List<Long> timings = new ArrayList<>(1);
+      HoodieSeekingFileReader<?> baseFileReader = readers.getKey();
+      HoodieMetadataLogRecordReader logRecordScanner = readers.getRight();
+      if (baseFileReader == null && logRecordScanner == null) {
+        return Collections.emptyMap();
+      }
+
+      // Sort it here once so that we don't need to sort individually for base file and for each individual log files.
+      Set<String> secondaryKeySet = new HashSet<>(secondaryKeys.size());
+      List<String> sortedSecondaryKeys = new ArrayList<>(secondaryKeys);
+      Collections.sort(sortedSecondaryKeys);
+      secondaryKeySet.addAll(sortedSecondaryKeys);
+
+      // TODO: Look at using scanByFullKeys() which pushes down the 'filtering' keys and
+      // only buffers matching records
+      logRecordScanner.scan();
+
+      Map<String, HashMap<String, HoodieRecord>> nonUniqueRecordsMap = logRecordScanner.getNonUniqueRecordsMap();
+      nonUniqueRecordsMap.forEach((secondaryKey, recordsMap) -> {
+        if (secondaryKeySet.contains(secondaryKey) && !logRecordsMap.containsKey(secondaryKey)) {
+          logRecordsMap.put(secondaryKey, recordsMap);
+        }
+      });
+
+      return readNonUniqueRecordsAndMergeWithLogRecords(baseFileReader, sortedSecondaryKeys, logRecordsMap, timings, partitionName);
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Error merging records from metadata table for " + secondaryKeys.size() + " key : ", ioe);
+    } finally {
+      if (!reuse) {
+        closeReader(readers);
+      }
+    }
+  }
+
+  private void reverseLookupSecondaryKeys(String partitionName, List<String> recordKeys, FileSlice fileSlice, Map<String, String> recordKeyMap) {
Review Comment:
Please ignore for now. This may not be needed if `WriteStatus` is changed to carry
`(record-key, Optional<new-secondary-key>, Optional<old-secondary-key>)` as discussed earlier.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -548,6 +787,31 @@ private Map<String, List<HoodieRecord<HoodieMetadataPayload>>> fetchBaseFileAllR
         .collect(Collectors.groupingBy(Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toList())));
   }
+  private Map<String, HoodieRecord<HoodieMetadataPayload>> fetchBaseFileAllRecordsByPayload(HoodieSeekingFileReader reader,
Review Comment:
Ignore for now. This is again related to the reverse scanning that aids the writer (to emit
tombstone records) and will change based on the design choice.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java:
##########
@@ -33,6 +33,7 @@
import org.apache.hadoop.fs.FileSystem;
Review Comment:
Please ignore the changes in this file. Will revert in the next upload.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java:
##########
@@ -240,6 +255,11 @@ public static HoodieMergedLogRecordScanner.Builder newBuilder() {
   @Override
   public <T> void processNextRecord(HoodieRecord<T> newRecord) throws IOException {
+    if (logContainsNonUniqueKeys) {
Review Comment:
If the log files belong to partitions that can have non-unique keys, this logic uses the new map
to buffer the scanned records (see the sketch below).
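
For reference, a minimal self-contained sketch of that buffering shape, using plain Java types rather than the actual scanner classes; the outer map is keyed by the non-unique key and the inner map de-duplicates by record key:

```java
// Simplified sketch (not the actual Hudi classes): buffer records whose keys are
// not unique, e.g. secondary-index entries where one secondary key maps to many
// record keys. Mirrors the shape of the new non-unique-key buffer described above.
import java.util.HashMap;
import java.util.Map;

public class NonUniqueKeyBuffer {
  // secondaryKey -> (recordKey -> opaque record payload)
  private final Map<String, Map<String, Object>> nonUniqueKeyRecords = new HashMap<>();

  public void buffer(String secondaryKey, String recordKey, Object payload) {
    nonUniqueKeyRecords
        .computeIfAbsent(secondaryKey, k -> new HashMap<>())
        .put(recordKey, payload);
  }

  public Map<String, Object> get(String secondaryKey) {
    return nonUniqueKeyRecords.getOrDefault(secondaryKey, new HashMap<>());
  }
}
```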
##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java:
##########
@@ -305,6 +305,52 @@ public Map<String, List<HoodieRecordGlobalLocation>> readRecordIndex(List<String
     return recordKeyToLocation;
   }
+  /**
+   * Get record-location using secondary-index and record-index
+   * <p>
+   * If the Metadata Table is not enabled, an exception is thrown to distinguish this from the absence of the key.
+   *
+   * @param secondaryKeys The list of secondary keys to read
+   */
+  @Override
+  public Map<String, List<HoodieRecordGlobalLocation>> readSecondaryIndex(List<String> secondaryKeys) {
+    ValidationUtils.checkState(dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX),
+        "Record index is not initialized in MDT");
+    ValidationUtils.checkState(dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.SECONDARY_INDEX),
+        "Secondary index is not initialized in MDT");
+
+    HoodieTimer timer = HoodieTimer.start();
+
+    // Fetch secondary-index records
+    Map<String, List<HoodieRecord<HoodieMetadataPayload>>> secondaryKeyRecords = getSecondaryIndexRecords(secondaryKeys, MetadataPartitionType.SECONDARY_INDEX.getPartitionPath());
+
+    // Now collect the record-keys and fetch the RLI records
+    List<String> recordKeys = new ArrayList<>();
+    secondaryKeyRecords.forEach((key, records) -> {
+      records.forEach(record -> {
+        if (!record.getData().isDeleted()) {
+          recordKeys.add(record.getData().getRecordKeyFromSecondaryIndex());
+        }
+      });
+    });
+
+    return readRecordIndex(recordKeys);
+  }
+
+  // Returns a map of (record-key -> secondary-key) for the provided record-keys
Review Comment:
This is the reverse-scan entry point, used by the writer to extract the (record-key,
old-secondary-key) mapping for updated/deleted record keys; the writer then uses that mapping to
emit tombstone records into the respective file groups (a simplified sketch follows). Please
ignore this for now: depending on what is decided, we might enhance `WriteStatus` to carry this
information and help the writer emit tombstone markers.
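
A simplified sketch of what the reverse scan computes, using plain Java types instead of the Hudi reader path; it only illustrates the record-key to old-secondary-key mapping being built:

```java
// Simplified, self-contained illustration: for each requested record key, find the
// secondary key it currently maps to by scanning the whole (in-memory) index.
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class ReverseSecondaryKeyLookup {
  // index: secondaryKey -> record keys currently mapped to it
  static Map<String, String> reverseLookup(Map<String, Set<String>> index, Collection<String> recordKeys) {
    Map<String, String> recordKeyToSecondaryKey = new HashMap<>();
    index.forEach((secondaryKey, mappedRecordKeys) -> {
      for (String recordKey : mappedRecordKeys) {
        if (recordKeys.contains(recordKey)) {
          recordKeyToSecondaryKey.put(recordKey, secondaryKey);
        }
      }
    });
    return recordKeyToSecondaryKey;
  }
}
```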
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java:
##########
@@ -269,6 +289,68 @@ public <T> void processNextRecord(HoodieRecord<T> newRecord) throws IOException
     }
   }
+  private <T> void processNextNonUniqueKeyRecord(HoodieRecord<T> newRecord) throws IOException {
+    String key = newRecord.getRecordKey();
+    HoodieMetadataPayload newPayload = (HoodieMetadataPayload) newRecord.getData();
+
+    // The rules for merging the prevRecord and the latestRecord is noted below. Note that this only applies for SecondaryIndex
+    // records in the metadata table (which is the only user of this API as of this implementation)
+    // 1. Iff latestRecord is deleted (i.e it is a tombstone) AND prevRecord is null (i.e not buffered), then retain the latestRecord
+    //    The rationale here is that there could be a 'prev record' in the base-file that needs to be merged at a later stage
+    // 2. Iff latestRecord is deleted AND prevRecord is non-null, then remove prevRecord from the buffer AND discard the latestRecord
+    // 3. Iff latestRecord is not deleted AND prevRecord is non-null, then remove the prevRecord from the buffer AND retain the latestRecord
+    //    The rationale is that the most recent record is always retained (based on arrival time). TODO: verify this logic
+    // 4. Iff latestRecord is not deleted AND prevRecord is null, then retain the latestRecord (same rationale as #1)
+
+    HashMap<String, HoodieRecord> prevRecords = nonUniqueKeyRecords.get(key);
+    if (prevRecords == null) {
+      // Case #1 and #4
+      HashMap<String, HoodieRecord> recordsMap = new HashMap<>();
+      recordsMap.put(newPayload.getRecordKeyFromSecondaryIndex(), newRecord.copy());
+      nonUniqueKeyRecords.put(key, recordsMap);
+      return;
+    }
+
+    String newRecordKey = newPayload.getRecordKeyFromSecondaryIndex();
+    HoodieRecord prevRecord = prevRecords.get(newRecordKey);
+    if (prevRecord == null) {
+      // Case #1 and #4
+      prevRecords.put(newRecordKey, newRecord.copy());
+      nonUniqueKeyRecords.put(key, prevRecords);
+      return;
+    }
+
+    HoodieMetadataPayload prevPayload = (HoodieMetadataPayload) prevRecord.getData();
+    assert prevPayload.getRecordKeyFromSecondaryIndex().equals(newPayload.getRecordKeyFromSecondaryIndex());
+
+    // TODO: Merger need not be called here as the merging logic is handled explicitly in this function.
+    // Retain until Secondary Index feature is tested and stabilized
+    HoodieRecord<T> combinedRecord = (HoodieRecord<T>) recordMerger.merge(prevRecord, readerSchema,
Review Comment:
Will remove this later. The merging logic does not really rely on the merger
(because of the comment made earlier).
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -466,12 +627,90 @@ private Map<String, List<HoodieRecord<HoodieMetadataPayload>>> readAllLogRecords
     }
     try {
-      return logRecordReader.getAllRecordsByKeys(sortedKeys);
+      Map<String, List<HoodieRecord<HoodieMetadataPayload>>> records = logRecordReader.getAllRecordsByKeys(sortedKeys);
+      return records;
     } finally {
       timings.add(timer.endTimer());
     }
   }
+  private Map<String, List<HoodieRecord<HoodieMetadataPayload>>> readNonUniqueRecordsAndMergeWithLogRecords(HoodieSeekingFileReader<?> reader,
Review Comment:
This method merges the buffer obtained by reading all log records with the buffer obtained by
reading the base file (a simplified sketch follows).
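
A simplified sketch of that merge, assuming plain Java types instead of the Hudi reader/payload classes: log entries win over base-file entries for the same (secondary-key, record-key), and tombstoned log entries drop the corresponding base-file entry:

```java
// Simplified illustration of merging the log-record buffer with base-file records.
import java.util.*;

class SecondaryIndexEntry {
  final String recordKey;
  final boolean deleted;
  SecondaryIndexEntry(String recordKey, boolean deleted) {
    this.recordKey = recordKey;
    this.deleted = deleted;
  }
}

public class BaseAndLogMerge {
  static Map<String, List<SecondaryIndexEntry>> merge(
      Map<String, Map<String, SecondaryIndexEntry>> baseFileRecords,
      Map<String, Map<String, SecondaryIndexEntry>> logRecords) {
    Map<String, List<SecondaryIndexEntry>> merged = new HashMap<>();
    Set<String> secondaryKeys = new HashSet<>(baseFileRecords.keySet());
    secondaryKeys.addAll(logRecords.keySet());
    for (String secondaryKey : secondaryKeys) {
      Map<String, SecondaryIndexEntry> combined =
          new HashMap<>(baseFileRecords.getOrDefault(secondaryKey, Collections.emptyMap()));
      // Overlay log entries: they replace base-file entries, and tombstones remove them.
      logRecords.getOrDefault(secondaryKey, Collections.emptyMap()).forEach((recordKey, entry) -> {
        if (entry.deleted) {
          combined.remove(recordKey);
        } else {
          combined.put(recordKey, entry);
        }
      });
      if (!combined.isEmpty()) {
        merged.put(secondaryKey, new ArrayList<>(combined.values()));
      }
    }
    return merged;
  }
}
```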
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java:
##########
@@ -269,6 +289,68 @@ public <T> void processNextRecord(HoodieRecord<T> newRecord) throws IOException
     }
   }
+  private <T> void processNextNonUniqueKeyRecord(HoodieRecord<T> newRecord) throws IOException {
+    String key = newRecord.getRecordKey();
+    HoodieMetadataPayload newPayload = (HoodieMetadataPayload) newRecord.getData();
+
+    // The rules for merging the prevRecord and the latestRecord is noted below. Note that this only applies for SecondaryIndex
Review Comment:
The crux of the merging logic is here. The main issue with using the existing `preCombine(...)`
method is that it is 'either-or', i.e. it chooses only one record. Changing that API was a little
tedious, hence this approach of moving the merge logic directly into the scanner (a standalone
sketch of the rules follows). @vinothchandar @codope
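
A standalone sketch of the four rules from the diff above, using plain Java types rather than HoodieRecord/HoodieMetadataPayload; it only shows the buffer manipulation, not the real scanner:

```java
// Minimal sketch of the non-unique-key merge rules: the buffer is keyed by the
// (non-unique) secondary key, with an inner map keyed by record key.
import java.util.HashMap;
import java.util.Map;

public class NonUniqueKeyMergeRules {
  static class Entry {
    final String recordKey;
    final boolean deleted;
    Entry(String recordKey, boolean deleted) {
      this.recordKey = recordKey;
      this.deleted = deleted;
    }
  }

  // secondaryKey -> (recordKey -> buffered entry)
  private final Map<String, Map<String, Entry>> buffer = new HashMap<>();

  void processNext(String secondaryKey, Entry latest) {
    Map<String, Entry> perKey = buffer.computeIfAbsent(secondaryKey, k -> new HashMap<>());
    Entry prev = perKey.get(latest.recordKey);
    if (prev == null) {
      // Cases 1 and 4: nothing buffered yet, retain the latest record (even a
      // tombstone, since a matching entry may still exist in the base file).
      perKey.put(latest.recordKey, latest);
    } else if (latest.deleted) {
      // Case 2: the tombstone cancels the buffered record; drop both.
      perKey.remove(latest.recordKey);
    } else {
      // Case 3: keep the most recently arrived non-deleted record.
      perKey.put(latest.recordKey, latest);
    }
  }
}
```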
##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java:
##########
@@ -305,6 +305,52 @@ public Map<String, List<HoodieRecordGlobalLocation>> readRecordIndex(List<String
     return recordKeyToLocation;
   }
+  /**
+   * Get record-location using secondary-index and record-index
+   * <p>
+   * If the Metadata Table is not enabled, an exception is thrown to distinguish this from the absence of the key.
+   *
+   * @param secondaryKeys The list of secondary keys to read
+   */
+  @Override
+  public Map<String, List<HoodieRecordGlobalLocation>> readSecondaryIndex(List<String> secondaryKeys) {
Review Comment:
This is the main entry point for the query planner (HoodieFileIndex) to choose the secondary
index and prune candidate files (a usage sketch follows).
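
A hypothetical usage sketch from the planner side, assuming `readSecondaryIndex` is exposed on the metadata interface as declared above; `tableMetadata` and the extraction of the predicate values are assumed context, not shown here:

```java
// Hypothetical planner-side usage: resolve secondary-key values from a pushed-down
// predicate to record locations, then keep only the file IDs that can contain matches.
Set<String> candidateFileIds(HoodieTableMetadata tableMetadata, List<String> secondaryKeys) {
  Map<String, List<HoodieRecordGlobalLocation>> locations = tableMetadata.readSecondaryIndex(secondaryKeys);
  return locations.values().stream()
      .flatMap(List::stream)
      .map(HoodieRecordGlobalLocation::getFileId) // files outside this set can be pruned
      .collect(Collectors.toSet());
}
```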
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1902,6 +1910,141 @@ public HoodieRecord next() {
     });
   }
+  public static HoodieData<HoodieRecord> readSecondaryKeysFromFiles(HoodieEngineContext engineContext,
+                                                                    List<Pair<String, Pair<String, List<String>>>> partitionFiles,
+                                                                    int recordIndexMaxParallelism,
+                                                                    String activeModule, HoodieTableMetaClient metaClient, EngineType engineType) {
+    if (partitionFiles.isEmpty()) {
+      return engineContext.emptyHoodieData();
+    }
+
+    engineContext.setJobStatus(activeModule, "Secondary Index: reading secondary keys from " + partitionFiles.size() + " partitions");
+    final int parallelism = Math.min(partitionFiles.size(), recordIndexMaxParallelism);
+    final String basePath = metaClient.getBasePathV2().toString();
+    final SerializableConfiguration configuration = new SerializableConfiguration(metaClient.getHadoopConf());
+
+    return engineContext.parallelize(partitionFiles, parallelism).flatMap(partitionAndBaseFile -> {
+      final String partition = partitionAndBaseFile.getKey();
+      final Pair<String, List<String>> baseAndLogFiles = partitionAndBaseFile.getValue();
+      List<String> logFilePaths = new ArrayList<>();
+      baseAndLogFiles.getValue().forEach(logFile -> {
+        logFilePaths.add(basePath + StoragePath.SEPARATOR + partition + StoragePath.SEPARATOR + logFile);
+      });
+      String filePath = baseAndLogFiles.getKey();
+
+      Option<Path> dataFilePath = Option.empty();
+      Schema tableSchema;
+
+      if (!filePath.isEmpty()) {
+        dataFilePath = Option.of(filePath(basePath, "", filePath));
+        tableSchema = BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).readAvroSchema(metaClient.getHadoopConf(), dataFilePath.get());
+      } else {
+        TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
+        tableSchema = schemaResolver.getTableAvroSchema();
+      }
+
+      return createSecondaryIndexGenerator(metaClient, engineType, logFilePaths, tableSchema, partition, dataFilePath);
+    });
+  }
+
+  private static ClosableIterator<HoodieRecord> createSecondaryIndexGenerator(HoodieTableMetaClient metaClient,
+                                                                              EngineType engineType, List<String> logFilePaths,
+                                                                              Schema tableSchema, String partition,
+                                                                              Option<Path> dataFilePath) throws Exception {
+    final String basePath = metaClient.getBasePathV2().toString();
+    final SerializableConfiguration configuration = new SerializableConfiguration(metaClient.getHadoopConf());
+
+    HoodieRecordMerger recordMerger = HoodieRecordUtils.createRecordMerger(
+        basePath,
+        engineType,
+        Collections.emptyList(),
+        metaClient.getTableConfig().getRecordMergerStrategy());
+
+    HoodieMergedLogRecordScanner mergedLogRecordScanner = HoodieMergedLogRecordScanner.newBuilder()
+        .withFileSystem(metaClient.getFs())
+        .withBasePath(basePath)
+        .withLogFilePaths(logFilePaths)
+        .withReaderSchema(tableSchema)
Review Comment:
Will change this to filter only the required columns. Could not find an easy way/API to do that yet.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java:
##########
@@ -402,6 +422,31 @@ public HoodieMetadataPayload preCombine(HoodieMetadataPayload previousRecord) {
       // 2. A key moved to a different file due to clustering
       // No need to merge with previous record index, always pick the latest payload.
+      return this;
+    case METADATA_TYPE_SECONDARY_INDEX:
+      // TODO: This block and checks are just for validation and to detect all callers.
Review Comment:
This is here mainly for asserting and will be removed later. Ideally, all merging logic for
secondary (or non-unique-key) index records should be handled directly at the upper layers
(while the scanner is running the scan).
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -360,6 +415,112 @@ private Map<String, HoodieRecord<HoodieMetadataPayload>> lookupKeysFromFileSlice
     }
   }
+  /**
+   * Lookup list of keys from a single file slice.
+   *
+   * @param partitionName Name of the partition
+   * @param secondaryKeys The list of secondary keys to lookup
+   * @param fileSlice The file slice to read
+   * @return A {@code Map} of secondary-key to list of {@code HoodieRecord} for the secondary-keys which were found in the file slice
+   */
+  private Map<String, List<HoodieRecord<HoodieMetadataPayload>>> lookupSecondaryKeysFromFileSlice(String partitionName, List<String> secondaryKeys, FileSlice fileSlice) {
+    Map<String, HashMap<String, HoodieRecord>> logRecordsMap = new HashMap<>();
+
+    Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers = getOrCreateReaders(partitionName, fileSlice, Option.of(true));
+    try {
+      List<Long> timings = new ArrayList<>(1);
+      HoodieSeekingFileReader<?> baseFileReader = readers.getKey();
+      HoodieMetadataLogRecordReader logRecordScanner = readers.getRight();
+      if (baseFileReader == null && logRecordScanner == null) {
+        return Collections.emptyMap();
+      }
+
+      // Sort it here once so that we don't need to sort individually for base file and for each individual log files.
+      Set<String> secondaryKeySet = new HashSet<>(secondaryKeys.size());
+      List<String> sortedSecondaryKeys = new ArrayList<>(secondaryKeys);
+      Collections.sort(sortedSecondaryKeys);
+      secondaryKeySet.addAll(sortedSecondaryKeys);
+
+      // TODO: Look at using scanByFullKeys() which pushes down the 'filtering' keys and
+      // only buffers matching records
+      logRecordScanner.scan();
Review Comment:
Tests failed a couple of times (missed records) when using the API that
supports filter push-down. Will debug further and update later.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndex.scala:
##########
@@ -0,0 +1,222 @@
+/*
Review Comment:
Please ignore the repetitive/duplicated code blocks. Will clean this up once the functionality is
ready and subsequently add more tests.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1005,6 +1056,73 @@ private HoodieData<HoodieRecord> getFunctionalIndexUpdates(HoodieCommitMetadata
     return getFunctionalIndexRecords(partitionFileSlicePairs, indexDefinition, dataMetaClient, parallelism, readerSchema, hadoopConf);
   }
+  private void updateSecondaryIndexIfPresent(HoodieCommitMetadata commitMetadata, Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordMap, HoodieData<WriteStatus> writeStatus) {
+    dataMetaClient.getTableConfig().getMetadataPartitions()
+        .stream()
+        .filter(partition -> partition.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX))
+        .forEach(partition -> {
+          HoodieData<HoodieRecord> secondaryIndexRecords;
+          try {
+            secondaryIndexRecords = getSecondaryIndexUpdates(commitMetadata, writeStatus);
+          } catch (Exception e) {
+            throw new HoodieMetadataException("Failed to get secondary index updates for partition " + partition, e);
+          }
+          partitionToRecordMap.put(SECONDARY_INDEX, secondaryIndexRecords);
+        });
+  }
+
+  private HoodieData<HoodieRecord> getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata, HoodieData<WriteStatus> writeStatus) throws Exception {
+    // Build a list of basefiles+delta-log-files for every partition that this commit touches
+    // {
+    //   {
+    //     "partition1", { {"baseFile11", {"logFile11", "logFile12"}}, {"baseFile12", {"logFile11"} } },
+    //   },
+    //   {
+    //     "partition2", { {"baseFile21", {"logFile21", "logFile22"}}, {"baseFile22", {"logFile21"} } }
+    //   }
+    // }
+    List<Pair<String, Pair<String, List<String>>>> partitionFilePairs = new ArrayList<>();
+    commitMetadata.getPartitionToWriteStats().forEach((dataPartition, writeStats) -> {
+      writeStats.forEach(writeStat -> {
+        if (writeStat instanceof HoodieDeltaWriteStat) {
+          partitionFilePairs.add(Pair.of(dataPartition, Pair.of(((HoodieDeltaWriteStat) writeStat).getBaseFile(), ((HoodieDeltaWriteStat) writeStat).getLogFiles())));
+        } else {
+          partitionFilePairs.add(Pair.of(dataPartition, Pair.of(writeStat.getPath(), new ArrayList<>())));
+        }
+      });
+    });
+
+    // Build a list of keys that need to be removed. A 'delete' record will be emitted into the respective FileGroup of
+    // the secondary index partition for each of these keys. For a commit which is deleting/updating a lot of records, this
+    // operation is going to be expensive (in CPU, memory and IO)
+    List<String> keysToRemove = new ArrayList<>();
+    writeStatus.collectAsList().forEach(status -> {
Review Comment:
This needs to be fixed. If the design changes so that `WriteStatus` carries
`(record-key, old-secondary-key, new-secondary-key)`, then this needs to change anyway
(a hypothetical sketch of that shape follows).
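
Purely as illustration of that alternative design (none of these names exist in Hudi; they are assumptions), the per-record information `WriteStatus` would need to carry so the index updater could emit tombstones without the reverse scan:

```java
// Hypothetical sketch only: the write path records, per written record, the old and
// new secondary-key values so tombstones can be emitted without reverse-scanning the index.
import java.util.Optional;

public final class SecondaryKeyChange {
  private final String recordKey;
  private final Optional<String> oldSecondaryKey; // empty for inserts
  private final Optional<String> newSecondaryKey; // empty for deletes

  public SecondaryKeyChange(String recordKey, Optional<String> oldSecondaryKey, Optional<String> newSecondaryKey) {
    this.recordKey = recordKey;
    this.oldSecondaryKey = oldSecondaryKey;
    this.newSecondaryKey = newSecondaryKey;
  }

  // A tombstone is needed whenever an existing secondary-key mapping changes or disappears.
  public boolean needsTombstone() {
    return oldSecondaryKey.isPresent() && !oldSecondaryKey.equals(newSecondaryKey);
  }
}
```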
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1005,6 +1056,73 @@ private HoodieData<HoodieRecord>
getFunctionalIndexUpdates(HoodieCommitMetadata
return getFunctionalIndexRecords(partitionFileSlicePairs, indexDefinition,
dataMetaClient, parallelism, readerSchema, hadoopConf);
}
+ private void updateSecondaryIndexIfPresent(HoodieCommitMetadata
commitMetadata, Map<MetadataPartitionType, HoodieData<HoodieRecord>>
partitionToRecordMap, HoodieData<WriteStatus> writeStatus) {
+ dataMetaClient.getTableConfig().getMetadataPartitions()
+ .stream()
+ .filter(partition ->
partition.startsWith(HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX))
+ .forEach(partition -> {
+ HoodieData<HoodieRecord> secondaryIndexRecords;
+ try {
+ secondaryIndexRecords = getSecondaryIndexUpdates(commitMetadata,
writeStatus);
+ } catch (Exception e) {
+ throw new HoodieMetadataException("Failed to get secondary index
updates for partition " + partition, e);
+ }
+ partitionToRecordMap.put(SECONDARY_INDEX, secondaryIndexRecords);
+ });
+ }
+
+ private HoodieData<HoodieRecord>
getSecondaryIndexUpdates(HoodieCommitMetadata commitMetadata,
HoodieData<WriteStatus> writeStatus) throws Exception {
+ // Build a list of basefiles+delta-log-files for every partition that this
commit touches
+ // {
+ // {
+ // "partition1", { {"baseFile11", {"logFile11", "logFile12"}},
{"baseFile12", {"logFile11"} } },
+ // },
+ // {
+ // "partition2", { {"baseFile21", {"logFile21", "logFile22"}},
{"baseFile22", {"logFile21"} } }
+ // }
+ // }
+ List<Pair<String, Pair<String, List<String>>>> partitionFilePairs = new
ArrayList<>();
+ commitMetadata.getPartitionToWriteStats().forEach((dataPartition,
writeStats) -> {
+ writeStats.forEach(writeStat -> {
+ if (writeStat instanceof HoodieDeltaWriteStat) {
+ partitionFilePairs.add(Pair.of(dataPartition,
Pair.of(((HoodieDeltaWriteStat) writeStat).getBaseFile(),
((HoodieDeltaWriteStat) writeStat).getLogFiles())));
+ } else {
+ partitionFilePairs.add(Pair.of(dataPartition,
Pair.of(writeStat.getPath(), new ArrayList<>())));
+ }
+ });
+ });
+
+ // Build a list of keys that need to be removed. A 'delete' record will be
emitted into the respective FileGroup of
+ // the secondary index partition for each of these keys. For a commit
which is deleting/updating a lot of records, this
+ // operation is going to be expensive (in CPU, memory and IO)
+ List<String> keysToRemove = new ArrayList<>();
+ writeStatus.collectAsList().forEach(status -> {
+ status.getWrittenRecordDelegates().forEach(recordDelegate -> {
+ // Consider those keys which were either updated or deleted in this
commit
+ if (!recordDelegate.getNewLocation().isPresent() ||
(recordDelegate.getCurrentLocation().isPresent() &&
recordDelegate.getNewLocation().isPresent())) {
+ keysToRemove.add(recordDelegate.getRecordKey());
+ }
+ });
+ });
+
+ // Fetch the secondary keys that each of the record keys ('keysToRemove')
maps to
+ // This is obtained by scanning the entire secondary index partition in
the metadata table
+ // This could be an expensive operation for a large commit
(updating/deleting millions of rows)
+ Map<String, String> recordKeySecondaryKeyMap =
metadata.getSecondaryKeys(keysToRemove);
+ HoodieData<HoodieRecord> deletedRecords =
getDeletedSecondaryRecordMapping(engineContext, recordKeySecondaryKeyMap);
+
+ // Reuse record index parallelism config to build secondary index
+ int parallelism = Math.min(partitionFilePairs.size(),
dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
+
+ return deletedRecords.union(readSecondaryKeysFromFiles(
Review Comment:
This is convoluted. `deletedRecords` are the tombstone records. For correctness, these tombstone
records should be emitted before the regular records (see the illustration below). I could not
find anywhere in the docs whether this ordering is preserved, but the tests show that it is.
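
A small plain-Java illustration of the ordering concern: an updated record contributes both a tombstone (via `deletedRecords`) and a fresh mapping (via `readSecondaryKeysFromFiles`) for the same record key, and the fresh mapping only survives if the tombstone is applied first:

```java
// Self-contained illustration (not Hudi types) of why tombstones must be applied
// before the fresh mappings produced for the same commit.
import java.util.*;

public class TombstoneOrdering {
  public static void main(String[] args) {
    Map<String, Set<String>> index = new HashMap<>();   // secondaryKey -> record keys
    index.computeIfAbsent("sk-1", k -> new HashSet<>()).add("r1");

    // Intended order within one commit: tombstone first, then the new mapping.
    index.getOrDefault("sk-1", Collections.emptySet()).remove("r1"); // tombstone for the old entry
    index.computeIfAbsent("sk-1", k -> new HashSet<>()).add("r1");   // fresh mapping from the commit

    System.out.println(index); // {sk-1=[r1]} -- reversing the two steps would drop r1
  }
}
```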
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]