[ 
https://issues.apache.org/jira/browse/FLINK-1980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14534019#comment-14534019
 ] 

ASF GitHub Bot commented on FLINK-1980:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/658#discussion_r29920838
  
    --- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/io/InputStreamFSInputWrapper.java
 ---
    @@ -0,0 +1,60 @@
    +package org.apache.flink.api.common.io;
    +
    +import org.apache.flink.core.fs.FSDataInputStream;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +
    +/**
    + * This class wraps an {@link java.io.InputStream} and exposes it as 
{@link org.apache.flink.core.fs.FSDataInputStream}.
    + * <br>
    + * <i>NB: {@link #seek(long)} and {@link #getPos()} are currently not 
supported.</i>
    + */
    +public class InputStreamFSInputWrapper extends FSDataInputStream {
    +
    +    private final InputStream inStream;
    +
    +    private long pos = 0;
    +
    +    public InputStreamFSInputWrapper(InputStream inStream) {
    +        this.inStream = inStream;
    +    }
    +
    +    @Override
    +    public void seek(long desired) throws IOException {
    +        if (desired < this.pos) {
    +            throw new IllegalArgumentException("Wrapped InputStream: 
cannot search backwards.");
    +        } else if (desired == pos) {
    +            return;
    +        }
    +
    +        this.inStream.skip(desired - pos);
    +        this.pos = desired;
    +    }
    +
    +    @Override
    +    public long getPos() throws IOException {
    +        return this.pos;
    +    }
    +
    +    @Override
    +    public int read() throws IOException {
    +        int read = inStream.read();
    +        if (read != -1) {
    +            this.pos++;
    +        }
    +        return read;
    +    }
    +
    +    @Override
    +    public int read(byte[] b, int off, int len) throws IOException {
    +        int numReadBytes = inStream.read(b, off, len);
    --- End diff --
    
    This method will return `-1` when the stream is over. Will this corrupt the 
`pos` ? (Above, you're checking if `read != -1` ?) 


> Allowing users to decorate input streams
> ----------------------------------------
>
>                 Key: FLINK-1980
>                 URL: https://issues.apache.org/jira/browse/FLINK-1980
>             Project: Flink
>          Issue Type: New Feature
>          Components: Core
>            Reporter: Sebastian Kruse
>            Assignee: Sebastian Kruse
>            Priority: Minor
>
> Users may have to do unforeseeable operations on file input streams before 
> they can be used by the actual input format logic, e.g., exotic compression 
> formats or preambles such as byte order marks. Therefore, it would be useful 
> to provide the user with a hook to decorate input streams in order to handle 
> such issues.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to