Repository: flink
Updated Branches:
  refs/heads/master 7c2bbb6a3 -> 9d8a34881


[streaming] Minor streaming code cleanups

Closes #873


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9d8a3488
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9d8a3488
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9d8a3488

Branch: refs/heads/master
Commit: 9d8a348815b4b49940b48aad09dacecbd7a9564e
Parents: 6c21862
Author: mbalassi <[email protected]>
Authored: Mon Jun 29 11:21:23 2015 +0200
Committer: mbalassi <[email protected]>
Committed: Fri Jul 3 08:42:54 2015 +0200

----------------------------------------------------------------------
 .../api/operators/windowing/GroupedActiveDiscretizer.java     | 7 -------
 .../api/operators/windowing/GroupedStreamDiscretizer.java     | 5 +----
 .../test/java/org/apache/flink/streaming/api/IterateTest.java | 1 -
 3 files changed, 1 insertion(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9d8a3488/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
index 190cb48..0cdafd9 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
@@ -52,8 +52,6 @@ public class GroupedActiveDiscretizer<IN> extends 
GroupedStreamDiscretizer<IN> {
 
        @Override
        public void processElement(IN element) throws Exception {
-
-//                     last = copy(element);
                        last = element;
                        Object key = keySelector.getKey(element);
 
@@ -67,10 +65,6 @@ public class GroupedActiveDiscretizer<IN> extends 
GroupedStreamDiscretizer<IN> {
 
                                groupDiscretizer.processRealElement(element);
                        }
-
-
-
-
        }
 
        @Override
@@ -90,7 +84,6 @@ public class GroupedActiveDiscretizer<IN> extends 
GroupedStreamDiscretizer<IN> {
                        centralThread.interrupt();
                        centralThread.join();
                } catch (InterruptedException e) {
-                       e.printStackTrace();
                        LOG.info("GroupedActiveDiscretizer got interruped while 
joining with central thread: {}", e);
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/9d8a3488/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java
index e80b6ab..64e8b04 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java
@@ -32,7 +32,7 @@ import 
org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
  * transformation. The user supplied eviction and trigger policies are applied
  * on a per group basis to create the {@link StreamWindow} that will be further
  * transformed in the next stages. </p> To allow pre-aggregations supply an
- * appropriate {@link WindowBuffer}
+ * appropriate {@link WindowBuffer}.
  */
 public class GroupedStreamDiscretizer<IN> extends StreamDiscretizer<IN> {
 
@@ -69,7 +69,6 @@ public class GroupedStreamDiscretizer<IN> extends 
StreamDiscretizer<IN> {
        @Override
        public void processElement(IN element) throws Exception {
 
-
                        Object key = keySelector.getKey(element);
 
                        StreamDiscretizer<IN> groupDiscretizer = 
groupedDiscretizers.get(key);
@@ -97,12 +96,10 @@ public class GroupedStreamDiscretizer<IN> extends 
StreamDiscretizer<IN> {
                StreamDiscretizer<IN> groupDiscretizer = new 
StreamDiscretizer<IN>(triggerPolicy.clone(),
                                evictionPolicy.clone());
 
-//             groupDiscretizer.output = taskContext.getOutputCollector();
                // TODO: this seems very hacky, maybe we can get around this
                groupDiscretizer.setup(this.output, this.runtimeContext);
                groupDiscretizer.open(this.parameters);
 
-
                return groupDiscretizer;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9d8a3488/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index c660dbc..3021abb 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -167,7 +167,6 @@ public class IterateTest {
                it.closeWith(head.union(head.map(new NoOpMap()).shuffle()), 
true);
                it2.closeWith(head2, false);
 
-               System.out.println(env.getExecutionPlan());
                StreamGraph graph = env.getStreamGraph();
 
                for (StreamLoop loop : graph.getStreamLoops()) {

Reply via email to