This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 00dc8cb80 [CELEBORN-2193] Bump Flink from 2.0.0, 2.1.0 to 2.0.1, 2.1.1
00dc8cb80 is described below
commit 00dc8cb80b0ae66fc5e1c275cb0e8915b028d2b2
Author: SteNicholas <[email protected]>
AuthorDate: Tue Nov 4 15:57:34 2025 +0800
[CELEBORN-2193] Bump Flink from 2.0.0, 2.1.0 to 2.0.1, 2.1.1
### What changes were proposed in this pull request?
Bump Flink from 2.0.0, 2.1.0 to 2.0.1, 2.1.1.
### Why are the changes needed?
Flink has released v2.0.1 and v2.1.1. See the release notes:
- [Apache Flink 2.0.1
Release](https://github.com/apache/flink/releases/tag/release-2.0.1)
- [Apache Flink 2.1.1
Release](https://github.com/apache/flink/releases/tag/release-2.1.1)
Flink v2.0.1 adds the `getConsumedPartitionType()` method to
`IndexedInputGate`; see https://github.com/apache/flink/pull/26548.
`HybridShuffleWordCountTest` can now execute with parallelism because of the fix in
https://github.com/apache/flink/pull/26369, which was released in v2.0.1 and
v2.1.1.
### Does this PR resolve a correctness bug?
No.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes #3526 from SteNicholas/CELEBORN-2193.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.../apache/celeborn/plugin/flink/RemoteShuffleInputGate.java | 10 ++++++++++
pom.xml | 4 ++--
project/CelebornBuild.scala | 6 ++++--
.../celeborn/tests/flink/HybridShuffleWordCountTest.scala | 12 ++++++++----
4 files changed, 24 insertions(+), 8 deletions(-)
diff --git
a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
index bd60bbe2b..b8e1994fd 100644
---
a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
+++
b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
@@ -61,6 +61,9 @@ import org.apache.celeborn.common.CelebornConf;
/** A {@link IndexedInputGate} which ingest data from remote shuffle workers.
*/
public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate {
+ /** The type of the partition the input gate is consuming. */
+ private final ResultPartitionType consumedPartitionType;
+
public RemoteShuffleInputGate(
CelebornConf celebornConf,
ShuffleIOOwnerContext ownerContext,
@@ -79,6 +82,7 @@ public class RemoteShuffleInputGate extends
AbstractRemoteShuffleInputGate {
bufferDecompressor,
numConcurrentReading,
shuffleIOMetricGroups);
+ this.consumedPartitionType = gateDescriptor.getConsumedPartitionType();
}
@Override
@@ -93,6 +97,12 @@ public class RemoteShuffleInputGate extends
AbstractRemoteShuffleInputGate {
return Tuple2.of(indexRange.getStartIndex(), indexRange.getEndIndex());
}
+ public ResultPartitionType getConsumedPartitionType() {
+ // Flink 1.19.3
+ // [FLINK-37783] Auto-disable buffer debloating for tiered shuffle
+ return consumedPartitionType;
+ }
+
/** Accommodation for the incompleteness of Flink pluggable shuffle service.
*/
private class FakedRemoteInputChannel extends RemoteInputChannel {
FakedRemoteInputChannel(int channelIndex) {
diff --git a/pom.xml b/pom.xml
index 4675dd3cd..816393f93 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1833,7 +1833,7 @@
<module>tests/flink-it</module>
</modules>
<properties>
- <flink.version>2.0.0</flink.version>
+ <flink.version>2.0.1</flink.version>
<flink.binary.version>2.0</flink.binary.version>
<scala.binary.version>2.12</scala.binary.version>
<celeborn.flink.plugin.artifact>celeborn-client-flink-2.0_${scala.binary.version}</celeborn.flink.plugin.artifact>
@@ -1852,7 +1852,7 @@
<module>tests/flink-it</module>
</modules>
<properties>
- <flink.version>2.1.0</flink.version>
+ <flink.version>2.1.1</flink.version>
<flink.binary.version>2.1</flink.binary.version>
<scala.binary.version>2.12</scala.binary.version>
<celeborn.flink.plugin.artifact>celeborn-client-flink-2.1_${scala.binary.version}</celeborn.flink.plugin.artifact>
diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala
index 84ee69c93..f12adff49 100644
--- a/project/CelebornBuild.scala
+++ b/project/CelebornBuild.scala
@@ -1202,7 +1202,7 @@ object Flink120 extends FlinkClientProjects {
}
object Flink20 extends FlinkClientProjects {
- val flinkVersion = "2.0.0"
+ val flinkVersion = "2.0.1"
// note that SBT does not allow using the period symbol (.) in project names.
val flinkClientProjectPath = "client-flink/flink-2.0"
@@ -1212,7 +1212,7 @@ object Flink20 extends FlinkClientProjects {
}
object Flink21 extends FlinkClientProjects {
- val flinkVersion = "2.1.0"
+ val flinkVersion = "2.1.1"
// note that SBT does not allow using the period symbol (.) in project names.
val flinkClientProjectPath = "client-flink/flink-2.1"
@@ -1242,6 +1242,8 @@ trait FlinkClientProjects {
.aggregate(flinkCommon, flinkClient, flinkIt)
// get flink major version. e.g:
+ // 2.0.1 -> 2.0
+ // 2.1.1 -> 2.1
// 1.20.3 -> 1.20
// 1.19.3 -> 1.19
// 1.18.1 -> 1.18
diff --git
a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HybridShuffleWordCountTest.scala
b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HybridShuffleWordCountTest.scala
index 29bd3cedc..1bfa24733 100644
---
a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HybridShuffleWordCountTest.scala
+++
b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HybridShuffleWordCountTest.scala
@@ -93,6 +93,9 @@ class HybridShuffleWordCountTest extends AnyFunSuite with
Logging with MiniClust
"execution.batch-shuffle-mode",
"ALL_EXCHANGES_HYBRID_FULL")
configuration.setString("taskmanager.memory.network.min", "1024m")
+ configuration.setString(
+ "execution.batch.adaptive.auto-parallelism.min-parallelism",
+ "" + parallelism)
configuration.setString("restart-strategy.type", "fixed-delay")
configuration.setString("restart-strategy.fixed-delay.attempts", "50")
configuration.setString("restart-strategy.fixed-delay.delay", "5s")
@@ -105,8 +108,7 @@ class HybridShuffleWordCountTest extends AnyFunSuite with
Logging with MiniClust
env.getConfig.setParallelism(parallelism)
env.disableOperatorChaining()
// make parameters available in the web interface
- // TODO: WordCountHelper should execute with parallelism for
[FLINK-37576][runtime] Fix the incorrect status of the isBroadcast field in
AllToAllBlockingResultInfo when submitting a job graph.
- WordCountHelper.execute(env, 1)
+ WordCountHelper.execute(env, parallelism)
val graph = env.getStreamGraph
env.execute(graph)
@@ -129,6 +131,9 @@ class HybridShuffleWordCountTest extends AnyFunSuite with
Logging with MiniClust
"execution.batch-shuffle-mode",
"ALL_EXCHANGES_HYBRID_FULL")
configuration.setString("taskmanager.memory.network.min", "256m")
+ configuration.setString(
+ "execution.batch.adaptive.auto-parallelism.min-parallelism",
+ "" + parallelism)
configuration.setString("restart-strategy.type", "fixed-delay")
configuration.setString("restart-strategy.fixed-delay.attempts", "50")
configuration.setString("restart-strategy.fixed-delay.delay", "5s")
@@ -140,8 +145,7 @@ class HybridShuffleWordCountTest extends AnyFunSuite with
Logging with MiniClust
env.getConfig.setParallelism(parallelism)
env.disableOperatorChaining()
// make parameters available in the web interface
- // TODO: WordCountHelper should execute with parallelism for
[FLINK-37576][runtime] Fix the incorrect status of the isBroadcast field in
AllToAllBlockingResultInfo when submitting a job graph.
- WordCountHelper.execute(env, 1)
+ WordCountHelper.execute(env, parallelism)
val graph = env.getStreamGraph
graph.setJobType(JobType.BATCH)