This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch process-func-api-poc-weijie
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 242e9e57af79b7d59dd232b51b2cce5048f870cd
Author: Weijie Guo <[email protected]>
AuthorDate: Wed May 31 15:52:08 2023 +0800

    Using ConsumerFunction and SupplierFunction to avoid serialization 
exception and support lambda type inference.
---
 .../pom.xml                                         | 21 +++++++++++++--------
 .../apache/flink/api/common/functions/Function.java | 20 ++++++++++----------
 .../flink/util/function/ConsumerFunction.java       | 17 +++++++++--------
 .../flink/util/function/SupplierFunction.java       | 19 ++++++++++---------
 flink-core/pom.xml                                  |  2 +-
 .../flink-process-function-api/pom.xml              |  8 ++++++++
 .../flink/processfunction/api/DataStream.java       |  4 ++--
 .../processfunction/api/ExecutionEnvironment.java   |  4 ++--
 .../flink/processfunction/examples/SimpleMap.java   |  4 +++-
 .../flink/processfunction/DataStreamImpl.java       |  5 ++---
 .../processfunction/ExecutionEnvironmentImpl.java   |  4 ++--
 pom.xml                                             |  1 +
 12 files changed, 63 insertions(+), 46 deletions(-)

diff --git a/flink-process-function-parent/flink-process-function-api/pom.xml 
b/flink-core-api/pom.xml
similarity index 75%
copy from flink-process-function-parent/flink-process-function-api/pom.xml
copy to flink-core-api/pom.xml
index 1812f311ca4..d597c50bc1f 100644
--- a/flink-process-function-parent/flink-process-function-api/pom.xml
+++ b/flink-core-api/pom.xml
@@ -20,21 +20,26 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0";
                 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
                 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
-       <modelVersion>4.0.0</modelVersion>
        <parent>
+               <artifactId>flink-parent</artifactId>
                <groupId>org.apache.flink</groupId>
-               <artifactId>flink-process-function-parent</artifactId>
                <version>1.18-SNAPSHOT</version>
        </parent>
+       <modelVersion>4.0.0</modelVersion>
 
-       <artifactId>flink-process-function-api</artifactId>
-
-       <packaging>jar</packaging>
+       <artifactId>flink-core-api</artifactId>
+       <name>Flink : Core API </name>
 
        <properties>
-               <maven.compiler.source>11</maven.compiler.source>
-               <maven.compiler.target>11</maven.compiler.target>
-               
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+               <maven.compiler.source>8</maven.compiler.source>
+               <maven.compiler.target>8</maven.compiler.target>
        </properties>
 
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-annotations</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+       </dependencies>
 </project>
diff --git 
a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java
 
b/flink-core-api/src/main/java/org/apache/flink/api/common/functions/Function.java
similarity index 67%
copy from 
flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java
copy to 
flink-core-api/src/main/java/org/apache/flink/api/common/functions/Function.java
index 83c679b00e5..9069752dba2 100644
--- 
a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java
+++ 
b/flink-core-api/src/main/java/org/apache/flink/api/common/functions/Function.java
@@ -16,15 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.flink.processfunction;
+package org.apache.flink.api.common.functions;
 
-import org.apache.flink.processfunction.api.DataStream;
+import org.apache.flink.annotation.Public;
 
-import java.util.function.Consumer;
-
-public class DataStreamImpl<T> implements DataStream<T> {
-    @Override
-    public void tmpToConsumerSink(Consumer<T> consumer) {
-        // TODO: keep calling `consumer.accept()` at runtime
-    }
-}
+/**
+ * The base interface for all user-defined functions.
+ *
+ * <p>This interface is empty in order to allow extending interfaces to be SAM 
(single abstract
+ * method) interfaces that can be implemented via Java 8 lambdas.
+ */
+@Public
+public interface Function extends java.io.Serializable {}
diff --git 
a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java
 
b/flink-core-api/src/main/java/org/apache/flink/util/function/ConsumerFunction.java
similarity index 71%
copy from 
flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java
copy to 
flink-core-api/src/main/java/org/apache/flink/util/function/ConsumerFunction.java
index 83c679b00e5..5ba5bbfe0dd 100644
--- 
a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java
+++ 
b/flink-core-api/src/main/java/org/apache/flink/util/function/ConsumerFunction.java
@@ -16,15 +16,16 @@
  * limitations under the License.
  */
 
-package org.apache.flink.processfunction;
+package org.apache.flink.util.function;
 
-import org.apache.flink.processfunction.api.DataStream;
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.functions.Function;
 
 import java.util.function.Consumer;
 
-public class DataStreamImpl<T> implements DataStream<T> {
-    @Override
-    public void tmpToConsumerSink(Consumer<T> consumer) {
-        // TODO: keep calling `consumer.accept()` at runtime
-    }
-}
+/**
+ * A {@link Consumer} but implements {@link Function} to support serialization 
and type inference.
+ */
+@Public
+@FunctionalInterface
+public interface ConsumerFunction<T> extends Consumer<T>, Function {}
diff --git 
a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java
 
b/flink-core-api/src/main/java/org/apache/flink/util/function/SupplierFunction.java
similarity index 68%
copy from 
flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java
copy to 
flink-core-api/src/main/java/org/apache/flink/util/function/SupplierFunction.java
index 83c679b00e5..303994594b6 100644
--- 
a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java
+++ 
b/flink-core-api/src/main/java/org/apache/flink/util/function/SupplierFunction.java
@@ -16,15 +16,16 @@
  * limitations under the License.
  */
 
-package org.apache.flink.processfunction;
+package org.apache.flink.util.function;
 
-import org.apache.flink.processfunction.api.DataStream;
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.functions.Function;
 
-import java.util.function.Consumer;
+import java.util.function.Supplier;
 
-public class DataStreamImpl<T> implements DataStream<T> {
-    @Override
-    public void tmpToConsumerSink(Consumer<T> consumer) {
-        // TODO: keep calling `consumer.accept()` at runtime
-    }
-}
+/**
+ * A {@link Supplier} but implements {@link Function} to support serialization 
and type inference.
+ */
+@Public
+@FunctionalInterface
+public interface SupplierFunction<T> extends Supplier<T>, Function {}
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index 5ba7b349639..3eb9d22900b 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -36,7 +36,7 @@ under the License.
        <dependencies>
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-annotations</artifactId>
+                       <artifactId>flink-core-api</artifactId>
                        <version>${project.version}</version>
                </dependency>
 
diff --git a/flink-process-function-parent/flink-process-function-api/pom.xml 
b/flink-process-function-parent/flink-process-function-api/pom.xml
index 1812f311ca4..01b7166cbc7 100644
--- a/flink-process-function-parent/flink-process-function-api/pom.xml
+++ b/flink-process-function-parent/flink-process-function-api/pom.xml
@@ -37,4 +37,12 @@
                
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        </properties>
 
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-core-api</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+       </dependencies>
+
 </project>
diff --git 
a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/DataStream.java
 
b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/DataStream.java
index 775b95faee3..70a0a5aa040 100644
--- 
a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/DataStream.java
+++ 
b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/DataStream.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.processfunction.api;
 
-import java.util.function.Consumer;
+import org.apache.flink.util.function.ConsumerFunction;
 
 public interface DataStream<T> {
     /** TODO: Temporal method. Will revisit sink functions later. */
-    void tmpToConsumerSink(Consumer<T> consumer);
+    void tmpToConsumerSink(ConsumerFunction<T> consumer);
 }
diff --git 
a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ExecutionEnvironment.java
 
b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ExecutionEnvironment.java
index 36d7180d2c9..8f4d39427ce 100644
--- 
a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ExecutionEnvironment.java
+++ 
b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ExecutionEnvironment.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.processfunction.api;
 
-import java.util.function.Supplier;
+import org.apache.flink.util.function.SupplierFunction;
 
 public abstract class ExecutionEnvironment {
     public static ExecutionEnvironment getExecutionEnvironment()
@@ -32,5 +32,5 @@ public abstract class ExecutionEnvironment {
     public abstract void execute() throws Exception;
 
     /** TODO: Temporal method. Will revisit source functions later. */
-    public abstract <OUT> DataStream<OUT> tmpFromSupplierSource(Supplier<OUT> 
supplier);
+    public abstract <OUT> DataStream<OUT> 
tmpFromSupplierSource(SupplierFunction<OUT> supplier);
 }
diff --git 
a/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java
 
b/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java
index a9f7212ba88..172b62c2853 100644
--- 
a/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java
+++ 
b/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java
@@ -24,7 +24,9 @@ import 
org.apache.flink.processfunction.api.ExecutionEnvironment;
 public class SimpleMap {
     public static void main(String[] args) throws Exception {
         ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-        
env.tmpFromSupplierSource(System::currentTimeMillis).tmpToConsumerSink(System.out::println);
+        env.tmpFromSupplierSource(System::currentTimeMillis)
+                // Don't use Lambda reference as PrintStream is not 
serializable.
+                .tmpToConsumerSink((data) -> System.out.println(data));
         env.execute();
     }
 }
diff --git 
a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java
 
b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java
index 83c679b00e5..72cb6d276f5 100644
--- 
a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java
+++ 
b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java
@@ -19,12 +19,11 @@
 package org.apache.flink.processfunction;
 
 import org.apache.flink.processfunction.api.DataStream;
-
-import java.util.function.Consumer;
+import org.apache.flink.util.function.ConsumerFunction;
 
 public class DataStreamImpl<T> implements DataStream<T> {
     @Override
-    public void tmpToConsumerSink(Consumer<T> consumer) {
+    public void tmpToConsumerSink(ConsumerFunction<T> consumer) {
         // TODO: keep calling `consumer.accept()` at runtime
     }
 }
diff --git 
a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/ExecutionEnvironmentImpl.java
 
b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/ExecutionEnvironmentImpl.java
index c5554f23fe4..be0401df86c 100644
--- 
a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/ExecutionEnvironmentImpl.java
+++ 
b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/ExecutionEnvironmentImpl.java
@@ -33,12 +33,12 @@ import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.function.SupplierFunction;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.function.Supplier;
 
 import static 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.DEFAULT_TIME_CHARACTERISTIC;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -63,7 +63,7 @@ public class ExecutionEnvironmentImpl extends 
ExecutionEnvironment {
     }
 
     @Override
-    public <OUT> DataStream<OUT> tmpFromSupplierSource(Supplier<OUT> supplier) 
{
+    public <OUT> DataStream<OUT> tmpFromSupplierSource(SupplierFunction<OUT> 
supplier) {
         // TODO: keep calling `supplier.get()` at runtime
         return new DataStreamImpl<>();
     }
diff --git a/pom.xml b/pom.xml
index 3a7ba4f4d65..73862a26b22 100644
--- a/pom.xml
+++ b/pom.xml
@@ -66,6 +66,7 @@ under the License.
                <module>flink-annotations</module>
                <module>flink-architecture-tests</module>
                <module>flink-core</module>
+               <module>flink-core-api</module>
                <module>flink-java</module>
                <module>flink-scala</module>
                <module>flink-filesystems</module>

Reply via email to