This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a75df8776feeda2c5496fda38845e1836e8c6a90 Author: yunfengzhou-hub <[email protected]> AuthorDate: Tue Sep 24 19:28:25 2024 +0800 [FLINK-36355][runtime] Remove deprecated TimestampAssigner --- .../tests/DataStreamAllroundTestJobFactory.java | 4 +- .../tests/DataStreamAllroundTestProgram.java | 4 +- .../RocksDBStateMemoryControlTestProgram.java | 4 +- .../tests/StatefulStreamJobUpgradeTestProgram.java | 6 +- .../test/java/org/apache/flink/cep/CEPITCase.java | 17 +++-- .../flink/streaming/api/datastream/DataStream.java | 50 ------------- .../streaming/api/functions/TimestampAssigner.java | 50 ------------- .../assigners/SlidingEventTimeWindows.java | 3 +- .../assigners/TumblingEventTimeWindows.java | 3 +- .../AssignerWithPeriodicWatermarksAdapter.java | 84 --------------------- .../AssignerWithPunctuatedWatermarksAdapter.java | 87 ---------------------- .../WatermarkStrategyWithPeriodicWatermarks.java} | 50 +++++++++++-- ...WatermarkStrategyWithPunctuatedWatermarks.java} | 51 ++++++++++--- .../timestamps/AscendingTimestampExtractor.java | 5 +- .../BoundedOutOfOrdernessTimestampExtractor.java | 9 ++- .../apache/flink/streaming/api/DataStreamTest.java | 4 +- .../stream/StreamExecLegacyTableSourceScan.java | 6 +- .../runtime/stream/sql/IntervalJoinITCase.scala | 6 +- .../table/planner/runtime/utils/TimeTestUtil.scala | 4 +- .../PeriodicWatermarkAssignerWrapper.java | 5 +- .../ProcTimeMiniBatchAssignerOperator.java | 5 +- ...ava => PunctuatedWatermarkStrategyWrapper.java} | 8 +- .../streaming/runtime/BroadcastStateITCase.java | 4 +- .../test/streaming/runtime/CoGroupJoinITCase.java | 6 +- .../test/streaming/runtime/SideOutputITCase.java | 4 +- 25 files changed, 141 insertions(+), 338 deletions(-) diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java index 1720004be2e..85107ebb222 100644 --- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java +++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.tests; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.functions.MapFunction; @@ -429,8 +430,7 @@ public class DataStreamAllroundTestJobFactory { SEQUENCE_GENERATOR_SRC_SLEEP_AFTER_ELEMENTS.defaultValue())); } - static BoundedOutOfOrdernessTimestampExtractor<Event> createTimestampExtractor( - ParameterTool pt) { + static WatermarkStrategy<Event> createWatermarkStrategy(ParameterTool pt) { return new BoundedOutOfOrdernessTimestampExtractor<Event>( Duration.ofMillis( pt.getLong( diff --git a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java index bd3e991a430..47488974383 100644 --- a/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java +++ b/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java @@ -49,7 +49,7 @@ import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory. import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSemanticsCheckMapper; import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSlidingWindow; import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSlidingWindowCheckMapper; -import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createTimestampExtractor; +import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createWatermarkStrategy; import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.isSimulateFailures; import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.setupEnvironment; import static org.apache.flink.streaming.tests.TestOperatorEnum.EVENT_SOURCE; @@ -94,7 +94,7 @@ public class DataStreamAllroundTestProgram { env.addSource(createEventSource(pt)) .name(EVENT_SOURCE.getName()) .uid(EVENT_SOURCE.getUid()) - .assignTimestampsAndWatermarks(createTimestampExtractor(pt)) + .assignTimestampsAndWatermarks(createWatermarkStrategy(pt)) .keyBy(Event::getKey) .map( createArtificialKeyedStateMapper( diff --git a/flink-end-to-end-tests/flink-rocksdb-state-memory-control-test/src/main/java/org/apache/flink/streaming/tests/RocksDBStateMemoryControlTestProgram.java b/flink-end-to-end-tests/flink-rocksdb-state-memory-control-test/src/main/java/org/apache/flink/streaming/tests/RocksDBStateMemoryControlTestProgram.java index f984000d4f6..c4ae019900b 100644 --- a/flink-end-to-end-tests/flink-rocksdb-state-memory-control-test/src/main/java/org/apache/flink/streaming/tests/RocksDBStateMemoryControlTestProgram.java +++ b/flink-end-to-end-tests/flink-rocksdb-state-memory-control-test/src/main/java/org/apache/flink/streaming/tests/RocksDBStateMemoryControlTestProgram.java @@ -36,7 +36,7 @@ import org.apache.flink.util.Collector; import org.apache.flink.util.ParameterTool; import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.applyTumblingWindows; -import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createTimestampExtractor; +import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createWatermarkStrategy; import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.setupEnvironment; import static org.apache.flink.streaming.tests.TestOperatorEnum.EVENT_SOURCE; import static org.apache.flink.streaming.tests.TestOperatorEnum.TIME_WINDOW_OPER; @@ -61,7 +61,7 @@ public class RocksDBStateMemoryControlTestProgram { env.addSource(DataStreamAllroundTestJobFactory.createEventSource(pt)) .name(EVENT_SOURCE.getName()) .uid(EVENT_SOURCE.getUid()) - .assignTimestampsAndWatermarks(createTimestampExtractor(pt)) + .assignTimestampsAndWatermarks(createWatermarkStrategy(pt)) .keyBy(Event::getKey); keyedStream diff --git a/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java b/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java index 349ba6efdc4..a280c256b41 100644 --- a/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java +++ b/flink-end-to-end-tests/flink-stream-stateful-job-upgrade-test/src/main/java/org/apache/flink/streaming/tests/StatefulStreamJobUpgradeTestProgram.java @@ -36,7 +36,7 @@ import java.util.List; import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createArtificialKeyedStateMapper; import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createEventSource; import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createSemanticsCheckMapper; -import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createTimestampExtractor; +import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.createWatermarkStrategy; import static org.apache.flink.streaming.tests.DataStreamAllroundTestJobFactory.setupEnvironment; /** @@ -94,7 +94,7 @@ public class StatefulStreamJobUpgradeTestProgram { env.addSource(createEventSource(pt)) .name("EventSource") .uid("EventSource") - .assignTimestampsAndWatermarks(createTimestampExtractor(pt)) + .assignTimestampsAndWatermarks(createWatermarkStrategy(pt)) .keyBy(Event::getKey); List<TypeSerializer<ComplexPayload>> stateSer = @@ -119,7 +119,7 @@ public class StatefulStreamJobUpgradeTestProgram { env.addSource(createEventSource(pt)) .name("EventSource") .uid("EventSource") - .assignTimestampsAndWatermarks(createTimestampExtractor(pt)) + .assignTimestampsAndWatermarks(createWatermarkStrategy(pt)) .map(new UpgradeEvent()) .keyBy(UpgradedEvent::getKey); diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java index 86e2afa8a8d..6e4c51778e7 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java @@ -41,8 +41,8 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.operators.util.WatermarkStrategyWithPunctuatedWatermarks; import org.apache.flink.test.util.AbstractTestBaseJUnit4; import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.Collector; @@ -219,7 +219,8 @@ public class CEPITCase extends AbstractTestBaseJUnit4 { // last element for high final watermark Tuple2.of(new Event(5, "middle", 5.0), 100L)) .assignTimestampsAndWatermarks( - new AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>() { + new WatermarkStrategyWithPunctuatedWatermarks< + Tuple2<Event, Long>>() { @Override public long extractTimestamp( @@ -300,7 +301,8 @@ public class CEPITCase extends AbstractTestBaseJUnit4 { Tuple2.of(new Event(3, "middle", 6.0), 9L), Tuple2.of(new Event(3, "end", 7.0), 7L)) .assignTimestampsAndWatermarks( - new AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>() { + new WatermarkStrategyWithPunctuatedWatermarks< + Tuple2<Event, Long>>() { @Override public long extractTimestamp( @@ -458,7 +460,8 @@ public class CEPITCase extends AbstractTestBaseJUnit4 { Tuple2.of(new Event(1, "start", 2.0), 4L), Tuple2.of(new Event(1, "end", 2.0), 6L)) .assignTimestampsAndWatermarks( - new AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>() { + new WatermarkStrategyWithPunctuatedWatermarks< + Tuple2<Event, Long>>() { @Override public long extractTimestamp( @@ -551,7 +554,8 @@ public class CEPITCase extends AbstractTestBaseJUnit4 { Tuple2.of(new Event(1, "start", 2.0), 4L), Tuple2.of(new Event(1, "end", 2.0), 6L)) .assignTimestampsAndWatermarks( - new AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>() { + new WatermarkStrategyWithPunctuatedWatermarks< + Tuple2<Event, Long>>() { @Override public long extractTimestamp( @@ -714,7 +718,8 @@ public class CEPITCase extends AbstractTestBaseJUnit4 { // last element for high final watermark Tuple2.of(new Event(7, "middle", 5.0), 100L)) .assignTimestampsAndWatermarks( - new AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>() { + new WatermarkStrategyWithPunctuatedWatermarks< + Tuple2<Event, Long>>() { @Override public long extractTimestamp( diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 489c9c6ef91..175931c3dee 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -49,8 +49,6 @@ import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.configuration.RpcOptions; import org.apache.flink.core.execution.JobClient; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; -import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.functions.sink.legacy.OutputFormatSinkFunction; import org.apache.flink.streaming.api.functions.sink.legacy.PrintSinkFunction; @@ -80,8 +78,6 @@ import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; import org.apache.flink.streaming.api.windowing.windows.Window; -import org.apache.flink.streaming.runtime.operators.util.AssignerWithPeriodicWatermarksAdapter; -import org.apache.flink.streaming.runtime.operators.util.AssignerWithPunctuatedWatermarksAdapter; import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper; import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; @@ -672,52 +668,6 @@ public class DataStream<T> { return new SingleOutputStreamOperator<>(getExecutionEnvironment(), transformation); } - /** - * Assigns timestamps to the elements in the data stream and periodically creates watermarks to - * signal event time progress. - * - * <p>This method uses the deprecated watermark generator interfaces. Please switch to {@link - * #assignTimestampsAndWatermarks(WatermarkStrategy)} to use the new interfaces instead. The new - * interfaces support watermark idleness and no longer need to differentiate between "periodic" - * and "punctuated" watermarks. - * - * @deprecated Please use {@link #assignTimestampsAndWatermarks(WatermarkStrategy)} instead. - */ - @Deprecated - public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks( - AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) { - - final AssignerWithPeriodicWatermarks<T> cleanedAssigner = - clean(timestampAndWatermarkAssigner); - final WatermarkStrategy<T> wms = - new AssignerWithPeriodicWatermarksAdapter.Strategy<>(cleanedAssigner); - - return assignTimestampsAndWatermarks(wms); - } - - /** - * Assigns timestamps to the elements in the data stream and creates watermarks based on events, - * to signal event time progress. - * - * <p>This method uses the deprecated watermark generator interfaces. Please switch to {@link - * #assignTimestampsAndWatermarks(WatermarkStrategy)} to use the new interfaces instead. The new - * interfaces support watermark idleness and no longer need to differentiate between "periodic" - * and "punctuated" watermarks. - * - * @deprecated Please use {@link #assignTimestampsAndWatermarks(WatermarkStrategy)} instead. - */ - @Deprecated - public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks( - AssignerWithPunctuatedWatermarks<T> timestampAndWatermarkAssigner) { - - final AssignerWithPunctuatedWatermarks<T> cleanedAssigner = - clean(timestampAndWatermarkAssigner); - final WatermarkStrategy<T> wms = - new AssignerWithPunctuatedWatermarksAdapter.Strategy<>(cleanedAssigner); - - return assignTimestampsAndWatermarks(wms); - } - // ------------------------------------------------------------------------ // Data sinks // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/TimestampAssigner.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/TimestampAssigner.java deleted file mode 100644 index 4d945742e79..00000000000 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/TimestampAssigner.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.functions; - -import org.apache.flink.api.common.functions.Function; - -/** - * A {@code TimestampAssigner} assigns event time timestamps to elements. These timestamps are used - * by all functions that operate on event time, for example event time windows. - * - * <p>Timestamps are represented in milliseconds since the Epoch (midnight, January 1, 1970 UTC). - * - * @param <T> The type of the elements to which this assigner assigns timestamps. - * @deprecated use {@link org.apache.flink.api.common.eventtime.TimestampAssigner} - */ -@Deprecated -public interface TimestampAssigner<T> - extends org.apache.flink.api.common.eventtime.TimestampAssigner<T>, Function { - - /** - * Assigns a timestamp to an element, in milliseconds since the Epoch. - * - * <p>The method is passed the previously assigned timestamp of the element. That previous - * timestamp may have been assigned from a previous assigner, by ingestion time. If the element - * did not carry a timestamp before, this value is {@code Long.MIN_VALUE}. - * - * @param element The element that the timestamp will be assigned to. - * @param recordTimestamp The previous internal timestamp of the element, or a negative value, - * if no timestamp has been assigned yet. - * @return The new timestamp. - */ - @Override - long extractTimestamp(T element, long recordTimestamp); -} diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java index be291e27036..f338738bbb6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java @@ -79,8 +79,7 @@ public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> } else { throw new RuntimeException( "Record has Long.MIN_VALUE timestamp (= no timestamp marker). " - + "Is the time characteristic set to 'ProcessingTime', or did you forget to call " - + "'DataStream.assignTimestampsAndWatermarks(...)'?"); + + "Did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?"); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java index 95434b9bb3b..5f4eccae041 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java @@ -82,8 +82,7 @@ public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> } else { throw new RuntimeException( "Record has Long.MIN_VALUE timestamp (= no timestamp marker). " - + "Is the time characteristic set to 'ProcessingTime', or did you forget to call " - + "'DataStream.assignTimestampsAndWatermarks(...)'?"); + + "Did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?"); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/util/AssignerWithPeriodicWatermarksAdapter.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/util/AssignerWithPeriodicWatermarksAdapter.java deleted file mode 100644 index ab54a2ec499..00000000000 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/util/AssignerWithPeriodicWatermarksAdapter.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.operators.util; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.eventtime.TimestampAssigner; -import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier; -import org.apache.flink.api.common.eventtime.Watermark; -import org.apache.flink.api.common.eventtime.WatermarkGenerator; -import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier; -import org.apache.flink.api.common.eventtime.WatermarkOutput; -import org.apache.flink.api.common.eventtime.WatermarkStrategy; -import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * An adapter that wraps a {@link AssignerWithPeriodicWatermarks} into a {@link WatermarkGenerator}. - */ -@Internal -@SuppressWarnings("deprecation") -public final class AssignerWithPeriodicWatermarksAdapter<T> implements WatermarkGenerator<T> { - - private final AssignerWithPeriodicWatermarks<T> wms; - - public AssignerWithPeriodicWatermarksAdapter(AssignerWithPeriodicWatermarks<T> wms) { - this.wms = checkNotNull(wms); - } - - @Override - public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {} - - @Override - public void onPeriodicEmit(WatermarkOutput output) { - final org.apache.flink.streaming.api.watermark.Watermark next = wms.getCurrentWatermark(); - if (next != null) { - output.emitWatermark(new Watermark(next.getTimestamp())); - } - } - - // ------------------------------------------------------------------------ - - /** - * A WatermarkStrategy that returns an {@link AssignerWithPeriodicWatermarks} wrapped as a - * {@link WatermarkGenerator}. - */ - public static final class Strategy<T> implements WatermarkStrategy<T> { - private static final long serialVersionUID = 1L; - - private final AssignerWithPeriodicWatermarks<T> wms; - - public Strategy(AssignerWithPeriodicWatermarks<T> wms) { - this.wms = checkNotNull(wms); - } - - @Override - public TimestampAssigner<T> createTimestampAssigner( - TimestampAssignerSupplier.Context context) { - return wms; - } - - @Override - public WatermarkGenerator<T> createWatermarkGenerator( - WatermarkGeneratorSupplier.Context context) { - return new AssignerWithPeriodicWatermarksAdapter<>(wms); - } - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/util/AssignerWithPunctuatedWatermarksAdapter.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/util/AssignerWithPunctuatedWatermarksAdapter.java deleted file mode 100644 index 8ca9a2ce38c..00000000000 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/util/AssignerWithPunctuatedWatermarksAdapter.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.operators.util; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.eventtime.TimestampAssigner; -import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier; -import org.apache.flink.api.common.eventtime.Watermark; -import org.apache.flink.api.common.eventtime.WatermarkGenerator; -import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier; -import org.apache.flink.api.common.eventtime.WatermarkOutput; -import org.apache.flink.api.common.eventtime.WatermarkStrategy; -import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * An adapter that wraps a {@link AssignerWithPunctuatedWatermarks} into a {@link - * WatermarkGenerator}. - */ -@Internal -@SuppressWarnings("deprecation") -public final class AssignerWithPunctuatedWatermarksAdapter<T> implements WatermarkGenerator<T> { - - private final AssignerWithPunctuatedWatermarks<T> wms; - - public AssignerWithPunctuatedWatermarksAdapter(AssignerWithPunctuatedWatermarks<T> wms) { - this.wms = checkNotNull(wms); - } - - @Override - public void onEvent(T event, long eventTimestamp, WatermarkOutput output) { - final org.apache.flink.streaming.api.watermark.Watermark next = - wms.checkAndGetNextWatermark(event, eventTimestamp); - - if (next != null) { - output.emitWatermark(new Watermark(next.getTimestamp())); - } - } - - @Override - public void onPeriodicEmit(WatermarkOutput output) {} - - // ------------------------------------------------------------------------ - - /** - * A WatermarkStrategy that returns an {@link AssignerWithPunctuatedWatermarks} wrapped as a - * {@link WatermarkGenerator}. - */ - public static final class Strategy<T> implements WatermarkStrategy<T> { - private static final long serialVersionUID = 1L; - - private final AssignerWithPunctuatedWatermarks<T> wms; - - public Strategy(AssignerWithPunctuatedWatermarks<T> wms) { - this.wms = checkNotNull(wms); - } - - @Override - public TimestampAssigner<T> createTimestampAssigner( - TimestampAssignerSupplier.Context context) { - return wms; - } - - @Override - public WatermarkGenerator<T> createWatermarkGenerator( - WatermarkGeneratorSupplier.Context context) { - return new AssignerWithPunctuatedWatermarksAdapter<>(wms); - } - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/util/WatermarkStrategyWithPeriodicWatermarks.java similarity index 62% rename from flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java rename to flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/util/WatermarkStrategyWithPeriodicWatermarks.java index 93ac8f3425e..dcd0c37b501 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPeriodicWatermarks.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/util/WatermarkStrategyWithPeriodicWatermarks.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,18 +16,25 @@ * limitations under the License. */ -package org.apache.flink.streaming.api.functions; +package org.apache.flink.streaming.runtime.operators.util; +import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.eventtime.TimestampAssigner; +import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier; +import org.apache.flink.api.common.eventtime.WatermarkGenerator; +import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier; +import org.apache.flink.api.common.eventtime.WatermarkOutput; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.watermark.Watermark; import javax.annotation.Nullable; /** - * The {@code AssignerWithPeriodicWatermarks} assigns event time timestamps to elements, and - * generates low watermarks that signal event time progress within the stream. These timestamps and - * watermarks are used by functions and operators that operate on event time, for example event time - * windows. + * The {@code WatermarkStrategyWithPeriodicWatermarks} assigns event time timestamps to elements, + * and generates low watermarks that signal event time progress within the stream. These timestamps + * and watermarks are used by functions and operators that operate on event time, for example event + * time windows. * * <p>Use this class to generate watermarks in a periodical interval. At most every {@code i} * milliseconds (configured via {@link ExecutionConfig#getAutoWatermarkInterval()}), the system will @@ -46,8 +53,9 @@ import javax.annotation.Nullable; * @param <T> The type of the elements to which this assigner assigns timestamps. * @see org.apache.flink.streaming.api.watermark.Watermark */ -@Deprecated -public interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T> { +@Internal +public interface WatermarkStrategyWithPeriodicWatermarks<T> + extends WatermarkStrategy<T>, TimestampAssigner<T> { /** * Returns the current watermark. This method is periodically called by the system to retrieve @@ -70,4 +78,30 @@ public interface AssignerWithPeriodicWatermarks<T> extends TimestampAssigner<T> */ @Nullable Watermark getCurrentWatermark(); + + @Override + default TimestampAssigner<T> createTimestampAssigner( + TimestampAssignerSupplier.Context context) { + return this; + } + + @Override + default WatermarkGenerator<T> createWatermarkGenerator( + WatermarkGeneratorSupplier.Context context) { + return new WatermarkGenerator<T>() { + @Override + public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {} + + @Override + public void onPeriodicEmit(WatermarkOutput output) { + final org.apache.flink.streaming.api.watermark.Watermark next = + getCurrentWatermark(); + if (next != null) { + output.emitWatermark( + new org.apache.flink.api.common.eventtime.Watermark( + next.getTimestamp())); + } + } + }; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPunctuatedWatermarks.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/util/WatermarkStrategyWithPunctuatedWatermarks.java similarity index 66% rename from flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPunctuatedWatermarks.java rename to flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/util/WatermarkStrategyWithPunctuatedWatermarks.java index 310167c1ed9..1f5992542f8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/AssignerWithPunctuatedWatermarks.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/util/WatermarkStrategyWithPunctuatedWatermarks.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,9 +16,16 @@ * limitations under the License. */ -package org.apache.flink.streaming.api.functions; +package org.apache.flink.streaming.runtime.operators.util; -import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.eventtime.TimestampAssigner; +import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier; +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.eventtime.WatermarkGenerator; +import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier; +import org.apache.flink.api.common.eventtime.WatermarkOutput; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import javax.annotation.Nullable; @@ -34,7 +41,7 @@ import javax.annotation.Nullable; * previous watermark (to preserve the contract of ascending watermarks). * * <p>For use cases that should periodically emit watermarks based on element timestamps, use the - * {@link AssignerWithPeriodicWatermarks} instead. + * {@link WatermarkStrategyWithPeriodicWatermarks} instead. * * <p>The following example illustrates how to use this timestamp extractor and watermark generator. * It assumes elements carry a timestamp that describes when they were created, and that some @@ -62,9 +69,9 @@ import javax.annotation.Nullable; * @param <T> The type of the elements to which this assigner assigns timestamps. * @see org.apache.flink.streaming.api.watermark.Watermark */ -@Deprecated -public interface AssignerWithPunctuatedWatermarks<T> extends TimestampAssigner<T> { - +@Internal +public interface WatermarkStrategyWithPunctuatedWatermarks<T> + extends WatermarkStrategy<T>, TimestampAssigner<T> { /** * Asks this implementation if it wants to emit a watermark. This method is called right after * the {@link #extractTimestamp(Object, long)} method. @@ -75,10 +82,36 @@ public interface AssignerWithPunctuatedWatermarks<T> extends TimestampAssigner<T * smaller than that of the last emitted one, then no new watermark will be generated. * * <p>For an example how to use this method, see the documentation of {@link - * AssignerWithPunctuatedWatermarks this class}. + * WatermarkStrategyWithPunctuatedWatermarks this class}. * * @return {@code Null}, if no watermark should be emitted, or the next watermark to emit. */ @Nullable - Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp); + org.apache.flink.streaming.api.watermark.Watermark checkAndGetNextWatermark( + T lastElement, long extractedTimestamp); + + @Override + default TimestampAssigner<T> createTimestampAssigner( + TimestampAssignerSupplier.Context context) { + return this; + } + + @Override + default WatermarkGenerator<T> createWatermarkGenerator( + WatermarkGeneratorSupplier.Context context) { + return new WatermarkGenerator<T>() { + @Override + public void onEvent(T event, long eventTimestamp, WatermarkOutput output) { + final org.apache.flink.streaming.api.watermark.Watermark next = + checkAndGetNextWatermark(event, eventTimestamp); + + if (next != null) { + output.emitWatermark(new Watermark(next.getTimestamp())); + } + } + + @Override + public void onPeriodicEmit(WatermarkOutput output) {} + }; + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/AscendingTimestampExtractor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/AscendingTimestampExtractor.java index c59087476f0..f047f5ec8ef 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/AscendingTimestampExtractor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/AscendingTimestampExtractor.java @@ -19,8 +19,8 @@ package org.apache.flink.streaming.api.functions.timestamps; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.operators.util.WatermarkStrategyWithPeriodicWatermarks; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +36,8 @@ import static java.util.Objects.requireNonNull; */ @Deprecated @PublicEvolving -public abstract class AscendingTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> { +public abstract class AscendingTimestampExtractor<T> + implements WatermarkStrategyWithPeriodicWatermarks<T> { private static final long serialVersionUID = 1L; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/BoundedOutOfOrdernessTimestampExtractor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/BoundedOutOfOrdernessTimestampExtractor.java index c07f9ac34e6..f2fd0b1d8d4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/BoundedOutOfOrdernessTimestampExtractor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/BoundedOutOfOrdernessTimestampExtractor.java @@ -18,21 +18,22 @@ package org.apache.flink.streaming.api.functions.timestamps; -import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.operators.util.WatermarkStrategyWithPeriodicWatermarks; import java.time.Duration; /** - * This is a {@link AssignerWithPeriodicWatermarks} used to emit Watermarks that lag behind the - * element with the maximum timestamp (in event time) seen so far by a fixed amount of time, <code> + * This is a {@link WatermarkStrategyWithPeriodicWatermarks} used to emit Watermarks that lag behind + * the element with the maximum timestamp (in event time) seen so far by a fixed amount of time, + * <code> * t_late</code>. This can help reduce the number of elements that are ignored due to lateness when * computing the final result for a given window, in the case where we know that elements arrive no * later than <code>t_late</code> units of time after the watermark that signals that the system * event-time has advanced past their (event-time) timestamp. */ public abstract class BoundedOutOfOrdernessTimestampExtractor<T> - implements AssignerWithPeriodicWatermarks<T> { + implements WatermarkStrategyWithPeriodicWatermarks<T> { private static final long serialVersionUID = 1L; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java index 4c600d621e0..c02464b9557 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java @@ -49,7 +49,6 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction; @@ -71,6 +70,7 @@ import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindo import org.apache.flink.streaming.api.windowing.triggers.CountTrigger; import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger; import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; +import org.apache.flink.streaming.runtime.operators.util.WatermarkStrategyWithPunctuatedWatermarks; import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper; import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; @@ -1219,7 +1219,7 @@ class DataStreamTest { } private abstract static class CustomWmEmitter<T> - implements AssignerWithPunctuatedWatermarks<T> { + implements WatermarkStrategyWithPunctuatedWatermarks<T> { @Nullable @Override diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLegacyTableSourceScan.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLegacyTableSourceScan.java index 289eaad07fb..1f45247840e 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLegacyTableSourceScan.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLegacyTableSourceScan.java @@ -40,7 +40,7 @@ import org.apache.flink.table.planner.sources.TableSourceUtil; import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; import org.apache.flink.table.runtime.operators.TableStreamOperator; import org.apache.flink.table.runtime.operators.wmassigners.PeriodicWatermarkAssignerWrapper; -import org.apache.flink.table.runtime.operators.wmassigners.PunctuatedWatermarkAssignerWrapper; +import org.apache.flink.table.runtime.operators.wmassigners.PunctuatedWatermarkStrategyWrapper; import org.apache.flink.table.sources.wmstrategies.PeriodicWatermarkAssigner; import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner; import org.apache.flink.table.sources.wmstrategies.WatermarkStrategy; @@ -151,8 +151,8 @@ public class StreamExecLegacyTableSourceScan extends CommonExecLegacyTableSource return ingestedTable.assignTimestampsAndWatermarks( watermarkGenerator); } else if (strategy instanceof PunctuatedWatermarkAssigner) { - PunctuatedWatermarkAssignerWrapper watermarkGenerator = - new PunctuatedWatermarkAssignerWrapper( + PunctuatedWatermarkStrategyWrapper watermarkGenerator = + new PunctuatedWatermarkStrategyWrapper( (PunctuatedWatermarkAssigner) strategy, rowtimeFieldIdx, tableSource.getProducedDataType()); diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala index 3b637c34ce4..253208e5062 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala @@ -17,8 +17,8 @@ */ package org.apache.flink.table.planner.runtime.stream.sql -import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.streaming.runtime.operators.util.WatermarkStrategyWithPunctuatedWatermarks import org.apache.flink.table.api._ import org.apache.flink.table.api.bridge.scala._ import org.apache.flink.table.planner.runtime.utils._ @@ -1151,7 +1151,7 @@ class IntervalJoinITCase(mode: StateBackendMode) extends StreamingWithStateTestB } private class Row4WatermarkExtractor - extends AssignerWithPunctuatedWatermarks[(Int, Long, String, Long)] { + extends WatermarkStrategyWithPunctuatedWatermarks[(Int, Long, String, Long)] { override def checkAndGetNextWatermark( lastElement: (Int, Long, String, Long), @@ -1167,7 +1167,7 @@ private class Row4WatermarkExtractor } private class Row3WatermarkExtractor2 - extends AssignerWithPunctuatedWatermarks[(String, String, Long)] { + extends WatermarkStrategyWithPunctuatedWatermarks[(String, String, Long)] { override def checkAndGetNextWatermark( lastElement: (String, String, Long), diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TimeTestUtil.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TimeTestUtil.scala index 2dd2d73e162..3e7fcce7971 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TimeTestUtil.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TimeTestUtil.scala @@ -20,10 +20,10 @@ package org.apache.flink.table.planner.runtime.utils import org.apache.flink.api.common.state.{ListState, ListStateDescriptor} import org.apache.flink.api.common.typeinfo.Types import org.apache.flink.runtime.state.{StateInitializationContext, StateSnapshotContext} -import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction import org.apache.flink.streaming.api.operators.{AbstractStreamOperator, OneInputStreamOperator} import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.streaming.runtime.operators.util.WatermarkStrategyWithPunctuatedWatermarks import org.apache.flink.streaming.runtime.streamrecord.{RecordAttributes, StreamRecord} import org.apache.flink.table.planner.JLong @@ -45,7 +45,7 @@ object TimeTestUtil { } class TimestampAndWatermarkWithOffset[T <: Product](offset: Long) - extends AssignerWithPunctuatedWatermarks[T] { + extends WatermarkStrategyWithPunctuatedWatermarks[T] { override def checkAndGetNextWatermark(lastElement: T, extractedTimestamp: Long): Watermark = { new Watermark(extractedTimestamp - offset) diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/PeriodicWatermarkAssignerWrapper.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/PeriodicWatermarkAssignerWrapper.java index 3ed95564949..42ed9012454 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/PeriodicWatermarkAssignerWrapper.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/PeriodicWatermarkAssignerWrapper.java @@ -19,8 +19,8 @@ package org.apache.flink.table.runtime.operators.wmassigners; import org.apache.flink.annotation.Internal; -import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.operators.util.WatermarkStrategyWithPeriodicWatermarks; import org.apache.flink.table.data.RowData; import org.apache.flink.table.sources.wmstrategies.PeriodicWatermarkAssigner; @@ -28,7 +28,8 @@ import javax.annotation.Nullable; /** Generates periodic watermarks based on a {@link PeriodicWatermarkAssigner}. */ @Internal -public class PeriodicWatermarkAssignerWrapper implements AssignerWithPeriodicWatermarks<RowData> { +public class PeriodicWatermarkAssignerWrapper + implements WatermarkStrategyWithPeriodicWatermarks<RowData> { private static final long serialVersionUID = 1L; private final PeriodicWatermarkAssigner assigner; private final int timeFieldIdx; diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/ProcTimeMiniBatchAssignerOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/ProcTimeMiniBatchAssignerOperator.java index 1fcb490c526..ab0f0f34b3a 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/ProcTimeMiniBatchAssignerOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/ProcTimeMiniBatchAssignerOperator.java @@ -20,7 +20,6 @@ package org.apache.flink.table.runtime.operators.wmassigners; import org.apache.flink.api.common.operators.ProcessingTimeService; import org.apache.flink.metrics.Gauge; -import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; @@ -94,7 +93,9 @@ public class ProcTimeMiniBatchAssignerOperator extends AbstractStreamOperator<Ro /** * Override the base implementation to completely ignore watermarks propagated from upstream (we - * rely only on the {@link AssignerWithPeriodicWatermarks} to emit watermarks from here). + * rely only on the {@link + * org.apache.flink.streaming.runtime.operators.util.WatermarkStrategyWithPeriodicWatermarks} to + * emit watermarks from here). */ @Override public void processWatermark(Watermark mark) throws Exception { diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/PunctuatedWatermarkAssignerWrapper.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/PunctuatedWatermarkStrategyWrapper.java similarity index 91% rename from flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/PunctuatedWatermarkAssignerWrapper.java rename to flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/PunctuatedWatermarkStrategyWrapper.java index a3f99e2bf2e..46f44012b2d 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/PunctuatedWatermarkAssignerWrapper.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/PunctuatedWatermarkStrategyWrapper.java @@ -19,8 +19,8 @@ package org.apache.flink.table.runtime.operators.wmassigners; import org.apache.flink.annotation.Internal; -import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.operators.util.WatermarkStrategyWithPunctuatedWatermarks; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.util.DataFormatConverters; @@ -33,8 +33,8 @@ import javax.annotation.Nullable; /** Generates periodic watermarks based on a {@link PunctuatedWatermarkAssigner}. */ @Internal -public class PunctuatedWatermarkAssignerWrapper - implements AssignerWithPunctuatedWatermarks<RowData> { +public class PunctuatedWatermarkStrategyWrapper + implements WatermarkStrategyWithPunctuatedWatermarks<RowData> { private static final long serialVersionUID = 1L; private final PunctuatedWatermarkAssigner assigner; private final int timeFieldIdx; @@ -46,7 +46,7 @@ public class PunctuatedWatermarkAssignerWrapper * @param sourceType the type of source */ @SuppressWarnings("unchecked") - public PunctuatedWatermarkAssignerWrapper( + public PunctuatedWatermarkStrategyWrapper( PunctuatedWatermarkAssigner assigner, int timeFieldIdx, DataType sourceType) { this.assigner = assigner; this.timeFieldIdx = timeFieldIdx; diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java index 895bedfed74..2b9167faa98 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java @@ -25,11 +25,11 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.BroadcastStream; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction; import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction; import org.apache.flink.streaming.api.functions.sink.legacy.RichSinkFunction; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.operators.util.WatermarkStrategyWithPunctuatedWatermarks; import org.apache.flink.test.util.AbstractTestBaseJUnit4; import org.apache.flink.util.Collector; @@ -197,7 +197,7 @@ public class BroadcastStateITCase extends AbstractTestBaseJUnit4 { } private abstract static class CustomWmEmitter<T> - implements AssignerWithPunctuatedWatermarks<T> { + implements WatermarkStrategyWithPunctuatedWatermarks<T> { private static final long serialVersionUID = -5187335197674841233L; diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoGroupJoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoGroupJoinITCase.java index 0ab086850b8..31cd6301a06 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoGroupJoinITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CoGroupJoinITCase.java @@ -26,13 +26,13 @@ import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.CoGroupedStreams; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction; import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.transformations.OneInputTransformation; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; +import org.apache.flink.streaming.runtime.operators.util.WatermarkStrategyWithPunctuatedWatermarks; import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.test.util.AbstractTestBaseJUnit4; @@ -425,7 +425,7 @@ public class CoGroupJoinITCase extends AbstractTestBaseJUnit4 { } private static class Tuple2TimestampExtractor - implements AssignerWithPunctuatedWatermarks<Tuple2<String, Integer>> { + implements WatermarkStrategyWithPunctuatedWatermarks<Tuple2<String, Integer>> { @Override public long extractTimestamp(Tuple2<String, Integer> element, long previousTimestamp) { @@ -440,7 +440,7 @@ public class CoGroupJoinITCase extends AbstractTestBaseJUnit4 { } private static class Tuple3TimestampExtractor - implements AssignerWithPunctuatedWatermarks<Tuple3<String, String, Integer>> { + implements WatermarkStrategyWithPunctuatedWatermarks<Tuple3<String, String, Integer>> { @Override public long extractTimestamp( diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java index 4902f48f383..9f60d9358d2 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java @@ -24,7 +24,6 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.streaming.api.functions.co.CoProcessFunction; @@ -39,6 +38,7 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; +import org.apache.flink.streaming.runtime.operators.util.WatermarkStrategyWithPunctuatedWatermarks; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.test.streaming.runtime.util.TestListResultSink; import org.apache.flink.test.util.AbstractTestBaseJUnit4; @@ -855,7 +855,7 @@ public class SideOutputITCase extends AbstractTestBaseJUnit4 implements Serializ } private static class TestWatermarkAssigner - implements AssignerWithPunctuatedWatermarks<Integer> { + implements WatermarkStrategyWithPunctuatedWatermarks<Integer> { private static final long serialVersionUID = 1L; @Nullable
