This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d0829ba3b16 [FLINK-34237] Revert the breaking changes to SourceReaderContext
d0829ba3b16 is described below

commit d0829ba3b162c24f2655b35258c9a8dc61cdba67
Author: Wencong Liu <[email protected]>
AuthorDate: Fri Jan 26 11:26:22 2024 +0800

    [FLINK-34237] Revert the breaking changes to SourceReaderContext
    
    This closes #24197
---
 .../base/source/hybrid/HybridSourceReader.java     | 14 ++++-----
 .../datagen/source/DataGeneratorSourceTest.java    | 17 +++++------
 .../source/GeneratorSourceReaderFactory.java       |  2 +-
 .../file/src/FileSourceHeavyThroughputTest.java    | 20 +++++--------
 .../api/connector/source/SourceReaderContext.java  | 33 ++--------------------
 .../source/lib/NumberSequenceSourceTest.java       | 11 +++++---
 .../generator/EventsGeneratorFunction.java         |  5 ++--
 .../examples/java/connectors/SocketSource.java     |  2 +-
 .../streaming/api/operators/SourceOperator.java    | 12 ++++++--
 .../source/reader/TestingReaderContext.java        | 23 +++++++--------
 .../checkpointing/UnalignedCheckpointTestBase.java |  2 +-
 pom.xml                                            |  1 -
 12 files changed, 55 insertions(+), 87 deletions(-)

diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
index e0df3a2de28..2f113078fe3 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
@@ -87,7 +87,7 @@ public class HybridSourceReader<T> implements SourceReader<T, 
HybridSourceSplit>
             // trap END_OF_INPUT unless all sources have finished
             LOG.info(
                     "End of input subtask={} sourceIndex={} {}",
-                    readerContext.getTaskInfo().getIndexOfThisSubtask(),
+                    readerContext.getIndexOfSubtask(),
                     currentSourceIndex,
                     currentReader);
             // Signal the coordinator that this reader has consumed all input 
and the
@@ -140,7 +140,7 @@ public class HybridSourceReader<T> implements 
SourceReader<T, HybridSourceSplit>
     public void addSplits(List<HybridSourceSplit> splits) {
         LOG.info(
                 "Adding splits subtask={} sourceIndex={} currentReader={} {}",
-                readerContext.getTaskInfo().getIndexOfThisSubtask(),
+                readerContext.getIndexOfSubtask(),
                 currentSourceIndex,
                 currentReader,
                 splits);
@@ -168,7 +168,7 @@ public class HybridSourceReader<T> implements 
SourceReader<T, HybridSourceSplit>
         }
         LOG.debug(
                 "No more splits for subtask={} sourceIndex={} 
currentReader={}",
-                readerContext.getTaskInfo().getIndexOfThisSubtask(),
+                readerContext.getIndexOfSubtask(),
                 currentSourceIndex,
                 currentReader);
     }
@@ -179,7 +179,7 @@ public class HybridSourceReader<T> implements 
SourceReader<T, HybridSourceSplit>
             SwitchSourceEvent sse = (SwitchSourceEvent) sourceEvent;
             LOG.info(
                     "Switch source event: subtask={} sourceIndex={} source={}",
-                    readerContext.getTaskInfo().getIndexOfThisSubtask(),
+                    readerContext.getIndexOfSubtask(),
                     sse.sourceIndex(),
                     sse.source());
             switchedSources.put(sse.sourceIndex(), sse.source());
@@ -197,7 +197,7 @@ public class HybridSourceReader<T> implements 
SourceReader<T, HybridSourceSplit>
         }
         LOG.debug(
                 "Reader closed: subtask={} sourceIndex={} currentReader={}",
-                readerContext.getTaskInfo().getIndexOfThisSubtask(),
+                readerContext.getIndexOfSubtask(),
                 currentSourceIndex,
                 currentReader);
     }
@@ -212,7 +212,7 @@ public class HybridSourceReader<T> implements 
SourceReader<T, HybridSourceSplit>
             }
             LOG.debug(
                     "Reader closed: subtask={} sourceIndex={} 
currentReader={}",
-                    readerContext.getTaskInfo().getIndexOfThisSubtask(),
+                    readerContext.getIndexOfSubtask(),
                     currentSourceIndex,
                     currentReader);
         }
@@ -230,7 +230,7 @@ public class HybridSourceReader<T> implements 
SourceReader<T, HybridSourceSplit>
         availabilityFuture.complete(null);
         LOG.debug(
                 "Reader started: subtask={} sourceIndex={} {}",
-                readerContext.getTaskInfo().getIndexOfThisSubtask(),
+                readerContext.getIndexOfSubtask(),
                 currentSourceIndex,
                 reader);
         // add restored splits
diff --git 
a/flink-connectors/flink-connector-datagen-test/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java
 
b/flink-connectors/flink-connector-datagen-test/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java
index f4597f3f6f2..ebda0c941cc 100644
--- 
a/flink-connectors/flink-connector-datagen-test/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java
+++ 
b/flink-connectors/flink-connector-datagen-test/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.connector.datagen.source;
 
-import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.connector.source.ReaderOutput;
@@ -30,7 +29,6 @@ import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
 import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.testutils.source.TestingTaskInfo;
 import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
@@ -184,6 +182,11 @@ class DataGeneratorSourceTest {
             return "localhost";
         }
 
+        @Override
+        public int getIndexOfSubtask() {
+            return 0;
+        }
+
         @Override
         public void sendSplitRequest() {}
 
@@ -196,14 +199,8 @@ class DataGeneratorSourceTest {
         }
 
         @Override
-        public TaskInfo getTaskInfo() {
-            return new TestingTaskInfo.Builder()
-                    .setTaskName("DummyTask")
-                    .setMaxNumberOfParallelSubtasks(1)
-                    .setIndexOfThisSubtask(0)
-                    .setNumberOfParallelSubtasks(1)
-                    .setAttemptNumber(0)
-                    .build();
+        public int currentParallelism() {
+            return 1;
         }
     }
 
diff --git 
a/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/GeneratorSourceReaderFactory.java
 
b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/GeneratorSourceReaderFactory.java
index c66f3edbd96..f3888d3d1de 100644
--- 
a/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/GeneratorSourceReaderFactory.java
+++ 
b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/GeneratorSourceReaderFactory.java
@@ -58,7 +58,7 @@ class GeneratorSourceReaderFactory<OUT>
     @Override
     public SourceReader<OUT, NumberSequenceSource.NumberSequenceSplit> 
createReader(
             SourceReaderContext readerContext) {
-        int parallelism = 
readerContext.getTaskInfo().getNumberOfParallelSubtasks();
+        int parallelism = readerContext.currentParallelism();
         RateLimiter rateLimiter = 
rateLimiterStrategy.createRateLimiter(parallelism);
         return new RateLimitedSourceReader<>(
                 new GeneratingIteratorSourceReader<>(readerContext, 
generatorFunction),
diff --git 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java
 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java
index 3facf4cafe0..bb44462ab05 100644
--- 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java
+++ 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.connector.file.src;
 
-import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -31,7 +30,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
 import org.apache.flink.connector.file.src.reader.StreamFormat;
 import org.apache.flink.connector.file.src.testutils.TestingFileSystem;
-import org.apache.flink.connector.testutils.source.TestingTaskInfo;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.InputStatus;
@@ -197,15 +195,6 @@ class FileSourceHeavyThroughputTest {
 
     private static final class NoOpReaderContext implements 
SourceReaderContext {
 
-        private final TaskInfo taskInfo =
-                new TestingTaskInfo.Builder()
-                        .setTaskName("NoOpTask")
-                        .setMaxNumberOfParallelSubtasks(1)
-                        .setIndexOfThisSubtask(0)
-                        .setAttemptNumber(0)
-                        .setNumberOfParallelSubtasks(1)
-                        .build();
-
         @Override
         public SourceReaderMetricGroup metricGroup() {
             return UnregisteredMetricsGroup.createSourceReaderMetricGroup();
@@ -221,6 +210,11 @@ class FileSourceHeavyThroughputTest {
             return "localhost";
         }
 
+        @Override
+        public int getIndexOfSubtask() {
+            return 0;
+        }
+
         @Override
         public void sendSplitRequest() {}
 
@@ -233,8 +227,8 @@ class FileSourceHeavyThroughputTest {
         }
 
         @Override
-        public TaskInfo getTaskInfo() {
-            return taskInfo;
+        public int currentParallelism() {
+            return 1;
         }
     }
 
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
index 78e45007915..08c64501d9a 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
@@ -19,8 +19,6 @@
 package org.apache.flink.api.connector.source;
 
 import org.apache.flink.annotation.Public;
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
 import org.apache.flink.util.UserCodeClassLoader;
@@ -41,19 +39,8 @@ public interface SourceReaderContext {
      */
     String getLocalHostName();
 
-    /**
-     * Get the index of this subtask.
-     *
-     * @deprecated This method is deprecated since Flink 1.19. All metadata about the task should be
-     *     provided uniformly by {@link #getTaskInfo()}.
-     * @see <a
-     *     href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs">
-     *     FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs </a>
-     */
-    @Deprecated
-    default int getIndexOfSubtask() {
-        return getTaskInfo().getIndexOfThisSubtask();
-    }
+    /** @return The index of this subtask. */
+    int getIndexOfSubtask();
 
     /**
      * Sends a split request to the source's {@link SplitEnumerator}. This 
will result in a call to
@@ -81,22 +68,8 @@ public interface SourceReaderContext {
      * Get the current parallelism of this Source.
      *
      * @return the parallelism of the Source.
-     * @deprecated This method is deprecated since Flink 1.19. All metadata about the task should be
-     *     provided uniformly by {@link #getTaskInfo()}.
-     * @see <a
-     *     href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs">
-     *     FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs </a>
      */
-    @Deprecated
     default int currentParallelism() {
-        return getTaskInfo().getNumberOfParallelSubtasks();
+        throw new UnsupportedOperationException();
     }
-
-    /**
-     * Get the meta information of current task.
-     *
-     * @return the task meta information.
-     */
-    @PublicEvolving
-    TaskInfo getTaskInfo();
 }
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java
index f8bf5c8dace..ce97ff71269 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.api.connector.source.lib;
 
-import org.apache.flink.api.common.TaskInfo;
-import org.apache.flink.api.common.TaskInfoImpl;
 import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.connector.source.ReaderOutput;
 import org.apache.flink.api.connector.source.SourceEvent;
@@ -130,6 +128,11 @@ public class NumberSequenceSourceTest {
             return "localhost";
         }
 
+        @Override
+        public int getIndexOfSubtask() {
+            return 0;
+        }
+
         @Override
         public void sendSplitRequest() {}
 
@@ -142,8 +145,8 @@ public class NumberSequenceSourceTest {
         }
 
         @Override
-        public TaskInfo getTaskInfo() {
-            return new TaskInfoImpl("DummyTask", 1, 0, 1, 0);
+        public int currentParallelism() {
+            return 1;
         }
     }
 
diff --git 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/generator/EventsGeneratorFunction.java
 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/generator/EventsGeneratorFunction.java
index d04c6c4e166..70d6ec3f4ec 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/generator/EventsGeneratorFunction.java
+++ 
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/generator/EventsGeneratorFunction.java
@@ -44,9 +44,8 @@ public class EventsGeneratorFunction implements 
GeneratorFunction<Long, Event> {
 
     @Override
     public void open(SourceReaderContext readerContext) throws Exception {
-        final int range =
-                Integer.MAX_VALUE / 
readerContext.getTaskInfo().getNumberOfParallelSubtasks();
-        min = range * readerContext.getTaskInfo().getIndexOfThisSubtask();
+        final int range = Integer.MAX_VALUE / 
readerContext.currentParallelism();
+        min = range * readerContext.getIndexOfSubtask();
         max = min + range;
         generator = new EventsGenerator(errorProbability);
     }
diff --git 
a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/SocketSource.java
 
b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/SocketSource.java
index c63a5468768..e4361187c48 100644
--- 
a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/SocketSource.java
+++ 
b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/SocketSource.java
@@ -116,7 +116,7 @@ public final class SocketSource
     public SourceReader<RowData, DummySplit> createReader(SourceReaderContext 
readerContext)
             throws Exception {
         Preconditions.checkState(
-                readerContext.getTaskInfo().getNumberOfParallelSubtasks() == 1,
+                readerContext.currentParallelism() == 1,
                 "SocketSource can only work with a parallelism of 1.");
         deserializer.open(
                 new DeserializationSchema.InitializationContext() {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index de74cb13b8f..91d9a463178 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.eventtime.WatermarkAlignmentParams;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.state.ListState;
@@ -255,6 +254,8 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
             return;
         }
 
+        final int subtaskIndex = 
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
+
         final SourceReaderContext context =
                 new SourceReaderContext() {
                     @Override
@@ -272,6 +273,11 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
                         return localHostname;
                     }
 
+                    @Override
+                    public int getIndexOfSubtask() {
+                        return subtaskIndex;
+                    }
+
                     @Override
                     public void sendSplitRequest() {
                         operatorEventGateway.sendEventToCoordinator(
@@ -302,8 +308,8 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
                     }
 
                     @Override
-                    public TaskInfo getTaskInfo() {
-                        return getRuntimeContext().getTaskInfo();
+                    public int currentParallelism() {
+                        return 
getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
                     }
                 };
 
diff --git 
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/TestingReaderContext.java
 
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/TestingReaderContext.java
index fe801c2f17a..4855ced967b 100644
--- 
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/TestingReaderContext.java
+++ 
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/TestingReaderContext.java
@@ -18,11 +18,9 @@
 
 package org.apache.flink.connector.testutils.source.reader;
 
-import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.testutils.source.TestingTaskInfo;
 import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.util.SimpleUserCodeClassLoader;
@@ -68,6 +66,11 @@ public class TestingReaderContext implements 
SourceReaderContext {
         return "localhost";
     }
 
+    @Override
+    public int getIndexOfSubtask() {
+        return 0;
+    }
+
     @Override
     public void sendSplitRequest() {
         numSplitRequests++;
@@ -83,6 +86,11 @@ public class TestingReaderContext implements 
SourceReaderContext {
         return SimpleUserCodeClassLoader.create(getClass().getClassLoader());
     }
 
+    @Override
+    public int currentParallelism() {
+        return 1;
+    }
+
     // ------------------------------------------------------------------------
 
     public int getNumSplitRequests() {
@@ -96,15 +104,4 @@ public class TestingReaderContext implements 
SourceReaderContext {
     public void clearSentEvents() {
         sentEvents.clear();
     }
-
-    @Override
-    public TaskInfo getTaskInfo() {
-        return new TestingTaskInfo.Builder()
-                .setTaskName("TestTask")
-                .setMaxNumberOfParallelSubtasks(1)
-                .setIndexOfThisSubtask(0)
-                .setAttemptNumber(0)
-                .setNumberOfParallelSubtasks(1)
-                .build();
-    }
 }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
index b1fef658892..4218be61e60 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
@@ -247,7 +247,7 @@ public abstract class UnalignedCheckpointTestBase extends 
TestLogger {
         @Override
         public SourceReader<Long, LongSplit> createReader(SourceReaderContext 
readerContext) {
             return new LongSourceReader(
-                    readerContext.getTaskInfo().getIndexOfThisSubtask(),
+                    readerContext.getIndexOfSubtask(),
                     minCheckpoints,
                     expectedRestarts,
                     checkpointingInterval,
diff --git a/pom.xml b/pom.xml
index 5a44046e4a6..fbf34fcd46b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2320,7 +2320,6 @@ under the License.
                                                                
<exclude>org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#fromData(java.util.Collection)</exclude>
                                                                
<exclude>org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#fromData(java.util.Collection,org.apache.flink.api.common.typeinfo.TypeInformation)</exclude>
                                                                
<exclude>org.apache.flink.api.common.functions.RuntimeContext</exclude>
-                                                               
<exclude>org.apache.flink.api.connector.source.SourceReaderContext</exclude>
                                                                <!-- MARKER: 
end exclusions -->
                                                        </excludes>
                                                        
<accessModifier>public</accessModifier>

Reply via email to