[
https://issues.apache.org/jira/browse/FLINK-629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14249669#comment-14249669
]
Robert Metzger commented on FLINK-629:
--------------------------------------
Hi Mustafa,
This pull request contains two fixes for getting the code you posted to run:
https://github.com/apache/incubator-flink/pull/271
You can check out the branch from the pull request (flink610-master) and work
on that until it has been merged into master.
I would recommend setting the Log4j logging level to INFO when writing Flink
jobs, because the Java API will tell you why it's not recognizing types as POJOs.
For example:
{code}
0 [main] WARN org.apache.flink.api.java.typeutils.TypeExtractor - Class
class de.tu_berlin.impro3.flink.model.places.Attributes does not contain a
setter for field twitter
1 [main] WARN org.apache.flink.api.java.typeutils.TypeExtractor - Class
class de.tu_berlin.impro3.flink.model.places.Attributes is not a valid POJO type
{code}
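As a rough illustration of what the first warning is asking for, a class shaped like the sketch below would pass the setter check. This is not taken from your attached model: the String type of the twitter field is an assumption, and your real Attributes class will have more fields. The points that matter are that the class is public, has a public no-argument constructor, and that every non-public field has a matching getter and setter.

```java
// Hypothetical stand-in for the Attributes class named in the warning above.
// The field type (String) is assumed for illustration only.
public class Attributes {

    // Private field, so it needs both a getter and a setter.
    private String twitter;

    // Public no-argument constructor so instances can be created reflectively.
    public Attributes() {
    }

    public String getTwitter() {
        return twitter;
    }

    // This is the setter the first warning reports as missing.
    public void setTwitter(String twitter) {
        this.twitter = twitter;
    }
}
```

Once the first warning no longer appears for a class, the "is not a valid POJO type" warning for it should go away as well, provided no other field triggers a similar message.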
The following code runs with your data model:
{code}
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Long> seq = env.generateSequence(0, 10);
DataSet<Tweet> tw = seq.flatMap(new FlatMapFunction<Long, Tweet>() {
    @Override
    public void flatMap(Long aLong, Collector<Tweet> coll) throws Exception {
        for (int i = 0; i < 5; i++) {
            Tweet t = new Tweet();
            t.setText("uhlalhla .. thats the text");
            t.setId(aLong);
            coll.collect(t);
        }
    }
});
DataSet<Integer> cnts = tw.groupBy("id").reduceGroup(new GroupReduceFunction<Tweet, Integer>() {
    @Override
    public void reduce(Iterable<Tweet> iterable, Collector<Integer> collector) throws Exception {
        int cnt = 0;
        for (Tweet t : iterable) {
            cnt++;
        }
        collector.collect(cnt);
    }
});
cnts.print();
{code}
Let me know if you stumble across any other issues with Flink.
> Add support for null values to the java api
> -------------------------------------------
>
> Key: FLINK-629
> URL: https://issues.apache.org/jira/browse/FLINK-629
> Project: Flink
> Issue Type: Improvement
> Components: Java API
> Reporter: Stephan Ewen
> Assignee: Gyula Fora
> Priority: Critical
> Labels: github-import
> Fix For: pre-apache
>
> Attachments: model.tar.gz
>
>
> Currently, many runtime operations fail when encountering a null value. Tuple
> serialization should allow null fields.
> I suggest adding a method to the tuples called `getFieldNotNull()` which
> throws a meaningful exception when the accessed field is null. That way, we
> simplify the logic of operators that should not deal with null fields, like
> key grouping or aggregations.
> Even though SQL allows grouping and aggregating of null values, I suggest
> excluding this from the Java API, because the SQL semantics of aggregating null
> fields are messy.
> ---------------- Imported from GitHub ----------------
> Url: https://github.com/stratosphere/stratosphere/issues/629
> Created by: [StephanEwen|https://github.com/StephanEwen]
> Labels: enhancement, java api,
> Milestone: Release 0.5.1
> Created at: Wed Mar 26 00:27:49 CET 2014
> State: open