xintongsong commented on code in PR #23905:
URL: https://github.com/apache/flink/pull/23905#discussion_r1433693565


##########
flink-core/src/main/java/org/apache/flink/api/common/TaskInfoImpl.java:
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** The default implementation of {@link TaskInfo}. */
+@PublicEvolving

Review Comment:
   The implementation class should be `@Internal`.
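
A minimal sketch of the split being asked for, using hypothetical stand-ins rather than the real Flink types: the annotations, `TaskInfo` and `TaskInfoImpl` below are declared locally for illustration, and the runtime retention is an assumption made only so the sketch can inspect them. The user-facing interface carries the stability annotation, while the implementation class is marked internal.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical stand-ins; not the real Flink annotations or classes.
class AnnotationSplitSketch {

    @Retention(RetentionPolicy.RUNTIME) // assumption, so the sketch can check it reflectively
    @Target(ElementType.TYPE)
    @interface PublicEvolving {}

    @Retention(RetentionPolicy.RUNTIME) // assumption, so the sketch can check it reflectively
    @Target(ElementType.TYPE)
    @interface Internal {}

    /** User-facing contract: this is the type that carries the stability annotation. */
    @PublicEvolving
    interface TaskInfo {
        String getTaskName();
    }

    /** Implementation detail: callers are expected to depend on the interface only. */
    @Internal
    static final class TaskInfoImpl implements TaskInfo {
        private final String taskName;

        TaskInfoImpl(String taskName) {
            this.taskName = taskName;
        }

        @Override
        public String getTaskName() {
            return taskName;
        }
    }

    public static void main(String[] args) {
        TaskInfo info = new TaskInfoImpl("foobarTask");
        System.out.println(info.getTaskName());
        System.out.println(TaskInfoImpl.class.isAnnotationPresent(Internal.class));
    }
}
```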



##########
flink-core/src/main/java/org/apache/flink/api/common/JobInfoImpl.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** The default implementation of {@link JobInfo}. */
+@PublicEvolving

Review Comment:
   The implementation class should be `@Internal`.



##########
flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/batch/compact/BatchPartitionCommitterSinkTest.java:
##########
@@ -127,7 +127,7 @@ public Long timestamp() {
             };
 
     private static RuntimeContext getMockRuntimeContext() {
-        return new MockStreamingRuntimeContext(false, 0, 0) {
+        return new MockStreamingRuntimeContext(false, 1, 0) {

Review Comment:
   Unrelated change



##########
flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java:
##########
@@ -68,8 +79,24 @@ public interface SourceReaderContext {
      * Get the current parallelism of this Source.
      *
      * @return the parallelism of the Source.
+     * @deprecated This method is deprecated since Flink 1.19. All metadata about the task should be
+     *     provided uniformly by {@link #getTaskInfo()}.
+     * @see <a
+     *     href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs">
+     *     FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs </a>
      */
+    @Deprecated
     default int currentParallelism() {
         throw new UnsupportedOperationException();
     }
+
+    /**
+     * Get the meta information of current task.
+     *
+     * @return the task meta information.
+     */
+    @PublicEvolving
+    default TaskInfo getTaskInfo() {
+        return null;

Review Comment:
   Same here for the default implementation and `@Nullable`.



##########
flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java:
##########
@@ -63,14 +67,27 @@ public interface RuntimeContext {
     /**
      * The ID of the current job. Note that Job ID can change in particular upon manual restart. The
      * returned ID should NOT be used for any job management tasks.
+     *
+     * @deprecated This method is deprecated since Flink 1.19. All metadata about the job should be
+     *     provided uniformly by {@link #getJobInfo()}.
+     * @see <a
+     *     href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs">
+     *     FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs </a>
      */
+    @Deprecated
     JobID getJobId();

Review Comment:
   Actually, we can provide default implementations for these deprecated methods, e.g. `default JobID getJobId() { return getJobInfo().getJobId(); }`, so that we can remove the duplicated method implementations from the concrete classes.
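
A minimal sketch of that suggestion, using simplified hypothetical stand-ins for `JobID`, `JobInfo` and `RuntimeContext` (the real interfaces have many more methods). The deprecated accessor becomes a `default` method that delegates to the new one, so a concrete context only implements `getJobInfo()`. Note that the sketch assumes `getJobInfo()` never returns null.

```java
// Hypothetical, simplified stand-ins for the Flink types named in the comment.
class DeprecatedDelegationSketch {

    static final class JobID {
        private final String value;

        JobID(String value) {
            this.value = value;
        }

        @Override
        public String toString() {
            return value;
        }
    }

    interface JobInfo {
        JobID getJobId();
    }

    interface RuntimeContext {
        /** New unified accessor; each concrete context implements only this. */
        JobInfo getJobInfo();

        /** @deprecated Use {@link #getJobInfo()} instead. */
        @Deprecated
        default JobID getJobId() {
            // Delegation lives in the interface, not in every implementation.
            return getJobInfo().getJobId();
        }
    }

    /** A concrete context that does not declare its own getJobId(). */
    static final class SimpleRuntimeContext implements RuntimeContext {
        private final JobInfo jobInfo;

        SimpleRuntimeContext(JobInfo jobInfo) {
            this.jobInfo = jobInfo;
        }

        @Override
        public JobInfo getJobInfo() {
            return jobInfo;
        }
    }

    public static void main(String[] args) {
        JobID id = new JobID("job-1");
        RuntimeContext ctx = new SimpleRuntimeContext(() -> id);
        // The deprecated call and the new call resolve to the same object.
        System.out.println(ctx.getJobId() == ctx.getJobInfo().getJobId());
    }
}
```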



##########
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepRuntimeContextTest.java:
##########
@@ -110,38 +111,41 @@ public void testCepRuntimeContext() {
         final String taskName = "foobarTask";
         final OperatorMetricGroup metricGroup =
                 UnregisteredMetricsGroup.createOperatorMetricGroup();
-        final int numberOfParallelSubtasks = 42;
-        final int indexOfSubtask = 43;
+        final int numberOfParallelSubtasks = 43;
+        final int indexOfSubtask = 42;
         final int attemptNumber = 1337;
-        final String taskNameWithSubtask = "barfoo";
+        final String taskNameWithSubtask = "foobarTask (43/43)#1337";

Review Comment:
   How are these changes related to this commit?



##########
flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java:
##########
@@ -473,4 +520,26 @@ <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(
      */
     @PublicEvolving
     <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties);
+
+    /**
+     * Get the meta information of current job.
+     *
+     * @return the job meta information.
+     */
+    @PublicEvolving
+    @Nullable
+    default JobInfo getJobInfo() {
+        return null;
+    }
+
+    /**
+     * Get the meta information of current task.
+     *
+     * @return the task meta information.
+     */
+    @PublicEvolving
+    @Nullable
+    default TaskInfo getTaskInfo() {
+        return null;
+    }

Review Comment:
   Why provide default implementations for these methods? And why are they `@Nullable`? What does it mean when the `JobInfo` / `TaskInfo` is null?
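
One way to read the concern, sketched with hypothetical stand-in interfaces (`ContextWithNullDefault` and `ContextWithAbstractMethod` are invented names, not Flink types): a `default` that returns null lets an implementation compile without supplying the value, and the caller only notices at runtime. An abstract method moves that check to compile time.

```java
// Hypothetical stand-ins, only to illustrate the question above.
class NullableDefaultSketch {

    interface TaskInfo {
        int getIndexOfThisSubtask();
    }

    /** Variant from the diff: a default method that returns null. */
    interface ContextWithNullDefault {
        default TaskInfo getTaskInfo() {
            return null;
        }
    }

    /** Alternative: an abstract method, so every implementation must provide it. */
    interface ContextWithAbstractMethod {
        TaskInfo getTaskInfo();
    }

    public static void main(String[] args) {
        // An implementation that overrides nothing still compiles...
        ContextWithNullDefault forgetful = new ContextWithNullDefault() {};
        try {
            forgetful.getTaskInfo().getIndexOfThisSubtask();
        } catch (NullPointerException e) {
            // ...and the caller only finds out when it dereferences the result.
            System.out.println("caller hit a NullPointerException");
        }

        // With the abstract variant, leaving out getTaskInfo() is a compile error.
        ContextWithAbstractMethod strict = () -> () -> 0;
        System.out.println(strict.getTaskInfo().getIndexOfThisSubtask());
    }
}
```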



##########
flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java:
##########
@@ -39,7 +41,16 @@ public interface SourceReaderContext {
      */
     String getLocalHostName();
 
-    /** @return The index of this subtask. */
+    /**
+     * Get the index of this subtask.
+     *
+     * @deprecated This method is deprecated since Flink 1.19. All metadata about the task should be
+     *     provided uniformly by {@link #getTaskInfo()}.
+     * @see <a
+     *     href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs">
+     *     FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs </a>
+     */
+    @Deprecated
     int getIndexOfSubtask();

Review Comment:
   Same here for the deprecated methods.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java:
##########
@@ -119,7 +120,14 @@ public interface Environment {
     Configuration getJobConfiguration();
 
     /**
-     * Returns the {@link TaskInfo} object associated with this subtask
+     * Returns the {@link JobInfo} object associated with current job.
+     *
+     * @return JobInfo for current job
+     */
+    JobInfo getJobInfo();

Review Comment:
   This does not match the commit message "Unify the provision of metadata in RuntimeContext".



##########
flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java:
##########
@@ -42,6 +43,7 @@
 import java.util.List;
 import java.util.concurrent.Future;
 
+/** The test for flat map operator. */

Review Comment:
   Such code-style clean-ups can be placed in a separate hotfix commit.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java:
##########
@@ -166,7 +169,10 @@ public class Task
     /** ID which identifies the slot in which the task is supposed to run. */
     private final AllocationID allocationId;
 
-    /** TaskInfo object for this task. */
+    /** The meta information of current job. */
+    private final JobInfo jobInfo;

Review Comment:
   Nit: This seems to be in the wrong commit.



##########
flink-core/src/main/java/org/apache/flink/api/connector/sink2/InitContext.java:
##########
@@ -57,6 +83,34 @@ public interface InitContext {
     /**
      * The ID of the current job. Note that Job ID can change in particular upon manual restart. The
      * returned ID should NOT be used for any job management tasks.
+     *
+     * @deprecated This method is deprecated since Flink 1.19. All metadata about the job should be
+     *     provided uniformly by {@link #getJobInfo()}.
+     * @see <a
+     *     href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs">
+     *     FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs </a>
      */
     JobID getJobId();
+
+    /**
+     * Get the meta information of current job.
+     *
+     * @return the job meta information.
+     */
+    @PublicEvolving
+    @Nullable
+    default JobInfo getJobInfo() {
+        return null;
+    }
+
+    /**
+     * Get the meta information of current task.
+     *
+     * @return the task meta information.
+     */
+    @PublicEvolving
+    @Nullable
+    default TaskInfo getTaskInfo() {
+        return null;
+    }

Review Comment:
   Same here for the default implementation and `@Nullable`.



##########
flink-core/src/main/java/org/apache/flink/core/failure/FailureEnricher.java:
##########
@@ -124,5 +140,16 @@ enum FailureType {
          * @return the Executor pool
          */
         Executor getIOExecutor();
+
+        /**
+         * Get the meta information of current job.
+         *
+         * @return the job meta information.
+         */
+        @PublicEvolving
+        @Nullable
+        default JobInfo getJobInfo() {

Review Comment:
   Same here for the default implementation and `@Nullable`.



##########
flink-core/src/main/java/org/apache/flink/core/failure/FailureEnricher.java:
##########
@@ -84,14 +88,26 @@ enum FailureType {
          * Get the ID of the job.
          *
          * @return the ID of the job
+         * @deprecated This method is deprecated since Flink 1.19. All metadata about the job should
+         *     be provided uniformly by {@link #getJobInfo()}.
+         * @see <a
+         *     href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs">
+         *     FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs </a>
          */
+        @Deprecated

Review Comment:
   Same here for the deprecated methods.



##########
flink-core/src/main/java/org/apache/flink/api/connector/sink2/InitContext.java:
##########
@@ -35,16 +40,37 @@ public interface InitContext {
      */
     long INITIAL_CHECKPOINT_ID = 1;
 
-    /** @return The id of task where the committer is running. */
+    /**
+     * Get the id of task where the committer is running.
+     *
+     * @deprecated This method is deprecated since Flink 1.19. All metadata about the task should be
+     *     provided uniformly by {@link #getTaskInfo()}.
+     * @see <a
+     *     href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs">
+     *     FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs </a>
+     */
     int getSubtaskId();

Review Comment:
   1. Aren't we deprecating these methods?
   2. Same for the default implementation.
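
One possible reading of the first point: the hunk adds a `@deprecated` Javadoc tag to `getSubtaskId()` but no `@Deprecated` annotation, unlike the other deprecated methods in this PR. The sketch below uses a hypothetical stand-in interface to show the difference as seen through reflection: only the annotated method reports `java.lang.Deprecated`.

```java
// Hypothetical stand-in for the interface in the hunk above.
class DeprecationAnnotationSketch {

    interface InitContext {
        /**
         * @deprecated Javadoc tag only: documented as deprecated, but not annotated.
         */
        int getSubtaskId();

        /**
         * @deprecated Javadoc tag plus annotation.
         */
        @Deprecated
        int getNumberOfParallelSubtasks();
    }

    /** Returns whether the named no-arg method carries the @Deprecated annotation. */
    static boolean isAnnotatedDeprecated(String methodName) {
        try {
            return InitContext.class.getMethod(methodName).isAnnotationPresent(Deprecated.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("unknown method: " + methodName, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isAnnotatedDeprecated("getSubtaskId"));
        System.out.println(isAnnotatedDeprecated("getNumberOfParallelSubtasks"));
    }
}
```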



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
