This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 4ced62153 [CELEBORN-2080] Bump Flink from 1.19.2, 1.20.1 to 1.19.3,
1.20.2
4ced62153 is described below
commit 4ced621534a9ba82799fe50b16f5edc39dddf7cc
Author: SteNicholas <[email protected]>
AuthorDate: Thu Jul 24 14:13:12 2025 +0800
[CELEBORN-2080] Bump Flink from 1.19.2, 1.20.1 to 1.19.3, 1.20.2
### What changes were proposed in this pull request?
Bump Flink from 1.19.2, 1.20.1 to 1.19.3, 1.20.2.
### Why are the changes needed?
Flink has released v1.19.3 and v1.20.2, which release notes refer to:
- [Apache Flink 1.19.3 Release
Announcement](https://flink.apache.org/2025/07/10/apache-flink-1.19.3-release-announcement/)
- [Apache Flink 1.20.2 Release
Announcement](https://flink.apache.org/2025/07/10/apache-flink-1.20.2-release-announcement/)
Flink v1.19.3 adds the `getConsumedPartitionType()` interface into
`IndexedInputGate`, which refers to https://github.com/apache/flink/pull/26548.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
Closes #3385 from SteNicholas/CELEBORN-2080.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: SteNicholas <[email protected]>
---
.../apache/celeborn/plugin/flink/RemoteShuffleInputGate.java | 10 ++++++++++
pom.xml | 4 ++--
project/CelebornBuild.scala | 8 ++++----
3 files changed, 16 insertions(+), 6 deletions(-)
diff --git
a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
index c89f54489..c6f909b4a 100644
---
a/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
+++
b/client-flink/flink-1.19/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
@@ -61,6 +61,9 @@ import org.apache.celeborn.common.CelebornConf;
/** A {@link IndexedInputGate} which ingest data from remote shuffle workers.
*/
public class RemoteShuffleInputGate extends AbstractRemoteShuffleInputGate {
+ /** The type of the partition the input gate is consuming. */
+ private final ResultPartitionType consumedPartitionType;
+
public RemoteShuffleInputGate(
CelebornConf celebornConf,
ShuffleIOOwnerContext ownerContext,
@@ -79,6 +82,7 @@ public class RemoteShuffleInputGate extends
AbstractRemoteShuffleInputGate {
bufferDecompressor,
numConcurrentReading,
shuffleIOMetricGroups);
+ this.consumedPartitionType = gateDescriptor.getConsumedPartitionType();
}
@Override
@@ -93,6 +97,12 @@ public class RemoteShuffleInputGate extends
AbstractRemoteShuffleInputGate {
return Tuple2.of(indexRange.getStartIndex(), indexRange.getEndIndex());
}
+ public ResultPartitionType getConsumedPartitionType() {
+ // Flink 1.19.3
+ // [FLINK-37783] Auto-disable buffer debloating for tiered shuffle
+ return consumedPartitionType;
+ }
+
/** Accommodation for the incompleteness of Flink pluggable shuffle service.
*/
private class FakedRemoteInputChannel extends RemoteInputChannel {
FakedRemoteInputChannel(int channelIndex) {
diff --git a/pom.xml b/pom.xml
index e5ee1a32f..932cadd3e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1783,7 +1783,7 @@
<module>tests/flink-it</module>
</modules>
<properties>
- <flink.version>1.19.2</flink.version>
+ <flink.version>1.19.3</flink.version>
<flink.binary.version>1.19</flink.binary.version>
<scala.binary.version>2.12</scala.binary.version>
<celeborn.flink.plugin.artifact>celeborn-client-flink-1.19_${scala.binary.version}</celeborn.flink.plugin.artifact>
@@ -1802,7 +1802,7 @@
<module>tests/flink-it</module>
</modules>
<properties>
- <flink.version>1.20.1</flink.version>
+ <flink.version>1.20.2</flink.version>
<flink.binary.version>1.20</flink.binary.version>
<scala.binary.version>2.12</scala.binary.version>
<celeborn.flink.plugin.artifact>celeborn-client-flink-1.20_${scala.binary.version}</celeborn.flink.plugin.artifact>
diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala
index 3922038a4..cdce5973b 100644
--- a/project/CelebornBuild.scala
+++ b/project/CelebornBuild.scala
@@ -1179,7 +1179,7 @@ object Flink118 extends FlinkClientProjects {
}
object Flink119 extends FlinkClientProjects {
- val flinkVersion = "1.19.2"
+ val flinkVersion = "1.19.3"
// note that SBT does not allow using the period symbol (.) in project names.
val flinkClientProjectPath = "client-flink/flink-1.19"
@@ -1189,7 +1189,7 @@ object Flink119 extends FlinkClientProjects {
}
object Flink120 extends FlinkClientProjects {
- val flinkVersion = "1.20.1"
+ val flinkVersion = "1.20.2"
// note that SBT does not allow using the period symbol (.) in project names.
val flinkClientProjectPath = "client-flink/flink-1.20"
@@ -1229,8 +1229,8 @@ trait FlinkClientProjects {
.aggregate(flinkCommon, flinkClient, flinkIt)
// get flink major version. e.g:
- // 1.20.1 -> 1.20
- // 1.19.2 -> 1.19
+ // 1.20.2 -> 1.20
+ // 1.19.3 -> 1.19
// 1.18.1 -> 1.18
// 1.17.2 -> 1.17
// 1.16.3 -> 1.16