This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a75df8776feeda2c5496fda38845e1836e8c6a90
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Tue Sep 24 19:28:25 2024 +0800

    [FLINK-36355][runtime] Remove deprecated TimestampAssigner
---
 .../tests/DataStreamAllroundTestJobFactory.java    |  4 +-
 .../tests/DataStreamAllroundTestProgram.java       |  4 +-
 .../RocksDBStateMemoryControlTestProgram.java      |  4 +-
 .../tests/StatefulStreamJobUpgradeTestProgram.java |  6 +-
 .../test/java/org/apache/flink/cep/CEPITCase.java  | 17 +++--
 .../flink/streaming/api/datastream/DataStream.java | 50 -------------
 .../streaming/api/functions/TimestampAssigner.java | 50 -------------
 .../assigners/SlidingEventTimeWindows.java         |  3 +-
 .../assigners/TumblingEventTimeWindows.java        |  3 +-
 .../AssignerWithPeriodicWatermarksAdapter.java     | 84 ---------------------
 .../AssignerWithPunctuatedWatermarksAdapter.java   | 87 ----------------------
 .../WatermarkStrategyWithPeriodicWatermarks.java}  | 50 +++++++++++--
 ...WatermarkStrategyWithPunctuatedWatermarks.java} | 51 ++++++++++---
 .../timestamps/AscendingTimestampExtractor.java    |  5 +-
 .../BoundedOutOfOrdernessTimestampExtractor.java   |  9 ++-
 .../apache/flink/streaming/api/DataStreamTest.java |  4 +-
 .../stream/StreamExecLegacyTableSourceScan.java    |  6 +-
 .../runtime/stream/sql/IntervalJoinITCase.scala    |  6 +-
 .../table/planner/runtime/utils/TimeTestUtil.scala |  4 +-
 .../PeriodicWatermarkAssignerWrapper.java          |  5 +-
 .../ProcTimeMiniBatchAssignerOperator.java         |  5 +-
 ...ava => PunctuatedWatermarkStrategyWrapper.java} |  8 +-
 .../streaming/runtime/BroadcastStateITCase.java    |  4 +-
 .../test/streaming/runtime/CoGroupJoinITCase.java  |  6 +-
 .../test/streaming/runtime/SideOutputITCase.java   |  4 +-
 25 files changed, 141 insertions(+), 338 deletions(-)

diff --git 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
index 1720004be2e..85107ebb222 100644
--- 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
+++ 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.tests;
 
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -429,8 +430,7 @@ public class DataStreamAllroundTestJobFactory {
                         
SEQUENCE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS.defaultValue()));
     }
 
-    static BoundedOutOfOrdernessTimestampExtractor<Event> 
createTimestampExtractor(
-            ParameterTool pt) {
+    static WatermarkStrategy<Event> createWatermarkStrategy(ParameterTool pt) {
         return new BoundedOutOfOrdernessTimestampExtractor<Event>(
                 Duration.ofMillis(
                         pt.getLong(
diff --git 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
index bd3e991a430..47488974383 100644
--- 
a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
+++ 
b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
@@ -49,7 +49,7 @@ import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSemanticsCheckMapper;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSlidingWindow;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSlidingWindowCheckMapper;
-import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createTimestampExtractor;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createWatermarkStrategy;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.isSimulateFailures;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.setupEnvironment;
 import static org.apache.flink.streaming.tests.TestOperatorEnum.EVENT_SOURCE;
@@ -94,7 +94,7 @@ public class DataStreamAllroundTestProgram {
                 env.addSource(createEventSource(pt))
                         .name(EVENT_SOURCE.getName())
                         .uid(EVENT_SOURCE.getUid())
-                        
.assignTimestampsAndWatermarks(createTimestampExtractor(pt))
+                        
.assignTimestampsAndWatermarks(createWatermarkStrategy(pt))
                         .keyBy(Event::getKey)
                         .map(
                                 createArtificialKeyedStateMapper(
diff --git 
a/flink-end-to-end-tests/flink-rocksdb-state-memory-control-test/src/main/java/org/apache/flink/streaming/tests/RocksDBStateMemoryControlTestProgram.java
 
b/flink-end-to-end-tests/flink-rocksdb-state-memory-control-test/src/main/java/org/apache/flink/streaming/tests/RocksDBStateMemoryControlTestProgram.java
index f984000d4f6..c4ae019900b 100644
--- 
a/flink-end-to-end-tests/flink-rocksdb-state-memory-control-test/src/main/java/org/apache/flink/streaming/tests/RocksDBStateMemoryControlTestProgram.java
+++ 
b/flink-end-to-end-tests/flink-rocksdb-state-memory-control-test/src/main/java/org/apache/flink/streaming/tests/RocksDBStateMemoryControlTestProgram.java
@@ -36,7 +36,7 @@ import org.apache.flink.util.Collector;
 import org.apache.flink.util.ParameterTool;
 
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.applyTumblingWindows;
-import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createTimestampExtractor;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createWatermarkStrategy;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.setupEnvironment;
 import static org.apache.flink.streaming.tests.TestOperatorEnum.EVENT_SOURCE;
 import static 
org.apache.flink.streaming.tests.TestOperatorEnum.TIME_WINDOW_OPER;
@@ -61,7 +61,7 @@ public class RocksDBStateMemoryControlTestProgram {
                 
env.addSource(DataStreamAllroundTestJobFactory.createEventSource(pt))
                         .name(EVENT_SOURCE.getName())
                         .uid(EVENT_SOURCE.getUid())
-                        
.assignTimestampsAndWatermarks(createTimestampExtractor(pt))
+                        
.assignTimestampsAndWatermarks(createWatermarkStrategy(pt))
                         .keyBy(Event::getKey);
 
         keyedStream
diff --git 
a/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
 
b/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
index 349ba6efdc4..a280c256b41 100644
--- 
a/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
+++ 
b/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java
@@ -36,7 +36,7 @@ import java.util.List;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialKeyedStateMapper;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createEventSource;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSemanticsCheckMapper;
-import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createTimestampExtractor;
+import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createWatermarkStrategy;
 import static 
org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.setupEnvironment;
 
 /**
@@ -94,7 +94,7 @@ public class StatefulStreamJobUpgradeTestProgram {
                 env.addSource(createEventSource(pt))
                         .name("EventSource")
                         .uid("EventSource")
-                        
.assignTimestampsAndWatermarks(createTimestampExtractor(pt))
+                        
.assignTimestampsAndWatermarks(createWatermarkStrategy(pt))
                         .keyBy(Event::getKey);
 
         List<TypeSerializer<ComplexPayload>> stateSer =
@@ -119,7 +119,7 @@ public class StatefulStreamJobUpgradeTestProgram {
                 env.addSource(createEventSource(pt))
                         .name("EventSource")
                         .uid("EventSource")
-                        
.assignTimestampsAndWatermarks(createTimestampExtractor(pt))
+                        
.assignTimestampsAndWatermarks(createWatermarkStrategy(pt))
                         .map(new UpgradeEvent())
                         .keyBy(UpgradedEvent::getKey);
 
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
index 86e2afa8a8d..6e4c51778e7 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
@@ -41,8 +41,8 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import 
org.apache.flink.streaming.runtime.operators.util.WatermarkStrategyWithPunctuatedWatermarks;
 import org.apache.flink.test.util.AbstractTestBaseJUnit4;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.Collector;
@@ -219,7 +219,8 @@ public class CEPITCase extends AbstractTestBaseJUnit4 {
                                 // last element for high final watermark
                                 Tuple2.of(new Event(5, "middle", 5.0), 100L))
                         .assignTimestampsAndWatermarks(
-                                new 
AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>() {
+                                new WatermarkStrategyWithPunctuatedWatermarks<
+                                        Tuple2<Event, Long>>() {
 
                                     @Override
                                     public long extractTimestamp(
@@ -300,7 +301,8 @@ public class CEPITCase extends AbstractTestBaseJUnit4 {
                                 Tuple2.of(new Event(3, "middle", 6.0), 9L),
                                 Tuple2.of(new Event(3, "end", 7.0), 7L))
                         .assignTimestampsAndWatermarks(
-                                new 
AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>() {
+                                new WatermarkStrategyWithPunctuatedWatermarks<
+                                        Tuple2<Event, Long>>() {
 
                                     @Override
                                     public long extractTimestamp(
@@ -458,7 +460,8 @@ public class CEPITCase extends AbstractTestBaseJUnit4 {
                                 Tuple2.of(new Event(1, "start", 2.0), 4L),
                                 Tuple2.of(new Event(1, "end", 2.0), 6L))
                         .assignTimestampsAndWatermarks(
-                                new 
AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>() {
+                                new WatermarkStrategyWithPunctuatedWatermarks<
+                                        Tuple2<Event, Long>>() {
 
                                     @Override
                                     public long extractTimestamp(
@@ -551,7 +554,8 @@ public class CEPITCase extends AbstractTestBaseJUnit4 {
                                 Tuple2.of(new Event(1, "start", 2.0), 4L),
                                 Tuple2.of(new Event(1, "end", 2.0), 6L))
                         .assignTimestampsAndWatermarks(
-                                new 
AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>() {
+                                new WatermarkStrategyWithPunctuatedWatermarks<
+                                        Tuple2<Event, Long>>() {
 
                                     @Override
                                     public long extractTimestamp(
@@ -714,7 +718,8 @@ public class CEPITCase extends AbstractTestBaseJUnit4 {
                                 // last element for high final watermark
                                 Tuple2.of(new Event(7, "middle", 5.0), 100L))
                         .assignTimestampsAndWatermarks(
-                                new 
AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>() {
+                                new WatermarkStrategyWithPunctuatedWatermarks<
+                                        Tuple2<Event, Long>>() {
 
                                     @Override
                                     public long extractTimestamp(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 489c9c6ef91..175931c3dee 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -49,8 +49,6 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.RpcOptions;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import 
org.apache.flink.streaming.api.functions.sink.legacy.OutputFormatSinkFunction;
 import org.apache.flink.streaming.api.functions.sink.legacy.PrintSinkFunction;
@@ -80,8 +78,6 @@ import 
org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 import org.apache.flink.streaming.api.windowing.windows.Window;
-import 
org.apache.flink.streaming.runtime.operators.util.AssignerWithPeriodicWatermarksAdapter;
-import 
org.apache.flink.streaming.runtime.operators.util.AssignerWithPunctuatedWatermarksAdapter;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
@@ -672,52 +668,6 @@ public class DataStream<T> {
         return new SingleOutputStreamOperator<>(getExecutionEnvironment(), 
transformation);
     }
 
-    /**
-     * Assigns timestamps to the elements in the data stream and periodically 
creates watermarks to
-     * signal event time progress.
-     *
-     * <p>This method uses the deprecated watermark generator interfaces. 
Please switch to {@link
-     * #assignTimestampsAndWatermarks(WatermarkStrategy)} to use the new 
interfaces instead. The new
-     * interfaces support watermark idleness and no longer need to 
differentiate between "periodic"
-     * and "punctuated" watermarks.
-     *
-     * @deprecated Please use {@link 
#assignTimestampsAndWatermarks(WatermarkStrategy)} instead.
-     */
-    @Deprecated
-    public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
-            AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {
-
-        final AssignerWithPeriodicWatermarks<T> cleanedAssigner =
-                clean(timestampAndWatermarkAssigner);
-        final WatermarkStrategy<T> wms =
-                new 
AssignerWithPeriodicWatermarksAdapter.Strategy<>(cleanedAssigner);
-
-        return assignTimestampsAndWatermarks(wms);
-    }
-
-    /**
-     * Assigns timestamps to the elements in the data stream and creates 
watermarks based on events,
-     * to signal event time progress.
-     *
-     * <p>This method uses the deprecated watermark generator interfaces. 
Please switch to {@link
-     * #assignTimestampsAndWatermarks(WatermarkStrategy)} to use the new 
interfaces instead. The new
-     * interfaces support watermark idleness and no longer need to 
differentiate between "periodic"
-     * and "punctuated" watermarks.
-     *
-     * @deprecated Please use {@link 
#assignTimestampsAndWatermarks(WatermarkStrategy)} instead.
-     */
-    @Deprecated
-    public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
-            AssignerWithPunctuatedWatermarks<T> timestampAndWatermarkAssigner) 
{
-
-        final AssignerWithPunctuatedWatermarks<T> cleanedAssigner =
-                clean(timestampAndWatermarkAssigner);
-        final WatermarkStrategy<T> wms =
-                new 
AssignerWithPunctuatedWatermarksAdapter.Strategy<>(cleanedAssigner);
-
-        return assignTimestampsAndWatermarks(wms);
-    }
-
     // ------------------------------------------------------------------------
     //  Data sinks
     // ------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/TimestampAssigner.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/TimestampAssigner.java
deleted file mode 100644
index 4d945742e79..00000000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/TimestampAssigner.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions;
-
-import org.apache.flink.api.common.functions.Function;
-
-/**
- * A {@code TimestampAssigner} assigns event time timestamps to elements. 
These timestamps are used
- * by all functions that operate on event time, for example event time windows.
- *
- * <p>Timestamps are represented in milliseconds since the Epoch (midnight, 
January 1, 1970 UTC).
- *
- * @param <T> The type of the elements to which this assigner assigns 
timestamps.
- * @deprecated use {@link 
org.apache.flink.api.common.eventtime.TimestampAssigner}
- */
-@Deprecated
-public interface TimestampAssigner<T>
-        extends org.apache.flink.api.common.eventtime.TimestampAssigner<T>, 
Function {
-
-    /**
-     * Assigns a timestamp to an element, in milliseconds since the Epoch.
-     *
-     * <p>The method is passed the previously assigned timestamp of the 
element. That previous
-     * timestamp may have been assigned from a previous assigner, by ingestion 
time. If the element
-     * did not carry a timestamp before, this value is {@code Long.MIN_VALUE}.
-     *
-     * @param element The element that the timestamp will be assigned to.
-     * @param recordTimestamp The previous internal timestamp of the element, 
or a negative value,
-     *     if no timestamp has been assigned yet.
-     * @return The new timestamp.
-     */
-    @Override
-    long extractTimestamp(T element, long recordTimestamp);
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
index be291e27036..f338738bbb6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java
@@ -79,8 +79,7 @@ public class SlidingEventTimeWindows extends 
WindowAssigner<Object, TimeWindow>
         } else {
             throw new RuntimeException(
                     "Record has Long.MIN_VALUE timestamp (= no timestamp 
marker). "
-                            + "Is the time characteristic set to 
'ProcessingTime', or did you forget to call "
-                            + 
"'DataStream.assignTimestampsAndWatermarks(...)'?");
+                            + "Did you forget to call 
'DataStream.assignTimestampsAndWatermarks(...)'?");
         }
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
index 95434b9bb3b..5f4eccae041 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java
@@ -82,8 +82,7 @@ public class TumblingEventTimeWindows extends 
WindowAssigner<Object, TimeWindow>
         } else {
             throw new RuntimeException(
                     "Record has Long.MIN_VALUE timestamp (= no timestamp 
marker). "
-                            + "Is the time characteristic set to 
'ProcessingTime', or did you forget to call "
-                            + 
"'DataStream.assignTimestampsAndWatermarks(...)'?");
+                            + "Did you forget to call 
'DataStream.assignTimestampsAndWatermarks(...)'?");
         }
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/util/AssignerWithPeriodicWatermarksAdapter.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/util/AssignerWithPeriodicWatermarksAdapter.java
deleted file mode 100644
index ab54a2ec499..00000000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/util/AssignerWithPeriodicWatermarksAdapter.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.util;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.eventtime.TimestampAssigner;
-import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
-import org.apache.flink.api.common.eventtime.Watermark;
-import org.apache.flink.api.common.eventtime.WatermarkGenerator;
-import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
-import org.apache.flink.api.common.eventtime.WatermarkOutput;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * An adapter that wraps a {@link AssignerWithPeriodicWatermarks} into a 
{@link WatermarkGenerator}.
- */
-@Internal
-@SuppressWarnings("deprecation")
-public final class AssignerWithPeriodicWatermarksAdapter<T> implements 
WatermarkGenerator<T> {
-
-    private final AssignerWithPeriodicWatermarks<T> wms;
-
-    public 
AssignerWithPeriodicWatermarksAdapter(AssignerWithPeriodicWatermarks<T> wms) {
-        this.wms = checkNotNull(wms);
-    }
-
-    @Override
-    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) 
{}
-
-    @Override
-    public void onPeriodicEmit(WatermarkOutput output) {
-        final org.apache.flink.streaming.api.watermark.Watermark next = 
wms.getCurrentWatermark();
-        if (next != null) {
-            output.emitWatermark(new Watermark(next.getTimestamp()));
-        }
-    }
-
-    // ------------------------------------------------------------------------
-
-    /**
-     * A WatermarkStrategy that returns an {@link 
AssignerWithPeriodicWatermarks} wrapped as a
-     * {@link WatermarkGenerator}.
-     */
-    public static final class Strategy<T> implements WatermarkStrategy<T> {
-        private static final long serialVersionUID = 1L;
-
-        private final AssignerWithPeriodicWatermarks<T> wms;
-
-        public Strategy(AssignerWithPeriodicWatermarks<T> wms) {
-            this.wms = checkNotNull(wms);
-        }
-
-        @Override
-        public TimestampAssigner<T> createTimestampAssigner(
-                TimestampAssignerSupplier.Context context) {
-            return wms;
-        }
-
-        @Override
-        public WatermarkGenerator<T> createWatermarkGenerator(
-                WatermarkGeneratorSupplier.Context context) {
-            return new AssignerWithPeriodicWatermarksAdapter<>(wms);
-        }
-    }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/util/AssignerWithPunctuatedWatermarksAdapter.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/util/AssignerWithPunctuatedWatermarksAdapter.java
deleted file mode 100644
index 8ca9a2ce38c..00000000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/util/AssignerWithPunctuatedWatermarksAdapter.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators.util;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.eventtime.TimestampAssigner;
-import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
-import org.apache.flink.api.common.eventtime.Watermark;
-import org.apache.flink.api.common.eventtime.WatermarkGenerator;
-import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
-import org.apache.flink.api.common.eventtime.WatermarkOutput;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * An adapter that wraps a {@link AssignerWithPunctuatedWatermarks} into a 
{@link
- * WatermarkGenerator}.
- */
-@Internal
-@SuppressWarnings("deprecation")
-public final class AssignerWithPunctuatedWatermarksAdapter<T> implements 
WatermarkGenerator<T> {
-
-    private final AssignerWithPunctuatedWatermarks<T> wms;
-
-    public 
AssignerWithPunctuatedWatermarksAdapter(AssignerWithPunctuatedWatermarks<T> 
wms) {
-        this.wms = checkNotNull(wms);
-    }
-
-    @Override
-    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
-        final org.apache.flink.streaming.api.watermark.Watermark next =
-                wms.checkAndGetNextWatermark(event, eventTimestamp);
-
-        if (next != null) {
-            output.emitWatermark(new Watermark(next.getTimestamp()));
-        }
-    }
-
-    @Override
-    public void onPeriodicEmit(WatermarkOutput output) {}
-
-    // ------------------------------------------------------------------------
-
-    /**
-     * A WatermarkStrategy that returns an {@link 
AssignerWithPunctuatedWatermarks} wrapped as a
-     * {@link WatermarkGenerator}.
-     */
-    public static final class Strategy<T> implements WatermarkStrategy<T> {
-        private static final long serialVersionUID = 1L;
-
-        private final AssignerWithPunctuatedWatermarks<T> wms;
-
-        public Strategy(AssignerWithPunctuatedWatermarks<T> wms) {
-            this.wms = checkNotNull(wms);
-        }
-
-        @Override
-        public TimestampAssigner<T> createTimestampAssigner(
-                TimestampAssignerSupplier.Context context) {
-            return wms;
-        }
-
-        @Override
-        public WatermarkGenerator<T> createWatermarkGenerator(
-                WatermarkGeneratorSupplier.Context context) {
-            return new AssignerWithPunctuatedWatermarksAdapter<>(wms);
-        }
-    }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/util/WatermarkStrategyWithPeriodicWatermarks.java
similarity index 62%
rename from 
flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java
rename to 
flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/util/WatermarkStrategyWithPeriodicWatermarks.java
index 93ac8f3425e..dcd0c37b501 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/util/WatermarkStrategyWithPeriodicWatermarks.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,18 +16,25 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.api.functions;
+package org.apache.flink.streaming.runtime.operators.util;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.streaming.api.watermark.Watermark;
 
 import javax.annotation.Nullable;
 
 /**
- * The {@code AssignerWithPeriodicWatermarks} assigns event time timestamps to 
elements, and
- * generates low watermarks that signal event time progress within the stream. 
These timestamps and
- * watermarks are used by functions and operators that operate on event time, 
for example event time
- * windows.
+ * The {@code WatermarkStrategyWithPeriodicWatermarks} assigns event time 
timestamps to elements,
+ * and generates low watermarks that signal event time progress within the 
stream. These timestamps
+ * and watermarks are used by functions and operators that operate on event 
time, for example event
+ * time windows.
  *
  * <p>Use this class to generate watermarks in a periodical interval. At most 
every {@code i}
  * milliseconds (configured via {@link 
ExecutionConfig#getAutoWatermarkInterval()}), the system will
@@ -46,8 +53,9 @@ import javax.annotation.Nullable;
  * @param <T> The type of the elements to which this assigner assigns 
timestamps.
  * @see org.apache.flink.streaming.api.watermark.Watermark
  */
-@Deprecated
-public interface AssignerWithPeriodicWatermarks<T> extends 
TimestampAssigner<T> {
+@Internal
+public interface WatermarkStrategyWithPeriodicWatermarks<T>
+        extends WatermarkStrategy<T>, TimestampAssigner<T> {
 
     /**
      * Returns the current watermark. This method is periodically called by 
the system to retrieve
@@ -70,4 +78,30 @@ public interface AssignerWithPeriodicWatermarks<T> extends 
TimestampAssigner<T>
      */
     @Nullable
     Watermark getCurrentWatermark();
+
+    @Override
+    default TimestampAssigner<T> createTimestampAssigner(
+            TimestampAssignerSupplier.Context context) {
+        return this;
+    }
+
+    @Override
+    default WatermarkGenerator<T> createWatermarkGenerator(
+            WatermarkGeneratorSupplier.Context context) {
+        return new WatermarkGenerator<T>() {
+            @Override
+            public void onEvent(T event, long eventTimestamp, WatermarkOutput 
output) {}
+
+            @Override
+            public void onPeriodicEmit(WatermarkOutput output) {
+                final org.apache.flink.streaming.api.watermark.Watermark next =
+                        getCurrentWatermark();
+                if (next != null) {
+                    output.emitWatermark(
+                            new 
org.apache.flink.api.common.eventtime.Watermark(
+                                    next.getTimestamp()));
+                }
+            }
+        };
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPunctuatedWatermarks.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/util/WatermarkStrategyWithPunctuatedWatermarks.java
similarity index 66%
rename from 
flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPunctuatedWatermarks.java
rename to 
flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/util/WatermarkStrategyWithPunctuatedWatermarks.java
index 310167c1ed9..1f5992542f8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPunctuatedWatermarks.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/util/WatermarkStrategyWithPunctuatedWatermarks.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,9 +16,16 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.api.functions;
+package org.apache.flink.streaming.runtime.operators.util;
 
-import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 
 import javax.annotation.Nullable;
 
@@ -34,7 +41,7 @@ import javax.annotation.Nullable;
  * previous watermark (to preserve the contract of ascending watermarks).
  *
  * <p>For use cases that should periodically emit watermarks based on element 
timestamps, use the
- * {@link AssignerWithPeriodicWatermarks} instead.
+ * {@link WatermarkStrategyWithPeriodicWatermarks} instead.
  *
  * <p>The following example illustrates how to use this timestamp extractor 
and watermark generator.
  * It assumes elements carry a timestamp that describes when they were 
created, and that some
@@ -62,9 +69,9 @@ import javax.annotation.Nullable;
  * @param <T> The type of the elements to which this assigner assigns 
timestamps.
  * @see org.apache.flink.streaming.api.watermark.Watermark
  */
-@Deprecated
-public interface AssignerWithPunctuatedWatermarks<T> extends 
TimestampAssigner<T> {
-
+@Internal
+public interface WatermarkStrategyWithPunctuatedWatermarks<T>
+        extends WatermarkStrategy<T>, TimestampAssigner<T> {
     /**
      * Asks this implementation if it wants to emit a watermark. This method 
is called right after
      * the {@link #extractTimestamp(Object, long)} method.
@@ -75,10 +82,36 @@ public interface AssignerWithPunctuatedWatermarks<T> 
extends TimestampAssigner<T
      * smaller than that of the last emitted one, then no new watermark will 
be generated.
      *
      * <p>For an example how to use this method, see the documentation of 
{@link
-     * AssignerWithPunctuatedWatermarks this class}.
+     * WatermarkStrategyWithPunctuatedWatermarks this class}.
      *
      * @return {@code Null}, if no watermark should be emitted, or the next 
watermark to emit.
      */
     @Nullable
-    Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
+    org.apache.flink.streaming.api.watermark.Watermark 
checkAndGetNextWatermark(
+            T lastElement, long extractedTimestamp);
+
+    @Override
+    default TimestampAssigner<T> createTimestampAssigner(
+            TimestampAssignerSupplier.Context context) {
+        return this;
+    }
+
+    @Override
+    default WatermarkGenerator<T> createWatermarkGenerator(
+            WatermarkGeneratorSupplier.Context context) {
+        return new WatermarkGenerator<T>() {
+            @Override
+            public void onEvent(T event, long eventTimestamp, WatermarkOutput 
output) {
+                final org.apache.flink.streaming.api.watermark.Watermark next =
+                        checkAndGetNextWatermark(event, eventTimestamp);
+
+                if (next != null) {
+                    output.emitWatermark(new Watermark(next.getTimestamp()));
+                }
+            }
+
+            @Override
+            public void onPeriodicEmit(WatermarkOutput output) {}
+        };
+    }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/AscendingTimestampExtractor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/AscendingTimestampExtractor.java
index c59087476f0..f047f5ec8ef 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/AscendingTimestampExtractor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/AscendingTimestampExtractor.java
@@ -19,8 +19,8 @@
 package org.apache.flink.streaming.api.functions.timestamps;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import 
org.apache.flink.streaming.runtime.operators.util.WatermarkStrategyWithPeriodicWatermarks;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,7 +36,8 @@ import static java.util.Objects.requireNonNull;
  */
 @Deprecated
 @PublicEvolving
-public abstract class AscendingTimestampExtractor<T> implements 
AssignerWithPeriodicWatermarks<T> {
+public abstract class AscendingTimestampExtractor<T>
+        implements WatermarkStrategyWithPeriodicWatermarks<T> {
 
     private static final long serialVersionUID = 1L;
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/BoundedOutOfOrdernessTimestampExtractor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/BoundedOutOfOrdernessTimestampExtractor.java
index c07f9ac34e6..f2fd0b1d8d4 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/BoundedOutOfOrdernessTimestampExtractor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/BoundedOutOfOrdernessTimestampExtractor.java
@@ -18,21 +18,22 @@
 
 package org.apache.flink.streaming.api.functions.timestamps;
 
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import 
org.apache.flink.streaming.runtime.operators.util.WatermarkStrategyWithPeriodicWatermarks;
 
 import java.time.Duration;
 
 /**
- * This is a {@link AssignerWithPeriodicWatermarks} used to emit Watermarks 
that lag behind the
- * element with the maximum timestamp (in event time) seen so far by a fixed 
amount of time, <code>
+ * This is a {@link WatermarkStrategyWithPeriodicWatermarks} used to emit 
Watermarks that lag behind
+ * the element with the maximum timestamp (in event time) seen so far by a 
fixed amount of time,
+ * <code>
  * t_late</code>. This can help reduce the number of elements that are ignored 
due to lateness when
  * computing the final result for a given window, in the case where we know 
that elements arrive no
  * later than <code>t_late</code> units of time after the watermark that 
signals that the system
  * event-time has advanced past their (event-time) timestamp.
  */
 public abstract class BoundedOutOfOrdernessTimestampExtractor<T>
-        implements AssignerWithPeriodicWatermarks<T> {
+        implements WatermarkStrategyWithPeriodicWatermarks<T> {
 
     private static final long serialVersionUID = 1L;
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 4c600d621e0..c02464b9557 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -49,7 +49,6 @@ import 
org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
@@ -71,6 +70,7 @@ import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindo
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+import 
org.apache.flink.streaming.runtime.operators.util.WatermarkStrategyWithPunctuatedWatermarks;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
@@ -1219,7 +1219,7 @@ class DataStreamTest {
     }
 
     private abstract static class CustomWmEmitter<T>
-            implements AssignerWithPunctuatedWatermarks<T> {
+            implements WatermarkStrategyWithPunctuatedWatermarks<T> {
 
         @Nullable
         @Override
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLegacyTableSourceScan.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLegacyTableSourceScan.java
index 289eaad07fb..1f45247840e 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLegacyTableSourceScan.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLegacyTableSourceScan.java
@@ -40,7 +40,7 @@ import org.apache.flink.table.planner.sources.TableSourceUtil;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.runtime.operators.TableStreamOperator;
 import 
org.apache.flink.table.runtime.operators.wmassigners.PeriodicWatermarkAssignerWrapper;
-import 
org.apache.flink.table.runtime.operators.wmassigners.PunctuatedWatermarkAssignerWrapper;
+import 
org.apache.flink.table.runtime.operators.wmassigners.PunctuatedWatermarkStrategyWrapper;
 import org.apache.flink.table.sources.wmstrategies.PeriodicWatermarkAssigner;
 import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner;
 import org.apache.flink.table.sources.wmstrategies.WatermarkStrategy;
@@ -151,8 +151,8 @@ public class StreamExecLegacyTableSourceScan extends 
CommonExecLegacyTableSource
                                         return 
ingestedTable.assignTimestampsAndWatermarks(
                                                 watermarkGenerator);
                                     } else if (strategy instanceof 
PunctuatedWatermarkAssigner) {
-                                        PunctuatedWatermarkAssignerWrapper 
watermarkGenerator =
-                                                new 
PunctuatedWatermarkAssignerWrapper(
+                                        PunctuatedWatermarkStrategyWrapper 
watermarkGenerator =
+                                                new 
PunctuatedWatermarkStrategyWrapper(
                                                         
(PunctuatedWatermarkAssigner) strategy,
                                                         rowtimeFieldIdx,
                                                         
tableSource.getProducedDataType());
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala
index 3b637c34ce4..253208e5062 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala
@@ -17,8 +17,8 @@
  */
 package org.apache.flink.table.planner.runtime.stream.sql
 
-import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.watermark.Watermark
+import 
org.apache.flink.streaming.runtime.operators.util.WatermarkStrategyWithPunctuatedWatermarks
 import org.apache.flink.table.api._
 import org.apache.flink.table.api.bridge.scala._
 import org.apache.flink.table.planner.runtime.utils._
@@ -1151,7 +1151,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends 
StreamingWithStateTestB
 }
 
 private class Row4WatermarkExtractor
-  extends AssignerWithPunctuatedWatermarks[(Int, Long, String, Long)] {
+  extends WatermarkStrategyWithPunctuatedWatermarks[(Int, Long, String, Long)] 
{
 
   override def checkAndGetNextWatermark(
       lastElement: (Int, Long, String, Long),
@@ -1167,7 +1167,7 @@ private class Row4WatermarkExtractor
 }
 
 private class Row3WatermarkExtractor2
-  extends AssignerWithPunctuatedWatermarks[(String, String, Long)] {
+  extends WatermarkStrategyWithPunctuatedWatermarks[(String, String, Long)] {
 
   override def checkAndGetNextWatermark(
       lastElement: (String, String, Long),
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TimeTestUtil.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TimeTestUtil.scala
index 2dd2d73e162..3e7fcce7971 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TimeTestUtil.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TimeTestUtil.scala
@@ -20,10 +20,10 @@ package org.apache.flink.table.planner.runtime.utils
 import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
 import org.apache.flink.api.common.typeinfo.Types
 import org.apache.flink.runtime.state.{StateInitializationContext, 
StateSnapshotContext}
-import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction
 import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, 
OneInputStreamOperator}
 import org.apache.flink.streaming.api.watermark.Watermark
+import 
org.apache.flink.streaming.runtime.operators.util.WatermarkStrategyWithPunctuatedWatermarks
 import org.apache.flink.streaming.runtime.streamrecord.{RecordAttributes, 
StreamRecord}
 import org.apache.flink.table.planner.JLong
 
@@ -45,7 +45,7 @@ object TimeTestUtil {
   }
 
   class TimestampAndWatermarkWithOffset[T <: Product](offset: Long)
-    extends AssignerWithPunctuatedWatermarks[T] {
+    extends WatermarkStrategyWithPunctuatedWatermarks[T] {
 
     override def checkAndGetNextWatermark(lastElement: T, extractedTimestamp: 
Long): Watermark = {
       new Watermark(extractedTimestamp - offset)
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/PeriodicWatermarkAssignerWrapper.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/PeriodicWatermarkAssignerWrapper.java
index 3ed95564949..42ed9012454 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/PeriodicWatermarkAssignerWrapper.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/PeriodicWatermarkAssignerWrapper.java
@@ -19,8 +19,8 @@
 package org.apache.flink.table.runtime.operators.wmassigners;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import 
org.apache.flink.streaming.runtime.operators.util.WatermarkStrategyWithPeriodicWatermarks;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.sources.wmstrategies.PeriodicWatermarkAssigner;
 
@@ -28,7 +28,8 @@ import javax.annotation.Nullable;
 
 /** Generates periodic watermarks based on a {@link 
PeriodicWatermarkAssigner}. */
 @Internal
-public class PeriodicWatermarkAssignerWrapper implements 
AssignerWithPeriodicWatermarks<RowData> {
+public class PeriodicWatermarkAssignerWrapper
+        implements WatermarkStrategyWithPeriodicWatermarks<RowData> {
     private static final long serialVersionUID = 1L;
     private final PeriodicWatermarkAssigner assigner;
     private final int timeFieldIdx;
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/ProcTimeMiniBatchAssignerOperator.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/ProcTimeMiniBatchAssignerOperator.java
index 1fcb490c526..ab0f0f34b3a 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/ProcTimeMiniBatchAssignerOperator.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/ProcTimeMiniBatchAssignerOperator.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.runtime.operators.wmassigners;
 
 import org.apache.flink.api.common.operators.ProcessingTimeService;
 import org.apache.flink.metrics.Gauge;
-import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -94,7 +93,9 @@ public class ProcTimeMiniBatchAssignerOperator extends 
AbstractStreamOperator<Ro
 
     /**
      * Override the base implementation to completely ignore watermarks 
propagated from upstream (we
-     * rely only on the {@link AssignerWithPeriodicWatermarks} to emit 
watermarks from here).
+     * rely only on the {@link
+     * 
org.apache.flink.streaming.runtime.operators.util.WatermarkStrategyWithPeriodicWatermarks}
 to
+     * emit watermarks from here).
      */
     @Override
     public void processWatermark(Watermark mark) throws Exception {
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/PunctuatedWatermarkAssignerWrapper.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/PunctuatedWatermarkStrategyWrapper.java
similarity index 91%
rename from 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/PunctuatedWatermarkAssignerWrapper.java
rename to 
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/PunctuatedWatermarkStrategyWrapper.java
index a3f99e2bf2e..46f44012b2d 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/PunctuatedWatermarkAssignerWrapper.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/PunctuatedWatermarkStrategyWrapper.java
@@ -19,8 +19,8 @@
 package org.apache.flink.table.runtime.operators.wmassigners;
 
 import org.apache.flink.annotation.Internal;
-import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import 
org.apache.flink.streaming.runtime.operators.util.WatermarkStrategyWithPunctuatedWatermarks;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.util.DataFormatConverters;
@@ -33,8 +33,8 @@ import javax.annotation.Nullable;
 
 /** Generates periodic watermarks based on a {@link 
PunctuatedWatermarkAssigner}. */
 @Internal
-public class PunctuatedWatermarkAssignerWrapper
-        implements AssignerWithPunctuatedWatermarks<RowData> {
+public class PunctuatedWatermarkStrategyWrapper
+        implements WatermarkStrategyWithPunctuatedWatermarks<RowData> {
     private static final long serialVersionUID = 1L;
     private final PunctuatedWatermarkAssigner assigner;
     private final int timeFieldIdx;
@@ -46,7 +46,7 @@ public class PunctuatedWatermarkAssignerWrapper
      * @param sourceType the type of source
      */
     @SuppressWarnings("unchecked")
-    public PunctuatedWatermarkAssignerWrapper(
+    public PunctuatedWatermarkStrategyWrapper(
             PunctuatedWatermarkAssigner assigner, int timeFieldIdx, DataType 
sourceType) {
         this.assigner = assigner;
         this.timeFieldIdx = timeFieldIdx;
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
index 895bedfed74..2b9167faa98 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
@@ -25,11 +25,11 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.datastream.BroadcastStream;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
 import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
 import org.apache.flink.streaming.api.functions.sink.legacy.RichSinkFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import 
org.apache.flink.streaming.runtime.operators.util.WatermarkStrategyWithPunctuatedWatermarks;
 import org.apache.flink.test.util.AbstractTestBaseJUnit4;
 import org.apache.flink.util.Collector;
 
@@ -197,7 +197,7 @@ public class BroadcastStateITCase extends 
AbstractTestBaseJUnit4 {
     }
 
     private abstract static class CustomWmEmitter<T>
-            implements AssignerWithPunctuatedWatermarks<T> {
+            implements WatermarkStrategyWithPunctuatedWatermarks<T> {
 
         private static final long serialVersionUID = -5187335197674841233L;
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoGroupJoinITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoGroupJoinITCase.java
index 0ab086850b8..31cd6301a06 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoGroupJoinITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoGroupJoinITCase.java
@@ -26,13 +26,13 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.datastream.CoGroupedStreams;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import 
org.apache.flink.streaming.runtime.operators.util.WatermarkStrategyWithPunctuatedWatermarks;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.test.util.AbstractTestBaseJUnit4;
@@ -425,7 +425,7 @@ public class CoGroupJoinITCase extends 
AbstractTestBaseJUnit4 {
     }
 
     private static class Tuple2TimestampExtractor
-            implements AssignerWithPunctuatedWatermarks<Tuple2<String, 
Integer>> {
+            implements 
WatermarkStrategyWithPunctuatedWatermarks<Tuple2<String, Integer>> {
 
         @Override
         public long extractTimestamp(Tuple2<String, Integer> element, long 
previousTimestamp) {
@@ -440,7 +440,7 @@ public class CoGroupJoinITCase extends 
AbstractTestBaseJUnit4 {
     }
 
     private static class Tuple3TimestampExtractor
-            implements AssignerWithPunctuatedWatermarks<Tuple3<String, String, 
Integer>> {
+            implements 
WatermarkStrategyWithPunctuatedWatermarks<Tuple3<String, String, Integer>> {
 
         @Override
         public long extractTimestamp(
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
index 4902f48f383..9f60d9358d2 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
@@ -39,6 +38,7 @@ import 
org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import 
org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import 
org.apache.flink.streaming.runtime.operators.util.WatermarkStrategyWithPunctuatedWatermarks;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
 import org.apache.flink.test.util.AbstractTestBaseJUnit4;
@@ -855,7 +855,7 @@ public class SideOutputITCase extends 
AbstractTestBaseJUnit4 implements Serializ
     }
 
     private static class TestWatermarkAssigner
-            implements AssignerWithPunctuatedWatermarks<Integer> {
+            implements WatermarkStrategyWithPunctuatedWatermarks<Integer> {
         private static final long serialVersionUID = 1L;
 
         @Nullable

Reply via email to