This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch process-func-api-poc-weijie in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8e8d39a60578f1e1ddfdc5214aac33d3fe1497a4 Author: Xintong Song <[email protected]> AuthorDate: Tue May 30 20:03:51 2023 +0800 Introduce DataStream, as well as supplier-source & consumer-sink for testing purpose. --- .../org/apache/flink/processfunction/api/DataStream.java | 12 +++++------- .../flink/processfunction/api/ExecutionEnvironment.java | 7 ++++++- .../apache/flink/processfunction/examples/SimpleMap.java | 4 +++- .../{ExecutionEnvironmentImpl.java => DataStreamImpl.java} | 12 +++++------- .../flink/processfunction/ExecutionEnvironmentImpl.java | 13 +++++++++++-- 5 files changed, 30 insertions(+), 18 deletions(-) diff --git a/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java b/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/DataStream.java similarity index 69% copy from flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java copy to flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/DataStream.java index 531b87e214e..775b95faee3 100644 --- a/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java +++ b/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/DataStream.java @@ -16,13 +16,11 @@ * limitations under the License. */ -package org.apache.flink.processfunction.examples; +package org.apache.flink.processfunction.api; -import org.apache.flink.processfunction.api.ExecutionEnvironment; +import java.util.function.Consumer; -/** Usage: Must be executed with flink-process-function jar in classpath. */ -public class SimpleMap { - public static void main(String[] args) throws Exception { - ExecutionEnvironment.getExecutionEnvironment().foo(); - } +public interface DataStream<T> { + /** TODO: Temporary method. Will revisit sink functions later. 
*/ + void tmpToConsumerSink(Consumer<T> consumer); } diff --git a/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ExecutionEnvironment.java b/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ExecutionEnvironment.java index a13d60d9385..6335c9e33b8 100644 --- a/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ExecutionEnvironment.java +++ b/flink-process-function-api/src/main/java/org/apache/flink/processfunction/api/ExecutionEnvironment.java @@ -18,6 +18,8 @@ package org.apache.flink.processfunction.api; +import java.util.function.Supplier; + public abstract class ExecutionEnvironment { public static ExecutionEnvironment getExecutionEnvironment() throws ReflectiveOperationException { @@ -27,5 +29,8 @@ public abstract class ExecutionEnvironment { .invoke(null); } - public abstract void foo(); + public abstract void execute(); + + /** TODO: Temporary method. Will revisit source functions later. */ + public abstract <OUT> DataStream<OUT> tmpFromSupplierSource(Supplier<OUT> supplier); } diff --git a/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java b/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java index 531b87e214e..a9f7212ba88 100644 --- a/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java +++ b/flink-process-function-examples/src/main/java/org/apache/flink/processfunction/examples/SimpleMap.java @@ -23,6 +23,8 @@ import org.apache.flink.processfunction.api.ExecutionEnvironment; /** Usage: Must be executed with flink-process-function jar in classpath. 
*/ public class SimpleMap { public static void main(String[] args) throws Exception { - ExecutionEnvironment.getExecutionEnvironment().foo(); + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.tmpFromSupplierSource(System::currentTimeMillis).tmpToConsumerSink(System.out::println); + env.execute(); } } diff --git a/flink-process-function/src/main/java/org/apache/flink/processfunction/ExecutionEnvironmentImpl.java b/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java similarity index 72% copy from flink-process-function/src/main/java/org/apache/flink/processfunction/ExecutionEnvironmentImpl.java copy to flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java index 8ea8f33a093..83c679b00e5 100644 --- a/flink-process-function/src/main/java/org/apache/flink/processfunction/ExecutionEnvironmentImpl.java +++ b/flink-process-function/src/main/java/org/apache/flink/processfunction/DataStreamImpl.java @@ -18,15 +18,13 @@ package org.apache.flink.processfunction; -import org.apache.flink.processfunction.api.ExecutionEnvironment; +import org.apache.flink.processfunction.api.DataStream; -public class ExecutionEnvironmentImpl extends ExecutionEnvironment { - public static ExecutionEnvironmentImpl newInstance() { - return new ExecutionEnvironmentImpl(); - } +import java.util.function.Consumer; +public class DataStreamImpl<T> implements DataStream<T> { @Override - public void foo() { - System.out.println(getClass().getSimpleName()); + public void tmpToConsumerSink(Consumer<T> consumer) { + // TODO: keep calling `consumer.accept()` at runtime } } diff --git a/flink-process-function/src/main/java/org/apache/flink/processfunction/ExecutionEnvironmentImpl.java b/flink-process-function/src/main/java/org/apache/flink/processfunction/ExecutionEnvironmentImpl.java index 8ea8f33a093..ee366d47b9d 100644 --- 
a/flink-process-function/src/main/java/org/apache/flink/processfunction/ExecutionEnvironmentImpl.java +++ b/flink-process-function/src/main/java/org/apache/flink/processfunction/ExecutionEnvironmentImpl.java @@ -18,15 +18,24 @@ package org.apache.flink.processfunction; +import org.apache.flink.processfunction.api.DataStream; import org.apache.flink.processfunction.api.ExecutionEnvironment; +import java.util.function.Supplier; + public class ExecutionEnvironmentImpl extends ExecutionEnvironment { public static ExecutionEnvironmentImpl newInstance() { return new ExecutionEnvironmentImpl(); } @Override - public void foo() { - System.out.println(getClass().getSimpleName()); + public void execute() { + // TODO: submit job for execution + } + + @Override + public <OUT> DataStream<OUT> tmpFromSupplierSource(Supplier<OUT> supplier) { + // TODO: keep calling `supplier.get()` at runtime + return new DataStreamImpl<>(); } }
