This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 00dc8cb80 [CELEBORN-2193] Bump Flink from 2.0.0, 2.1.0 to 2.0.1, 2.1.1
00dc8cb80 is described below

commit 00dc8cb80b0ae66fc5e1c275cb0e8915b028d2b2
Author: SteNicholas <[email protected]>
AuthorDate: Tue Nov 4 15:57:34 2025 +0800

    [CELEBORN-2193] Bump Flink from 2.0.0, 2.1.0 to 2.0.1, 2.1.1
    
    ### What changes were proposed in this pull request?
    
    Bump Flink from 2.0.0, 2.1.0 to 2.0.1, 2.1.1.
    
    ### Why are the changes needed?
    
    Flink has released v2.0.1 and v2.1.1; their release notes are available at:
    
    - [Apache Flink 2.0.1 
Release](https://github.com/apache/flink/releases/tag/release-2.0.1)
    - [Apache Flink 2.1.1 
Release](https://github.com/apache/flink/releases/tag/release-2.1.1)
    
    Flink v2.0.1 adds the `getConsumedPartitionType()` method to the 
`IndexedInputGate` interface; see https://github.com/apache/flink/pull/26548.
    
    `HybridShuffleWordCountTest` can now execute with parallelism thanks to 
https://github.com/apache/flink/pull/26369, which has been released in v2.0.1 
and v2.1.1.
    
    ### Does this PR resolve a correctness bug?
    
    No.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    CI.
    
    Closes #3526 from SteNicholas/CELEBORN-2193.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
---
 .../apache/celeborn/plugin/flink/RemoteShuffleInputGate.java | 10 ++++++++++
 pom.xml                                                      |  4 ++--
 project/CelebornBuild.scala                                  |  6 ++++--
 .../celeborn/tests/flink/HybridShuffleWordCountTest.scala    | 12 ++++++++----
 4 files changed, 24 insertions(+), 8 deletions(-)

diff --git 
a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
 
b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
index bd60bbe2b..b8e1994fd 100644
--- 
a/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
+++ 
b/client-flink/flink-2.0/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
@@ -61,6 +61,9 @@ import org.apache.celeborn.common.CelebornConf;
 /** A {@link IndexedInputGate} which ingest data from remote shuffle workers. 
*/
 public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate {
 
+  /** The type of the partition the input gate is consuming. */
+  private final ResultPartitionType consumedPartitionType;
+
   public RemoteShuffleInputGate(
       CelebornConf celebornConf,
       ShuffleIOOwnerContext ownerContext,
@@ -79,6 +82,7 @@ public class RemoteShuffleInputGate extends 
AbstractRemoteShuffleInputGate {
         bufferDecompressor,
         numConcurrentReading,
         shuffleIOMetricGroups);
+    this.consumedPartitionType = gateDescriptor.getConsumedPartitionType();
   }
 
   @Override
@@ -93,6 +97,12 @@ public class RemoteShuffleInputGate extends 
AbstractRemoteShuffleInputGate {
     return Tuple2.of(indexRange.getStartIndex(), indexRange.getEndIndex());
   }
 
+  public ResultPartitionType getConsumedPartitionType() {
+    // Flink 1.19.3
+    // [FLINK-37783] Auto-disable buffer debloating for tiered shuffle
+    return consumedPartitionType;
+  }
+
   /** Accommodation for the incompleteness of Flink pluggable shuffle service. 
*/
   private class FakedRemoteInputChannel extends RemoteInputChannel {
     FakedRemoteInputChannel(int channelIndex) {
diff --git a/pom.xml b/pom.xml
index 4675dd3cd..816393f93 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1833,7 +1833,7 @@
         <module>tests/flink-it</module>
       </modules>
       <properties>
-        <flink.version>2.0.0</flink.version>
+        <flink.version>2.0.1</flink.version>
         <flink.binary.version>2.0</flink.binary.version>
         <scala.binary.version>2.12</scala.binary.version>
         
<celeborn.flink.plugin.artifact>celeborn-client-flink-2.0_${scala.binary.version}</celeborn.flink.plugin.artifact>
@@ -1852,7 +1852,7 @@
         <module>tests/flink-it</module>
       </modules>
       <properties>
-        <flink.version>2.1.0</flink.version>
+        <flink.version>2.1.1</flink.version>
         <flink.binary.version>2.1</flink.binary.version>
         <scala.binary.version>2.12</scala.binary.version>
         
<celeborn.flink.plugin.artifact>celeborn-client-flink-2.1_${scala.binary.version}</celeborn.flink.plugin.artifact>
diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala
index 84ee69c93..f12adff49 100644
--- a/project/CelebornBuild.scala
+++ b/project/CelebornBuild.scala
@@ -1202,7 +1202,7 @@ object Flink120 extends FlinkClientProjects {
 }
 
 object Flink20 extends FlinkClientProjects {
-  val flinkVersion = "2.0.0"
+  val flinkVersion = "2.0.1"
 
   // note that SBT does not allow using the period symbol (.) in project names.
   val flinkClientProjectPath = "client-flink/flink-2.0"
@@ -1212,7 +1212,7 @@ object Flink20 extends FlinkClientProjects {
 }
 
 object Flink21 extends FlinkClientProjects {
-  val flinkVersion = "2.1.0"
+  val flinkVersion = "2.1.1"
 
   // note that SBT does not allow using the period symbol (.) in project names.
   val flinkClientProjectPath = "client-flink/flink-2.1"
@@ -1242,6 +1242,8 @@ trait FlinkClientProjects {
     .aggregate(flinkCommon, flinkClient, flinkIt)
 
   // get flink major version. e.g:
+  //   2.0.1 -> 2.0
+  //   2.1.1 -> 2.1
   //   1.20.3 -> 1.20
   //   1.19.3 -> 1.19
   //   1.18.1 -> 1.18
diff --git 
a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HybridShuffleWordCountTest.scala
 
b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HybridShuffleWordCountTest.scala
index 29bd3cedc..1bfa24733 100644
--- 
a/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HybridShuffleWordCountTest.scala
+++ 
b/tests/flink-it/src/test/scala/org/apache/celeborn/tests/flink/HybridShuffleWordCountTest.scala
@@ -93,6 +93,9 @@ class HybridShuffleWordCountTest extends AnyFunSuite with 
Logging with MiniClust
       "execution.batch-shuffle-mode",
       "ALL_EXCHANGES_HYBRID_FULL")
     configuration.setString("taskmanager.memory.network.min", "1024m")
+    configuration.setString(
+      "execution.batch.adaptive.auto-parallelism.min-parallelism",
+      "" + parallelism)
     configuration.setString("restart-strategy.type", "fixed-delay")
     configuration.setString("restart-strategy.fixed-delay.attempts", "50")
     configuration.setString("restart-strategy.fixed-delay.delay", "5s")
@@ -105,8 +108,7 @@ class HybridShuffleWordCountTest extends AnyFunSuite with 
Logging with MiniClust
     env.getConfig.setParallelism(parallelism)
     env.disableOperatorChaining()
     // make parameters available in the web interface
-    // TODO: WordCountHelper should execute with parallelism for 
[FLINK-37576][runtime] Fix the incorrect status of the isBroadcast field in 
AllToAllBlockingResultInfo when submitting a job graph.
-    WordCountHelper.execute(env, 1)
+    WordCountHelper.execute(env, parallelism)
 
     val graph = env.getStreamGraph
     env.execute(graph)
@@ -129,6 +131,9 @@ class HybridShuffleWordCountTest extends AnyFunSuite with 
Logging with MiniClust
       "execution.batch-shuffle-mode",
       "ALL_EXCHANGES_HYBRID_FULL")
     configuration.setString("taskmanager.memory.network.min", "256m")
+    configuration.setString(
+      "execution.batch.adaptive.auto-parallelism.min-parallelism",
+      "" + parallelism)
     configuration.setString("restart-strategy.type", "fixed-delay")
     configuration.setString("restart-strategy.fixed-delay.attempts", "50")
     configuration.setString("restart-strategy.fixed-delay.delay", "5s")
@@ -140,8 +145,7 @@ class HybridShuffleWordCountTest extends AnyFunSuite with 
Logging with MiniClust
     env.getConfig.setParallelism(parallelism)
     env.disableOperatorChaining()
     // make parameters available in the web interface
-    // TODO: WordCountHelper should execute with parallelism for 
[FLINK-37576][runtime] Fix the incorrect status of the isBroadcast field in 
AllToAllBlockingResultInfo when submitting a job graph.
-    WordCountHelper.execute(env, 1)
+    WordCountHelper.execute(env, parallelism)
 
     val graph = env.getStreamGraph
     graph.setJobType(JobType.BATCH)

Reply via email to