Repository: flink
Updated Branches:
  refs/heads/master 7ce9a8ff9 -> 9b63f269e


[FLINK-1081] Add getPos() method into FSDataInputStream class


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7aa9a503
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7aa9a503
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7aa9a503

Branch: refs/heads/master
Commit: 7aa9a5035c3679ba80ca7d6b29ced5f51d92265e
Parents: 7ce9a8f
Author: Chiwan Park <[email protected]>
Authored: Thu Nov 27 20:01:25 2014 +0900
Committer: Gyula Fora <[email protected]>
Committed: Sun Jan 25 18:56:55 2015 +0100

----------------------------------------------------------------------
 .../io/InflaterInputStreamFSInputWrapper.java   |  5 ++
 .../apache/flink/core/fs/FSDataInputStream.java |  6 +++
 .../core/fs/local/LocalDataInputStream.java     |  5 ++
 .../runtime/fs/hdfs/HadoopDataInputStream.java  |  6 +++
 .../flink/runtime/fs/s3/S3DataInputStream.java  | 55 +++++++++++++++++---
 5 files changed, 71 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7aa9a503/flink-core/src/main/java/org/apache/flink/api/common/io/InflaterInputStreamFSInputWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/InflaterInputStreamFSInputWrapper.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/io/InflaterInputStreamFSInputWrapper.java
index 250b5f5..c940cbc 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/io/InflaterInputStreamFSInputWrapper.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/InflaterInputStreamFSInputWrapper.java
@@ -38,6 +38,11 @@ public class InflaterInputStreamFSInputWrapper extends 
FSDataInputStream {
        }
 
        @Override
+       public long getPos() throws IOException {
+               throw new UnsupportedOperationException("Compressed streams do 
not support the getPos operation");
+       }
+
+       @Override
        public int read() throws IOException {
                return inStream.read();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa9a503/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java 
b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java
index 19a34dd..9ebc7f7 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataInputStream.java
@@ -45,4 +45,10 @@ public abstract class FSDataInputStream extends InputStream {
         */
        public abstract void seek(long desired) throws IOException;
 
+       /**
+        * Get the current position in the input stream.
+        *
+        * @return current position in the input stream
+        */
+       public abstract long getPos() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa9a503/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataInputStream.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataInputStream.java
 
b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataInputStream.java
index 1421ab5..0d5fcf2 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataInputStream.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataInputStream.java
@@ -57,6 +57,11 @@ public class LocalDataInputStream extends FSDataInputStream {
                this.fis.getChannel().position(desired);
        }
 
+       @Override
+       public long getPos() throws IOException {
+               return this.fis.getChannel().position();
+       }
+
 
        @Override
        public int read() throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa9a503/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
index 9a606fd..8aa3c57 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataInputStream.java
@@ -49,6 +49,12 @@ public final class HadoopDataInputStream extends 
FSDataInputStream {
        }
 
        @Override
+       public long getPos() throws IOException {
+
+               return fsDataInputStream.getPos();
+       }
+
+       @Override
        public int read() throws IOException {
 
                return fsDataInputStream.read();

http://git-wip-us.apache.org/repos/asf/flink/blob/7aa9a503/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3DataInputStream.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3DataInputStream.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3DataInputStream.java
index 6dbae7d..715f1a1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3DataInputStream.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3DataInputStream.java
@@ -32,7 +32,6 @@ import com.amazonaws.services.s3.model.S3Object;
 /**
  * This class implements an {@link FSDataInputStream} that downloads its data 
from Amazon S3 in the background.
  * Essentially, this class is just a wrapper to the Amazon AWS SDK.
- * 
  */
 public class S3DataInputStream extends FSDataInputStream {
 
@@ -42,8 +41,18 @@ public class S3DataInputStream extends FSDataInputStream {
        private final InputStream inputStream;
 
        /**
+        * The current position of input stream.
+        */
+       private long position;
+
+       /**
+        * The marked position.
+        */
+       private long marked;
+
+       /**
         * Constructs a new input stream which reads its data from the 
specified S3 object.
-        * 
+        *
         * @param s3Client
         *        the S3 client to connect to Amazon S3.
         * @param bucket
@@ -63,6 +72,8 @@ public class S3DataInputStream extends FSDataInputStream {
                }
 
                this.inputStream = s3o.getObjectContent();
+               this.position = 0;
+               this.marked = 0;
        }
 
 
@@ -84,6 +95,7 @@ public class S3DataInputStream extends FSDataInputStream {
        public void mark(final int readlimit) {
 
                this.inputStream.mark(readlimit);
+               marked = readlimit;
        }
 
 
@@ -97,21 +109,36 @@ public class S3DataInputStream extends FSDataInputStream {
        @Override
        public int read() throws IOException {
 
-               return this.inputStream.read();
+               int read = this.inputStream.read();
+               if (read != -1) {
+                       ++position;
+               }
+
+               return read;
        }
 
 
        @Override
        public int read(final byte[] b) throws IOException {
 
-               return this.inputStream.read(b);
+               int read = this.inputStream.read(b);
+               if (read > 0) {
+                       position += read;
+               }
+
+               return read;
        }
 
 
        @Override
        public int read(final byte[] b, final int off, final int len) throws 
IOException {
 
-               return this.inputStream.read(b, off, len);
+               int read = this.inputStream.read(b, off, len);
+               if (read > 0) {
+                       position += read;
+               }
+
+               return read;
        }
 
 
@@ -119,12 +146,28 @@ public class S3DataInputStream extends FSDataInputStream {
        public void reset() throws IOException {
 
                this.inputStream.reset();
+               position = marked;
        }
 
 
        @Override
        public void seek(final long desired) throws IOException {
 
-               this.inputStream.skip(desired);
+               skip(desired);
+       }
+
+       @Override
+       public long skip(long n) throws IOException {
+               long skipped = this.inputStream.skip(n);
+               if (skipped > 0) {
+                       position += skipped;
+               }
+
+               return skipped;
+       }
+
+       @Override
+       public long getPos() throws IOException {
+               return position;
        }
 }

Reply via email to