xintongsong commented on code in PR #24422: URL: https://github.com/apache/flink/pull/24422#discussion_r1533200528
########## flink-process-function-parent/pom.xml: ########## @@ -0,0 +1,40 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>flink-parent</artifactId> Review Comment: 1. Why do we need this parent module? 2. I'm not sure about naming / prefixing the modules with `flink-process-function`. The feature is called DataStream API V2. I don't think `flink-process-function` properly reflects what the modules are about. ########## pom.xml: ########## @@ -2366,6 +2366,7 @@ under the License. <exclude>org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#fromData(java.util.Collection)</exclude> <exclude>org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#fromData(java.util.Collection,org.apache.flink.api.common.typeinfo.TypeInformation)</exclude> <exclude>org.apache.flink.api.common.functions.RuntimeContext</exclude> + <exclude>org.apache.flink.api.common.functions.Function</exclude> Review Comment: I'd suggest to create jira tickets for tracking: 1. Manually verifying whether there are any breaking changes to the excluded APIs before releasing 1.20. 2. Remove the exclusion after 1.20. ########## flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/ExecutionEnvironmentFactory.java: ########## @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.process.impl; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.process.api.ExecutionEnvironment; + +/** Factory class for execution environments. */ +@FunctionalInterface +@Experimental Review Comment: Why is this `@Experimental`? Is this a public interface at all? ########## flink-process-function-parent/flink-process-function-api/src/main/java/org/apache/flink/process/api/ExecutionEnvironment.java: ########## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.process.api; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.RuntimeExecutionMode; + +/** + * This is the context in which a program is executed. + * + * <p>The environment provides methods to create a DataStream and control the job execution. + */ +@Experimental +public interface ExecutionEnvironment { + /** + * Get the execution environment instance. + * + * @return A {@link ExecutionEnvironment} instance. + */ + static ExecutionEnvironment getExecutionEnvironment() throws ReflectiveOperationException { Review Comment: Imagine how this is used. `ExecutionEnvironment.getExecutionEnvironment()` might be a bit redundant. I'd suggest `getInstance()` or `getEnv()`. ########## flink-core/src/main/java/org/apache/flink/api/connector/v2/SourceUtils.java: ########## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.v2; Review Comment: The package `*.connector.v2` is confusing. I'd suggest `*.connector.dsv2`. ########## flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/stream/DataStream.java: ########## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.process.impl.stream; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.process.impl.ExecutionEnvironmentImpl; +import org.apache.flink.streaming.api.transformations.SideOutputTransformation; +import org.apache.flink.util.OutputTag; +import org.apache.flink.util.Preconditions; + +import java.util.HashMap; +import java.util.Map; + +/** + * Base class for all streams. + * + * <p>Note: This is only used for internal implementation. It must not leak to user face api. + */ +@Internal Review Comment: Why do we need this annotation? Shouldn't everything in this non-api module be internal by default? ########## flink-core/src/main/java/org/apache/flink/api/connector/v2/SourceUtils.java: ########## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.v2; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; + +/** Utils to convert a FLIP-27 based source to a DataStream v2 Source. */ +@Experimental +public final class SourceUtils { Review Comment: I'd suggest the name `DataStreamV2SourceUtils`. ########## flink-process-function-parent/flink-process-function/src/main/java/org/apache/flink/process/impl/ExecutionEnvironmentImpl.java: ########## @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.process.impl; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DeploymentOptions; +import org.apache.flink.configuration.ExecutionOptions; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.core.execution.DefaultExecutorServiceLoader; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.core.execution.PipelineExecutor; +import org.apache.flink.core.execution.PipelineExecutorFactory; +import org.apache.flink.core.execution.PipelineExecutorServiceLoader; +import org.apache.flink.process.api.ExecutionEnvironment; +import org.apache.flink.streaming.api.environment.CheckpointConfig; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.graph.StreamGraphGenerator; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkException; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static org.apache.flink.streaming.api.graph.StreamGraphGenerator.DEFAULT_TIME_CHARACTERISTIC; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** The implementation of {@link ExecutionEnvironment}. */ Review Comment: It should be documented that the package, class name and signature of `newInstance()` must not be changed, because reflection is used in the api module. ########## flink-core/src/main/java/org/apache/flink/api/connector/v2/SourceUtils.java: ########## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.v2; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; + +/** Utils to convert a FLIP-27 based source to a DataStream v2 Source. */ +@Experimental +public final class SourceUtils { + public static <T> Source<T> wrapSource( Review Comment: Need JavaDoc ########## flink-core/src/main/java/org/apache/flink/api/connector/v2/SinkUtils.java: ########## @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.v2; + +import org.apache.flink.annotation.Experimental; + +/** Utils to convert the sink-v2 based sink to a DataStream v2 Source. */ +@Experimental +public class SinkUtils { + public static <T> Sink<T> wrapSink(org.apache.flink.api.connector.sink2.Sink<T> sink) { Review Comment: JavaDoc ########## flink-core/src/main/java/org/apache/flink/api/connector/v2/SinkUtils.java: ########## @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.connector.v2; + +import org.apache.flink.annotation.Experimental; + +/** Utils to convert the sink-v2 based sink to a DataStream v2 Source. */ +@Experimental +public class SinkUtils { Review Comment: `DataStreamV2SinkUtils`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
