This is an automated email from the ASF dual-hosted git repository.
zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 1d5b214eb68 [FLINK-36246] Move async state related operators
flink-runtime (#25306)
1d5b214eb68 is described below
commit 1d5b214eb681229bfb78f49416d78df100f887d4
Author: Zakelly <[email protected]>
AuthorDate: Wed Sep 11 13:46:36 2024 +0800
[FLINK-36246] Move async state related operators flink-runtime (#25306)
---
.../operators}/AbstractAsyncStateStreamOperator.java | 5 ++++-
.../operators}/AbstractAsyncStateStreamOperatorV2.java | 9 ++++++---
.../operators}/AbstractAsyncStateUdfStreamOperator.java | 2 +-
.../operators}/AbstractAsyncStateStreamOperatorTest.java | 7 ++++---
.../AbstractAsyncStateStreamOperatorV2Test.java | 16 ++++------------
5 files changed, 19 insertions(+), 20 deletions(-)
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java
similarity index 97%
rename from
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java
rename to
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java
index 997aff389b9..f3432b43d23 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.streaming.runtime.operators.asyncprocessing;
+package org.apache.flink.runtime.asyncprocessing.operators;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
@@ -42,6 +42,9 @@ import
org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
+import
org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessing;
+import
org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessingOperator;
+import
org.apache.flink.streaming.runtime.operators.asyncprocessing.ElementOrder;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java
similarity index 96%
rename from
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java
rename to
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java
index 3e7b74ab397..c54ecf81f56 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.streaming.runtime.operators.asyncprocessing;
+package org.apache.flink.runtime.asyncprocessing.operators;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
@@ -40,6 +40,9 @@ import
org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.watermark.Watermark;
+import
org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessing;
+import
org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessingOperator;
+import
org.apache.flink.streaming.runtime.operators.asyncprocessing.ElementOrder;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.function.ThrowingConsumer;
@@ -253,12 +256,12 @@ public abstract class
AbstractAsyncStateStreamOperatorV2<OUT> extends AbstractSt
}
@VisibleForTesting
- AsyncExecutionController<?> getAsyncExecutionController() {
+ public AsyncExecutionController<?> getAsyncExecutionController() {
return asyncExecutionController;
}
@VisibleForTesting
- RecordContext getCurrentProcessingContext() {
+ public RecordContext getCurrentProcessingContext() {
return currentProcessingContext;
}
}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateUdfStreamOperator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateUdfStreamOperator.java
similarity index 98%
rename from
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateUdfStreamOperator.java
rename to
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateUdfStreamOperator.java
index df981ec24ca..dbc5f56efe2 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateUdfStreamOperator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateUdfStreamOperator.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.streaming.runtime.operators.asyncprocessing;
+package org.apache.flink.runtime.asyncprocessing.operators;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java
similarity index 98%
rename from
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorTest.java
rename to
flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java
index 6bee23273ec..58113e687ef 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorTest.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.streaming.runtime.operators.asyncprocessing;
+package org.apache.flink.runtime.asyncprocessing.operators;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -36,6 +36,7 @@ import
org.apache.flink.streaming.api.operators.InternalTimerServiceAsyncImpl;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.io.RecordProcessorUtils;
+import
org.apache.flink.streaming.runtime.operators.asyncprocessing.ElementOrder;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.util.function.ThrowingConsumer;
@@ -51,7 +52,7 @@ import static
org.apache.flink.runtime.state.StateBackendTestUtils.buildAsyncSta
import static org.assertj.core.api.Assertions.assertThat;
/** Basic tests for {@link AbstractAsyncStateStreamOperator}. */
-class AbstractAsyncStateStreamOperatorTest {
+public class AbstractAsyncStateStreamOperatorTest {
protected KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer,
String>, String>
createTestHarness(
@@ -329,7 +330,7 @@ class AbstractAsyncStateStreamOperatorTest {
}
/** {@link KeySelector} for tests. */
- static class TestKeySelector implements KeySelector<Tuple2<Integer,
String>, Integer> {
+ public static class TestKeySelector implements KeySelector<Tuple2<Integer,
String>, Integer> {
private static final long serialVersionUID = 1L;
@Override
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2Test.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test.java
similarity index 95%
rename from
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2Test.java
rename to
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test.java
index 75e56fd1d40..6615675907c 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperatorV2Test.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractAsyncStateStreamOperatorV2Test.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.streaming.runtime.operators.asyncprocessing;
+package org.apache.flink.streaming.api.operators;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -25,6 +25,8 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.StateRequestType;
+import
org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperatorTest.TestKeySelector;
+import
org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateStreamOperatorV2;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
@@ -32,18 +34,8 @@ import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
-import org.apache.flink.streaming.api.operators.AbstractInput;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
-import org.apache.flink.streaming.api.operators.Input;
-import org.apache.flink.streaming.api.operators.InternalTimer;
-import org.apache.flink.streaming.api.operators.InternalTimerServiceAsyncImpl;
-import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
-import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
-import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.io.RecordProcessorUtils;
-import
org.apache.flink.streaming.runtime.operators.asyncprocessing.AbstractAsyncStateStreamOperatorTest.TestKeySelector;
+import
org.apache.flink.streaming.runtime.operators.asyncprocessing.ElementOrder;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.util.function.ThrowingConsumer;