AHeise commented on a change in pull request #13885:
URL: https://github.com/apache/flink/pull/13885#discussion_r534009700



##########
File path: 
flink-core/src/main/java/org/apache/flink/core/fs/FSDataBufferedInputStream.java
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * FSDataInputStream with Buffer, only support single thread.
+ * <p>
+ * The design of FSDataBufferedInputStream refers to {@link java.io.BufferedInputStream}.
+ * Compared to {@link java.io.BufferedInputStream}, the seek and getPos interfaces are mainly added.
+ */
+public class FSDataBufferedInputStream extends FSDataInputStream {
+
+       private static final int DEFAULT_BUFFER_SIZE = 8192;
+
+       protected byte[] buf;
+
+       // read offset of buf
+       private int pos;
+
+       // availed count of buf
+       private int count;
+
+       private final FSDataInputStream inputStream;
+
+       private boolean closed;
+
+       public FSDataBufferedInputStream(FSDataInputStream inputStream) {
+               this(inputStream, DEFAULT_BUFFER_SIZE);
+       }
+

Review comment:
       This constructor is only used in tests. I'd remove it here and move the constant to the test.

##########
File path: 
flink-core/src/main/java/org/apache/flink/core/fs/FSDataBufferedInputStream.java
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * FSDataInputStream with Buffer, only support single thread.
+ * <p>
+ * The design of FSDataBufferedInputStream refers to {@link java.io.BufferedInputStream}.
+ * Compared to {@link java.io.BufferedInputStream}, the seek and getPos interfaces are mainly added.
+ */
+public class FSDataBufferedInputStream extends FSDataInputStream {
+
+       private static final int DEFAULT_BUFFER_SIZE = 8192;
+
+       protected byte[] buf;
+
+       // read offset of buf
+       private int pos;
+
+       // availed count of buf
+       private int count;
+
+       private final FSDataInputStream inputStream;
+
+       private boolean closed;
+
+       public FSDataBufferedInputStream(FSDataInputStream inputStream) {
+               this(inputStream, DEFAULT_BUFFER_SIZE);
+       }
+
+       public FSDataBufferedInputStream(
+               FSDataInputStream inputStream,
+               int bufferSize) {

Review comment:
       nit: That should fit on one line.

##########
File path: 
flink-core/src/main/java/org/apache/flink/core/fs/FSDataBufferedInputStream.java
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * FSDataInputStream with Buffer, only support single thread.
+ * <p>
+ * The design of FSDataBufferedInputStream refers to {@link java.io.BufferedInputStream}.
+ * Compared to {@link java.io.BufferedInputStream}, the seek and getPos interfaces are mainly added.
+ */
+public class FSDataBufferedInputStream extends FSDataInputStream {
+
+       private static final int DEFAULT_BUFFER_SIZE = 8192;
+
+       protected byte[] buf;
+
+       // read offset of buf
+       private int pos;
+
+       // availed count of buf
+       private int count;
+
+       private final FSDataInputStream inputStream;
+
+       private boolean closed;
+
+       public FSDataBufferedInputStream(FSDataInputStream inputStream) {
+               this(inputStream, DEFAULT_BUFFER_SIZE);
+       }
+
+       public FSDataBufferedInputStream(
+               FSDataInputStream inputStream,
+               int bufferSize) {
+               this.inputStream = inputStream;
+
+               Preconditions.checkState(bufferSize > 0, "bufferSize must > 0");
+               this.buf = new byte[bufferSize];
+
+               this.pos = 0;
+               this.count = 0;
+               this.closed = false;
+       }
+
+       @Override
+       public void seek(long desired) throws IOException {
+               long streamPos = inputStream.getPos();
+               long bufStartPos = streamPos - count;
+               if (bufStartPos <= desired && desired < streamPos) {
+                       this.pos = (int) (desired - bufStartPos);
+                       return;
+               }
+               inputStream.seek(desired);
+               this.pos = 0;
+               this.count = 0;
+       }
+
+       @Override
+       public long getPos() throws IOException {
+               int avail = count - pos;
+               return inputStream.getPos() - avail;
+       }
+
+       @Override
+       public int read() throws IOException {
+               if (pos >= count) {
+                       fill();
+                       if (pos >= count) {
+                               return -1;
+                       }
+               }
+               return buf[pos++] & 0xff;
+       }
+
+       @Override
+       public int read(byte[] b, int off, int len) throws IOException {
+               if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
+                       throw new IndexOutOfBoundsException();
+               } else if (len == 0) {
+                       return 0;
+               }
+
+               int n = 0;
+               for (; ; ) {
+                       int nread = read1(b, off + n, len - n);
+                       if (nread <= 0) {
+                               return (n == 0) ? nread : n;
+                       }
+                       n += nread;
+                       if (n >= len) {
+                               return n;
+                       }
+                       // if not closed but no bytes available, return
+                       InputStream input = inputStream;
+                       if (input != null && input.available() <= 0) {
+                               return n;
+                       }
+               }
+       }
+
+       /**
+        * Read characters into a portion of an array, reading from the underlying
+        * stream at most once if necessary.
+        */
+       private int read1(byte[] b, int off, int len) throws IOException {
+               int avail = count - pos;
+               if (avail <= 0) {
+            /* If the requested length is at least as large as the buffer,
+               do not bother to copy the bytes into the local buffer.
+               In this way buffered streams will cascade harmlessly. */
+                       if (len >= buf.length) {
+                               return inputStream.read(b, off, len);
+                       }
+                       fill();
+                       avail = count - pos;
+                       if (avail <= 0) {
+                               return -1;
+                       }
+               }
+               int cnt = Math.min(avail, len);
+               System.arraycopy(buf, pos, b, off, cnt);
+               pos += cnt;
+               return cnt;
+       }
+
+       @Override
+       public long skip(long n) throws IOException {
+               if (n <= 0) {
+                       return 0;
+               }
+               long avail = count - pos;
+
+               if (avail <= 0) {
+                       // Fill in buffer to save bytes for reset
+                       fill();
+                       avail = count - pos;
+                       if (avail <= 0) {
+                               return 0;
+                       }
+               }
+
+               long skipped = Math.min(avail, n);
+               pos += skipped;
+               return skipped;
+       }
+
+       @Override
+       public int available() throws IOException {
+               int avail = count - pos;
+               return inputStream.available() + avail;
+       }
+
+       @Override
+       public void close() throws IOException {
+               if (closed) {
+                       return;
+               }
+               closed = true;
+               if (inputStream != null) {
+                       inputStream.close();
+               }
+       }

Review comment:
       I'd set `inputStream = null` after closing. Then you don't need `closed` at all; you could simply check whether `inputStream != null`.
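   A minimal sketch of the suggested close semantics, assuming a plain `InputStream` stands in for `FSDataInputStream` and using the hypothetical class name `NullOnCloseStream`. Note that `inputStream` is declared `final` in the PR, so this approach means dropping `final` from the field.

```java
import java.io.IOException;
import java.io.InputStream;

// Sketch only: a plain InputStream stands in for FSDataInputStream.
// A null `in` field replaces the separate `closed` flag.
class NullOnCloseStream extends InputStream {
    private InputStream in; // no longer final: null means "closed"

    NullOnCloseStream(InputStream in) {
        this.in = in;
    }

    @Override
    public int read() throws IOException {
        if (in == null) {
            throw new IllegalStateException("Stream has already been closed.");
        }
        return in.read();
    }

    @Override
    public void close() throws IOException {
        InputStream toClose = in;
        if (toClose == null) {
            return; // a second close() is a no-op
        }
        in = null; // mark closed first, so a failing close() is not retried
        toClose.close();
    }

    boolean isClosed() {
        return in == null;
    }
}
```

   Clearing the field before delegating keeps `close()` idempotent even if the wrapped stream's `close()` throws.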

##########
File path: 
flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactoryTest.java
##########
@@ -61,4 +72,61 @@ public void testCreateHadoopFsWithMissingAuthority() throws Exception {
                        assertTrue(e.getMessage().contains("authority"));
                }
        }
+
+       @Test
+       public void testReadBufferSize() throws Exception {
+               final URI uri = URI.create("hdfs://localhost:12345/");
+               final Path path = new Path(uri.getPath());
+
+               org.apache.hadoop.fs.FileSystem hadoopFs = mock(org.apache.hadoop.fs.FileSystem.class);
+               org.apache.hadoop.fs.FSDataInputStream hadoopInputStream =
+                       mock(org.apache.hadoop.fs.FSDataInputStream.class);
+               when(hadoopFs.open(isA(org.apache.hadoop.fs.Path.class), anyInt()))
+                       .thenReturn(hadoopInputStream);
+

Review comment:
       We actually want to move away from mocking with Mockito as far as possible.
   I'm wondering what the benefit is over using a `file://` URI pointing to some random (or temporary) file.

##########
File path: 
flink-core/src/main/java/org/apache/flink/core/fs/FSDataBufferedInputStream.java
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * FSDataInputStream with Buffer, only support single thread.
+ * <p>
+ * The design of FSDataBufferedInputStream refers to {@link java.io.BufferedInputStream}.
+ * Compared to {@link java.io.BufferedInputStream}, the seek and getPos interfaces are mainly added.
+ */
+public class FSDataBufferedInputStream extends FSDataInputStream {
+
+       private static final int DEFAULT_BUFFER_SIZE = 8192;
+
+       protected byte[] buf;

Review comment:
       Couldn't that be private?

##########
File path: 
flink-core/src/main/java/org/apache/flink/core/fs/FSDataBufferedInputStream.java
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * FSDataInputStream with Buffer, only support single thread.
+ * <p>
+ * The design of FSDataBufferedInputStream refers to {@link java.io.BufferedInputStream}.
+ * Compared to {@link java.io.BufferedInputStream}, the seek and getPos interfaces are mainly added.
+ */
+public class FSDataBufferedInputStream extends FSDataInputStream {
+
+       private static final int DEFAULT_BUFFER_SIZE = 8192;
+
+       protected byte[] buf;
+
+       // read offset of buf
+       private int pos;
+
+       // availed count of buf
+       private int count;
+
+       private final FSDataInputStream inputStream;
+
+       private boolean closed;
+
+       public FSDataBufferedInputStream(FSDataInputStream inputStream) {
+               this(inputStream, DEFAULT_BUFFER_SIZE);
+       }
+
+       public FSDataBufferedInputStream(
+               FSDataInputStream inputStream,
+               int bufferSize) {
+               this.inputStream = inputStream;

Review comment:
       `this.inputStream = Preconditions.checkNotNull(inputStream);`
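   A sketch of failing fast in the constructor rather than on the first read. `java.util.Objects.requireNonNull` is used as a JDK stand-in for Flink's `Preconditions.checkNotNull`, and `FailFastWrapper` is a hypothetical name. The buffer-size check is written as an argument check (`IllegalArgumentException`) here; that is a choice of this sketch, not what the PR does.

```java
import java.io.InputStream;
import java.util.Objects;

// Sketch: validate constructor arguments up front so a null stream fails
// at construction time instead of with an NPE on the first read().
class FailFastWrapper {
    private final InputStream inputStream;
    private final byte[] buf;

    FailFastWrapper(InputStream inputStream, int bufferSize) {
        // Stand-in for Preconditions.checkNotNull(inputStream)
        this.inputStream = Objects.requireNonNull(inputStream, "inputStream");
        if (bufferSize <= 0) {
            throw new IllegalArgumentException("bufferSize must be > 0, was " + bufferSize);
        }
        this.buf = new byte[bufferSize];
    }

    int bufferSize() {
        return buf.length;
    }
}
```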

##########
File path: 
flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
##########
@@ -116,17 +135,19 @@ public FileStatus getFileStatus(final Path f) throws IOException {
        }
 
        @Override
-       public HadoopDataInputStream open(final Path f, final int bufferSize) throws IOException {
+       public FSDataInputStream open(final Path f, final int bufferSize) throws IOException {
                final org.apache.hadoop.fs.Path path = toHadoopPath(f);
                final org.apache.hadoop.fs.FSDataInputStream fdis = this.fs.open(path, bufferSize);
-               return new HadoopDataInputStream(fdis);
+               HadoopDataInputStream inputStream = new HadoopDataInputStream(fdis);
+               if (readBufferSize <= 0) {

Review comment:
       `readBufferSize == 0` is clearer; it cannot be negative because of your ctor check. Also, I'd add a constant for `0` (`NO_BUFFER`, `NO_BUFFERING`, ...).
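   A sketch of the branch in `open` with a named constant. `NO_BUFFERING` is one of the names suggested above; `ReadBufferPolicy` is hypothetical, and `java.io.BufferedInputStream` stands in for `FSDataBufferedInputStream`. The negative-size check in the constructor is an assumption standing in for the PR's ctor check.

```java
import java.io.BufferedInputStream;
import java.io.InputStream;

// Sketch: a named constant makes the "no extra buffering" case explicit.
class ReadBufferPolicy {
    static final int NO_BUFFERING = 0;

    private final int readBufferSize;

    ReadBufferPolicy(int readBufferSize) {
        // Assumed ctor check: rules out negative sizes, so open() can test == 0.
        if (readBufferSize < 0) {
            throw new IllegalArgumentException("readBufferSize must be >= 0, was " + readBufferSize);
        }
        this.readBufferSize = readBufferSize;
    }

    // Stand-in for HadoopFileSystem#open: wrap only when buffering is enabled.
    InputStream wrap(InputStream raw) {
        if (readBufferSize == NO_BUFFERING) {
            return raw;
        }
        return new BufferedInputStream(raw, readBufferSize);
    }
}
```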

##########
File path: 
flink-core/src/main/java/org/apache/flink/core/fs/FSDataBufferedInputStream.java
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * FSDataInputStream with Buffer, only support single thread.
+ * <p>
+ * The design of FSDataBufferedInputStream refers to {@link java.io.BufferedInputStream}.
+ * Compared to {@link java.io.BufferedInputStream}, the seek and getPos interfaces are mainly added.
+ */
+public class FSDataBufferedInputStream extends FSDataInputStream {
+
+       private static final int DEFAULT_BUFFER_SIZE = 8192;
+
+       protected byte[] buf;
+
+       // read offset of buf
+       private int pos;
+
+       // availed count of buf
+       private int count;
+
+       private final FSDataInputStream inputStream;
+
+       private boolean closed;
+
+       public FSDataBufferedInputStream(FSDataInputStream inputStream) {
+               this(inputStream, DEFAULT_BUFFER_SIZE);
+       }
+
+       public FSDataBufferedInputStream(
+               FSDataInputStream inputStream,
+               int bufferSize) {
+               this.inputStream = inputStream;
+
+               Preconditions.checkState(bufferSize > 0, "bufferSize must > 0");
+               this.buf = new byte[bufferSize];
+
+               this.pos = 0;
+               this.count = 0;
+               this.closed = false;
+       }
+
+       @Override
+       public void seek(long desired) throws IOException {
+               long streamPos = inputStream.getPos();
+               long bufStartPos = streamPos - count;
+               if (bufStartPos <= desired && desired < streamPos) {
+                       this.pos = (int) (desired - bufStartPos);
+                       return;
+               }
+               inputStream.seek(desired);
+               this.pos = 0;
+               this.count = 0;
+       }
+
+       @Override
+       public long getPos() throws IOException {
+               int avail = count - pos;
+               return inputStream.getPos() - avail;
+       }
+
+       @Override
+       public int read() throws IOException {
+               if (pos >= count) {
+                       fill();
+                       if (pos >= count) {
+                               return -1;
+                       }
+               }
+               return buf[pos++] & 0xff;
+       }
+
+       @Override
+       public int read(byte[] b, int off, int len) throws IOException {
+               if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
+                       throw new IndexOutOfBoundsException();
+               } else if (len == 0) {
+                       return 0;
+               }
+
+               int n = 0;
+               for (; ; ) {
+                       int nread = read1(b, off + n, len - n);
+                       if (nread <= 0) {
+                               return (n == 0) ? nread : n;
+                       }
+                       n += nread;
+                       if (n >= len) {
+                               return n;
+                       }
+                       // if not closed but no bytes available, return
+                       InputStream input = inputStream;
+                       if (input != null && input.available() <= 0) {
+                               return n;
+                       }
+               }
+       }
+
+       /**
+        * Read characters into a portion of an array, reading from the underlying
+        * stream at most once if necessary.
+        */
+       private int read1(byte[] b, int off, int len) throws IOException {
+               int avail = count - pos;
+               if (avail <= 0) {
+            /* If the requested length is at least as large as the buffer,
+               do not bother to copy the bytes into the local buffer.
+               In this way buffered streams will cascade harmlessly. */
+                       if (len >= buf.length) {
+                               return inputStream.read(b, off, len);
+                       }
+                       fill();
+                       avail = count - pos;
+                       if (avail <= 0) {
+                               return -1;
+                       }
+               }
+               int cnt = Math.min(avail, len);
+               System.arraycopy(buf, pos, b, off, cnt);
+               pos += cnt;
+               return cnt;
+       }
+
+       @Override
+       public long skip(long n) throws IOException {
+               if (n <= 0) {
+                       return 0;
+               }
+               long avail = count - pos;
+
+               if (avail <= 0) {
+                       // Fill in buffer to save bytes for reset

Review comment:
       It feels as if we could improve `skip` by using `inputStream.skip`. Currently, `skip` is pretty much just `read` with the bytes discarded. At least `S3InputStream` and `DFSInputStream` have special implementations of `skip`.
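   A sketch of a `skip` that serves what is already buffered and hands the remainder to the wrapped stream's own `skip`, so that streams with a cheap skip can benefit. A plain `InputStream` stands in for `FSDataInputStream`, and `SkippingBufferedStream` is a hypothetical name.

```java
import java.io.IOException;
import java.io.InputStream;

class SkippingBufferedStream extends InputStream {
    private final InputStream in;
    private final byte[] buf;
    private int pos;   // next byte to return from buf
    private int count; // number of valid bytes in buf, never negative

    SkippingBufferedStream(InputStream in, int bufferSize) {
        this.in = in;
        this.buf = new byte[bufferSize];
    }

    @Override
    public int read() throws IOException {
        if (pos >= count) {
            int n = in.read(buf, 0, buf.length);
            pos = 0;
            count = Math.max(n, 0); // keep count >= 0 at EOF
            if (count == 0) {
                return -1;
            }
        }
        return buf[pos++] & 0xff;
    }

    @Override
    public long skip(long n) throws IOException {
        if (n <= 0) {
            return 0;
        }
        long buffered = count - pos;
        if (n <= buffered) {
            pos += (int) n; // satisfied entirely from the buffer
            return n;
        }
        // Drop the buffered bytes and let the wrapped stream skip the rest;
        // it may do so without actually reading the skipped bytes.
        pos = 0;
        count = 0;
        return buffered + in.skip(n - buffered);
    }
}
```

   As with any `InputStream`, the returned count may be smaller than requested (e.g. near EOF), so callers still need to check it.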

##########
File path: 
flink-core/src/main/java/org/apache/flink/core/fs/FSDataBufferedInputStream.java
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * FSDataInputStream with Buffer, only support single thread.
+ * <p>
+ * The design of FSDataBufferedInputStream refers to {@link java.io.BufferedInputStream}.
+ * Compared to {@link java.io.BufferedInputStream}, the seek and getPos interfaces are mainly added.
+ */
+public class FSDataBufferedInputStream extends FSDataInputStream {
+
+       private static final int DEFAULT_BUFFER_SIZE = 8192;
+
+       protected byte[] buf;
+
+       // read offset of buf
+       private int pos;
+
+       // availed count of buf
+       private int count;
+
+       private final FSDataInputStream inputStream;
+
+       private boolean closed;
+
+       public FSDataBufferedInputStream(FSDataInputStream inputStream) {
+               this(inputStream, DEFAULT_BUFFER_SIZE);
+       }
+
+       public FSDataBufferedInputStream(
+               FSDataInputStream inputStream,
+               int bufferSize) {
+               this.inputStream = inputStream;
+
+               Preconditions.checkState(bufferSize > 0, "bufferSize must > 0");
+               this.buf = new byte[bufferSize];
+
+               this.pos = 0;
+               this.count = 0;
+               this.closed = false;
+       }
+
+       @Override
+       public void seek(long desired) throws IOException {

Review comment:
       All operations should check whether the stream has already been closed. I'd add `checkClosed();` as the first line of pretty much all methods.
   See also my comment below: you don't need `closed`. `checkClosed()` could throw an `IllegalStateException` whenever `inputStream == null`.
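   A sketch of the suggested guard, with a hypothetical `checkClosed()` helper called first in every operation; a plain `InputStream` stands in for `FSDataInputStream`. Note that `java.io.BufferedInputStream` reports use-after-close with an `IOException("Stream closed")` instead, so which exception type fits better is a design choice.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;

// Sketch: every operation starts with checkClosed(); a null `in` means
// the stream has been closed, so no separate `closed` flag is needed.
class GuardedStream extends InputStream {
    private InputStream in;

    GuardedStream(InputStream in) {
        this.in = Objects.requireNonNull(in, "in");
    }

    // Hypothetical helper named after the review suggestion.
    private void checkClosed() {
        if (in == null) {
            throw new IllegalStateException("Stream has already been closed.");
        }
    }

    @Override
    public int read() throws IOException {
        checkClosed();
        return in.read();
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        checkClosed();
        return in.read(b, off, len);
    }

    @Override
    public int available() throws IOException {
        checkClosed();
        return in.available();
    }

    @Override
    public void close() throws IOException {
        if (in != null) {
            InputStream toClose = in;
            in = null;
            toClose.close();
        }
    }
}
```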

##########
File path: 
flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
##########
@@ -51,7 +55,22 @@
         * @param hadoopFileSystem The Hadoop FileSystem that will be used under the hood.
         */
        public HadoopFileSystem(org.apache.hadoop.fs.FileSystem hadoopFileSystem) {
+               this(hadoopFileSystem, 0);
+       }
+
+       /**
+        * Wraps the given Hadoop File System object as a Flink File System object.
+        * The given Hadoop file system object is expected to be initialized already.
+        *
+        * @param hadoopFileSystem The Hadoop FileSystem that will be used under the hood.
+        * @param readBufferSize The size of the buffer to be used.
+        */
+       public HadoopFileSystem(org.apache.hadoop.fs.FileSystem hadoopFileSystem,
+                                                       int readBufferSize) {

Review comment:
       nit: formatting (either put it on one line, or chop down both parameters with a double indent).

##########
File path: 
flink-core/src/main/java/org/apache/flink/core/fs/FSDataBufferedInputStream.java
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * FSDataInputStream with Buffer, only support single thread.
+ * <p>
+ * The design of FSDataBufferedInputStream refers to {@link java.io.BufferedInputStream}.
+ * Compared to {@link java.io.BufferedInputStream}, the seek and getPos interfaces are mainly added.
+ */
+public class FSDataBufferedInputStream extends FSDataInputStream {
+
+       private static final int DEFAULT_BUFFER_SIZE = 8192;
+
+       protected byte[] buf;
+
+       // read offset of buf
+       private int pos;
+
+       // availed count of buf
+       private int count;
+
+       private final FSDataInputStream inputStream;
+
+       private boolean closed;
+
+       public FSDataBufferedInputStream(FSDataInputStream inputStream) {
+               this(inputStream, DEFAULT_BUFFER_SIZE);
+       }
+
+       public FSDataBufferedInputStream(
+               FSDataInputStream inputStream,
+               int bufferSize) {
+               this.inputStream = inputStream;
+
+               Preconditions.checkState(bufferSize > 0, "bufferSize must > 0");
+               this.buf = new byte[bufferSize];
+
+               this.pos = 0;
+               this.count = 0;
+               this.closed = false;
+       }
+
+       @Override
+       public void seek(long desired) throws IOException {
+               long streamPos = inputStream.getPos();
+               long bufStartPos = streamPos - count;
+               if (bufStartPos <= desired && desired < streamPos) {
+                       this.pos = (int) (desired - bufStartPos);
+                       return;
+               }
+               inputStream.seek(desired);
+               this.pos = 0;
+               this.count = 0;
+       }
+
+       @Override
+       public long getPos() throws IOException {
+               int avail = count - pos;
+               return inputStream.getPos() - avail;
+       }
+
+       @Override
+       public int read() throws IOException {
+               if (pos >= count) {
+                       fill();
+                       if (pos >= count) {
+                               return -1;
+                       }
+               }
+               return buf[pos++] & 0xff;
+       }
+
+       @Override
+       public int read(byte[] b, int off, int len) throws IOException {
+               if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
+                       throw new IndexOutOfBoundsException();
+               } else if (len == 0) {
+                       return 0;
+               }
+
+               int n = 0;
+               for (; ; ) {
+                       int nread = read1(b, off + n, len - n);
+                       if (nread <= 0) {
+                               return (n == 0) ? nread : n;
+                       }
+                       n += nread;
+                       if (n >= len) {
+                               return n;
+                       }
+                       // if not closed but no bytes available, return
+                       InputStream input = inputStream;
+                       if (input != null && input.available() <= 0) {
+                               return n;
+                       }
+               }
+       }
+
+       /**
+        * Read characters into a portion of an array, reading from the underlying
+        * stream at most once if necessary.
+        */
+       private int read1(byte[] b, int off, int len) throws IOException {
+               int avail = count - pos;
+               if (avail <= 0) {
+            /* If the requested length is at least as large as the buffer,
+               do not bother to copy the bytes into the local buffer.
+               In this way buffered streams will cascade harmlessly. */
+                       if (len >= buf.length) {
+                               return inputStream.read(b, off, len);
+                       }
+                       fill();
+                       avail = count - pos;
+                       if (avail <= 0) {
+                               return -1;
+                       }
+               }
+               int cnt = Math.min(avail, len);
+               System.arraycopy(buf, pos, b, off, cnt);
+               pos += cnt;
+               return cnt;
+       }
+
+       @Override
+       public long skip(long n) throws IOException {
+               if (n <= 0) {
+                       return 0;
+               }
+               long avail = count - pos;
+
+               if (avail <= 0) {
+                       // Fill in buffer to save bytes for reset
+                       fill();
+                       avail = count - pos;
+                       if (avail <= 0) {
+                               return 0;
+                       }
+               }
+
+               long skipped = Math.min(avail, n);
+               pos += skipped;
+               return skipped;
+       }
+
+       @Override
+       public int available() throws IOException {
+               int avail = count - pos;
+               return inputStream.available() + avail;
+       }
+
+       @Override
+       public void close() throws IOException {
+               if (closed) {
+                       return;
+               }
+               closed = true;
+               if (inputStream != null) {
+                       inputStream.close();
+               }
+       }
+
+       @VisibleForTesting
+       public int getBufferSize() {
+               return buf.length;
+       }
+
+       private void fill() throws IOException {
+               Preconditions.checkState(pos >= count);
+               count = inputStream.read(buf);

Review comment:
       `count` can be `-1` here. Please double-check that it's used correctly in all places. For example, `getPos` does not seem to work, and `seek` also looks suspicious.
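   A minimal sketch of one way to keep `count` non-negative, assuming `fill()` resets `pos` to 0 (the rest of `fill()` is cut off in this hunk). `MemorySeekableInput` is a hypothetical in-memory stand-in for `FSDataInputStream` exposing only `seek`, `getPos` and `read`, and `EofSafeBufferedInput` is a hypothetical name. If `count` were left at `-1` with `pos >= 0`, `count - pos` would be negative and `getPos()` would report a position past the underlying stream's position.

```java
import java.io.IOException;

// Hypothetical in-memory stand-in for FSDataInputStream.
class MemorySeekableInput {
    private final byte[] data;
    private int position;

    MemorySeekableInput(byte[] data) { this.data = data; }

    long getPos() { return position; }

    void seek(long desired) { position = (int) desired; }

    int read(byte[] b, int off, int len) {
        if (position >= data.length) {
            return -1; // EOF, as with InputStream#read
        }
        int n = Math.min(len, data.length - position);
        System.arraycopy(data, position, b, off, n);
        position += n;
        return n;
    }
}

class EofSafeBufferedInput {
    private final MemorySeekableInput in;
    private final byte[] buf;
    private int pos;   // read offset into buf
    private int count; // valid bytes in buf; kept >= 0 even at EOF

    EofSafeBufferedInput(MemorySeekableInput in, int bufferSize) {
        this.in = in;
        this.buf = new byte[bufferSize];
    }

    private void fill() throws IOException {
        int n = in.read(buf, 0, buf.length);
        pos = 0;
        count = Math.max(n, 0); // never store -1 in count
    }

    int read() throws IOException {
        if (pos >= count) {
            fill();
            if (count == 0) {
                return -1;
            }
        }
        return buf[pos++] & 0xff;
    }

    long getPos() {
        // Unread buffered bytes lie between our logical position and the source's.
        return in.getPos() - (count - pos);
    }

    void seek(long desired) {
        long streamPos = in.getPos();
        long bufStart = streamPos - count;
        if (desired >= bufStart && desired < streamPos) {
            pos = (int) (desired - bufStart); // target is inside the buffer
            return;
        }
        in.seek(desired);
        pos = 0;
        count = 0;
    }
}
```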

##########
File path: 
flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactoryTest.java
##########
@@ -61,4 +72,61 @@ public void testCreateHadoopFsWithMissingAuthority() throws Exception {
                        assertTrue(e.getMessage().contains("authority"));
                }
        }
+
+       @Test
+       public void testReadBufferSize() throws Exception {

Review comment:
       I'd probably divide the test into separate cases: one without setting the property, one setting it to 0, and one setting it to >0. The additional lines should make the tests much easier to read.

##########
File path: 
flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopFsFactoryTest.java
##########
@@ -61,4 +72,61 @@ public void testCreateHadoopFsWithMissingAuthority() throws Exception {
                        assertTrue(e.getMessage().contains("authority"));
                }
        }
+
+       @Test
+       public void testReadBufferSize() throws Exception {
+               final URI uri = URI.create("hdfs://localhost:12345/");
+               final Path path = new Path(uri.getPath());
+
+               org.apache.hadoop.fs.FileSystem hadoopFs = mock(org.apache.hadoop.fs.FileSystem.class);
+               org.apache.hadoop.fs.FSDataInputStream hadoopInputStream =
+                       mock(org.apache.hadoop.fs.FSDataInputStream.class);
+               when(hadoopFs.open(isA(org.apache.hadoop.fs.Path.class), anyInt()))
+                       .thenReturn(hadoopInputStream);
+
+               // default configuration
+               Configuration configuration = new Configuration();
+
+               HadoopFsFactory fsFactory = new HadoopFsFactory();
+               fsFactory.configure(configuration);
+
+               FileSystem fileSystem = fsFactory.create(uri);
+               mockHadoopFsOpen(fileSystem, hadoopFs);

Review comment:
       I hope that we don't need that either...




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

