This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 80f9f1ef36c [HUDI-7410] Use SeekableDataInputStream as the input of native HFile reader (#10673)
80f9f1ef36c is described below

commit 80f9f1ef36c0e7953a13ee4b433a6afc623ad4cc
Author: Y Ethan Guo <[email protected]>
AuthorDate: Thu Feb 15 15:26:02 2024 -0800

    [HUDI-7410] Use SeekableDataInputStream as the input of native HFile reader (#10673)
---
 .../bootstrap/index/HFileBootstrapIndex.java       |  5 ++-
 .../io/storage/HoodieNativeAvroHFileReader.java    | 11 +++--
 .../TestInLineFileSystemWithHFileReader.java       |  8 ++--
 .../hudi/io/ByteArraySeekableDataInputStream.java  | 47 ++++++++++++++++++++++
 .../org/apache/hudi/io/hfile/HFileBlockReader.java |  6 +--
 .../org/apache/hudi/io/hfile/HFileReaderImpl.java  |  8 ++--
 .../org/apache/hudi/io/hfile/TestHFileReader.java  | 38 +----------------
 7 files changed, 71 insertions(+), 52 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/HFileBootstrapIndex.java b/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/HFileBootstrapIndex.java
index 989b0ad1e6d..7a6de5fe994 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/HFileBootstrapIndex.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/bootstrap/index/HFileBootstrapIndex.java
@@ -33,6 +33,8 @@ import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.hadoop.fs.HadoopSeekableDataInputStream;
+import org.apache.hudi.io.SeekableDataInputStream;
 import org.apache.hudi.io.hfile.HFileReader;
 import org.apache.hudi.io.hfile.HFileReaderImpl;
 import org.apache.hudi.io.hfile.Key;
@@ -41,7 +43,6 @@ import org.apache.hudi.io.storage.HoodieHFileUtils;
 import org.apache.hudi.io.util.IOUtils;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellComparatorImpl;
@@ -238,7 +239,7 @@ public class HFileBootstrapIndex extends BootstrapIndex {
       LOG.info("Opening HFile for reading :" + hFilePath);
       Path path = new Path(hFilePath);
       long fileSize = fileSystem.getFileStatus(path).getLen();
-      FSDataInputStream stream = fileSystem.open(path);
+      SeekableDataInputStream stream = new HadoopSeekableDataInputStream(fileSystem.open(path));
       return new HFileReaderImpl(stream, fileSize);
     }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java
index cc3833996b9..e760b33b9e2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java
@@ -28,9 +28,13 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.hadoop.fs.HadoopSeekableDataInputStream;
+import org.apache.hudi.io.ByteArraySeekableDataInputStream;
+import org.apache.hudi.io.SeekableDataInputStream;
 import org.apache.hudi.io.hfile.HFileReader;
 import org.apache.hudi.io.hfile.HFileReaderImpl;
 import org.apache.hudi.io.hfile.KeyValue;
@@ -41,7 +45,6 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
@@ -256,15 +259,15 @@ public class HoodieNativeAvroHFileReader extends HoodieAvroHFileReaderImplBase {
   }
 
   private HFileReader newHFileReader() throws IOException {
-    FSDataInputStream inputStream;
+    SeekableDataInputStream inputStream;
     long fileSize;
     if (path.isPresent()) {
       FileSystem fs = HadoopFSUtils.getFs(path.get(), conf);
       fileSize = fs.getFileStatus(path.get()).getLen();
-      inputStream = fs.open(path.get());
+      inputStream = new HadoopSeekableDataInputStream(fs.open(path.get()));
     } else {
       fileSize = bytesContent.get().length;
-      inputStream = new FSDataInputStream(new SeekableByteArrayInputStream(bytesContent.get()));
+      inputStream = new ByteArraySeekableDataInputStream(new ByteBufferBackedInputStream(bytesContent.get()));
     }
     return new HFileReaderImpl(inputStream, fileSize);
   }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystemWithHFileReader.java b/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystemWithHFileReader.java
index 2ae8fd2f651..91649c68bd9 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystemWithHFileReader.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/inline/TestInLineFileSystemWithHFileReader.java
@@ -20,7 +20,9 @@
 package org.apache.hudi.common.fs.inline;
 
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.hadoop.fs.HadoopSeekableDataInputStream;
 import org.apache.hudi.hadoop.fs.inline.InLineFileSystem;
+import org.apache.hudi.io.SeekableDataInputStream;
 import org.apache.hudi.io.hfile.HFileReader;
 import org.apache.hudi.io.hfile.HFileReaderImpl;
 import org.apache.hudi.io.hfile.Key;
@@ -28,7 +30,6 @@ import org.apache.hudi.io.hfile.KeyValue;
 import org.apache.hudi.io.hfile.UTF8StringKey;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
@@ -51,8 +52,9 @@ public class TestInLineFileSystemWithHFileReader extends TestInLineFileSystemHFi
                                       Path inlinePath,
                                       int maxRows) throws IOException {
     long fileSize = inlineFileSystem.getFileStatus(inlinePath).getLen();
-    try (FSDataInputStream fin = inlineFileSystem.open(inlinePath)) {
-      try (HFileReader reader = new HFileReaderImpl(fin, fileSize)) {
+    try (SeekableDataInputStream stream =
+             new HadoopSeekableDataInputStream(inlineFileSystem.open(inlinePath))) {
+      try (HFileReader reader = new HFileReaderImpl(stream, fileSize)) {
         // Align scanner at start of the file.
         reader.seekTo();
         readAllRecords(reader, maxRows);
diff --git a/hudi-io/src/main/java/org/apache/hudi/io/ByteArraySeekableDataInputStream.java b/hudi-io/src/main/java/org/apache/hudi/io/ByteArraySeekableDataInputStream.java
new file mode 100644
index 00000000000..5ebe3a1729b
--- /dev/null
+++ b/hudi-io/src/main/java/org/apache/hudi/io/ByteArraySeekableDataInputStream.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
+
+import java.io.IOException;
+
+/**
+ * Implementation of {@link SeekableDataInputStream} based on byte array
+ */
+public class ByteArraySeekableDataInputStream extends SeekableDataInputStream {
+
+  ByteBufferBackedInputStream stream;
+
+  public ByteArraySeekableDataInputStream(ByteBufferBackedInputStream stream) {
+    super(stream);
+    this.stream = stream;
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return stream.getPosition();
+  }
+
+  @Override
+  public void seek(long pos) throws IOException {
+    stream.seek(pos);
+  }
+}
diff --git a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileBlockReader.java b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileBlockReader.java
index bcc1afb64ce..26103a4b391 100644
--- a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileBlockReader.java
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileBlockReader.java
@@ -19,7 +19,7 @@
 
 package org.apache.hudi.io.hfile;
 
-import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hudi.io.SeekableDataInputStream;
 
 import java.io.EOFException;
 import java.io.IOException;
@@ -30,7 +30,7 @@ import java.io.IOException;
 public class HFileBlockReader {
   private final HFileContext context;
   private final long streamStartOffset;
-  private final FSDataInputStream stream;
+  private final SeekableDataInputStream stream;
   private final byte[] byteBuff;
   private int offset;
   private boolean isReadFully = false;
@@ -44,7 +44,7 @@ public class HFileBlockReader {
    * @param endOffset   end offset to stop at.
    */
   public HFileBlockReader(HFileContext context,
-                          FSDataInputStream stream,
+                          SeekableDataInputStream stream,
                           long startOffset,
                           long endOffset) {
     this.context = context;
diff --git a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileReaderImpl.java b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileReaderImpl.java
index 87dafc9d886..564dd98eb64 100644
--- a/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileReaderImpl.java
+++ b/hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileReaderImpl.java
@@ -20,8 +20,8 @@
 package org.apache.hudi.io.hfile;
 
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.io.SeekableDataInputStream;
 
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.logging.log4j.util.Strings;
 
 import java.io.ByteArrayInputStream;
@@ -38,7 +38,7 @@ import static org.apache.hudi.io.hfile.HFileUtils.readMajorVersion;
  * An implementation a {@link HFileReader}.
  */
 public class HFileReaderImpl implements HFileReader {
-  private final FSDataInputStream stream;
+  private final SeekableDataInputStream stream;
   private final long fileSize;
 
   private final HFileCursor cursor;
@@ -51,7 +51,7 @@ public class HFileReaderImpl implements HFileReader {
   private Option<BlockIndexEntry> currentDataBlockEntry;
   private Option<HFileDataBlock> currentDataBlock;
 
-  public HFileReaderImpl(FSDataInputStream stream, long fileSize) {
+  public HFileReaderImpl(SeekableDataInputStream stream, long fileSize) {
     this.stream = stream;
     this.fileSize = fileSize;
     this.cursor = new HFileCursor();
@@ -255,7 +255,7 @@ public class HFileReaderImpl implements HFileReader {
    * @return {@link HFileTrailer} instance.
    * @throws IOException upon error.
    */
-  private static HFileTrailer readTrailer(FSDataInputStream stream,
+  private static HFileTrailer readTrailer(SeekableDataInputStream stream,
                                           long fileSize) throws IOException {
     int bufferSize = HFileTrailer.getTrailerSize();
     long seekPos = fileSize - bufferSize;
diff --git a/hudi-io/src/test/java/org/apache/hudi/io/hfile/TestHFileReader.java b/hudi-io/src/test/java/org/apache/hudi/io/hfile/TestHFileReader.java
index d9a1969c75d..ef7d1c3fc75 100644
--- a/hudi-io/src/test/java/org/apache/hudi/io/hfile/TestHFileReader.java
+++ b/hudi-io/src/test/java/org/apache/hudi/io/hfile/TestHFileReader.java
@@ -21,10 +21,8 @@ package org.apache.hudi.io.hfile;
 
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
+import org.apache.hudi.io.ByteArraySeekableDataInputStream;
 
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.PositionedReadable;
-import org.apache.hadoop.fs.Seekable;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -439,7 +437,7 @@ public class TestHFileReader {
   public static HFileReader getHFileReader(String filename) throws IOException {
     byte[] content = readHFileFromResources(filename);
     return new HFileReaderImpl(
-        new FSDataInputStream(new SeekableByteArrayInputStream(content)), content.length);
+        new ByteArraySeekableDataInputStream(new ByteBufferBackedInputStream(content)), content.length);
   }
 
   private static void verifyHFileRead(String filename,
@@ -604,36 +602,4 @@ public class TestHFileReader {
       return expectedValue;
     }
   }
-
-  static class SeekableByteArrayInputStream extends ByteBufferBackedInputStream implements Seekable,
-      PositionedReadable {
-    public SeekableByteArrayInputStream(byte[] buf) {
-      super(buf);
-    }
-
-    @Override
-    public long getPos() throws IOException {
-      return getPosition();
-    }
-
-    @Override
-    public boolean seekToNewSource(long targetPos) throws IOException {
-      return false;
-    }
-
-    @Override
-    public int read(long position, byte[] buffer, int offset, int length) throws IOException {
-      return copyFrom(position, buffer, offset, length);
-    }
-
-    @Override
-    public void readFully(long position, byte[] buffer) throws IOException {
-      read(position, buffer, 0, buffer.length);
-    }
-
-    @Override
-    public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
-      read(position, buffer, offset, length);
-    }
-  }
 }

Reply via email to