This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 80f9f1ef36c [HUDI-7410] Use SeekableDataInputStream as the input of native HFile reader (#10673)
80f9f1ef36c is described below
commit 80f9f1ef36c0e7953a13ee4b433a6afc623ad4cc
Author: Y Ethan Guo <[email protected]>
AuthorDate: Thu Feb 15 15:26:02 2024 -0800
[HUDI-7410] Use SeekableDataInputStream as the input of native HFile reader (#10673)
---
.../bootstrap/index/HFileBootstrapIndex.java | 5 ++-
.../io/storage/HoodieNativeAvroHFileReader.java | 11 +++--
.../TestInLineFileSystemWithHFileReader.java | 8 ++--
.../hudi/io/ByteArraySeekableDataInputStream.java | 47 ++++++++++++++++++++++
.../org/apache/hudi/io/hfile/HFileBlockReader.java | 6 +--
.../org/apache/hudi/io/hfile/HFileReaderImpl.java | 8 ++--
.../org/apache/hudi/io/hfile/TestHFileReader.java | 38 +----------------
7 files changed, 71 insertions(+), 52 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/HFileBootstrapIndex.java b/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/HFileBootstrapIndex.java
index 989b0ad1e6d..7a6de5fe994 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/HFileBootstrapIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/HFileBootstrapIndex.java
@@ -33,6 +33,8 @@ import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.hadoop.fs.HadoopSeekableDataInputStream;
+import org.apache.hudi.io.SeekableDataInputStream;
import org.apache.hudi.io.hfile.HFileReader;
import org.apache.hudi.io.hfile.HFileReaderImpl;
import org.apache.hudi.io.hfile.Key;
@@ -41,7 +43,6 @@ import org.apache.hudi.io.storage.HoodieHFileUtils;
import org.apache.hudi.io.util.IOUtils;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellComparatorImpl;
@@ -238,7 +239,7 @@ public class HFileBootstrapIndex extends BootstrapIndex {
LOG.info("Opening HFile for reading :" + hFilePath);
Path path = new Path(hFilePath);
long fileSize = fileSystem.getFileStatus(path).getLen();
- FSDataInputStream stream = fileSystem.open(path);
+ SeekableDataInputStream stream = new
HadoopSeekableDataInputStream(fileSystem.open(path));
return new HFileReaderImpl(stream, fileSize);
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java
index cc3833996b9..e760b33b9e2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java
@@ -28,9 +28,13 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.hadoop.fs.HadoopSeekableDataInputStream;
+import org.apache.hudi.io.ByteArraySeekableDataInputStream;
+import org.apache.hudi.io.SeekableDataInputStream;
import org.apache.hudi.io.hfile.HFileReader;
import org.apache.hudi.io.hfile.HFileReaderImpl;
import org.apache.hudi.io.hfile.KeyValue;
@@ -41,7 +45,6 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
@@ -256,15 +259,15 @@ public class HoodieNativeAvroHFileReader extends HoodieAvroHFileReaderImplBase {
}
private HFileReader newHFileReader() throws IOException {
- FSDataInputStream inputStream;
+ SeekableDataInputStream inputStream;
long fileSize;
if (path.isPresent()) {
FileSystem fs = HadoopFSUtils.getFs(path.get(), conf);
fileSize = fs.getFileStatus(path.get()).getLen();
- inputStream = fs.open(path.get());
+ inputStream = new HadoopSeekableDataInputStream(fs.open(path.get()));
} else {
fileSize = bytesContent.get().length;
- inputStream = new FSDataInputStream(new
SeekableByteArrayInputStream(bytesContent.get()));
+ inputStream = new ByteArraySeekableDataInputStream(new
ByteBufferBackedInputStream(bytesContent.get()));
}
return new HFileReaderImpl(inputStream, fileSize);
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystemWithHFileReader.java b/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystemWithHFileReader.java
index 2ae8fd2f651..91649c68bd9 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystemWithHFileReader.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystemWithHFileReader.java
@@ -20,7 +20,9 @@
package org.apache.hudi.common.fs.inline;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.hadoop.fs.HadoopSeekableDataInputStream;
import org.apache.hudi.hadoop.fs.inline.InLineFileSystem;
+import org.apache.hudi.io.SeekableDataInputStream;
import org.apache.hudi.io.hfile.HFileReader;
import org.apache.hudi.io.hfile.HFileReaderImpl;
import org.apache.hudi.io.hfile.Key;
@@ -28,7 +30,6 @@ import org.apache.hudi.io.hfile.KeyValue;
import org.apache.hudi.io.hfile.UTF8StringKey;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
@@ -51,8 +52,9 @@ public class TestInLineFileSystemWithHFileReader extends TestInLineFileSystemHFi
Path inlinePath,
int maxRows) throws IOException {
long fileSize = inlineFileSystem.getFileStatus(inlinePath).getLen();
- try (FSDataInputStream fin = inlineFileSystem.open(inlinePath)) {
- try (HFileReader reader = new HFileReaderImpl(fin, fileSize)) {
+ try (SeekableDataInputStream stream =
+ new
HadoopSeekableDataInputStream(inlineFileSystem.open(inlinePath))) {
+ try (HFileReader reader = new HFileReaderImpl(stream, fileSize)) {
// Align scanner at start of the file.
reader.seekTo();
readAllRecords(reader, maxRows);
diff --git a/hudi-io/src/main/java/org/apache/hudi/io/ByteArraySeekableDataInputStream.java b/hudi-io/src/main/java/org/apache/hudi/io/ByteArraySeekableDataInputStream.java
new file mode 100644
index 00000000000..5ebe3a1729b
--- /dev/null
+++ b/hudi-io/src/main/java/org/apache/hudi/io/ByteArraySeekableDataInputStream.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
+
+import java.io.IOException;
+
+/**
+ * Implementation of {@link SeekableDataInputStream} based on a byte array.
+ */
+public class ByteArraySeekableDataInputStream extends SeekableDataInputStream {
+  // Typed handle to the wrapped stream; getPos() and seek(long) delegate to it.
+  private final ByteBufferBackedInputStream stream;
+
+  public ByteArraySeekableDataInputStream(ByteBufferBackedInputStream stream) {
+    super(stream);
+    this.stream = stream;
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return stream.getPosition();
+  }
+
+  @Override
+  public void seek(long pos) throws IOException {
+    stream.seek(pos);
+  }
+}
diff --git a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileBlockReader.java b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileBlockReader.java
index bcc1afb64ce..26103a4b391 100644
--- a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileBlockReader.java
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileBlockReader.java
@@ -19,7 +19,7 @@
package org.apache.hudi.io.hfile;
-import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hudi.io.SeekableDataInputStream;
import java.io.EOFException;
import java.io.IOException;
@@ -30,7 +30,7 @@ import java.io.IOException;
public class HFileBlockReader {
private final HFileContext context;
private final long streamStartOffset;
- private final FSDataInputStream stream;
+ private final SeekableDataInputStream stream;
private final byte[] byteBuff;
private int offset;
private boolean isReadFully = false;
@@ -44,7 +44,7 @@ public class HFileBlockReader {
* @param endOffset end offset to stop at.
*/
public HFileBlockReader(HFileContext context,
- FSDataInputStream stream,
+ SeekableDataInputStream stream,
long startOffset,
long endOffset) {
this.context = context;
diff --git a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileReaderImpl.java b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileReaderImpl.java
index 87dafc9d886..564dd98eb64 100644
--- a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileReaderImpl.java
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileReaderImpl.java
@@ -20,8 +20,8 @@
package org.apache.hudi.io.hfile;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.io.SeekableDataInputStream;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.logging.log4j.util.Strings;
import java.io.ByteArrayInputStream;
@@ -38,7 +38,7 @@ import static org.apache.hudi.io.hfile.HFileUtils.readMajorVersion;
* An implementation a {@link HFileReader}.
*/
public class HFileReaderImpl implements HFileReader {
- private final FSDataInputStream stream;
+ private final SeekableDataInputStream stream;
private final long fileSize;
private final HFileCursor cursor;
@@ -51,7 +51,7 @@ public class HFileReaderImpl implements HFileReader {
private Option<BlockIndexEntry> currentDataBlockEntry;
private Option<HFileDataBlock> currentDataBlock;
- public HFileReaderImpl(FSDataInputStream stream, long fileSize) {
+ public HFileReaderImpl(SeekableDataInputStream stream, long fileSize) {
this.stream = stream;
this.fileSize = fileSize;
this.cursor = new HFileCursor();
@@ -255,7 +255,7 @@ public class HFileReaderImpl implements HFileReader {
* @return {@link HFileTrailer} instance.
* @throws IOException upon error.
*/
- private static HFileTrailer readTrailer(FSDataInputStream stream,
+ private static HFileTrailer readTrailer(SeekableDataInputStream stream,
long fileSize) throws IOException {
int bufferSize = HFileTrailer.getTrailerSize();
long seekPos = fileSize - bufferSize;
diff --git a/hudi-io/src/test/java/org/apache/hudi/io/hfile/TestHFileReader.java b/hudi-io/src/test/java/org/apache/hudi/io/hfile/TestHFileReader.java
index d9a1969c75d..ef7d1c3fc75 100644
--- a/hudi-io/src/test/java/org/apache/hudi/io/hfile/TestHFileReader.java
+++ b/hudi-io/src/test/java/org/apache/hudi/io/hfile/TestHFileReader.java
@@ -21,10 +21,8 @@ package org.apache.hudi.io.hfile;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
+import org.apache.hudi.io.ByteArraySeekableDataInputStream;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.PositionedReadable;
-import org.apache.hadoop.fs.Seekable;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -439,7 +437,7 @@ public class TestHFileReader {
public static HFileReader getHFileReader(String filename) throws IOException
{
byte[] content = readHFileFromResources(filename);
return new HFileReaderImpl(
- new FSDataInputStream(new SeekableByteArrayInputStream(content)),
content.length);
+ new ByteArraySeekableDataInputStream(new
ByteBufferBackedInputStream(content)), content.length);
}
private static void verifyHFileRead(String filename,
@@ -604,36 +602,4 @@ public class TestHFileReader {
return expectedValue;
}
}
-
- static class SeekableByteArrayInputStream extends
ByteBufferBackedInputStream implements Seekable,
- PositionedReadable {
- public SeekableByteArrayInputStream(byte[] buf) {
- super(buf);
- }
-
- @Override
- public long getPos() throws IOException {
- return getPosition();
- }
-
- @Override
- public boolean seekToNewSource(long targetPos) throws IOException {
- return false;
- }
-
- @Override
- public int read(long position, byte[] buffer, int offset, int length)
throws IOException {
- return copyFrom(position, buffer, offset, length);
- }
-
- @Override
- public void readFully(long position, byte[] buffer) throws IOException {
- read(position, buffer, 0, buffer.length);
- }
-
- @Override
- public void readFully(long position, byte[] buffer, int offset, int
length) throws IOException {
- read(position, buffer, offset, length);
- }
- }
}