This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d0829ba3b16 [FLINK-34237] Revert the breaking changes to
SourceReaderContext
d0829ba3b16 is described below
commit d0829ba3b162c24f2655b35258c9a8dc61cdba67
Author: Wencong Liu <[email protected]>
AuthorDate: Fri Jan 26 11:26:22 2024 +0800
[FLINK-34237] Revert the breaking changes to SourceReaderContext
This closes #24197
---
.../base/source/hybrid/HybridSourceReader.java | 14 ++++-----
.../datagen/source/DataGeneratorSourceTest.java | 17 +++++------
.../source/GeneratorSourceReaderFactory.java | 2 +-
.../file/src/FileSourceHeavyThroughputTest.java | 20 +++++--------
.../api/connector/source/SourceReaderContext.java | 33 ++--------------------
.../source/lib/NumberSequenceSourceTest.java | 11 +++++---
.../generator/EventsGeneratorFunction.java | 5 ++--
.../examples/java/connectors/SocketSource.java | 2 +-
.../streaming/api/operators/SourceOperator.java | 12 ++++++--
.../source/reader/TestingReaderContext.java | 23 +++++++--------
.../checkpointing/UnalignedCheckpointTestBase.java | 2 +-
pom.xml | 1 -
12 files changed, 55 insertions(+), 87 deletions(-)
diff --git
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
index e0df3a2de28..2f113078fe3 100644
---
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
+++
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
@@ -87,7 +87,7 @@ public class HybridSourceReader<T> implements SourceReader<T,
HybridSourceSplit>
// trap END_OF_INPUT unless all sources have finished
LOG.info(
"End of input subtask={} sourceIndex={} {}",
- readerContext.getTaskInfo().getIndexOfThisSubtask(),
+ readerContext.getIndexOfSubtask(),
currentSourceIndex,
currentReader);
// Signal the coordinator that this reader has consumed all input
and the
@@ -140,7 +140,7 @@ public class HybridSourceReader<T> implements
SourceReader<T, HybridSourceSplit>
public void addSplits(List<HybridSourceSplit> splits) {
LOG.info(
"Adding splits subtask={} sourceIndex={} currentReader={} {}",
- readerContext.getTaskInfo().getIndexOfThisSubtask(),
+ readerContext.getIndexOfSubtask(),
currentSourceIndex,
currentReader,
splits);
@@ -168,7 +168,7 @@ public class HybridSourceReader<T> implements
SourceReader<T, HybridSourceSplit>
}
LOG.debug(
"No more splits for subtask={} sourceIndex={}
currentReader={}",
- readerContext.getTaskInfo().getIndexOfThisSubtask(),
+ readerContext.getIndexOfSubtask(),
currentSourceIndex,
currentReader);
}
@@ -179,7 +179,7 @@ public class HybridSourceReader<T> implements
SourceReader<T, HybridSourceSplit>
SwitchSourceEvent sse = (SwitchSourceEvent) sourceEvent;
LOG.info(
"Switch source event: subtask={} sourceIndex={} source={}",
- readerContext.getTaskInfo().getIndexOfThisSubtask(),
+ readerContext.getIndexOfSubtask(),
sse.sourceIndex(),
sse.source());
switchedSources.put(sse.sourceIndex(), sse.source());
@@ -197,7 +197,7 @@ public class HybridSourceReader<T> implements
SourceReader<T, HybridSourceSplit>
}
LOG.debug(
"Reader closed: subtask={} sourceIndex={} currentReader={}",
- readerContext.getTaskInfo().getIndexOfThisSubtask(),
+ readerContext.getIndexOfSubtask(),
currentSourceIndex,
currentReader);
}
@@ -212,7 +212,7 @@ public class HybridSourceReader<T> implements
SourceReader<T, HybridSourceSplit>
}
LOG.debug(
"Reader closed: subtask={} sourceIndex={}
currentReader={}",
- readerContext.getTaskInfo().getIndexOfThisSubtask(),
+ readerContext.getIndexOfSubtask(),
currentSourceIndex,
currentReader);
}
@@ -230,7 +230,7 @@ public class HybridSourceReader<T> implements
SourceReader<T, HybridSourceSplit>
availabilityFuture.complete(null);
LOG.debug(
"Reader started: subtask={} sourceIndex={} {}",
- readerContext.getTaskInfo().getIndexOfThisSubtask(),
+ readerContext.getIndexOfSubtask(),
currentSourceIndex,
reader);
// add restored splits
diff --git
a/flink-connectors/flink-connector-datagen-test/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java
b/flink-connectors/flink-connector-datagen-test/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java
index f4597f3f6f2..ebda0c941cc 100644
---
a/flink-connectors/flink-connector-datagen-test/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java
+++
b/flink-connectors/flink-connector-datagen-test/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.connector.datagen.source;
-import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.ReaderOutput;
@@ -30,7 +29,6 @@ import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.testutils.source.TestingTaskInfo;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
@@ -184,6 +182,11 @@ class DataGeneratorSourceTest {
return "localhost";
}
+ @Override
+ public int getIndexOfSubtask() {
+ return 0;
+ }
+
@Override
public void sendSplitRequest() {}
@@ -196,14 +199,8 @@ class DataGeneratorSourceTest {
}
@Override
- public TaskInfo getTaskInfo() {
- return new TestingTaskInfo.Builder()
- .setTaskName("DummyTask")
- .setMaxNumberOfParallelSubtasks(1)
- .setIndexOfThisSubtask(0)
- .setNumberOfParallelSubtasks(1)
- .setAttemptNumber(0)
- .build();
+ public int currentParallelism() {
+ return 1;
}
}
diff --git
a/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/GeneratorSourceReaderFactory.java
b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/GeneratorSourceReaderFactory.java
index c66f3edbd96..f3888d3d1de 100644
---
a/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/GeneratorSourceReaderFactory.java
+++
b/flink-connectors/flink-connector-datagen/src/main/java/org/apache/flink/connector/datagen/source/GeneratorSourceReaderFactory.java
@@ -58,7 +58,7 @@ class GeneratorSourceReaderFactory<OUT>
@Override
public SourceReader<OUT, NumberSequenceSource.NumberSequenceSplit>
createReader(
SourceReaderContext readerContext) {
- int parallelism =
readerContext.getTaskInfo().getNumberOfParallelSubtasks();
+ int parallelism = readerContext.currentParallelism();
RateLimiter rateLimiter =
rateLimiterStrategy.createRateLimiter(parallelism);
return new RateLimitedSourceReader<>(
new GeneratingIteratorSourceReader<>(readerContext,
generatorFunction),
diff --git
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java
index 3facf4cafe0..bb44462ab05 100644
---
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java
+++
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.connector.file.src;
-import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -31,7 +30,6 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.connector.file.src.testutils.TestingFileSystem;
-import org.apache.flink.connector.testutils.source.TestingTaskInfo;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.InputStatus;
@@ -197,15 +195,6 @@ class FileSourceHeavyThroughputTest {
private static final class NoOpReaderContext implements
SourceReaderContext {
- private final TaskInfo taskInfo =
- new TestingTaskInfo.Builder()
- .setTaskName("NoOpTask")
- .setMaxNumberOfParallelSubtasks(1)
- .setIndexOfThisSubtask(0)
- .setAttemptNumber(0)
- .setNumberOfParallelSubtasks(1)
- .build();
-
@Override
public SourceReaderMetricGroup metricGroup() {
return UnregisteredMetricsGroup.createSourceReaderMetricGroup();
@@ -221,6 +210,11 @@ class FileSourceHeavyThroughputTest {
return "localhost";
}
+ @Override
+ public int getIndexOfSubtask() {
+ return 0;
+ }
+
@Override
public void sendSplitRequest() {}
@@ -233,8 +227,8 @@ class FileSourceHeavyThroughputTest {
}
@Override
- public TaskInfo getTaskInfo() {
- return taskInfo;
+ public int currentParallelism() {
+ return 1;
}
}
diff --git
a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
index 78e45007915..08c64501d9a 100644
---
a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
+++
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
@@ -19,8 +19,6 @@
package org.apache.flink.api.connector.source;
import org.apache.flink.annotation.Public;
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
import org.apache.flink.util.UserCodeClassLoader;
@@ -41,19 +39,8 @@ public interface SourceReaderContext {
*/
String getLocalHostName();
- /**
- * Get the index of this subtask.
- *
- * @deprecated This method is deprecated since Flink 1.19. All metadata
about the task should be
- * provided uniformly by {@link #getTaskInfo()}.
- * @see <a
- *
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs">
- * FLIP-382: Unify the Provision of Diverse Metadata for Context-like
APIs </a>
- */
- @Deprecated
- default int getIndexOfSubtask() {
- return getTaskInfo().getIndexOfThisSubtask();
- }
+ /** @return The index of this subtask. */
+ int getIndexOfSubtask();
/**
* Sends a split request to the source's {@link SplitEnumerator}. This
will result in a call to
@@ -81,22 +68,8 @@ public interface SourceReaderContext {
* Get the current parallelism of this Source.
*
* @return the parallelism of the Source.
- * @deprecated This method is deprecated since Flink 1.19. All metadata
about the task should be
- * provided uniformly by {@link #getTaskInfo()}.
- * @see <a
- *
href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs">
- * FLIP-382: Unify the Provision of Diverse Metadata for Context-like
APIs </a>
*/
- @Deprecated
default int currentParallelism() {
- return getTaskInfo().getNumberOfParallelSubtasks();
+ throw new UnsupportedOperationException();
}
-
- /**
- * Get the meta information of current task.
- *
- * @return the task meta information.
- */
- @PublicEvolving
- TaskInfo getTaskInfo();
}
diff --git
a/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java
b/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java
index f8bf5c8dace..ce97ff71269 100644
---
a/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java
+++
b/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java
@@ -18,8 +18,6 @@
package org.apache.flink.api.connector.source.lib;
-import org.apache.flink.api.common.TaskInfo;
-import org.apache.flink.api.common.TaskInfoImpl;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceEvent;
@@ -130,6 +128,11 @@ public class NumberSequenceSourceTest {
return "localhost";
}
+ @Override
+ public int getIndexOfSubtask() {
+ return 0;
+ }
+
@Override
public void sendSplitRequest() {}
@@ -142,8 +145,8 @@ public class NumberSequenceSourceTest {
}
@Override
- public TaskInfo getTaskInfo() {
- return new TaskInfoImpl("DummyTask", 1, 0, 1, 0);
+ public int currentParallelism() {
+ return 1;
}
}
diff --git
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/generator/EventsGeneratorFunction.java
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/generator/EventsGeneratorFunction.java
index d04c6c4e166..70d6ec3f4ec 100644
---
a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/generator/EventsGeneratorFunction.java
+++
b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/generator/EventsGeneratorFunction.java
@@ -44,9 +44,8 @@ public class EventsGeneratorFunction implements
GeneratorFunction<Long, Event> {
@Override
public void open(SourceReaderContext readerContext) throws Exception {
- final int range =
- Integer.MAX_VALUE /
readerContext.getTaskInfo().getNumberOfParallelSubtasks();
- min = range * readerContext.getTaskInfo().getIndexOfThisSubtask();
+ final int range = Integer.MAX_VALUE /
readerContext.currentParallelism();
+ min = range * readerContext.getIndexOfSubtask();
max = min + range;
generator = new EventsGenerator(errorProbability);
}
diff --git
a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/SocketSource.java
b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/SocketSource.java
index c63a5468768..e4361187c48 100644
---
a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/SocketSource.java
+++
b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/SocketSource.java
@@ -116,7 +116,7 @@ public final class SocketSource
public SourceReader<RowData, DummySplit> createReader(SourceReaderContext
readerContext)
throws Exception {
Preconditions.checkState(
- readerContext.getTaskInfo().getNumberOfParallelSubtasks() == 1,
+ readerContext.currentParallelism() == 1,
"SocketSource can only work with a parallelism of 1.");
deserializer.open(
new DeserializationSchema.InitializationContext() {
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index de74cb13b8f..91d9a463178 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -19,7 +19,6 @@ package org.apache.flink.streaming.api.operators;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.eventtime.WatermarkAlignmentParams;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
@@ -255,6 +254,8 @@ public class SourceOperator<OUT, SplitT extends
SourceSplit> extends AbstractStr
return;
}
+ final int subtaskIndex =
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
+
final SourceReaderContext context =
new SourceReaderContext() {
@Override
@@ -272,6 +273,11 @@ public class SourceOperator<OUT, SplitT extends
SourceSplit> extends AbstractStr
return localHostname;
}
+ @Override
+ public int getIndexOfSubtask() {
+ return subtaskIndex;
+ }
+
@Override
public void sendSplitRequest() {
operatorEventGateway.sendEventToCoordinator(
@@ -302,8 +308,8 @@ public class SourceOperator<OUT, SplitT extends
SourceSplit> extends AbstractStr
}
@Override
- public TaskInfo getTaskInfo() {
- return getRuntimeContext().getTaskInfo();
+ public int currentParallelism() {
+ return
getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
}
};
diff --git
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/TestingReaderContext.java
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/TestingReaderContext.java
index fe801c2f17a..4855ced967b 100644
---
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/TestingReaderContext.java
+++
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testutils/source/reader/TestingReaderContext.java
@@ -18,11 +18,9 @@
package org.apache.flink.connector.testutils.source.reader;
-import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.testutils.source.TestingTaskInfo;
import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.util.SimpleUserCodeClassLoader;
@@ -68,6 +66,11 @@ public class TestingReaderContext implements
SourceReaderContext {
return "localhost";
}
+ @Override
+ public int getIndexOfSubtask() {
+ return 0;
+ }
+
@Override
public void sendSplitRequest() {
numSplitRequests++;
@@ -83,6 +86,11 @@ public class TestingReaderContext implements
SourceReaderContext {
return SimpleUserCodeClassLoader.create(getClass().getClassLoader());
}
+ @Override
+ public int currentParallelism() {
+ return 1;
+ }
+
// ------------------------------------------------------------------------
public int getNumSplitRequests() {
@@ -96,15 +104,4 @@ public class TestingReaderContext implements
SourceReaderContext {
public void clearSentEvents() {
sentEvents.clear();
}
-
- @Override
- public TaskInfo getTaskInfo() {
- return new TestingTaskInfo.Builder()
- .setTaskName("TestTask")
- .setMaxNumberOfParallelSubtasks(1)
- .setIndexOfThisSubtask(0)
- .setAttemptNumber(0)
- .setNumberOfParallelSubtasks(1)
- .build();
- }
}
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
index b1fef658892..4218be61e60 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
@@ -247,7 +247,7 @@ public abstract class UnalignedCheckpointTestBase extends
TestLogger {
@Override
public SourceReader<Long, LongSplit> createReader(SourceReaderContext
readerContext) {
return new LongSourceReader(
- readerContext.getTaskInfo().getIndexOfThisSubtask(),
+ readerContext.getIndexOfSubtask(),
minCheckpoints,
expectedRestarts,
checkpointingInterval,
diff --git a/pom.xml b/pom.xml
index 5a44046e4a6..fbf34fcd46b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2320,7 +2320,6 @@ under the License.
<exclude>org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#fromData(java.util.Collection)</exclude>
<exclude>org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#fromData(java.util.Collection,org.apache.flink.api.common.typeinfo.TypeInformation)</exclude>
<exclude>org.apache.flink.api.common.functions.RuntimeContext</exclude>
-
<exclude>org.apache.flink.api.connector.source.SourceReaderContext</exclude>
<!-- MARKER:
end exclusions -->
</excludes>
<accessModifier>public</accessModifier>