This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch process-func-api-poc-weijie
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c6d04280adc3883a69da21bfd4857ee012a5896f
Author: Xintong Song <[email protected]>
AuthorDate: Sun Jun 4 12:29:55 2023 +0800

    Introduce interfaces for declaring and using states.
---
 .../flink/processfunction/api/ProcessFunction.java |  8 +++
 .../flink/processfunction/api/RuntimeContext.java  |  4 +-
 .../api/{RuntimeContext.java => State.java}        |  2 +-
 .../{RuntimeContext.java => StateDescriptor.java}  |  2 +-
 .../examples/SimpleStatefulMap.java                | 61 ++++++++++++++++++++++
 5 files changed, 74 insertions(+), 3 deletions(-)

diff --git a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ProcessFunction.java b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ProcessFunction.java
index bbaa8c4cad2..796305fadb2 100644
--- a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ProcessFunction.java
+++ b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ProcessFunction.java
@@ -18,7 +18,21 @@
 
 package org.apache.flink.processfunction.api;
 
+import java.util.Collections;
+import java.util.Map;
+
 @FunctionalInterface
 public interface ProcessFunction<IN, OUT> {
     OUT processRecord(IN record, RuntimeContext ctx);
+
+    /**
+     * Explicitly declares states upfront. See FLIP-22.
+     *
+     * <p>The default implementation declares no states.
+     *
+     * @return mapping from stateId to stateDescriptor; empty by default
+     */
+    default Map<String, StateDescriptor> usesStates() {
+        return Collections.emptyMap();
+    }
 }
diff --git a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/RuntimeContext.java b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/RuntimeContext.java
index 5a26ea9196a..908eeb8367a 100644
--- a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/RuntimeContext.java
+++ b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/RuntimeContext.java
@@ -18,4 +18,14 @@
 
 package org.apache.flink.processfunction.api;
 
-public interface RuntimeContext {}
+/** Context object passed to {@link ProcessFunction#processRecord}. */
+public interface RuntimeContext {
+    /**
+     * Looks up a state by its id.
+     *
+     * @param stateId id of the requested state; presumably a key of the map returned by
+     *     {@link ProcessFunction#usesStates()} (TODO confirm)
+     * @return the state associated with {@code stateId}
+     */
+    State getState(String stateId);
+}
diff --git a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/RuntimeContext.java b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/State.java
similarity index 96%
copy from flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/RuntimeContext.java
copy to flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/State.java
index 5a26ea9196a..3fc48ab137d 100644
--- a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/RuntimeContext.java
+++ b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/State.java
@@ -18,4 +18,9 @@
 
 package org.apache.flink.processfunction.api;
 
-public interface RuntimeContext {}
+/**
+ * Handle to a single state, as returned by {@link RuntimeContext#getState(String)}.
+ *
+ * <p>Currently a marker interface: no operations are declared yet.
+ */
+public interface State {}
diff --git a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/RuntimeContext.java b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/StateDescriptor.java
similarity index 95%
copy from flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/RuntimeContext.java
copy to flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/StateDescriptor.java
index 5a26ea9196a..838de57d9c0 100644
--- a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/RuntimeContext.java
+++ b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/StateDescriptor.java
@@ -18,4 +18,9 @@
 
 package org.apache.flink.processfunction.api;
 
-public interface RuntimeContext {}
+/**
+ * Describes a state declared through {@link ProcessFunction#usesStates()}.
+ *
+ * <p>Currently a marker interface: no members are declared yet.
+ */
+public interface StateDescriptor {}
diff --git a/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleStatefulMap.java b/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleStatefulMap.java
new file mode 100644
index 00000000000..df759e8eb45
--- /dev/null
+++ b/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleStatefulMap.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.processfunction.examples;
+
+import org.apache.flink.processfunction.api.ExecutionEnvironment;
+import org.apache.flink.processfunction.api.ProcessFunction;
+import org.apache.flink.processfunction.api.RuntimeContext;
+import org.apache.flink.processfunction.api.State;
+import org.apache.flink.processfunction.api.StateDescriptor;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Example job that sends {@link System#currentTimeMillis()} values through a stateful
+ * {@link ProcessFunction} and prints every result.
+ *
+ * <p>Usage: Must be executed with flink-process-function and flink-dist jar in classpath.
+ */
+public class SimpleStatefulMap {
+    public static void main(String[] args) throws Exception {
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        env.tmpFromSupplierSource(System::currentTimeMillis)
+                .process(new CalcTimeDiffFunc())
+                // Don't use Lambda reference as PrintStream is not serializable.
+                // The value is concatenated because println() prints "%d" literally.
+                .tmpToConsumerSink(
+                        (timeDiff) ->
+                                System.out.println(
+                                        timeDiff + " milliseconds since last timestamp."));
+        env.execute();
+    }
+
+    /**
+     * Meant to emit the time elapsed since the previous record. The state access is still a TODO,
+     * so each record is currently returned unchanged.
+     */
+    private static class CalcTimeDiffFunc implements ProcessFunction<Long, Long> {
+        /** Id used both to declare the state and to look it up. */
+        static final String STATE_ID = "lastTimestamp";
+
+        @Override
+        public Long processRecord(Long record, RuntimeContext ctx) {
+            State state = ctx.getState(STATE_ID);
+            // TODO:
+            //  long diff = record - state.getValue();
+            //  state.setValue(record)
+            //  return diff;
+            return record;
+        }
+
+        @Override
+        public Map<String, StateDescriptor> usesStates() {
+            // TODO: state descriptor for type LONG
+            return Collections.singletonMap(STATE_ID, null);
+        }
+    }
+}

Reply via email to