[ https://issues.apache.org/jira/browse/FLINK-629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14249669#comment-14249669 ]

Robert Metzger commented on FLINK-629:
--------------------------------------

Hi Mustafa,
This pull request contains two fixes that make the code you posted run:
https://github.com/apache/incubator-flink/pull/271

You can check out the branch from the pull request (flink610-master) and work
on that until it has been merged into master.
I would recommend setting the Log4j logging level to INFO when writing Flink
jobs, because the Java API will then tell you why it is not recognizing types as POJOs.
For example:
{code}
0    [main] WARN  org.apache.flink.api.java.typeutils.TypeExtractor  - Class class de.tu_berlin.impro3.flink.model.places.Attributes does not contain a setter for field twitter
1    [main] WARN  org.apache.flink.api.java.typeutils.TypeExtractor  - Class class de.tu_berlin.impro3.flink.model.places.Attributes is not a valid POJO type
{code}
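
To see why a class is rejected, it can help to check it against the POJO rules by hand. The sketch below is a simplified, hypothetical approximation of those rules (public class, public no-argument constructor, every non-public field reachable through a getter and a setter), written with plain Java reflection. It is not Flink's `TypeExtractor`, and the names `PojoRulesSketch`, `AttributesNoSetter` and `AttributesWithSetter` are made up for illustration.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class PojoRulesSketch {

    // Hypothetical model class: private field with a getter but no setter.
    public static class AttributesNoSetter {
        private String twitter;
        public AttributesNoSetter() {}
        public String getTwitter() { return twitter; }
    }

    // The same class with the missing setter added.
    public static class AttributesWithSetter {
        private String twitter;
        public AttributesWithSetter() {}
        public String getTwitter() { return twitter; }
        public void setTwitter(String twitter) { this.twitter = twitter; }
    }

    static boolean hasPublicMethod(Class<?> c, String name, Class<?>... params) {
        try {
            c.getMethod(name, params); // getMethod only finds public methods
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    static boolean hasPublicNoArgConstructor(Class<?> c) {
        try {
            c.getConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    // Simplified approximation of the POJO rules; assumption, not Flink's exact logic.
    static boolean looksLikePojo(Class<?> c) {
        int mods = c.getModifiers();
        if (!Modifier.isPublic(mods)) return false;
        // A nested class must be static, otherwise it cannot be created without an outer instance.
        if (c.getEnclosingClass() != null && !Modifier.isStatic(mods)) return false;
        if (!hasPublicNoArgConstructor(c)) return false;
        for (Field f : c.getDeclaredFields()) {
            int m = f.getModifiers();
            // This sketch skips static, transient, public and compiler-generated fields.
            if (f.isSynthetic() || Modifier.isStatic(m) || Modifier.isTransient(m) || Modifier.isPublic(m)) {
                continue;
            }
            String name = f.getName();
            String cap = Character.toUpperCase(name.charAt(0)) + name.substring(1);
            if (!hasPublicMethod(c, "get" + cap) || !hasPublicMethod(c, "set" + cap, f.getType())) {
                System.out.println(c.getSimpleName() + " lacks a public getter/setter for field " + name);
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(looksLikePojo(AttributesNoSetter.class));   // false
        System.out.println(looksLikePojo(AttributesWithSetter.class)); // true
    }
}
```

In practice the fix for the warning above is the same as in `AttributesWithSetter`: add the missing setter (or make the field public).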

The following code runs with your data model:

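{code}
// Flink imports; Tweet is the POJO from your own data model (import it from your package).
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Collector;

public class TweetCount { // wrapper class name chosen for illustration
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Generate ids 0..10 and emit five tweets per id.
        DataSet<Long> seq = env.generateSequence(0, 10);
        DataSet<Tweet> tw = seq.flatMap(new FlatMapFunction<Long, Tweet>() {
            @Override
            public void flatMap(Long aLong, Collector<Tweet> coll) throws Exception {
                for (int i = 0; i < 5; i++) {
                    Tweet t = new Tweet();
                    t.setText("uhlalhla .. thats the text");
                    t.setId(aLong);
                    coll.collect(t);
                }
            }
        });

        // Group by the POJO field "id" and count the tweets in each group.
        DataSet<Integer> cnts = tw.groupBy("id").reduceGroup(new GroupReduceFunction<Tweet, Integer>() {
            @Override
            public void reduce(Iterable<Tweet> iterable, Collector<Integer> collector) throws Exception {
                int cnt = 0;
                for (Tweet t : iterable) {
                    cnt++;
                }
                collector.collect(cnt);
            }
        });

        cnts.print();
        // In Flink releases of this era, print() only adds a data sink; execute() runs the job.
        env.execute();
    }
}
{code}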
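
As a quick sanity check, the job above should emit one count per id. The plain-Java model below (hypothetical class `TweetCountSimulation`, no Flink involved) assumes that `generateSequence(0, 10)` includes both bounds, so it yields eleven ids with five tweets each:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TweetCountSimulation {

    // Mirrors the job with plain collections: one entry per id, counting its tweets.
    static List<Integer> countsPerId(long from, long to, int tweetsPerId) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long id = from; id <= to; id++) {          // generateSequence(from, to), inclusive
            for (int i = 0; i < tweetsPerId; i++) {     // flatMap: emit tweetsPerId tweets per id
                counts.merge(id, 1, Integer::sum);      // groupBy("id") + counting reduceGroup
            }
        }
        return new ArrayList<>(counts.values());
    }

    public static void main(String[] args) {
        System.out.println(countsPerId(0, 10, 5)); // [5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5]
    }
}
```

On the cluster the job should print the same eleven 5s, one per line and in no guaranteed order.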

Let me know if you stumble across any other issues with Flink.


> Add support for null values to the java api
> -------------------------------------------
>
>                 Key: FLINK-629
>                 URL: https://issues.apache.org/jira/browse/FLINK-629
>             Project: Flink
>          Issue Type: Improvement
>          Components: Java API
>            Reporter: Stephan Ewen
>            Assignee: Gyula Fora
>            Priority: Critical
>              Labels: github-import
>             Fix For: pre-apache
>
>         Attachments: model.tar.gz
>
>
> Currently, many runtime operations fail when encountering a null value. Tuple 
> serialization should allow null fields.
> I suggest adding a method called `getFieldNotNull()` to the tuples, which 
> throws a meaningful exception when the accessed field is null. That way, we 
> simplify the logic of operators that should not deal with null fields, like 
> key grouping or aggregations.
> Even though SQL allows grouping and aggregating of null values, I suggest 
> excluding this from the Java API, because the SQL semantics of aggregating null 
> fields are messy.
> ---------------- Imported from GitHub ----------------
> Url: https://github.com/stratosphere/stratosphere/issues/629
> Created by: [StephanEwen|https://github.com/StephanEwen]
> Labels: enhancement, java api, 
> Milestone: Release 0.5.1
> Created at: Wed Mar 26 00:27:49 CET 2014
> State: open
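
The `getFieldNotNull()` proposal from the issue description could look roughly like this. It is a standalone sketch with made-up names (`NullFieldSketch`, `Pair`, `NullFieldError`, `countByKey`), not the Flink tuple classes: the accessor fails fast with a message naming the null field, so a key-grouping operator can call it instead of handling nulls itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class NullFieldSketch {

    // Hypothetical exception that records which field was null.
    static class NullFieldError extends RuntimeException {
        final int pos;
        NullFieldError(int pos) {
            super("Field " + pos + " is null, but null is not allowed here");
            this.pos = pos;
        }
    }

    // Hypothetical two-field tuple whose fields may hold null.
    static class Pair<A, B> {
        final Object[] fields;
        Pair(A f0, B f1) { fields = new Object[] {f0, f1}; }

        @SuppressWarnings("unchecked")
        <T> T getField(int pos) { return (T) fields[pos]; }

        // Like getField, but throws a descriptive exception instead of returning null.
        <T> T getFieldNotNull(int pos) {
            T value = getField(pos);
            if (value == null) throw new NullFieldError(pos);
            return value;
        }
    }

    // Key grouping that relies on getFieldNotNull rather than checking for null itself.
    static Map<String, Integer> countByKey(Iterable<Pair<String, Integer>> input) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Pair<String, Integer> p : input) {
            String key = p.getFieldNotNull(0);
            counts.merge(key, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Pair<String, Integer>> rows = new ArrayList<>();
        rows.add(new Pair<>("a", 1));
        rows.add(new Pair<>("a", 2));
        rows.add(new Pair<>("b", 3));
        System.out.println(countByKey(rows)); // {a=2, b=1}

        rows.add(new Pair<>(null, 4));
        try {
            countByKey(rows);
        } catch (NullFieldError e) {
            System.out.println(e.getMessage()); // Field 0 is null, but null is not allowed here
        }
    }
}
```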



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
