[ 
https://issues.apache.org/jira/browse/FLINK-1615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14358566#comment-14358566
 ] 

ASF GitHub Bot commented on FLINK-1615:
---------------------------------------

Github user Elbehery commented on a diff in the pull request:

    https://github.com/apache/flink/pull/442#discussion_r26295918
  
    --- Diff: flink-contrib/src/test/java/org/apache/flink/contrib/SimpleTweetInputFormatTest.java ---
    @@ -0,0 +1,76 @@
    +package org.apache.flink.contrib;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.contrib.tweetinputformat.io.SimpleTweetInputFormat;
    +import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
    +import org.apache.flink.contrib.tweetinputformat.model.tweet.entities.HashTags;
    +import org.apache.flink.core.fs.FileInputSplit;
    +import org.apache.flink.core.fs.Path;
    +import org.junit.Assert;
    +import org.junit.Before;
    +import org.junit.Test;
    +import java.io.File;
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.List;
    +
    +
    +public class SimpleTweetInputFormatTest {
    +
    +    private Tweet tweet;
    +
    +    private SimpleTweetInputFormat simpleTweetInputFormat;
    +
    +    private FileInputSplit fileInputSplit;
    +
    +    protected Configuration config;
    +
    +    protected File tempFile;
    +
    +
    +    @Before
    +    public void testSetUp() {
    +
    +
    +        simpleTweetInputFormat = new SimpleTweetInputFormat();
    +
    +        File jsonFile = new File("../flink-contrib/src/main/resources/HashTagTweetSample.json");
    +
    +        fileInputSplit = new FileInputSplit(0, new Path(jsonFile.getPath()), 0, jsonFile.length(), new String[] {"localhost"});
    +    }
    +
    +    @Test
    +    public void testTweetInput() throws Exception {
    +
    +
    +        simpleTweetInputFormat.open(fileInputSplit);
    +        List<String> result;
    +
    +        while (!simpleTweetInputFormat.reachedEnd()) {
    +            tweet = new Tweet();
    +            tweet = simpleTweetInputFormat.nextRecord(tweet);
    +
    +            if(tweet != null){
    --- End diff --
    
    I tried to do so, but the problem is that reachedEnd() is only updated 
inside nextRecord() in DelimitedInputFormat. I tried to add a condition that 
skips the nextRecord() call when readPos == limit, but I could not, because 
these fields are private. 
    
    So I have to stop reading when reachedEnd() becomes true, which in turn 
means that the object returned from nextRecord() (the tweet) is null at that 
point. If I add the requested assert, all the tests will fail. I do not know 
how to avoid this problem. Do you have a suggestion?
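
One possible way around this, sketched without any Flink dependency: treat a null record as acceptable only once reachedEnd() has flipped to true, and move the assertion to after the loop. LineSource below is a hypothetical stand-in that imitates the behaviour described above (the end flag changes only inside nextRecord()); it is not the actual DelimitedInputFormat, and the names readAll and NullAwareReadLoop are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in: the end-of-input flag only flips inside nextRecord(). */
final class LineSource {
    private final Iterator<String> lines;
    private boolean end = false;

    LineSource(List<String> lines) {
        this.lines = lines.iterator();
    }

    boolean reachedEnd() {
        return end;
    }

    /** Returns the next line, or null once the input is exhausted (setting the flag). */
    String nextRecord() {
        if (!lines.hasNext()) {
            end = true;
            return null;
        }
        return lines.next();
    }
}

final class NullAwareReadLoop {

    /** Collects all records; a null record is tolerated only at end of input. */
    static List<String> readAll(LineSource source) {
        List<String> records = new ArrayList<>();
        while (!source.reachedEnd()) {
            String record = source.nextRecord();
            if (record == null) {
                // A null before the end flag is set would be a genuine failure.
                if (!source.reachedEnd()) {
                    throw new IllegalStateException("null record before end of input");
                }
                break;
            }
            records.add(record);
        }
        return records;
    }

    public static void main(String[] args) {
        List<String> records = readAll(new LineSource(Arrays.asList("tweet-1", "tweet-2")));
        System.out.println(records.size() + " records read");
    }
}
```

In the real test, the same shape would presumably let the per-record checks run inside the non-null branch, with a single assertion after the loop (for example, that at least one tweet was parsed), so the trailing null no longer makes the tests fail.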


> Introduces a new InputFormat for Tweets
> ---------------------------------------
>
>                 Key: FLINK-1615
>                 URL: https://issues.apache.org/jira/browse/FLINK-1615
>             Project: Flink
>          Issue Type: New Feature
>          Components: flink-contrib
>    Affects Versions: 0.8.1
>            Reporter: mustafa elbehery
>            Priority: Minor
>
> An event-driven parser that turns Tweets into Java POJOs. 
> It parses all the important parts of the tweet into Java objects. 
> Tested on a cluster, and the performance is pretty good. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
