Repository: flink
Updated Branches:
  refs/heads/master 1f49926be -> c56e3f10b


[FLINK-1451] [streaming] Parallel file source fix + minor windowing fix


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c56e3f10
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c56e3f10
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c56e3f10

Branch: refs/heads/master
Commit: c56e3f10b27e1e5be38b8a731f330891b190a268
Parents: 1f49926
Author: Gyula Fora <[email protected]>
Authored: Fri Feb 20 13:57:17 2015 +0100
Committer: Gyula Fora <[email protected]>
Committed: Fri Feb 20 14:22:52 2015 +0100

----------------------------------------------------------------------
 .../streaming/api/datastream/DiscretizedStream.java |  2 +-
 .../api/function/source/FileSourceFunction.java     |  2 +-
 .../windowing/GroupedWindowBufferInvokable.java     | 16 +++++++++-------
 3 files changed, 11 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c56e3f10/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
index 0c84d0a..7df91f0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
@@ -93,7 +93,7 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
        public <R> DiscretizedStream<R> mapWindow(WindowMapFunction<OUT, R> windowMapFunction,
                        TypeInformation<R> returnType) {
                DiscretizedStream<R> out = partition(transformation).transform(
-                               WindowTransformation.REDUCEWINDOW, "Window Reduce", returnType,
+                               WindowTransformation.REDUCEWINDOW, "Window Map", returnType,
                                new WindowMapper<OUT, R>(discretizedStream.clean(windowMapFunction)));
 
                if (isGrouped()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c56e3f10/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
index 20f5f56..dcf67a9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java
@@ -29,7 +29,7 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
 import org.apache.flink.util.Collector;
 
-public class FileSourceFunction extends RichSourceFunction<String> {
+public class FileSourceFunction extends RichParallelSourceFunction<String> {
        private static final long serialVersionUID = 1L;
 
        private InputSplitProvider provider;

http://git-wip-us.apache.org/repos/asf/flink/blob/c56e3f10/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedWindowBufferInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedWindowBufferInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedWindowBufferInvokable.java
index 2872f48..53c87c3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedWindowBufferInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedWindowBufferInvokable.java
@@ -49,15 +49,17 @@ public class GroupedWindowBufferInvokable<T> extends WindowBufferInvokable<T> {
 
        @Override
        protected void callUserFunction() throws Exception {
-               Object key = keySelector.getKey(nextObject.getElement());
-               WindowBuffer<T> currentWindow = windowMap.get(key);
+               if (nextObject.getElement() != null) {
+                       Object key = keySelector.getKey(nextObject.getElement());
+                       WindowBuffer<T> currentWindow = windowMap.get(key);
 
-               if (currentWindow == null) {
-                       currentWindow = buffer.clone();
-                       windowMap.put(key, currentWindow);
-               }
+                       if (currentWindow == null) {
+                               currentWindow = buffer.clone();
+                               windowMap.put(key, currentWindow);
+                       }
 
-               handleWindowEvent(nextObject, currentWindow);
+                       handleWindowEvent(nextObject, currentWindow);
+               }
        }
 
        @Override

Reply via email to