This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new fd5d10662 [CELEBORN-2080] Bump Flink from 1.19.2, 1.20.1 to 1.19.3, 
1.20.2
fd5d10662 is described below

commit fd5d106620d9682107a1b68a0d735f9a8f480e24
Author: SteNicholas <[email protected]>
AuthorDate: Thu Jul 24 14:13:12 2025 +0800

    [CELEBORN-2080] Bump Flink from 1.19.2, 1.20.1 to 1.19.3, 1.20.2
    
    ### What changes were proposed in this pull request?
    
    Bump Flink from 1.19.2, 1.20.1 to 1.19.3, 1.20.2.
    
    ### Why are the changes needed?
    
    Flink has released v1.19.3 and v1.20.2; see the release notes:
    
    - [Apache Flink 1.19.3 Release 
Announcement](https://flink.apache.org/2025/07/10/apache-flink-1.19.3-release-announcement/)
    - [Apache Flink 1.20.2 Release 
Announcement](https://flink.apache.org/2025/07/10/apache-flink-1.20.2-release-announcement/)
    
    Flink v1.19.3 adds the `getConsumedPartitionType()` method to 
`IndexedInputGate`; see https://github.com/apache/flink/pull/26548.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    CI.
    
    Closes #3385 from SteNicholas/CELEBORN-2080.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
    (cherry picked from commit 4ced621534a9ba82799fe50b16f5edc39dddf7cc)
    Signed-off-by: SteNicholas <[email protected]>
---
 .../apache/celeborn/plugin/flink/RemoteShuffleInputGate.java   | 10 ++++++++++
 pom.xml                                                        |  4 ++--
 project/CelebornBuild.scala                                    |  8 ++++----
 3 files changed, 16 insertions(+), 6 deletions(-)

diff --git 
a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
 
b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
index c89f54489..c6f909b4a 100644
--- 
a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
+++ 
b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
@@ -61,6 +61,9 @@ import org.apache.celeborn.common.CelebornConf;
 /** A {@link IndexedInputGate} which ingest data from remote shuffle workers. 
*/
 public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate {
 
+  /** The type of the partition the input gate is consuming. */
+  private final ResultPartitionType consumedPartitionType;
+
   public RemoteShuffleInputGate(
       CelebornConf celebornConf,
       ShuffleIOOwnerContext ownerContext,
@@ -79,6 +82,7 @@ public class RemoteShuffleInputGate extends 
AbstractRemoteShuffleInputGate {
         bufferDecompressor,
         numConcurrentReading,
         shuffleIOMetricGroups);
+    this.consumedPartitionType = gateDescriptor.getConsumedPartitionType();
   }
 
   @Override
@@ -93,6 +97,12 @@ public class RemoteShuffleInputGate extends 
AbstractRemoteShuffleInputGate {
     return Tuple2.of(indexRange.getStartIndex(), indexRange.getEndIndex());
   }
 
+  public ResultPartitionType getConsumedPartitionType() {
+    // Flink 1.19.3
+    // [FLINK-37783] Auto-disable buffer debloating for tiered shuffle
+    return consumedPartitionType;
+  }
+
   /** Accommodation for the incompleteness of Flink pluggable shuffle service. 
*/
   private class FakedRemoteInputChannel extends RemoteInputChannel {
     FakedRemoteInputChannel(int channelIndex) {
diff --git a/pom.xml b/pom.xml
index f7e3dee6b..fdcb145bc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1783,7 +1783,7 @@
         <module>tests/flink-it</module>
       </modules>
       <properties>
-        <flink.version>1.19.2</flink.version>
+        <flink.version>1.19.3</flink.version>
         <flink.binary.version>1.19</flink.binary.version>
         <scala.binary.version>2.12</scala.binary.version>
         
<celeborn.flink.plugin.artifact>celeborn-client-flink-1.19_${scala.binary.version}</celeborn.flink.plugin.artifact>
@@ -1802,7 +1802,7 @@
         <module>tests/flink-it</module>
       </modules>
       <properties>
-        <flink.version>1.20.1</flink.version>
+        <flink.version>1.20.2</flink.version>
         <flink.binary.version>1.20</flink.binary.version>
         <scala.binary.version>2.12</scala.binary.version>
         
<celeborn.flink.plugin.artifact>celeborn-client-flink-1.20_${scala.binary.version}</celeborn.flink.plugin.artifact>
diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala
index 4387bfbf2..da84e1974 100644
--- a/project/CelebornBuild.scala
+++ b/project/CelebornBuild.scala
@@ -1179,7 +1179,7 @@ object Flink118 extends FlinkClientProjects {
 }
 
 object Flink119 extends FlinkClientProjects {
-  val flinkVersion = "1.19.2"
+  val flinkVersion = "1.19.3"
 
   // note that SBT does not allow using the period symbol (.) in project names.
   val flinkClientProjectPath = "client-flink/flink-1.19"
@@ -1189,7 +1189,7 @@ object Flink119 extends FlinkClientProjects {
 }
 
 object Flink120 extends FlinkClientProjects {
-  val flinkVersion = "1.20.1"
+  val flinkVersion = "1.20.2"
 
   // note that SBT does not allow using the period symbol (.) in project names.
   val flinkClientProjectPath = "client-flink/flink-1.20"
@@ -1229,8 +1229,8 @@ trait FlinkClientProjects {
     .aggregate(flinkCommon, flinkClient, flinkIt)
 
   // get flink major version. e.g:
-  //   1.20.1 -> 1.20
-  //   1.19.2 -> 1.19
+  //   1.20.2 -> 1.20
+  //   1.19.3 -> 1.19
   //   1.18.1 -> 1.18
   //   1.17.2 -> 1.17
   //   1.16.3 -> 1.16

Reply via email to