[hotfix] [core] Minor code cleanup and correction of javadocs for filesystem input stream classes.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c62776f0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c62776f0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c62776f0 Branch: refs/heads/master Commit: c62776f0f7ac97f6fd484e2a9e0283074d26a444 Parents: 28b37ef Author: Stephan Ewen <[email protected]> Authored: Tue Oct 4 13:29:46 2016 +0200 Committer: Stephan Ewen <[email protected]> Committed: Wed Oct 5 19:36:13 2016 +0200 ---------------------------------------------------------------------- .../apache/flink/core/fs/FSDataInputStream.java | 8 ---- .../core/fs/local/LocalDataInputStream.java | 41 +++++++------------- .../runtime/fs/hdfs/HadoopDataInputStream.java | 22 +++++------ 3 files changed, 24 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c62776f0/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java index c94a71d..6ce1235 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java @@ -16,13 +16,6 @@ * limitations under the License. */ - -/** - * This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed by the Apache - * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for - * additional information regarding copyright ownership. - */ - package org.apache.flink.core.fs; import org.apache.flink.annotation.Public; @@ -32,7 +25,6 @@ import java.io.InputStream; /** * Interface for a data input stream to a file on a {@link FileSystem}. - * */ @Public public abstract class FSDataInputStream extends InputStream { http://git-wip-us.apache.org/repos/asf/flink/blob/c62776f0/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataInputStream.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataInputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataInputStream.java index 99ca2c4..e7b2828 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataInputStream.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataInputStream.java @@ -16,7 +16,6 @@ * limitations under the License. */ - package org.apache.flink.core.fs.local; import java.io.File; @@ -26,36 +25,31 @@ import java.io.IOException; import org.apache.flink.annotation.Internal; import org.apache.flink.core.fs.FSDataInputStream; +import javax.annotation.Nonnull; + /** * The <code>LocalDataInputStream</code> class is a wrapper class for a data * input stream to the local file system. - * */ @Internal public class LocalDataInputStream extends FSDataInputStream { - /** - * The file input stream used to read data. - */ - private FileInputStream fis = null; + /** The file input stream used to read data from.*/ + private final FileInputStream fis; /** * Constructs a new <code>LocalDataInputStream</code> object from a given {@link File} object. * - * @param file - * the {@link File} object the data stream is written to - * @throws IOException - * thrown if the data input stream cannot be created + * @param file The File the data stream is read from + * + * @throws IOException Thrown if the data input stream cannot be created. */ - public LocalDataInputStream(final File file) throws IOException { - + public LocalDataInputStream(File file) throws IOException { this.fis = new FileInputStream(file); } - @Override - public void seek(final long desired) throws IOException { - + public void seek(long desired) throws IOException { this.fis.getChannel().position(desired); } @@ -64,37 +58,28 @@ public class LocalDataInputStream extends FSDataInputStream { return this.fis.getChannel().position(); } - @Override public int read() throws IOException { - return this.fis.read(); } - @Override - public int read(final byte[] buffer, final int offset, final int length) throws IOException { - + public int read(@Nonnull byte[] buffer, int offset, int length) throws IOException { return this.fis.read(buffer, offset, length); } - - + @Override public void close() throws IOException { - this.fis.close(); } - - + @Override public int available() throws IOException { return this.fis.available(); } - - + @Override public long skip(final long n) throws IOException { return this.fis.skip(n); } - } http://git-wip-us.apache.org/repos/asf/flink/blob/c62776f0/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java index 6eeeb57..8893ba4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java @@ -22,30 +22,30 @@ import java.io.IOException; import org.apache.flink.core.fs.FSDataInputStream; +import javax.annotation.Nonnull; + +import static org.apache.flink.util.Preconditions.checkNotNull; + /** - * Concrete implementation of the {@link FSDataInputStream} for the - * Hadoop Distributed File System. + * Concrete implementation of the {@link FSDataInputStream} for the Hadoop's input streams. + * This supports all file systems supported by Hadoop, such as HDFS and S3 (S3a/S3n). */ public final class HadoopDataInputStream extends FSDataInputStream { private final org.apache.hadoop.fs.FSDataInputStream fsDataInputStream; /** - * Creates a new data input stream from the given HDFS input stream + * Creates a new data input stream from the given Hadoop input stream * - * @param fsDataInputStream - * the HDFS input stream + * @param fsDataInputStream The Hadoop input stream */ public HadoopDataInputStream(org.apache.hadoop.fs.FSDataInputStream fsDataInputStream) { - if (fsDataInputStream == null) { - throw new NullPointerException(); - } - this.fsDataInputStream = fsDataInputStream; + this.fsDataInputStream = checkNotNull(fsDataInputStream); } @Override - public synchronized void seek(long desired) throws IOException { + public void seek(long desired) throws IOException { fsDataInputStream.seek(desired); } @@ -65,7 +65,7 @@ public final class HadoopDataInputStream extends FSDataInputStream { } @Override - public int read(byte[] buffer, int offset, int length) throws IOException { + public int read(@Nonnull byte[] buffer, int offset, int length) throws IOException { return fsDataInputStream.read(buffer, offset, length); }
