This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch process-func-api-poc-weijie
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 43ca07e726c40c296ddc349eab87ab98a9926303
Author: Weijie Guo <[email protected]>
AuthorDate: Tue Jun 6 21:02:53 2023 +0800

    ProcessOperator supports operator list state.
---
 .../flink/api/common/state/AppendingState.java     |  0
 .../apache/flink/api/common/state/ListState.java   |  0
 .../api/common/state/ListStateDeclaration.java     | 30 +++++++++++--------
 .../flink/api/common/state/MergingState.java       |  0
 .../org/apache/flink/api/common/state/State.java   |  0
 .../flink/api/common/state/StateDeclaration.java   |  5 ++--
 .../{State.java => StateDeclarationConverter.java} | 27 ++++++++---------
 .../flink/processfunction/api/ProcessFunction.java |  5 ++--
 .../flink/processfunction/api/RuntimeContext.java  |  4 ++-
 .../flink/processfunction/api/StateDescriptor.java | 21 -------------
 .../examples/SimpleStatefulMap.java                | 35 ++++++++++++++--------
 .../processfunction/operators/ProcessOperator.java | 30 +++++++++++++++----
 12 files changed, 86 insertions(+), 71 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java
 
b/flink-core-api/src/main/java/org/apache/flink/api/common/state/AppendingState.java
similarity index 100%
rename from 
flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java
rename to 
flink-core-api/src/main/java/org/apache/flink/api/common/state/AppendingState.java
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java 
b/flink-core-api/src/main/java/org/apache/flink/api/common/state/ListState.java
similarity index 100%
rename from 
flink-core/src/main/java/org/apache/flink/api/common/state/ListState.java
rename to 
flink-core-api/src/main/java/org/apache/flink/api/common/state/ListState.java
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/State.java 
b/flink-core-api/src/main/java/org/apache/flink/api/common/state/ListStateDeclaration.java
similarity index 60%
copy from flink-core/src/main/java/org/apache/flink/api/common/state/State.java
copy to 
flink-core-api/src/main/java/org/apache/flink/api/common/state/ListStateDeclaration.java
index bd5211df618..a9ee9f219f0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/State.java
+++ 
b/flink-core-api/src/main/java/org/apache/flink/api/common/state/ListStateDeclaration.java
@@ -18,19 +18,23 @@
 
 package org.apache.flink.api.common.state;
 
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeDescriptor;
 
-/**
- * Interface that different types of partitioned state must implement.
- *
- * <p>The state is only accessible by functions applied on a {@code 
KeyedStream}. The key is
- * automatically supplied by the system, so the function always sees the value 
mapped to the key of
- * the current element. That way, the system can handle stream and state 
partitioning consistently
- * together.
- */
-@PublicEvolving
-public interface State {
+public class ListStateDeclaration<T> implements StateDeclaration {
+    private final String name;
+
+    private final TypeDescriptor<T> elementTypeDescriptor;
+
+    public ListStateDeclaration(String name, TypeDescriptor<T> 
elementTypeDescriptor) {
+        this.name = name;
+        this.elementTypeDescriptor = elementTypeDescriptor;
+    }
+
+    public String getName() {
+        return name;
+    }
 
-    /** Removes the value mapped under the current key. */
-    void clear();
+    public TypeDescriptor<T> getElementTypeDescriptor() {
+        return elementTypeDescriptor;
+    }
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/MergingState.java 
b/flink-core-api/src/main/java/org/apache/flink/api/common/state/MergingState.java
similarity index 100%
rename from 
flink-core/src/main/java/org/apache/flink/api/common/state/MergingState.java
rename to 
flink-core-api/src/main/java/org/apache/flink/api/common/state/MergingState.java
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/State.java 
b/flink-core-api/src/main/java/org/apache/flink/api/common/state/State.java
similarity index 100%
copy from flink-core/src/main/java/org/apache/flink/api/common/state/State.java
copy to 
flink-core-api/src/main/java/org/apache/flink/api/common/state/State.java
diff --git 
a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/State.java
 
b/flink-core-api/src/main/java/org/apache/flink/api/common/state/StateDeclaration.java
similarity index 88%
rename from 
flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/State.java
rename to 
flink-core-api/src/main/java/org/apache/flink/api/common/state/StateDeclaration.java
index 3fc48ab137d..5070f2ac8f6 100644
--- 
a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/State.java
+++ 
b/flink-core-api/src/main/java/org/apache/flink/api/common/state/StateDeclaration.java
@@ -16,6 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.processfunction.api;
+package org.apache.flink.api.common.state;
 
-public interface State {}
+/** Declaration for state. */
+public interface StateDeclaration {}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/State.java 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDeclarationConverter.java
similarity index 55%
rename from 
flink-core/src/main/java/org/apache/flink/api/common/state/State.java
rename to 
flink-core/src/main/java/org/apache/flink/api/common/state/StateDeclarationConverter.java
index bd5211df618..363c21ec619 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/State.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDeclarationConverter.java
@@ -18,19 +18,18 @@
 
 package org.apache.flink.api.common.state;
 
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.TypeInformationUtils;
 
-/**
- * Interface that different types of partitioned state must implement.
- *
- * <p>The state is only accessible by functions applied on a {@code 
KeyedStream}. The key is
- * automatically supplied by the system, so the function always sees the value 
mapped to the key of
- * the current element. That way, the system can handle stream and state 
partitioning consistently
- * together.
- */
-@PublicEvolving
-public interface State {
-
-    /** Removes the value mapped under the current key. */
-    void clear();
+/** Utils to convert {@link StateDeclaration} to {@link StateDescriptor}. */
+public class StateDeclarationConverter {
+    public static <T> ListStateDescriptor<T> getListStateDescriptor(
+            ListStateDeclaration<T> stateDeclaration) {
+        //noinspection unchecked
+        return new ListStateDescriptor<>(
+                stateDeclaration.getName(),
+                (TypeInformation<T>)
+                        TypeInformationUtils.fromTypeDescriptor(
+                                stateDeclaration.getElementTypeDescriptor()));
+    }
 }
diff --git 
a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ProcessFunction.java
 
b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ProcessFunction.java
index 55919cd8307..f83c7967137 100644
--- 
a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ProcessFunction.java
+++ 
b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ProcessFunction.java
@@ -19,6 +19,7 @@
 package org.apache.flink.processfunction.api;
 
 import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.state.StateDeclaration;
 
 import java.util.Collections;
 import java.util.Map;
@@ -26,10 +27,10 @@ import java.util.function.Consumer;
 
 @FunctionalInterface
 public interface ProcessFunction<IN, OUT> extends Function {
-    void processRecord(IN record, Consumer<OUT> output, RuntimeContext ctx);
+    void processRecord(IN record, Consumer<OUT> output, RuntimeContext ctx) 
throws Exception;
 
     // Explicitly declares states upfront. See FLIP-22.
-    default Map<String, StateDescriptor> usesStates() { // stateId -> 
stateDescriptor
+    default Map<String, StateDeclaration> usesStates() { // stateId -> 
stateDeclaration
         return Collections.emptyMap();
     }
 }
diff --git 
a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/RuntimeContext.java
 
b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/RuntimeContext.java
index 908eeb8367a..f5db74d87fa 100644
--- 
a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/RuntimeContext.java
+++ 
b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/RuntimeContext.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.processfunction.api;
 
+import org.apache.flink.api.common.state.ListState;
+
 public interface RuntimeContext {
-    State getState(String stateId);
+    <T> ListState<T> getListState(String stateId) throws Exception;
 }
diff --git 
a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/StateDescriptor.java
 
b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/StateDescriptor.java
deleted file mode 100644
index 838de57d9c0..00000000000
--- 
a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/StateDescriptor.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.processfunction.api;
-
-public interface StateDescriptor {}
diff --git 
a/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleStatefulMap.java
 
b/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleStatefulMap.java
index 5f849dbf8e4..86097640448 100644
--- 
a/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleStatefulMap.java
+++ 
b/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleStatefulMap.java
@@ -18,11 +18,13 @@
 
 package org.apache.flink.processfunction.examples;
 
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDeclaration;
+import org.apache.flink.api.common.state.StateDeclaration;
+import org.apache.flink.api.common.typeinfo.TypeDescriptors;
 import org.apache.flink.processfunction.api.ExecutionEnvironment;
 import org.apache.flink.processfunction.api.ProcessFunction;
 import org.apache.flink.processfunction.api.RuntimeContext;
-import org.apache.flink.processfunction.api.State;
-import org.apache.flink.processfunction.api.StateDescriptor;
 
 import java.util.Collections;
 import java.util.Map;
@@ -36,26 +38,35 @@ public class SimpleStatefulMap {
                 .process(new CalcTimeDiffFunc())
                 // Don't use Lambda reference as PrintStream is not 
serializable.
                 .tmpToConsumerSink(
-                        (timeDiff) -> System.out.println("%d milliseconds 
since last timestamp."));
+                        (timeDiff) ->
+                                System.out.printf(
+                                        "%d milliseconds since last timestamp. 
\n", timeDiff));
         env.execute();
     }
 
     private static class CalcTimeDiffFunc implements ProcessFunction<Long, 
Long> {
+
         static final String STATE_ID = "lastTimestamp";
 
         @Override
-        public void processRecord(Long record, Consumer<Long> output, 
RuntimeContext ctx) {
-            State state = ctx.getState(STATE_ID);
-            // TODO:
-            //  long diff = record - state.getValue();
-            //  state.setValue(record)
-            //  output.accept(diff);
+        public void processRecord(Long record, Consumer<Long> output, 
RuntimeContext ctx)
+                throws Exception {
+            ListState<Long> state = ctx.getListState(STATE_ID);
+            if (!state.get().iterator().hasNext()) {
+                // for first record
+                output.accept(0L);
+            } else {
+                long diff = record - state.get().iterator().next();
+                output.accept(diff);
+            }
+            state.update(Collections.singletonList(record));
         }
 
         @Override
-        public Map<String, StateDescriptor> usesStates() {
-            // TODO: state descriptor for type LONG
-            return Collections.singletonMap(STATE_ID, null);
+        public Map<String, StateDeclaration> usesStates() {
+            ListStateDeclaration<Long> listStateDeclaration =
+                    new ListStateDeclaration<>(STATE_ID, TypeDescriptors.LONG);
+            return Collections.singletonMap(STATE_ID, listStateDeclaration);
         }
     }
 }
diff --git 
a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/operators/ProcessOperator.java
 
b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/operators/ProcessOperator.java
index 70700ede5eb..693e9078337 100644
--- 
a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/operators/ProcessOperator.java
+++ 
b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/operators/ProcessOperator.java
@@ -18,14 +18,20 @@
 
 package org.apache.flink.processfunction.operators;
 
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDeclaration;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.StateDeclaration;
+import org.apache.flink.api.common.state.StateDeclarationConverter;
 import org.apache.flink.processfunction.api.ProcessFunction;
 import org.apache.flink.processfunction.api.RuntimeContext;
-import org.apache.flink.processfunction.api.State;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
 
+import java.util.Map;
 import java.util.function.Consumer;
 
 /** Operator for {@link org.apache.flink.processfunction.api.ProcessFunction}. 
*/
@@ -51,7 +57,7 @@ public class ProcessOperator<IN, OUT>
     }
 
     @Override
-    public void processElement(StreamRecord<IN> element) {
+    public void processElement(StreamRecord<IN> element) throws Exception {
         userFunction.processRecord(element.getValue(), outputCollector, 
context);
     }
 
@@ -65,12 +71,24 @@ public class ProcessOperator<IN, OUT>
         }
     }
 
-    private static class ContextImpl implements RuntimeContext {
+    private class ContextImpl implements RuntimeContext {
+        private final Map<String, StateDeclaration> allRegisteredStates;
+
+        private ContextImpl() {
+            allRegisteredStates = userFunction.usesStates();
+        }
 
         @Override
-        public State getState(String stateId) {
-            // TODO return state.
-            return null;
+        public <T> ListState<T> getListState(String stateId) throws Exception {
+            StateDeclaration stateDeclaration = 
allRegisteredStates.get(stateId);
+            Preconditions.checkNotNull(stateDeclaration, "No list state with 
id: " + stateId);
+            Preconditions.checkState(stateDeclaration instanceof 
ListStateDeclaration);
+
+            //noinspection unchecked
+            ListStateDescriptor<T> listStateDescriptor =
+                    StateDeclarationConverter.getListStateDescriptor(
+                            (ListStateDeclaration<T>) stateDeclaration);
+            return getOperatorStateBackend().getListState(listStateDescriptor);
         }
     }
 }

Reply via email to