[ https://issues.apache.org/jira/browse/FLINK-1615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14358316#comment-14358316 ]

ASF GitHub Bot commented on FLINK-1615:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/442#discussion_r26285316
  
    --- Diff: flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java ---
    @@ -0,0 +1,67 @@
    +package org.apache.flink.contrib.tweetinputformat.io;
    +
    +import org.apache.flink.api.common.io.DelimitedInputFormat;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.typeutils.GenericTypeInfo;
    +import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
    +import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
    +import org.codehaus.jackson.JsonParseException;
    +import org.json.simple.parser.JSONParser;
    +import org.json.simple.parser.ParseException;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import java.io.ByteArrayInputStream;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +
    +
    +public class SimpleTweetInputFormat extends DelimitedInputFormat<Tweet> implements ResultTypeQueryable<Tweet> {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(SimpleTweetInputFormat.class);
    +
    +    @Override
    +    public Tweet nextRecord(Tweet record) throws IOException {
    +        Boolean result = false;
    +
    +        do {
    +            try {
    +                record.reset(0);
    +                record = super.nextRecord(record);
    +                result = true;
    +
    +            } catch (JsonParseException e) {
    +                result = false;
    +
    +            }
    +        } while (!result);
    +
    +        return record;
    +    }
    +
    +    @Override
    +    public Tweet readRecord(Tweet reuse, byte[] bytes, int offset, int numBytes) throws IOException {
    +
    +
    +        InputStreamReader jsonReader = new InputStreamReader(new ByteArrayInputStream(bytes));
    +        jsonReader.skip(offset);
    +
    +        JSONParser parser = new JSONParser();
    --- End diff --
    
    Putting the parser into a `transient` field and initializing it in the `open()` method is the way to go.
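
A minimal, self-contained sketch of that pattern, written without any Flink or json-simple dependency so it can be run on its own. `TransientParserSketch`, `LineParser`, `SketchInputFormat` and `roundTrip` are hypothetical names used only for illustration; `LineParser` stands in for a non-serializable parser such as `JSONParser`, and the no-argument `open()` stands in for the input format's `open(...)` override. The serialization round trip is an assumption about what happens to the format object when a job is shipped, not a description of Flink internals.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

class TransientParserSketch {

    // Hypothetical stand-in for a parser that is NOT Serializable.
    static class LineParser {
        String parse(byte[] bytes, int offset, int numBytes) {
            // Decode only the record's slice of the buffer.
            return new String(bytes, offset, numBytes, StandardCharsets.UTF_8).trim();
        }
    }

    // Hypothetical stand-in for an input format that gets serialized and shipped.
    static class SketchInputFormat implements Serializable {
        private static final long serialVersionUID = 1L;

        // transient: skipped by Java serialization, so the parser type
        // does not have to be Serializable.
        private transient LineParser parser;

        // Called once before any record is read; creates the parser a single
        // time instead of once per record.
        void open() {
            parser = new LineParser();
        }

        boolean isOpen() {
            return parser != null;
        }

        String readRecord(byte[] bytes, int offset, int numBytes) {
            return parser.parse(bytes, offset, numBytes);
        }
    }

    // Serializes and deserializes the format, mimicking shipping it elsewhere.
    static SketchInputFormat roundTrip(SketchInputFormat format) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(format);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(buffer.toByteArray()))) {
                return (SketchInputFormat) in.readObject();
            }
        } catch (Exception e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        SketchInputFormat shipped = roundTrip(new SketchInputFormat());
        shipped.open(); // the transient field is null until open() runs
        byte[] line = "xx{\"id\":1}\n".getBytes(StandardCharsets.UTF_8);
        System.out.println(shipped.readRecord(line, 2, line.length - 2));
    }
}
```

Because the field is `transient`, the format object serializes even though the parser type is not `Serializable`, and the parser is rebuilt wherever `open()` is called.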
    
    Can you give me the full stacktrace for the "Can not submit Job" exception?


> Introduces a new InputFormat for Tweets
> ---------------------------------------
>
>                 Key: FLINK-1615
>                 URL: https://issues.apache.org/jira/browse/FLINK-1615
>             Project: Flink
>          Issue Type: New Feature
>          Components: flink-contrib
>    Affects Versions: 0.8.1
>            Reporter: mustafa elbehery
>            Priority: Minor
>
> An event-driven parser that turns Tweets into Java POJOs. 
> It parses all the important parts of the tweet into Java objects. 
> Tested on a cluster, and the performance is good. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
