[ https://issues.apache.org/jira/browse/FLINK-2457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14653613#comment-14653613 ]

ASF GitHub Bot commented on FLINK-2457:
---------------------------------------

Github user mjsax commented on the pull request:

    https://github.com/apache/flink/pull/983#issuecomment-127605738
  
    I used a different degree of parallelism (dop) for the source and the consumer, and double-checked that multiple TaskManagers (TMs) are involved, to make sure the data is really going over the network.
    
    Batch example:
    ```
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.tuple.Tuple0;

    public class TestEmptyTupleBatch {

        public static void main(String[] args) throws Exception {
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // 20 empty tuples, sent from the non-parallel source to 4 parallel mappers.
            DataSource<Tuple0> input = env.fromElements(
                    new Tuple0(), new Tuple0(), new Tuple0(), new Tuple0(), new Tuple0(),
                    new Tuple0(), new Tuple0(), new Tuple0(), new Tuple0(), new Tuple0(),
                    new Tuple0(), new Tuple0(), new Tuple0(), new Tuple0(), new Tuple0(),
                    new Tuple0(), new Tuple0(), new Tuple0(), new Tuple0(), new Tuple0());

            input.map(new Counter()).setParallelism(4).print();
        }

        public static class Counter implements MapFunction<Tuple0, Integer> {
            private static final long serialVersionUID = 5518823010274195186L;

            // Per-instance count of the tuples received by this parallel mapper.
            int counter = 0;

            @Override
            public Integer map(Tuple0 value) throws Exception {
                // Tuple0 has no fields to read, so log the tuple itself.
                System.out.println("mjsax: " + (++counter) + " " + value);
                return Integer.valueOf(value.getArity());
            }
        }
    }
    ```
    
    Streaming example:
    ```
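    import org.apache.flink.api.java.tuple.Tuple0;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
    import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

    public class TestEmptyTuple {

        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // 4 parallel sources feed a single verifying operator.
            DataStream<Tuple0> input = env.addSource(new Empty()).setParallelism(4);
            // shuffle() redistributes the records on their way to the verifier.
            input.shuffle().transform("verify", null, new Counter()).setParallelism(1);

            env.execute();
        }

        public static class Empty implements ParallelSourceFunction<Tuple0> {
            private static final long serialVersionUID = 1350902748274781033L;
            private volatile boolean isRunning = true;

            @Override
            public void run(SourceContext<Tuple0> ctx) throws Exception {
                // Emit one empty tuple per second until cancelled.
                while (this.isRunning) {
                    Thread.sleep(1000);
                    ctx.collect(new Tuple0());
                }
            }

            @Override
            public void cancel() {
                this.isRunning = false;
            }
        }

        public static class Counter implements OneInputStreamOperator<Tuple0, Object> {
            private static final long serialVersionUID = 5518823010274195186L;

            private int counter = 0;

            @Override
            public void processElement(StreamRecord<Tuple0> element) throws Exception {
                System.out.println("Tuple0: " + (++counter));
            }

            // (remaining OneInputStreamOperator methods omitted in the original comment)
        }

    }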
    ```
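
Both examples verify delivery by counting records, since a Tuple0 carries no fields to compare. A minimal, Flink-independent sketch of that idea follows. The class `EmptyRecordRoundTrip`, its methods, and the length-prefixed framing are assumptions made for illustration only; they do not describe Flink's actual wire format.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in, not Flink code: shows that zero-field records
// remain countable after a byte-level round trip when each record is
// preceded by its payload length.
final class EmptyRecordRoundTrip {

    /** Writes `count` records with an empty payload, each as a 4-byte length of 0. */
    static byte[] writeEmptyRecords(int count) {
        ByteBuffer buffer = ByteBuffer.allocate(count * Integer.BYTES);
        for (int i = 0; i < count; i++) {
            buffer.putInt(0); // payload length; an empty record has no payload bytes
        }
        return buffer.array();
    }

    /** Counts the records in the buffer by walking the length prefixes. */
    static int countRecords(byte[] data) {
        ByteBuffer buffer = ByteBuffer.wrap(data);
        int records = 0;
        while (buffer.hasRemaining()) {
            int length = buffer.getInt();
            buffer.position(buffer.position() + length); // skip the (empty) payload
            records++;
        }
        return records;
    }

    public static void main(String[] args) {
        byte[] wire = writeEmptyRecords(20);
        // prints "80 bytes, 20 records"
        System.out.println(wire.length + " bytes, " + countRecords(wire) + " records");
    }
}
```

Without some per-record framing, 20 empty payloads would be indistinguishable from none at all, which is why the counters in the examples are the meaningful check.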
    



> Integrate Tuple0
> ----------------
>
>                 Key: FLINK-2457
>                 URL: https://issues.apache.org/jira/browse/FLINK-2457
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Matthias J. Sax
>            Assignee: Matthias J. Sax
>            Priority: Minor
>
> Tuple0 is not cleanly integrated:
>   - missing serialization/deserialization support in the runtime
>   - Tuple.getTupleClass(int arity) cannot handle arity zero, i.e., it cannot create an instance of Tuple0
> Tuple0 is currently only used in the Python API, but will be integrated into Storm compatibility, too.
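
The arity-zero lookup problem described in the issue can be illustrated with a small stand-alone sketch. `MiniTuple`, `MiniTuple0`, `MiniTuple1`, and `TupleClassLookupSketch` are hypothetical names, not Flink classes; the point is only that a lookup table indexed from zero treats the empty tuple like any other arity.

```java
// Hypothetical stand-ins; these are not Flink's Tuple classes.
final class TupleClassLookupSketch {

    interface MiniTuple {
        int getArity();
    }

    static final class MiniTuple0 implements MiniTuple {
        @Override
        public int getArity() {
            return 0;
        }
    }

    static final class MiniTuple1 implements MiniTuple {
        @Override
        public int getArity() {
            return 1;
        }
    }

    // Index i holds the class for arity i, so arity 0 is an ordinary entry.
    private static final Class<?>[] CLASSES = {MiniTuple0.class, MiniTuple1.class};

    /** Returns the tuple class for the given arity, including arity zero. */
    static Class<? extends MiniTuple> getTupleClass(int arity) {
        if (arity < 0 || arity >= CLASSES.length) {
            throw new IllegalArgumentException("Unsupported arity: " + arity);
        }
        return CLASSES[arity].asSubclass(MiniTuple.class);
    }

    /** Creates an instance of the tuple class for the given arity via reflection. */
    static MiniTuple newInstance(int arity) {
        try {
            return getTupleClass(arity).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate tuple of arity " + arity, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(getTupleClass(0).getSimpleName()); // prints "MiniTuple0"
        System.out.println(newInstance(0).getArity());        // prints "0"
    }
}
```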



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
