[
https://issues.apache.org/jira/browse/FLINK-2457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14653613#comment-14653613
]
ASF GitHub Bot commented on FLINK-2457:
---------------------------------------
Github user mjsax commented on the pull request:
https://github.com/apache/flink/pull/983#issuecomment-127605738
I used a different DOP (degree of parallelism) for source and consumer, and
double-checked that multiple TMs are involved, to make sure the data is really
going over the network.
Batch example:
```
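import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple0;

public class TestEmptyTupleBatch {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 20 empty tuples; fromElements() creates a non-parallel source
        DataSource<Tuple0> input = env.fromElements(
                new Tuple0(), new Tuple0(), new Tuple0(), new Tuple0(), new Tuple0(),
                new Tuple0(), new Tuple0(), new Tuple0(), new Tuple0(), new Tuple0(),
                new Tuple0(), new Tuple0(), new Tuple0(), new Tuple0(), new Tuple0(),
                new Tuple0(), new Tuple0(), new Tuple0(), new Tuple0(), new Tuple0());

        // the mapper runs with a different parallelism than the source
        input.map(new Counter()).setParallelism(4).print();
    }

    public static class Counter implements MapFunction<Tuple0, Integer> {
        private static final long serialVersionUID = 5518823010274195186L;

        // number of records seen by this parallel instance
        int counter = 0;

        @Override
        public Integer map(Tuple0 value) throws Exception {
            // Tuple0 has no fields, so print the tuple itself instead of getField(0)
            System.out.println("mjsax: " + (++counter) + " " + value);
            return Integer.valueOf(value.getArity());
        }
    }
}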
```
Streaming example:
```
public class TestEmptyTuple {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // four parallel source instances feed a single consumer through a shuffle
        DataStream<Tuple0> input = env.addSource(new Empty()).setParallelism(4);
        input.shuffle().transform("verify", null, new Counter()).setParallelism(1);

        env.execute();
    }

    public static class Empty implements ParallelSourceFunction<Tuple0> {
        private static final long serialVersionUID = 1350902748274781033L;

        private volatile boolean isRunning = true;

        @Override
        public void run(
                org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext<Tuple0> ctx)
                throws Exception {
            // emit one empty tuple per second until cancelled
            while (this.isRunning) {
                Thread.sleep(1000);
                ctx.collect(new Tuple0());
            }
        }

        @Override
        public void cancel() {
            this.isRunning = false;
        }
    }

    public static class Counter implements OneInputStreamOperator<Tuple0, Object> {
        private static final long serialVersionUID = 5518823010274195186L;

        // number of records received by the consumer
        private int counter = 0;

        @Override
        public void processElement(StreamRecord<Tuple0> element) throws Exception {
            System.out.println("Tuple0: " + (++counter));
        }

        // remaining OneInputStreamOperator methods omitted
    }
}
```
> Integrate Tuple0
> ----------------
>
> Key: FLINK-2457
> URL: https://issues.apache.org/jira/browse/FLINK-2457
> Project: Flink
> Issue Type: Improvement
> Reporter: Matthias J. Sax
> Assignee: Matthias J. Sax
> Priority: Minor
>
> Tuple0 is not cleanly integrated:
>   - serialization/deserialization support is missing in the runtime
>   - Tuple.getTupleClass(int arity) cannot handle arity zero, i.e., it cannot
> create an instance of Tuple0
> Tuple0 is currently only used in the Python API, but will be integrated into
> the Storm compatibility layer, too.
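
The second point in the description above (an arity-based class lookup that cannot handle zero) can be illustrated with a small, self-contained sketch. It does not use Flink and is not Flink's actual implementation: the class `TupleClassLookup`, its nested stand-in tuple classes, and both lookup methods are hypothetical names chosen for illustration. The sketch assumes the lookup is backed by an array whose first slot belongs to arity one, which is one plausible way such a method ends up rejecting arity zero.

```java
public class TupleClassLookup {

    // Hypothetical stand-ins for tuple classes; these are NOT the Flink classes.
    static class Tuple0 {}
    static class Tuple1 {}
    static class Tuple2 {}

    // Assumed layout: slot i holds the class for arity i + 1, so arity 0 has no slot.
    private static final Class<?>[] ONE_BASED = {Tuple1.class, Tuple2.class};

    // Alternative layout: slot i holds the class for arity i, so arity 0 is covered.
    private static final Class<?>[] ZERO_BASED = {Tuple0.class, Tuple1.class, Tuple2.class};

    /** Lookup that rejects arity zero because its table starts at arity one. */
    static Class<?> lookupOneBased(int arity) {
        if (arity < 1 || arity > ONE_BASED.length) {
            throw new IllegalArgumentException("Unsupported arity: " + arity);
        }
        return ONE_BASED[arity - 1];
    }

    /** Lookup that accepts arity zero because its table starts at arity zero. */
    static Class<?> lookupZeroBased(int arity) {
        if (arity < 0 || arity >= ZERO_BASED.length) {
            throw new IllegalArgumentException("Unsupported arity: " + arity);
        }
        return ZERO_BASED[arity];
    }

    public static void main(String[] args) {
        // The zero-based table resolves arity 0 to the empty tuple class.
        System.out.println(lookupZeroBased(0).getSimpleName());

        // The one-based table has no entry for arity 0 and rejects the request.
        try {
            lookupOneBased(0);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Under these assumptions, supporting the empty tuple only requires shifting the table by one slot and relaxing the lower bound of the range check.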