This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-1.1.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit fc83da464184d1cafd9c8f0755a60d246b777ccc Author: Sivabalan Narayanan <[email protected]> AuthorDate: Wed Oct 15 14:07:43 2025 -0700 fix: Fixing point lookup in MDT partitions (#14085) --------- Co-authored-by: Lin Liu <[email protected]> --- .../HoodieLogCompactionPlanGenerator.java | 1 - .../table/log/BaseHoodieLogRecordReader.java | 18 ++- .../table/log/HoodieLogBlockMetadataScanner.java | 5 +- .../table/log/HoodieMergedLogRecordReader.java | 18 +-- .../common/table/log/block/HoodieDataBlock.java | 22 ++-- .../table/log/block/HoodieHFileDataBlock.java | 27 ++++- .../table/read/buffer/FileGroupRecordBuffer.java | 20 ++-- .../io/storage/HoodieNativeAvroHFileReader.java | 20 +++- .../hudi/io/storage/HoodieSeekingFileReader.java | 8 ++ .../io/hadoop/TestHoodieHFileReaderWriter.java | 133 +++++++++++++++++++++ .../TestHoodieClientOnMergeOnReadStorage.java | 2 - 11 files changed, 226 insertions(+), 48 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java index 4739e9cfff51..3d704d85c7b5 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java @@ -102,7 +102,6 @@ public class HoodieLogCompactionPlanGenerator<T extends HoodieRecordPayload, I, } HoodieLogBlockMetadataScanner scanner = new HoodieLogBlockMetadataScanner(metaClient, fileSlice.getLogFiles() .sorted(HoodieLogFile.getLogFileComparator()) - .map(file -> file.getPath().toString()) .collect(Collectors.toList()), writeConfig.getMaxDFSStreamBufferSize(), maxInstantTime, diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java index 4bb3454fbda9..b471f5089369 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java @@ -41,7 +41,6 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.internal.schema.InternalSchema; import org.apache.hudi.storage.HoodieStorage; -import org.apache.hudi.storage.StoragePath; import org.apache.avro.Schema; import org.slf4j.Logger; @@ -98,7 +97,7 @@ public abstract class BaseHoodieLogRecordReader<T> { protected final String orderingFields; private final TypedProperties payloadProps; // Log File Paths - protected final List<String> logFilePaths; + protected final List<HoodieLogFile> logFiles; // Reverse reader - Not implemented yet (NA -> Why do we need ?) // but present here for plumbing for future implementation private final boolean reverseReader; @@ -139,7 +138,8 @@ public abstract class BaseHoodieLogRecordReader<T> { // table version for compatibility private final HoodieTableVersion tableVersion; - protected BaseHoodieLogRecordReader(HoodieReaderContext<T> readerContext, HoodieTableMetaClient hoodieTableMetaClient, HoodieStorage storage, List<String> logFilePaths, + protected BaseHoodieLogRecordReader(HoodieReaderContext<T> readerContext, HoodieTableMetaClient hoodieTableMetaClient, HoodieStorage storage, + List<HoodieLogFile> logFiles, boolean reverseReader, int bufferSize, Option<InstantRange> instantRange, boolean withOperationField, boolean forceFullScan, Option<String> partitionNameOverride, Option<String> keyFieldOverride, boolean enableOptimizedLogBlocksScan, HoodieFileGroupRecordBuffer<T> recordBuffer, @@ -158,8 +158,8 @@ public abstract class BaseHoodieLogRecordReader<T> { props.setProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, this.orderingFields); } this.payloadProps = props; - this.totalLogFiles.addAndGet(logFilePaths.size()); - this.logFilePaths = logFilePaths; + this.totalLogFiles.addAndGet(logFiles.size()); + this.logFiles = logFiles; this.reverseReader = reverseReader; this.storage = storage; this.bufferSize = bufferSize; @@ -219,8 +219,7 @@ public abstract class BaseHoodieLogRecordReader<T> { HoodieLogFormatReader logFormatReaderWrapper = null; try { // Iterate over the paths - logFormatReaderWrapper = new HoodieLogFormatReader(storage, - logFilePaths.stream().map(logFile -> new HoodieLogFile(new StoragePath(logFile))).collect(Collectors.toList()), + logFormatReaderWrapper = new HoodieLogFormatReader(storage, logFiles, readerSchema, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema); Set<HoodieLogFile> scannedLogFiles = new HashSet<>(); @@ -511,8 +510,7 @@ public abstract class BaseHoodieLogRecordReader<T> { HoodieLogFormatReader logFormatReaderWrapper = null; try { // Iterate over the paths - logFormatReaderWrapper = new HoodieLogFormatReader(storage, - logFilePaths.stream().map(logFile -> new HoodieLogFile(new StoragePath(logFile))).collect(Collectors.toList()), + logFormatReaderWrapper = new HoodieLogFormatReader(storage, logFiles, readerSchema, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema); /** @@ -745,7 +743,7 @@ public abstract class BaseHoodieLogRecordReader<T> { } } // At this step the lastBlocks are consumed. We track approximate progress by number of log-files seen - progress = (float) (numLogFilesSeen - 1) / logFilePaths.size(); + progress = (float) (numLogFilesSeen - 1) / logFiles.size(); } private boolean shouldLookupRecords() { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogBlockMetadataScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogBlockMetadataScanner.java index 14d65d1aa6a3..23d64eed88cb 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogBlockMetadataScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogBlockMetadataScanner.java @@ -20,6 +20,7 @@ package org.apache.hudi.common.table.log; import org.apache.hudi.avro.HoodieAvroReaderContext; import org.apache.hudi.common.engine.HoodieReaderContext; +import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.Option; @@ -32,8 +33,8 @@ import java.util.List; */ public class HoodieLogBlockMetadataScanner extends BaseHoodieLogRecordReader<IndexedRecord> { - public HoodieLogBlockMetadataScanner(HoodieTableMetaClient metaClient, List<String> logFilePaths, int bufferSize, String maxInstantTime, Option<InstantRange> instantRange) { - super(getReaderContext(metaClient, maxInstantTime), metaClient, metaClient.getStorage(), logFilePaths, false, bufferSize, instantRange, false, false, Option.empty(), Option.empty(), true, + public HoodieLogBlockMetadataScanner(HoodieTableMetaClient metaClient, List<HoodieLogFile> logFiles, int bufferSize, String maxInstantTime, Option<InstantRange> instantRange) { + super(getReaderContext(metaClient, maxInstantTime), metaClient, metaClient.getStorage(), logFiles, false, bufferSize, instantRange, false, false, Option.empty(), Option.empty(), true, null, false); scanInternal(Option.empty(), true); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java index c861f6fc2463..4ca30827fb8b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java @@ -63,11 +63,12 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T> private long totalTimeTakenToReadAndMergeBlocks; @SuppressWarnings("unchecked") - private HoodieMergedLogRecordReader(HoodieReaderContext<T> readerContext, HoodieTableMetaClient metaClient, HoodieStorage storage, List<String> logFilePaths, boolean reverseReader, + private HoodieMergedLogRecordReader(HoodieReaderContext<T> readerContext, HoodieTableMetaClient metaClient, HoodieStorage storage, + List<HoodieLogFile> logFiles, boolean reverseReader, int bufferSize, Option<InstantRange> instantRange, boolean withOperationField, boolean forceFullScan, Option<String> partitionName, Option<String> keyFieldOverride, boolean enableOptimizedLogBlocksScan, HoodieFileGroupRecordBuffer<T> recordBuffer, boolean allowInflightInstants) { - super(readerContext, metaClient, storage, logFilePaths, reverseReader, bufferSize, instantRange, withOperationField, + super(readerContext, metaClient, storage, logFiles, reverseReader, bufferSize, instantRange, withOperationField, forceFullScan, partitionName, keyFieldOverride, enableOptimizedLogBlocksScan, recordBuffer, allowInflightInstants); if (forceFullScan) { @@ -101,7 +102,7 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T> this.totalTimeTakenToReadAndMergeBlocks = timer.endTimer(); this.numMergedRecordsInLog = recordBuffer.size(); - LOG.info("Number of log files scanned => {}", logFilePaths.size()); + LOG.info("Number of log files scanned => {}", logFiles.size()); LOG.info("Number of entries in Map => {}", recordBuffer.size()); } @@ -159,7 +160,7 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T> public static class Builder<T> extends BaseHoodieLogRecordReader.Builder<T> { private HoodieReaderContext<T> readerContext; private HoodieStorage storage; - private List<String> logFilePaths; + private List<HoodieLogFile> logFiles; private boolean reverseReader; private int bufferSize; // specific configurations @@ -192,9 +193,8 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T> @Override public Builder<T> withLogFiles(List<HoodieLogFile> hoodieLogFiles) { - this.logFilePaths = hoodieLogFiles.stream() + this.logFiles = hoodieLogFiles.stream() .filter(l -> !l.isCDC()) - .map(l -> l.getPath().toString()) .collect(Collectors.toList()); return this; } @@ -263,13 +263,13 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T> public HoodieMergedLogRecordReader<T> build() { ValidationUtils.checkArgument(recordBuffer != null, "Record Buffer is null in Merged Log Record Reader"); ValidationUtils.checkArgument(readerContext != null, "Reader Context is null in Merged Log Record Reader"); - if (this.partitionName == null && CollectionUtils.nonEmpty(this.logFilePaths)) { + if (this.partitionName == null && CollectionUtils.nonEmpty(this.logFiles)) { this.partitionName = getRelativePartitionPath( - new StoragePath(readerContext.getTablePath()), new StoragePath(this.logFilePaths.get(0)).getParent()); + new StoragePath(readerContext.getTablePath()), logFiles.get(0).getPath().getParent()); } return new HoodieMergedLogRecordReader<>( - readerContext, metaClient, storage, logFilePaths, + readerContext, metaClient, storage, logFiles, reverseReader, bufferSize, instantRange, withOperationField, forceFullScan, Option.ofNullable(partitionName), diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java index 084909860282..625c678da6bc 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java @@ -260,19 +260,13 @@ public abstract class HoodieDataBlock extends HoodieLogBlock { * @param <T> The type of engine-specific record representation to return. * @return An iterator containing the records of interest in specified type. */ - public final <T> ClosableIterator<T> getEngineRecordIterator(HoodieReaderContext<T> readerContext, List<String> keys, boolean fullKey) { + public final <T> ClosableIterator<T> getEngineRecordIterator(HoodieReaderContext<T> readerContext, List<String> keys, boolean fullKey) throws IOException { boolean fullScan = keys.isEmpty(); - - // Otherwise, we fetch all the records and filter out all the records, but the - // ones requested - ClosableIterator<T> allRecords = getEngineRecordIterator(readerContext); - if (fullScan) { - return allRecords; + if (!fullScan) { + return lookupEngineRecords(keys, fullKey); + } else { + throw new IllegalStateException("Unexpected code reached. Expected to be called only with keySpec defined for non FILES partition in Metadata table"); } - - HashSet<String> keySet = new HashSet<>(keys); - return FilteringEngineRecordIterator.getInstance(allRecords, keySet, fullKey, record -> - Option.of(readerContext.getRecordContext().getRecordKey(record, readerSchema))); } protected <T> ClosableIterator<HoodieRecord<T>> readRecordsFromBlockPayload(HoodieRecordType type) throws IOException { @@ -329,6 +323,12 @@ public abstract class HoodieDataBlock extends HoodieLogBlock { ); } + protected <T> ClosableIterator<T> lookupEngineRecords(List<String> keys, boolean fullKey) throws IOException { + throw new UnsupportedOperationException( + String.format("Point lookups are not supported by this Data block type (%s)", getBlockType()) + ); + } + protected abstract ByteArrayOutputStream serializeRecords(List<HoodieRecord> records, HoodieStorage storage) throws IOException; protected abstract <T> ClosableIterator<HoodieRecord<T>> deserializeRecords(byte[] content, HoodieRecordType type) throws IOException; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java index 0ee914c32dc5..494cd02d781e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java @@ -173,8 +173,33 @@ public class HoodieHFileDataBlock extends HoodieDataBlock { // Get writer's schema from the header final ClosableIterator<HoodieRecord<IndexedRecord>> recordIterator = fullKey ? reader.getRecordsByKeysIterator(sortedKeys, readerSchema) : reader.getRecordsByKeyPrefixIterator(sortedKeys, readerSchema); - return new CloseableMappingIterator<>(recordIterator, data -> (HoodieRecord<T>) data); } } + + @Override + protected <T> ClosableIterator<T> lookupEngineRecords(List<String> sortedKeys, boolean fullKey) throws IOException { + HoodieLogBlockContentLocation blockContentLoc = getBlockContentLocation().get(); + + // NOTE: It's important to extend Hadoop configuration here to make sure configuration + // is appropriately carried over + StorageConfiguration<?> inlineConf = getBlockContentLocation().get().getStorage().getConf().getInline(); + StoragePath inlinePath = InLineFSUtils.getInlineFilePath( + blockContentLoc.getLogFile().getPath(), + blockContentLoc.getLogFile().getPath().toUri().getScheme(), + blockContentLoc.getContentPositionInLogFile(), + blockContentLoc.getBlockSize()); + HoodieStorage inlineStorage = blockContentLoc.getStorage().newInstance(inlinePath, inlineConf); + + try (final HoodieAvroHFileReaderImplBase reader = (HoodieAvroHFileReaderImplBase) HoodieIOFactory + .getIOFactory(inlineStorage) + .getReaderFactory(HoodieRecordType.AVRO) + .getFileReader(ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER, + inlinePath, + HoodieFileFormat.HFILE, + Option.of(getSchemaFromHeader()))) { + // Get writer's schema from the header + return (ClosableIterator<T>) (fullKey ? reader.getEngineRecordsByKeysIterator(sortedKeys, readerSchema) : reader.getEngineRecordsByKeyPrefixIterator(sortedKeys, readerSchema)); + } + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java index d8d9d215ae07..5b3ff3bcae0d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java @@ -193,15 +193,19 @@ abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordBuffer<T */ protected Pair<ClosableIterator<T>, Schema> getRecordsIterator(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) { ClosableIterator<T> blockRecordsIterator; - if (keySpecOpt.isPresent()) { - KeySpec keySpec = keySpecOpt.get(); - blockRecordsIterator = dataBlock.getEngineRecordIterator(readerContext, keySpec.getKeys(), keySpec.isFullKey()); - } else { - blockRecordsIterator = dataBlock.getEngineRecordIterator(readerContext); + try { + if (keySpecOpt.isPresent()) { + KeySpec keySpec = keySpecOpt.get(); + blockRecordsIterator = dataBlock.getEngineRecordIterator(readerContext, keySpec.getKeys(), keySpec.isFullKey()); + } else { + blockRecordsIterator = dataBlock.getEngineRecordIterator(readerContext); + } + Pair<Function<T, T>, Schema> schemaTransformerWithEvolvedSchema = getSchemaTransformerWithEvolvedSchema(dataBlock); + return Pair.of(new CloseableMappingIterator<>( + blockRecordsIterator, schemaTransformerWithEvolvedSchema.getLeft()), schemaTransformerWithEvolvedSchema.getRight()); + } catch (IOException e) { + throw new HoodieIOException("Failed to deser records from log files ", e); } - Pair<Function<T, T>, Schema> schemaTransformerWithEvolvedSchema = getSchemaTransformerWithEvolvedSchema(dataBlock); - return Pair.of(new CloseableMappingIterator<>( - blockRecordsIterator, schemaTransformerWithEvolvedSchema.getLeft()), schemaTransformerWithEvolvedSchema.getRight()); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java index 7f361af6ca20..ce4a733428a5 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java @@ -210,23 +210,35 @@ public class HoodieNativeAvroHFileReader extends HoodieAvroHFileReaderImplBase { @Override public ClosableIterator<HoodieRecord<IndexedRecord>> getRecordsByKeysIterator( List<String> sortedKeys, Schema schema) throws IOException { - HFileReader reader = readerFactory.createHFileReader(); ClosableIterator<IndexedRecord> iterator = - new RecordByKeyIterator(reader, sortedKeys, getSchema(), schema, useBloomFilter); + getEngineRecordsByKeysIterator(sortedKeys, schema); return new CloseableMappingIterator<>( iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data))); } + @Override + public ClosableIterator<IndexedRecord> getEngineRecordsByKeysIterator( + List<String> sortedKeys, Schema schema) throws IOException { + HFileReader reader = readerFactory.createHFileReader(); + return new RecordByKeyIterator(reader, sortedKeys, getSchema(), schema, useBloomFilter); + } + @Override public ClosableIterator<HoodieRecord<IndexedRecord>> getRecordsByKeyPrefixIterator( List<String> sortedKeyPrefixes, Schema schema) throws IOException { - HFileReader reader = readerFactory.createHFileReader(); ClosableIterator<IndexedRecord> iterator = - new RecordByKeyPrefixIterator(reader, sortedKeyPrefixes, getSchema(), schema); + getEngineRecordsByKeyPrefixIterator(sortedKeyPrefixes, schema); return new CloseableMappingIterator<>( iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data))); } + @Override + public ClosableIterator<IndexedRecord> getEngineRecordsByKeyPrefixIterator( + List<String> sortedKeyPrefixes, Schema schema) throws IOException { + HFileReader reader = readerFactory.createHFileReader(); + return new RecordByKeyPrefixIterator(reader, sortedKeyPrefixes, getSchema(), schema); + } + @Override public boolean supportKeyPredicate() { return true; diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieSeekingFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieSeekingFileReader.java index e7547d2f02cd..5b8dcea0fb02 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieSeekingFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieSeekingFileReader.java @@ -33,6 +33,10 @@ public interface HoodieSeekingFileReader<T> extends HoodieFileReader<T> { throw new UnsupportedOperationException(); } + default ClosableIterator<T> getEngineRecordsByKeysIterator(List<String> sortedKeys, Schema schema) throws IOException { + throw new UnsupportedOperationException(); + } + default ClosableIterator<HoodieRecord<T>> getRecordsByKeysIterator(List<String> sortedKeys) throws IOException { return getRecordsByKeysIterator(sortedKeys, getSchema()); } @@ -41,6 +45,10 @@ public interface HoodieSeekingFileReader<T> extends HoodieFileReader<T> { throw new UnsupportedEncodingException(); } + default ClosableIterator<T> getEngineRecordsByKeyPrefixIterator(List<String> sortedKeyPrefixes, Schema schema) throws IOException { + throw new UnsupportedEncodingException(); + } + default ClosableIterator<HoodieRecord<T>> getRecordsByKeyPrefixIterator(List<String> sortedKeyPrefixes) throws IOException { return getRecordsByKeyPrefixIterator(sortedKeyPrefixes, getSchema()); } diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java index ee83e7c52999..a20303e6a3bc 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java @@ -394,6 +394,15 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase { // no entries should match since this is exact match. assertEquals(Collections.emptyList(), recordsByKeys); + iterator = + hfileReader.getEngineRecordsByKeysIterator(keys, avroSchema); + + recordsByKeys = + toStream(iterator).map(r -> (GenericRecord) r).collect(Collectors.toList()); + + // no entries should match since this is exact match. + assertEquals(Collections.emptyList(), recordsByKeys); + // filter for "key00001, key05, key12, key24, key2, key31, key49, key61, key50". Valid entries should be matched. // key00001 should not match. // key2 : we don't have an exact match @@ -414,6 +423,16 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase { .map(r -> (GenericRecord) r) .collect(Collectors.toList()); assertEquals(expectedKey1s, recordsByKeys); + + iterator = + hfileReader.getEngineRecordsByKeysIterator( + Arrays.asList("key00001", "key05", "key12", "key24", "key31", "key49", "key61", "key50"), + avroSchema); + recordsByKeys = + StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false) + .map(r -> (GenericRecord) r) + .collect(Collectors.toList()); + assertEquals(expectedKey1s, recordsByKeys); } try (HoodieAvroHFileReaderImplBase hfileReader = getHFileReaderFromFixture(useBloomFilter)) { List<GenericRecord> allRecords = toStream(hfileReader.getRecordIterator()) @@ -431,6 +450,16 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase { || (entry.get("_row_key").toString()).equals("key004958-abcdefghij") || (entry.get("_row_key").toString()).equals("key019889-abcdefghij"))).collect(Collectors.toList()); assertEquals(expected, actual); + + iterator = hfileReader.getEngineRecordsByKeysIterator( + Arrays.asList("key000000", "key000066-abcdefghij", "key001424-aa", "key001424-aaa", + "key004958-abcdefghij", "key010769", "key019889-abcdefghij", "key030000"), + avroSchema); + actual = + StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false) + .map(r -> (GenericRecord) r) + .collect(Collectors.toList()); + assertEquals(expected, actual); } } @@ -454,6 +483,12 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase { assertEquals(allRecords, recordsByPrefix); + // test getEngineRecordsByKeyPrefix + iterator = hfileReader.getEngineRecordsByKeyPrefixIterator(keyPrefixes, avroSchema); + recordsByPrefix = + toStream(iterator).map(r -> (GenericRecord) r).collect(Collectors.toList()); + assertEquals(allRecords, recordsByPrefix); + // filter for "key1" : entries from key10 to key19 should be matched List<GenericRecord> expectedKey1s = allRecords.stream().filter(entry -> (entry.get("_row_key").toString()).contains("key1")) @@ -468,6 +503,17 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase { .collect(Collectors.toList()); assertEquals(expectedKey1s, recordsByPrefix); + // test getEngineRecordsByKeyPrefix + iterator = + hfileReader.getEngineRecordsByKeyPrefixIterator(Collections.singletonList("key1"), + avroSchema); + recordsByPrefix = + StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), + false) + .map(r -> (GenericRecord) r) + .collect(Collectors.toList()); + assertEquals(expectedKey1s, recordsByPrefix); + // exact match List<GenericRecord> expectedKey25 = allRecords.stream().filter(entry -> (entry.get("_row_key").toString()).contains("key25")) @@ -480,6 +526,14 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase { .collect(Collectors.toList()); assertEquals(expectedKey25, recordsByPrefix); + iterator = + hfileReader.getEngineRecordsByKeyPrefixIterator(Collections.singletonList("key25"), avroSchema); + recordsByPrefix = + StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false) + .map(r -> (GenericRecord) r) + .collect(Collectors.toList()); + assertEquals(expectedKey25, recordsByPrefix); + // no match. key prefix is beyond entries in file. iterator = hfileReader.getIndexedRecordsByKeyPrefixIterator(Collections.singletonList("key99"), avroSchema); @@ -489,6 +543,14 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase { .collect(Collectors.toList()); assertEquals(Collections.emptyList(), recordsByPrefix); + iterator = + hfileReader.getEngineRecordsByKeyPrefixIterator(Collections.singletonList("key99"), avroSchema); + recordsByPrefix = + StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false) + .map(r -> (GenericRecord) r) + .collect(Collectors.toList()); + assertEquals(Collections.emptyList(), recordsByPrefix); + // no match. but keyPrefix is in between the entries found in file. iterator = hfileReader.getIndexedRecordsByKeyPrefixIterator(Collections.singletonList("key1234"), avroSchema); @@ -497,6 +559,13 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase { .map(r -> (GenericRecord) r) .collect(Collectors.toList()); assertEquals(Collections.emptyList(), recordsByPrefix); + iterator = + hfileReader.getEngineRecordsByKeysIterator(Collections.singletonList("key1234"), avroSchema); + recordsByPrefix = + StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false) + .map(r -> (GenericRecord) r) + .collect(Collectors.toList()); + assertEquals(Collections.emptyList(), recordsByPrefix); // filter for "key1", "key30" and "key60" : entries from 'key10 to key19' and 'key30' should be matched. List<GenericRecord> expectedKey50and1s = @@ -510,6 +579,14 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase { .collect(Collectors.toList()); assertEquals(expectedKey50and1s, recordsByPrefix); + iterator = + hfileReader.getEngineRecordsByKeyPrefixIterator(Arrays.asList("key1", "key30", "key6"), avroSchema); + recordsByPrefix = + StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false) + .map(r -> (GenericRecord) r) + .collect(Collectors.toList()); + assertEquals(expectedKey50and1s, recordsByPrefix); + // filter for "key50" and "key0" : entries from key50 and 'key00 to key09' should be matched. List<GenericRecord> expectedKey50and0s = allRecords.stream().filter(entry -> (entry.get("_row_key").toString()).contains("key0") @@ -521,6 +598,13 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase { .map(r -> (GenericRecord) r) .collect(Collectors.toList()); assertEquals(expectedKey50and0s, recordsByPrefix); + iterator = + hfileReader.getEngineRecordsByKeyPrefixIterator(Arrays.asList("key0", "key50"), avroSchema); + recordsByPrefix = + StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false) + .map(r -> (GenericRecord) r) + .collect(Collectors.toList()); + assertEquals(expectedKey50and0s, recordsByPrefix); // filter for "key1" and "key0" : entries from 'key10 to key19' and 'key00 to key09' should be matched. List<GenericRecord> expectedKey1sand0s = allRecords.stream() @@ -541,6 +625,20 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase { }); assertEquals(expectedKey1sand0s, recordsByPrefix); + iterator = + hfileReader.getEngineRecordsByKeyPrefixIterator(Arrays.asList("key0", "key1"), avroSchema); + recordsByPrefix = + StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false) + .map(r -> (GenericRecord) r) + .collect(Collectors.toList()); + Collections.sort(recordsByPrefix, new Comparator<GenericRecord>() { + @Override + public int compare(GenericRecord o1, GenericRecord o2) { + return o1.get("_row_key").toString().compareTo(o2.get("_row_key").toString()); + } + }); + assertEquals(expectedKey1sand0s, recordsByPrefix); + // We expect the keys to be looked up in sorted order. If not, matching entries may not be returned. // key1 should have matching entries, but not key0. iterator = @@ -556,6 +654,19 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase { } }); assertEquals(expectedKey1s, recordsByPrefix); + + iterator = + hfileReader.getEngineRecordsByKeyPrefixIterator(Arrays.asList("key1", "key0"), avroSchema); + recordsByPrefix = + StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false) + .map(r -> (GenericRecord) r) + .collect(Collectors.toList()); + Collections.sort(recordsByPrefix, new Comparator<GenericRecord>() { + @Override + public int compare(GenericRecord o1, GenericRecord o2) { + return o1.get("_row_key").toString().compareTo(o2.get("_row_key").toString()); + } + }); } try (HoodieAvroHFileReaderImplBase hfileReader = getHFileReaderFromFixture(useBloomFilter)) { List<GenericRecord> allRecords = toStream(hfileReader.getRecordIterator()) @@ -575,6 +686,17 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase { || (entry.get("_row_key").toString()).contains("key010769") || (entry.get("_row_key").toString()).contains("key01988"))).collect(Collectors.toList()); assertEquals(expected, actual); + + iterator = hfileReader.getEngineRecordsByKeyPrefixIterator( + Arrays.asList("key000000", "key000066-abcdefghij", "key001424-aa", "key001424-aaa", + "key004958", "key010769", "key01988", "key030000"), + avroSchema); + actual = + StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false) + .map(r -> (GenericRecord) r) + .collect(Collectors.toList()); + assertEquals(expected, actual); + } } @@ -600,6 +722,17 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase { Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false) .collect(Collectors.toList())); } + + try (ClosableIterator<IndexedRecord> iterator = + hfileReader.getEngineRecordsByKeysIterator( + Arrays.asList("key00001", "key05", "key24", "key16", "key31", "key61"), + avroSchema)) { + assertThrows( + IllegalStateException.class, + () -> StreamSupport.stream( + Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false) + .collect(Collectors.toList())); + } } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java index ede8e3c932e2..4ee8c34c935d 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java @@ -487,7 +487,6 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase { table.getMetaClient(), slice.getLogFiles() .sorted(HoodieLogFile.getLogFileComparator()) - .map(file -> file.getPath().toString()) .collect(Collectors.toList()), config.getMaxDFSStreamBufferSize(), instant, @@ -497,7 +496,6 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase { table.getMetaClient(), slice.getLogFiles() .sorted(HoodieLogFile.getLogFileComparator()) - .map(file -> file.getPath().toString()) .collect(Collectors.toList()), config.getMaxDFSStreamBufferSize(), currentInstant,
