[ 
https://issues.apache.org/jira/browse/FLINK-1560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14375938#comment-14375938
 ] 

ASF GitHub Bot commented on FLINK-1560:
---------------------------------------

Github user mbalassi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/519#discussion_r26939215
  
    --- Diff: 
flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
 ---
    @@ -103,57 +115,124 @@ public static void main(String[] args) throws 
Exception {
        // 
*************************************************************************
     
        /**
    -    * Iteration step function which takes an input (Double , Integer) and
    -    * produces an output (Double + random, Integer + 1).
    +    * Generate random integer pairs from the range from 0 to BOUND/2
    +    */
    +   private static class RandomFibonacciSource implements 
SourceFunction<Tuple2<Integer, Integer>> {
    +           private static final long serialVersionUID = 1L;
    +
    +           private Random rnd = new Random();
    +
    +           @Override
    +           public void run(Collector<Tuple2<Integer, Integer>> collector) 
throws Exception {
    +                   while(true) {
    +                           int first = rnd.nextInt(BOUND/2 - 1) + 1;
    +                           int second = rnd.nextInt(BOUND/2 - 1) + 1;
    +
    +                           collector.collect(new Tuple2<Integer, 
Integer>(first, second));
    +                           Thread.sleep(100L);
    +                   }
    +           }
    +
    +           @Override
    +           public void cancel() {
    +                   // no cleanup needed
    +           }
    +   }
    +
    +   /**
    +    * Generate random integer pairs from the range from 0 to BOUND/2
         */
    -   public static class Step extends
    -                   RichMapFunction<Tuple2<Double, Integer>, Tuple2<Double, 
Integer>> {
    +   private static class FibonacciInputMap implements MapFunction<String, 
Tuple2<Integer, Integer>> {
                private static final long serialVersionUID = 1L;
    -           private transient Random rnd;
     
    -           public void open(Configuration parameters) {
    -                   rnd = new Random();
    +           @Override
    +           public Tuple2<Integer, Integer> map(String value) throws 
Exception {
    +                   Thread.sleep(100L);
    +                   String record = value.substring(1, value.length()-1);
    +                   String[] splitted = record.split(",");
    +                   return new Tuple2<Integer, 
Integer>(Integer.parseInt(splitted[0]), Integer.parseInt(splitted[1]));
                }
    +   }
    +
    +   /**
    +    * Map the inputs so that the next Fibonacci numbers can be calculated 
while preserving the original input tuple
    +    * A counter is attached to the tuple and incremented in every 
iteration step
    +    */
    +   public static class InputMap implements MapFunction<Tuple2<Integer, 
Integer>, Tuple5<Integer, Integer, Integer, Integer, Integer>> {
    +
    +           @Override
    +           public Tuple5<Integer, Integer, Integer, Integer, Integer> 
map(Tuple2<Integer, Integer> value) throws
    +                           Exception {
    +                   return new Tuple5<Integer, Integer, Integer, Integer, 
Integer>(value.f0, value.f1, value.f0, value.f1, 0);
    +           }
    +   }
    +
    +   /**
    +    * Iteration step function that calculates the next Fibonacci number
    +    */
    +   public static class Step implements
    +                   MapFunction<Tuple5<Integer, Integer, Integer, Integer, 
Integer>, Tuple5<Integer, Integer, Integer, Integer, Integer>> {
    +           private static final long serialVersionUID = 1L;
     
                @Override
    -           public Tuple2<Double, Integer> map(Tuple2<Double, Integer> 
value) throws Exception {
    -                   return new Tuple2<Double, Integer>(value.f0 + 
rnd.nextDouble(), value.f1 + 1);
    +           public Tuple5<Integer, Integer, Integer, Integer, Integer> 
map(Tuple5<Integer, Integer, Integer, Integer, Integer> value) throws Exception 
{
    +                   return new Tuple5<Integer, Integer, Integer, Integer, 
Integer>(value.f0, value.f1, value.f3, value.f2 + value.f3, ++value.f4);
                }
        }
     
        /**
         * OutputSelector testing which tuple needs to be iterated again.
         */
    -   public static class MySelector implements OutputSelector<Tuple2<Double, 
Integer>> {
    +   public static class MySelector implements 
OutputSelector<Tuple5<Integer, Integer, Integer, Integer, Integer>> {
                private static final long serialVersionUID = 1L;
     
                @Override
    -           public Iterable<String> select(Tuple2<Double, Integer> value) {
    +           public Iterable<String> select(Tuple5<Integer, Integer, 
Integer, Integer, Integer> value) {
                        List<String> output = new ArrayList<String>();
    -                   if (value.f0 > 100) {
    -                           output.add("output");
    -                   } else {
    +                   if (value.f2 < BOUND && value.f3 < BOUND) {
                                output.add("iterate");
    +                   } else {
    +                           output.add("output");
                        }
    +                   output.add("output");
                        return output;
                }
    +   }
    +
    +   /**
    +    * Giving back the input pair and the counter
    +    */
    +   public static class OutputMap implements MapFunction<Tuple5<Integer, 
Integer, Integer, Integer, Integer>, Tuple2<Tuple2<Integer, Integer>, Integer>> 
{
     
    +           @Override
    +           public Tuple2<Tuple2<Integer, Integer>, Integer> 
map(Tuple5<Integer, Integer, Integer, Integer, Integer> value) throws
    +                           Exception {
    +                   return new Tuple2<Tuple2<Integer, Integer>, 
Integer>(new Tuple2<Integer, Integer>(value.f0, value.f1), value.f4);
    +           }
        }
     
        // 
*************************************************************************
        // UTIL METHODS
        // 
*************************************************************************
     
    +   private static boolean fileInput = true;
        private static boolean fileOutput = false;
    +   private static String inputPath = "/home/szape/result.txt";
    --- End diff --
    
    :)


> Add ITCases for streaming examples
> ----------------------------------
>
>                 Key: FLINK-1560
>                 URL: https://issues.apache.org/jira/browse/FLINK-1560
>             Project: Flink
>          Issue Type: Test
>          Components: Streaming
>    Affects Versions: 0.9
>            Reporter: Márton Balassi
>            Assignee: Péter Szabó
>
> Currently there are no tests for consistency of the streaming example 
> programs. This might be a real show stopper for users who encounter an issue 
> there.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to