This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b0fa11424d37 fix: Fixing point lookup in MDT partitions (#14085)
b0fa11424d37 is described below
commit b0fa11424d379c7226999a4fb2a1a8922f8812b9
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Wed Oct 15 14:07:43 2025 -0700
fix: Fixing point lookup in MDT partitions (#14085)
---------
Co-authored-by: Lin Liu <[email protected]>
---
.../HoodieLogCompactionPlanGenerator.java | 1 -
.../table/log/BaseHoodieLogRecordReader.java | 18 ++-
.../table/log/HoodieLogBlockMetadataScanner.java | 5 +-
.../table/log/HoodieMergedLogRecordReader.java | 18 +--
.../common/table/log/block/HoodieDataBlock.java | 22 ++--
.../table/log/block/HoodieHFileDataBlock.java | 27 ++++-
.../table/read/buffer/FileGroupRecordBuffer.java | 20 ++--
.../io/storage/HoodieNativeAvroHFileReader.java | 20 +++-
.../hudi/io/storage/HoodieSeekingFileReader.java | 8 ++
.../io/hadoop/TestHoodieHFileReaderWriter.java | 133 +++++++++++++++++++++
.../TestHoodieClientOnMergeOnReadStorage.java | 2 -
11 files changed, 226 insertions(+), 48 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
index 4739e9cfff51..3d704d85c7b5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/HoodieLogCompactionPlanGenerator.java
@@ -102,7 +102,6 @@ public class HoodieLogCompactionPlanGenerator<T extends HoodieRecordPayload, I,
     }
     HoodieLogBlockMetadataScanner scanner = new HoodieLogBlockMetadataScanner(metaClient, fileSlice.getLogFiles()
         .sorted(HoodieLogFile.getLogFileComparator())
-        .map(file -> file.getPath().toString())
        .collect(Collectors.toList()),
        writeConfig.getMaxDFSStreamBufferSize(),
        maxInstantTime,
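
The dropped `.map(file -> file.getPath().toString())` reflects the new HoodieLogBlockMetadataScanner signature (changed later in this diff), which accepts HoodieLogFile objects directly. A minimal sketch of the resulting call site; the trailing constructor arguments are truncated in this hunk, so the instantRange argument below is assumed:

    // Sorted HoodieLogFile handles are passed straight through; no string round-trip.
    List<HoodieLogFile> sortedLogFiles = fileSlice.getLogFiles()
        .sorted(HoodieLogFile.getLogFileComparator())
        .collect(Collectors.toList());
    HoodieLogBlockMetadataScanner scanner = new HoodieLogBlockMetadataScanner(
        metaClient, sortedLogFiles, writeConfig.getMaxDFSStreamBufferSize(),
        maxInstantTime, instantRange);  // instantRange: Option<InstantRange>, assumed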
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
index 4bb3454fbda9..b471f5089369 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
@@ -41,7 +41,6 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.storage.HoodieStorage;
-import org.apache.hudi.storage.StoragePath;

 import org.apache.avro.Schema;
 import org.slf4j.Logger;
@@ -98,7 +97,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
   protected final String orderingFields;
   private final TypedProperties payloadProps;
   // Log File Paths
-  protected final List<String> logFilePaths;
+  protected final List<HoodieLogFile> logFiles;
   // Reverse reader - Not implemented yet (NA -> Why do we need ?)
   // but present here for plumbing for future implementation
   private final boolean reverseReader;
@@ -139,7 +138,8 @@ public abstract class BaseHoodieLogRecordReader<T> {
   // table version for compatibility
   private final HoodieTableVersion tableVersion;

-  protected BaseHoodieLogRecordReader(HoodieReaderContext<T> readerContext, HoodieTableMetaClient hoodieTableMetaClient, HoodieStorage storage, List<String> logFilePaths,
+  protected BaseHoodieLogRecordReader(HoodieReaderContext<T> readerContext, HoodieTableMetaClient hoodieTableMetaClient, HoodieStorage storage,
+                                      List<HoodieLogFile> logFiles,
                                      boolean reverseReader, int bufferSize, Option<InstantRange> instantRange,
                                      boolean withOperationField, boolean forceFullScan, Option<String> partitionNameOverride,
                                      Option<String> keyFieldOverride, boolean enableOptimizedLogBlocksScan, HoodieFileGroupRecordBuffer<T> recordBuffer,
@@ -158,8 +158,8 @@ public abstract class BaseHoodieLogRecordReader<T> {
       props.setProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY, this.orderingFields);
     }
     this.payloadProps = props;
-    this.totalLogFiles.addAndGet(logFilePaths.size());
-    this.logFilePaths = logFilePaths;
+    this.totalLogFiles.addAndGet(logFiles.size());
+    this.logFiles = logFiles;
    this.reverseReader = reverseReader;
    this.storage = storage;
    this.bufferSize = bufferSize;
@@ -219,8 +219,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
     HoodieLogFormatReader logFormatReaderWrapper = null;
     try {
       // Iterate over the paths
-      logFormatReaderWrapper = new HoodieLogFormatReader(storage,
-          logFilePaths.stream().map(logFile -> new HoodieLogFile(new StoragePath(logFile))).collect(Collectors.toList()),
+      logFormatReaderWrapper = new HoodieLogFormatReader(storage, logFiles,
          readerSchema, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema);

      Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
@@ -511,8 +510,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
     HoodieLogFormatReader logFormatReaderWrapper = null;
     try {
       // Iterate over the paths
-      logFormatReaderWrapper = new HoodieLogFormatReader(storage,
-          logFilePaths.stream().map(logFile -> new HoodieLogFile(new StoragePath(logFile))).collect(Collectors.toList()),
+      logFormatReaderWrapper = new HoodieLogFormatReader(storage, logFiles,
          readerSchema, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema);

      /**
@@ -745,7 +743,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
      }
    }
    // At this step the lastBlocks are consumed. We track approximate progress by number of log-files seen
-    progress = (float) (numLogFilesSeen - 1) / logFilePaths.size();
+    progress = (float) (numLogFilesSeen - 1) / logFiles.size();
  }

  private boolean shouldLookupRecords() {
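
Carrying List<HoodieLogFile> end-to-end removes the conversion that both scan paths used to repeat. A before/after sketch of what this hunk deletes (condensed from the diff, not literal file contents):

    // Before: each scan re-parsed every path string into a HoodieLogFile.
    List<HoodieLogFile> files = logFilePaths.stream()
        .map(p -> new HoodieLogFile(new StoragePath(p)))
        .collect(Collectors.toList());

    // After: the List<HoodieLogFile> received by the constructor is reused as-is.
    logFormatReaderWrapper = new HoodieLogFormatReader(storage, logFiles,
        readerSchema, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema);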
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogBlockMetadataScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogBlockMetadataScanner.java
index 14d65d1aa6a3..23d64eed88cb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogBlockMetadataScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogBlockMetadataScanner.java
@@ -20,6 +20,7 @@ package org.apache.hudi.common.table.log;

 import org.apache.hudi.avro.HoodieAvroReaderContext;
 import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Option;
@@ -32,8 +33,8 @@ import java.util.List;
  */
 public class HoodieLogBlockMetadataScanner extends BaseHoodieLogRecordReader<IndexedRecord> {

-  public HoodieLogBlockMetadataScanner(HoodieTableMetaClient metaClient, List<String> logFilePaths, int bufferSize, String maxInstantTime, Option<InstantRange> instantRange) {
-    super(getReaderContext(metaClient, maxInstantTime), metaClient, metaClient.getStorage(), logFilePaths, false, bufferSize, instantRange, false, false, Option.empty(), Option.empty(), true,
+  public HoodieLogBlockMetadataScanner(HoodieTableMetaClient metaClient, List<HoodieLogFile> logFiles, int bufferSize, String maxInstantTime, Option<InstantRange> instantRange) {
+    super(getReaderContext(metaClient, maxInstantTime), metaClient, metaClient.getStorage(), logFiles, false, bufferSize, instantRange, false, false, Option.empty(), Option.empty(), true,
        null, false);
    scanInternal(Option.empty(), true);
  }
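
Since the constructor still ends with `scanInternal(Option.empty(), true)`, the scan runs eagerly at construction time. A hedged usage sketch with illustrative arguments:

    // Hypothetical call site: the scanner is fully populated once the constructor returns.
    HoodieLogBlockMetadataScanner scanner = new HoodieLogBlockMetadataScanner(
        metaClient,
        sortedLogFiles,    // List<HoodieLogFile>, the new parameter type
        64 * 1024,         // illustrative buffer size
        maxInstantTime,
        Option.empty());   // no instant-range filter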
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
index c861f6fc2463..4ca30827fb8b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java
@@ -63,11 +63,12 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T>
   private long totalTimeTakenToReadAndMergeBlocks;

   @SuppressWarnings("unchecked")
-  private HoodieMergedLogRecordReader(HoodieReaderContext<T> readerContext, HoodieTableMetaClient metaClient, HoodieStorage storage, List<String> logFilePaths, boolean reverseReader,
+  private HoodieMergedLogRecordReader(HoodieReaderContext<T> readerContext, HoodieTableMetaClient metaClient, HoodieStorage storage,
+                                      List<HoodieLogFile> logFiles, boolean reverseReader,
                                      int bufferSize, Option<InstantRange> instantRange, boolean withOperationField, boolean forceFullScan,
                                      Option<String> partitionName,
                                      Option<String> keyFieldOverride, boolean enableOptimizedLogBlocksScan, HoodieFileGroupRecordBuffer<T> recordBuffer, boolean allowInflightInstants) {
-    super(readerContext, metaClient, storage, logFilePaths, reverseReader, bufferSize, instantRange, withOperationField,
+    super(readerContext, metaClient, storage, logFiles, reverseReader, bufferSize, instantRange, withOperationField,
        forceFullScan, partitionName, keyFieldOverride, enableOptimizedLogBlocksScan, recordBuffer, allowInflightInstants);
    if (forceFullScan) {
@@ -101,7 +102,7 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T>
    this.totalTimeTakenToReadAndMergeBlocks = timer.endTimer();
    this.numMergedRecordsInLog = recordBuffer.size();

-    LOG.info("Number of log files scanned => {}", logFilePaths.size());
+    LOG.info("Number of log files scanned => {}", logFiles.size());
    LOG.info("Number of entries in Map => {}", recordBuffer.size());
  }

@@ -159,7 +160,7 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T>
  public static class Builder<T> extends BaseHoodieLogRecordReader.Builder<T> {
    private HoodieReaderContext<T> readerContext;
    private HoodieStorage storage;
-    private List<String> logFilePaths;
+    private List<HoodieLogFile> logFiles;
    private boolean reverseReader;
    private int bufferSize;
    // specific configurations
@@ -192,9 +193,8 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T>
    @Override
    public Builder<T> withLogFiles(List<HoodieLogFile> hoodieLogFiles) {
-      this.logFilePaths = hoodieLogFiles.stream()
+      this.logFiles = hoodieLogFiles.stream()
          .filter(l -> !l.isCDC())
-          .map(l -> l.getPath().toString())
          .collect(Collectors.toList());
      return this;
    }
@@ -263,13 +263,13 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T>
    public HoodieMergedLogRecordReader<T> build() {
      ValidationUtils.checkArgument(recordBuffer != null, "Record Buffer is null in Merged Log Record Reader");
      ValidationUtils.checkArgument(readerContext != null, "Reader Context is null in Merged Log Record Reader");
-      if (this.partitionName == null && CollectionUtils.nonEmpty(this.logFilePaths)) {
+      if (this.partitionName == null && CollectionUtils.nonEmpty(this.logFiles)) {
        this.partitionName = getRelativePartitionPath(
-            new StoragePath(readerContext.getTablePath()), new StoragePath(this.logFilePaths.get(0)).getParent());
+            new StoragePath(readerContext.getTablePath()), logFiles.get(0).getPath().getParent());
      }

      return new HoodieMergedLogRecordReader<>(
-          readerContext, metaClient, storage, logFilePaths,
+          readerContext, metaClient, storage, logFiles,
          reverseReader, bufferSize, instantRange,
          withOperationField, forceFullScan,
          Option.ofNullable(partitionName),
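
In build(), the partition name is now derived from the first log file's StoragePath instead of re-parsing a string path. The equivalent logic written out as a sketch (getRelativePartitionPath is assumed statically imported, as in the original file):

    StoragePath tablePath = new StoragePath(readerContext.getTablePath());
    StoragePath partitionDir = logFiles.get(0).getPath().getParent();
    String partitionName = getRelativePartitionPath(tablePath, partitionDir);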
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
index 084909860282..625c678da6bc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
@@ -260,19 +260,13 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
   * @param <T>  The type of engine-specific record representation to return.
   * @return An iterator containing the records of interest in specified type.
   */
-  public final <T> ClosableIterator<T> getEngineRecordIterator(HoodieReaderContext<T> readerContext, List<String> keys, boolean fullKey) {
+  public final <T> ClosableIterator<T> getEngineRecordIterator(HoodieReaderContext<T> readerContext, List<String> keys, boolean fullKey) throws IOException {
    boolean fullScan = keys.isEmpty();
-
-    // Otherwise, we fetch all the records and filter out all the records, but the
-    // ones requested
-    ClosableIterator<T> allRecords = getEngineRecordIterator(readerContext);
-    if (fullScan) {
-      return allRecords;
+    if (!fullScan) {
+      return lookupEngineRecords(keys, fullKey);
+    } else {
+      throw new IllegalStateException("Unexpected code reached. Expected to be called only with keySpec defined for non FILES partition in Metadata table");
    }
-
-    HashSet<String> keySet = new HashSet<>(keys);
-    return FilteringEngineRecordIterator.getInstance(allRecords, keySet, fullKey, record ->
-        Option.of(readerContext.getRecordContext().getRecordKey(record, readerSchema)));
  }

  protected <T> ClosableIterator<HoodieRecord<T>> readRecordsFromBlockPayload(HoodieRecordType type) throws IOException {
@@ -329,6 +323,12 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
    );
  }

+  protected <T> ClosableIterator<T> lookupEngineRecords(List<String> keys, boolean fullKey) throws IOException {
+    throw new UnsupportedOperationException(
+        String.format("Point lookups are not supported by this Data block type (%s)", getBlockType())
+    );
+  }
+
  protected abstract ByteArrayOutputStream serializeRecords(List<HoodieRecord> records, HoodieStorage storage) throws IOException;

  protected abstract <T> ClosableIterator<HoodieRecord<T>> deserializeRecords(byte[] content, HoodieRecordType type) throws IOException;
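
After this hunk, getEngineRecordIterator(readerContext, keys, fullKey) no longer falls back to a full block scan plus in-memory filtering: non-empty key lists are routed to the new lookupEngineRecords hook (overridden only by block types with a native lookup, see HoodieHFileDataBlock below), and an empty key list is treated as a programming error. A hedged caller-side sketch, assuming an Avro reader context so records surface as IndexedRecord; process(...) is a placeholder:

    // Keys must be non-empty; for HFile-backed blocks they should also be sorted.
    List<String> keys = Arrays.asList("key001", "key042");
    try (ClosableIterator<IndexedRecord> it =
             dataBlock.getEngineRecordIterator(readerContext, keys, true /* fullKey */)) {
      while (it.hasNext()) {
        process(it.next());  // process(...) is hypothetical
      }
    }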
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
index 0ee914c32dc5..494cd02d781e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
@@ -173,8 +173,33 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
      // Get writer's schema from the header
      final ClosableIterator<HoodieRecord<IndexedRecord>> recordIterator =
          fullKey ? reader.getRecordsByKeysIterator(sortedKeys, readerSchema)
              : reader.getRecordsByKeyPrefixIterator(sortedKeys, readerSchema);
-
      return new CloseableMappingIterator<>(recordIterator, data -> (HoodieRecord<T>) data);
    }
  }
+
+  @Override
+  protected <T> ClosableIterator<T> lookupEngineRecords(List<String> sortedKeys, boolean fullKey) throws IOException {
+    HoodieLogBlockContentLocation blockContentLoc = getBlockContentLocation().get();
+
+    // NOTE: It's important to extend Hadoop configuration here to make sure configuration
+    //       is appropriately carried over
+    StorageConfiguration<?> inlineConf = getBlockContentLocation().get().getStorage().getConf().getInline();
+    StoragePath inlinePath = InLineFSUtils.getInlineFilePath(
+        blockContentLoc.getLogFile().getPath(),
+        blockContentLoc.getLogFile().getPath().toUri().getScheme(),
+        blockContentLoc.getContentPositionInLogFile(),
+        blockContentLoc.getBlockSize());
+    HoodieStorage inlineStorage = blockContentLoc.getStorage().newInstance(inlinePath, inlineConf);
+
+    try (final HoodieAvroHFileReaderImplBase reader = (HoodieAvroHFileReaderImplBase) HoodieIOFactory
+        .getIOFactory(inlineStorage)
+        .getReaderFactory(HoodieRecordType.AVRO)
+        .getFileReader(ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER,
+            inlinePath,
+            HoodieFileFormat.HFILE,
+            Option.of(getSchemaFromHeader()))) {
+      // Get writer's schema from the header
+      return (ClosableIterator<T>) (fullKey ? reader.getEngineRecordsByKeysIterator(sortedKeys, readerSchema) : reader.getEngineRecordsByKeyPrefixIterator(sortedKeys, readerSchema));
+    }
+  }
 }
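
The override reads the HFile block in place through Hudi's inline file system: the block's byte range within the log file is addressed as a standalone inline path, and an HFile reader opened on that path seeks by key instead of deserializing the whole block. An illustration of the path construction with made-up offsets:

    // Illustrative only: an HFile block at byte offset 4096, 8192 bytes long,
    // inside a log file, exposed as its own inline path.
    StoragePath logFilePath = blockContentLoc.getLogFile().getPath();
    StoragePath inlinePath = InLineFSUtils.getInlineFilePath(
        logFilePath,
        logFilePath.toUri().getScheme(),  // outer scheme, e.g. "s3a"
        4096L,                            // content position in the log file (made up)
        8192L);                           // block size in bytes (made up)
    // An HFile reader opened on inlinePath can seek directly to sortedKeys.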
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java
index d8d9d215ae07..5b3ff3bcae0d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java
@@ -193,15 +193,19 @@ abstract class FileGroupRecordBuffer<T> implements HoodieFileGroupRecordBuffer<T
   */
  protected Pair<ClosableIterator<T>, Schema> getRecordsIterator(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) {
    ClosableIterator<T> blockRecordsIterator;
-    if (keySpecOpt.isPresent()) {
-      KeySpec keySpec = keySpecOpt.get();
-      blockRecordsIterator = dataBlock.getEngineRecordIterator(readerContext, keySpec.getKeys(), keySpec.isFullKey());
-    } else {
-      blockRecordsIterator = dataBlock.getEngineRecordIterator(readerContext);
+    try {
+      if (keySpecOpt.isPresent()) {
+        KeySpec keySpec = keySpecOpt.get();
+        blockRecordsIterator = dataBlock.getEngineRecordIterator(readerContext, keySpec.getKeys(), keySpec.isFullKey());
+      } else {
+        blockRecordsIterator = dataBlock.getEngineRecordIterator(readerContext);
+      }
+      Pair<Function<T, T>, Schema> schemaTransformerWithEvolvedSchema = getSchemaTransformerWithEvolvedSchema(dataBlock);
+      return Pair.of(new CloseableMappingIterator<>(
+          blockRecordsIterator, schemaTransformerWithEvolvedSchema.getLeft()), schemaTransformerWithEvolvedSchema.getRight());
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to deser records from log files ", e);
    }
-    Pair<Function<T, T>, Schema> schemaTransformerWithEvolvedSchema = getSchemaTransformerWithEvolvedSchema(dataBlock);
-    return Pair.of(new CloseableMappingIterator<>(
-        blockRecordsIterator, schemaTransformerWithEvolvedSchema.getLeft()), schemaTransformerWithEvolvedSchema.getRight());
  }

  /**
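
The reshuffled method is the usual boundary translation: getEngineRecordIterator now declares IOException, while getRecordsIterator keeps its unchecked signature by wrapping at the call site. The pattern in isolation (readBlockRecords is a placeholder):

    try {
      return readBlockRecords();  // hypothetical call that may throw IOException
    } catch (IOException e) {
      throw new HoodieIOException("Failed to deser records from log files ", e);
    }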
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java
index 7f361af6ca20..ce4a733428a5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java
@@ -210,23 +210,35 @@ public class HoodieNativeAvroHFileReader extends HoodieAvroHFileReaderImplBase {
  @Override
  public ClosableIterator<HoodieRecord<IndexedRecord>> getRecordsByKeysIterator(
      List<String> sortedKeys, Schema schema) throws IOException {
-    HFileReader reader = readerFactory.createHFileReader();
    ClosableIterator<IndexedRecord> iterator =
-        new RecordByKeyIterator(reader, sortedKeys, getSchema(), schema, useBloomFilter);
+        getEngineRecordsByKeysIterator(sortedKeys, schema);
    return new CloseableMappingIterator<>(
        iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data)));
  }

+  @Override
+  public ClosableIterator<IndexedRecord> getEngineRecordsByKeysIterator(
+      List<String> sortedKeys, Schema schema) throws IOException {
+    HFileReader reader = readerFactory.createHFileReader();
+    return new RecordByKeyIterator(reader, sortedKeys, getSchema(), schema, useBloomFilter);
+  }
+
  @Override
  public ClosableIterator<HoodieRecord<IndexedRecord>> getRecordsByKeyPrefixIterator(
      List<String> sortedKeyPrefixes, Schema schema) throws IOException {
-    HFileReader reader = readerFactory.createHFileReader();
    ClosableIterator<IndexedRecord> iterator =
-        new RecordByKeyPrefixIterator(reader, sortedKeyPrefixes, getSchema(), schema);
+        getEngineRecordsByKeyPrefixIterator(sortedKeyPrefixes, schema);
    return new CloseableMappingIterator<>(
        iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data)));
  }

+  @Override
+  public ClosableIterator<IndexedRecord> getEngineRecordsByKeyPrefixIterator(
+      List<String> sortedKeyPrefixes, Schema schema) throws IOException {
+    HFileReader reader = readerFactory.createHFileReader();
+    return new RecordByKeyPrefixIterator(reader, sortedKeyPrefixes, getSchema(), schema);
+  }
+
  @Override
  public boolean supportKeyPredicate() {
    return true;
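
The new engine-record iterators return Avro IndexedRecords directly, so point lookups skip the per-record HoodieAvroIndexedRecord wrapping that getRecordsByKeysIterator still performs for callers that need HoodieRecords. A hedged usage sketch mirroring the tests later in this diff:

    // Keys must be pre-sorted; values are illustrative.
    List<String> sortedKeys = Arrays.asList("key05", "key12", "key24");
    try (ClosableIterator<IndexedRecord> it =
             hfileReader.getEngineRecordsByKeysIterator(sortedKeys, avroSchema)) {
      while (it.hasNext()) {
        IndexedRecord rec = it.next();  // no wrapper allocation per record
      }
    }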
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieSeekingFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieSeekingFileReader.java
index e7547d2f02cd..5b8dcea0fb02 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieSeekingFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieSeekingFileReader.java
@@ -33,6 +33,10 @@ public interface HoodieSeekingFileReader<T> extends HoodieFileReader<T> {
    throw new UnsupportedOperationException();
  }

+  default ClosableIterator<T> getEngineRecordsByKeysIterator(List<String> sortedKeys, Schema schema) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
  default ClosableIterator<HoodieRecord<T>> getRecordsByKeysIterator(List<String> sortedKeys) throws IOException {
    return getRecordsByKeysIterator(sortedKeys, getSchema());
  }
@@ -41,6 +45,10 @@ public interface HoodieSeekingFileReader<T> extends HoodieFileReader<T> {
    throw new UnsupportedEncodingException();
  }

+  default ClosableIterator<T> getEngineRecordsByKeyPrefixIterator(List<String> sortedKeyPrefixes, Schema schema) throws IOException {
+    throw new UnsupportedEncodingException();
+  }
+
  default ClosableIterator<HoodieRecord<T>> getRecordsByKeyPrefixIterator(List<String> sortedKeyPrefixes) throws IOException {
    return getRecordsByKeyPrefixIterator(sortedKeyPrefixes, getSchema());
  }
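
Adding the lookups as default methods keeps the interface change source- and binary-compatible: existing implementations compile unchanged, and only readers with a native seek path (here, HoodieNativeAvroHFileReader) override them. The shape of the pattern, reduced to a sketch with illustrative names:

    // Interface evolution via default methods: the fast path is opt-in.
    interface PointLookupReader<T> {
      default ClosableIterator<T> lookupByKeys(List<String> sortedKeys, Schema schema) throws IOException {
        throw new UnsupportedOperationException();  // no native point lookup
      }
    }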
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java
index ee83e7c52999..a20303e6a3bc 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java
@@ -394,6 +394,15 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
      // no entries should match since this is exact match.
      assertEquals(Collections.emptyList(), recordsByKeys);

+      iterator =
+          hfileReader.getEngineRecordsByKeysIterator(keys, avroSchema);
+
+      recordsByKeys =
+          toStream(iterator).map(r -> (GenericRecord) r).collect(Collectors.toList());
+
+      // no entries should match since this is exact match.
+      assertEquals(Collections.emptyList(), recordsByKeys);
+
      // filter for "key00001, key05, key12, key24, key2, key31, key49, key61, key50". Valid entries should be matched.
      // key00001 should not match.
      // key2 : we don't have an exact match
@@ -414,6 +423,16 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
              .map(r -> (GenericRecord) r)
              .collect(Collectors.toList());
      assertEquals(expectedKey1s, recordsByKeys);
+
+      iterator =
+          hfileReader.getEngineRecordsByKeysIterator(
+              Arrays.asList("key00001", "key05", "key12", "key24", "key31", "key49", "key61", "key50"),
+              avroSchema);
+      recordsByKeys =
+          StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
+              .map(r -> (GenericRecord) r)
+              .collect(Collectors.toList());
+      assertEquals(expectedKey1s, recordsByKeys);
    }
    try (HoodieAvroHFileReaderImplBase hfileReader = getHFileReaderFromFixture(useBloomFilter)) {
      List<GenericRecord> allRecords = toStream(hfileReader.getRecordIterator())
@@ -431,6 +450,16 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
              || (entry.get("_row_key").toString()).equals("key004958-abcdefghij")
              || (entry.get("_row_key").toString()).equals("key019889-abcdefghij"))).collect(Collectors.toList());
      assertEquals(expected, actual);
+
+      iterator = hfileReader.getEngineRecordsByKeysIterator(
+          Arrays.asList("key000000", "key000066-abcdefghij", "key001424-aa", "key001424-aaa",
+              "key004958-abcdefghij", "key010769", "key019889-abcdefghij", "key030000"),
+          avroSchema);
+      actual =
+          StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
+              .map(r -> (GenericRecord) r)
+              .collect(Collectors.toList());
+      assertEquals(expected, actual);
    }
  }
@@ -454,6 +483,12 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {

      assertEquals(allRecords, recordsByPrefix);

+      // test getEngineRecordsByKeyPrefix
+      iterator = hfileReader.getEngineRecordsByKeyPrefixIterator(keyPrefixes, avroSchema);
+      recordsByPrefix =
+          toStream(iterator).map(r -> (GenericRecord) r).collect(Collectors.toList());
+      assertEquals(allRecords, recordsByPrefix);
+
      // filter for "key1" : entries from key10 to key19 should be matched
      List<GenericRecord> expectedKey1s = allRecords.stream().filter(entry -> (entry.get("_row_key").toString()).contains("key1"))
@@ -468,6 +503,17 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
              .collect(Collectors.toList());
      assertEquals(expectedKey1s, recordsByPrefix);

+      // test getEngineRecordsByKeyPrefix
+      iterator =
+          hfileReader.getEngineRecordsByKeyPrefixIterator(Collections.singletonList("key1"),
+              avroSchema);
+      recordsByPrefix =
+          StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED),
+                  false)
+              .map(r -> (GenericRecord) r)
+              .collect(Collectors.toList());
+      assertEquals(expectedKey1s, recordsByPrefix);
+
      // exact match
      List<GenericRecord> expectedKey25 = allRecords.stream().filter(entry -> (entry.get("_row_key").toString()).contains("key25"))
@@ -480,6 +526,14 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
              .map(r -> (GenericRecord) r)
              .collect(Collectors.toList());
      assertEquals(expectedKey25, recordsByPrefix);
+      iterator =
+          hfileReader.getEngineRecordsByKeyPrefixIterator(Collections.singletonList("key25"), avroSchema);
+      recordsByPrefix =
+          StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
+              .map(r -> (GenericRecord) r)
+              .collect(Collectors.toList());
+      assertEquals(expectedKey25, recordsByPrefix);
+
      // no match. key prefix is beyond entries in file.
      iterator =
          hfileReader.getIndexedRecordsByKeyPrefixIterator(Collections.singletonList("key99"), avroSchema);
@@ -489,6 +543,14 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
              .map(r -> (GenericRecord) r)
              .collect(Collectors.toList());
      assertEquals(Collections.emptyList(), recordsByPrefix);
+      iterator =
+          hfileReader.getEngineRecordsByKeyPrefixIterator(Collections.singletonList("key99"), avroSchema);
+      recordsByPrefix =
+          StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
+              .map(r -> (GenericRecord) r)
+              .collect(Collectors.toList());
+      assertEquals(Collections.emptyList(), recordsByPrefix);
+
      // no match. but keyPrefix is in between the entries found in file.
      iterator =
          hfileReader.getIndexedRecordsByKeyPrefixIterator(Collections.singletonList("key1234"), avroSchema);
@@ -497,6 +559,13 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
              .map(r -> (GenericRecord) r)
              .collect(Collectors.toList());
      assertEquals(Collections.emptyList(), recordsByPrefix);
+      iterator =
+          hfileReader.getEngineRecordsByKeysIterator(Collections.singletonList("key1234"), avroSchema);
+      recordsByPrefix =
+          StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
+              .map(r -> (GenericRecord) r)
+              .collect(Collectors.toList());
+      assertEquals(Collections.emptyList(), recordsByPrefix);

      // filter for "key1", "key30" and "key60" : entries from 'key10 to key19' and 'key30' should be matched.
      List<GenericRecord> expectedKey50and1s =
@@ -510,6 +579,14 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
              .collect(Collectors.toList());
      assertEquals(expectedKey50and1s, recordsByPrefix);
+      iterator =
+          hfileReader.getEngineRecordsByKeyPrefixIterator(Arrays.asList("key1", "key30", "key6"), avroSchema);
+      recordsByPrefix =
+          StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
+              .map(r -> (GenericRecord) r)
+              .collect(Collectors.toList());
+      assertEquals(expectedKey50and1s, recordsByPrefix);
+
      // filter for "key50" and "key0" : entries from key50 and 'key00 to key09' should be matched.
      List<GenericRecord> expectedKey50and0s = allRecords.stream().filter(entry -> (entry.get("_row_key").toString()).contains("key0")
@@ -521,6 +598,13 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
              .map(r -> (GenericRecord) r)
              .collect(Collectors.toList());
      assertEquals(expectedKey50and0s, recordsByPrefix);
+      iterator =
+          hfileReader.getEngineRecordsByKeyPrefixIterator(Arrays.asList("key0", "key50"), avroSchema);
+      recordsByPrefix =
+          StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
+              .map(r -> (GenericRecord) r)
+              .collect(Collectors.toList());
+      assertEquals(expectedKey50and0s, recordsByPrefix);

      // filter for "key1" and "key0" : entries from 'key10 to key19' and 'key00 to key09' should be matched.
      List<GenericRecord> expectedKey1sand0s = allRecords.stream()
@@ -541,6 +625,20 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
          });
      assertEquals(expectedKey1sand0s, recordsByPrefix);
+      iterator =
+          hfileReader.getEngineRecordsByKeyPrefixIterator(Arrays.asList("key0", "key1"), avroSchema);
+      recordsByPrefix =
+          StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
+              .map(r -> (GenericRecord) r)
+              .collect(Collectors.toList());
+      Collections.sort(recordsByPrefix, new Comparator<GenericRecord>() {
+        @Override
+        public int compare(GenericRecord o1, GenericRecord o2) {
+          return o1.get("_row_key").toString().compareTo(o2.get("_row_key").toString());
+        }
+      });
+      assertEquals(expectedKey1sand0s, recordsByPrefix);
+
      // We expect the keys to be looked up in sorted order. If not, matching entries may not be returned.
      // key1 should have matching entries, but not key0.
      iterator =
@@ -556,6 +654,19 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
        }
      });
      assertEquals(expectedKey1s, recordsByPrefix);
+
+      iterator =
+          hfileReader.getEngineRecordsByKeyPrefixIterator(Arrays.asList("key1", "key0"), avroSchema);
+      recordsByPrefix =
+          StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
+              .map(r -> (GenericRecord) r)
+              .collect(Collectors.toList());
+      Collections.sort(recordsByPrefix, new Comparator<GenericRecord>() {
+        @Override
+        public int compare(GenericRecord o1, GenericRecord o2) {
+          return o1.get("_row_key").toString().compareTo(o2.get("_row_key").toString());
+        }
+      });
    }
    try (HoodieAvroHFileReaderImplBase hfileReader = getHFileReaderFromFixture(useBloomFilter)) {
      List<GenericRecord> allRecords = toStream(hfileReader.getRecordIterator())
@@ -575,6 +686,17 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
              || (entry.get("_row_key").toString()).contains("key010769")
              || (entry.get("_row_key").toString()).contains("key01988"))).collect(Collectors.toList());
      assertEquals(expected, actual);
+
+      iterator = hfileReader.getEngineRecordsByKeyPrefixIterator(
+          Arrays.asList("key000000", "key000066-abcdefghij", "key001424-aa", "key001424-aaa",
+              "key004958", "key010769", "key01988", "key030000"),
+          avroSchema);
+      actual =
+          StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
+              .map(r -> (GenericRecord) r)
+              .collect(Collectors.toList());
+      assertEquals(expected, actual);
+
    }
  }
@@ -600,6 +722,17 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
                    Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
                    .collect(Collectors.toList()));
      }
+
+      try (ClosableIterator<IndexedRecord> iterator =
+               hfileReader.getEngineRecordsByKeysIterator(
+                   Arrays.asList("key00001", "key05", "key24", "key16", "key31", "key61"),
+                   avroSchema)) {
+        assertThrows(
+            IllegalStateException.class,
+            () -> StreamSupport.stream(
+                Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
+                .collect(Collectors.toList()));
+      }
    }
  }
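
The final hunk pins down a contract worth noting: keys must be passed in sorted order, otherwise prefix lookups may silently drop matches and exact-key lookups throw IllegalStateException. A hedged caller-side guard (requestedKeys is a hypothetical input):

    // Sort keys before any point lookup against an HFile reader.
    List<String> keys = new ArrayList<>(requestedKeys);
    Collections.sort(keys);
    try (ClosableIterator<IndexedRecord> it =
             hfileReader.getEngineRecordsByKeysIterator(keys, avroSchema)) {
      it.forEachRemaining(rec -> { /* consume */ });
    }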
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
index ede8e3c932e2..4ee8c34c935d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnMergeOnReadStorage.java
@@ -487,7 +487,6 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
        table.getMetaClient(),
        slice.getLogFiles()
            .sorted(HoodieLogFile.getLogFileComparator())
-            .map(file -> file.getPath().toString())
            .collect(Collectors.toList()),
        config.getMaxDFSStreamBufferSize(),
        instant,
@@ -497,7 +496,6 @@ public class TestHoodieClientOnMergeOnReadStorage extends HoodieClientTestBase {
        table.getMetaClient(),
        slice.getLogFiles()
            .sorted(HoodieLogFile.getLogFileComparator())
-            .map(file -> file.getPath().toString())
            .collect(Collectors.toList()),
        config.getMaxDFSStreamBufferSize(),
        currentInstant,