Repository: flink Updated Branches: refs/heads/master 1f49926be -> c56e3f10b
[FLINK-1451] [streaming] Parallel file source fix + minor windowing fix Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c56e3f10 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c56e3f10 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c56e3f10 Branch: refs/heads/master Commit: c56e3f10b27e1e5be38b8a731f330891b190a268 Parents: 1f49926 Author: Gyula Fora <[email protected]> Authored: Fri Feb 20 13:57:17 2015 +0100 Committer: Gyula Fora <[email protected]> Committed: Fri Feb 20 14:22:52 2015 +0100 ---------------------------------------------------------------------- .../streaming/api/datastream/DiscretizedStream.java | 2 +- .../api/function/source/FileSourceFunction.java | 2 +- .../windowing/GroupedWindowBufferInvokable.java | 16 +++++++++------- 3 files changed, 11 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/c56e3f10/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java index 0c84d0a..7df91f0 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java @@ -93,7 +93,7 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> { public <R> DiscretizedStream<R> mapWindow(WindowMapFunction<OUT, R> windowMapFunction, TypeInformation<R> returnType) { DiscretizedStream<R> out = partition(transformation).transform( - WindowTransformation.REDUCEWINDOW, "Window Reduce", returnType, + WindowTransformation.REDUCEWINDOW, "Window Map", returnType, new WindowMapper<OUT, R>(discretizedStream.clean(windowMapFunction))); if (isGrouped()) { http://git-wip-us.apache.org/repos/asf/flink/blob/c56e3f10/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java index 20f5f56..dcf67a9 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java @@ -29,7 +29,7 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext; import org.apache.flink.util.Collector; -public class FileSourceFunction extends RichSourceFunction<String> { +public class FileSourceFunction extends RichParallelSourceFunction<String> { private static final long serialVersionUID = 1L; private InputSplitProvider provider; http://git-wip-us.apache.org/repos/asf/flink/blob/c56e3f10/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedWindowBufferInvokable.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedWindowBufferInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedWindowBufferInvokable.java index 2872f48..53c87c3 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedWindowBufferInvokable.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedWindowBufferInvokable.java @@ -49,15 +49,17 @@ public class GroupedWindowBufferInvokable<T> extends WindowBufferInvokable<T> { @Override protected void callUserFunction() throws Exception { - Object key = keySelector.getKey(nextObject.getElement()); - WindowBuffer<T> currentWindow = windowMap.get(key); + if (nextObject.getElement() != null) { + Object key = keySelector.getKey(nextObject.getElement()); + WindowBuffer<T> currentWindow = windowMap.get(key); - if (currentWindow == null) { - currentWindow = buffer.clone(); - windowMap.put(key, currentWindow); - } + if (currentWindow == null) { + currentWindow = buffer.clone(); + windowMap.put(key, currentWindow); + } - handleWindowEvent(nextObject, currentWindow); + handleWindowEvent(nextObject, currentWindow); + } } @Override
