Repository: flink Updated Branches: refs/heads/master 7ce9a8ff9 -> 9b63f269e
[FLINK-1081] Add getPos() method into FSDataInputStream class Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7aa9a503 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7aa9a503 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7aa9a503 Branch: refs/heads/master Commit: 7aa9a5035c3679ba80ca7d6b29ced5f51d92265e Parents: 7ce9a8f Author: Chiwan Park <[email protected]> Authored: Thu Nov 27 20:01:25 2014 +0900 Committer: Gyula Fora <[email protected]> Committed: Sun Jan 25 18:56:55 2015 +0100 ---------------------------------------------------------------------- .../io/InflaterInputStreamFSInputWrapper.java | 5 ++ .../apache/flink/core/fs/FSDataInputStream.java | 6 +++ .../core/fs/local/LocalDataInputStream.java | 5 ++ .../runtime/fs/hdfs/HadoopDataInputStream.java | 6 +++ .../flink/runtime/fs/s3/S3DataInputStream.java | 55 +++++++++++++++++--- 5 files changed, 71 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7aa9a503/flink-core/src/main/java/org/apache/flink/api/common/io/InflaterInputStreamFSInputWrapper.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/InflaterInputStreamFSInputWrapper.java b/flink-core/src/main/java/org/apache/flink/api/common/io/InflaterInputStreamFSInputWrapper.java index 250b5f5..c940cbc 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/InflaterInputStreamFSInputWrapper.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/InflaterInputStreamFSInputWrapper.java @@ -38,6 +38,11 @@ public class InflaterInputStreamFSInputWrapper extends FSDataInputStream { } @Override + public long getPos() throws IOException { + throw new UnsupportedOperationException("Compressed streams do not support the getPos operation"); + } + + @Override public int read() throws IOException { return inStream.read(); } http://git-wip-us.apache.org/repos/asf/flink/blob/7aa9a503/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java index 19a34dd..9ebc7f7 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java @@ -45,4 +45,10 @@ public abstract class FSDataInputStream extends InputStream { */ public abstract void seek(long desired) throws IOException; + /** + * Get the current position in the input stream. + * + * @return current position in the input stream + */ + public abstract long getPos() throws IOException; } http://git-wip-us.apache.org/repos/asf/flink/blob/7aa9a503/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataInputStream.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataInputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataInputStream.java index 1421ab5..0d5fcf2 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataInputStream.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataInputStream.java @@ -57,6 +57,11 @@ public class LocalDataInputStream extends FSDataInputStream { this.fis.getChannel().position(desired); } + @Override + public long getPos() throws IOException { + return this.fis.getChannel().position(); + } + @Override public int read() throws IOException { http://git-wip-us.apache.org/repos/asf/flink/blob/7aa9a503/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java index 9a606fd..8aa3c57 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java @@ -49,6 +49,12 @@ public final class HadoopDataInputStream extends FSDataInputStream { } @Override + public long getPos() throws IOException { + + return fsDataInputStream.getPos(); + } + + @Override public int read() throws IOException { return fsDataInputStream.read(); http://git-wip-us.apache.org/repos/asf/flink/blob/7aa9a503/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3DataInputStream.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3DataInputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3DataInputStream.java index 6dbae7d..715f1a1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3DataInputStream.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3DataInputStream.java @@ -32,7 +32,6 @@ import com.amazonaws.services.s3.model.S3Object; /** * This class implements an {@link FSDataInputStream} that downloads its data from Amazon S3 in the background. * Essentially, this class is just a wrapper to the Amazon AWS SDK. - * */ public class S3DataInputStream extends FSDataInputStream { @@ -42,8 +41,18 @@ public class S3DataInputStream extends FSDataInputStream { private final InputStream inputStream; /** + * The current position of input stream. + */ + private long position; + + /** + * The marked position. + */ + private long marked; + + /** * Constructs a new input stream which reads its data from the specified S3 object. - * + * * @param s3Client * the S3 client to connect to Amazon S3. * @param bucket @@ -63,6 +72,8 @@ public class S3DataInputStream extends FSDataInputStream { } this.inputStream = s3o.getObjectContent(); + this.position = 0; + this.marked = 0; } @@ -84,6 +95,7 @@ public class S3DataInputStream extends FSDataInputStream { public void mark(final int readlimit) { this.inputStream.mark(readlimit); + marked = readlimit; } @@ -97,21 +109,36 @@ public class S3DataInputStream extends FSDataInputStream { @Override public int read() throws IOException { - return this.inputStream.read(); + int read = this.inputStream.read(); + if (read != -1) { + ++position; + } + + return read; } @Override public int read(final byte[] b) throws IOException { - return this.inputStream.read(b); + int read = this.inputStream.read(b); + if (read > 0) { + position += read; + } + + return read; } @Override public int read(final byte[] b, final int off, final int len) throws IOException { - return this.inputStream.read(b, off, len); + int read = this.inputStream.read(b, off, len); + if (read > 0) { + position += read; + } + + return read; } @@ -119,12 +146,28 @@ public class S3DataInputStream extends FSDataInputStream { public void reset() throws IOException { this.inputStream.reset(); + position = marked; } @Override public void seek(final long desired) throws IOException { - this.inputStream.skip(desired); + skip(desired); + } + + @Override + public long skip(long n) throws IOException { + long skipped = this.inputStream.skip(n); + if (skipped > 0) { + position += skipped; + } + + return skipped; + } + + @Override + public long getPos() throws IOException { + return position; } }
