[FLINK-2550] Rework interplay of Window Assigners and TimeCharacteristic

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ff367d6e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ff367d6e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ff367d6e

Branch: refs/heads/master
Commit: ff367d6ea728a7c5bc334f34591a4d79e573972f
Parents: 28a38bb
Author: Aljoscha Krettek <[email protected]>
Authored: Tue Oct 6 18:19:56 2015 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Wed Oct 7 22:08:25 2015 +0200

----------------------------------------------------------------------
 .../api/datastream/AllWindowedStream.java       |  17 +--
 .../streaming/api/datastream/DataStream.java    |  20 +---
 .../streaming/api/datastream/KeyedStream.java   |  20 +---
 .../api/datastream/WindowedStream.java          |  29 ++---
 .../environment/StreamExecutionEnvironment.java |  12 +++
 .../api/windowing/assigners/GlobalWindows.java  |   3 +-
 .../assigners/SlidingProcessingTimeWindows.java | 106 -------------------
 .../windowing/assigners/SlidingTimeWindows.java |  11 +-
 .../TumblingProcessingTimeWindows.java          |  81 --------------
 .../assigners/TumblingTimeWindows.java          |  11 +-
 .../api/windowing/assigners/WindowAssigner.java |   3 +-
 .../operators/windowing/WindowOperator.java     |   5 +
 .../flink/streaming/api/CoGroupJoinITCase.java  |   6 +-
 .../windowing/AllWindowTranslationTest.java     |  60 +++++++----
 .../windowing/WindowTranslationTest.java        |  89 +++++++++++++---
 .../streaming/examples/join/WindowJoin.java     |   3 +-
 .../scala/examples/join/WindowJoin.scala        |   3 +-
 .../flink/streaming/api/scala/DataStream.scala  |  37 +------
 .../flink/streaming/api/scala/KeyedStream.scala |  37 +------
 .../api/scala/StreamExecutionEnvironment.scala  |   5 +
 .../api/scala/AllWindowTranslationTest.scala    |  24 +++--
 .../streaming/api/scala/CoGroupJoinITCase.scala |   9 +-
 .../api/scala/WindowTranslationTest.scala       |  22 ++--
 23 files changed, 233 insertions(+), 380 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 89c4857..a8d7654 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
 import 
org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
@@ -80,7 +81,7 @@ public class AllWindowedStream<T, W extends Window> {
                        WindowAssigner<? super T, W> windowAssigner) {
                this.input = input;
                this.windowAssigner = windowAssigner;
-               this.trigger = windowAssigner.getDefaultTrigger();
+               this.trigger = 
windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());
        }
 
        /**
@@ -139,12 +140,14 @@ public class AllWindowedStream<T, W extends Window> {
 
                OneInputStreamOperator<T, T> operator;
 
+               boolean setProcessingTime = 
input.getExecutionEnvironment().getStreamTimeCharacteristic() == 
TimeCharacteristic.ProcessingTime;
+
                if (evictor != null) {
                        operator = new 
EvictingNonKeyedWindowOperator<>(windowAssigner,
                                        new HeapWindowBuffer.Factory<T>(),
                                        new ReduceAllWindowFunction<W, 
T>(function),
                                        trigger,
-                                       evictor);
+                                       
evictor).enableSetProcessingTime(setProcessingTime);
 
                } else {
                        // we need to copy because we need our own instance of 
the pre aggregator
@@ -154,7 +157,7 @@ public class AllWindowedStream<T, W extends Window> {
                        operator = new NonKeyedWindowOperator<>(windowAssigner,
                                        new 
PreAggregatingHeapWindowBuffer.Factory<>(functionCopy),
                                        new ReduceAllWindowFunction<W, 
T>(function),
-                                       trigger);
+                                       
trigger).enableSetProcessingTime(setProcessingTime);
                }
 
                return input.transform(opName, input.getType(), 
operator).setParallelism(1);
@@ -205,20 +208,22 @@ public class AllWindowedStream<T, W extends Window> {
 
                String opName = "TriggerWindow(" + windowAssigner + ", " + 
trigger + ", " + udfName + ")";
 
-               OneInputStreamOperator<T, R> operator;
+               NonKeyedWindowOperator<T, R, W> operator;
+
+               boolean setProcessingTime = 
input.getExecutionEnvironment().getStreamTimeCharacteristic() == 
TimeCharacteristic.ProcessingTime;
 
                if (evictor != null) {
                        operator = new 
EvictingNonKeyedWindowOperator<>(windowAssigner,
                                        new HeapWindowBuffer.Factory<T>(),
                                        function,
                                        trigger,
-                                       evictor);
+                                       
evictor).enableSetProcessingTime(setProcessingTime);
 
                } else {
                        operator = new NonKeyedWindowOperator<>(windowAssigner,
                                        new HeapWindowBuffer.Factory<T>(),
                                        function,
-                                       trigger);
+                                       
trigger).enableSetProcessingTime(setProcessingTime);
                }
 
                return input.transform(opName, resultType, 
operator).setParallelism(1);

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 0be1d56..ee8b3d2 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -59,9 +59,7 @@ import 
org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.api.transformations.StreamTransformation;
 import org.apache.flink.streaming.api.transformations.UnionTransformation;
-import 
org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
-import 
org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.helper.Count;
@@ -72,7 +70,6 @@ import 
org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
 import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
 import org.apache.flink.streaming.api.windowing.time.AbstractTime;
-import org.apache.flink.streaming.api.windowing.time.EventTime;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.ExtractTimestampsOperator;
@@ -726,13 +723,7 @@ public class DataStream<T> {
         * @param size The size of the window.
         */
        public AllWindowedStream<T, TimeWindow> timeWindowAll(AbstractTime 
size) {
-               AbstractTime actualSize = 
size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
-
-               if (actualSize instanceof EventTime) {
-                       return windowAll(TumblingTimeWindows.of(actualSize));
-               } else {
-                       return 
windowAll(TumblingProcessingTimeWindows.of(actualSize));
-               }
+               return windowAll(TumblingTimeWindows.of(size));
        }
 
        /**
@@ -747,14 +738,7 @@ public class DataStream<T> {
         * @param size The size of the window.
         */
        public AllWindowedStream<T, TimeWindow> timeWindowAll(AbstractTime 
size, AbstractTime slide) {
-               AbstractTime actualSize = 
size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
-               AbstractTime actualSlide = 
slide.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
-
-               if (actualSize instanceof EventTime) {
-                       return windowAll(SlidingTimeWindows.of(size, slide));
-               } else {
-                       return 
windowAll(SlidingProcessingTimeWindows.of(actualSize, actualSlide));
-               }
+               return windowAll(SlidingTimeWindows.of(size, slide));
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index edb7981..2e6d7d6 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -32,13 +32,10 @@ import 
org.apache.flink.streaming.api.operators.StreamGroupedFold;
 import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
-import 
org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
-import 
org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.time.AbstractTime;
-import org.apache.flink.streaming.api.windowing.time.EventTime;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.partitioner.HashPartitioner;
@@ -122,13 +119,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
         * @param size The size of the window.
         */
        public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size) 
{
-               AbstractTime actualSize = 
size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
-
-               if (actualSize instanceof EventTime) {
-                       return window(TumblingTimeWindows.of(actualSize));
-               } else {
-                       return 
window(TumblingProcessingTimeWindows.of(actualSize));
-               }
+               return window(TumblingTimeWindows.of(size));
        }
 
        /**
@@ -143,14 +134,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
         * @param size The size of the window.
         */
        public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size, 
AbstractTime slide) {
-               AbstractTime actualSize = 
size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
-               AbstractTime actualSlide = 
slide.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
-
-               if (actualSize instanceof EventTime) {
-                       return window(SlidingTimeWindows.of(actualSize, 
actualSlide));
-               } else {
-                       return 
window(SlidingProcessingTimeWindows.of(actualSize, actualSlide));
-               }
+               return window(SlidingTimeWindows.of(size, slide));
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 1273b42..99f7d06 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
 import 
org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
@@ -32,8 +33,8 @@ import 
org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import 
org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
-import 
org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.evictors.Evictor;
 import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
@@ -91,7 +92,7 @@ public class WindowedStream<T, K, W extends Window> {
                        WindowAssigner<? super T, W> windowAssigner) {
                this.input = input;
                this.windowAssigner = windowAssigner;
-               this.trigger = windowAssigner.getDefaultTrigger();
+               this.trigger = 
windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());
        }
 
        /**
@@ -151,13 +152,15 @@ public class WindowedStream<T, K, W extends Window> {
 
                OneInputStreamOperator<T, T> operator;
 
+               boolean setProcessingTime = 
input.getExecutionEnvironment().getStreamTimeCharacteristic() == 
TimeCharacteristic.ProcessingTime;
+
                if (evictor != null) {
                        operator = new EvictingWindowOperator<>(windowAssigner,
                                        keySel,
                                        new HeapWindowBuffer.Factory<T>(),
                                        new ReduceWindowFunction<K, W, 
T>(function),
                                        trigger,
-                                       evictor);
+                                       
evictor).enableSetProcessingTime(setProcessingTime);
 
                } else {
                        // we need to copy because we need our own instance of 
the pre aggregator
@@ -168,7 +171,7 @@ public class WindowedStream<T, K, W extends Window> {
                                        keySel,
                                        new 
PreAggregatingHeapWindowBuffer.Factory<>(functionCopy),
                                        new ReduceWindowFunction<K, W, 
T>(function),
-                                       trigger);
+                                       
trigger).enableSetProcessingTime(setProcessingTime);
                }
 
                return input.transform(opName, input.getType(), operator);
@@ -222,7 +225,9 @@ public class WindowedStream<T, K, W extends Window> {
                String opName = "TriggerWindow(" + windowAssigner + ", " + 
trigger + ", " + udfName + ")";
                KeySelector<T, K> keySel = input.getKeySelector();
 
-               OneInputStreamOperator<T, R> operator;
+               WindowOperator<K, T, R, W> operator;
+
+               boolean setProcessingTime = 
input.getExecutionEnvironment().getStreamTimeCharacteristic() == 
TimeCharacteristic.ProcessingTime;
 
                if (evictor != null) {
                        operator = new EvictingWindowOperator<>(windowAssigner,
@@ -230,14 +235,14 @@ public class WindowedStream<T, K, W extends Window> {
                                        new HeapWindowBuffer.Factory<T>(),
                                        function,
                                        trigger,
-                                       evictor);
+                                       
evictor).enableSetProcessingTime(setProcessingTime);
 
                } else {
                        operator = new WindowOperator<>(windowAssigner,
                                        keySel,
                                        new HeapWindowBuffer.Factory<T>(),
                                        function,
-                                       trigger);
+                                       
trigger).enableSetProcessingTime(setProcessingTime);;
                }
 
                return input.transform(opName, resultType, operator);
@@ -450,8 +455,8 @@ public class WindowedStream<T, K, W extends Window> {
                        TypeInformation<R> resultType,
                        String functionName) {
 
-               if (windowAssigner instanceof SlidingProcessingTimeWindows && 
trigger instanceof ProcessingTimeTrigger && evictor == null) {
-                       SlidingProcessingTimeWindows timeWindows = 
(SlidingProcessingTimeWindows) windowAssigner;
+               if (windowAssigner instanceof SlidingTimeWindows && trigger 
instanceof ProcessingTimeTrigger && evictor == null) {
+                       SlidingTimeWindows timeWindows = (SlidingTimeWindows) 
windowAssigner;
                        final long windowLength = timeWindows.getSize();
                        final long windowSlide = timeWindows.getSlide();
 
@@ -475,8 +480,8 @@ public class WindowedStream<T, K, W extends Window> {
                                                wf, input.getKeySelector(), 
windowLength, windowSlide);
                                return input.transform(opName, resultType, op);
                        }
-               } else if (windowAssigner instanceof 
TumblingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && 
evictor == null) {
-                       TumblingProcessingTimeWindows timeWindows = 
(TumblingProcessingTimeWindows) windowAssigner;
+               } else if (windowAssigner instanceof TumblingTimeWindows && 
trigger instanceof ProcessingTimeTrigger && evictor == null) {
+                       TumblingTimeWindows timeWindows = (TumblingTimeWindows) 
windowAssigner;
                        final long windowLength = timeWindows.getSize();
                        final long windowSlide = timeWindows.getSize();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index c2e2880..cc96217 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -541,11 +541,23 @@ public abstract class StreamExecutionEnvironment {
        /**
         * Sets the time characteristic for all streams create from this 
environment, e.g., processing
         * time, event time, or ingestion time.
+        *
+        * <p>
+        * If you set the characteristic to IngestionTime or EventTime this 
will set a default
+        * watermark update interval of 200 ms. If this is not applicable for 
your application
+        * you should change it using {@link 
ExecutionConfig#setAutoWatermarkInterval(long)}.
         * 
         * @param characteristic The time characteristic.
         */
        public void setStreamTimeCharacteristic(TimeCharacteristic 
characteristic) {
                this.timeCharacteristic = 
Objects.requireNonNull(characteristic);
+               if (characteristic == TimeCharacteristic.ProcessingTime) {
+                       getConfig().disableTimestamps();
+                       getConfig().setAutoWatermarkInterval(0);
+               } else {
+                       getConfig().enableTimestamps();
+                       getConfig().setAutoWatermarkInterval(200);
+               }
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
index 52c8f55..dbeb5ce 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.api.windowing.assigners;
 
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 
@@ -42,7 +43,7 @@ public class GlobalWindows extends WindowAssigner<Object, 
GlobalWindow> {
        }
 
        @Override
-       public Trigger<Object, GlobalWindow> getDefaultTrigger() {
+       public Trigger<Object, GlobalWindow> 
getDefaultTrigger(StreamExecutionEnvironment env) {
                return null;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
deleted file mode 100644
index 65d7641..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.assigners;
-
-import com.google.common.collect.Lists;
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
-import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.util.Collection;
-import java.util.List;
-
-/**
- * A {@link WindowAssigner} that windows elements into sliding, time-based 
windows. The windowing
- * is based on system time. Windows can possibly overlap.
- *
- * <p>
- * For example, in order to window into windows of 1 minute, every 10 seconds:
- * <pre> {@code
- * DataStream<Tuple2<String, Integer>> in = ...;
- * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
- * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
- *   keyed.window(SlidingProcessingTimeWindows.of(Time.of(1, MINUTES), 
Time.of(10, SECONDS));
- * } </pre>
- */
-public class SlidingProcessingTimeWindows extends WindowAssigner<Object, 
TimeWindow> {
-       private static final long serialVersionUID = 1L;
-
-       private final long size;
-
-       private final long slide;
-
-       private transient List<TimeWindow> result;
-
-       private SlidingProcessingTimeWindows(long size, long slide) {
-               this.size = size;
-               this.slide = slide;
-               this.result = Lists.newArrayListWithCapacity((int) (size / 
slide));
-       }
-
-       private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
-               in.defaultReadObject();
-               this.result = Lists.newArrayListWithCapacity((int) (size / 
slide));
-       }
-
-       @Override
-       public Collection<TimeWindow> assignWindows(Object element, long 
timestamp) {
-               result.clear();
-               long time = System.currentTimeMillis();
-               long lastStart = time - time % slide;
-               for (long start = lastStart;
-                       start > time - size;
-                       start -= slide) {
-                       result.add(new TimeWindow(start, size));
-               }
-               return result;
-       }
-
-       public long getSize() {
-               return size;
-       }
-
-       public long getSlide() {
-               return slide;
-       }
-
-       @Override
-       public Trigger<Object, TimeWindow> getDefaultTrigger() {
-               return ProcessingTimeTrigger.create();
-       }
-
-       @Override
-       public String toString() {
-               return "SlidingProcessingTimeWindows(" + size + ", " + slide + 
")";
-       }
-
-       /**
-        * Creates a new {@code SlidingProcessingTimeWindows} {@link 
WindowAssigner} that assigns
-        * elements to sliding time windows based on the current processing 
time.
-        *
-        * @param size The size of the generated windows.
-        * @param slide The slide interval of the generated windows.
-        * @return The time policy.
-        */
-       public static SlidingProcessingTimeWindows of(AbstractTime size, 
AbstractTime slide) {
-               return new SlidingProcessingTimeWindows(size.toMilliseconds(), 
slide.toMilliseconds());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
index 52ae356..6036dfb 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
@@ -17,7 +17,10 @@
  */
 package org.apache.flink.streaming.api.windowing.assigners;
 
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -72,8 +75,12 @@ public class SlidingTimeWindows extends 
WindowAssigner<Object, TimeWindow> {
        }
 
        @Override
-       public Trigger<Object, TimeWindow> getDefaultTrigger() {
-               return WatermarkTrigger.create();
+       public Trigger<Object, TimeWindow> 
getDefaultTrigger(StreamExecutionEnvironment env) {
+               if (env.getStreamTimeCharacteristic() == 
TimeCharacteristic.ProcessingTime) {
+                       return ProcessingTimeTrigger.create();
+               } else {
+                       return WatermarkTrigger.create();
+               }
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
deleted file mode 100644
index 41f6362..0000000
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.assigners;
-
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
-import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-
-import java.util.Collection;
-import java.util.Collections;
-
-/**
- * A {@link WindowAssigner} that windows elements into time-based windows. The 
windowing is
- * based on system time. Windows cannot overlap.
- *
- * <p>
- * For example, in order to window into windows of 1 minute, every 10 seconds:
- * <pre> {@code
- * DataStream<Tuple2<String, Integer>> in = ...;
- * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
- * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
- *   keyed.window(TumblingProcessingTimeWindows.of(Time.of(1, MINUTES), 
Time.of(10, SECONDS));
- * } </pre>
- */
-public class TumblingProcessingTimeWindows extends WindowAssigner<Object, 
TimeWindow> {
-       private static final long serialVersionUID = 1L;
-
-       private long size;
-
-       private TumblingProcessingTimeWindows(long size) {
-               this.size = size;
-       }
-
-       @Override
-       public Collection<TimeWindow> assignWindows(Object element, long 
timestamp) {
-               long time = System.currentTimeMillis();
-               long start = time - (time % size);
-               return Collections.singletonList(new TimeWindow(start, size));
-       }
-
-       public long getSize() {
-               return size;
-       }
-
-       @Override
-       public Trigger<Object, TimeWindow> getDefaultTrigger() {
-               return ProcessingTimeTrigger.create();
-       }
-
-       @Override
-       public String toString() {
-               return "TumblingProcessingTimeWindows(" + size + ")";
-       }
-
-       /**
-        * Creates a new {@code TumblingProcessingTimeWindows} {@link 
WindowAssigner} that assigns
-        * elements to time windows based on the current processing time.
-        *
-        * @param size The size of the generated windows.
-        * @return The time policy.
-        */
-       public static TumblingProcessingTimeWindows of(AbstractTime size) {
-               return new TumblingProcessingTimeWindows(size.toMilliseconds());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
index b6022b3..d57dc33 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
@@ -17,7 +17,10 @@
  */
 package org.apache.flink.streaming.api.windowing.assigners;
 
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -58,8 +61,12 @@ public class TumblingTimeWindows extends 
WindowAssigner<Object, TimeWindow> {
        }
 
        @Override
-       public Trigger<Object, TimeWindow> getDefaultTrigger() {
-               return WatermarkTrigger.create();
+       public Trigger<Object, TimeWindow> 
getDefaultTrigger(StreamExecutionEnvironment env) {
+               if (env.getStreamTimeCharacteristic() == 
TimeCharacteristic.ProcessingTime) {
+                       return ProcessingTimeTrigger.create();
+               } else {
+                       return WatermarkTrigger.create();
+               }
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
index 20fe365..d0b1ed0 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.streaming.api.windowing.assigners;
 
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import java.io.Serializable;
@@ -50,5 +51,5 @@ public abstract class WindowAssigner<T, W extends Window> 
implements Serializabl
        /**
         * Returns the default trigger associated with this {@code 
WindowAssigner}.
         */
-       public abstract Trigger<T, W> getDefaultTrigger();
+       public abstract Trigger<T, W> 
getDefaultTrigger(StreamExecutionEnvironment env);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 548afb3..368b8fa 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -347,6 +347,11 @@ public class WindowOperator<K, IN, OUT, W extends Window>
        // 
------------------------------------------------------------------------
 
        @VisibleForTesting
+       public boolean isSetProcessingTime() {
+               return setProcessingTime;
+       }
+
+       @VisibleForTesting
        public Trigger<? super IN, ? super W> getTriggerTemplate() {
                return triggerTemplate;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java
index c06a608..9ddd6eb 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java
@@ -49,8 +49,8 @@ public class CoGroupJoinITCase extends 
StreamingMultipleProgramsTestBase {
                testResults = Lists.newArrayList();
 
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
                env.setParallelism(1);
-               env.getConfig().enableTimestamps();
 
                DataStream<Tuple2<String, Integer>> source1 = env.addSource(new 
SourceFunction<Tuple2<String, Integer>>() {
                        private static final long serialVersionUID = 1L;
@@ -144,8 +144,8 @@ public class CoGroupJoinITCase extends 
StreamingMultipleProgramsTestBase {
                testResults = Lists.newArrayList();
 
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
                env.setParallelism(1);
-               env.getConfig().enableTimestamps();
 
                DataStream<Tuple3<String, String, Integer>> source1 = 
env.addSource(new SourceFunction<Tuple3<String, String, Integer>>() {
                        private static final long serialVersionUID = 1L;
@@ -239,8 +239,8 @@ public class CoGroupJoinITCase extends 
StreamingMultipleProgramsTestBase {
                testResults = Lists.newArrayList();
 
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
                env.setParallelism(1);
-               env.getConfig().enableTimestamps();
 
                DataStream<Tuple3<String, String, Integer>> source1 = 
env.addSource(new SourceFunction<Tuple3<String, String, Integer>>() {
                        private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
index 09a7149..4fa16ac 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
@@ -19,25 +19,25 @@ package 
org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import 
org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
-import 
org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
 import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
 import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.util.Collector;
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.util.concurrent.TimeUnit;
@@ -50,30 +50,33 @@ import java.util.concurrent.TimeUnit;
 public class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
 
        /**
-        * These tests ensure that the fast aligned time windows operator is 
used if the
-        * conditions are right.
-        *
-        * TODO: update once fast aligned time windows operator is in
+        * These tests ensure that the correct trigger is set when using 
event-time windows.
         */
-       @Ignore
        @Test
-       public void testFastTimeWindows() throws Exception {
+       @SuppressWarnings("rawtypes")
+       public void testEventTime() throws Exception {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
                DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
                DummyReducer reducer = new DummyReducer();
 
                DataStream<Tuple2<String, Integer>> window1 = source
-                               
.windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), 
Time.of(100, TimeUnit.MILLISECONDS)))
+                               .windowAll(SlidingTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
                                .reduce(reducer);
 
                OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
                OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator1 = transform1.getOperator();
-               Assert.assertTrue(operator1 instanceof 
AggregatingProcessingTimeWindowOperator);
+               Assert.assertTrue(operator1 instanceof NonKeyedWindowOperator);
+               NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) 
operator1;
+               Assert.assertFalse(winOperator1.isSetProcessingTime());
+               Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof 
WatermarkTrigger);
+               Assert.assertTrue(winOperator1.getWindowAssigner() instanceof 
SlidingTimeWindows);
+               Assert.assertTrue(winOperator1.getWindowBufferFactory() 
instanceof PreAggregatingHeapWindowBuffer.Factory);
 
                DataStream<Tuple2<String, Integer>> window2 = source
-                               
.windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), 
Time.of(100, TimeUnit.MILLISECONDS)))
+                               .windowAll(TumblingTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
                                .apply(new AllWindowFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow>() {
                                        private static final long 
serialVersionUID = 1L;
 
@@ -88,20 +91,26 @@ public class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase
 
                OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window2.getTransformation();
                OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator2 = transform2.getOperator();
-               Assert.assertTrue(operator2 instanceof 
AccumulatingProcessingTimeWindowOperator);
+               Assert.assertTrue(operator2 instanceof NonKeyedWindowOperator);
+               NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) 
operator2;
+               Assert.assertFalse(winOperator2.isSetProcessingTime());
+               Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof 
WatermarkTrigger);
+               Assert.assertTrue(winOperator2.getWindowAssigner() instanceof 
TumblingTimeWindows);
+               Assert.assertTrue(winOperator2.getWindowBufferFactory() 
instanceof HeapWindowBuffer.Factory);
        }
 
        @Test
        @SuppressWarnings("rawtypes")
        public void testNonEvicting() throws Exception {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
                DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
 
                DummyReducer reducer = new DummyReducer();
 
                DataStream<Tuple2<String, Integer>> window1 = source
-                               
.windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), 
Time.of(100, TimeUnit.MILLISECONDS)))
+                               .windowAll(SlidingTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
                                .trigger(CountTrigger.of(100))
                                .reduce(reducer);
 
@@ -109,12 +118,13 @@ public class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase
                OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator1 = transform1.getOperator();
                Assert.assertTrue(operator1 instanceof NonKeyedWindowOperator);
                NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) 
operator1;
+               Assert.assertTrue(winOperator1.isSetProcessingTime());
                Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof 
CountTrigger);
-               Assert.assertTrue(winOperator1.getWindowAssigner() instanceof 
SlidingProcessingTimeWindows);
+               Assert.assertTrue(winOperator1.getWindowAssigner() instanceof 
SlidingTimeWindows);
                Assert.assertTrue(winOperator1.getWindowBufferFactory() 
instanceof PreAggregatingHeapWindowBuffer.Factory);
 
                DataStream<Tuple2<String, Integer>> window2 = source
-                               
.windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+                               .windowAll(TumblingTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
                                .trigger(CountTrigger.of(100))
                                .apply(new AllWindowFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow>() {
                                        private static final long 
serialVersionUID = 1L;
@@ -132,8 +142,9 @@ public class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase
                 OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator2 = transform2.getOperator();
                 Assert.assertTrue(operator2 instanceof NonKeyedWindowOperator);
                 NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) 
operator2;
+               Assert.assertTrue(winOperator2.isSetProcessingTime());
                 Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof 
CountTrigger);
-               Assert.assertTrue(winOperator2.getWindowAssigner() instanceof 
TumblingProcessingTimeWindows);
+               Assert.assertTrue(winOperator2.getWindowAssigner() instanceof 
TumblingTimeWindows);
                 Assert.assertTrue(winOperator2.getWindowBufferFactory() 
instanceof HeapWindowBuffer.Factory);
         }
 
@@ -141,13 +152,14 @@ public class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase
        @SuppressWarnings("rawtypes")
        public void testEvicting() throws Exception {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
                DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
 
                DummyReducer reducer = new DummyReducer();
 
                DataStream<Tuple2<String, Integer>> window1 = source
-                               
.windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), 
Time.of(100, TimeUnit.MILLISECONDS)))
+                               .windowAll(SlidingTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
                                .evictor(CountEvictor.of(100))
                                .reduce(reducer);
 
@@ -155,13 +167,14 @@ public class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase
                OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator1 = transform1.getOperator();
                Assert.assertTrue(operator1 instanceof 
EvictingNonKeyedWindowOperator);
                EvictingNonKeyedWindowOperator winOperator1 = 
(EvictingNonKeyedWindowOperator) operator1;
-               Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof 
ProcessingTimeTrigger);
-               Assert.assertTrue(winOperator1.getWindowAssigner() instanceof 
SlidingProcessingTimeWindows);
+               Assert.assertFalse(winOperator1.isSetProcessingTime());
+               Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof 
WatermarkTrigger);
+               Assert.assertTrue(winOperator1.getWindowAssigner() instanceof 
SlidingTimeWindows);
                Assert.assertTrue(winOperator1.getEvictor() instanceof 
CountEvictor);
                Assert.assertTrue(winOperator1.getWindowBufferFactory() 
instanceof HeapWindowBuffer.Factory);
 
                DataStream<Tuple2<String, Integer>> window2 = source
-                               
.windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+                               .windowAll(TumblingTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
                                .trigger(CountTrigger.of(100))
                                .evictor(TimeEvictor.of(Time.of(100, 
TimeUnit.MILLISECONDS)))
                                .apply(new AllWindowFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, TimeWindow>() {
@@ -180,8 +193,9 @@ public class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase
                OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator2 = transform2.getOperator();
                Assert.assertTrue(operator2 instanceof 
EvictingNonKeyedWindowOperator);
                EvictingNonKeyedWindowOperator winOperator2 = 
(EvictingNonKeyedWindowOperator) operator2;
+               Assert.assertFalse(winOperator2.isSetProcessingTime());
                Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof 
CountTrigger);
-               Assert.assertTrue(winOperator2.getWindowAssigner() instanceof 
TumblingProcessingTimeWindows);
+               Assert.assertTrue(winOperator2.getWindowAssigner() instanceof 
TumblingTimeWindows);
                Assert.assertTrue(winOperator2.getEvictor() instanceof 
TimeEvictor);
                Assert.assertTrue(winOperator2.getWindowBufferFactory() 
instanceof HeapWindowBuffer.Factory);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
index 5124add..10fe734 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
@@ -20,19 +20,20 @@ package 
org.apache.flink.streaming.runtime.operators.windowing;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.WindowedStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import 
org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
-import 
org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
 import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
 import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
 import 
org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
@@ -59,13 +60,13 @@ public class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
                DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
                DummyReducer reducer = new DummyReducer();
 
                DataStream<Tuple2<String, Integer>> window1 = source
                                .keyBy(0)
-                               
.window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS),
-                                               Time.of(100, 
TimeUnit.MILLISECONDS)))
+                               .window(SlidingTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
                                .reduce(reducer);
 
                OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
@@ -74,7 +75,7 @@ public class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
 
                DataStream<Tuple2<String, Integer>> window2 = source
                                .keyBy(0)
-                               
.window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), 
Time.of(100, TimeUnit.MILLISECONDS)))
+                               .window(SlidingTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
                                .apply(new WindowFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
                                        private static final long 
serialVersionUID = 1L;
 
@@ -92,10 +93,63 @@ public class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
                Assert.assertTrue(operator2 instanceof 
AccumulatingProcessingTimeWindowOperator);
        }
 
+       /**
+        * These tests ensure that the correct trigger is set when using 
event-time windows.
+        */
+       @Test
+       @SuppressWarnings("rawtypes")
+       public void testEventTime() throws Exception {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+               DummyReducer reducer = new DummyReducer();
+
+               DataStream<Tuple2<String, Integer>> window1 = source
+                               .keyBy(0)
+                               .window(SlidingTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+                               .reduce(reducer);
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window1.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator1 = transform1.getOperator();
+               Assert.assertTrue(operator1 instanceof WindowOperator);
+               WindowOperator winOperator1 = (WindowOperator) operator1;
+               Assert.assertFalse(winOperator1.isSetProcessingTime());
+               Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof 
WatermarkTrigger);
+               Assert.assertTrue(winOperator1.getWindowAssigner() instanceof 
SlidingTimeWindows);
+               Assert.assertTrue(winOperator1.getWindowBufferFactory() 
instanceof PreAggregatingHeapWindowBuffer.Factory);
+
+               DataStream<Tuple2<String, Integer>> window2 = source
+                               .keyBy(0)
+                               .window(TumblingTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
+                               .apply(new WindowFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
+                                       private static final long 
serialVersionUID = 1L;
+
+                                       @Override
+                                       public void apply(Tuple tuple,
+                                                       TimeWindow window,
+                                                       Iterable<Tuple2<String, 
Integer>> values,
+                                                       
Collector<Tuple2<String, Integer>> out) throws Exception {
+
+                                       }
+                               });
+
+               OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, 
Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, 
Tuple2<String, Integer>>) window2.getTransformation();
+               OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator2 = transform2.getOperator();
+               Assert.assertTrue(operator2 instanceof WindowOperator);
+               WindowOperator winOperator2 = (WindowOperator) operator2;
+               Assert.assertFalse(winOperator2.isSetProcessingTime());
+               Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof 
WatermarkTrigger);
+               Assert.assertTrue(winOperator2.getWindowAssigner() instanceof 
TumblingTimeWindows);
+               Assert.assertTrue(winOperator2.getWindowBufferFactory() 
instanceof HeapWindowBuffer.Factory);
+       }
+
        @Test
        @SuppressWarnings("rawtypes")
        public void testNonEvicting() throws Exception {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
                DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
 
@@ -103,7 +157,7 @@ public class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
 
                DataStream<Tuple2<String, Integer>> window1 = source
                                .keyBy(0)
-                               
.window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), 
Time.of(100, TimeUnit.MILLISECONDS)))
+                               .window(SlidingTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
                                .trigger(CountTrigger.of(100))
                                .reduce(reducer);
 
@@ -111,13 +165,14 @@ public class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
                OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator1 = transform1.getOperator();
                Assert.assertTrue(operator1 instanceof WindowOperator);
                WindowOperator winOperator1 = (WindowOperator) operator1;
+               Assert.assertTrue(winOperator1.isSetProcessingTime());
                Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof 
CountTrigger);
-               Assert.assertTrue(winOperator1.getWindowAssigner() instanceof 
SlidingProcessingTimeWindows);
+               Assert.assertTrue(winOperator1.getWindowAssigner() instanceof 
SlidingTimeWindows);
                Assert.assertTrue(winOperator1.getWindowBufferFactory() 
instanceof PreAggregatingHeapWindowBuffer.Factory);
 
                DataStream<Tuple2<String, Integer>> window2 = source
                                .keyBy(0)
-                               
.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+                               .window(TumblingTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
                                .trigger(CountTrigger.of(100))
                                .apply(new WindowFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
                                        private static final long 
serialVersionUID = 1L;
@@ -135,8 +190,9 @@ public class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
                OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator2 = transform2.getOperator();
                Assert.assertTrue(operator2 instanceof WindowOperator);
                WindowOperator winOperator2 = (WindowOperator) operator2;
+               Assert.assertTrue(winOperator2.isSetProcessingTime());
                Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof 
CountTrigger);
-               Assert.assertTrue(winOperator2.getWindowAssigner() instanceof 
TumblingProcessingTimeWindows);
+               Assert.assertTrue(winOperator2.getWindowAssigner() instanceof 
TumblingTimeWindows);
                Assert.assertTrue(winOperator2.getWindowBufferFactory() 
instanceof HeapWindowBuffer.Factory);
        }
 
@@ -144,6 +200,7 @@ public class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
        @SuppressWarnings("rawtypes")
        public void testEvicting() throws Exception {
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
                DataStream<Tuple2<String, Integer>> source = 
env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
 
@@ -151,7 +208,7 @@ public class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
 
                DataStream<Tuple2<String, Integer>> window1 = source
                                .keyBy(0)
-                               
.window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), 
Time.of(100, TimeUnit.MILLISECONDS)))
+                               .window(SlidingTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
                                .evictor(CountEvictor.of(100))
                                .reduce(reducer);
 
@@ -159,14 +216,15 @@ public class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
                OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator1 = transform1.getOperator();
                Assert.assertTrue(operator1 instanceof EvictingWindowOperator);
                EvictingWindowOperator winOperator1 = (EvictingWindowOperator) 
operator1;
-               Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof 
ProcessingTimeTrigger);
-               Assert.assertTrue(winOperator1.getWindowAssigner() instanceof 
SlidingProcessingTimeWindows);
+               Assert.assertFalse(winOperator1.isSetProcessingTime());
+               Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof 
WatermarkTrigger);
+               Assert.assertTrue(winOperator1.getWindowAssigner() instanceof 
SlidingTimeWindows);
                Assert.assertTrue(winOperator1.getEvictor() instanceof 
CountEvictor);
                Assert.assertTrue(winOperator1.getWindowBufferFactory() 
instanceof HeapWindowBuffer.Factory);
 
                DataStream<Tuple2<String, Integer>> window2 = source
                                .keyBy(0)
-                               
.window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+                               .window(TumblingTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
                                .trigger(CountTrigger.of(100))
                                .evictor(TimeEvictor.of(Time.of(100, 
TimeUnit.MILLISECONDS)))
                                .apply(new WindowFunction<Tuple2<String, 
Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
@@ -185,8 +243,9 @@ public class WindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
                OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, 
Integer>> operator2 = transform2.getOperator();
                Assert.assertTrue(operator2 instanceof EvictingWindowOperator);
                EvictingWindowOperator winOperator2 = (EvictingWindowOperator) 
operator2;
+               Assert.assertFalse(winOperator2.isSetProcessingTime());
                Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof 
CountTrigger);
-               Assert.assertTrue(winOperator2.getWindowAssigner() instanceof 
TumblingProcessingTimeWindows);
+               Assert.assertTrue(winOperator2.getWindowAssigner() instanceof 
TumblingTimeWindows);
                Assert.assertTrue(winOperator2.getEvictor() instanceof 
TimeEvictor);
                Assert.assertTrue(winOperator2.getWindowBufferFactory() 
instanceof HeapWindowBuffer.Factory);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
index 8abf9d6..5915a7a 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
+++ 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.TimestampExtractor;
@@ -64,7 +65,7 @@ public class WindowJoin {
 
                // obtain execution environment
                StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               env.getConfig().enableTimestamps();
+               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 
                // connect to the data sources for grades and salaries
                Tuple2<DataStream<Tuple3<Long, String, Integer>>, 
DataStream<Tuple3<Long, String, Integer>>> input = getInputStreams(env);

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
index 225dab7..42484e8 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
+++ 
b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.scala.examples.join
 
 import java.util.concurrent.TimeUnit
 
+import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala._
 import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows
 import org.apache.flink.streaming.api.windowing.time.Time
@@ -45,7 +46,7 @@ object WindowJoin {
     }
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.getConfig.enableTimestamps()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 
     //Create streams for grades and salaries by mapping the inputs to the 
corresponding objects
     val grades = setGradesInput(env)

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 7babc40..fb4d75d 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -34,7 +34,7 @@ import 
org.apache.flink.streaming.api.scala.function.StatefulFunction
 import org.apache.flink.streaming.api.windowing.assigners._
 import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
 import org.apache.flink.streaming.api.windowing.policy.{EvictionPolicy, 
TriggerPolicy}
-import org.apache.flink.streaming.api.windowing.time.{AbstractTime, EventTime, 
ProcessingTime}
+import org.apache.flink.streaming.api.windowing.time.AbstractTime
 import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window}
 import org.apache.flink.streaming.util.serialization.SerializationSchema
 import org.apache.flink.util.Collector
@@ -624,20 +624,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * @param size The size of the window.
    */
   def timeWindowAll(size: AbstractTime): AllWindowedStream[T, TimeWindow] = {
-    val env = new 
StreamExecutionEnvironment(javaStream.getExecutionEnvironment)
-    val actualSize = 
size.makeSpecificBasedOnTimeCharacteristic(env.getStreamTimeCharacteristic)
-
-    actualSize match {
-      case t: EventTime =>
-        val assigner = TumblingTimeWindows.of(actualSize)
-          .asInstanceOf[WindowAssigner[T, TimeWindow]]
-        windowAll(assigner)
-      case t: ProcessingTime =>
-        val assigner = TumblingProcessingTimeWindows.of(actualSize)
-          .asInstanceOf[WindowAssigner[T, TimeWindow]]
-        windowAll(assigner)
-      case _ => throw new RuntimeException("Invalid time: " + actualSize)
-    }
+    val assigner = TumblingTimeWindows.of(size).asInstanceOf[WindowAssigner[T, 
TimeWindow]]
+    windowAll(assigner)
   }
 
   /**
@@ -651,23 +639,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
    * @param size The size of the window.
    */
   def timeWindowAll(size: AbstractTime, slide: AbstractTime): 
AllWindowedStream[T, TimeWindow] = {
-    val env = new 
StreamExecutionEnvironment(javaStream.getExecutionEnvironment)
-    val actualSize = 
size.makeSpecificBasedOnTimeCharacteristic(env.getStreamTimeCharacteristic)
-    val actualSlide = 
slide.makeSpecificBasedOnTimeCharacteristic(env.getStreamTimeCharacteristic)
-
-    actualSize match {
-      case t: EventTime =>
-        val assigner = SlidingTimeWindows.of(
-          actualSize,
-          actualSlide).asInstanceOf[WindowAssigner[T, TimeWindow]]
-        windowAll(assigner)
-      case t: ProcessingTime =>
-        val assigner = SlidingProcessingTimeWindows.of(
-          actualSize,
-          actualSlide).asInstanceOf[WindowAssigner[T, TimeWindow]]
-        windowAll(assigner)
-      case _ => throw new RuntimeException("Invalid time: " + actualSize)
-    }
+    val assigner = SlidingTimeWindows.of(size, 
slide).asInstanceOf[WindowAssigner[T, TimeWindow]]
+    windowAll(assigner)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index 0ce36aa..c605bb1 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -24,7 +24,7 @@ import 
org.apache.flink.streaming.api.functions.aggregation.SumAggregator
 import 
org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator
 import org.apache.flink.streaming.api.operators.StreamGroupedReduce
 import org.apache.flink.streaming.api.windowing.assigners._
-import org.apache.flink.streaming.api.windowing.time.{ProcessingTime, 
EventTime, AbstractTime}
+import org.apache.flink.streaming.api.windowing.time.AbstractTime
 import org.apache.flink.streaming.api.windowing.windows.{Window, TimeWindow}
 import scala.reflect.ClassTag
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -50,20 +50,8 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) 
extends DataStream[T]
    * @param size The size of the window.
    */
   def timeWindow(size: AbstractTime): WindowedStream[T, K, TimeWindow] = {
-    val env = new 
StreamExecutionEnvironment(javaStream.getExecutionEnvironment)
-    val actualSize = 
size.makeSpecificBasedOnTimeCharacteristic(env.getStreamTimeCharacteristic)
-
-    actualSize match {
-      case t: EventTime =>
-        val assigner = TumblingTimeWindows.of(actualSize)
-          .asInstanceOf[WindowAssigner[T, TimeWindow]]
-        window(assigner)
-      case t: ProcessingTime =>
-        val assigner = TumblingProcessingTimeWindows.of(actualSize)
-          .asInstanceOf[WindowAssigner[T, TimeWindow]]
-        window(assigner)
-      case _ => throw new RuntimeException("Invalid time: " + actualSize)
-    }
+    val assigner = TumblingTimeWindows.of(size).asInstanceOf[WindowAssigner[T, 
TimeWindow]]
+    window(assigner)
   }
 
   /**
@@ -78,23 +66,8 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) 
extends DataStream[T]
    * @param size The size of the window.
    */
   def timeWindow(size: AbstractTime, slide: AbstractTime): WindowedStream[T, 
K, TimeWindow] = {
-    val env = new 
StreamExecutionEnvironment(javaStream.getExecutionEnvironment)
-    val actualSize = 
size.makeSpecificBasedOnTimeCharacteristic(env.getStreamTimeCharacteristic)
-    val actualSlide = 
slide.makeSpecificBasedOnTimeCharacteristic(env.getStreamTimeCharacteristic)
-
-    actualSize match {
-      case t: EventTime =>
-        val assigner = SlidingTimeWindows.of(
-          actualSize,
-          actualSlide).asInstanceOf[WindowAssigner[T, TimeWindow]]
-        window(assigner)
-      case t: ProcessingTime =>
-        val assigner = SlidingProcessingTimeWindows.of(
-          actualSize,
-          actualSlide).asInstanceOf[WindowAssigner[T, TimeWindow]]
-        window(assigner)
-      case _ => throw new RuntimeException("Invalid time: " + actualSize)
-    }
+    val assigner = SlidingTimeWindows.of(size, 
slide).asInstanceOf[WindowAssigner[T, TimeWindow]]
+    window(assigner)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 7492e48..f767aba 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -299,6 +299,11 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    * Sets the time characteristic for all streams create from this 
environment, e.g., processing
    * time, event time, or ingestion time.
    *
+   * If you set the characteristic to IngestionTime or EventTime this will set 
a default
+   * watermark update interval of 200 ms. If this is not applicable for your 
application
+   * you should change it using
+   * 
[[org.apache.flink.api.common.ExecutionConfig#setAutoWatermarkInterval(long)]]
+   *
    * @param characteristic The time characteristic.
    */
   def setStreamTimeCharacteristic(characteristic: TimeCharacteristic) : Unit = 
{

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
index dece9f6..99fcd07 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
@@ -22,9 +22,10 @@ package org.apache.flink.streaming.api.scala
 import java.util.concurrent.TimeUnit
 
 import org.apache.flink.api.common.functions.RichReduceFunction
+import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
-import 
org.apache.flink.streaming.api.windowing.assigners.{TumblingProcessingTimeWindows,
 SlidingProcessingTimeWindows}
+import 
org.apache.flink.streaming.api.windowing.assigners.{TumblingTimeWindows, 
SlidingTimeWindows}
 import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, 
TimeEvictor}
 import org.apache.flink.streaming.api.windowing.time.Time
 import 
org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, 
CountTrigger}
@@ -49,13 +50,14 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
   @Test
   def testFastTimeWindows(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
 
     val source = env.fromElements(("hello", 1), ("hello", 2))
 
     val reducer = new DummyReducer
 
     val window1 = source
-      .windowAll(SlidingProcessingTimeWindows.of(
+      .windowAll(SlidingTimeWindows.of(
         Time.of(1, TimeUnit.SECONDS),
         Time.of(100, TimeUnit.MILLISECONDS)))
       .reduce(reducer)
@@ -69,7 +71,7 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
 
     val window2 = source
       .keyBy(0)
-      .windowAll(SlidingProcessingTimeWindows.of(
+      .windowAll(SlidingTimeWindows.of(
         Time.of(1, TimeUnit.SECONDS),
         Time.of(100, TimeUnit.MILLISECONDS)))
       .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() 
{
@@ -96,7 +98,7 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
     val reducer = new DummyReducer
 
     val window1 = source
-      .windowAll(SlidingProcessingTimeWindows.of(
+      .windowAll(SlidingTimeWindows.of(
         Time.of(1, TimeUnit.SECONDS),
         Time.of(100, TimeUnit.MILLISECONDS)))
       .trigger(CountTrigger.of(100))
@@ -110,13 +112,13 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
     assertTrue(operator1.isInstanceOf[NonKeyedWindowOperator[_, _, _]])
     val winOperator1 = operator1.asInstanceOf[NonKeyedWindowOperator[_, _, _]]
     assertTrue(winOperator1.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
-    
assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingProcessingTimeWindows])
+    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
     assertTrue(
       
winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
 
 
     val window2 = source
-      .windowAll(TumblingProcessingTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
+      .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
       .trigger(CountTrigger.of(100))
       .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() 
{
       def apply(
@@ -133,7 +135,7 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
     assertTrue(operator2.isInstanceOf[NonKeyedWindowOperator[_, _, _]])
     val winOperator2 = operator2.asInstanceOf[NonKeyedWindowOperator[_, _, _]]
     assertTrue(winOperator2.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
-    
assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+    
assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
     
assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
   }
 
@@ -146,7 +148,7 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
     val reducer = new DummyReducer
 
     val window1 = source
-      .windowAll(SlidingProcessingTimeWindows.of(
+      .windowAll(SlidingTimeWindows.of(
         Time.of(1, TimeUnit.SECONDS),
         Time.of(100, TimeUnit.MILLISECONDS)))
       .evictor(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS)))
@@ -161,12 +163,12 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
     val winOperator1 = 
operator1.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]]
     
assertTrue(winOperator1.getTriggerTemplate.isInstanceOf[ProcessingTimeTrigger])
     assertTrue(winOperator1.getEvictor.isInstanceOf[TimeEvictor[_]])
-    
assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingProcessingTimeWindows])
+    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
     
assertTrue(winOperator1.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
 
 
     val window2 = source
-      .windowAll(TumblingProcessingTimeWindows.of(Time.of(1, 
TimeUnit.SECONDS)))
+      .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
       .trigger(CountTrigger.of(100))
       .evictor(CountEvictor.of(1000))
       .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() 
{
@@ -185,7 +187,7 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
     val winOperator2 = 
operator2.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]]
     assertTrue(winOperator2.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
     assertTrue(winOperator2.getEvictor.isInstanceOf[CountEvictor[_]])
-    
assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+    
assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
     
assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
index 7232309..3f6e10f 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.scala
 
 import java.util.concurrent.TimeUnit
 
+import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.functions.TimestampExtractor
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction
@@ -38,9 +39,9 @@ class CoGroupJoinITCase extends 
StreamingMultipleProgramsTestBase {
     CoGroupJoinITCase.testResults = mutable.MutableList()
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     env.setParallelism(1)
-    env.getConfig.enableTimestamps
-    
+
     val source1 = env.addSource(new SourceFunction[(String, Int)]() {
       def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
         ctx.collect(("a", 0))
@@ -101,8 +102,8 @@ class CoGroupJoinITCase extends 
StreamingMultipleProgramsTestBase {
     CoGroupJoinITCase.testResults = mutable.MutableList()
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     env.setParallelism(1)
-    env.getConfig.enableTimestamps
 
     val source1 = env.addSource(new SourceFunction[(String, String, Int)]() {
       def run(ctx: SourceFunction.SourceContext[(String, String, Int)]) {
@@ -177,8 +178,8 @@ class CoGroupJoinITCase extends 
StreamingMultipleProgramsTestBase {
     CoGroupJoinITCase.testResults = mutable.MutableList()
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     env.setParallelism(1)
-    env.getConfig.enableTimestamps
 
     val source1 = env.addSource(new SourceFunction[(String, String, Int)]() {
       def run(ctx: SourceFunction.SourceContext[(String, String, Int)]) {

Reply via email to