This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch process-func-api-poc-weijie in repository https://gitbox.apache.org/repos/asf/flink.git
commit 242e9e57af79b7d59dd232b51b2cce5048f870cd Author: Weijie Guo <[email protected]> AuthorDate: Wed May 31 15:52:08 2023 +0800 Using ConsumerFunction and SupplierFunction to avoid serialization exception and support lambda type inference. --- .../pom.xml | 21 +++++++++++++-------- .../apache/flink/api/common/functions/Function.java | 20 ++++++++++---------- .../flink/util/function/ConsumerFunction.java | 17 +++++++++-------- .../flink/util/function/SupplierFunction.java | 19 ++++++++++--------- flink-core/pom.xml | 2 +- .../flink-process-function-api/pom.xml | 8 ++++++++ .../flink/processfunction/api/DataStream.java | 4 ++-- .../processfunction/api/ExecutionEnvironment.java | 4 ++-- .../flink/processfunction/examples/SimpleMap.java | 4 +++- .../flink/processfunction/DataStreamImpl.java | 5 ++--- .../processfunction/ExecutionEnvironmentImpl.java | 4 ++-- pom.xml | 1 + 12 files changed, 63 insertions(+), 46 deletions(-) diff --git a/flink-process-function-parent/flink-process-function-api/pom.xml b/flink-core-api/pom.xml similarity index 75% copy from flink-process-function-parent/flink-process-function-api/pom.xml copy to flink-core-api/pom.xml index 1812f311ca4..d597c50bc1f 100644 --- a/flink-process-function-parent/flink-process-function-api/pom.xml +++ b/flink-core-api/pom.xml @@ -20,21 +20,26 @@ <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> <parent> + <artifactId>flink-parent</artifactId> <groupId>org.apache.flink</groupId> - <artifactId>flink-process-function-parent</artifactId> <version>1.18-SNAPSHOT</version> </parent> + <modelVersion>4.0.0</modelVersion> - <artifactId>flink-process-function-api</artifactId> - - <packaging>jar</packaging> + <artifactId>flink-core-api</artifactId> + <name>Flink : Core API </name> <properties> - 
<maven.compiler.source>11</maven.compiler.source> - <maven.compiler.target>11</maven.compiler.target> - <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <maven.compiler.source>8</maven.compiler.source> + <maven.compiler.target>8</maven.compiler.target> </properties> + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-annotations</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> </project> diff --git a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java b/flink-core-api/src/main/java/org/apache/flink/api/common/functions/Function.java similarity index 67% copy from flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java copy to flink-core-api/src/main/java/org/apache/flink/api/common/functions/Function.java index 83c679b00e5..9069752dba2 100644 --- a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java +++ b/flink-core-api/src/main/java/org/apache/flink/api/common/functions/Function.java @@ -16,15 +16,15 @@ * limitations under the License. */ -package org.apache.flink.processfunction; +package org.apache.flink.api.common.functions; -import org.apache.flink.processfunction.api.DataStream; +import org.apache.flink.annotation.Public; -import java.util.function.Consumer; - -public class DataStreamImpl<T> implements DataStream<T> { - @Override - public void tmpToConsumerSink(Consumer<T> consumer) { - // TODO: keep calling `consumer.accept()` at runtime - } -} +/** + * The base interface for all user-defined functions. + * + * <p>This interface is empty in order to allow extending interfaces to be SAM (single abstract + * method) interfaces that can be implemented via Java 8 lambdas. 
+ */ +@Public +public interface Function extends java.io.Serializable {} diff --git a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java b/flink-core-api/src/main/java/org/apache/flink/util/function/ConsumerFunction.java similarity index 71% copy from flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java copy to flink-core-api/src/main/java/org/apache/flink/util/function/ConsumerFunction.java index 83c679b00e5..5ba5bbfe0dd 100644 --- a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java +++ b/flink-core-api/src/main/java/org/apache/flink/util/function/ConsumerFunction.java @@ -16,15 +16,16 @@ * limitations under the License. */ -package org.apache.flink.processfunction; +package org.apache.flink.util.function; -import org.apache.flink.processfunction.api.DataStream; +import org.apache.flink.annotation.Public; +import org.apache.flink.api.common.functions.Function; import java.util.function.Consumer; -public class DataStreamImpl<T> implements DataStream<T> { - @Override - public void tmpToConsumerSink(Consumer<T> consumer) { - // TODO: keep calling `consumer.accept()` at runtime - } -} +/** + * A {@link Consumer} but implements {@link Function} to support serialization and type inference. 
+ */ +@Public +@FunctionalInterface +public interface ConsumerFunction<T> extends Consumer<T>, Function {} diff --git a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java b/flink-core-api/src/main/java/org/apache/flink/util/function/SupplierFunction.java similarity index 68% copy from flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java copy to flink-core-api/src/main/java/org/apache/flink/util/function/SupplierFunction.java index 83c679b00e5..303994594b6 100644 --- a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java +++ b/flink-core-api/src/main/java/org/apache/flink/util/function/SupplierFunction.java @@ -16,15 +16,16 @@ * limitations under the License. */ -package org.apache.flink.processfunction; +package org.apache.flink.util.function; -import org.apache.flink.processfunction.api.DataStream; +import org.apache.flink.annotation.Public; +import org.apache.flink.api.common.functions.Function; -import java.util.function.Consumer; +import java.util.function.Supplier; -public class DataStreamImpl<T> implements DataStream<T> { - @Override - public void tmpToConsumerSink(Consumer<T> consumer) { - // TODO: keep calling `consumer.accept()` at runtime - } -} +/** + * A {@link Supplier} but implements {@link Function} to support serialization and type inference. + */ +@Public +@FunctionalInterface +public interface SupplierFunction<T> extends Supplier<T>, Function {} diff --git a/flink-core/pom.xml b/flink-core/pom.xml index 5ba7b349639..3eb9d22900b 100644 --- a/flink-core/pom.xml +++ b/flink-core/pom.xml @@ -36,7 +36,7 @@ under the License. 
<dependencies> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-annotations</artifactId> + <artifactId>flink-core-api</artifactId> <version>${project.version}</version> </dependency> diff --git a/flink-process-function-parent/flink-process-function-api/pom.xml b/flink-process-function-parent/flink-process-function-api/pom.xml index 1812f311ca4..01b7166cbc7 100644 --- a/flink-process-function-parent/flink-process-function-api/pom.xml +++ b/flink-process-function-parent/flink-process-function-api/pom.xml @@ -37,4 +37,12 @@ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core-api</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + </project> diff --git a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/DataStream.java b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/DataStream.java index 775b95faee3..70a0a5aa040 100644 --- a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/DataStream.java +++ b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/DataStream.java @@ -18,9 +18,9 @@ package org.apache.flink.processfunction.api; -import java.util.function.Consumer; +import org.apache.flink.util.function.ConsumerFunction; public interface DataStream<T> { /** TODO: Temporal method. Will revisit sink functions later. 
*/ - void tmpToConsumerSink(Consumer<T> consumer); + void tmpToConsumerSink(ConsumerFunction<T> consumer); } diff --git a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ExecutionEnvironment.java b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ExecutionEnvironment.java index 36d7180d2c9..8f4d39427ce 100644 --- a/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ExecutionEnvironment.java +++ b/flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ExecutionEnvironment.java @@ -18,7 +18,7 @@ package org.apache.flink.processfunction.api; -import java.util.function.Supplier; +import org.apache.flink.util.function.SupplierFunction; public abstract class ExecutionEnvironment { public static ExecutionEnvironment getExecutionEnvironment() @@ -32,5 +32,5 @@ public abstract class ExecutionEnvironment { public abstract void execute() throws Exception; /** TODO: Temporal method. Will revisit source functions later. 
*/ - public abstract <OUT> DataStream<OUT> tmpFromSupplierSource(Supplier<OUT> supplier); + public abstract <OUT> DataStream<OUT> tmpFromSupplierSource(SupplierFunction<OUT> supplier); } diff --git a/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java b/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java index a9f7212ba88..172b62c2853 100644 --- a/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java +++ b/flink-process-function-parent/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java @@ -24,7 +24,9 @@ import org.apache.flink.processfunction.api.ExecutionEnvironment; public class SimpleMap { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - env.tmpFromSupplierSource(System::currentTimeMillis).tmpToConsumerSink(System.out::println); + env.tmpFromSupplierSource(System::currentTimeMillis) + // Don't use the method reference System.out::println here: it captures the PrintStream instance, which is not serializable.
+ .tmpToConsumerSink((data) -> System.out.println(data)); env.execute(); } } diff --git a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java index 83c679b00e5..72cb6d276f5 100644 --- a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java +++ b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java @@ -19,12 +19,11 @@ package org.apache.flink.processfunction; import org.apache.flink.processfunction.api.DataStream; - -import java.util.function.Consumer; +import org.apache.flink.util.function.ConsumerFunction; public class DataStreamImpl<T> implements DataStream<T> { @Override - public void tmpToConsumerSink(Consumer<T> consumer) { + public void tmpToConsumerSink(ConsumerFunction<T> consumer) { // TODO: keep calling `consumer.accept()` at runtime } } diff --git a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/ExecutionEnvironmentImpl.java b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/ExecutionEnvironmentImpl.java index c5554f23fe4..be0401df86c 100644 --- a/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/ExecutionEnvironmentImpl.java +++ b/flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/processfunction/ExecutionEnvironmentImpl.java @@ -33,12 +33,12 @@ import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.graph.StreamGraphGenerator; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; +import org.apache.flink.util.function.SupplierFunction; import java.util.ArrayList; import 
java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.function.Supplier; import static org.apache.flink.streaming.api.graph.StreamGraphGenerator.DEFAULT_TIME_CHARACTERISTIC; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -63,7 +63,7 @@ public class ExecutionEnvironmentImpl extends ExecutionEnvironment { } @Override - public <OUT> DataStream<OUT> tmpFromSupplierSource(Supplier<OUT> supplier) { + public <OUT> DataStream<OUT> tmpFromSupplierSource(SupplierFunction<OUT> supplier) { // TODO: keep calling `supplier.get()` at runtime return new DataStreamImpl<>(); } diff --git a/pom.xml b/pom.xml index 3a7ba4f4d65..73862a26b22 100644 --- a/pom.xml +++ b/pom.xml @@ -66,6 +66,7 @@ under the License. <module>flink-annotations</module> <module>flink-architecture-tests</module> <module>flink-core</module> + <module>flink-core-api</module> <module>flink-java</module> <module>flink-scala</module> <module>flink-filesystems</module>
