[ https://issues.apache.org/jira/browse/FLINK-1615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14358299#comment-14358299 ]
ASF GitHub Bot commented on FLINK-1615:
---------------------------------------
Github user Elbehery commented on a diff in the pull request:
https://github.com/apache/flink/pull/442#discussion_r26284910
--- Diff: flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/io/SimpleTweetInputFormat.java ---
@@ -0,0 +1,67 @@
+package org.apache.flink.contrib.tweetinputformat.io;
+
+import org.apache.flink.api.common.io.DelimitedInputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
+import org.codehaus.jackson.JsonParseException;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+
+public class SimpleTweetInputFormat extends DelimitedInputFormat<Tweet> implements ResultTypeQueryable<Tweet> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SimpleTweetInputFormat.class);
+
+ @Override
+ public Tweet nextRecord(Tweet record) throws IOException {
+ Boolean result = false;
+
+ do {
+ try {
+ record.reset(0);
+ record = super.nextRecord(record);
+ result = true;
+
+ } catch (JsonParseException e) {
+ result = false;
+
+ }
+ } while (!result);
+
+ return record;
+ }
+
+ @Override
+ public Tweet readRecord(Tweet reuse, byte[] bytes, int offset, int numBytes) throws IOException {
+
+
+ InputStreamReader jsonReader = new InputStreamReader(new ByteArrayInputStream(bytes));
+ jsonReader.skip(offset);
+
+ JSONParser parser = new JSONParser();
--- End diff --
I know. At first I tried to create them as instance fields of the class, but
I got an error because the fields are not serializable. I then tried declaring
them transient, but Flink threw a "Can not submit Job" exception. If you have
suggestions, please let me know.
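One common pattern for per-instance helpers that are not serializable is to keep them in a transient field and create them lazily on first use, so that each deserialized copy builds its own. In a Flink input format, overriding open() and calling super.open() first is another usual place to do this. The sketch below shows the pattern in plain Java without Flink dependencies. NonSerializableParser, FormatLike and TransientHelperSketch are hypothetical stand-ins, not classes from the PR or from Flink, and the sketch does not explain the "Can not submit Job" error, whose cause is not shown here. It also decodes only the record's bytes [offset, offset + numBytes). The InputStreamReader in the diff wraps the whole buffer, and Reader.skip counts characters, not bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class TransientHelperSketch {

    // Hypothetical stand-in for a helper that does not implement Serializable
    // (e.g. a JSON parser). A non-transient field of this type would make
    // serialization fail with java.io.NotSerializableException.
    static final class NonSerializableParser {
        // Placeholder: a real parser would build a Tweet object here.
        String parse(String json) {
            return json.trim();
        }
    }

    // Hypothetical stand-in for an input format that is serialized and shipped.
    static final class FormatLike implements Serializable {
        private static final long serialVersionUID = 1L;

        // transient: skipped by Java serialization, so it is null on the copy.
        private transient NonSerializableParser parser;

        // Create the helper on first use, i.e. again after deserialization.
        private NonSerializableParser parser() {
            if (parser == null) {
                parser = new NonSerializableParser();
            }
            return parser;
        }

        boolean hasParser() {
            return parser != null;
        }

        String readRecord(byte[] bytes, int offset, int numBytes) {
            // Decode only this record's slice, not the whole buffer.
            String text = new String(bytes, offset, numBytes, StandardCharsets.UTF_8);
            return parser().parse(text);
        }
    }

    // Serialize and deserialize with plain Java serialization.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bos.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] buffer = "xx {\"id\":1} yy".getBytes(StandardCharsets.UTF_8);
        FormatLike original = new FormatLike();
        original.readRecord(buffer, 2, 9); // helper now exists on the original
        FormatLike copy = roundTrip(original); // succeeds: the field is transient
        System.out.println(copy.hasParser());              // prints "false"
        System.out.println(copy.readRecord(buffer, 2, 9)); // prints {"id":1}
    }
}
```

Without the transient modifier, roundTrip would throw, because NonSerializableParser does not implement Serializable. With it, the copy starts with a null field and rebuilds the helper on demand.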
> Introduces a new InputFormat for Tweets
> ---------------------------------------
>
> Key: FLINK-1615
> URL: https://issues.apache.org/jira/browse/FLINK-1615
> Project: Flink
> Issue Type: New Feature
> Components: flink-contrib
> Affects Versions: 0.8.1
> Reporter: mustafa elbehery
> Priority: Minor
>
> An event-driven parser for Tweets into Java POJOs.
> It parses all the important parts of a tweet into Java objects.
> Tested on a cluster; the performance is pretty good.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)