This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit be1e804f67124822e869689ebc4dc87f4b41c9f1 Author: Arvid Heise <[email protected]> AuthorDate: Fri Dec 10 22:54:20 2021 +0100 [FLINK-25569][core] Extract public facing ProcessingTimeService to flink-core --- .../kafka/internals/AbstractFetcher.java | 2 +- .../kinesis/internals/KinesisDataFetcher.java | 2 +- .../common/operators/ProcessingTimeService.java | 57 ++++++++++++++++++++++ .../runtime/NeverFireProcessingTimeService.java | 1 - .../sink/filesystem/StreamingFileSinkHelper.java | 2 +- .../api/operators/LatencyMarkerEmitter.java | 2 +- .../api/operators/StreamSourceContexts.java | 2 +- .../operators/TimestampsAndWatermarksOperator.java | 2 +- .../runtime/tasks/ProcessingTimeCallback.java | 41 ---------------- .../runtime/tasks/ProcessingTimeService.java | 17 +------ .../flink/streaming/runtime/tasks/StreamTask.java | 1 + .../runtime/operators/StreamTaskTimerTest.java | 2 +- .../runtime/tasks/StreamOperatorWrapperTest.java | 1 + .../streaming/runtime/tasks/StreamTaskTest.java | 1 + .../tasks/SystemProcessingTimeServiceTest.java | 1 + .../ProcTimeMiniBatchAssignerOperator.java | 5 +- .../wmassigners/WatermarkAssignerOperator.java | 4 +- .../lifecycle/graph/MultiInputTestOperator.java | 2 +- .../graph/OneInputTestStreamOperator.java | 2 +- .../graph/TwoInputTestStreamOperator.java | 2 +- .../streaming/runtime/StreamTaskTimerITCase.java | 2 +- 21 files changed, 79 insertions(+), 72 deletions(-) diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java index dda8971..d189b6d5 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java @@ 
-22,12 +22,12 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.eventtime.WatermarkOutput; import org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer; import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode; import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaConsumerMetricConstants; -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.util.SerializedValue; diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java index 37b12c2..21b3736 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kinesis.internals; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback; import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters; import org.apache.flink.metrics.MetricGroup; import 
org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; @@ -52,7 +53,6 @@ import org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter; import org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil; import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker; import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue; -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/ProcessingTimeService.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/ProcessingTimeService.java new file mode 100644 index 0000000..65a97f7 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/ProcessingTimeService.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.common.operators; + +import org.apache.flink.annotation.PublicEvolving; + +import java.io.IOException; +import java.util.concurrent.ScheduledFuture; + +/** + * A service for getting the current processing time and registering timers that will execute + * the given {@link ProcessingTimeCallback} when firing. + */ +@PublicEvolving +public interface ProcessingTimeService { + /** Returns the current processing time. */ + long getCurrentProcessingTime(); + + /** + * Registers a task to be executed when (processing) time is {@code timestamp}. + * + * @param timestamp Time when the task is to be executed (in processing time) + * @param target The task to be executed + * @return The future that represents the scheduled task. This always returns some future, even + * if the timer was shut down + */ + ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target); + + /** + * A callback that can be registered via {@link #registerTimer(long, ProcessingTimeCallback)}. + */ + @PublicEvolving + interface ProcessingTimeCallback { + /** + * This method is invoked with the time for which the callback was registered. + * + * @param time The time this callback was registered for. 
+ */ + void onProcessingTime(long time) throws IOException, InterruptedException, Exception; + } +} diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/NeverFireProcessingTimeService.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/NeverFireProcessingTimeService.java index eb73aad..3f18605 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/NeverFireProcessingTimeService.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/NeverFireProcessingTimeService.java @@ -18,7 +18,6 @@ package org.apache.flink.state.api.runtime; import org.apache.flink.annotation.Internal; -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; import org.apache.flink.streaming.runtime.tasks.TimerService; import org.apache.flink.util.concurrent.NeverCompleteFuture; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSinkHelper.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSinkHelper.java index 6c806ba..0e653a5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSinkHelper.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSinkHelper.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.functions.sink.filesystem; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.OperatorStateStore; @@ -26,7 +27,6 @@ import org.apache.flink.api.common.typeutils.base.LongSerializer; import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.operators.StreamOperator; -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import javax.annotation.Nullable; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/LatencyMarkerEmitter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/LatencyMarkerEmitter.java index cbd677a..36fb1f0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/LatencyMarkerEmitter.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/LatencyMarkerEmitter.java @@ -18,9 +18,9 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import java.util.concurrent.ScheduledFuture; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java index af5b0c5..c57d730 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java @@ -17,11 +17,11 @@ package org.apache.flink.streaming.api.operators; +import 
org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; import org.apache.flink.util.FlinkRuntimeException; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java index b435938..a10c92e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java @@ -28,9 +28,9 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; +import static org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback; import static org.apache.flink.util.Preconditions.checkNotNull; /** diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java deleted file mode 100644 index 
033e97f..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.tasks; - -import org.apache.flink.annotation.Internal; - -/** - * Interface for processing-time callbacks that can be registered at a {@link - * ProcessingTimeService}. - */ -@Internal -@FunctionalInterface -public interface ProcessingTimeCallback { - - /** - * This method is invoked with the timestamp for which the trigger was scheduled. - * - * <p>If the triggering is delayed for whatever reason (trigger timer was blocked, JVM stalled - * due to a garbage collection), the timestamp supplied to this function will still be the - * original timestamp for which the trigger was scheduled. - * - * @param timestamp The timestamp for which the trigger event was scheduled. 
- */ - void onProcessingTime(long timestamp) throws Exception; -} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java index 96f5b48..bd551e8 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java @@ -30,21 +30,8 @@ import java.util.concurrent.TimeUnit; * <p>The access to the time via {@link #getCurrentProcessingTime()} is always available, regardless * of whether the timer service has been shut down. */ -public interface ProcessingTimeService { - - /** Returns the current processing time. */ - long getCurrentProcessingTime(); - - /** - * Registers a task to be executed when (processing) time is {@code timestamp}. - * - * @param timestamp Time when the task is to be executed (in processing time) - * @param target The task to be executed - * @return The future that represents the scheduled task. This always returns some future, even - * if the timer was shut down - */ - ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target); - +public interface ProcessingTimeService + extends org.apache.flink.api.common.operators.ProcessingTimeService { /** * Registers a task to be executed repeatedly at a fixed rate. 
* diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 29d9d70..86fd373 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.fs.AutoCloseableRegistry; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java index 3b344f7..e03ee80 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java @@ -27,7 +27,6 @@ import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.StreamMap; import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness; -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness; @@ -47,6 +46,7 @@ import 
java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; +import static org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback; import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest.java index b255847..1ada5d0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.io.network.api.StopMode; import org.apache.flink.runtime.operators.testutils.MockEnvironment; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index ead5efc..4a70de3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.operators.MailboxExecutor; +import 
org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback; import org.apache.flink.api.common.state.CheckpointListener; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.TypeSerializer; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java index 7043290..572d7d9 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.runtime.tasks; +import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/ProcTimeMiniBatchAssignerOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/ProcTimeMiniBatchAssignerOperator.java index 8ae817e..5a81bea 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/ProcTimeMiniBatchAssignerOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/ProcTimeMiniBatchAssignerOperator.java @@ -18,6 +18,7 @@ package org.apache.flink.table.runtime.operators.wmassigners; +import org.apache.flink.api.common.operators.ProcessingTimeService; import org.apache.flink.metrics.Gauge; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import 
org.apache.flink.streaming.api.operators.AbstractStreamOperator; @@ -25,7 +26,6 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; import org.apache.flink.table.data.RowData; /** @@ -41,7 +41,8 @@ import org.apache.flink.table.data.RowData; * watermarks from upstream. */ public class ProcTimeMiniBatchAssignerOperator extends AbstractStreamOperator<RowData> - implements OneInputStreamOperator<RowData, RowData>, ProcessingTimeCallback { + implements OneInputStreamOperator<RowData, RowData>, + ProcessingTimeService.ProcessingTimeCallback { private static final long serialVersionUID = 1L; diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java index 35084e3..90559a5 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java @@ -25,7 +25,6 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; import 
org.apache.flink.table.data.RowData; @@ -38,7 +37,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * watermarks. */ public class WatermarkAssignerOperator extends AbstractStreamOperator<RowData> - implements OneInputStreamOperator<RowData, RowData>, ProcessingTimeCallback { + implements OneInputStreamOperator<RowData, RowData>, + org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback { private static final long serialVersionUID = 1L; diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/MultiInputTestOperator.java b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/MultiInputTestOperator.java index 81a6758..226bcd7 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/MultiInputTestOperator.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/MultiInputTestOperator.java @@ -17,6 +17,7 @@ package org.apache.flink.runtime.operators.lifecycle.graph; +import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback; import org.apache.flink.runtime.operators.lifecycle.command.TestCommand; import org.apache.flink.runtime.operators.lifecycle.event.CheckpointCompletedEvent; import org.apache.flink.runtime.operators.lifecycle.event.CheckpointStartedEvent; @@ -37,7 +38,6 @@ import org.apache.flink.streaming.api.operators.StreamOperatorParameters; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; import java.util.HashMap; diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/OneInputTestStreamOperator.java 
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/OneInputTestStreamOperator.java index e1b8d42..711047e 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/OneInputTestStreamOperator.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/OneInputTestStreamOperator.java @@ -17,6 +17,7 @@ package org.apache.flink.runtime.operators.lifecycle.graph; +import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback; import org.apache.flink.runtime.operators.lifecycle.command.TestCommand; import org.apache.flink.runtime.operators.lifecycle.event.CheckpointCompletedEvent; import org.apache.flink.runtime.operators.lifecycle.event.CheckpointStartedEvent; @@ -33,7 +34,6 @@ import org.apache.flink.streaming.api.operators.BoundedOneInput; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; import java.util.HashMap; import java.util.Map; diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/TwoInputTestStreamOperator.java b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/TwoInputTestStreamOperator.java index 995f524..67073b4 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/TwoInputTestStreamOperator.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/TwoInputTestStreamOperator.java @@ -17,6 +17,7 @@ package org.apache.flink.runtime.operators.lifecycle.graph; +import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback; import org.apache.flink.runtime.operators.lifecycle.command.TestCommand; import 
org.apache.flink.runtime.operators.lifecycle.event.CheckpointCompletedEvent; import org.apache.flink.runtime.operators.lifecycle.event.CheckpointStartedEvent; @@ -33,7 +34,6 @@ import org.apache.flink.streaming.api.operators.BoundedMultiInput; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; import java.util.HashMap; import java.util.Map; diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java index ca39c6b..ae77904 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java @@ -18,6 +18,7 @@ package org.apache.flink.test.streaming.runtime; +import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.streaming.api.TimeCharacteristic; @@ -30,7 +31,6 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; import org.apache.flink.streaming.runtime.tasks.TimerException; import org.apache.flink.test.util.AbstractTestBase; import org.apache.flink.util.ExceptionUtils;
