This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new e23f402e194 [HUDI-7347] Introduce SeekableDataInputStream for random access (#10575)
e23f402e194 is described below
commit e23f402e194498088f17142d9f132548ffbbd91d
Author: Y Ethan Guo <[email protected]>
AuthorDate: Wed Jan 31 16:48:46 2024 -0800
[HUDI-7347] Introduce SeekableDataInputStream for random access (#10575)
---
.../hudi/common/table/log/HoodieLogFileReader.java | 36 +++++++++++----
.../table/log/block/HoodieAvroDataBlock.java | 4 +-
.../common/table/log/block/HoodieCDCDataBlock.java | 4 +-
.../common/table/log/block/HoodieCommandBlock.java | 5 +-
.../common/table/log/block/HoodieCorruptBlock.java | 5 +-
.../common/table/log/block/HoodieDataBlock.java | 4 +-
.../common/table/log/block/HoodieDeleteBlock.java | 6 +--
.../table/log/block/HoodieHFileDataBlock.java | 4 +-
.../common/table/log/block/HoodieLogBlock.java | 16 +++----
.../table/log/block/HoodieParquetDataBlock.java | 4 +-
.../hadoop/fs/HadoopSeekableDataInputStream.java | 48 ++++++++++++++++++++
.../apache/hudi/io/SeekableDataInputStream.java | 53 ++++++++++++++++++++++
12 files changed, 150 insertions(+), 39 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index cce13c1a6e2..fa8174931c4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -37,9 +37,11 @@ import org.apache.hudi.exception.CorruptedLogFileException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.hadoop.fs.BoundedFsDataInputStream;
+import org.apache.hudi.hadoop.fs.HadoopSeekableDataInputStream;
import org.apache.hudi.hadoop.fs.SchemeAwareFSDataInputStream;
import org.apache.hudi.hadoop.fs.TimedFSDataInputStream;
import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.io.SeekableDataInputStream;
import org.apache.hudi.io.util.IOUtils;
import org.apache.hudi.storage.StorageSchemes;
@@ -90,7 +92,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
private final boolean reverseReader;
private final boolean enableRecordLookups;
private boolean closed = false;
- private FSDataInputStream inputStream;
+ private SeekableDataInputStream inputStream;
  public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
                             boolean readBlockLazily) throws IOException {
@@ -120,7 +122,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
Path updatedPath = FSUtils.makeQualified(fs, logFile.getPath());
    this.logFile = updatedPath.equals(logFile.getPath()) ? logFile : new HoodieLogFile(updatedPath, logFile.getFileSize());
this.bufferSize = bufferSize;
- this.inputStream = getFSDataInputStream(fs, this.logFile, bufferSize);
+ this.inputStream = getDataInputStream(fs, this.logFile, bufferSize);
this.readerSchema = readerSchema;
this.readBlockLazily = readBlockLazily;
this.reverseReader = reverseReader;
@@ -202,7 +204,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
        if (nextBlockVersion.getVersion() == HoodieLogFormatVersion.DEFAULT_VERSION) {
          return HoodieAvroDataBlock.getBlock(content.get(), readerSchema, internalSchema);
        } else {
-          return new HoodieAvroDataBlock(() -> getFSDataInputStream(fs, this.logFile, bufferSize), content, readBlockLazily, logBlockContentLoc,
+          return new HoodieAvroDataBlock(() -> getDataInputStream(fs, this.logFile, bufferSize), content, readBlockLazily, logBlockContentLoc,
              getTargetReaderSchemaForBlock(), header, footer, keyField);
        }
@@ -210,7 +212,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
        checkState(nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION,
            String.format("HFile block could not be of version (%d)", HoodieLogFormatVersion.DEFAULT_VERSION));
        return new HoodieHFileDataBlock(
-            () -> getFSDataInputStream(fs, this.logFile, bufferSize), content, readBlockLazily, logBlockContentLoc,
+            () -> getDataInputStream(fs, this.logFile, bufferSize), content, readBlockLazily, logBlockContentLoc,
            Option.ofNullable(readerSchema), header, footer, enableRecordLookups, logFile.getPath(),
            ConfigUtils.getBooleanWithAltKeys(fs.getConf(), USE_NATIVE_HFILE_READER));
@@ -218,17 +220,17 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
        checkState(nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION,
            String.format("Parquet block could not be of version (%d)", HoodieLogFormatVersion.DEFAULT_VERSION));
-        return new HoodieParquetDataBlock(() -> getFSDataInputStream(fs, this.logFile, bufferSize), content, readBlockLazily, logBlockContentLoc,
+        return new HoodieParquetDataBlock(() -> getDataInputStream(fs, this.logFile, bufferSize), content, readBlockLazily, logBlockContentLoc,
            getTargetReaderSchemaForBlock(), header, footer, keyField);
      case DELETE_BLOCK:
-        return new HoodieDeleteBlock(content, () -> getFSDataInputStream(fs, this.logFile, bufferSize), readBlockLazily, Option.of(logBlockContentLoc), header, footer);
+        return new HoodieDeleteBlock(content, () -> getDataInputStream(fs, this.logFile, bufferSize), readBlockLazily, Option.of(logBlockContentLoc), header, footer);
      case COMMAND_BLOCK:
-        return new HoodieCommandBlock(content, () -> getFSDataInputStream(fs, this.logFile, bufferSize), readBlockLazily, Option.of(logBlockContentLoc), header, footer);
+        return new HoodieCommandBlock(content, () -> getDataInputStream(fs, this.logFile, bufferSize), readBlockLazily, Option.of(logBlockContentLoc), header, footer);
      case CDC_DATA_BLOCK:
-        return new HoodieCDCDataBlock(() -> getFSDataInputStream(fs, this.logFile, bufferSize), content, readBlockLazily, logBlockContentLoc, readerSchema, header, keyField);
+        return new HoodieCDCDataBlock(() -> getDataInputStream(fs, this.logFile, bufferSize), content, readBlockLazily, logBlockContentLoc, readerSchema, header, keyField);
      default:
        throw new HoodieNotSupportedException("Unsupported Block " + blockType);
@@ -270,7 +272,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
    Option<byte[]> corruptedBytes = HoodieLogBlock.tryReadContent(inputStream, corruptedBlockSize, readBlockLazily);
    HoodieLogBlock.HoodieLogBlockContentLocation logBlockContentLoc =
        new HoodieLogBlock.HoodieLogBlockContentLocation(hadoopConf, logFile, contentPosition, corruptedBlockSize, nextBlockOffset);
-    return new HoodieCorruptBlock(corruptedBytes, () -> getFSDataInputStream(fs, this.logFile, bufferSize), readBlockLazily, Option.of(logBlockContentLoc), new HashMap<>(), new HashMap<>());
+    return new HoodieCorruptBlock(corruptedBytes, () -> getDataInputStream(fs, this.logFile, bufferSize), readBlockLazily, Option.of(logBlockContentLoc), new HashMap<>(), new HashMap<>());
}
private boolean isBlockCorrupted(int blocksize) throws IOException {
@@ -474,9 +476,23 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
    throw new UnsupportedOperationException("Remove not supported for HoodieLogFileReader");
}
+  /**
+   * Fetch the right {@link SeekableDataInputStream} to be used by wrapping with required input streams.
+   *
+   * @param fs         instance of {@link FileSystem} in use.
+   * @param bufferSize buffer size to be used.
+   * @return the right {@link SeekableDataInputStream} as required.
+   */
+  private static SeekableDataInputStream getDataInputStream(FileSystem fs,
+                                                            HoodieLogFile logFile,
+                                                            int bufferSize) {
+    return new HadoopSeekableDataInputStream(getFSDataInputStream(fs, logFile, bufferSize));
+  }
+
/**
   * Fetch the right {@link FSDataInputStream} to be used by wrapping with required input streams.
-   * @param fs instance of {@link FileSystem} in use.
+   *
+   * @param fs         instance of {@link FileSystem} in use.
* @param bufferSize buffer size to be used.
* @return the right {@link FSDataInputStream} as required.
*/
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
index ce3e23d50e7..590d9a17a0d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.io.SeekableDataInputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
@@ -40,7 +41,6 @@ import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
-import org.apache.hadoop.fs.FSDataInputStream;
import javax.annotation.Nonnull;
@@ -75,7 +75,7 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
private final ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
- public HoodieAvroDataBlock(Supplier<FSDataInputStream> inputStreamSupplier,
+  public HoodieAvroDataBlock(Supplier<SeekableDataInputStream> inputStreamSupplier,
                              Option<byte[]> content,
                              boolean readBlockLazily,
                              HoodieLogBlockContentLocation logBlockContentLocation,
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java
index 814e75821b5..f32eeb0dfb7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCDCDataBlock.java
@@ -20,9 +20,9 @@ package org.apache.hudi.common.table.log.block;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.io.SeekableDataInputStream;
import org.apache.avro.Schema;
-import org.apache.hadoop.fs.FSDataInputStream;
import java.util.HashMap;
import java.util.List;
@@ -35,7 +35,7 @@ import java.util.function.Supplier;
public class HoodieCDCDataBlock extends HoodieAvroDataBlock {
public HoodieCDCDataBlock(
- Supplier<FSDataInputStream> inputStreamSupplier,
+ Supplier<SeekableDataInputStream> inputStreamSupplier,
Option<byte[]> content,
boolean readBlockLazily,
HoodieLogBlockContentLocation logBlockContentLocation,
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java
index ed5338344ad..deeb903cd18 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCommandBlock.java
@@ -19,8 +19,7 @@
package org.apache.hudi.common.table.log.block;
import org.apache.hudi.common.util.Option;
-
-import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hudi.io.SeekableDataInputStream;
import java.util.HashMap;
import java.util.Map;
@@ -44,7 +43,7 @@ public class HoodieCommandBlock extends HoodieLogBlock {
this(Option.empty(), null, false, Option.empty(), header, new HashMap<>());
}
-  public HoodieCommandBlock(Option<byte[]> content, Supplier<FSDataInputStream> inputStreamSupplier, boolean readBlockLazily,
+  public HoodieCommandBlock(Option<byte[]> content, Supplier<SeekableDataInputStream> inputStreamSupplier, boolean readBlockLazily,
                            Option<HoodieLogBlockContentLocation> blockContentLocation, Map<HeaderMetadataType, String> header,
                            Map<HeaderMetadataType, String> footer) {
    super(header, footer, blockContentLocation, content, inputStreamSupplier, readBlockLazily);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCorruptBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCorruptBlock.java
index 928ae780ee6..19d704c2595 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCorruptBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieCorruptBlock.java
@@ -19,8 +19,7 @@
package org.apache.hudi.common.table.log.block;
import org.apache.hudi.common.util.Option;
-
-import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hudi.io.SeekableDataInputStream;
import java.io.IOException;
import java.util.Map;
@@ -32,7 +31,7 @@ import java.util.function.Supplier;
*/
public class HoodieCorruptBlock extends HoodieLogBlock {
-  public HoodieCorruptBlock(Option<byte[]> corruptedBytes, Supplier<FSDataInputStream> inputStreamSupplier, boolean readBlockLazily,
+  public HoodieCorruptBlock(Option<byte[]> corruptedBytes, Supplier<SeekableDataInputStream> inputStreamSupplier, boolean readBlockLazily,
                            Option<HoodieLogBlockContentLocation> blockContentLocation, Map<HeaderMetadataType, String> header,
                            Map<HeaderMetadataType, String> footer) {
    super(header, footer, blockContentLocation, corruptedBytes, inputStreamSupplier, readBlockLazily);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
index ec4d548de23..1b024a3b530 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
@@ -24,9 +24,9 @@ import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.SeekableDataInputStream;
import org.apache.avro.Schema;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -110,7 +110,7 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
   * NOTE: This ctor is used on the write-path (ie when records ought to be written into the log)
   */
  protected HoodieDataBlock(Option<byte[]> content,
-                            Supplier<FSDataInputStream> inputStreamSupplier,
+                            Supplier<SeekableDataInputStream> inputStreamSupplier,
                            boolean readBlockLazily,
                            Option<HoodieLogBlockContentLocation> blockContentLocation,
                            Option<Schema> readerSchema,
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
index b0b1a76d42d..a55f4f1e623 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SerializationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.SeekableDataInputStream;
import org.apache.hudi.util.Lazy;
import org.apache.avro.io.BinaryDecoder;
@@ -37,7 +38,6 @@ import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -97,14 +97,14 @@ public class HoodieDeleteBlock extends HoodieLogBlock {
    this.recordsToDelete = recordsToDelete.stream().map(Pair::getLeft).toArray(DeleteRecord[]::new);
  }
-  public HoodieDeleteBlock(Option<byte[]> content, Supplier<FSDataInputStream> inputStreamSupplier, boolean readBlockLazily,
+  public HoodieDeleteBlock(Option<byte[]> content, Supplier<SeekableDataInputStream> inputStreamSupplier, boolean readBlockLazily,
                           Option<HoodieLogBlockContentLocation> blockContentLocation, Map<HeaderMetadataType, String> header,
                           Map<HeaderMetadataType, String> footer) {
    // Setting `shouldWriteRecordPositions` to false as this constructor is only used by the reader
    this(content, inputStreamSupplier, readBlockLazily, blockContentLocation, header, footer, false);
  }
-  HoodieDeleteBlock(Option<byte[]> content, Supplier<FSDataInputStream> inputStreamSupplier, boolean readBlockLazily,
+  HoodieDeleteBlock(Option<byte[]> content, Supplier<SeekableDataInputStream> inputStreamSupplier, boolean readBlockLazily,
                    Option<HoodieLogBlockContentLocation> blockContentLocation, Map<HeaderMetadataType, String> header,
                    Map<HeaderMetadataType, String> footer, boolean shouldWriteRecordPositions) {
    super(header, footer, blockContentLocation, content, inputStreamSupplier, readBlockLazily);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
index 86cfe91f0f7..fa447063aa4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.io.SeekableDataInputStream;
import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -42,7 +43,6 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -83,7 +83,7 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
private final Path pathForReader;
private final HoodieConfig hFileReaderConfig;
- public HoodieHFileDataBlock(Supplier<FSDataInputStream> inputStreamSupplier,
+  public HoodieHFileDataBlock(Supplier<SeekableDataInputStream> inputStreamSupplier,
                               Option<byte[]> content,
                               boolean readBlockLazily,
                               HoodieLogBlockContentLocation logBlockContentLocation,
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
index 2821c132d6b..b9c1063083e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
@@ -25,9 +25,9 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.TypeUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.SeekableDataInputStream;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,7 +36,6 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
@@ -68,10 +67,7 @@ public abstract class HoodieLogBlock {
private final Option<HoodieLogBlockContentLocation> blockContentLocation;
// data for a specific block
private Option<byte[]> content;
- // TODO : change this to just InputStream so this works for any FileSystem
- // create handlers to return specific type of inputstream based on FS
- // input stream corresponding to the log file where this logBlock belongs
- private final Supplier<FSDataInputStream> inputStreamSupplier;
+ private final Supplier<SeekableDataInputStream> inputStreamSupplier;
  // Toggle flag, whether to read blocks lazily (I/O intensive) or not (Memory intensive)
protected boolean readBlockLazily;
@@ -80,7 +76,7 @@ public abstract class HoodieLogBlock {
@Nonnull Map<HeaderMetadataType, String> logBlockFooter,
@Nonnull Option<HoodieLogBlockContentLocation> blockContentLocation,
@Nonnull Option<byte[]> content,
- @Nullable Supplier<FSDataInputStream> inputStreamSupplier,
+ @Nullable Supplier<SeekableDataInputStream> inputStreamSupplier,
boolean readBlockLazily) {
this.logBlockHeader = logBlockHeader;
this.logBlockFooter = logBlockFooter;
@@ -265,7 +261,7 @@ public abstract class HoodieLogBlock {
/**
   * Convert bytes to LogMetadata, follow the same order as {@link HoodieLogBlock#getLogMetadataBytes}.
   */
-  public static Map<HeaderMetadataType, String> getLogMetadata(DataInputStream dis) throws IOException {
+  public static Map<HeaderMetadataType, String> getLogMetadata(SeekableDataInputStream dis) throws IOException {
Map<HeaderMetadataType, String> metadata = new HashMap<>();
// 1. Read the metadata written out
@@ -289,7 +285,7 @@ public abstract class HoodieLogBlock {
   * Read or Skip block content of a log block in the log file. Depends on lazy reading enabled in
   * {@link HoodieMergedLogRecordScanner}
   */
-  public static Option<byte[]> tryReadContent(FSDataInputStream inputStream, Integer contentLength, boolean readLazily)
+  public static Option<byte[]> tryReadContent(SeekableDataInputStream inputStream, Integer contentLength, boolean readLazily)
throws IOException {
if (readLazily) {
// Seek to the end of the content block
@@ -311,7 +307,7 @@ public abstract class HoodieLogBlock {
checkState(!content.isPresent(), "Block has already been inflated");
    checkState(inputStreamSupplier != null, "Block should have input-stream provided");
- try (FSDataInputStream inputStream = inputStreamSupplier.get()) {
+ try (SeekableDataInputStream inputStream = inputStreamSupplier.get()) {
      content = Option.of(new byte[(int) this.getBlockContentLocation().get().getBlockSize()]);
inputStream.seek(this.getBlockContentLocation().get().getContentPositionInLogFile());
inputStream.readFully(content.get(), 0, content.get().length);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
index 1b8880c4a0d..485d70890b4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
@@ -27,13 +27,13 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.io.SeekableDataInputStream;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.io.storage.HoodieFileWriter;
import org.apache.hudi.io.storage.HoodieFileWriterFactory;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetWriter;
@@ -64,7 +64,7 @@ public class HoodieParquetDataBlock extends HoodieDataBlock {
private final Option<Double> expectedCompressionRatio;
private final Option<Boolean> useDictionaryEncoding;
-  public HoodieParquetDataBlock(Supplier<FSDataInputStream> inputStreamSupplier,
+  public HoodieParquetDataBlock(Supplier<SeekableDataInputStream> inputStreamSupplier,
                                 Option<byte[]> content,
                                 boolean readBlockLazily,
                                 HoodieLogBlockContentLocation logBlockContentLocation,
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopSeekableDataInputStream.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopSeekableDataInputStream.java
new file mode 100644
index 00000000000..ae10ca0ac3f
--- /dev/null
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopSeekableDataInputStream.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop.fs;
+
+import org.apache.hudi.io.SeekableDataInputStream;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import java.io.IOException;
+
+/**
+ * An implementation of {@link SeekableDataInputStream} based on Hadoop's {@link FSDataInputStream}
+ */
+public class HadoopSeekableDataInputStream extends SeekableDataInputStream {
+ private final FSDataInputStream stream;
+
+ public HadoopSeekableDataInputStream(FSDataInputStream stream) {
+ super(stream);
+ this.stream = stream;
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return stream.getPos();
+ }
+
+ @Override
+ public void seek(long pos) throws IOException {
+ stream.seek(pos);
+ }
+}
diff --git a/hudi-io/src/main/java/org/apache/hudi/io/SeekableDataInputStream.java b/hudi-io/src/main/java/org/apache/hudi/io/SeekableDataInputStream.java
new file mode 100644
index 00000000000..c76fd3be32d
--- /dev/null
+++ b/hudi-io/src/main/java/org/apache/hudi/io/SeekableDataInputStream.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A {@link InputStream} that supports random access by allowing to seek to
+ * an arbitrary position within the stream and read the content.
+ */
+public abstract class SeekableDataInputStream extends DataInputStream {
+ /**
+ * Creates a DataInputStream that uses the specified
+ * underlying InputStream.
+ *
+ * @param in the specified input stream
+ */
+ public SeekableDataInputStream(InputStream in) {
+ super(in);
+ }
+
+ /**
+   * @return current position of the stream. The next read() will be from that location.
+ */
+ public abstract long getPos() throws IOException;
+
+ /**
+ * Seeks to a position within the stream.
+ *
+ * @param pos target position to seek to.
+ * @throws IOException upon error.
+ */
+ public abstract void seek(long pos) throws IOException;
+}
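
[Editor's note] For readers of this change, a minimal usage sketch of the new abstraction on the read path. It mirrors the seek-then-readFully pattern in HoodieLogBlock.inflate() above; the class name, method name, and FileSystem/Path setup here are illustrative assumptions, not part of the commit.

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.hadoop.fs.HadoopSeekableDataInputStream;
import org.apache.hudi.io.SeekableDataInputStream;

import java.io.IOException;

public class SeekableReadSketch {
  // Re-reads a log block's content from its recorded position and size,
  // as HoodieLogBlock.inflate() does when the block was read lazily.
  public static byte[] readBlockContent(FileSystem fs, Path logPath,
                                        long contentPosition, int blockSize) throws IOException {
    // Wrap Hadoop's FSDataInputStream in the engine-agnostic abstraction.
    try (SeekableDataInputStream in = new HadoopSeekableDataInputStream(fs.open(logPath))) {
      byte[] content = new byte[blockSize];
      in.seek(contentPosition);           // random access to the block's start
      in.readFully(content, 0, blockSize);
      return content;
    }
  }
}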
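The point of placing SeekableDataInputStream in hudi-io while the Hadoop wrapper lives in hudi-hadoop-common is that log-block code can express random access without Hadoop on the classpath. As a hypothetical illustration only (this class is an assumption, not in the commit), a local-file implementation needs nothing beyond the JDK:

import org.apache.hudi.io.SeekableDataInputStream;

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.Channels;

public class LocalFileSeekableDataInputStream extends SeekableDataInputStream {
  private final RandomAccessFile file;

  public LocalFileSeekableDataInputStream(RandomAccessFile file) {
    // Expose the file's channel as the InputStream backing DataInputStream.
    super(Channels.newInputStream(file.getChannel()));
    this.file = file;
  }

  @Override
  public long getPos() throws IOException {
    return file.getFilePointer();
  }

  @Override
  public void seek(long pos) throws IOException {
    // Moving the file pointer also repositions the channel, so subsequent
    // reads through the wrapping stream start at the new offset.
    file.seek(pos);
  }
}

Because DataInputStream does no internal buffering, a seek on the underlying file is immediately visible to subsequent reads through the wrapper.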