[ https://issues.apache.org/jira/browse/FLINK-1615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14358316#comment-14358316 ]
ASF GitHub Bot commented on FLINK-1615:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/442#discussion_r26285316

    --- Diff: flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java ---
    @@ -0,0 +1,67 @@
    +package org.apache.flink.contrib.tweetinputformat.io;
    +
    +import org.apache.flink.api.common.io.DelimitedInputFormat;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.typeutils.GenericTypeInfo;
    +import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
    +import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
    +import org.codehaus.jackson.JsonParseException;
    +import org.json.simple.parser.JSONParser;
    +import org.json.simple.parser.ParseException;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import java.io.ByteArrayInputStream;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +
    +
    +public class SimpleTweetInputFormat extends DelimitedInputFormat<Tweet> implements ResultTypeQueryable<Tweet> {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(SimpleTweetInputFormat.class);
    +
    +    @Override
    +    public Tweet nextRecord(Tweet record) throws IOException {
    +        Boolean result = false;
    +
    +        do {
    +            try {
    +                record.reset(0);
    +                record = super.nextRecord(record);
    +                result = true;
    +
    +            } catch (JsonParseException e) {
    +                result = false;
    +
    +            }
    +        } while (!result);
    +
    +        return record;
    +    }
    +
    +    @Override
    +    public Tweet readRecord(Tweet reuse, byte[] bytes, int offset, int numBytes) throws IOException {
    +
    +
    +        InputStreamReader jsonReader = new InputStreamReader(new ByteArrayInputStream(bytes));
    +        jsonReader.skip(offset);
    +
    +        JSONParser parser = new JSONParser();
    --- End diff --

    Putting the parser into a `transient` field and initializing it in the `open()` method is the way to go.
    Can you give me the full stacktrace for the "Can not submit Job" exception?

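For illustration, the `transient` field plus `open()` pattern suggested above can be sketched with the JDK alone. This is a minimal sketch, not the code from the pull request: `LineParser` and `SketchFormat` are hypothetical stand-ins for `JSONParser` and the input format. It assumes the format object is Java-serialized before it reaches the worker, which may or may not be related to the "Can not submit Job" exception mentioned in the comment.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class TransientParserSketch {

    /** Hypothetical stand-in for a parser that is NOT Serializable. */
    static class LineParser {
        String parse(byte[] bytes, int offset, int numBytes) {
            // Decode only the record's slice of the buffer.
            return new String(bytes, offset, numBytes, StandardCharsets.UTF_8);
        }
    }

    /** Hypothetical stand-in for an input format that gets serialized and shipped. */
    static class SketchFormat implements Serializable {
        private static final long serialVersionUID = 1L;

        // transient: skipped by Java serialization, so it is null after deserialization.
        private transient LineParser parser;

        /** Called once on the worker before any record is read. */
        void open() {
            parser = new LineParser();
        }

        /** Reuses the parser created in open() instead of allocating one per record. */
        String readRecord(byte[] bytes, int offset, int numBytes) {
            return parser.parse(bytes, offset, numBytes);
        }
    }

    /** Simulates shipping the format to a worker via Java serialization. */
    static SketchFormat roundTrip(SketchFormat format) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(format);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
            return (SketchFormat) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SketchFormat shipped = roundTrip(new SketchFormat());
        shipped.open(); // re-create the parser on the "worker" side

        byte[] data = "xx{\"id\":1}".getBytes(StandardCharsets.UTF_8);
        System.out.println(shipped.readRecord(data, 2, data.length - 2)); // prints {"id":1}
    }
}
```

The sketch also passes `offset` and `numBytes` straight to the decoder, so only the record's own bytes are parsed.
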
> Introduces a new InputFormat for Tweets
> ---------------------------------------
>
>                 Key: FLINK-1615
>                 URL: https://issues.apache.org/jira/browse/FLINK-1615
>             Project: Flink
>          Issue Type: New Feature
>          Components: flink-contrib
>    Affects Versions: 0.8.1
>            Reporter: mustafa elbehery
>            Priority: Minor
>
> An event-driven parser for Tweets into Java Pojos.
> It parses all the important parts of the tweet into Java objects.
> Tested on a cluster, and the performance is pretty good.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)