This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d5b214eb68 [FLINK-36246] Move async state related operators to flink-runtime (#25306)
1d5b214eb68 is described below

commit 1d5b214eb681229bfb78f49416d78df100f887d4
Author: Zakelly <[email protected]>
AuthorDate: Wed Sep 11 13:46:36 2024 +0800

    [FLINK-36246] Move async state related operators to flink-runtime (#25306)
---
 .../operators}/AbstractAsyncStateStreamOperator.java     |  5 ++++-
 .../operators}/AbstractAsyncStateStreamOperatorV2.java   |  9 ++++++---
 .../operators}/AbstractAsyncStateUdfStreamOperator.java  |  2 +-
 .../operators}/AbstractAsyncStateStreamOperatorTest.java |  7 ++++---
 .../AbstractAsyncStateStreamOperatorV2Test.java          | 16 ++++------------
 5 files changed, 19 insertions(+), 20 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java
similarity index 97%
rename from 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java
index 997aff389b9..f3432b43d23 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.runtime.operators.asyncprocessing;
+package org.apache.flink.runtime.asyncprocessing.operators;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
@@ -42,6 +42,9 @@ import 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
 import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import 
org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessing;
+import 
org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessingOperator;
+import 
org.apache.flink.streaming.runtime.operators.asyncprocessing.ElementOrder;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java
similarity index 96%
rename from 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java
index 3e7b74ab397..c54ecf81f56 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.runtime.operators.asyncprocessing;
+package org.apache.flink.runtime.asyncprocessing.operators;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
@@ -40,6 +40,9 @@ import 
org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
 import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import 
org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessing;
+import 
org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessingOperator;
+import 
org.apache.flink.streaming.runtime.operators.asyncprocessing.ElementOrder;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.util.function.ThrowingConsumer;
@@ -253,12 +256,12 @@ public abstract class 
AbstractAsyncStateStreamOperatorV2<OUT> extends AbstractSt
     }
 
     @VisibleForTesting
-    AsyncExecutionController<?> getAsyncExecutionController() {
+    public AsyncExecutionController<?> getAsyncExecutionController() {
         return asyncExecutionController;
     }
 
     @VisibleForTesting
-    RecordContext getCurrentProcessingContext() {
+    public RecordContext getCurrentProcessingContext() {
         return currentProcessingContext;
     }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateUdfStreamOperator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateUdfStreamOperator.java
similarity index 98%
rename from 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateUdfStreamOperator.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateUdfStreamOperator.java
index df981ec24ca..dbc5f56efe2 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateUdfStreamOperator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateUdfStreamOperator.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.runtime.operators.asyncprocessing;
+package org.apache.flink.runtime.asyncprocessing.operators;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java
similarity index 98%
rename from 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorTest.java
rename to 
flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java
index 6bee23273ec..58113e687ef 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.runtime.operators.asyncprocessing;
+package org.apache.flink.runtime.asyncprocessing.operators;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -36,6 +36,7 @@ import 
org.apache.flink.streaming.api.operators.InternalTimerServiceAsyncImpl;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.streaming.runtime.io.RecordProcessorUtils;
+import 
org.apache.flink.streaming.runtime.operators.asyncprocessing.ElementOrder;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.util.function.ThrowingConsumer;
@@ -51,7 +52,7 @@ import static 
org.apache.flink.runtime.state.StateBackendTestUtils.buildAsyncSta
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Basic tests for {@link AbstractAsyncStateStreamOperator}. */
-class AbstractAsyncStateStreamOperatorTest {
+public class AbstractAsyncStateStreamOperatorTest {
 
     protected KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, 
String>, String>
             createTestHarness(
@@ -329,7 +330,7 @@ class AbstractAsyncStateStreamOperatorTest {
     }
 
     /** {@link KeySelector} for tests. */
-    static class TestKeySelector implements KeySelector<Tuple2<Integer, 
String>, Integer> {
+    public static class TestKeySelector implements KeySelector<Tuple2<Integer, 
String>, Integer> {
         private static final long serialVersionUID = 1L;
 
         @Override
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2Test.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test.java
similarity index 95%
rename from 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2Test.java
rename to 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test.java
index 75e56fd1d40..6615675907c 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2Test.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.runtime.operators.asyncprocessing;
+package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -25,6 +25,8 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
 import org.apache.flink.runtime.asyncprocessing.StateRequestType;
+import 
org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperatorTest.TestKeySelector;
+import 
org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperatorV2;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
@@ -32,18 +34,8 @@ import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
 import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
-import org.apache.flink.streaming.api.operators.AbstractInput;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
-import org.apache.flink.streaming.api.operators.Input;
-import org.apache.flink.streaming.api.operators.InternalTimer;
-import org.apache.flink.streaming.api.operators.InternalTimerServiceAsyncImpl;
-import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
-import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
-import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.streaming.runtime.io.RecordProcessorUtils;
-import 
org.apache.flink.streaming.runtime.operators.asyncprocessing.AbstractAsyncStateStreamOperatorTest.TestKeySelector;
+import 
org.apache.flink.streaming.runtime.operators.asyncprocessing.ElementOrder;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.util.function.ThrowingConsumer;

Reply via email to