yihua commented on code in PR #13300:
URL: https://github.com/apache/hudi/pull/13300#discussion_r2201410390
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -292,180 +325,58 @@ private static ArrayList<ArrayList<String>> partitionKeysByFileSlices(List<Strin
     return partitionedKeys;
   }
-  /**
-   * Lookup list of keys from a single file slice.
-   *
-   * @param partitionName Name of the partition
-   * @param keys          The list of keys to lookup
-   * @param fileSlice     The file slice to read
-   * @return A {@code Map} of key name to {@code HoodieRecord} for the keys which were found in the file slice
-   */
-  private Map<String, HoodieRecord<HoodieMetadataPayload>> lookupKeysFromFileSlice(String partitionName, List<String> keys, FileSlice fileSlice) {
-    Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers = getOrCreateReaders(partitionName, fileSlice);
-    try {
-      HoodieSeekingFileReader<?> baseFileReader = readers.getKey();
-      HoodieMetadataLogRecordReader logRecordScanner = readers.getRight();
-      if (baseFileReader == null && logRecordScanner == null) {
-        return Collections.emptyMap();
-      }
-
-      // Sort it here once so that we don't need to sort individually for base file and for each individual log files.
-      List<String> sortedKeys = new ArrayList<>(keys);
-      Collections.sort(sortedKeys);
-      boolean fullKeys = true;
-      List<Long> timings = new ArrayList<>(1);
-      Map<String, HoodieRecord<HoodieMetadataPayload>> logRecords = readLogRecords(logRecordScanner, sortedKeys, fullKeys, timings);
-      return readFromBaseAndMergeWithLogRecords(baseFileReader, sortedKeys, fullKeys, logRecords, timings, partitionName);
-    } catch (IOException ioe) {
-      throw new HoodieIOException("Error merging records from metadata table for " + keys.size() + " key : ", ioe);
-    } finally {
-      if (!reuse) {
-        closeReader(readers);
+  private Map<String, HoodieRecord<HoodieMetadataPayload>> lookupKeys(String partitionName,
+                                                                      List<String> keys,
+                                                                      FileSlice fileSlice) {
+    // Sort it here once so that we don't need to sort individually for base file and for each individual log files.
+    List<String> sortedKeys = new ArrayList<>(keys);
+    // So we use the natural order to sort.
+    Collections.sort(sortedKeys);
+    Option<HoodieInstant> latestMetadataInstant =
+        metadataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
+    String latestMetadataInstantTime =
+        latestMetadataInstant.map(HoodieInstant::requestedTime).orElse(SOLO_COMMIT_TIMESTAMP);
+    Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
+    // Only those log files which have a corresponding completed instant on the dataset should be read
+    // This is because the metadata table is updated before the dataset instants are committed.
+    Set<String> validInstantTimestamps = getValidInstantTimestamps();
+    InstantRange instantRange = InstantRange.builder()
+        .rangeType(InstantRange.RangeType.EXACT_MATCH)
+        .explicitInstants(validInstantTimestamps).build();
+    HoodieReaderContext<IndexedRecord> readerContext = new HoodieAvroReaderContext(
+        storageConf,
+        metadataMetaClient.getTableConfig(),
+        Option.of(instantRange),
+        Option.of(transformKeysToPredicate(sortedKeys)));
+    try (HoodieFileGroupReader<IndexedRecord> fileGroupReader = HoodieFileGroupReader.<IndexedRecord>newBuilder()
+        .withReaderContext(readerContext)
+        .withHoodieTableMetaClient(metadataMetaClient)
+        .withLatestCommitTime(latestMetadataInstantTime)
+        .withFileSlice(fileSlice)
+        .withDataSchema(schema)
+        .withRequestedSchema(schema)
+        .withProps(buildFileGroupReaderProperties(metadataConfig))
+        .withStart(0)
+        .withLength(Long.MAX_VALUE)
+        .withShouldUseRecordPosition(false)
+        .build();
+         ClosableIterator<IndexedRecord> it = fileGroupReader.getClosableIterator()) {
+      Map<String, HoodieRecord<HoodieMetadataPayload>> records = new HashMap<>();
+      while (it.hasNext()) {
+        GenericRecord metadataRecord = (GenericRecord) it.next();
+        HoodieMetadataPayload payload = new HoodieMetadataPayload(Option.of(metadataRecord));
+        String rowKey = payload.key != null
+            ? payload.key : ((GenericRecord) metadataRecord).get(KEY_FIELD_NAME).toString();
+        HoodieKey hoodieKey = new HoodieKey(rowKey, partitionName);
+        records.put(rowKey, new HoodieAvroRecord<>(hoodieKey, payload));
      }
-    }
-  }
-
-  private Map<String, HoodieRecord<HoodieMetadataPayload>> readLogRecords(HoodieMetadataLogRecordReader logRecordReader,
-                                                                          List<String> sortedKeys,
-                                                                          boolean fullKey,
-                                                                          List<Long> timings) {
-    HoodieTimer timer = HoodieTimer.start();
-
-    if (logRecordReader == null) {
-      timings.add(timer.endTimer());
-      return Collections.emptyMap();
-    }
-
-    try {
-      return fullKey ? logRecordReader.getRecordsByKeys(sortedKeys) : logRecordReader.getRecordsByKeyPrefixes(sortedKeys);
-    } finally {
-      timings.add(timer.endTimer());
-    }
-  }
-
-  private Map<String, HoodieRecord<HoodieMetadataPayload>> readFromBaseAndMergeWithLogRecords(HoodieSeekingFileReader<?> reader,
-                                                                                              List<String> sortedKeys,
-                                                                                              boolean fullKeys,
-                                                                                              Map<String, HoodieRecord<HoodieMetadataPayload>> logRecords,
-                                                                                              List<Long> timings,
-                                                                                              String partitionName) throws IOException {
-    HoodieTimer timer = HoodieTimer.start();
-
-    if (reader == null) {
-      // No base file at all
-      timings.add(timer.endTimer());
-      return logRecords;
-    }
-
-    HoodieTimer readTimer = HoodieTimer.start();
-
-    Map<String, HoodieRecord<HoodieMetadataPayload>> records =
-        fetchBaseFileRecordsByKeys(reader, sortedKeys, fullKeys, partitionName);
-
-    metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, readTimer.endTimer()));
-
-    // Iterate over all provided log-records, merging them into existing records
-    logRecords.values().forEach(logRecord ->
-        records.merge(
-            logRecord.getRecordKey(),
-            logRecord,
-            (oldRecord, newRecord) -> {
-              HoodieMetadataPayload mergedPayload = newRecord.getData().preCombine(oldRecord.getData());
-              return mergedPayload.isDeleted() ? null : new HoodieAvroRecord<>(oldRecord.getKey(), mergedPayload);
-            }
-        ));
-
-    timings.add(timer.endTimer());
-    return records;
-  }
-
-  @SuppressWarnings("unchecked")
-  private Map<String, HoodieRecord<HoodieMetadataPayload>> fetchBaseFileRecordsByKeys(HoodieSeekingFileReader reader,
-                                                                                      List<String> sortedKeys,
-                                                                                      boolean fullKeys,
-                                                                                      String partitionName) throws IOException {
-    Map<String, HoodieRecord<HoodieMetadataPayload>> result;
-    try (ClosableIterator<HoodieRecord<?>> records = fullKeys
-        ? reader.getRecordsByKeysIterator(sortedKeys)
-        : reader.getRecordsByKeyPrefixIterator(sortedKeys)) {
-      result = toStream(records)
-          .map(record -> {
-            GenericRecord data = (GenericRecord) record.getData();
-            // populateMetaFields is hardcoded to false for the metadata table so key must be extracted from the `key` field
-            String recordKey = (String) data.get(HoodieMetadataPayload.KEY_FIELD_NAME);
-            return Pair.of(recordKey, composeRecord(data, recordKey, partitionName));
-          })
-          .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
-    }
-    return result;
-  }
-
-  private HoodieRecord<HoodieMetadataPayload> composeRecord(GenericRecord avroRecord, String recordKey, String partitionName) {
-    return new HoodieAvroRecord<>(new HoodieKey(recordKey, partitionName),
-        new HoodieMetadataPayload(avroRecord, 0L), null);
-  }
-
-  /**
-   * Create a file reader and the record scanner for a given partition and file slice
-   * if readers are not already available.
-   *
-   * @param partitionName - Partition name
-   * @param slice         - The file slice to open readers for
-   * @return File reader and the record scanner pair for the requested file slice
-   */
-  private Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> getOrCreateReaders(String partitionName, FileSlice slice) {
-    if (reuse) {
-      Pair<String, String> key = Pair.of(partitionName, slice.getFileId());
-      return partitionReaders.get().computeIfAbsent(key, ignored -> openReaders(partitionName, slice));
-    } else {
-      return openReaders(partitionName, slice);
-    }
Review Comment:
   Is the `reuse` behavior handled in a different way through the FGReader-based MDT reading?
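
   For context, this is roughly what the removed `reuse` path did, condensed from the deleted `getOrCreateReaders` lines above (a simplified sketch, not the exact removed code; the `partitionReaders` cache type is assumed here, and `openReaders`/`closeReader` are the existing helpers):

   ```java
   // Sketch: when reuse == true, the base-file reader and log-record scanner were
   // cached per (partition, fileId) and shared across lookups; otherwise they were
   // opened per lookup and closed in the caller's finally block via closeReader.
   private final ThreadLocal<Map<Pair<String, String>, Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader>>>
       partitionReaders = ThreadLocal.withInitial(HashMap::new);

   private Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> getOrCreateReaders(String partitionName, FileSlice slice) {
     if (reuse) {
       // Reuse cached readers for the same file slice across lookups.
       Pair<String, String> key = Pair.of(partitionName, slice.getFileId());
       return partitionReaders.get().computeIfAbsent(key, ignored -> openReaders(partitionName, slice));
     }
     // Open fresh readers for this lookup only.
     return openReaders(partitionName, slice);
   }
   ```

   The new `lookupKeys` instead builds a `HoodieFileGroupReader` inside a try-with-resources on every call, so it is not obvious from this hunk whether any reader caching still happens when `reuse` is enabled.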