This is an automated email from the ASF dual-hosted git repository.
guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 8d4e98503fd [FLINK-36225][core] Remove deprecated method marked in
FLIP-382
8d4e98503fd is described below
commit 8d4e98503fd36faa6046731d329e42cd95d5e507
Author: Weijie Guo <[email protected]>
AuthorDate: Fri Sep 13 11:40:57 2024 +0800
[FLINK-36225][core] Remove deprecated method marked in FLIP-382
---
.../docs/dev/datastream/dataset_migration.md | 2 +-
.../docs/dev/datastream/dataset_migration.md | 2 +-
.../flink/api/common/functions/RuntimeContext.java | 111 ---------------------
.../flink/api/connector/sink2/InitContext.java | 59 -----------
.../org/apache/flink/api/connector/sink2/Sink.java | 21 ----
.../apache/flink/core/failure/FailureEnricher.java | 31 ------
flink-end-to-end-tests/run-nightly-tests.sh | 4 +-
.../planner/codegen/LongHashJoinGenerator.scala | 2 +-
.../fusion/spec/HashJoinFusionCodegenSpec.scala | 2 +-
.../flink/test/scheduling/JMFailoverITCase.java | 8 +-
pom.xml | 11 ++
11 files changed, 22 insertions(+), 231 deletions(-)
diff --git a/docs/content.zh/docs/dev/datastream/dataset_migration.md
b/docs/content.zh/docs/dev/datastream/dataset_migration.md
index c10441fe7df..1de2818d5ff 100644
--- a/docs/content.zh/docs/dev/datastream/dataset_migration.md
+++ b/docs/content.zh/docs/dev/datastream/dataset_migration.md
@@ -761,7 +761,7 @@ public class EndOfStreamWindows extends
WindowAssigner<Object, TimeWindow> {
public static class AddSubtaskIDMapFunction<T> extends RichMapFunction<T,
Tuple2<String, T>> {
@Override
public Tuple2<String, T> map(T value) {
- return
Tuple2.of(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()), value);
+ return
Tuple2.of(String.valueOf(getRuntimeContext().getTaskInfo().getIndexOfThisSubtask()),
value);
}
}
```
diff --git a/docs/content/docs/dev/datastream/dataset_migration.md
b/docs/content/docs/dev/datastream/dataset_migration.md
index 25d1cb8f9a2..98e653d8189 100644
--- a/docs/content/docs/dev/datastream/dataset_migration.md
+++ b/docs/content/docs/dev/datastream/dataset_migration.md
@@ -766,7 +766,7 @@ The following code shows the example of
`AddSubtaskIDMapFunction`.
public static class AddSubtaskIDMapFunction<T> extends RichMapFunction<T,
Tuple2<String, T>> {
@Override
public Tuple2<String, T> map(T value) {
- return
Tuple2.of(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()), value);
+ return
Tuple2.of(String.valueOf(getRuntimeContext().getTaskInfo().getIndexOfThisSubtask()),
value);
}
}
```
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index 66970e2440d..eb94f65a2f8 100644
---
a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++
b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -20,7 +20,6 @@ package org.apache.flink.api.common.functions;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobInfo;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.accumulators.Accumulator;
@@ -60,37 +59,6 @@ import java.util.Set;
*/
@Public
public interface RuntimeContext {
-
- /**
- * The ID of the current job. Note that Job ID can change in particular
upon manual restart. The
- * returned ID should NOT be used for any job management tasks.
- *
- * @deprecated This method is deprecated since Flink 1.19. All metadata
about the job should be
- * provided uniformly by {@link #getJobInfo()}.
- * @see <a
- *
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs">
- * FLIP-382: Unify the Provision of Diverse Metadata for Context-like
APIs </a>
- */
- @Deprecated
- default JobID getJobId() {
- return getJobInfo().getJobId();
- }
-
- /**
- * Returns the name of the task in which the UDF runs, as assigned during
plan construction.
- *
- * @return The name of the task in which the UDF runs.
- * @deprecated This method is deprecated since Flink 1.19. All metadata
about the task should be
- * provided uniformly by {@link #getTaskInfo()}.
- * @see <a
- *
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs">
- * FLIP-382: Unify the Provision of Diverse Metadata for Context-like
APIs </a>
- */
- @Deprecated
- default String getTaskName() {
- return getTaskInfo().getTaskName();
- }
-
/**
* Returns the metric group for this parallel subtask.
*
@@ -99,85 +67,6 @@ public interface RuntimeContext {
@PublicEvolving
OperatorMetricGroup getMetricGroup();
- /**
- * Gets the parallelism with which the parallel task runs.
- *
- * @return The parallelism with which the parallel task runs.
- * @deprecated This method is deprecated since Flink 1.19. All metadata
about the task should be
- * provided uniformly by {@link #getTaskInfo()}.
- * @see <a
- *
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs">
- * FLIP-382: Unify the Provision of Diverse Metadata for Context-like
APIs </a>
- */
- @Deprecated
- default int getNumberOfParallelSubtasks() {
- return getTaskInfo().getNumberOfParallelSubtasks();
- }
-
- /**
- * Gets the number of max-parallelism with which the parallel task runs.
- *
- * @return The max-parallelism with which the parallel task runs.
- * @deprecated This method is deprecated since Flink 1.19. All metadata
about the task should be
- * provided uniformly by {@link #getTaskInfo()}.
- * @see <a
- *
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs">
- * FLIP-382: Unify the Provision of Diverse Metadata for Context-like
APIs </a>
- */
- @Deprecated
- @PublicEvolving
- default int getMaxNumberOfParallelSubtasks() {
- return getTaskInfo().getMaxNumberOfParallelSubtasks();
- }
-
- /**
- * Gets the number of this parallel subtask. The numbering starts from 0
and goes up to
- * parallelism-1.
- *
- * @return The index of the parallel subtask.
- * @deprecated This method is deprecated since Flink 1.19. All metadata
about the task should be
- * provided uniformly by {@link #getTaskInfo()}.
- * @see <a
- *
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs">
- * FLIP-382: Unify the Provision of Diverse Metadata for Context-like
APIs </a>
- */
- @Deprecated
- default int getIndexOfThisSubtask() {
- return getTaskInfo().getIndexOfThisSubtask();
- }
-
- /**
- * Gets the attempt number of this parallel subtask. First attempt is
numbered 0.
- *
- * @return Attempt number of the subtask.
- * @deprecated This method is deprecated since Flink 1.19. All metadata
about the task should be
- * provided uniformly by {@link #getTaskInfo()}.
- * @see <a
- *
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs">
- * FLIP-382: Unify the Provision of Diverse Metadata for Context-like
APIs </a>
- */
- @Deprecated
- default int getAttemptNumber() {
- return getTaskInfo().getAttemptNumber();
- }
-
- /**
- * Returns the name of the task, appended with the subtask indicator, such
as "MyTask (3/6)#1",
- * where 3 would be (task index + 1), and 6 would be task parallelism, and
1 would be attempt
- * number.
- *
- * @return The name of the task, with subtask indicator.
- * @deprecated This method is deprecated since Flink 1.19. All metadata
about the task should be
- * provided uniformly by {@link #getTaskInfo()}.
- * @see <a
- *
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs">
- * FLIP-382: Unify the Provision of Diverse Metadata for Context-like
APIs </a>
- */
- @Deprecated
- default String getTaskNameWithSubtasks() {
- return getTaskInfo().getTaskNameWithSubtasks();
- }
-
/**
* Create a serializer for a given type.
*
diff --git
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/InitContext.java
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/InitContext.java
index ae637d6456e..36c812723c5 100644
---
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/InitContext.java
+++
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/InitContext.java
@@ -20,7 +20,6 @@ package org.apache.flink.api.connector.sink2;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobInfo;
import org.apache.flink.api.common.TaskInfo;
@@ -38,70 +37,12 @@ public interface InitContext {
*/
long INITIAL_CHECKPOINT_ID = 1;
- /**
- * Get the id of task where the committer is running.
- *
- * @deprecated This method is deprecated since Flink 1.19. All metadata
about the task should be
- * provided uniformly by {@link #getTaskInfo()}.
- * @see <a
- *
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs">
- * FLIP-382: Unify the Provision of Diverse Metadata for Context-like
APIs </a>
- */
- @Deprecated
- default int getSubtaskId() {
- return getTaskInfo().getIndexOfThisSubtask();
- }
-
- /**
- * Get the number of parallel committer tasks.
- *
- * @deprecated This method is deprecated since Flink 1.19. All metadata
about the task should be
- * provided uniformly by {@link #getTaskInfo()}.
- * @see <a
- *
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs">
- * FLIP-382: Unify the Provision of Diverse Metadata for Context-like
APIs </a>
- */
- @Deprecated
- default int getNumberOfParallelSubtasks() {
- return getTaskInfo().getNumberOfParallelSubtasks();
- }
-
- /**
- * Gets the attempt number of this parallel subtask. First attempt is
numbered 0.
- *
- * @return Attempt number of the subtask.
- * @deprecated This method is deprecated since Flink 1.19. All metadata
about the task should be
- * provided uniformly by {@link #getTaskInfo()}.
- * @see <a
- *
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs">
- * FLIP-382: Unify the Provision of Diverse Metadata for Context-like
APIs </a>
- */
- @Deprecated
- default int getAttemptNumber() {
- return getTaskInfo().getAttemptNumber();
- }
-
/**
* Returns id of the restored checkpoint, if state was restored from the
snapshot of a previous
* execution.
*/
OptionalLong getRestoredCheckpointId();
- /**
- * The ID of the current job. Note that Job ID can change in particular
upon manual restart. The
- * returned ID should NOT be used for any job management tasks.
- *
- * @deprecated This method is deprecated since Flink 1.19. All metadata
about the job should be
- * provided uniformly by {@link #getJobInfo()}.
- * @see <a
- *
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs">
- * FLIP-382: Unify the Provision of Diverse Metadata for Context-like
APIs </a>
- */
- @Deprecated
- default JobID getJobId() {
- return getJobInfo().getJobId();
- }
-
/**
* Get the meta information of current job.
*
diff --git
a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
index f870eb8852f..9979111efdf 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java
@@ -21,7 +21,6 @@ package org.apache.flink.api.connector.sink2;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobInfo;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.operators.MailboxExecutor;
@@ -149,31 +148,11 @@ public interface Sink<InputT> extends Serializable {
this.wrapped = wrapped;
}
- @Override
- public int getSubtaskId() {
- return wrapped.getSubtaskId();
- }
-
- @Override
- public int getNumberOfParallelSubtasks() {
- return wrapped.getNumberOfParallelSubtasks();
- }
-
- @Override
- public int getAttemptNumber() {
- return wrapped.getAttemptNumber();
- }
-
@Override
public OptionalLong getRestoredCheckpointId() {
return wrapped.getRestoredCheckpointId();
}
- @Override
- public JobID getJobId() {
- return wrapped.getJobId();
- }
-
@Override
public JobInfo getJobInfo() {
return wrapped.getJobInfo();
diff --git
a/flink-core/src/main/java/org/apache/flink/core/failure/FailureEnricher.java
b/flink-core/src/main/java/org/apache/flink/core/failure/FailureEnricher.java
index e142a12dc5f..b5676cdc96c 100644
---
a/flink-core/src/main/java/org/apache/flink/core/failure/FailureEnricher.java
+++
b/flink-core/src/main/java/org/apache/flink/core/failure/FailureEnricher.java
@@ -20,7 +20,6 @@ package org.apache.flink.core.failure;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobInfo;
import org.apache.flink.metrics.MetricGroup;
@@ -82,36 +81,6 @@ public interface FailureEnricher {
TASK_MANAGER
}
- /**
- * Get the ID of the job.
- *
- * @return the ID of the job
- * @deprecated This method is deprecated since Flink 1.19. All
metadata about the job should
- * be provided uniformly by {@link #getJobInfo()}.
- * @see <a
- *
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs">
- * FLIP-382: Unify the Provision of Diverse Metadata for
Context-like APIs </a>
- */
- @Deprecated
- default JobID getJobId() {
- return getJobInfo().getJobId();
- }
-
- /**
- * Get the name of the job.
- *
- * @return the name of the job
- * @deprecated This method is deprecated since Flink 1.19. All
metadata about the job should
- * be provided uniformly by {@link #getJobInfo()}.
- * @see <a
- *
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs">
- * FLIP-382: Unify the Provision of Diverse Metadata for
Context-like APIs </a>
- */
- @Deprecated
- default String getJobName() {
- return getJobInfo().getJobName();
- }
-
/**
* Get the metric group of the JobMaster.
*
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh
b/flink-end-to-end-tests/run-nightly-tests.sh
index 9e7ab2bf56f..92f0eeced1c 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -197,7 +197,9 @@ function run_group_2 {
run_test "Walkthrough DataStream Java nightly end-to-end test"
"$END_TO_END_DIR/test-scripts/test_datastream_walkthroughs.sh java"
- run_test "Avro Confluent Schema Registry nightly end-to-end test"
"$END_TO_END_DIR/test-scripts/test_confluent_schema_registry.sh"
+ # Disable this test as 2.0 broke the compatibility of the Kafka sink writer.
We should consider migrating this test to the flink-connector-kafka repo.
+ # See FLINK-36268.
+ # run_test "Avro Confluent Schema Registry nightly end-to-end test"
"$END_TO_END_DIR/test-scripts/test_confluent_schema_registry.sh"
run_test "State TTL Heap backend end-to-end test"
"$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh hashmap"
"skip_check_exceptions"
run_test "State TTL RocksDb backend end-to-end test"
"$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh rocks"
"skip_check_exceptions"
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala
index 25045de6900..fdc7ac8036b 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/LongHashJoinGenerator.scala
@@ -206,7 +206,7 @@ object LongHashJoinGenerator {
| computeMemorySize(),
| getContainingTask().getEnvironment().getIOManager(),
| $buildRowSize,
- | ${buildRowCount}L /
getRuntimeContext().getNumberOfParallelSubtasks());
+ | ${buildRowCount}L /
getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks());
| }
|
| @Override
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/fusion/spec/HashJoinFusionCodegenSpec.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/fusion/spec/HashJoinFusionCodegenSpec.scala
index 0c65d0a5aaf..41c44564f24 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/fusion/spec/HashJoinFusionCodegenSpec.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/fusion/spec/HashJoinFusionCodegenSpec.scala
@@ -569,7 +569,7 @@ class HashJoinFusionCodegenSpec(
| memorySize,
| getContainingTask().getEnvironment().getIOManager(),
| $buildRowSize,
- | ${buildRowCount}L /
getRuntimeContext().getNumberOfParallelSubtasks());
+ | ${buildRowCount}L /
getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks());
| }
|
| @Override
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/scheduling/JMFailoverITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/JMFailoverITCase.java
index d23ae637434..6905a8d4a8f 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/scheduling/JMFailoverITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/JMFailoverITCase.java
@@ -539,7 +539,7 @@ class JMFailoverITCase {
Output<StreamRecord<Long>> output) {
super.setup(containingTask, config, output);
- int subIdx = getRuntimeContext().getIndexOfThisSubtask();
+ int subIdx =
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
// attempt id ++
attemptIds.compute(
@@ -601,7 +601,7 @@ class JMFailoverITCase {
Output<StreamRecord<Tuple2<Integer, Integer>>> output) {
super.setup(containingTask, config, output);
- int subIdx = getRuntimeContext().getIndexOfThisSubtask();
+ int subIdx =
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
// attempt id ++
attemptIds.compute(
@@ -654,7 +654,7 @@ class JMFailoverITCase {
Output<StreamRecord<Tuple2<Integer, Integer>>> output) {
super.setup(containingTask, config, output);
- int subIdx = getRuntimeContext().getIndexOfThisSubtask();
+ int subIdx =
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
// attempt id ++
attemptIds.compute(
@@ -710,7 +710,7 @@ class JMFailoverITCase {
Output<StreamRecord<Void>> output) {
super.setup(containingTask, config, output);
- int subIdx = getRuntimeContext().getIndexOfThisSubtask();
+ int subIdx =
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
// attempt id ++
attemptIds.compute(
diff --git a/pom.xml b/pom.xml
index 0ce3994ede7..bd9ce10f8f9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2348,6 +2348,17 @@ under the License.
<!--
FLINK-35812 move tuple interfaces into flink-core-api, should be removed in 2.0
-->
<exclude>org.apache.flink.api.java.tuple.*</exclude>
<exclude>org.apache.flink.types.NullFieldException</exclude>
+ <!--
FLINK-36225 Remove deprecated methods marked in FLIP-382 -->
+
<exclude>org.apache.flink.api.common.functions.RuntimeContext#getAttemptNumber()</exclude>
+
<exclude>org.apache.flink.api.common.functions.RuntimeContext#getIndexOfThisSubtask()</exclude>
+
<exclude>org.apache.flink.api.common.functions.RuntimeContext#getJobId()</exclude>
+
<exclude>org.apache.flink.api.common.functions.RuntimeContext#getNumberOfParallelSubtasks()</exclude>
+
<exclude>org.apache.flink.api.common.functions.RuntimeContext#getTaskName()</exclude>
+
<exclude>org.apache.flink.api.common.functions.RuntimeContext#getTaskNameWithSubtasks()</exclude>
+
<exclude>org.apache.flink.api.connector.sink2.Sink$InitContextWrapper#getAttemptNumber()</exclude>
+
<exclude>org.apache.flink.api.connector.sink2.Sink$InitContextWrapper#getJobId()</exclude>
+
<exclude>org.apache.flink.api.connector.sink2.Sink$InitContextWrapper#getNumberOfParallelSubtasks()</exclude>
+
<exclude>org.apache.flink.api.connector.sink2.Sink$InitContextWrapper#getSubtaskId()</exclude>
<!-- The
following exclusions are due to classes being relocated from the
flink-streaming-java
module
to the flink-runtime module. -->
<exclude>org.apache.flink.streaming.api.functions.windowing.AllWindowFunction</exclude>