Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/658#discussion_r29920838
--- Diff:
flink-core/src/main/java/org/apache/flink/api/common/io/InputStreamFSInputWrapper.java
---
@@ -0,0 +1,60 @@
+package org.apache.flink.api.common.io;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * This class wraps an {@link java.io.InputStream} and exposes it as
{@link org.apache.flink.core.fs.FSDataInputStream}.
+ * <br>
+ * <i>NB: {@link #seek(long)} and {@link #getPos()} are currently not
supported.</i>
+ */
+public class InputStreamFSInputWrapper extends FSDataInputStream {
+
+ private final InputStream inStream;
+
+ private long pos = 0;
+
+ public InputStreamFSInputWrapper(InputStream inStream) {
+ this.inStream = inStream;
+ }
+
+ @Override
+ public void seek(long desired) throws IOException {
+ if (desired < this.pos) {
+ throw new IllegalArgumentException("Wrapped InputStream:
cannot search backwards.");
+ } else if (desired == pos) {
+ return;
+ }
+
+ this.inStream.skip(desired - pos);
+ this.pos = desired;
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return this.pos;
+ }
+
+ @Override
+ public int read() throws IOException {
+ int read = inStream.read();
+ if (read != -1) {
+ this.pos++;
+ }
+ return read;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ int numReadBytes = inStream.read(b, off, len);
--- End diff --
This method will return `-1` when the stream is over. Will this corrupt the
`pos` ? (Above, you're checking if `read != -1` ?)
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---