This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6f0d07633a5c8e6511f3d16e04561cb277b65407
Author: Matthias Pohl <[email protected]>
AuthorDate: Mon Jul 10 19:11:57 2023 +0200

    [FLINK-32570][streaming] Deprecates 
org.apache.flink.streaming.api.windowing.time.Time-related APIs in favor of 
Duration
    
    Signed-off-by: Matthias Pohl <[email protected]>
---
 .../flink/streaming/examples/join/WindowJoin.java  |   3 +-
 .../examples/sideoutput/SideOutputExample.java     |   3 +-
 .../examples/socket/SocketWindowWordCount.java     |   5 +-
 .../GroupedProcessingTimeWindowExample.java        |   5 +-
 .../examples/windowing/SessionWindowing.java       |   3 +-
 .../examples/windowing/TopSpeedWindowing.java      |   4 +-
 .../apache/flink/cep/scala/pattern/Pattern.scala   |  20 +++-
 .../flink/cep/scala/pattern/PatternTest.scala      |   6 +-
 .../cep/functions/TimedOutPartialMatchHandler.java |   6 +-
 .../apache/flink/cep/nfa/compiler/NFACompiler.java |  43 ++++---
 .../java/org/apache/flink/cep/pattern/Pattern.java | 131 +++++++++++++++++++--
 .../org/apache/flink/cep/pattern/Quantifier.java   |  29 ++++-
 .../apache/flink/cep/operator/CEPOperatorTest.java |   8 +-
 .../operator/CepProcessFunctionContextTest.java    |   4 +-
 .../api/datastream/AllWindowedStream.java          |  21 +++-
 .../streaming/api/datastream/CoGroupedStreams.java |  63 +++++++++-
 .../streaming/api/datastream/JoinedStreams.java    |  66 +++++++++--
 .../streaming/api/datastream/KeyedStream.java      |  31 +++--
 .../streaming/api/datastream/WindowedStream.java   |  19 ++-
 .../environment/StreamExecutionEnvironment.java    |  10 +-
 .../BoundedOutOfOrdernessTimestampExtractor.java   |  25 ++--
 .../assigners/EventTimeSessionWindows.java         |  18 ++-
 .../assigners/ProcessingTimeSessionWindows.java    |  18 ++-
 .../assigners/SlidingEventTimeWindows.java         |  47 +++++++-
 .../assigners/SlidingProcessingTimeWindows.java    |  44 ++++++-
 .../assigners/TumblingEventTimeWindows.java        |  63 +++++++++-
 .../assigners/TumblingProcessingTimeWindows.java   |  65 +++++++++-
 .../api/windowing/evictors/TimeEvictor.java        |  30 ++++-
 .../flink/streaming/api/windowing/time/Time.java   |  20 ++++
 .../triggers/ContinuousEventTimeTrigger.java       |  16 ++-
 .../triggers/ContinuousProcessingTimeTrigger.java  |  16 ++-
 .../operators/windowing/WindowOperatorBuilder.java |   9 +-
 .../api/datastream/CoGroupedStreamsTest.java       |  14 +--
 .../api/datastream/JoinedStreamsTest.java          |  19 +--
 .../planner/match/PatternTranslatorTestBase.scala  |   6 +-
 35 files changed, 754 insertions(+), 136 deletions(-)

diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
index 3b6d6c2bd89..abd44392a60 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
@@ -36,7 +36,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
 import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
-import org.apache.flink.streaming.api.windowing.time.Time;
 
 import java.time.Duration;
 
@@ -122,7 +121,7 @@ public class WindowJoin {
         return grades.join(salaries)
                 .where(new NameKeySelector())
                 .equalTo(new NameKeySelector())
-                
.window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))
+                
.window(TumblingEventTimeWindows.of(Duration.ofMillis(windowSize)))
                 .apply(
                         new JoinFunction<
                                 Tuple2<String, Integer>,
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java
index 13dc391e817..6e33f5d9ffc 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java
@@ -39,7 +39,6 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
 import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
-import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.examples.wordcount.util.WordCountData;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
@@ -107,7 +106,7 @@ public class SideOutputExample {
         DataStream<Tuple2<String, Integer>> counts =
                 tokenized
                         .keyBy(value -> value.f0)
-                        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
+                        
.window(TumblingEventTimeWindows.of(Duration.ofSeconds(5)))
                         // group by the tuple field "0" and sum up tuple field 
"1"
                         .sum(1);
 
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
index 76fb37f321f..584183a3ba9 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
@@ -24,7 +24,8 @@ import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.time.Time;
+
+import java.time.Duration;
 
 /**
  * Implements a streaming windowed version of the "WordCount" program.
@@ -77,7 +78,7 @@ public class SocketWindowWordCount {
                                         },
                                 Types.POJO(WordWithCount.class))
                         .keyBy(value -> value.word)
-                        
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
+                        
.window(TumblingProcessingTimeWindows.of(Duration.ofSeconds(5)))
                         .reduce((a, b) -> new WordWithCount(a.word, a.count + 
b.count))
                         .returns(WordWithCount.class);
 
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
index c8020b7d1c3..9db4e35a172 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
@@ -31,10 +31,11 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import 
org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
+import java.time.Duration;
+
 /** An example of grouped stream windowing into sliding time windows. */
 public class GroupedProcessingTimeWindowExample {
 
@@ -60,7 +61,7 @@ public class GroupedProcessingTimeWindowExample {
         stream.keyBy(value -> value.f0)
                 .window(
                         SlidingProcessingTimeWindows.of(
-                                Time.milliseconds(2500), 
Time.milliseconds(500)))
+                                Duration.ofMillis(2500), 
Duration.ofMillis(500)))
                 .reduce(new SummingReducer())
 
                 // alternative: use a apply function which does not 
pre-aggregate
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
index 34b3dd7d285..5708a7ae90e 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
@@ -32,7 +32,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
 import 
org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
-import org.apache.flink.streaming.api.windowing.time.Time;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -85,7 +84,7 @@ public class SessionWindowing {
         // We create sessions for each id with max timeout of 3 time units
         DataStream<Tuple3<String, Long, Integer>> aggregated =
                 source.keyBy(value -> value.f0)
-                        
.window(EventTimeSessionWindows.withGap(Time.milliseconds(3L)))
+                        
.window(EventTimeSessionWindows.withGap(Duration.ofMillis(3L)))
                         .sum(2);
 
         if (fileOutput) {
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
index 3cca4cc8f8f..3d12511ac03 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
@@ -36,13 +36,11 @@ import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.
 import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
 import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
-import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger;
 import org.apache.flink.streaming.examples.windowing.util.CarGeneratorFunction;
 import org.apache.flink.streaming.examples.wordcount.util.CLI;
 
 import java.time.Duration;
-import java.util.concurrent.TimeUnit;
 
 /**
  * An example of grouped stream windowing where different eviction and trigger 
policies can be used.
@@ -130,7 +128,7 @@ public class TopSpeedWindowing {
                                         .withTimestampAssigner((car, ts) -> 
car.f3))
                         .keyBy(value -> value.f0)
                         .window(GlobalWindows.create())
-                        .evictor(TimeEvictor.of(Time.of(evictionSec, 
TimeUnit.SECONDS)))
+                        
.evictor(TimeEvictor.of(Duration.ofSeconds(evictionSec)))
                         .trigger(
                                 DeltaTrigger.of(
                                         triggerMeters,
diff --git 
a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
 
b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
index cbbe04d9adc..04606368a03 100644
--- 
a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
+++ 
b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
@@ -25,6 +25,8 @@ import 
org.apache.flink.cep.pattern.conditions.IterativeCondition.{Context => JC
 import org.apache.flink.cep.scala.conditions.Context
 import org.apache.flink.streaming.api.windowing.time.Time
 
+import java.time.Duration
+
 /**
  * Base class for a pattern definition.
  *
@@ -64,8 +66,14 @@ class Pattern[T, F <: T](jPattern: JPattern[T, F]) {
   def getName: String = jPattern.getName
 
   /** @return Window length in which the pattern match has to occur */
+  @deprecated(message = "Use getWindowSize", since = "1.19.0")
   def getWindowTime: Option[Time] = {
-    Option(jPattern.getWindowTime)
+    getWindowSize.map(Time.of)
+  }
+
+  /** @return Window length in which the pattern match has to occur */
+  def getWindowSize: Option[Duration] = {
+    Option(jPattern.getWindowSize.orElse(null))
   }
 
   /** @return currently applied quantifier to this pattern */
@@ -253,17 +261,23 @@ class Pattern[T, F <: T](jPattern: JPattern[T, F]) {
     until(condFun)
   }
 
+  @deprecated(message = "Use within(Duration)", since = "1.19.0")
+  def within(windowTime: Time): Pattern[T, F] = {
+    jPattern.within(Time.toDuration(windowTime))
+    this
+  }
+
   /**
    * Defines the maximum time interval in which a matching pattern has to be 
completed in order to
    * be considered valid. This interval corresponds to the maximum time gap 
between first and the
    * last event.
    *
    * @param windowTime
-   *   Time of the matching window
+   *   Duration of the matching window
    * @return
    *   The same pattern operator with the new window length
    */
-  def within(windowTime: Time): Pattern[T, F] = {
+  def within(windowTime: Duration): Pattern[T, F] = {
     jPattern.within(windowTime)
     this
   }
diff --git 
a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
 
b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
index 23453c5d167..170eb1fae46 100644
--- 
a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
+++ 
b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
@@ -202,9 +202,9 @@ class PatternTest {
     && threeWayEquals(pattern.getName, pattern.wrappedPattern.getName, 
jPattern.getName)
     // check equal time windows
     && threeWayEquals(
-      pattern.getWindowTime.orNull,
-      pattern.wrappedPattern.getWindowTime,
-      jPattern.getWindowTime)
+      pattern.getWindowSize.orNull,
+      pattern.wrappedPattern.getWindowSize.orElse(null),
+      jPattern.getWindowSize.orElse(null))
     // check congruent class names / types
     && threeWayEquals(
       pattern.getClass.getSimpleName,
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/TimedOutPartialMatchHandler.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/TimedOutPartialMatchHandler.java
index ea790e9fc48..ae222d4e0e2 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/TimedOutPartialMatchHandler.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/TimedOutPartialMatchHandler.java
@@ -19,8 +19,8 @@
 package org.apache.flink.cep.functions;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.streaming.api.windowing.time.Time;
 
+import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 
@@ -42,8 +42,8 @@ public interface TimedOutPartialMatchHandler<IN> {
 
     /**
      * Called for every timed out partial match (due to {@link
-     * org.apache.flink.cep.pattern.Pattern#within(Time)}). It enables custom 
handling, e.g. one can
-     * emit the timed out results through a side output:
+     * org.apache.flink.cep.pattern.Pattern#within(Duration)}). It enables 
custom handling, e.g. one
+     * can emit the timed out results through a side output:
      *
      * <pre>{@code
      * private final OutputTag<T> timedOutPartialMatchesTag = ...
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index 6fd9fa9f2ce..331b2d97033 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -34,9 +34,9 @@ import 
org.apache.flink.cep.pattern.conditions.BooleanConditions;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.RichAndCondition;
 import org.apache.flink.cep.pattern.conditions.RichNotCondition;
-import org.apache.flink.streaming.api.windowing.time.Time;
 
 import java.io.Serializable;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -318,8 +318,7 @@ public class NFACompiler {
          */
         private State<T> createEndingState() {
             State<T> endState = createState(ENDING_STATE_NAME, 
State.StateType.Final);
-            windowTime =
-                    
Optional.ofNullable(currentPattern.getWindowTime()).map(Time::toMilliseconds);
+            windowTime = 
currentPattern.getWindowSize().map(Duration::toMillis);
             return endState;
         }
 
@@ -336,7 +335,7 @@ public class NFACompiler {
                 if (currentPattern.getQuantifier().getConsumingStrategy()
                         == Quantifier.ConsumingStrategy.NOT_FOLLOW) {
                     // skip notFollow patterns, they are converted into edge 
conditions
-                    if 
((currentPattern.getWindowTime(WithinType.PREVIOUS_AND_CURRENT) != null
+                    if 
((currentPattern.getWindowSize(WithinType.PREVIOUS_AND_CURRENT).isPresent()
                                     || getWindowTime() > 0)
                             && lastSink.isFinal()) {
                         final State<T> notFollow = 
createState(State.StateType.Pending, true);
@@ -370,12 +369,15 @@ public class NFACompiler {
                 followingPattern = currentPattern;
                 currentPattern = currentPattern.getPrevious();
 
-                final Time currentWindowTime = currentPattern.getWindowTime();
-                if (currentWindowTime != null
-                        && currentWindowTime.toMilliseconds() < 
windowTime.orElse(Long.MAX_VALUE)) {
-                    // the window time is the global minimum of all window 
times of each state
-                    windowTime = 
Optional.of(currentWindowTime.toMilliseconds());
-                }
+                // the window time is the global minimum of all window times 
of each state
+                currentPattern
+                        .getWindowSize()
+                        .map(Duration::toMillis)
+                        .filter(
+                                windowSizeInMillis ->
+                                        windowSizeInMillis < 
windowTime.orElse(Long.MAX_VALUE))
+                        .ifPresent(
+                                windowSizeInMillis -> windowTime = 
Optional.of(windowSizeInMillis));
             }
             return lastSink;
         }
@@ -422,13 +424,20 @@ public class NFACompiler {
             State<T> state = createState(currentPattern.getName(), stateType);
             if (isTake) {
                 Times times = currentPattern.getTimes();
-                Time windowTime = 
currentPattern.getWindowTime(WithinType.PREVIOUS_AND_CURRENT);
-                if (times == null && windowTime != null) {
-                    windowTimes.put(state.getName(), 
windowTime.toMilliseconds());
-                } else if (times != null
-                        && times.getWindowTime() != null
-                        && state.getName().contains(STATE_NAME_DELIM)) {
-                    windowTimes.put(state.getName(), 
times.getWindowTime().toMilliseconds());
+                Optional<Duration> windowSize =
+                        
currentPattern.getWindowSize(WithinType.PREVIOUS_AND_CURRENT);
+                if (times == null) {
+                    windowSize
+                            .map(Duration::toMillis)
+                            .ifPresent(
+                                    windowSizeInMillis ->
+                                            windowTimes.put(state.getName(), 
windowSizeInMillis));
+                } else if (state.getName().contains(STATE_NAME_DELIM)) {
+                    times.getWindowSize()
+                            .map(Duration::toMillis)
+                            .ifPresent(
+                                    windowSizeInMillis ->
+                                            windowTimes.put(state.getName(), 
windowSizeInMillis));
                 }
             }
             return state;
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index e5a9a78976c..0b41224fdd2 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -34,8 +34,10 @@ import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
 
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * Base class for a pattern definition.
@@ -64,7 +66,7 @@ public class Pattern<T, F extends T> {
     private IterativeCondition<F> condition;
 
     /** Window length in which the pattern match has to occur. */
-    private final Map<WithinType, Time> windowTimes = new HashMap<>();
+    private final Map<WithinType, Duration> windowTimes = new HashMap<>();
 
     /**
      * A quantifier for the pattern. By default set to {@link 
Quantifier#one(ConsumingStrategy)}.
@@ -102,12 +104,26 @@ public class Pattern<T, F extends T> {
         return name;
     }
 
+    /** @deprecated Use {@link #getWindowSize()} */
+    @Deprecated
+    @Nullable
     public Time getWindowTime() {
-        return windowTimes.get(WithinType.FIRST_AND_LAST);
+        return getWindowSize().map(Time::of).orElse(null);
     }
 
+    public Optional<Duration> getWindowSize() {
+        return getWindowSize(WithinType.FIRST_AND_LAST);
+    }
+
+    /** @deprecated Use {@link #getWindowSize(WithinType)}. */
+    @Deprecated
+    @Nullable
     public Time getWindowTime(WithinType withinType) {
-        return windowTimes.get(withinType);
+        return getWindowSize(withinType).map(Time::of).orElse(null);
+    }
+
+    public Optional<Duration> getWindowSize(WithinType withinType) {
+        return Optional.ofNullable(windowTimes.get(withinType));
     }
 
     public Quantifier getQuantifier() {
@@ -243,6 +259,20 @@ public class Pattern<T, F extends T> {
         return this;
     }
 
+    /**
+     * Defines the maximum time interval in which a matching pattern has to be 
completed in order to
+     * be considered valid. This interval corresponds to the maximum time gap 
between first and the
+     * last event.
+     *
+     * @param windowTime Time of the matching window
+     * @return The same pattern operator with the new window length
+     * @deprecated Use {@link #within(Duration)}.
+     */
+    @Deprecated
+    public Pattern<T, F> within(@Nullable Time windowTime) {
+        return within(Time.toDuration(windowTime));
+    }
+
     /**
      * Defines the maximum time interval in which a matching pattern has to be 
completed in order to
      * be considered valid. This interval corresponds to the maximum time gap 
between first and the
@@ -251,7 +281,7 @@ public class Pattern<T, F extends T> {
      * @param windowTime Time of the matching window
      * @return The same pattern operator with the new window length
      */
-    public Pattern<T, F> within(Time windowTime) {
+    public Pattern<T, F> within(@Nullable Duration windowTime) {
         return within(windowTime, WithinType.FIRST_AND_LAST);
     }
 
@@ -262,8 +292,22 @@ public class Pattern<T, F extends T> {
      * @param withinType Type of the within interval between events
      * @param windowTime Time of the matching window
      * @return The same pattern operator with the new window length
+     * @deprecated Use {@link #within(Duration, WithinType)}.
      */
-    public Pattern<T, F> within(Time windowTime, WithinType withinType) {
+    @Deprecated
+    public Pattern<T, F> within(@Nullable Time windowTime, WithinType 
withinType) {
+        return within(Time.toDuration(windowTime), withinType);
+    }
+
+    /**
+     * Defines the maximum time interval in which a matching pattern has to be 
completed in order to
+     * be considered valid. This interval corresponds to the maximum time gap 
between events.
+     *
+     * @param withinType Type of the within interval between events
+     * @param windowTime Time of the matching window
+     * @return The same pattern operator with the new window length
+     */
+    public Pattern<T, F> within(@Nullable Duration windowTime, WithinType 
withinType) {
         if (windowTime != null) {
             windowTimes.put(withinType, windowTime);
         }
@@ -369,7 +413,7 @@ public class Pattern<T, F extends T> {
      * @throws MalformedPatternException if the quantifier is not applicable 
to this pattern.
      */
     public Pattern<T, F> oneOrMore() {
-        return oneOrMore(null);
+        return oneOrMore((Duration) null);
     }
 
     /**
@@ -385,8 +429,28 @@ public class Pattern<T, F extends T> {
      * @return The same pattern with a {@link 
Quantifier#looping(ConsumingStrategy)} quantifier
      *     applied.
      * @throws MalformedPatternException if the quantifier is not applicable 
to this pattern.
+     * @deprecated Use {@link #oneOrMore(Duration)}
      */
+    @Deprecated
     public Pattern<T, F> oneOrMore(@Nullable Time windowTime) {
+        return oneOrMore(Time.toDuration(windowTime));
+    }
+
+    /**
+     * Specifies that this pattern can occur {@code one or more} times and 
time interval corresponds
+     * to the maximum time gap between previous and current event for each 
times. This means at
+     * least one and at most infinite number of events can be matched to this 
pattern.
+     *
+     * <p>If this quantifier is enabled for a pattern {@code 
A.oneOrMore().followedBy(B)} and a
+     * sequence of events {@code A1 A2 B} appears, this will generate 
patterns: {@code A1 B} and
+     * {@code A1 A2 B}. See also {@link #allowCombinations()}.
+     *
+     * @param windowTime time of the matching window between times
+     * @return The same pattern with a {@link 
Quantifier#looping(ConsumingStrategy)} quantifier
+     *     applied.
+     * @throws MalformedPatternException if the quantifier is not applicable 
to this pattern.
+     */
+    public Pattern<T, F> oneOrMore(@Nullable Duration windowTime) {
         checkIfNoNotPattern();
         checkIfQuantifierApplied();
         this.quantifier = 
Quantifier.looping(quantifier.getConsumingStrategy());
@@ -416,7 +480,7 @@ public class Pattern<T, F extends T> {
      * @throws MalformedPatternException if the quantifier is not applicable 
to this pattern.
      */
     public Pattern<T, F> times(int times) {
-        return times(times, null);
+        return times(times, (Duration) null);
     }
 
     /**
@@ -427,8 +491,23 @@ public class Pattern<T, F extends T> {
      * @param windowTime time of the matching window between times
      * @return The same pattern with number of times applied
      * @throws MalformedPatternException if the quantifier is not applicable 
to this pattern.
+     * @deprecated Use {@link #times(int, Duration)}
      */
+    @Deprecated
     public Pattern<T, F> times(int times, @Nullable Time windowTime) {
+        return times(times, Time.toDuration(windowTime));
+    }
+
+    /**
+     * Specifies exact number of times that this pattern should be matched and 
time interval
+     * corresponds to the maximum time gap between previous and current event 
for each times.
+     *
+     * @param times number of times matching event must appear
+     * @param windowTime time of the matching window between times
+     * @return The same pattern with number of times applied
+     * @throws MalformedPatternException if the quantifier is not applicable 
to this pattern.
+     */
+    public Pattern<T, F> times(int times, @Nullable Duration windowTime) {
         checkIfNoNotPattern();
         checkIfQuantifierApplied();
         Preconditions.checkArgument(times > 0, "You should give a positive 
number greater than 0.");
@@ -446,7 +525,7 @@ public class Pattern<T, F extends T> {
      * @throws MalformedPatternException if the quantifier is not applicable 
to this pattern.
      */
     public Pattern<T, F> times(int from, int to) {
-        return times(from, to, null);
+        return times(from, to, (Duration) null);
     }
 
     /**
@@ -458,8 +537,24 @@ public class Pattern<T, F extends T> {
      * @param windowTime time of the matching window between times
      * @return The same pattern with the number of times range applied
      * @throws MalformedPatternException if the quantifier is not applicable 
to this pattern.
+     * @deprecated Use {@link #times(int, int, Duration)}
      */
+    @Deprecated
     public Pattern<T, F> times(int from, int to, @Nullable Time windowTime) {
+        return times(from, to, Time.toDuration(windowTime));
+    }
+
+    /**
+     * Specifies that the pattern can occur between from and to times with 
time interval corresponds
+     * to the maximum time gap between previous and current event for each 
times.
+     *
+     * @param from number of times matching event must appear at least
+     * @param to number of times matching event must appear at most
+     * @param windowTime time of the matching window between times
+     * @return The same pattern with the number of times range applied
+     * @throws MalformedPatternException if the quantifier is not applicable 
to this pattern.
+     */
+    public Pattern<T, F> times(int from, int to, @Nullable Duration 
windowTime) {
         checkIfNoNotPattern();
         checkIfQuantifierApplied();
         this.quantifier = Quantifier.times(quantifier.getConsumingStrategy());
@@ -480,7 +575,7 @@ public class Pattern<T, F extends T> {
      * @throws MalformedPatternException if the quantifier is not applicable 
to this pattern.
      */
     public Pattern<T, F> timesOrMore(int times) {
-        return timesOrMore(times, null);
+        return timesOrMore(times, (Duration) null);
     }
 
     /**
@@ -494,8 +589,26 @@ public class Pattern<T, F extends T> {
      * @return The same pattern with a {@link 
Quantifier#looping(ConsumingStrategy)} quantifier
      *     applied.
      * @throws MalformedPatternException if the quantifier is not applicable 
to this pattern.
+     * @deprecated Use {@link #timesOrMore(int, Duration)}
      */
+    @Deprecated
     public Pattern<T, F> timesOrMore(int times, @Nullable Time windowTime) {
+        return timesOrMore(times, Time.toDuration(windowTime));
+    }
+
+    /**
+     * Specifies that this pattern can occur the specified times at least with 
interval corresponds
+     * to the maximum time gap between previous and current event for each 
times. This means at
+     * least the specified times and at most infinite number of events can be 
matched to this
+     * pattern.
+     *
+     * @param times number of times at least matching event must appear
+     * @param windowTime time of the matching window between times
+     * @return The same pattern with a {@link 
Quantifier#looping(ConsumingStrategy)} quantifier
+     *     applied.
+     * @throws MalformedPatternException if the quantifier is not applicable 
to this pattern.
+     */
+    public Pattern<T, F> timesOrMore(int times, @Nullable Duration windowTime) 
{
         checkIfNoNotPattern();
         checkIfQuantifierApplied();
         this.quantifier = 
Quantifier.looping(quantifier.getConsumingStrategy());
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
index d929368936a..2e5efd48dd5 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
@@ -23,8 +23,10 @@ import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
 
+import java.time.Duration;
 import java.util.EnumSet;
 import java.util.Objects;
+import java.util.Optional;
 
 /**
  * A quantifier describing the Pattern. There are three main groups of {@link 
Quantifier}.
@@ -190,9 +192,9 @@ public class Quantifier {
     public static class Times {
         private final int from;
         private final int to;
-        private final @Nullable Time windowTime;
+        private final @Nullable Duration windowTime;
 
-        private Times(int from, int to, @Nullable Time windowTime) {
+        private Times(int from, int to, @Nullable Duration windowTime) {
             Preconditions.checkArgument(
                     from > 0, "The from should be a positive number greater 
than 0.");
             Preconditions.checkArgument(
@@ -211,15 +213,33 @@ public class Quantifier {
             return to;
         }
 
+        /** @deprecated Use {@link #getWindowSize()}. */
+        @Deprecated
         public Time getWindowTime() {
-            return windowTime;
+            return getWindowSize().map(Time::of).orElse(null);
         }
 
+        public Optional<Duration> getWindowSize() {
+            return Optional.ofNullable(windowTime);
+        }
+
+        /** @deprecated Use {@link #of(int, int, Duration)} */
+        @Deprecated
         public static Times of(int from, int to, @Nullable Time windowTime) {
+            return of(from, to, Time.toDuration(windowTime));
+        }
+
+        public static Times of(int from, int to, @Nullable Duration 
windowTime) {
             return new Times(from, to, windowTime);
         }
 
+        /** @deprecated Use {@link #of(int, Duration)} */
+        @Deprecated
         public static Times of(int times, @Nullable Time windowTime) {
+            return of(times, Time.toDuration(windowTime));
+        }
+
+        public static Times of(int times, @Nullable Duration windowTime) {
             return new Times(times, times, windowTime);
         }
 
@@ -237,8 +257,7 @@ public class Quantifier {
                     && ((windowTime == null && times.windowTime == null)
                             || (windowTime != null
                                     && times.windowTime != null
-                                    && windowTime.toMilliseconds()
-                                            == 
times.windowTime.toMilliseconds()));
+                                    && windowTime.toMillis() == 
times.windowTime.toMillis()));
         }
 
         @Override
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index a065f01655c..6a9786b0125 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -44,7 +44,6 @@ import org.apache.flink.mock.Whitebox;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -62,6 +61,7 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.mockito.Mockito;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -1290,7 +1290,7 @@ public class CEPOperatorTest extends TestLogger {
                             .where(SimpleCondition.of(value -> 
value.getName().equals("end")))
                             // add a window timeout to test whether timestamps 
of elements in the
                             // priority queue in CEP operator are correctly 
checkpointed/restored
-                            .within(Time.milliseconds(10L));
+                            .within(Duration.ofMillis(10L));
 
             return NFACompiler.compileFactory(pattern, 
handleTimeout).createNFA();
         }
@@ -1325,7 +1325,7 @@ public class CEPOperatorTest extends TestLogger {
                             .optional()
                             .followedBy("end")
                             .where(SimpleCondition.of(value -> 
value.getName().equals("a")))
-                            .within(Time.milliseconds(10L));
+                            .within(Duration.ofMillis(10L));
 
             return NFACompiler.compileFactory(pattern, 
handleTimeout).createNFA();
         }
@@ -1355,7 +1355,7 @@ public class CEPOperatorTest extends TestLogger {
                             .where(SimpleCondition.of(value -> 
value.getName().equals("a")))
                             .followedBy("end")
                             .where(SimpleCondition.of(value -> 
value.getName().equals("b")))
-                            .within(Time.milliseconds(10L));
+                            .within(Duration.ofMillis(10L));
 
             return NFACompiler.compileFactory(pattern, 
handleTimeout).createNFA();
         }
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java
index ab4e006baba..cce6541744b 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java
@@ -24,7 +24,6 @@ import 
org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.pattern.Pattern;
-import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
@@ -32,6 +31,7 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
@@ -418,7 +418,7 @@ public class CepProcessFunctionContextTest extends 
TestLogger {
         public NFA<Event> createNFA() {
 
             Pattern<Event, ?> pattern =
-                    
Pattern.<Event>begin("1").next("2").within(Time.milliseconds(10));
+                    
Pattern.<Event>begin("1").next("2").within(Duration.ofMillis(10));
 
             return NFACompiler.compileFactory(pattern, true).createNFA();
         }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 639d2af5f64..ce475847c0d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -62,6 +62,8 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.Preconditions;
 
+import java.time.Duration;
+
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -132,10 +134,25 @@ public class AllWindowedStream<T, W extends Window> {
      * is {@code 0L}.
      *
      * <p>Setting an allowed lateness is only valid for event-time windows.
+     *
+     * @deprecated Use {@link #allowedLateness(Duration)}, instead.
      */
+    @Deprecated
     @PublicEvolving
     public AllWindowedStream<T, W> allowedLateness(Time lateness) {
-        final long millis = lateness.toMilliseconds();
+        return allowedLateness(lateness.toDuration());
+    }
+
+    /**
+     * Sets the time by which elements are allowed to be late. Elements that 
arrive behind the
+     * watermark by more than the specified time will be dropped. By default, 
the allowed lateness
+     * is {@code 0L}.
+     *
+     * <p>Setting an allowed lateness is only valid for event-time windows.
+     */
+    @PublicEvolving
+    public AllWindowedStream<T, W> allowedLateness(Duration lateness) {
+        final long millis = lateness.toMillis();
         checkArgument(millis >= 0, "The allowed lateness cannot be negative.");
 
         this.allowedLateness = millis;
@@ -145,7 +162,7 @@ public class AllWindowedStream<T, W extends Window> {
     /**
      * Send late arriving data to the side output identified by the given 
{@link OutputTag}. Data is
      * considered late after the watermark has passed the end of the window 
plus the allowed
-     * lateness set using {@link #allowedLateness(Time)}.
+     * lateness set using {@link #allowedLateness(Duration)}.
      *
      * <p>You can get the stream of late data using {@link
      * SingleOutputStreamOperator#getSideOutput(OutputTag)} on the {@link
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
index ce328dc26d7..b9a253d4e25 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@@ -42,10 +42,14 @@ import 
org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
 
 import static java.util.Objects.requireNonNull;
 
@@ -68,7 +72,7 @@ import static java.util.Objects.requireNonNull;
  * DataStream<T> result = one.coGroup(two)
  *     .where(new MyFirstKeySelector())
  *     .equalTo(new MyFirstKeySelector())
- *     .window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
+ *     .window(TumblingEventTimeWindows.of(Duration.ofSeconds(5)))
  *     .apply(new MyCoGroupFunction());
  * }</pre>
  */
@@ -201,7 +205,7 @@ public class CoGroupedStreams<T1, T2> {
                         assigner,
                         null,
                         null,
-                        null);
+                        (Duration) null);
             }
         }
     }
@@ -233,10 +237,15 @@ public class CoGroupedStreams<T1, T2> {
 
         private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;
 
-        private final Time allowedLateness;
+        @Nullable private final Duration allowedLateness;
 
         private WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowedStream;
 
+        /**
+         * @deprecated Use {@link WithWindow#WithWindow(DataStream, 
DataStream, KeySelector,
+         *     KeySelector, TypeInformation, WindowAssigner, Trigger, Evictor, 
Duration)}
+         */
+        @Deprecated
         protected WithWindow(
                 DataStream<T1> input1,
                 DataStream<T2> input2,
@@ -246,7 +255,29 @@ public class CoGroupedStreams<T1, T2> {
                 WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
                 Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
                 Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
-                Time allowedLateness) {
+                @Nullable Time allowedLateness) {
+            this(
+                    input1,
+                    input2,
+                    keySelector1,
+                    keySelector2,
+                    keyType,
+                    windowAssigner,
+                    trigger,
+                    evictor,
+                    Time.toDuration(allowedLateness));
+        }
+
+        protected WithWindow(
+                DataStream<T1> input1,
+                DataStream<T2> input2,
+                KeySelector<T1, KEY> keySelector1,
+                KeySelector<T2, KEY> keySelector2,
+                TypeInformation<KEY> keyType,
+                WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
+                Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
+                Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
+                @Nullable Duration allowedLateness) {
             this.input1 = input1;
             this.input2 = input2;
 
@@ -303,9 +334,21 @@ public class CoGroupedStreams<T1, T2> {
          * Sets the time by which elements are allowed to be late.
          *
          * @see WindowedStream#allowedLateness(Time)
+         * @deprecated Use {@link #allowedLateness(Duration)}
          */
+        @Deprecated
         @PublicEvolving
-        public WithWindow<T1, T2, KEY, W> allowedLateness(Time newLateness) {
+        public WithWindow<T1, T2, KEY, W> allowedLateness(@Nullable Time 
newLateness) {
+            return allowedLateness(Time.toDuration(newLateness));
+        }
+
+        /**
+         * Sets the time by which elements are allowed to be late.
+         *
+         * @see WindowedStream#allowedLateness(Duration)
+         */
+        @PublicEvolving
+        public WithWindow<T1, T2, KEY, W> allowedLateness(@Nullable Duration 
newLateness) {
             return new WithWindow<>(
                     input1,
                     input2,
@@ -421,9 +464,17 @@ public class CoGroupedStreams<T1, T2> {
             return (SingleOutputStreamOperator<T>) apply(function, resultType);
         }
 
+        /** @deprecated Use {@link #getAllowedLatenessDuration()} */
+        @Deprecated
         @VisibleForTesting
+        @Nullable
         Time getAllowedLateness() {
-            return allowedLateness;
+            return getAllowedLatenessDuration().map(Time::of).orElse(null);
+        }
+
+        @VisibleForTesting
+        Optional<Duration> getAllowedLatenessDuration() {
+            return Optional.ofNullable(allowedLateness);
         }
 
         @VisibleForTesting
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
index 83c7ac82c87..9e64d93596a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
@@ -35,6 +35,11 @@ import 
org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Optional;
+
 import static java.util.Objects.requireNonNull;
 
 /**
@@ -56,7 +61,7 @@ import static java.util.Objects.requireNonNull;
  * DataStream<T> result = one.join(two)
  *     .where(new MyFirstKeySelector())
  *     .equalTo(new MyFirstKeySelector())
- *     .window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.SECONDS)))
+ *     .window(TumblingEventTimeWindows.of(Duration.ofSeconds(5)))
  *     .apply(new MyJoinFunction());
  * }</pre>
  */
@@ -187,7 +192,7 @@ public class JoinedStreams<T1, T2> {
                         assigner,
                         null,
                         null,
-                        null);
+                        (Duration) null);
             }
         }
     }
@@ -219,10 +224,38 @@ public class JoinedStreams<T1, T2> {
 
         private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;
 
-        private final Time allowedLateness;
+        @Nullable private final Duration allowedLateness;
 
         private CoGroupedStreams.WithWindow<T1, T2, KEY, W> 
coGroupedWindowedStream;
 
+        /**
+         * @deprecated Use {@link WithWindow#WithWindow(DataStream, 
DataStream, KeySelector,
+         *     KeySelector, TypeInformation, WindowAssigner, Trigger, Evictor, 
Duration)}.
+         */
+        @Deprecated
+        @PublicEvolving
+        protected WithWindow(
+                DataStream<T1> input1,
+                DataStream<T2> input2,
+                KeySelector<T1, KEY> keySelector1,
+                KeySelector<T2, KEY> keySelector2,
+                TypeInformation<KEY> keyType,
+                WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
+                Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
+                Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
+                @Nullable Time allowedLateness) {
+            this(
+                    input1,
+                    input2,
+                    keySelector1,
+                    keySelector2,
+                    keyType,
+                    windowAssigner,
+                    trigger,
+                    evictor,
+                    Time.toDuration(allowedLateness));
+        }
+
         @PublicEvolving
         protected WithWindow(
                 DataStream<T1> input1,
@@ -233,7 +266,7 @@ public class JoinedStreams<T1, T2> {
                 WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
                 Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
                 Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
-                Time allowedLateness) {
+                @Nullable Duration allowedLateness) {
 
             this.input1 = requireNonNull(input1);
             this.input2 = requireNonNull(input2);
@@ -291,10 +324,22 @@ public class JoinedStreams<T1, T2> {
         /**
          * Sets the time by which elements are allowed to be late.
          *
-         * @see WindowedStream#allowedLateness(Time)
+         * @see WindowedStream#allowedLateness(Duration)
+         * @deprecated Use {@link #allowedLateness(Duration)}.
+         */
+        @Deprecated
+        @PublicEvolving
+        public WithWindow<T1, T2, KEY, W> allowedLateness(@Nullable Time 
newLateness) {
+            return allowedLateness(Time.toDuration(newLateness));
+        }
+
+        /**
+         * Sets the time by which elements are allowed to be late.
+         *
+         * @see WindowedStream#allowedLateness(Duration)
          */
         @PublicEvolving
-        public WithWindow<T1, T2, KEY, W> allowedLateness(Time newLateness) {
+        public WithWindow<T1, T2, KEY, W> allowedLateness(@Nullable Duration 
newLateness) {
             return new WithWindow<>(
                     input1,
                     input2,
@@ -481,9 +526,16 @@ public class JoinedStreams<T1, T2> {
             return (SingleOutputStreamOperator<T>) apply(function, resultType);
         }
 
+        /** @deprecated Use {@link #getAllowedLatenessDuration()} */
         @VisibleForTesting
+        @Nullable
         Time getAllowedLateness() {
-            return allowedLateness;
+            return getAllowedLatenessDuration().map(Time::of).orElse(null);
+        }
+
+        @VisibleForTesting
+        Optional<Duration> getAllowedLatenessDuration() {
+            return Optional.ofNullable(allowedLateness);
         }
 
         @VisibleForTesting
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 6b3f2d1139c..c55c63a93a1 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -73,6 +73,7 @@ import org.apache.flink.util.Preconditions;
 
 import org.apache.commons.lang3.StringUtils;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Stack;
@@ -429,7 +430,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 
     /**
      * Join elements of this {@link KeyedStream} with elements of another 
{@link KeyedStream} over a
-     * time interval that can be specified with {@link 
IntervalJoin#between(Time, Time)}.
+     * time interval that can be specified with {@link 
IntervalJoin#between(Duration, Duration)}.
      *
      * @param otherStream The other keyed stream to join this keyed stream with
      * @param <T1> Type parameter of elements in the other stream
@@ -497,9 +498,30 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
          *
          * @param lowerBound The lower bound. Needs to be smaller than or 
equal to the upperBound
          * @param upperBound The upper bound. Needs to be bigger than or equal 
to the lowerBound
+         * @deprecated Use {@link #between(Duration, Duration)}
          */
+        @Deprecated
         @PublicEvolving
         public IntervalJoined<T1, T2, KEY> between(Time lowerBound, Time 
upperBound) {
+            return between(lowerBound.toDuration(), upperBound.toDuration());
+        }
+
+        /**
+         * Specifies the time boundaries over which the join operation works, 
so that
+         *
+         * <pre>
+         * leftElement.timestamp + lowerBound <= rightElement.timestamp <= 
leftElement.timestamp + upperBound
+         * </pre>
+         *
+         * <p>By default both the lower and the upper bound are inclusive. 
This can be configured
+         * with {@link IntervalJoined#lowerBoundExclusive()} and {@link
+         * IntervalJoined#upperBoundExclusive()}
+         *
+         * @param lowerBound The lower bound. Needs to be smaller than or 
equal to the upperBound
+         * @param upperBound The upper bound. Needs to be bigger than or equal 
to the lowerBound
+         */
+        @PublicEvolving
+        public IntervalJoined<T1, T2, KEY> between(Duration lowerBound, 
Duration upperBound) {
             if (timeBehaviour != TimeBehaviour.EventTime) {
                 throw new UnsupportedTimeCharacteristicException(
                         "Time-bounded stream joins are only supported in event 
time");
@@ -509,12 +531,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
             checkNotNull(upperBound, "An upper bound needs to be provided for 
a time-bounded join");
 
             return new IntervalJoined<>(
-                    streamOne,
-                    streamTwo,
-                    lowerBound.toMilliseconds(),
-                    upperBound.toMilliseconds(),
-                    true,
-                    true);
+                    streamOne, streamTwo, lowerBound.toMillis(), 
upperBound.toMillis(), true, true);
         }
     }
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 7e0b9aa48f1..47b22a6057a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -43,6 +43,8 @@ import 
org.apache.flink.streaming.api.windowing.windows.Window;
 import 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorBuilder;
 import org.apache.flink.util.OutputTag;
 
+import java.time.Duration;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -103,9 +105,24 @@ public class WindowedStream<T, K, W extends Window> {
      * is {@code 0L}.
      *
      * <p>Setting an allowed lateness is only valid for event-time windows.
+     *
+     * @deprecated Use {@link #allowedLateness(Duration)}
      */
+    @Deprecated
     @PublicEvolving
     public WindowedStream<T, K, W> allowedLateness(Time lateness) {
+        return allowedLateness(lateness.toDuration());
+    }
+
+    /**
+     * Sets the time by which elements are allowed to be late. Elements that 
arrive behind the
+     * watermark by more than the specified time will be dropped. By default, 
the allowed lateness
+     * is {@code 0L}.
+     *
+     * <p>Setting an allowed lateness is only valid for event-time windows.
+     */
+    @PublicEvolving
+    public WindowedStream<T, K, W> allowedLateness(Duration lateness) {
         builder.allowedLateness(lateness);
         return this;
     }
@@ -113,7 +130,7 @@ public class WindowedStream<T, K, W extends Window> {
     /**
      * Send late arriving data to the side output identified by the given 
{@link OutputTag}. Data is
      * considered late after the watermark has passed the end of the window 
plus the allowed
-     * lateness set using {@link #allowedLateness(Time)}.
+     * lateness set using {@link #allowedLateness(Duration)}.
      *
      * <p>You can get the stream of late data using {@link
      * SingleOutputStreamOperator#getSideOutput(OutputTag)} on the {@link
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index dff237cce20..711dd59d423 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -106,6 +106,7 @@ import 
org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
 import org.apache.flink.streaming.api.transformations.CacheTransformation;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.DynamicCodeLoadingException;
 import org.apache.flink.util.ExceptionUtils;
@@ -979,10 +980,11 @@ public class StreamExecutionEnvironment implements 
AutoCloseable {
      *     event-time mode. If you need to disable watermarks, please use 
{@link
      *     ExecutionConfig#setAutoWatermarkInterval(long)}. If you are using 
{@link
      *     TimeCharacteristic#IngestionTime}, please manually set an 
appropriate {@link
-     *     WatermarkStrategy}. If you are using generic "time window" 
operations (for example {@link
-     *     
org.apache.flink.streaming.api.datastream.KeyedStream#timeWindow(org.apache.flink.streaming.api.windowing.time.Time)}
-     *     that change behaviour based on the time characteristic, please use 
equivalent operations
-     *     that explicitly specify processing time or event time.
+     *     WatermarkStrategy}. If you are using generic "time window" 
operations (for example
+     *     through {@link
+     *     
org.apache.flink.streaming.api.datastream.KeyedStream#window(WindowAssigner)}) 
that change
+     *     behaviour based on the time characteristic, please use equivalent 
operations that
+     *     explicitly specify processing time or event time.
      */
     @PublicEvolving
     @Deprecated
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/BoundedOutOfOrdernessTimestampExtractor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/BoundedOutOfOrdernessTimestampExtractor.java
index bb78958761b..0ee8121f1c8 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/BoundedOutOfOrdernessTimestampExtractor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/BoundedOutOfOrdernessTimestampExtractor.java
@@ -22,6 +22,8 @@ import 
org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.time.Time;
 
+import java.time.Duration;
+
 /**
  * This is a {@link AssignerWithPeriodicWatermarks} used to emit Watermarks 
that lag behind the
  * element with the maximum timestamp (in event time) seen so far by a fixed 
amount of time, <code>
@@ -47,15 +49,24 @@ public abstract class 
BoundedOutOfOrdernessTimestampExtractor<T>
      */
     private final long maxOutOfOrderness;
 
+    /**
+     * @deprecated Use {@link
+     *     
BoundedOutOfOrdernessTimestampExtractor#BoundedOutOfOrdernessTimestampExtractor(Duration)}
+     */
+    @Deprecated
     public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
-        if (maxOutOfOrderness.toMilliseconds() < 0) {
-            throw new RuntimeException(
-                    "Tried to set the maximum allowed "
-                            + "lateness to "
-                            + maxOutOfOrderness
-                            + ". This parameter cannot be negative.");
+        this(maxOutOfOrderness.toDuration());
+    }
+
+    public BoundedOutOfOrdernessTimestampExtractor(Duration maxOutOfOrderness) 
{
+        if (maxOutOfOrderness.isNegative()) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Tried to set the maximum allowed lateness to %s. 
This parameter cannot be negative.",
+                            maxOutOfOrderness));
         }
-        this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
+
+        this.maxOutOfOrderness = maxOutOfOrderness.toMillis();
         this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
     }
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java
index ee3a0e0c263..7ee633b0dbb 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 
@@ -40,7 +41,7 @@ import java.util.Collections;
  * DataStream<Tuple2<String, Integer>> in = ...;
  * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
  * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
- *   keyed.window(EventTimeSessionWindows.withGap(Time.minutes(1)));
+ *   keyed.window(EventTimeSessionWindows.withGap(Duration.ofMinutes(1)));
  * }</pre>
  */
 public class EventTimeSessionWindows extends MergingWindowAssigner<Object, 
TimeWindow> {
@@ -85,9 +86,22 @@ public class EventTimeSessionWindows extends 
MergingWindowAssigner<Object, TimeW
      *
      * @param size The session timeout, i.e. the time gap between sessions
      * @return The policy.
+     * @deprecated Use {@link #withGap(Duration)}
      */
+    @Deprecated
     public static EventTimeSessionWindows withGap(Time size) {
-        return new EventTimeSessionWindows(size.toMilliseconds());
+        return withGap(size.toDuration());
+    }
+
+    /**
+     * Creates a new {@code SessionWindows} {@link WindowAssigner} that 
assigns elements to sessions
+     * based on the element timestamp.
+     *
+     * @param size The session timeout, i.e. the time gap between sessions
+     * @return The policy.
+     */
+    public static EventTimeSessionWindows withGap(Duration size) {
+        return new EventTimeSessionWindows(size.toMillis());
     }
 
     /**
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java
index c82309e7259..e77aaedd097 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 
@@ -40,7 +41,7 @@ import java.util.Collections;
  * DataStream<Tuple2<String, Integer>> in = ...;
  * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
  * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
- *   keyed.window(ProcessingTimeSessionWindows.withGap(Time.minutes(1)));
+ *   keyed.window(ProcessingTimeSessionWindows.withGap(Duration.ofMinutes(1)));
  * }</pre>
  */
 public class ProcessingTimeSessionWindows extends 
MergingWindowAssigner<Object, TimeWindow> {
@@ -87,9 +88,22 @@ public class ProcessingTimeSessionWindows extends 
MergingWindowAssigner<Object,
      *
      * @param size The session timeout, i.e. the time gap between sessions
      * @return The policy.
+     * @deprecated Use {@link #withGap(Duration)}
      */
+    @Deprecated
     public static ProcessingTimeSessionWindows withGap(Time size) {
-        return new ProcessingTimeSessionWindows(size.toMilliseconds());
+        return withGap(size.toDuration());
+    }
+
+    /**
+     * Creates a new {@code SessionWindows} {@link WindowAssigner} that 
assigns elements to sessions
+     * based on the element timestamp.
+     *
+     * @param size The session timeout, i.e. the time gap between sessions
+     * @return The policy.
+     */
+    public static ProcessingTimeSessionWindows withGap(Duration size) {
+        return new ProcessingTimeSessionWindows(size.toMillis());
     }
 
     /**
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
index 3711d67b0d2..4b9160aad2a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -41,7 +42,7 @@ import java.util.List;
  * DataStream<Tuple2<String, Integer>> in = ...;
  * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
  * WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
- *   keyed.window(SlidingEventTimeWindows.of(Time.minutes(1), 
Time.seconds(10)));
+ *   keyed.window(SlidingEventTimeWindows.of(Duration.ofMinutes(1), 
Duration.ofSeconds(10)));
  * }</pre>
  */
 @PublicEvolving
@@ -115,9 +116,23 @@ public class SlidingEventTimeWindows extends 
WindowAssigner<Object, TimeWindow>
      * @param size The size of the generated windows.
      * @param slide The slide interval of the generated windows.
      * @return The time policy.
+     * @deprecated Use {@link #of(Duration, Duration)}
      */
+    @Deprecated
     public static SlidingEventTimeWindows of(Time size, Time slide) {
-        return new SlidingEventTimeWindows(size.toMilliseconds(), 
slide.toMilliseconds(), 0);
+        return of(size.toDuration(), slide.toDuration());
+    }
+
+    /**
+     * Creates a new {@code SlidingEventTimeWindows} {@link WindowAssigner} 
that assigns elements to
+     * sliding time windows based on the element timestamp.
+     *
+     * @param size The size of the generated windows.
+     * @param slide The slide interval of the generated windows.
+     * @return The time policy.
+     */
+    public static SlidingEventTimeWindows of(Duration size, Duration slide) {
+        return new SlidingEventTimeWindows(size.toMillis(), slide.toMillis(), 
0);
     }
 
     /**
@@ -138,10 +153,34 @@ public class SlidingEventTimeWindows extends 
WindowAssigner<Object, TimeWindow>
      * @param slide The slide interval of the generated windows.
      * @param offset The offset which window start would be shifted by.
      * @return The time policy.
+     * @deprecated Use {@link #of(Duration, Duration, Duration)}
      */
+    @Deprecated
     public static SlidingEventTimeWindows of(Time size, Time slide, Time 
offset) {
-        return new SlidingEventTimeWindows(
-                size.toMilliseconds(), slide.toMilliseconds(), 
offset.toMilliseconds());
+        return of(size.toDuration(), slide.toDuration(), offset.toDuration());
+    }
+
+    /**
+     * Creates a new {@code SlidingEventTimeWindows} {@link WindowAssigner} 
that assigns elements to
+     * time windows based on the element timestamp and offset.
+     *
+     * <p>For example, to window a stream by hour with each window beginning at the 15th minute
+     * of the hour, use {@code of(Duration.ofHours(1), Duration.ofHours(1),
+     * Duration.ofMinutes(15))}; the windows then start at 0:15:00, 1:15:00, 2:15:00, etc.
+     *
+     * <p>Alternatively, if you live somewhere that does not use UTC±00:00, such as China, which
+     * uses UTC+08:00, and you want one-day windows that begin at 00:00:00 local time, you may
+     * use {@code of(Duration.ofDays(1), Duration.ofDays(1), Duration.ofHours(-8))}. The offset
+     * parameter is {@code Duration.ofHours(-8)} since UTC+08:00 is 8 hours earlier than UTC
+     * time.
+     *
+     * @param size The size of the generated windows.
+     * @param slide The slide interval of the generated windows.
+     * @param offset The offset which window start would be shifted by.
+     * @return The time policy.
+     */
+    public static SlidingEventTimeWindows of(Duration size, Duration slide, 
Duration offset) {
+        return new SlidingEventTimeWindows(size.toMillis(), slide.toMillis(), 
offset.toMillis());
     }
 
     @Override
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
index 265a09286bf..1ea6de4d53d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -107,9 +108,23 @@ public class SlidingProcessingTimeWindows extends 
WindowAssigner<Object, TimeWin
      * @param size The size of the generated windows.
      * @param slide The slide interval of the generated windows.
      * @return The time policy.
+     * @deprecated Use {@link #of(Duration, Duration)}
      */
+    @Deprecated
     public static SlidingProcessingTimeWindows of(Time size, Time slide) {
-        return new SlidingProcessingTimeWindows(size.toMilliseconds(), 
slide.toMilliseconds(), 0);
+        return of(size.toDuration(), slide.toDuration());
+    }
+
+    /**
+     * Creates a new {@code SlidingProcessingTimeWindows} {@link 
WindowAssigner} that assigns
+     * elements to sliding time windows based on the element timestamp.
+     *
+     * @param size The size of the generated windows.
+     * @param slide The slide interval of the generated windows.
+     * @return The time policy.
+     */
+    public static SlidingProcessingTimeWindows of(Duration size, Duration 
slide) {
+        return new SlidingProcessingTimeWindows(size.toMillis(), 
slide.toMillis(), 0);
     }
 
     /**
@@ -130,10 +145,35 @@ public class SlidingProcessingTimeWindows extends 
WindowAssigner<Object, TimeWin
      * @param slide The slide interval of the generated windows.
      * @param offset The offset which window start would be shifted by.
      * @return The time policy.
+     * @deprecated Use {@link #of(Duration, Duration, Duration)}
      */
+    @Deprecated
     public static SlidingProcessingTimeWindows of(Time size, Time slide, Time 
offset) {
+        return of(size.toDuration(), slide.toDuration(), offset.toDuration());
+    }
+
+    /**
+     * Creates a new {@code SlidingProcessingTimeWindows} {@link 
WindowAssigner} that assigns
+     * elements to time windows based on the element timestamp and offset.
+     *
+     * <p>For example, to window a stream by hour with each window beginning at the 15th minute
+     * of the hour, use {@code of(Duration.ofHours(1), Duration.ofHours(1),
+     * Duration.ofMinutes(15))}; the windows then start at 0:15:00, 1:15:00, 2:15:00, etc.
+     *
+     * <p>Alternatively, if you live somewhere that does not use UTC±00:00, such as China, which
+     * uses UTC+08:00, and you want one-day windows that begin at 00:00:00 local time, you may
+     * use {@code of(Duration.ofDays(1), Duration.ofDays(1), Duration.ofHours(-8))}. The offset
+     * parameter is {@code Duration.ofHours(-8)} since UTC+08:00 is 8 hours earlier than UTC
+     * time.
+     *
+     * @param size The size of the generated windows.
+     * @param slide The slide interval of the generated windows.
+     * @param offset The offset which window start would be shifted by.
+     * @return The time policy.
+     */
+    public static SlidingProcessingTimeWindows of(Duration size, Duration 
slide, Duration offset) {
         return new SlidingProcessingTimeWindows(
-                size.toMilliseconds(), slide.toMilliseconds(), 
offset.toMilliseconds());
+                size.toMillis(), slide.toMillis(), offset.toMillis());
     }
 
     @Override
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
index 787514ca523..4fc414c66d0 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 
@@ -40,7 +41,7 @@ import java.util.Collections;
  * DataStream<Tuple2<String, Integer>> in = ...;
  * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
  * WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
- *   keyed.window(TumblingEventTimeWindows.of(Time.minutes(1)));
+ *   keyed.window(TumblingEventTimeWindows.of(Duration.ofMinutes(1)));
  * }</pre>
  */
 @PublicEvolving
@@ -109,9 +110,22 @@ public class TumblingEventTimeWindows extends 
WindowAssigner<Object, TimeWindow>
      *
      * @param size The size of the generated windows.
      * @return The time policy.
+     * @deprecated Use {@link #of(Duration)}
      */
+    @Deprecated
     public static TumblingEventTimeWindows of(Time size) {
-        return new TumblingEventTimeWindows(size.toMilliseconds(), 0, 
WindowStagger.ALIGNED);
+        return of(size.toDuration());
+    }
+
+    /**
+     * Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} 
that assigns elements
+     * to time windows based on the element timestamp.
+     *
+     * @param size The size of the generated windows.
+     * @return The time policy.
+     */
+    public static TumblingEventTimeWindows of(Duration size) {
+        return new TumblingEventTimeWindows(size.toMillis(), 0, 
WindowStagger.ALIGNED);
     }
 
     /**
@@ -130,10 +144,33 @@ public class TumblingEventTimeWindows extends 
WindowAssigner<Object, TimeWindow>
      *
      * @param size The size of the generated windows.
      * @param offset The offset which window start would be shifted by.
+     * @deprecated Use {@link #of(Duration, Duration)}
      */
+    @Deprecated
     public static TumblingEventTimeWindows of(Time size, Time offset) {
+        return of(size.toDuration(), offset.toDuration());
+    }
+
+    /**
+     * Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} 
that assigns elements
+     * to time windows based on the element timestamp and offset.
+     *
+     * <p>For example, to window a stream by hour with each window beginning at the 15th minute
+     * of the hour, use {@code of(Duration.ofHours(1), Duration.ofMinutes(15))}; the windows then
+     * start at 0:15:00, 1:15:00, 2:15:00, etc.
+     *
+     * <p>Alternatively, if you live somewhere that does not use UTC±00:00, such as China, which
+     * uses UTC+08:00, and you want one-day windows that begin at 00:00:00 local time, you may
+     * use {@code of(Duration.ofDays(1), Duration.ofHours(-8))}. The offset parameter is
+     * {@code Duration.ofHours(-8)} since UTC+08:00 is 8 hours earlier than UTC
+     * time.
+     *
+     * @param size The size of the generated windows.
+     * @param offset The offset which window start would be shifted by.
+     */
+    public static TumblingEventTimeWindows of(Duration size, Duration offset) {
         return new TumblingEventTimeWindows(
-                size.toMilliseconds(), offset.toMilliseconds(), 
WindowStagger.ALIGNED);
+                size.toMillis(), offset.toMillis(), WindowStagger.ALIGNED);
     }
 
     /**
@@ -144,11 +181,27 @@ public class TumblingEventTimeWindows extends 
WindowAssigner<Object, TimeWindow>
      * @param size The size of the generated windows.
      * @param offset The globalOffset which window start would be shifted by.
      * @param windowStagger The utility that produces staggering offset in 
runtime.
+     * @deprecated Use {@link #of(Duration, Duration, WindowStagger)}
      */
+    @Deprecated
     @PublicEvolving
     public static TumblingEventTimeWindows of(Time size, Time offset, 
WindowStagger windowStagger) {
-        return new TumblingEventTimeWindows(
-                size.toMilliseconds(), offset.toMilliseconds(), windowStagger);
+        return of(size.toDuration(), offset.toDuration(), windowStagger);
+    }
+
+    /**
+     * Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} 
that assigns elements
+     * to time windows based on the element timestamp, offset and a staggering 
offset, depending on
+     * the staggering policy.
+     *
+     * @param size The size of the generated windows.
+     * @param offset The globalOffset which window start would be shifted by.
+     * @param windowStagger The utility that produces staggering offset in 
runtime.
+     */
+    @PublicEvolving
+    public static TumblingEventTimeWindows of(
+            Duration size, Duration offset, WindowStagger windowStagger) {
+        return new TumblingEventTimeWindows(size.toMillis(), 
offset.toMillis(), windowStagger);
     }
 
     @Override
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
index e22b63acf57..2d82a4b1e57 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 
@@ -40,7 +41,7 @@ import java.util.Collections;
  * DataStream<Tuple2<String, Integer>> in = ...;
  * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
  * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
- *   keyed.window(TumblingProcessingTimeWindows.of(Time.of(1, MINUTES), 
Time.of(10, SECONDS));
+ *   keyed.window(TumblingProcessingTimeWindows.of(Duration.ofMinutes(1), 
Duration.ofSeconds(10)));
  * }</pre>
  */
 public class TumblingProcessingTimeWindows extends WindowAssigner<Object, 
TimeWindow> {
@@ -105,9 +106,22 @@ public class TumblingProcessingTimeWindows extends 
WindowAssigner<Object, TimeWi
      *
      * @param size The size of the generated windows.
      * @return The time policy.
+     * @deprecated Use {@link #of(Duration)}
      */
+    @Deprecated
     public static TumblingProcessingTimeWindows of(Time size) {
-        return new TumblingProcessingTimeWindows(size.toMilliseconds(), 0, 
WindowStagger.ALIGNED);
+        return of(size.toDuration());
+    }
+
+    /**
+     * Creates a new {@code TumblingProcessingTimeWindows} {@link 
WindowAssigner} that assigns
+     * elements to time windows based on the element timestamp.
+     *
+     * @param size The size of the generated windows.
+     * @return The time policy.
+     */
+    public static TumblingProcessingTimeWindows of(Duration size) {
+        return new TumblingProcessingTimeWindows(size.toMillis(), 0, 
WindowStagger.ALIGNED);
     }
 
     /**
@@ -127,10 +141,34 @@ public class TumblingProcessingTimeWindows extends 
WindowAssigner<Object, TimeWi
      * @param size The size of the generated windows.
      * @param offset The offset which window start would be shifted by.
      * @return The time policy.
+     * @deprecated Use {@link #of(Duration, Duration)}
      */
+    @Deprecated
     public static TumblingProcessingTimeWindows of(Time size, Time offset) {
+        return of(size.toDuration(), offset.toDuration());
+    }
+
+    /**
+     * Creates a new {@code TumblingProcessingTimeWindows} {@link 
WindowAssigner} that assigns
+     * elements to time windows based on the element timestamp and offset.
+     *
+     * <p>For example, to window a stream by hour with each window beginning at the 15th minute
+     * of the hour, use {@code of(Duration.ofHours(1), Duration.ofMinutes(15))}; the windows then
+     * start at 0:15:00, 1:15:00, 2:15:00, etc.
+     *
+     * <p>Alternatively, if you live somewhere that does not use UTC±00:00, such as China, which
+     * uses UTC+08:00, and you want one-day windows that begin at 00:00:00 local time, you may
+     * use {@code of(Duration.ofDays(1), Duration.ofHours(-8))}. The offset parameter is
+     * {@code Duration.ofHours(-8)} since UTC+08:00 is 8 hours earlier than UTC
+     * time.
+     *
+     * @param size The size of the generated windows.
+     * @param offset The offset which window start would be shifted by.
+     * @return The time policy.
+     */
+    public static TumblingProcessingTimeWindows of(Duration size, Duration 
offset) {
         return new TumblingProcessingTimeWindows(
-                size.toMilliseconds(), offset.toMilliseconds(), 
WindowStagger.ALIGNED);
+                size.toMillis(), offset.toMillis(), WindowStagger.ALIGNED);
     }
 
     /**
@@ -142,12 +180,29 @@ public class TumblingProcessingTimeWindows extends 
WindowAssigner<Object, TimeWi
      * @param offset The offset which window start would be shifted by.
      * @param windowStagger The utility that produces staggering offset in 
runtime.
      * @return The time policy.
+     * @deprecated Use {@link #of(Duration, Duration, WindowStagger)}
      */
+    @Deprecated
     @PublicEvolving
     public static TumblingProcessingTimeWindows of(
             Time size, Time offset, WindowStagger windowStagger) {
-        return new TumblingProcessingTimeWindows(
-                size.toMilliseconds(), offset.toMilliseconds(), windowStagger);
+        return of(size.toDuration(), offset.toDuration(), windowStagger);
+    }
+
+    /**
+     * Creates a new {@code TumblingProcessingTimeWindows} {@link 
WindowAssigner} that assigns
+     * elements to time windows based on the element timestamp, offset and a 
staggering offset,
+     * depending on the staggering policy.
+     *
+     * @param size The size of the generated windows.
+     * @param offset The offset which window start would be shifted by.
+     * @param windowStagger The utility that produces staggering offset in 
runtime.
+     * @return The time policy.
+     */
+    @PublicEvolving
+    public static TumblingProcessingTimeWindows of(
+            Duration size, Duration offset, WindowStagger windowStagger) {
+        return new TumblingProcessingTimeWindows(size.toMillis(), 
offset.toMillis(), windowStagger);
     }
 
     @Override
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
index 449293ef9ca..5daa589ab01 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
@@ -24,6 +24,7 @@ import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
 
+import java.time.Duration;
 import java.util.Iterator;
 
 /**
@@ -124,9 +125,21 @@ public class TimeEvictor<W extends Window> implements 
Evictor<Object, W> {
      * before the window function.
      *
      * @param windowSize The amount of time for which to keep elements.
+     * @deprecated Use {@link #of(Duration)}
      */
+    @Deprecated
     public static <W extends Window> TimeEvictor<W> of(Time windowSize) {
-        return new TimeEvictor<>(windowSize.toMilliseconds());
+        return of(windowSize.toDuration());
+    }
+
+    /**
+     * Creates a {@code TimeEvictor} that keeps elements for the given amount of time. Eviction
+     * is done before the window function.
+     *
+     * @param windowSize The amount of time for which to keep elements.
+     */
+    public static <W extends Window> TimeEvictor<W> of(Duration windowSize) {
+        return new TimeEvictor<>(windowSize.toMillis());
     }
 
     /**
@@ -135,8 +148,21 @@ public class TimeEvictor<W extends Window> implements 
Evictor<Object, W> {
      *
      * @param windowSize The amount of time for which to keep elements.
      * @param doEvictAfter Whether eviction is done after window function.
+     * @deprecated Use {@link #of(Duration, boolean)}
      */
+    @Deprecated
     public static <W extends Window> TimeEvictor<W> of(Time windowSize, 
boolean doEvictAfter) {
-        return new TimeEvictor<>(windowSize.toMilliseconds(), doEvictAfter);
+        return of(windowSize.toDuration(), doEvictAfter);
+    }
+
+    /**
+     * Creates a {@code TimeEvictor} that keeps elements for the given amount of time. Eviction
+     * is done before/after the window function based on the value of doEvictAfter.
+     *
+     * @param windowSize The amount of time for which to keep elements.
+     * @param doEvictAfter Whether eviction is done after window function.
+     */
+    public static <W extends Window> TimeEvictor<W> of(Duration windowSize, 
boolean doEvictAfter) {
+        return new TimeEvictor<>(windowSize.toMillis(), doEvictAfter);
     }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java
index b842a263dc3..2e8fa587030 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/time/Time.java
@@ -20,6 +20,9 @@ package org.apache.flink.streaming.api.windowing.time;
 
 import org.apache.flink.annotation.Public;
 
+import javax.annotation.Nullable;
+
+import java.time.Duration;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -27,10 +30,23 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * The definition of a time interval for windowing. The time characteristic 
referred to is the
  * default time characteristic set on the execution environment.
+ *
+ * @deprecated Use {@link Duration}
  */
+@Deprecated
 @Public
 public final class Time {
 
+    @Nullable
+    public static Duration toDuration(@Nullable Time time) {
+        return time != null ? time.toDuration() : null;
+    }
+
+    @Nullable
+    public static Time of(@Nullable Duration duration) {
+        return duration != null ? Time.milliseconds(duration.toMillis()) : 
null;
+    }
+
     /** The time unit for this policy's time interval. */
     private final TimeUnit unit;
 
@@ -74,6 +90,10 @@ public final class Time {
         return unit.toMillis(size);
     }
 
+    public Duration toDuration() {
+        return Duration.ofMillis(this.toMilliseconds());
+    }
+
     // ------------------------------------------------------------------------
     //  Factory
     // ------------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
index 8d0dcb87c89..19222b43b48 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
@@ -27,6 +27,8 @@ import 
org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
+import java.time.Duration;
+
 /**
  * A {@link Trigger} that continuously fires based on a given time interval. 
This fires based on
  * {@link org.apache.flink.streaming.api.watermark.Watermark Watermarks}.
@@ -133,9 +135,21 @@ public class ContinuousEventTimeTrigger<W extends Window> 
extends Trigger<Object
      *
      * @param interval The time interval at which to fire.
      * @param <W> The type of {@link Window Windows} on which this trigger can 
operate.
+     * @deprecated Use {@link #of(Duration)}
      */
+    @Deprecated
     public static <W extends Window> ContinuousEventTimeTrigger<W> of(Time 
interval) {
-        return new ContinuousEventTimeTrigger<>(interval.toMilliseconds());
+        return of(interval.toDuration());
+    }
+
+    /**
+     * Creates a trigger that continuously fires based on the given interval.
+     *
+     * @param interval The time interval at which to fire.
+     * @param <W> The type of {@link Window Windows} on which this trigger can 
operate.
+     */
+    public static <W extends Window> ContinuousEventTimeTrigger<W> of(Duration 
interval) {
+        return new ContinuousEventTimeTrigger<>(interval.toMillis());
     }
 
     private static class Min implements ReduceFunction<Long> {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
index e3b1325f0f6..5dda7deca55 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
@@ -27,6 +27,8 @@ import 
org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
+import java.time.Duration;
+
 /**
  * A {@link Trigger} that continuously fires based on a given time interval as 
measured by the clock
  * of the machine on which the job is running.
@@ -122,9 +124,21 @@ public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<O
      *
      * @param interval The time interval at which to fire.
      * @param <W> The type of {@link Window Windows} on which this trigger can operate.
+     * @deprecated Use {@link #of(Duration)}
      */
+    @Deprecated
     public static <W extends Window> ContinuousProcessingTimeTrigger<W> of(Time interval) {
-        return new ContinuousProcessingTimeTrigger<>(interval.toMilliseconds());
+        return of(interval.toDuration());
+    }
+
+    /**
+     * Creates a trigger that continuously fires based on the given interval.
+     *
+     * @param interval The time interval at which to fire.
+     * @param <W> The type of {@link Window Windows} on which this trigger can operate.
+     */
+    public static <W extends Window> ContinuousProcessingTimeTrigger<W> of(Duration interval) {
+        return new ContinuousProcessingTimeTrigger<>(interval.toMillis());
     }
 
     private static class Min implements ReduceFunction<Long> {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder.java
index f9db7a38f38..8e9564618d0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorBuilder.java
@@ -57,6 +57,7 @@ import org.apache.flink.util.Preconditions;
 import javax.annotation.Nullable;
 
 import java.lang.reflect.Type;
+import java.time.Duration;
 
 /**
  * A builder for creating {@link WindowOperator WindowOperators}.
@@ -113,10 +114,16 @@ public class WindowOperatorBuilder<T, K, W extends Window> {
         this.trigger = trigger;
     }
 
+    /** @deprecated Use {@link #allowedLateness(Duration)}. */
+    @Deprecated
     public void allowedLateness(Time lateness) {
+        allowedLateness(lateness.toDuration());
+    }
+
+    public void allowedLateness(Duration lateness) {
         Preconditions.checkNotNull(lateness, "Allowed lateness cannot be null");
 
-        final long millis = lateness.toMilliseconds();
+        final long millis = lateness.toMillis();
         Preconditions.checkArgument(millis >= 0, "The allowed lateness cannot be negative.");
 
         this.allowedLateness = millis;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/CoGroupedStreamsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/CoGroupedStreamsTest.java
index d585c3f612e..5a8446d264a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/CoGroupedStreamsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/CoGroupedStreamsTest.java
@@ -22,13 +22,14 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
-import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.time.Duration;
+
 /** Unit test for {@link CoGroupedStreams}. */
 public class CoGroupedStreamsTest {
     private DataStream<String> dataStream1;
@@ -43,14 +44,14 @@ public class CoGroupedStreamsTest {
         dataStream1 = env.fromData("a1", "a2", "a3");
         dataStream2 = env.fromData("a1", "a2");
         keySelector = element -> element;
-        tsAssigner = TumblingEventTimeWindows.of(Time.milliseconds(1L));
+        tsAssigner = TumblingEventTimeWindows.of(Duration.ofMillis(1L));
         coGroupFunction =
                 (CoGroupFunction<String, String, String>) (first, second, out) -> out.collect("");
     }
 
     @Test
     public void testDelegateToCoGrouped() {
-        Time lateness = Time.milliseconds(42L);
+        Duration lateness = Duration.ofMillis(42L);
 
         CoGroupedStreams.WithWindow<String, String, String, TimeWindow> withLateness =
                 dataStream1
@@ -63,12 +64,12 @@ public class CoGroupedStreamsTest {
         withLateness.apply(coGroupFunction, BasicTypeInfo.STRING_TYPE_INFO);
 
         Assert.assertEquals(
-                lateness.toMilliseconds(), withLateness.getWindowedStream().getAllowedLateness());
+                lateness.toMillis(), withLateness.getWindowedStream().getAllowedLateness());
     }
 
     @Test
     public void testSetAllowedLateness() {
-        Time lateness = Time.milliseconds(42L);
+        Duration lateness = Duration.ofMillis(42L);
 
         CoGroupedStreams.WithWindow<String, String, String, TimeWindow> withLateness =
                 dataStream1
@@ -78,7 +79,6 @@ public class CoGroupedStreamsTest {
                         .window(tsAssigner)
                         .allowedLateness(lateness);
 
-        Assert.assertEquals(
-                lateness.toMilliseconds(), withLateness.getAllowedLateness().toMilliseconds());
+        Assert.assertEquals(lateness, withLateness.getAllowedLatenessDuration().orElse(null));
     }
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/JoinedStreamsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/JoinedStreamsTest.java
index 879569d6bc5..4a3898a265c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/JoinedStreamsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/JoinedStreamsTest.java
@@ -22,13 +22,14 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
-import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.time.Duration;
+
 /** Unit test for {@link JoinedStreams}. */
 public class JoinedStreamsTest {
     private DataStream<String> dataStream1;
@@ -43,13 +44,13 @@ public class JoinedStreamsTest {
         dataStream1 = env.fromData("a1", "a2", "a3");
         dataStream2 = env.fromData("a1", "a2");
         keySelector = element -> element;
-        tsAssigner = TumblingEventTimeWindows.of(Time.milliseconds(1));
+        tsAssigner = TumblingEventTimeWindows.of(Duration.ofMillis(1));
         joinFunction = (first, second) -> first + second;
     }
 
     @Test
     public void testDelegateToCoGrouped() {
-        Time lateness = Time.milliseconds(42L);
+        Duration lateness = Duration.ofMillis(42L);
 
         JoinedStreams.WithWindow<String, String, String, TimeWindow> withLateness =
                 dataStream1
@@ -62,13 +63,16 @@ public class JoinedStreamsTest {
         withLateness.apply(joinFunction, BasicTypeInfo.STRING_TYPE_INFO);
 
         Assert.assertEquals(
-                lateness.toMilliseconds(),
-                withLateness.getCoGroupedWindowedStream().getAllowedLateness().toMilliseconds());
+                lateness,
+                withLateness
+                        .getCoGroupedWindowedStream()
+                        .getAllowedLatenessDuration()
+                        .orElse(null));
     }
 
     @Test
     public void testSetAllowedLateness() {
-        Time lateness = Time.milliseconds(42L);
+        Duration lateness = Duration.ofMillis(42L);
 
         JoinedStreams.WithWindow<String, String, String, TimeWindow> withLateness =
                 dataStream1
@@ -78,7 +82,6 @@ public class JoinedStreamsTest {
                         .window(tsAssigner)
                         .allowedLateness(lateness);
 
-        Assert.assertEquals(
-                lateness.toMilliseconds(), withLateness.getAllowedLateness().toMilliseconds());
+        Assert.assertEquals(lateness, withLateness.getAllowedLatenessDuration().orElse(null));
     }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala
index c8c135c1abc..82344d73aac 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/match/PatternTranslatorTestBase.scala
@@ -121,10 +121,10 @@ abstract class PatternTranslatorTestBase {
       val sameSkipStrategy = currentLeft.getAfterMatchSkipStrategy ==
         currentRight.getAfterMatchSkipStrategy
 
-      val sameTimeWindow = if (currentLeft.getWindowTime != null && currentRight != null) {
-        currentLeft.getWindowTime.toMilliseconds == currentRight.getWindowTime.toMilliseconds
+      val sameTimeWindow = if (currentLeft.getWindowSize.isPresent && currentRight != null) {
+        currentLeft.getWindowSize.get.toMillis == currentRight.getWindowSize.get.toMillis
       } else {
-        currentLeft.getWindowTime == null && currentRight.getWindowTime == null
+        !currentLeft.getWindowSize.isPresent && !currentRight.getWindowSize.isPresent
       }
 
       currentLeft = currentLeft.getPrevious

Reply via email to