xintongsong commented on code in PR #23905: URL: https://github.com/apache/flink/pull/23905#discussion_r1433693565
########## flink-core/src/main/java/org/apache/flink/api/common/TaskInfoImpl.java: ########## @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common; + +import org.apache.flink.annotation.PublicEvolving; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** The default implementation of {@link TaskInfo}. */ +@PublicEvolving Review Comment: The implementation class should be `@Internal`. ########## flink-core/src/main/java/org/apache/flink/api/common/JobInfoImpl.java: ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common; + +import org.apache.flink.annotation.PublicEvolving; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** The default implementation of {@link JobInfo}. */ +@PublicEvolving Review Comment: The implementation class should be `@Internal`. ########## flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/batch/compact/BatchPartitionCommitterSinkTest.java: ########## @@ -127,7 +127,7 @@ public Long timestamp() { }; private static RuntimeContext getMockRuntimeContext() { - return new MockStreamingRuntimeContext(false, 0, 0) { + return new MockStreamingRuntimeContext(false, 1, 0) { Review Comment: Unrelated change ########## flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java: ########## @@ -68,8 +79,24 @@ public interface SourceReaderContext { * Get the current parallelism of this Source. * * @return the parallelism of the Source. + * @deprecated This method is deprecated since Flink 1.19. All metadata about the task should be + * provided uniformly by {@link #getTaskInfo()}. + * @see <a + * href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs"> + * FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs </a> */ + @Deprecated default int currentParallelism() { throw new UnsupportedOperationException(); } + + /** + * Get the meta information of current task. + * + * @return the task meta information. 
+ */ + @PublicEvolving + default TaskInfo getTaskInfo() { + return null; Review Comment: Same here for the default implementation and Nullable. ########## flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java: ########## @@ -63,14 +67,27 @@ public interface RuntimeContext { /** * The ID of the current job. Note that Job ID can change in particular upon manual restart. The * returned ID should NOT be used for any job management tasks. + * + * @deprecated This method is deprecated since Flink 1.19. All metadata about the job should be + * provided uniformly by {@link #getJobInfo()}. + * @see <a + * href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs"> + * FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs </a> */ + @Deprecated JobID getJobId(); Review Comment: Actually, we can provide default implementations for these deprecated methods. E.g., `JobID getJobId() { return getJobInfo().getJobId(); }`, so that we can remove the duplicated method implementations for the concrete classes. ########## flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepRuntimeContextTest.java: ########## @@ -110,38 +111,41 @@ public void testCepRuntimeContext() { final String taskName = "foobarTask"; final OperatorMetricGroup metricGroup = UnregisteredMetricsGroup.createOperatorMetricGroup(); - final int numberOfParallelSubtasks = 42; - final int indexOfSubtask = 43; + final int numberOfParallelSubtasks = 43; + final int indexOfSubtask = 42; final int attemptNumber = 1337; - final String taskNameWithSubtask = "barfoo"; + final String taskNameWithSubtask = "foobarTask (43/43)#1337"; Review Comment: How are these changes related to this commit? 
########## flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java: ########## @@ -473,4 +520,26 @@ <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState( */ @PublicEvolving <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties); + + /** + * Get the meta information of current job. + * + * @return the job meta information. + */ + @PublicEvolving + @Nullable + default JobInfo getJobInfo() { + return null; + } + + /** + * Get the meta information of current task. + * + * @return the task meta information. + */ + @PublicEvolving + @Nullable + default TaskInfo getTaskInfo() { + return null; + } Review Comment: Why provide default implementations for these methods? And why are they `@Nullable`? What does it mean when the `JobInfo` / `TaskInfo` is null? ########## flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java: ########## @@ -39,7 +41,16 @@ public interface SourceReaderContext { */ String getLocalHostName(); - /** @return The index of this subtask. */ + /** + * Get the index of this subtask. + * + * @deprecated This method is deprecated since Flink 1.19. All metadata about the task should be + * provided uniformly by {@link #getTaskInfo()}. + * @see <a + * href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs"> + * FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs </a> + */ + @Deprecated int getIndexOfSubtask(); Review Comment: Same here for the deprecated methods. ########## flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java: ########## @@ -119,7 +120,14 @@ public interface Environment { Configuration getJobConfiguration(); /** - * Returns the {@link TaskInfo} object associated with this subtask + * Returns the {@link JobInfo} object associated with current job. 
+ * + * @return JobInfo for current job + */ + JobInfo getJobInfo(); Review Comment: This does not match the commit message "Unify the provision of metadata in RuntimeContext". ########## flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java: ########## @@ -42,6 +43,7 @@ import java.util.List; import java.util.concurrent.Future; +/** The test for flat map operator. */ Review Comment: Such code-style clean-ups can be placed in a separate hotfix commit. ########## flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java: ########## @@ -166,7 +169,10 @@ public class Task /** ID which identifies the slot in which the task is supposed to run. */ private final AllocationID allocationId; - /** TaskInfo object for this task. */ + /** The meta information of current job. */ + private final JobInfo jobInfo; Review Comment: Nit: This seems to be in the wrong commit. ########## flink-core/src/main/java/org/apache/flink/api/connector/sink2/InitContext.java: ########## @@ -57,6 +83,34 @@ public interface InitContext { /** * The ID of the current job. Note that Job ID can change in particular upon manual restart. The * returned ID should NOT be used for any job management tasks. + * + * @deprecated This method is deprecated since Flink 1.19. All metadata about the job should be + * provided uniformly by {@link #getJobInfo()}. + * @see <a + * href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs"> + * FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs </a> */ JobID getJobId(); + + /** + * Get the meta information of current job. + * + * @return the job meta information. + */ + @PublicEvolving + @Nullable + default JobInfo getJobInfo() { + return null; + } + + /** + * Get the meta information of current task. + * + * @return the task meta information. 
+ */ + @PublicEvolving + @Nullable + default TaskInfo getTaskInfo() { + return null; + } Review Comment: Same here for default and nullable. ########## flink-core/src/main/java/org/apache/flink/core/failure/FailureEnricher.java: ########## @@ -124,5 +140,16 @@ enum FailureType { * @return the Executor pool */ Executor getIOExecutor(); + + /** + * Get the meta information of current job. + * + * @return the job meta information. + */ + @PublicEvolving + @Nullable + default JobInfo getJobInfo() { Review Comment: Same here for default and nullable. ########## flink-core/src/main/java/org/apache/flink/core/failure/FailureEnricher.java: ########## @@ -84,14 +88,26 @@ enum FailureType { * Get the ID of the job. * * @return the ID of the job + * @deprecated This method is deprecated since Flink 1.19. All metadata about the job should + * be provided uniformly by {@link #getJobInfo()}. + * @see <a + * href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs"> + * FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs </a> */ + @Deprecated Review Comment: Same here for the deprecated methods. ########## flink-core/src/main/java/org/apache/flink/api/connector/sink2/InitContext.java: ########## @@ -35,16 +40,37 @@ public interface InitContext { */ long INITIAL_CHECKPOINT_ID = 1; - /** @return The id of task where the committer is running. */ + /** + * Get the id of task where the committer is running. + * + * @deprecated This method is deprecated since Flink 1.19. All metadata about the task should be + * provided uniformly by {@link #getTaskInfo()}. + * @see <a + * href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs"> + * FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs </a> + */ int getSubtaskId(); Review Comment: 1. Aren't we deprecating these methods? 2. 
Same for the default implementation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
