WencongLiu commented on code in PR #24422:
URL: https://github.com/apache/flink/pull/24422#discussion_r1523101423


##########
flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/process/api/stream/BroadcastStream.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.process.api.stream;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.java.functions.KeySelector;
+import 
org.apache.flink.process.api.function.TwoInputBroadcastStreamProcessFunction;
+
+/** This interface represents a stream that each parallel task processes the 
same data. */
+@Experimental
+public interface BroadcastStream<T> {
+    /**
+     * Apply a two input operation to this and other {@link 
KeyedPartitionStream}.
+     *
+     * @param other {@link KeyedPartitionStream} to perform operation with two 
input.
+     * @param processFunction to perform operation.
+     * @return new stream with this operation.
+     */
+    <K, T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
+            KeyedPartitionStream<K, T_OTHER> other,
+            TwoInputBroadcastStreamProcessFunction<T_OTHER, T, OUT> 
processFunction);
+
+    /**
+     * Apply a two input operation to this and other {@link 
NonKeyedPartitionStream}.
+     *
+     * @param other {@link NonKeyedPartitionStream} to perform operation with 
two input.
+     * @param processFunction to perform operation.
+     * @return new stream with this operation.
+     */
+    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
+            NonKeyedPartitionStream<T_OTHER> other,
+            TwoInputBroadcastStreamProcessFunction<T_OTHER, T, OUT> 
processFunction);
+
+    /**
+     * Apply a two input operation to this and other {@link 
KeyedPartitionStream}.
+     *
+     * <p>This method is used to avoid shuffle after applying the process 
function. It is required
+     * that for the record from non-broadcast input, the new {@link 
KeySelector} must extract the
+     * same key as the original {@link KeySelector}s on the {@link 
KeyedPartitionStream}. For the
+     * record from broadcast input, the output key from keyed partition itself 
instead of new key
+     * selector, so it is safe already.

Review Comment:
   For the `KeyedPartitionStream` and `BroadcastStream` case, we may need to 
explain in more detail why a new key selector is needed, why it avoids the 
shuffle, and what `safe` means here.
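
   To make the `safe` requirement concrete, here is a minimal sketch in plain 
Java, not Flink's API. It assumes records are routed to a parallel task by the 
hash of their key modulo the parallelism. `KeyPreservationSketch`, 
`partitionOf`, and `staysInPartition` are hypothetical names used only for 
illustration.

```java
import java.util.function.Function;

/** Hypothetical model of key-based routing; this is not Flink code. */
public class KeyPreservationSketch {

    /** Assumed routing rule: the task index is derived from the key's hash. */
    static <K> int partitionOf(K key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    /**
     * Returns true if the output record, keyed by newKeySelector, would be routed
     * to the same task that processed the input record with inputKey.
     */
    static <K, OUT> boolean staysInPartition(
            K inputKey, OUT output, Function<OUT, K> newKeySelector, int parallelism) {
        K outputKey = newKeySelector.apply(output);
        return partitionOf(inputKey, parallelism) == partitionOf(outputKey, parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4;
        String inputKey = "user-1";
        String output = "user-1:click";

        // Selector that recovers the original key: the record is already in place.
        Function<String, String> sameKey = s -> s.substring(0, s.indexOf(':'));
        // Selector that derives a different key: the record may belong to another task.
        Function<String, String> otherKey = s -> s.substring(s.indexOf(':') + 1);

        System.out.println(staysInPartition(inputKey, output, sameKey, parallelism));
        System.out.println(staysInPartition(inputKey, output, otherKey, parallelism));
    }
}
```

   Under this assumption, a selector that reproduces the original key always 
keeps the output on the task that produced it, so no shuffle is needed. A 
selector that yields a different key gives no such guarantee, which is 
presumably the case the Javadoc should warn about.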



##########
flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/utils/StreamUtils.java:
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.process.impl.utils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.v2.Sink;
+import org.apache.flink.api.connector.v2.WrappedSink;
+import org.apache.flink.api.java.Utils;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.process.api.function.OneInputStreamProcessFunction;
+import 
org.apache.flink.process.api.function.TwoInputBroadcastStreamProcessFunction;
+import 
org.apache.flink.process.api.function.TwoInputNonBroadcastStreamProcessFunction;
+import org.apache.flink.process.api.function.TwoOutputStreamProcessFunction;
+import org.apache.flink.process.impl.stream.DataStream;
+import org.apache.flink.process.impl.stream.KeyedPartitionStreamImpl;
+import org.apache.flink.process.impl.stream.NonKeyedPartitionStreamImpl;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import 
org.apache.flink.streaming.api.transformations.ProcessFunctionSinkTransformation;
+import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+
+/** Utils to handle things like extract type information for DataStream. */

Review Comment:
   Currently, the `StreamUtils` utility class is responsible for too many 
things, including extracting type information, building transformations, 
creating new DataStreams, and adding sink operators. The Javadoc for the class 
states that its purpose is to "handle things like extract type information for 
DataStream". We might need to specify in the Javadoc which kinds of utility 
methods this class provides, to improve readability.



##########
flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/process/api/stream/KeyedPartitionStream.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.process.api.stream;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.connector.v2.Sink;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.process.api.function.OneInputStreamProcessFunction;
+import 
org.apache.flink.process.api.function.TwoInputBroadcastStreamProcessFunction;
+import 
org.apache.flink.process.api.function.TwoInputNonBroadcastStreamProcessFunction;
+import org.apache.flink.process.api.function.TwoOutputStreamProcessFunction;
+import 
org.apache.flink.process.api.stream.NonKeyedPartitionStream.TwoNonKeyedPartitionStreams;
+
+/** This interface represents a stream that each parallel task processes the 
same data. */

Review Comment:
   The class Javadoc shouldn't be the same as that of `BroadcastStream`.



##########
flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/process/api/function/ProcessFunction.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.process.api.function;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.functions.Function;
+
+/** Base class for all user defined process functions. */
+@Experimental
+public interface ProcessFunction extends Function {
+    /**
+     * Initialization method for the function. It is called before the actual 
working methods (like
+     * processRecord) and thus suitable for one time setup work.
+     *
+     * <p>By default, this method does nothing.
+     *
+     * @throws Exception Implementations may forward exceptions, which are 
caught by the runtime.
+     *     When the runtime catches an exception, it aborts the task and lets 
the fail-over logic
+     *     decide whether to retry the task execution.
+     */
+    default void open() throws Exception {}

Review Comment:
   `RichFunction` also has methods like `open(OpenContext openContext)` and 
`close()` that are tied to the lifecycle of operators. Could the `open` method 
of `ProcessFunction` also take a parameter like `OpenContext`?
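
   A minimal sketch of what this question is asking for, in plain Java without 
Flink dependencies. `OpenContextSketch`, `LifecycleFunctionSketch`, and 
`RecordingFunction` are hypothetical stand-ins, not the PR's or Flink's actual 
types. The assumption is that an initially empty context parameter lets 
information be added later without changing the method signature.

```java
import java.util.ArrayList;
import java.util.List;

public class OpenWithContextSketch {

    /** Hypothetical stand-in for a context object passed to open(); empty for now. */
    interface OpenContextSketch {}

    /** Hypothetical stand-in for a process function with lifecycle hooks. */
    interface LifecycleFunctionSketch {
        /** Called once before any record is processed; does nothing by default. */
        default void open(OpenContextSketch context) throws Exception {}

        /** Called once after the last record; does nothing by default. */
        default void close() throws Exception {}
    }

    /** Records lifecycle calls in order so the call sequence can be inspected. */
    static class RecordingFunction implements LifecycleFunctionSketch {
        final List<String> calls = new ArrayList<>();

        @Override
        public void open(OpenContextSketch context) {
            calls.add("open");
        }

        @Override
        public void close() {
            calls.add("close");
        }
    }

    /** Drives a function through open and close, as a runtime might. */
    static List<String> runLifecycle(RecordingFunction function) {
        function.open(new OpenContextSketch() {});
        function.close();
        return function.calls;
    }

    public static void main(String[] args) {
        System.out.println(runLifecycle(new RecordingFunction()));
    }
}
```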



##########
flink-core-api/pom.xml:
##########
@@ -0,0 +1,60 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+       <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-parent</artifactId>
+               <version>1.20-SNAPSHOT</version>
+       </parent>
+
+       <artifactId>flink-core-api</artifactId>
+       <name>Flink : Core API</name>
+
+       <packaging>jar</packaging>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-annotations</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <!-- publish some test base classes -->

Review Comment:
   Currently there is no clear use case for sharing test classes among 
different modules. I would suggest we remove the test-jar configuration for 
now. We can always reintroduce it later if the need arises.



##########
flink-process-function-parent/flink-process-function/src/test/java/org/apache/flink/process/impl/ExecutionEnvironmentImplTest.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.process.impl;
+
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.v2.SourceUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
+import org.apache.flink.process.api.ExecutionEnvironment;
+import org.apache.flink.process.api.stream.NonKeyedPartitionStream;
+import org.apache.flink.process.impl.stream.StreamTestUtils;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamNode;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link ExecutionEnvironmentImpl}. */
+class ExecutionEnvironmentImplTest {

Review Comment:
   We also need to test `setExecutionMode` and `execute(String jobName)`.



##########
flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/operators/ProcessOperator.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.process.impl.operators;
+
+import org.apache.flink.process.api.function.OneInputStreamProcessFunction;
+import org.apache.flink.process.impl.common.OutputCollector;
+import org.apache.flink.process.impl.common.TimestampCollector;
+import org.apache.flink.process.impl.context.DefaultNonPartitionedContext;
+import org.apache.flink.process.impl.context.DefaultRuntimeContext;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/** Operator for {@link OneInputStreamProcessFunction}. */
+public class ProcessOperator<IN, OUT>
+        extends AbstractUdfStreamOperator<OUT, 
OneInputStreamProcessFunction<IN, OUT>>
+        implements OneInputStreamOperator<IN, OUT>, BoundedOneInput {
+
+    protected transient DefaultRuntimeContext context;
+
+    protected transient DefaultNonPartitionedContext<OUT> 
nonPartitionedContext;
+
+    protected transient TimestampCollector<OUT> outputCollector;
+
+    public ProcessOperator(OneInputStreamProcessFunction<IN, OUT> 
userFunction) {
+        super(userFunction);
+
+        chainingStrategy = ChainingStrategy.ALWAYS;

Review Comment:
   The design of the newly introduced `ProcessOperator` is similar to 
`org.apache.flink.streaming.api.operators.ProcessOperator`, which also sets the 
`chainingStrategy` to `ALWAYS`. 
   
   Currently, the `StreamingJobGraphGenerator` uses 
`StreamOperatorFactory#getChainingStrategy()` 
   to determine whether to chain two adjacent operators. 
   
   The operator factory of the new `ProcessOperator` is 
`SimpleUdfStreamOperatorFactory`, which extends 
`AbstractStreamOperatorFactory`. The default implementation of 
   `getChainingStrategy()` in `AbstractStreamOperatorFactory` returns 
   `ChainingStrategy.DEFAULT_CHAINING_STRATEGY`, which is the same as `ALWAYS`. 
   
   Therefore, I think this initialization is unnecessary. WDYT?



##########
flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/ExecutionEnvironmentImpl.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.process.impl;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.v2.FromDataSource;
+import org.apache.flink.api.connector.v2.Source;
+import org.apache.flink.api.connector.v2.WrappedSource;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.typeutils.MissingTypeInfo;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import 
org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.PipelineExecutor;
+import org.apache.flink.core.execution.PipelineExecutorFactory;
+import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
+import org.apache.flink.process.api.ExecutionEnvironment;
+import org.apache.flink.process.api.stream.NonKeyedPartitionStream;
+import org.apache.flink.process.impl.stream.NonKeyedPartitionStreamImpl;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import 
org.apache.flink.streaming.runtime.translators.ProcessFunctionSinkTransformationTranslator;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.DEFAULT_TIME_CHARACTERISTIC;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The implementation of {@link ExecutionEnvironment}. */
+public class ExecutionEnvironmentImpl implements ExecutionEnvironment {
+    private final List<Transformation<?>> transformations = new ArrayList<>();
+
+    private final ExecutionConfig executionConfig;
+
+    /** Settings that control the checkpointing behavior. */
+    private final CheckpointConfig checkpointCfg;
+
+    private final Configuration configuration;
+
+    private final ClassLoader userClassloader;
+
+    private final PipelineExecutorServiceLoader executorServiceLoader;
+
+    static {
+        try {
+            // All transformation translator must be put to a map in 
StreamGraphGenerator, but
+            // streaming-java is not depend on process-function module, using 
reflect to handle
+            // this.
+            
ProcessFunctionSinkTransformationTranslator.registerSinkTransformationTranslator();
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Can not register process function transformation 
translator.");
+        }
+    }
+
+    /**
+     * The environment of the context (local by default, cluster if invoked 
through command line).
+     */
+    private static ExecutionEnvironmentFactory contextEnvironmentFactory = 
null;
+
+    public static ExecutionEnvironment newInstance() {
+        if (contextEnvironmentFactory != null) {
+            return contextEnvironmentFactory.createExecutionEnvironment(new 
Configuration());
+        } else {
+            final Configuration configuration = new Configuration();
+            configuration.set(DeploymentOptions.TARGET, "local");
+            configuration.set(DeploymentOptions.ATTACHED, true);
+            return new ExecutionEnvironmentImpl(
+                    new DefaultExecutorServiceLoader(), configuration, null);
+        }
+    }
+
+    ExecutionEnvironmentImpl(
+            PipelineExecutorServiceLoader executorServiceLoader,
+            Configuration configuration,
+            ClassLoader classLoader) {
+        this.executorServiceLoader = checkNotNull(executorServiceLoader);
+        this.configuration = configuration;
+        this.executionConfig = new ExecutionConfig(this.configuration);
+        this.checkpointCfg = new CheckpointConfig(this.configuration);
+        this.userClassloader = classLoader == null ? 
getClass().getClassLoader() : classLoader;
+        configure(configuration, userClassloader);
+    }
+
+    @Override
+    public void execute(String jobName) throws Exception {
+        StreamGraph streamGraph = getStreamGraph();
+        if (jobName != null) {
+            streamGraph.setJobName(jobName);
+        }
+
+        execute(streamGraph);
+    }
+
+    @Override
+    public ExecutionEnvironment setExecutionMode(RuntimeExecutionMode 
runtimeMode) {
+        checkNotNull(runtimeMode);
+        configuration.set(ExecutionOptions.RUNTIME_MODE, runtimeMode);
+        return this;
+    }
+
+    protected static void 
initializeContextEnvironment(ExecutionEnvironmentFactory ctx) {
+        contextEnvironmentFactory = ctx;
+    }
+
+    protected static void resetContextEnvironment() {
+        contextEnvironmentFactory = null;
+    }
+
+    @Override
+    public <OUT> NonKeyedPartitionStream<OUT> fromSource(Source<OUT> source, 
String sourceName) {
+        if (source instanceof WrappedSource) {
+            org.apache.flink.api.connector.source.Source<OUT, ?, ?> 
innerSource =
+                    ((WrappedSource<OUT>) source).getWrappedSource();
+            final TypeInformation<OUT> resolvedTypeInfo =
+                    getSourceTypeInfo(innerSource, sourceName);
+
+            SourceTransformation<OUT, ?, ?> sourceTransformation =
+                    new SourceTransformation<>(
+                            sourceName,
+                            innerSource,
+                            WatermarkStrategy.noWatermarks(),
+                            resolvedTypeInfo,
+                            getParallelism(),
+                            false);
+            return new NonKeyedPartitionStreamImpl<>(this, 
sourceTransformation);
+        } else if (source instanceof FromDataSource) {
+            Collection<OUT> data = ((FromDataSource<OUT>) source).getData();
+            TypeInformation<OUT> outType = extractTypeInfoFromCollection(data);
+
+            FromElementsGeneratorFunction<OUT> generatorFunction =
+                    new FromElementsGeneratorFunction<>(outType, 
executionConfig, data);
+
+            DataGeneratorSource<OUT> generatorSource =
+                    new DataGeneratorSource<>(generatorFunction, data.size(), 
outType);
+
+            return fromSource(new WrappedSource<>(generatorSource), 
"Collection Source");
+        } else {
+            throw new UnsupportedOperationException(
+                    "Unsupported type of sink, you could use SourceUtils to 
wrap a FLIP-27 based source.");
+        }
+    }
+
+    public Configuration getConfiguration() {
+        return this.configuration;
+    }
+
+    public ExecutionConfig getExecutionConfig() {
+        return executionConfig;
+    }
+
+    public int getParallelism() {
+        return executionConfig.getParallelism();
+    }
+
+    public List<Transformation<?>> getTransformations() {
+        return transformations;
+    }
+
+    public void setParallelism(int parallelism) {
+        executionConfig.setParallelism(parallelism);
+    }
+
+    public CheckpointConfig getCheckpointCfg() {
+        return checkpointCfg;
+    }
+
+    // -----------------------------------------------
+    //              Internal Methods
+    // -----------------------------------------------
+
+    private static <OUT> TypeInformation<OUT> 
extractTypeInfoFromCollection(Collection<OUT> data) {
+        Preconditions.checkNotNull(data, "Collection must not be null");
+        if (data.isEmpty()) {
+            throw new IllegalArgumentException("Collection must not be empty");
+        }
+
+        OUT first = data.iterator().next();
+        if (first == null) {
+            throw new IllegalArgumentException("Collection must not contain 
null elements");
+        }
+
+        TypeInformation<OUT> typeInfo;
+        try {
+            typeInfo = TypeExtractor.getForObject(first);
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Could not create TypeInformation for type "
+                            + first.getClass()
+                            + "; please specify the TypeInformation manually 
via the version of the "
+                            + "method that explicitly accepts it as an 
argument.",
+                    e);
+        }
+        return typeInfo;
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <OUT, T extends TypeInformation<OUT>> T getSourceTypeInfo(
+            org.apache.flink.api.connector.source.Source<OUT, ?, ?> source, 
String sourceName) {
+        TypeInformation<OUT> resolvedTypeInfo = null;
+        if (source instanceof ResultTypeQueryable) {

Review Comment:
   We could use 
`org.apache.flink.api.java.typeutils.TypeExtractor#createTypeInfo(java.lang.Object, java.lang.Class<?>, java.lang.Class<?>, int)` 
to replace this `if-else` check.



##########
flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/ExecutionEnvironmentImpl.java:
##########
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.process.impl;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.v2.FromDataSource;
+import org.apache.flink.api.connector.v2.Source;
+import org.apache.flink.api.connector.v2.WrappedSource;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.typeutils.MissingTypeInfo;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import 
org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.PipelineExecutor;
+import org.apache.flink.core.execution.PipelineExecutorFactory;
+import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
+import org.apache.flink.process.api.ExecutionEnvironment;
+import org.apache.flink.process.api.stream.NonKeyedPartitionStream;
+import org.apache.flink.process.impl.stream.NonKeyedPartitionStreamImpl;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import 
org.apache.flink.streaming.runtime.translators.ProcessFunctionSinkTransformationTranslator;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.DEFAULT_TIME_CHARACTERISTIC;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The implementation of {@link ExecutionEnvironment}. */
+public class ExecutionEnvironmentImpl implements ExecutionEnvironment {
+    private final List<Transformation<?>> transformations = new ArrayList<>();
+
+    private final ExecutionConfig executionConfig;
+
+    /** Settings that control the checkpointing behavior. */
+    private final CheckpointConfig checkpointCfg;
+
+    private final Configuration configuration;
+
+    private final ClassLoader userClassloader;
+
+    private final PipelineExecutorServiceLoader executorServiceLoader;
+
+    static {
+        try {
+            // All transformation translator must be put to a map in StreamGraphGenerator, but
+            // streaming-java is not depend on process-function module, using reflect to handle
+            // this.
+            ProcessFunctionSinkTransformationTranslator.registerSinkTransformationTranslator();
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Can not register process function transformation translator.");
+        }
+    }
+
+    /**
+     * The environment of the context (local by default, cluster if invoked through command line).
+     */
+    private static ExecutionEnvironmentFactory contextEnvironmentFactory = null;
+
+    public static ExecutionEnvironment newInstance() {
+        if (contextEnvironmentFactory != null) {
+            return contextEnvironmentFactory.createExecutionEnvironment(new Configuration());
+        } else {
+            final Configuration configuration = new Configuration();
+            configuration.set(DeploymentOptions.TARGET, "local");
+            configuration.set(DeploymentOptions.ATTACHED, true);
+            return new ExecutionEnvironmentImpl(
+                    new DefaultExecutorServiceLoader(), configuration, null);
+        }
+    }
+
+    ExecutionEnvironmentImpl(
+            PipelineExecutorServiceLoader executorServiceLoader,
+            Configuration configuration,
+            ClassLoader classLoader) {
+        this.executorServiceLoader = checkNotNull(executorServiceLoader);
+        this.configuration = configuration;
+        this.executionConfig = new ExecutionConfig(this.configuration);
+        this.checkpointCfg = new CheckpointConfig(this.configuration);
+        this.userClassloader = classLoader == null ? getClass().getClassLoader() : classLoader;
+        configure(configuration, userClassloader);
+    }
+
+    @Override
+    public void execute(String jobName) throws Exception {
+        StreamGraph streamGraph = getStreamGraph();
+        if (jobName != null) {
+            streamGraph.setJobName(jobName);
+        }
+
+        execute(streamGraph);
+    }
+
+    @Override
+    public ExecutionEnvironment setExecutionMode(RuntimeExecutionMode runtimeMode) {
+        checkNotNull(runtimeMode);
+        configuration.set(ExecutionOptions.RUNTIME_MODE, runtimeMode);
+        return this;
+    }
+
+    protected static void initializeContextEnvironment(ExecutionEnvironmentFactory ctx) {
+        contextEnvironmentFactory = ctx;
+    }
+
+    protected static void resetContextEnvironment() {
+        contextEnvironmentFactory = null;
+    }
+
+    @Override
+    public <OUT> NonKeyedPartitionStream<OUT> fromSource(Source<OUT> source, String sourceName) {
+        if (source instanceof WrappedSource) {
+            org.apache.flink.api.connector.source.Source<OUT, ?, ?> innerSource =
+                    ((WrappedSource<OUT>) source).getWrappedSource();
+            final TypeInformation<OUT> resolvedTypeInfo =
+                    getSourceTypeInfo(innerSource, sourceName);
+
+            SourceTransformation<OUT, ?, ?> sourceTransformation =
+                    new SourceTransformation<>(
+                            sourceName,
+                            innerSource,
+                            WatermarkStrategy.noWatermarks(),
+                            resolvedTypeInfo,
+                            getParallelism(),
+                            false);
+            return new NonKeyedPartitionStreamImpl<>(this, sourceTransformation);
+        } else if (source instanceof FromDataSource) {
+            Collection<OUT> data = ((FromDataSource<OUT>) source).getData();
+            TypeInformation<OUT> outType = extractTypeInfoFromCollection(data);
+
+            FromElementsGeneratorFunction<OUT> generatorFunction =
+                    new FromElementsGeneratorFunction<>(outType, executionConfig, data);
+
+            DataGeneratorSource<OUT> generatorSource =
+                    new DataGeneratorSource<>(generatorFunction, data.size(), outType);
+
+            return fromSource(new WrappedSource<>(generatorSource), "Collection Source");
+        } else {
+            throw new UnsupportedOperationException(
+                    "Unsupported type of sink, you could use SourceUtils to wrap a FLIP-27 based source.");
+        }
+    }
+
+    public Configuration getConfiguration() {
+        return this.configuration;
+    }
+
+    public ExecutionConfig getExecutionConfig() {
+        return executionConfig;
+    }
+
+    public int getParallelism() {
+        return executionConfig.getParallelism();
+    }
+
+    public List<Transformation<?>> getTransformations() {
+        return transformations;
+    }
+
+    public void setParallelism(int parallelism) {

Review Comment:
   This change is unrelated to the commit `[FLINK-34548][API] Supports sink-v2 Sink`; it could be moved to the commit `[FLINK-34548][API] Introduce ExecutionEnvironment`.



##########
flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/streaming/api/transformations/ProcessFunctionSinkTransformation.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.transformations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.process.impl.stream.DataStream;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+
+import org.apache.commons.compress.utils.Lists;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link Transformation} for sink-v2 in process function.
+ *
+ * @param <InputT> The input type of the {@link SinkWriter}
+ * @param <OutputT> The output type of the {@link Sink}
+ */
+@Internal
+public class ProcessFunctionSinkTransformation<InputT, OutputT>

Review Comment:
   Why do we need to introduce a new `ProcessFunctionSinkTransformation` rather than reusing `SinkTransformation`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
