Repository: flink Updated Branches: refs/heads/master 7c2bbb6a3 -> 9d8a34881
[streaming] Minor streaming code cleanups Closes #873 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9d8a3488 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9d8a3488 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9d8a3488 Branch: refs/heads/master Commit: 9d8a348815b4b49940b48aad09dacecbd7a9564e Parents: 6c21862 Author: mbalassi <[email protected]> Authored: Mon Jun 29 11:21:23 2015 +0200 Committer: mbalassi <[email protected]> Committed: Fri Jul 3 08:42:54 2015 +0200 ---------------------------------------------------------------------- .../api/operators/windowing/GroupedActiveDiscretizer.java | 7 ------- .../api/operators/windowing/GroupedStreamDiscretizer.java | 5 +---- .../test/java/org/apache/flink/streaming/api/IterateTest.java | 1 - 3 files changed, 1 insertion(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9d8a3488/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java index 190cb48..0cdafd9 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java @@ -52,8 +52,6 @@ public class GroupedActiveDiscretizer<IN> extends GroupedStreamDiscretizer<IN> { @Override public void processElement(IN element) throws Exception { - -// last = copy(element); last = element; Object key = keySelector.getKey(element); @@ -67,10 +65,6 @@ public class GroupedActiveDiscretizer<IN> extends GroupedStreamDiscretizer<IN> { groupDiscretizer.processRealElement(element); } - - - - } @Override @@ -90,7 +84,6 @@ public class GroupedActiveDiscretizer<IN> extends GroupedStreamDiscretizer<IN> { centralThread.interrupt(); centralThread.join(); } catch (InterruptedException e) { - e.printStackTrace(); LOG.info("GroupedActiveDiscretizer got interruped while joining with central thread: {}", e); } } http://git-wip-us.apache.org/repos/asf/flink/blob/9d8a3488/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java index e80b6ab..64e8b04 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java @@ -32,7 +32,7 @@ import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer; * transformation. The user supplied eviction and trigger policies are applied * on a per group basis to create the {@link StreamWindow} that will be further * transformed in the next stages. </p> To allow pre-aggregations supply an - * appropriate {@link WindowBuffer} + * appropriate {@link WindowBuffer}. */ public class GroupedStreamDiscretizer<IN> extends StreamDiscretizer<IN> { @@ -69,7 +69,6 @@ public class GroupedStreamDiscretizer<IN> extends StreamDiscretizer<IN> { @Override public void processElement(IN element) throws Exception { - Object key = keySelector.getKey(element); StreamDiscretizer<IN> groupDiscretizer = groupedDiscretizers.get(key); @@ -97,12 +96,10 @@ public class GroupedStreamDiscretizer<IN> extends StreamDiscretizer<IN> { StreamDiscretizer<IN> groupDiscretizer = new StreamDiscretizer<IN>(triggerPolicy.clone(), evictionPolicy.clone()); -// groupDiscretizer.output = taskContext.getOutputCollector(); // TODO: this seems very hacky, maybe we can get around this groupDiscretizer.setup(this.output, this.runtimeContext); groupDiscretizer.open(this.parameters); - return groupDiscretizer; } http://git-wip-us.apache.org/repos/asf/flink/blob/9d8a3488/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java index c660dbc..3021abb 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java @@ -167,7 +167,6 @@ public class IterateTest { it.closeWith(head.union(head.map(new NoOpMap()).shuffle()), true); it2.closeWith(head2, false); - System.out.println(env.getExecutionPlan()); StreamGraph graph = env.getStreamGraph(); for (StreamLoop loop : graph.getStreamLoops()) {
