[
https://issues.apache.org/jira/browse/FLINK-1560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14375938#comment-14375938
]
ASF GitHub Bot commented on FLINK-1560:
---------------------------------------
Github user mbalassi commented on a diff in the pull request:
https://github.com/apache/flink/pull/519#discussion_r26939215
--- Diff:
flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
---
@@ -103,57 +115,124 @@ public static void main(String[] args) throws
Exception {
//
*************************************************************************
/**
- * Iteration step function which takes an input (Double , Integer) and
- * produces an output (Double + random, Integer + 1).
+ * Generate random integer pairs from the range from 0 to BOUND/2
+ */
+ private static class RandomFibonacciSource implements
SourceFunction<Tuple2<Integer, Integer>> {
+ private static final long serialVersionUID = 1L;
+
+ private Random rnd = new Random();
+
+ @Override
+ public void run(Collector<Tuple2<Integer, Integer>> collector)
throws Exception {
+ while(true) {
+ int first = rnd.nextInt(BOUND/2 - 1) + 1;
+ int second = rnd.nextInt(BOUND/2 - 1) + 1;
+
+ collector.collect(new Tuple2<Integer,
Integer>(first, second));
+ Thread.sleep(100L);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ // no cleanup needed
+ }
+ }
+
+ /**
+ * Generate random integer pairs from the range from 0 to BOUND/2
*/
- public static class Step extends
- RichMapFunction<Tuple2<Double, Integer>, Tuple2<Double,
Integer>> {
+ private static class FibonacciInputMap implements MapFunction<String,
Tuple2<Integer, Integer>> {
private static final long serialVersionUID = 1L;
- private transient Random rnd;
- public void open(Configuration parameters) {
- rnd = new Random();
+ @Override
+ public Tuple2<Integer, Integer> map(String value) throws
Exception {
+ Thread.sleep(100L);
+ String record = value.substring(1, value.length()-1);
+ String[] splitted = record.split(",");
+ return new Tuple2<Integer,
Integer>(Integer.parseInt(splitted[0]), Integer.parseInt(splitted[1]));
}
+ }
+
+ /**
+ * Map the inputs so that the next Fibonacci numbers can be calculated
while preserving the original input tuple
+ * A counter is attached to the tuple and incremented in every
iteration step
+ */
+ public static class InputMap implements MapFunction<Tuple2<Integer,
Integer>, Tuple5<Integer, Integer, Integer, Integer, Integer>> {
+
+ @Override
+ public Tuple5<Integer, Integer, Integer, Integer, Integer>
map(Tuple2<Integer, Integer> value) throws
+ Exception {
+ return new Tuple5<Integer, Integer, Integer, Integer,
Integer>(value.f0, value.f1, value.f0, value.f1, 0);
+ }
+ }
+
+ /**
+ * Iteration step function that calculates the next Fibonacci number
+ */
+ public static class Step implements
+ MapFunction<Tuple5<Integer, Integer, Integer, Integer,
Integer>, Tuple5<Integer, Integer, Integer, Integer, Integer>> {
+ private static final long serialVersionUID = 1L;
@Override
- public Tuple2<Double, Integer> map(Tuple2<Double, Integer>
value) throws Exception {
- return new Tuple2<Double, Integer>(value.f0 +
rnd.nextDouble(), value.f1 + 1);
+ public Tuple5<Integer, Integer, Integer, Integer, Integer>
map(Tuple5<Integer, Integer, Integer, Integer, Integer> value) throws Exception
{
+ return new Tuple5<Integer, Integer, Integer, Integer,
Integer>(value.f0, value.f1, value.f3, value.f2 + value.f3, ++value.f4);
}
}
/**
* OutputSelector testing which tuple needs to be iterated again.
*/
- public static class MySelector implements OutputSelector<Tuple2<Double,
Integer>> {
+ public static class MySelector implements
OutputSelector<Tuple5<Integer, Integer, Integer, Integer, Integer>> {
private static final long serialVersionUID = 1L;
@Override
- public Iterable<String> select(Tuple2<Double, Integer> value) {
+ public Iterable<String> select(Tuple5<Integer, Integer,
Integer, Integer, Integer> value) {
List<String> output = new ArrayList<String>();
- if (value.f0 > 100) {
- output.add("output");
- } else {
+ if (value.f2 < BOUND && value.f3 < BOUND) {
output.add("iterate");
+ } else {
+ output.add("output");
}
+ output.add("output");
return output;
}
+ }
+
+ /**
+ * Giving back the input pair and the counter
+ */
+ public static class OutputMap implements MapFunction<Tuple5<Integer,
Integer, Integer, Integer, Integer>, Tuple2<Tuple2<Integer, Integer>, Integer>>
{
+ @Override
+ public Tuple2<Tuple2<Integer, Integer>, Integer>
map(Tuple5<Integer, Integer, Integer, Integer, Integer> value) throws
+ Exception {
+ return new Tuple2<Tuple2<Integer, Integer>,
Integer>(new Tuple2<Integer, Integer>(value.f0, value.f1), value.f4);
+ }
}
//
*************************************************************************
// UTIL METHODS
//
*************************************************************************
+ private static boolean fileInput = true;
private static boolean fileOutput = false;
+ private static String inputPath = "/home/szape/result.txt";
--- End diff --
:)
> Add ITCases for streaming examples
> ----------------------------------
>
> Key: FLINK-1560
> URL: https://issues.apache.org/jira/browse/FLINK-1560
> Project: Flink
> Issue Type: Test
> Components: Streaming
> Affects Versions: 0.9
> Reporter: Márton Balassi
> Assignee: Péter Szabó
>
> Currently there are no tests for consistency of the streaming example
> programs. This might be a real show stopper for users who encounter an issue
> there.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)