This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit be1e804f67124822e869689ebc4dc87f4b41c9f1
Author: Arvid Heise <[email protected]>
AuthorDate: Fri Dec 10 22:54:20 2021 +0100

    [FLINK-25569][core] Extract public facing ProcessingTimeService to 
flink-core
---
 .../kafka/internals/AbstractFetcher.java           |  2 +-
 .../kinesis/internals/KinesisDataFetcher.java      |  2 +-
 .../common/operators/ProcessingTimeService.java    | 57 ++++++++++++++++++++++
 .../runtime/NeverFireProcessingTimeService.java    |  1 -
 .../sink/filesystem/StreamingFileSinkHelper.java   |  2 +-
 .../api/operators/LatencyMarkerEmitter.java        |  2 +-
 .../api/operators/StreamSourceContexts.java        |  2 +-
 .../operators/TimestampsAndWatermarksOperator.java |  2 +-
 .../runtime/tasks/ProcessingTimeCallback.java      | 41 ----------------
 .../runtime/tasks/ProcessingTimeService.java       | 17 +------
 .../flink/streaming/runtime/tasks/StreamTask.java  |  1 +
 .../runtime/operators/StreamTaskTimerTest.java     |  2 +-
 .../runtime/tasks/StreamOperatorWrapperTest.java   |  1 +
 .../streaming/runtime/tasks/StreamTaskTest.java    |  1 +
 .../tasks/SystemProcessingTimeServiceTest.java     |  1 +
 .../ProcTimeMiniBatchAssignerOperator.java         |  5 +-
 .../wmassigners/WatermarkAssignerOperator.java     |  4 +-
 .../lifecycle/graph/MultiInputTestOperator.java    |  2 +-
 .../graph/OneInputTestStreamOperator.java          |  2 +-
 .../graph/TwoInputTestStreamOperator.java          |  2 +-
 .../streaming/runtime/StreamTaskTimerITCase.java   |  2 +-
 21 files changed, 79 insertions(+), 72 deletions(-)

diff --git 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index dda8971..d189b6d5 100644
--- 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -22,12 +22,12 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.eventtime.WatermarkOutput;
 import org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import 
org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
 import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
 import 
org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaConsumerMetricConstants;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.SerializedValue;
 
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index 37b12c2..21b3736 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -20,6 +20,7 @@ package 
org.apache.flink.streaming.connectors.kinesis.internals;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import 
org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback;
 import 
org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
@@ -52,7 +53,6 @@ import 
org.apache.flink.streaming.connectors.kinesis.util.RecordEmitter;
 import 
org.apache.flink.streaming.connectors.kinesis.util.StreamConsumerRegistrarUtil;
 import org.apache.flink.streaming.connectors.kinesis.util.WatermarkTracker;
 import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/ProcessingTimeService.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/ProcessingTimeService.java
new file mode 100644
index 0000000..65a97f7
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/ProcessingTimeService.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.IOException;
+import java.util.concurrent.ScheduledFuture;
+
+/**
+ * A service that allows getting the current processing time and registering 
timers that will execute
+ * the given {@link ProcessingTimeCallback} when firing.
+ */
+@PublicEvolving
+public interface ProcessingTimeService {
+    /** Returns the current processing time. */
+    long getCurrentProcessingTime();
+
+    /**
+     * Registers a task to be executed when (processing) time is {@code 
timestamp}.
+     *
+     * @param timestamp Time when the task is to be executed (in processing 
time)
+     * @param target The task to be executed
+     * @return The future that represents the scheduled task. This always 
returns some future, even
+     *     if the timer was shut down
+     */
+    ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback 
target);
+
+    /**
+     * A callback that can be registered via {@link #registerTimer(long, 
ProcessingTimeCallback)}.
+     */
+    @PublicEvolving
+    interface ProcessingTimeCallback {
+        /**
+         * This method is invoked with the time for which the callback was 
registered.
+         *
+         * @param time The time this callback was registered for.
+         */
+        void onProcessingTime(long time) throws IOException, 
InterruptedException, Exception;
+    }
+}
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/NeverFireProcessingTimeService.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/NeverFireProcessingTimeService.java
index eb73aad..3f18605 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/NeverFireProcessingTimeService.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/NeverFireProcessingTimeService.java
@@ -18,7 +18,6 @@
 package org.apache.flink.state.api.runtime;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.tasks.TimerService;
 import org.apache.flink.util.concurrent.NeverCompleteFuture;
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSinkHelper.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSinkHelper.java
index 6c806ba..0e653a5 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSinkHelper.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSinkHelper.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
 import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
@@ -26,7 +27,6 @@ import 
org.apache.flink.api.common.typeutils.base.LongSerializer;
 import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 
 import javax.annotation.Nullable;
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/LatencyMarkerEmitter.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/LatencyMarkerEmitter.java
index cbd677a..36fb1f0 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/LatencyMarkerEmitter.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/LatencyMarkerEmitter.java
@@ -18,9 +18,9 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 
 import java.util.concurrent.ScheduledFuture;
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
index af5b0c5..c57d730 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
@@ -17,11 +17,11 @@
 
 package org.apache.flink.streaming.api.operators;
 
+import 
org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.util.FlinkRuntimeException;
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
index b435938..a10c92e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
@@ -28,9 +28,9 @@ import 
org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 
+import static 
org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java
deleted file mode 100644
index 033e97f..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.annotation.Internal;
-
-/**
- * Interface for processing-time callbacks that can be registered at a {@link
- * ProcessingTimeService}.
- */
-@Internal
-@FunctionalInterface
-public interface ProcessingTimeCallback {
-
-    /**
-     * This method is invoked with the timestamp for which the trigger was 
scheduled.
-     *
-     * <p>If the triggering is delayed for whatever reason (trigger timer was 
blocked, JVM stalled
-     * due to a garbage collection), the timestamp supplied to this function 
will still be the
-     * original timestamp for which the trigger was scheduled.
-     *
-     * @param timestamp The timestamp for which the trigger event was 
scheduled.
-     */
-    void onProcessingTime(long timestamp) throws Exception;
-}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
index 96f5b48..bd551e8 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
@@ -30,21 +30,8 @@ import java.util.concurrent.TimeUnit;
  * <p>The access to the time via {@link #getCurrentProcessingTime()} is always 
available, regardless
  * of whether the timer service has been shut down.
  */
-public interface ProcessingTimeService {
-
-    /** Returns the current processing time. */
-    long getCurrentProcessingTime();
-
-    /**
-     * Registers a task to be executed when (processing) time is {@code 
timestamp}.
-     *
-     * @param timestamp Time when the task is to be executed (in processing 
time)
-     * @param target The task to be executed
-     * @return The future that represents the scheduled task. This always 
returns some future, even
-     *     if the timer was shut down
-     */
-    ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback 
target);
-
+public interface ProcessingTimeService
+        extends org.apache.flink.api.common.operators.ProcessingTimeService {
     /**
      * Registers a task to be executed repeatedly at a fixed rate.
      *
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 29d9d70..86fd373 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.tasks;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.operators.MailboxExecutor;
+import 
org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.AutoCloseableRegistry;
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
index 3b344f7..e03ee80 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
@@ -27,7 +27,6 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
@@ -47,6 +46,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static 
org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest.java
index b255847..1ada5d0 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapperTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.api.common.operators.MailboxExecutor;
+import 
org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.io.network.api.StopMode;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index ead5efc..4a70de3 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.tasks;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.MailboxExecutor;
+import 
org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback;
 import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
index 7043290..572d7d9 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import 
org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/ProcTimeMiniBatchAssignerOperator.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/ProcTimeMiniBatchAssignerOperator.java
index 8ae817e..5a81bea 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/ProcTimeMiniBatchAssignerOperator.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/ProcTimeMiniBatchAssignerOperator.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.runtime.operators.wmassigners;
 
+import org.apache.flink.api.common.operators.ProcessingTimeService;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -25,7 +26,6 @@ import 
org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.table.data.RowData;
 
 /**
@@ -41,7 +41,8 @@ import org.apache.flink.table.data.RowData;
  * watermarks from upstream.
  */
 public class ProcTimeMiniBatchAssignerOperator extends 
AbstractStreamOperator<RowData>
-        implements OneInputStreamOperator<RowData, RowData>, 
ProcessingTimeCallback {
+        implements OneInputStreamOperator<RowData, RowData>,
+                ProcessingTimeService.ProcessingTimeCallback {
 
     private static final long serialVersionUID = 1L;
 
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java
index 35084e3..90559a5 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java
@@ -25,7 +25,6 @@ import 
org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.table.data.RowData;
@@ -38,7 +37,8 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * watermarks.
  */
 public class WatermarkAssignerOperator extends AbstractStreamOperator<RowData>
-        implements OneInputStreamOperator<RowData, RowData>, 
ProcessingTimeCallback {
+        implements OneInputStreamOperator<RowData, RowData>,
+                
org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback
 {
 
     private static final long serialVersionUID = 1L;
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/MultiInputTestOperator.java
 
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/MultiInputTestOperator.java
index 81a6758..226bcd7 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/MultiInputTestOperator.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/MultiInputTestOperator.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.runtime.operators.lifecycle.graph;
 
+import 
org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback;
 import org.apache.flink.runtime.operators.lifecycle.command.TestCommand;
 import 
org.apache.flink.runtime.operators.lifecycle.event.CheckpointCompletedEvent;
 import 
org.apache.flink.runtime.operators.lifecycle.event.CheckpointStartedEvent;
@@ -37,7 +38,6 @@ import 
org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 
 import java.util.HashMap;
diff --git 
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/OneInputTestStreamOperator.java
 
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/OneInputTestStreamOperator.java
index e1b8d42..711047e 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/OneInputTestStreamOperator.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/OneInputTestStreamOperator.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.runtime.operators.lifecycle.graph;
 
+import 
org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback;
 import org.apache.flink.runtime.operators.lifecycle.command.TestCommand;
 import 
org.apache.flink.runtime.operators.lifecycle.event.CheckpointCompletedEvent;
 import 
org.apache.flink.runtime.operators.lifecycle.event.CheckpointStartedEvent;
@@ -33,7 +34,6 @@ import 
org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 
 import java.util.HashMap;
 import java.util.Map;
diff --git 
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/TwoInputTestStreamOperator.java
 
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/TwoInputTestStreamOperator.java
index 995f524..67073b4 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/TwoInputTestStreamOperator.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/TwoInputTestStreamOperator.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.runtime.operators.lifecycle.graph;
 
+import 
org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback;
 import org.apache.flink.runtime.operators.lifecycle.command.TestCommand;
 import 
org.apache.flink.runtime.operators.lifecycle.event.CheckpointCompletedEvent;
 import 
org.apache.flink.runtime.operators.lifecycle.event.CheckpointStartedEvent;
@@ -33,7 +34,6 @@ import 
org.apache.flink.streaming.api.operators.BoundedMultiInput;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 
 import java.util.HashMap;
 import java.util.Map;
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
index ca39c6b..ae77904 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.test.streaming.runtime;
 
+import 
org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -30,7 +31,6 @@ import 
org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.tasks.TimerException;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.util.ExceptionUtils;

Reply via email to