This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 17f89c553 [CELEBORN-1504] Support for Apache Flink 1.16
17f89c553 is described below
commit 17f89c553e4ffec43a8d9d8e4c07b86042f539e8
Author: Mridul Muralidharan <mridulatgmail.com>
AuthorDate: Mon Jul 15 10:44:16 2024 +0800
[CELEBORN-1504] Support for Apache Flink 1.16
### What changes were proposed in this pull request?
Add support for Apache Flink 1.16 in Celeborn.
### Why are the changes needed?
User requests for Apache Flink 1.16.
This implementation is a synthesis of 1.15 and 1.17 support which already
exists in Apache Celeborn
### Does this PR introduce _any_ user-facing change?
Yes, supports Apache Flink 1.16
### How was this patch tested?
Tests for 1.16 added, which are based on 1.15 and 1.17
Closes #2619 from mridulm/flink-1.16-support.
Authored-by: Mridul Muralidharan <mridulatgmail.com>
Signed-off-by: Shuang <[email protected]>
---
.github/workflows/deps.yml | 2 +
.github/workflows/license.yml | 1 +
.github/workflows/maven.yml | 1 +
.github/workflows/sbt.yml | 1 +
.github/workflows/style.yml | 1 +
README.md | 2 +-
build/make-distribution.sh | 2 +
build/release/release.sh | 3 +
client-flink/flink-1.16-shaded/pom.xml | 137 +++++
.../src/main/resources/META-INF/LICENSE | 248 +++++++++
.../src/main/resources/META-INF/NOTICE | 45 ++
.../META-INF/licenses/LICENSE-protobuf.txt | 42 ++
client-flink/flink-1.16/pom.xml | 75 +++
.../plugin/flink/RemoteShuffleEnvironment.java | 82 +++
.../plugin/flink/RemoteShuffleInputGate.java | 295 ++++++++++
.../flink/RemoteShuffleInputGateFactory.java | 55 ++
.../plugin/flink/RemoteShuffleResultPartition.java | 222 ++++++++
.../flink/RemoteShuffleResultPartitionFactory.java | 79 +++
.../plugin/flink/RemoteShuffleServiceFactory.java | 60 ++
.../plugin/flink/SimpleResultPartitionAdapter.java | 27 +
.../plugin/flink/RemoteShuffleMasterTest.java | 296 ++++++++++
.../RemoteShuffleResultPartitionFactorySuiteJ.java | 51 ++
.../flink/RemoteShuffleResultPartitionSuiteJ.java | 618 +++++++++++++++++++++
.../flink/RemoteShuffleServiceFactorySuitJ.java | 58 ++
dev/dependencies.sh | 4 +
dev/reformat | 1 +
docs/developers/sbt.md | 1 +
pom.xml | 30 +-
project/CelebornBuild.scala | 11 +
tests/flink-it/pom.xml | 2 +-
30 files changed, 2445 insertions(+), 7 deletions(-)
diff --git a/.github/workflows/deps.yml b/.github/workflows/deps.yml
index 98ea3f537..dea25d5ae 100644
--- a/.github/workflows/deps.yml
+++ b/.github/workflows/deps.yml
@@ -50,6 +50,7 @@ jobs:
- 'spark-3.5'
- 'flink-1.14'
- 'flink-1.15'
+ - 'flink-1.16'
- 'flink-1.17'
- 'flink-1.18'
- 'flink-1.19'
@@ -82,6 +83,7 @@ jobs:
- 'spark-3.5'
- 'flink-1.14'
- 'flink-1.15'
+ - 'flink-1.16'
- 'flink-1.17'
- 'flink-1.18'
- 'flink-1.19'
diff --git a/.github/workflows/license.yml b/.github/workflows/license.yml
index d0b41a174..39a627b79 100644
--- a/.github/workflows/license.yml
+++ b/.github/workflows/license.yml
@@ -45,6 +45,7 @@ jobs:
- run: |
build/mvn org.apache.rat:apache-rat-plugin:check
-Pgoogle-mirror,flink-1.14
build/mvn org.apache.rat:apache-rat-plugin:check
-Pgoogle-mirror,flink-1.15
+ build/mvn org.apache.rat:apache-rat-plugin:check
-Pgoogle-mirror,flink-1.16
build/mvn org.apache.rat:apache-rat-plugin:check
-Pgoogle-mirror,flink-1.17
build/mvn org.apache.rat:apache-rat-plugin:check
-Pgoogle-mirror,flink-1.18
build/mvn org.apache.rat:apache-rat-plugin:check
-Pgoogle-mirror,flink-1.19
diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml
index e7de091eb..8cab3052f 100644
--- a/.github/workflows/maven.yml
+++ b/.github/workflows/maven.yml
@@ -153,6 +153,7 @@ jobs:
flink:
- '1.14'
- '1.15'
+ - '1.16'
- '1.17'
- '1.18'
- '1.19'
diff --git a/.github/workflows/sbt.yml b/.github/workflows/sbt.yml
index a93840124..0c2faa822 100644
--- a/.github/workflows/sbt.yml
+++ b/.github/workflows/sbt.yml
@@ -205,6 +205,7 @@ jobs:
flink:
- '1.14'
- '1.15'
+ - '1.16'
- '1.17'
- '1.18'
- '1.19'
diff --git a/.github/workflows/style.yml b/.github/workflows/style.yml
index 4acf9ad68..717e813ea 100644
--- a/.github/workflows/style.yml
+++ b/.github/workflows/style.yml
@@ -49,6 +49,7 @@ jobs:
- run: |
build/mvn spotless:check -Pgoogle-mirror,flink-1.14
build/mvn spotless:check -Pgoogle-mirror,flink-1.15
+ build/mvn spotless:check -Pgoogle-mirror,flink-1.16
build/mvn spotless:check -Pgoogle-mirror,flink-1.17
build/mvn spotless:check -Pgoogle-mirror,flink-1.18
build/mvn spotless:check -Pgoogle-mirror,flink-1.19
diff --git a/README.md b/README.md
index 1947d76e5..1755d03e6 100644
--- a/README.md
+++ b/README.md
@@ -46,7 +46,7 @@ Celeborn worker's slot count decreases when a partition is
allocated and increme
Build Celeborn via `make-distribution.sh`:
```shell
-./build/make-distribution.sh
-Pspark-2.4/-Pspark-3.0/-Pspark-3.1/-Pspark-3.2/-Pspark-3.3/-Pspark-3.4/-Pflink-1.14/-Pflink-1.15/-Pflink-1.17/-Pflink-1.18/-Pflink-1.19/-Pmr
+./build/make-distribution.sh
-Pspark-2.4/-Pspark-3.0/-Pspark-3.1/-Pspark-3.2/-Pspark-3.3/-Pspark-3.4/-Pflink-1.14/-Pflink-1.15/-Pflink-1.16/-Pflink-1.17/-Pflink-1.18/-Pflink-1.19/-Pmr
```
Package `apache-celeborn-${project.version}-bin.tgz` will be generated.
diff --git a/build/make-distribution.sh b/build/make-distribution.sh
index f930349ef..e5e39ec6e 100755
--- a/build/make-distribution.sh
+++ b/build/make-distribution.sh
@@ -309,6 +309,7 @@ if [ "$SBT_ENABLED" == "true" ]; then
sbt_build_client -Pspark-3.5
sbt_build_client -Pflink-1.14
sbt_build_client -Pflink-1.15
+ sbt_build_client -Pflink-1.16
sbt_build_client -Pflink-1.17
sbt_build_client -Pflink-1.18
sbt_build_client -Pflink-1.19
@@ -342,6 +343,7 @@ else
build_spark_client -Pspark-3.5
build_flink_client -Pflink-1.14
build_flink_client -Pflink-1.15
+ build_flink_client -Pflink-1.16
build_flink_client -Pflink-1.17
build_flink_client -Pflink-1.18
build_flink_client -Pflink-1.19
diff --git a/build/release/release.sh b/build/release/release.sh
index abea58772..ab94cfc47 100755
--- a/build/release/release.sh
+++ b/build/release/release.sh
@@ -110,6 +110,9 @@ upload_nexus_staging() {
echo "Deploying celeborn-client-flink-1.15-shaded_2.12"
${PROJECT_DIR}/build/sbt -Pflink-1.15
"clean;celeborn-client-flink-1_15-shaded/publishSigned"
+ echo "Deploying celeborn-client-flink-1.16-shaded_2.12"
+ ${PROJECT_DIR}/build/sbt -Pflink-1.16
"clean;celeborn-client-flink-1_16-shaded/publishSigned"
+
echo "Deploying celeborn-client-flink-1.17-shaded_2.12"
${PROJECT_DIR}/build/sbt -Pflink-1.17
"clean;celeborn-client-flink-1_17-shaded/publishSigned"
diff --git a/client-flink/flink-1.16-shaded/pom.xml
b/client-flink/flink-1.16-shaded/pom.xml
new file mode 100644
index 000000000..4eb0fbfb5
--- /dev/null
+++ b/client-flink/flink-1.16-shaded/pom.xml
@@ -0,0 +1,137 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+~ Licensed to the Apache Software Foundation (ASF) under one or more
+~ contributor license agreements. See the NOTICE file distributed with
+~ this work for additional information regarding copyright ownership.
+~ The ASF licenses this file to You under the Apache License, Version 2.0
+~ (the "License"); you may not use this file except in compliance with
+~ the License. You may obtain a copy of the License at
+~
+~ http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.celeborn</groupId>
+ <artifactId>celeborn-parent_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+
<artifactId>celeborn-client-flink-1.16-shaded_${scala.binary.version}</artifactId>
+ <packaging>jar</packaging>
+ <name>Celeborn Shaded Client for Flink 1.16</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.celeborn</groupId>
+
<artifactId>celeborn-client-flink-1.16_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <configuration>
+ <relocations>
+ <relocation>
+ <pattern>com.google.protobuf</pattern>
+
<shadedPattern>${shading.prefix}.com.google.protobuf</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.google.common</pattern>
+
<shadedPattern>${shading.prefix}.com.google.common</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>io.netty</pattern>
+ <shadedPattern>${shading.prefix}.io.netty</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.commons</pattern>
+
<shadedPattern>${shading.prefix}.org.apache.commons</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.roaringbitmap</pattern>
+
<shadedPattern>${shading.prefix}.org.roaringbitmap</shadedPattern>
+ </relocation>
+ </relocations>
+ <artifactSet>
+ <includes>
+ <include>org.apache.celeborn:*</include>
+ <include>com.google.protobuf:protobuf-java</include>
+ <include>com.google.guava:guava</include>
+ <include>com.google.guava:failureaccess</include>
+ <include>io.netty:*</include>
+ <include>org.apache.commons:commons-lang3</include>
+ <include>org.roaringbitmap:RoaringBitmap</include>
+ </includes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>**/*.proto</exclude>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ <exclude>**/log4j.properties</exclude>
+ <exclude>META-INF/LICENSE.txt</exclude>
+ <exclude>META-INF/NOTICE.txt</exclude>
+ <exclude>LICENSE.txt</exclude>
+ <exclude>NOTICE.txt</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <transformers>
+ <transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"></transformer>
+ </transformers>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <version>${maven.plugin.antrun.version}</version>
+ <executions>
+ <execution>
+ <id>rename-native-library</id>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <phase>package</phase>
+ <configuration>
+ <target>
+ <echo message="unpacking netty jar"></echo>
+ <unzip dest="${project.build.directory}/unpacked/"
src="${project.build.directory}/${artifactId}-${version}.jar"></unzip>
+ <echo message="renaming native epoll library"></echo>
+ <move includeemptydirs="false"
todir="${project.build.directory}/unpacked/META-INF/native">
+ <fileset
dir="${project.build.directory}/unpacked/META-INF/native"></fileset>
+ <mapper from="libnetty_transport_native_epoll_x86_64.so"
to="liborg_apache_celeborn_shaded_netty_transport_native_epoll_x86_64.so"
type="glob"></mapper>
+ </move>
+ <move includeemptydirs="false"
todir="${project.build.directory}/unpacked/META-INF/native">
+ <fileset
dir="${project.build.directory}/unpacked/META-INF/native"></fileset>
+ <mapper from="libnetty_transport_native_epoll_aarch_64.so"
to="liborg_apache_celeborn_shaded_netty_transport_native_epoll_aarch_64.so"
type="glob"></mapper>
+ </move>
+ <echo message="deleting native kqueue library"></echo>
+ <delete
file="${project.build.directory}/unpacked/META-INF/native/libnetty_transport_native_kqueue_x86_64.jnilib"></delete>
+ <delete
file="${project.build.directory}/unpacked/META-INF/native/libnetty_transport_native_kqueue_aarch_64.jnilib"></delete>
+ <delete
file="${project.build.directory}/unpacked/META-INF/native/libnetty_resolver_dns_native_macos_aarch_64.jnilib"></delete>
+ <delete
file="${project.build.directory}/unpacked/META-INF/native/libnetty_resolver_dns_native_macos_x86_64.jnilib"></delete>
+ <echo message="repackaging netty jar"></echo>
+ <jar basedir="${project.build.directory}/unpacked"
destfile="${project.build.directory}/${artifactId}-${version}.jar"></jar>
+ </target>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/client-flink/flink-1.16-shaded/src/main/resources/META-INF/LICENSE
b/client-flink/flink-1.16-shaded/src/main/resources/META-INF/LICENSE
new file mode 100644
index 000000000..924ef2c85
--- /dev/null
+++ b/client-flink/flink-1.16-shaded/src/main/resources/META-INF/LICENSE
@@ -0,0 +1,248 @@
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+
+------------------------------------------------------------------------------------
+This project bundles the following dependencies under the Apache License 2.0
(http://www.apache.org/licenses/LICENSE-2.0.txt):
+
+
+Apache License 2.0
+--------------------------------------
+
+com.google.guava:failureaccess
+com.google.guava:guava
+io.netty:netty-all
+io.netty:netty-buffer
+io.netty:netty-codec
+io.netty:netty-codec-dns
+io.netty:netty-codec-haproxy
+io.netty:netty-codec-http
+io.netty:netty-codec-http2
+io.netty:netty-codec-memcache
+io.netty:netty-codec-mqtt
+io.netty:netty-codec-redis
+io.netty:netty-codec-smtp
+io.netty:netty-codec-socks
+io.netty:netty-codec-stomp
+io.netty:netty-codec-xml
+io.netty:netty-common
+io.netty:netty-handler
+io.netty:netty-handler-proxy
+io.netty:netty-resolver
+io.netty:netty-resolver-dns
+io.netty:netty-transport
+io.netty:netty-transport-classes-epoll
+io.netty:netty-transport-classes-kqueue
+io.netty:netty-transport-native-epoll
+io.netty:netty-transport-native-kqueue
+io.netty:netty-transport-native-unix-common
+io.netty:netty-transport-rxtx
+io.netty:netty-transport-sctp
+io.netty:netty-transport-udt
+org.apache.commons:commons-lang3
+org.roaringbitmap:RoaringBitmap
+
+
+BSD 3-clause
+------------
+See licenses/LICENSE-protobuf.txt for details.
+com.google.protobuf:protobuf-java
diff --git a/client-flink/flink-1.16-shaded/src/main/resources/META-INF/NOTICE
b/client-flink/flink-1.16-shaded/src/main/resources/META-INF/NOTICE
new file mode 100644
index 000000000..43452a38a
--- /dev/null
+++ b/client-flink/flink-1.16-shaded/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,45 @@
+
+Apache Celeborn
+Copyright 2022-2024 The Apache Software Foundation.
+
+This product includes software developed at
+The Apache Software Foundation (https://www.apache.org/).
+
+Apache Spark
+Copyright 2014 and onwards The Apache Software Foundation
+
+Apache Kyuubi
+Copyright 2021-2023 The Apache Software Foundation
+
+Apache Iceberg
+Copyright 2017-2022 The Apache Software Foundation
+
+Apache Parquet MR
+Copyright 2014-2024 The Apache Software Foundation
+
+This project includes code from Kite, developed at Cloudera, Inc. with
+the following copyright notice:
+
+| Copyright 2013 Cloudera Inc.
+|
+| Licensed under the Apache License, Version 2.0 (the "License");
+| you may not use this file except in compliance with the License.
+| You may obtain a copy of the License at
+|
+| http://www.apache.org/licenses/LICENSE-2.0
+|
+| Unless required by applicable law or agreed to in writing, software
+| distributed under the License is distributed on an "AS IS" BASIS,
+| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+| See the License for the specific language governing permissions and
+| limitations under the License.
+
+Remote Shuffle Service for Flink
+Copyright 2021 The Flink Remote Shuffle Project
+
+=============================================================================
+= NOTICE file corresponding to section 4d of the Apache License Version 2.0 =
+=============================================================================
+
+Apache Commons Lang
+Copyright 2001-2021 The Apache Software Foundation
diff --git
a/client-flink/flink-1.16-shaded/src/main/resources/META-INF/licenses/LICENSE-protobuf.txt
b/client-flink/flink-1.16-shaded/src/main/resources/META-INF/licenses/LICENSE-protobuf.txt
new file mode 100644
index 000000000..b4350ec83
--- /dev/null
+++
b/client-flink/flink-1.16-shaded/src/main/resources/META-INF/licenses/LICENSE-protobuf.txt
@@ -0,0 +1,42 @@
+This license applies to all parts of Protocol Buffers except the following:
+
+ - Atomicops support for generic gcc, located in
+ src/google/protobuf/stubs/atomicops_internals_generic_gcc.h.
+ This file is copyrighted by Red Hat Inc.
+
+ - Atomicops support for AIX/POWER, located in
+ src/google/protobuf/stubs/atomicops_internals_aix.h.
+ This file is copyrighted by Bloomberg Finance LP.
+
+Copyright 2014, Google Inc. All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+ * Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above
+copyright notice, this list of conditions and the following disclaimer
+in the documentation and/or other materials provided with the
+distribution.
+ * Neither the name of Google Inc. nor the names of its
+contributors may be used to endorse or promote products derived from
+this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+Code generated by the Protocol Buffer compiler is owned by the owner
+of the input file used when generating it. This code is not
+standalone and requires a support library to be linked with it. This
+support library is itself covered by the above license.
\ No newline at end of file
diff --git a/client-flink/flink-1.16/pom.xml b/client-flink/flink-1.16/pom.xml
new file mode 100644
index 000000000..1caac0b4b
--- /dev/null
+++ b/client-flink/flink-1.16/pom.xml
@@ -0,0 +1,75 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+~ Licensed to the Apache Software Foundation (ASF) under one or more
+~ contributor license agreements. See the NOTICE file distributed with
+~ this work for additional information regarding copyright ownership.
+~ The ASF licenses this file to You under the Apache License, Version 2.0
+~ (the "License"); you may not use this file except in compliance with
+~ the License. You may obtain a copy of the License at
+~
+~ http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing, software
+~ distributed under the License is distributed on an "AS IS" BASIS,
+~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+~ See the License for the specific language governing permissions and
+~ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.celeborn</groupId>
+ <artifactId>celeborn-parent_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>celeborn-client-flink-1.16_${scala.binary.version}</artifactId>
+ <packaging>jar</packaging>
+ <name>Celeborn Client for Flink 1.16</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.celeborn</groupId>
+ <artifactId>celeborn-common_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.celeborn</groupId>
+ <artifactId>celeborn-client_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.celeborn</groupId>
+
<artifactId>celeborn-client-flink-common_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime</artifactId>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-1.2-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git
a/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleEnvironment.java
b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleEnvironment.java
new file mode 100644
index 000000000..f76a18478
--- /dev/null
+++
b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleEnvironment.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
+import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
+
+import org.apache.celeborn.common.CelebornConf;
+
+/**
+ * The implementation of {@link ShuffleEnvironment} based on the remote
shuffle service, providing
+ * shuffle environment on flink TM side.
+ */
+public class RemoteShuffleEnvironment extends AbstractRemoteShuffleEnvironment
+ implements ShuffleEnvironment<ResultPartitionWriter, IndexedInputGate> {
+
+ /** Factory class to create {@link RemoteShuffleResultPartition}. */
+ private final RemoteShuffleResultPartitionFactory resultPartitionFactory;
+
+ private final RemoteShuffleInputGateFactory inputGateFactory;
+
+ /**
+ * @param networkBufferPool Network buffer pool for shuffle read and shuffle
write.
+ * @param resultPartitionManager A trivial {@link ResultPartitionManager}.
+ * @param resultPartitionFactory Factory class to create {@link
RemoteShuffleResultPartition}.
+ * @param inputGateFactory Factory class to create {@link
RemoteShuffleInputGate}.
+ */
+ public RemoteShuffleEnvironment(
+ NetworkBufferPool networkBufferPool,
+ ResultPartitionManager resultPartitionManager,
+ RemoteShuffleResultPartitionFactory resultPartitionFactory,
+ RemoteShuffleInputGateFactory inputGateFactory,
+ CelebornConf conf) {
+
+ super(networkBufferPool, resultPartitionManager, conf);
+ this.resultPartitionFactory = resultPartitionFactory;
+ this.inputGateFactory = inputGateFactory;
+ }
+
+ @Override
+ public ResultPartitionWriter createResultPartitionWriterInternal(
+ ShuffleIOOwnerContext ownerContext,
+ int index,
+ ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor,
+ CelebornConf conf) {
+ return resultPartitionFactory.create(
+ ownerContext.getOwnerName(), index,
resultPartitionDeploymentDescriptor, conf);
+ }
+
+ @Override
+ IndexedInputGate createInputGateInternal(
+ ShuffleIOOwnerContext ownerContext, int gateIndex,
InputGateDeploymentDescriptor igdd) {
+ return inputGateFactory.create(ownerContext.getOwnerName(), gateIndex,
igdd);
+ }
+
+ @VisibleForTesting
+ RemoteShuffleResultPartitionFactory getResultPartitionFactory() {
+ return resultPartitionFactory;
+ }
+}
diff --git
a/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
new file mode 100644
index 000000000..6f34177c1
--- /dev/null
+++
b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGate.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentProvider;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
+import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.SubpartitionIndexRange;
+import org.apache.flink.runtime.event.TaskEvent;
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.LocalConnectionManager;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.throughput.ThroughputCalculator;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.clock.SystemClock;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.apache.celeborn.common.CelebornConf;
+
+/** A {@link IndexedInputGate} which ingest data from remote shuffle workers.
*/
+public class RemoteShuffleInputGate extends IndexedInputGate {
+
+ private final RemoteShuffleInputGateDelegation inputGateDelegation;
+
+ public RemoteShuffleInputGate(
+ CelebornConf celebornConf,
+ String taskName,
+ int gateIndex,
+ InputGateDeploymentDescriptor gateDescriptor,
+ SupplierWithException<BufferPool, IOException> bufferPoolFactory,
+ BufferDecompressor bufferDecompressor,
+ int numConcurrentReading) {
+
+ inputGateDelegation =
+ new RemoteShuffleInputGateDelegation(
+ celebornConf,
+ taskName,
+ gateIndex,
+ gateDescriptor,
+ bufferPoolFactory,
+ bufferDecompressor,
+ numConcurrentReading,
+ availabilityHelper,
+ gateDescriptor.getConsumedSubpartitionIndexRange().getStartIndex(),
+ gateDescriptor.getConsumedSubpartitionIndexRange().getEndIndex());
+ }
+
+ /** Setup gate and build network connections. */
+ @Override
+ public void setup() throws IOException {
+ inputGateDelegation.setup();
+ }
+
+ /** Index of the gate of the corresponding computing task. */
+ @Override
+ public int getGateIndex() {
+ return inputGateDelegation.getGateIndex();
+ }
+
+ /** Get number of input channels. A channel is a data flow from one shuffle
worker. */
+ @Override
+ public int getNumberOfInputChannels() {
+ return inputGateDelegation.getBufferReaders().size();
+ }
+
+ /** Whether reading is finished -- all channels are finished and cached
buffers are drained. */
+ @Override
+ public boolean isFinished() {
+ return inputGateDelegation.isFinished();
+ }
+
+ @Override
+ public Optional<BufferOrEvent> getNext() {
+ throw new UnsupportedOperationException("Not implemented (DataSet API is
not supported).");
+ }
+
+ /** Poll a received {@link BufferOrEvent}. */
+ @Override
+ public Optional<BufferOrEvent> pollNext() throws IOException {
+ return inputGateDelegation.pollNext();
+ }
+
+ /** Close all reading channels inside this {@link RemoteShuffleInputGate}. */
+ @Override
+ public void close() throws Exception {
+ inputGateDelegation.close();
+ }
+
+ /** Get {@link InputChannelInfo}s of this {@link RemoteShuffleInputGate}. */
+ @Override
+ public List<InputChannelInfo> getChannelInfos() {
+ return inputGateDelegation.getChannelsInfo();
+ }
+
+ @Override
+ public void requestPartitions() {
+ // do-nothing
+ }
+
+ @Override
+ public void checkpointStarted(CheckpointBarrier barrier) {
+ // do-nothing.
+ }
+
+ @Override
+ public void checkpointStopped(long cancelledCheckpointId) {
+ // do-nothing.
+ }
+
+ @Override
+ public void triggerDebloating() {
+ // do-nothing.
+ }
+
+ @Override
+ public List<InputChannelInfo> getUnfinishedChannels() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public EndOfDataStatus hasReceivedEndOfData() {
+ if (inputGateDelegation.getPendingEndOfDataEvents() > 0) {
+ return EndOfDataStatus.NOT_END_OF_DATA;
+ } else {
+ // Keep compatibility with streaming mode.
+ return EndOfDataStatus.DRAINED;
+ }
+ }
+
+ @Override
+ public void finishReadRecoveredState() {
+ // do-nothing.
+ }
+
+ @Override
+ public InputChannel getChannel(int channelIndex) {
+ return new FakedRemoteInputChannel(channelIndex);
+ }
+
+ @Override
+ public void sendTaskEvent(TaskEvent event) {
+ throw new FlinkRuntimeException("Method should not be called.");
+ }
+
+ @Override
+ public void resumeConsumption(InputChannelInfo channelInfo) {
+ throw new FlinkRuntimeException("Method should not be called.");
+ }
+
+ @Override
+ public void acknowledgeAllRecordsProcessed(InputChannelInfo
inputChannelInfo) {}
+
+ @Override
+ public CompletableFuture<Void> getStateConsumedFuture() {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "ReadGate [owning task: %s, gate index: %d, descriptor: %s]",
+ inputGateDelegation.getTaskName(),
+ inputGateDelegation.getGateIndex(),
+ inputGateDelegation.getGateDescriptor().toString());
+ }
+
+ /** Accommodation for the incompleteness of Flink pluggable shuffle service.
*/
+ private class FakedRemoteInputChannel extends RemoteInputChannel {
+ FakedRemoteInputChannel(int channelIndex) {
+ super(
+ new SingleInputGate(
+ inputGateDelegation.getTaskName(),
+ inputGateDelegation.getGateIndex(),
+ new IntermediateDataSetID(),
+ ResultPartitionType.BLOCKING,
+ new SubpartitionIndexRange(0, 0),
+ 1,
+ (a, b, c) -> {},
+ () -> null,
+ null,
+ new FakedMemorySegmentProvider(),
+ 0,
+ new ThroughputCalculator(SystemClock.getInstance()),
+ null),
+ channelIndex,
+ new ResultPartitionID(),
+ 0,
+ new ConnectionID(
+ new TaskManagerLocation(ResourceID.generate(),
InetAddress.getLoopbackAddress(), 1),
+ 0),
+ new LocalConnectionManager(),
+ 0,
+ 0,
+ 0,
+ new SimpleCounter(),
+ new SimpleCounter(),
+ new FakedChannelStateWriter());
+ }
+ }
+
+ /** Accommodation for the incompleteness of Flink pluggable shuffle service.
*/
+ private static class FakedMemorySegmentProvider implements
MemorySegmentProvider {
+
+ @Override
+ public Collection<MemorySegment> requestUnpooledMemorySegments(int i)
throws IOException {
+ return null;
+ }
+
+ @Override
+ public void recycleUnpooledMemorySegments(Collection<MemorySegment>
collection)
+ throws IOException {}
+ }
+
+ /** Accommodation for the incompleteness of Flink pluggable shuffle service.
*/
+ private static class FakedChannelStateWriter implements ChannelStateWriter {
+
+ @Override
+ public void start(long cpId, CheckpointOptions checkpointOptions) {}
+
+ @Override
+ public void addInputData(
+ long cpId, InputChannelInfo info, int startSeqNum,
CloseableIterator<Buffer> data) {}
+
+ @Override
+ public void addOutputData(
+ long cpId, ResultSubpartitionInfo info, int startSeqNum, Buffer...
data) {}
+
+ @Override
+ public void finishInput(long checkpointId) {}
+
+ @Override
+ public void finishOutput(long checkpointId) {}
+
+ @Override
+ public void abort(long checkpointId, Throwable cause, boolean cleanup) {}
+
+ @Override
+ public ChannelStateWriteResult getAndRemoveWriteResult(long checkpointId) {
+ return null;
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public void addOutputDataFuture(
+ long l,
+ ResultSubpartitionInfo resultSubpartitionInfo,
+ int i,
+ CompletableFuture<List<Buffer>> completableFuture)
+ throws IllegalArgumentException {}
+ }
+}
diff --git
a/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
new file mode 100644
index 000000000..de0ddab7c
--- /dev/null
+++
b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import java.io.IOException;
+
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.apache.celeborn.common.CelebornConf;
+
+/** Factory class to create {@link RemoteShuffleInputGate}. */
+public class RemoteShuffleInputGateFactory extends
AbstractRemoteShuffleInputGateFactory {
+
+ public RemoteShuffleInputGateFactory(
+ CelebornConf conf, NetworkBufferPool networkBufferPool, int
networkBufferSize) {
+ super(conf, networkBufferPool, networkBufferSize);
+ }
+
+ @Override
+ protected RemoteShuffleInputGate createInputGate(
+ String owningTaskName,
+ int gateIndex,
+ InputGateDeploymentDescriptor igdd,
+ SupplierWithException<BufferPool, IOException> bufferPoolFactory,
+ BufferDecompressor bufferDecompressor) {
+ return new RemoteShuffleInputGate(
+ this.celebornConf,
+ owningTaskName,
+ gateIndex,
+ igdd,
+ bufferPoolFactory,
+ bufferDecompressor,
+ numConcurrentReading);
+ }
+}
diff --git
a/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
new file mode 100644
index 000000000..eb46a290d
--- /dev/null
+++
b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartition.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+
+import javax.annotation.Nullable;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.EndOfData;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.StopMode;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.Buffer.DataType;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.partition.*;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.apache.celeborn.plugin.flink.buffer.BufferWithSubpartition;
+import org.apache.celeborn.plugin.flink.buffer.DataBuffer;
+import org.apache.celeborn.plugin.flink.utils.BufferUtils;
+import org.apache.celeborn.plugin.flink.utils.Utils;
+
+/**
+ * A {@link ResultPartition} which appends records and events to {@link
DataBuffer} and after the
+ * {@link DataBuffer} is full, all data in the {@link DataBuffer} will be
copied and spilled to the
+ * remote shuffle service in subpartition index order sequentially. Large
records that can not be
+ * appended to an empty {@link DataBuffer} will be spilled directly.
+ */
+public class RemoteShuffleResultPartition extends ResultPartition {
+
+ private final RemoteShuffleResultPartitionDelegation delegation;
+
+ public RemoteShuffleResultPartition(
+ String owningTaskName,
+ int partitionIndex,
+ ResultPartitionID partitionId,
+ ResultPartitionType partitionType,
+ int numSubpartitions,
+ int numTargetKeyGroups,
+ int networkBufferSize,
+ ResultPartitionManager partitionManager,
+ @Nullable BufferCompressor bufferCompressor,
+ SupplierWithException<BufferPool, IOException> bufferPoolFactory,
+ RemoteShuffleOutputGate outputGate) {
+
+ super(
+ owningTaskName,
+ partitionIndex,
+ partitionId,
+ partitionType,
+ numSubpartitions,
+ numTargetKeyGroups,
+ partitionManager,
+ bufferCompressor,
+ bufferPoolFactory);
+
+ delegation =
+ new RemoteShuffleResultPartitionDelegation(
+ networkBufferSize,
+ outputGate,
+ (bufferWithChannel, isBroadcast) ->
updateStatistics(bufferWithChannel, isBroadcast),
+ numSubpartitions);
+ }
+
+ @Override
+ public void setup() throws IOException {
+ super.setup();
+ BufferUtils.reserveNumRequiredBuffers(bufferPool, 1);
+ delegation.setup(
+ bufferPool,
+ bufferCompressor,
+ buffer -> canBeCompressed(buffer),
+ () -> checkInProduceState());
+ }
+
+ @Override
+ protected void setupInternal() {
+ // do not need to implement
+ }
+
+ @Override
+ public void emitRecord(ByteBuffer record, int targetSubpartition) throws
IOException {
+ delegation.emit(record, targetSubpartition, DataType.DATA_BUFFER, false);
+ }
+
+ @Override
+ public void broadcastRecord(ByteBuffer record) throws IOException {
+ delegation.broadcast(record, DataType.DATA_BUFFER);
+ }
+
+ @Override
+ public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent)
throws IOException {
+ Buffer buffer = EventSerializer.toBuffer(event, isPriorityEvent);
+ try {
+ ByteBuffer serializedEvent = buffer.getNioBufferReadable();
+ delegation.broadcast(serializedEvent, buffer.getDataType());
+ } finally {
+ buffer.recycleBuffer();
+ }
+ }
+
+ @Override
+ public void alignedBarrierTimeout(long l) {}
+
+ @Override
+ public void abortCheckpoint(long l, CheckpointException e) {}
+
+ @Override
+ public void finish() throws IOException {
+ Utils.checkState(!isReleased(), "Result partition is already released.");
+ broadcastEvent(EndOfPartitionEvent.INSTANCE, false);
+ delegation.finish();
+ super.finish();
+ }
+
+ @Override
+ public synchronized void close() {
+ delegation.close(() -> super.close());
+ }
+
+ @Override
+ protected void releaseInternal() {
+ // no-op
+ }
+
+ @Override
+ public void flushAll() {
+ delegation.flushAll();
+ }
+
+ @Override
+ public void flush(int subpartitionIndex) {
+ flushAll();
+ }
+
+ @Override
+ public CompletableFuture<?> getAvailableFuture() {
+ return AVAILABLE;
+ }
+
+ @Override
+ public int getNumberOfQueuedBuffers() {
+ return 0;
+ }
+
+ @Override
+ public long getSizeOfQueuedBuffersUnsafe() {
+ return 0;
+ }
+
+ @Override
+ public int getNumberOfQueuedBuffers(int targetSubpartition) {
+ return 0;
+ }
+
+ @Override
+ public ResultSubpartitionView createSubpartitionView(
+ int index, BufferAvailabilityListener availabilityListener) {
+ throw new UnsupportedOperationException("Not supported.");
+ }
+
+ @Override
+ public void notifyEndOfData(StopMode mode) throws IOException {
+ if (!delegation.isEndOfDataNotified()) {
+ broadcastEvent(new EndOfData(mode), false);
+ delegation.setEndOfDataNotified(true);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> getAllDataProcessedFuture() {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public String toString() {
+ return "ResultPartition "
+ + partitionId.toString()
+ + " ["
+ + partitionType
+ + ", "
+ + numSubpartitions
+ + " subpartitions, shuffle-descriptor: "
+ + delegation.getOutputGate().getShuffleDesc()
+ + "]";
+ }
+
+ @VisibleForTesting
+ public RemoteShuffleResultPartitionDelegation getDelegation() {
+ return delegation;
+ }
+
+ public void updateStatistics(BufferWithSubpartition bufferWithChannel,
boolean isBroadcast) {
+ numBuffersOut.inc(isBroadcast ? numSubpartitions : 1);
+ long readableBytes =
+ (long) bufferWithChannel.getBuffer().readableBytes() -
BufferUtils.HEADER_LENGTH;
+ numBytesProduced.inc(readableBytes);
+ numBytesOut.inc(isBroadcast ? readableBytes * numSubpartitions :
readableBytes);
+ }
+}
diff --git
a/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
new file mode 100644
index 000000000..b75435b92
--- /dev/null
+++
b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactory.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.apache.celeborn.common.CelebornConf;
+
+/** Factory class to create {@link RemoteShuffleResultPartition}. */
+public class RemoteShuffleResultPartitionFactory
+ extends AbstractRemoteShuffleResultPartitionFactory {
+
+ public RemoteShuffleResultPartitionFactory(
+ CelebornConf celebornConf,
+ ResultPartitionManager partitionManager,
+ BufferPoolFactory bufferPoolFactory,
+ int networkBufferSize) {
+
+ super(celebornConf, partitionManager, bufferPoolFactory,
networkBufferSize);
+ }
+
+ @Override
+ ResultPartition createRemoteShuffleResultPartitionInternal(
+ String taskNameWithSubtaskAndId,
+ int partitionIndex,
+ ResultPartitionID id,
+ ResultPartitionType type,
+ int numSubpartitions,
+ int maxParallelism,
+ List<SupplierWithException<BufferPool, IOException>> bufferPoolFactories,
+ CelebornConf celebornConf,
+ int numMappers,
+ BufferCompressor bufferCompressor,
+ RemoteShuffleDescriptor rsd) {
+ return new RemoteShuffleResultPartition(
+ taskNameWithSubtaskAndId,
+ partitionIndex,
+ id,
+ type,
+ numSubpartitions,
+ maxParallelism,
+ networkBufferSize,
+ partitionManager,
+ bufferCompressor,
+ bufferPoolFactories.get(0),
+ new RemoteShuffleOutputGate(
+ rsd,
+ numSubpartitions,
+ networkBufferSize,
+ bufferPoolFactories.get(1),
+ celebornConf,
+ numMappers));
+ }
+}
diff --git
a/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactory.java
b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactory.java
new file mode 100644
index 000000000..bf95317bc
--- /dev/null
+++
b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironmentContext;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.shuffle.ShuffleMasterContext;
+import org.apache.flink.runtime.shuffle.ShuffleServiceFactory;
+
+public class RemoteShuffleServiceFactory extends
AbstractRemoteShuffleServiceFactory
+ implements ShuffleServiceFactory<
+ RemoteShuffleDescriptor, ResultPartitionWriter, IndexedInputGate> {
+
+ @Override
+ public ShuffleMaster<RemoteShuffleDescriptor> createShuffleMaster(
+ ShuffleMasterContext shuffleMasterContext) {
+ return new RemoteShuffleMaster(shuffleMasterContext, new
SimpleResultPartitionAdapter());
+ }
+
+ @Override
+ public ShuffleEnvironment<ResultPartitionWriter, IndexedInputGate>
createShuffleEnvironment(
+ ShuffleEnvironmentContext shuffleEnvironmentContext) {
+ AbstractRemoteShuffleServiceParameters parameters =
+ initializePreCreateShuffleEnvironment(shuffleEnvironmentContext);
+ RemoteShuffleResultPartitionFactory resultPartitionFactory =
+ new RemoteShuffleResultPartitionFactory(
+ parameters.celebornConf,
+ parameters.resultPartitionManager,
+ parameters.networkBufferPool,
+ parameters.bufferSize);
+ RemoteShuffleInputGateFactory inputGateFactory =
+ new RemoteShuffleInputGateFactory(
+ parameters.celebornConf, parameters.networkBufferPool,
parameters.bufferSize);
+
+ return new RemoteShuffleEnvironment(
+ parameters.networkBufferPool,
+ parameters.resultPartitionManager,
+ resultPartitionFactory,
+ inputGateFactory,
+ parameters.celebornConf);
+ }
+}
diff --git
a/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/SimpleResultPartitionAdapter.java
b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/SimpleResultPartitionAdapter.java
new file mode 100644
index 000000000..3476b8fff
--- /dev/null
+++
b/client-flink/flink-1.16/src/main/java/org/apache/celeborn/plugin/flink/SimpleResultPartitionAdapter.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+
+public class SimpleResultPartitionAdapter implements ResultPartitionAdapter {
+ @Override
+ public boolean isBlockingResultPartition(ResultPartitionType partitionType) {
+ return partitionType.isBlockingOrBlockingPersistentResultPartition();
+ }
+}
diff --git
a/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterTest.java
b/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterTest.java
new file mode 100644
index 000000000..079cbc5b1
--- /dev/null
+++
b/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleMasterTest.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.flink.api.common.BatchShuffleMode;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.shuffle.JobShuffleContext;
+import org.apache.flink.runtime.shuffle.PartitionDescriptor;
+import org.apache.flink.runtime.shuffle.ProducerDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleMasterContext;
+import org.apache.flink.runtime.shuffle.TaskInputsOutputsDescriptor;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.plugin.flink.utils.FlinkUtils;
+
+public class RemoteShuffleMasterTest {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(RemoteShuffleMasterTest.class);
+ private RemoteShuffleMaster remoteShuffleMaster;
+ private Configuration configuration;
+
+ @Before
+ public void setUp() {
+ configuration = new Configuration();
+ remoteShuffleMaster = createShuffleMaster(configuration);
+ }
+
+ @Test
+ public void testRegisterJob() {
+ JobShuffleContext jobShuffleContext =
createJobShuffleContext(JobID.generate());
+ remoteShuffleMaster.registerJob(jobShuffleContext);
+
+ // reRunRegister job
+ try {
+ remoteShuffleMaster.registerJob(jobShuffleContext);
+ } catch (Exception e) {
+ Assert.assertTrue(true);
+ }
+
+ // unRegister job
+ remoteShuffleMaster.unregisterJob(jobShuffleContext.getJobId());
+ remoteShuffleMaster.registerJob(jobShuffleContext);
+ }
+
+ @Test
+ public void testRegisterPartitionWithProducer()
+ throws UnknownHostException, ExecutionException, InterruptedException {
+ JobID jobID = JobID.generate();
+ JobShuffleContext jobShuffleContext = createJobShuffleContext(jobID);
+ remoteShuffleMaster.registerJob(jobShuffleContext);
+
+ IntermediateDataSetID intermediateDataSetID = new IntermediateDataSetID();
+ PartitionDescriptor partitionDescriptor =
createPartitionDescriptor(intermediateDataSetID, 0);
+ ProducerDescriptor producerDescriptor = createProducerDescriptor();
+ RemoteShuffleDescriptor remoteShuffleDescriptor =
+ remoteShuffleMaster
+ .registerPartitionWithProducer(jobID, partitionDescriptor,
producerDescriptor)
+ .get();
+ ShuffleResource shuffleResource =
remoteShuffleDescriptor.getShuffleResource();
+ ShuffleResourceDescriptor mapPartitionShuffleDescriptor =
+ shuffleResource.getMapPartitionShuffleDescriptor();
+
+ LOG.info("remoteShuffleDescriptor:{}", remoteShuffleDescriptor);
+ Assert.assertEquals(0, mapPartitionShuffleDescriptor.getShuffleId());
+ Assert.assertEquals(0, mapPartitionShuffleDescriptor.getPartitionId());
+ Assert.assertEquals(0, mapPartitionShuffleDescriptor.getAttemptId());
+ Assert.assertEquals(0, mapPartitionShuffleDescriptor.getMapId());
+
+ // use same dataset id
+ partitionDescriptor = createPartitionDescriptor(intermediateDataSetID, 1);
+ remoteShuffleDescriptor =
+ remoteShuffleMaster
+ .registerPartitionWithProducer(jobID, partitionDescriptor,
producerDescriptor)
+ .get();
+ mapPartitionShuffleDescriptor =
+
remoteShuffleDescriptor.getShuffleResource().getMapPartitionShuffleDescriptor();
+ Assert.assertEquals(0, mapPartitionShuffleDescriptor.getShuffleId());
+ Assert.assertEquals(1, mapPartitionShuffleDescriptor.getMapId());
+
+ // use another attemptId
+ producerDescriptor = createProducerDescriptor();
+ remoteShuffleDescriptor =
+ remoteShuffleMaster
+ .registerPartitionWithProducer(jobID, partitionDescriptor,
producerDescriptor)
+ .get();
+ mapPartitionShuffleDescriptor =
+
remoteShuffleDescriptor.getShuffleResource().getMapPartitionShuffleDescriptor();
+ Assert.assertEquals(0, mapPartitionShuffleDescriptor.getShuffleId());
+ Assert.assertEquals(1, mapPartitionShuffleDescriptor.getAttemptId());
+ Assert.assertEquals(1, mapPartitionShuffleDescriptor.getMapId());
+ }
+
+ @Test
+ public void testRegisterMultipleJobs()
+ throws UnknownHostException, ExecutionException, InterruptedException {
+ JobID jobID1 = JobID.generate();
+ JobShuffleContext jobShuffleContext1 = createJobShuffleContext(jobID1);
+ remoteShuffleMaster.registerJob(jobShuffleContext1);
+
+ JobID jobID2 = JobID.generate();
+ JobShuffleContext jobShuffleContext2 = createJobShuffleContext(jobID2);
+ remoteShuffleMaster.registerJob(jobShuffleContext2);
+
+ IntermediateDataSetID intermediateDataSetID = new IntermediateDataSetID();
+ PartitionDescriptor partitionDescriptor =
createPartitionDescriptor(intermediateDataSetID, 0);
+ ProducerDescriptor producerDescriptor = createProducerDescriptor();
+ RemoteShuffleDescriptor remoteShuffleDescriptor1 =
+ remoteShuffleMaster
+ .registerPartitionWithProducer(jobID1, partitionDescriptor,
producerDescriptor)
+ .get();
+
+ // use same datasetId but different jobId
+ RemoteShuffleDescriptor remoteShuffleDescriptor2 =
+ remoteShuffleMaster
+ .registerPartitionWithProducer(jobID2, partitionDescriptor,
producerDescriptor)
+ .get();
+
+ Assert.assertEquals(
+ remoteShuffleDescriptor1
+ .getShuffleResource()
+ .getMapPartitionShuffleDescriptor()
+ .getShuffleId(),
+ 0);
+ Assert.assertEquals(
+ remoteShuffleDescriptor2
+ .getShuffleResource()
+ .getMapPartitionShuffleDescriptor()
+ .getShuffleId(),
+ 1);
+ }
+
+ @Test
+ public void testShuffleMemoryAnnouncing() {
+ Map<IntermediateDataSetID, Integer> numberOfInputGateChannels = new
HashMap<>();
+ Map<IntermediateDataSetID, Integer> numbersOfResultSubpartitions = new
HashMap<>();
+ Map<IntermediateDataSetID, ResultPartitionType> resultPartitionTypes = new
HashMap<>();
+ IntermediateDataSetID inputDataSetID0 = new IntermediateDataSetID();
+ IntermediateDataSetID inputDataSetID1 = new IntermediateDataSetID();
+ IntermediateDataSetID outputDataSetID0 = new IntermediateDataSetID();
+ IntermediateDataSetID outputDataSetID1 = new IntermediateDataSetID();
+ IntermediateDataSetID outputDataSetID2 = new IntermediateDataSetID();
+ Random random = new Random();
+ numberOfInputGateChannels.put(inputDataSetID0, random.nextInt(1000));
+ numberOfInputGateChannels.put(inputDataSetID1, random.nextInt(1000));
+ int subPartitionNum1 = random.nextInt(1000);
+ int subPartitionNum2 = random.nextInt(1000);
+ int subPartitionNum3 = random.nextInt(1000);
+ numbersOfResultSubpartitions.put(outputDataSetID0, subPartitionNum1);
+ numbersOfResultSubpartitions.put(outputDataSetID1, subPartitionNum2);
+ numbersOfResultSubpartitions.put(outputDataSetID2, subPartitionNum3);
+ resultPartitionTypes.put(outputDataSetID0, ResultPartitionType.BLOCKING);
+ resultPartitionTypes.put(outputDataSetID1, ResultPartitionType.BLOCKING);
+ resultPartitionTypes.put(outputDataSetID2, ResultPartitionType.BLOCKING);
+ MemorySize calculated =
+ remoteShuffleMaster.computeShuffleMemorySizeForTask(
+ TaskInputsOutputsDescriptor.from(
+ 3, numberOfInputGateChannels, numbersOfResultSubpartitions,
resultPartitionTypes));
+
+ CelebornConf celebornConf = FlinkUtils.toCelebornConf(configuration);
+
+ long numBytesPerGate = celebornConf.clientFlinkMemoryPerInputGate();
+ long expectedInput = 2 * numBytesPerGate;
+
+ long numBytesPerResultPartition =
celebornConf.clientFlinkMemoryPerResultPartition();
+ long expectedOutput = 3 * numBytesPerResultPartition;
+ MemorySize expected = new MemorySize(expectedInput + expectedOutput);
+
+ Assert.assertEquals(expected, calculated);
+ }
+
+ @Test
+ public void testInvalidShuffleConfig() {
+ Assert.assertThrows(
+ String.format(
+ "The config option %s should configure as %s",
+ ExecutionOptions.BATCH_SHUFFLE_MODE.key(),
+ BatchShuffleMode.ALL_EXCHANGES_BLOCKING.name()),
+ IllegalArgumentException.class,
+ () ->
+ createShuffleMaster(
+ new Configuration()
+ .set(
+ ExecutionOptions.BATCH_SHUFFLE_MODE,
+ BatchShuffleMode.ALL_EXCHANGES_PIPELINED)));
+ }
+
+ @After
+ public void tearDown() {
+ if (remoteShuffleMaster != null) {
+ try {
+ remoteShuffleMaster.close();
+ } catch (Exception e) {
+ LOG.warn(e.getMessage(), e);
+ }
+ }
+ }
+
+ public RemoteShuffleMaster createShuffleMaster(Configuration configuration) {
+ remoteShuffleMaster =
+ new RemoteShuffleMaster(
+ new ShuffleMasterContext() {
+ @Override
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+
+ @Override
+ public void onFatalError(Throwable throwable) {
+ System.exit(-1);
+ }
+ },
+ new SimpleResultPartitionAdapter());
+
+ return remoteShuffleMaster;
+ }
+
+ public JobShuffleContext createJobShuffleContext(JobID jobId) {
+ return new JobShuffleContext() {
+ @Override
+ public org.apache.flink.api.common.JobID getJobId() {
+ return jobId;
+ }
+
+ @Override
+ public CompletableFuture<?> stopTrackingAndReleasePartitions(
+ Collection<ResultPartitionID> collection) {
+ return CompletableFuture.completedFuture(null);
+ }
+ };
+ }
+
+ public PartitionDescriptor createPartitionDescriptor(
+ IntermediateDataSetID intermediateDataSetId, int partitionNum) {
+ IntermediateResultPartitionID intermediateResultPartitionId =
+ new IntermediateResultPartitionID(intermediateDataSetId, partitionNum);
+ return new PartitionDescriptor(
+ intermediateDataSetId,
+ 10,
+ intermediateResultPartitionId,
+ ResultPartitionType.BLOCKING,
+ 5,
+ 1,
+ false,
+ false);
+ }
+
+ public ProducerDescriptor createProducerDescriptor() throws
UnknownHostException {
+ ExecutionAttemptID executionAttemptId =
+ new ExecutionAttemptID(
+ new ExecutionGraphID(), new ExecutionVertexID(new JobVertexID(0,
0), 0), 0);
+ return new ProducerDescriptor(
+ ResourceID.generate(), executionAttemptId, InetAddress.getLocalHost(),
100);
+ }
+}
diff --git
a/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactorySuiteJ.java
b/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactorySuiteJ.java
new file mode 100644
index 000000000..548336db6
--- /dev/null
+++
b/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionFactorySuiteJ.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import static org.mockito.Mockito.mock;
+
+import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.common.protocol.CompressionCodec;
+
+/** Tests for {@link RemoteShuffleResultPartitionFactory}. */
+public class RemoteShuffleResultPartitionFactorySuiteJ {
+
+ @Test
+ public void testGetBufferCompressor() {
+ CelebornConf celebornConf = new CelebornConf();
+ for (CompressionCodec compressionCodec : CompressionCodec.values()) {
+ RemoteShuffleResultPartitionFactory partitionFactory =
+ new RemoteShuffleResultPartitionFactory(
+ celebornConf.set(
+ CelebornConf.SHUFFLE_COMPRESSION_CODEC().key(),
compressionCodec.name()),
+ mock(ResultPartitionManager.class),
+ mock(BufferPoolFactory.class),
+ 1);
+ if (CompressionCodec.NONE.equals(compressionCodec)) {
+ Assert.assertNull(partitionFactory.getBufferCompressor());
+ } else {
+ Assert.assertNotNull(partitionFactory.getBufferCompressor());
+ }
+ }
+ }
+}
diff --git
a/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
b/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
new file mode 100644
index 000000000..023f0b29e
--- /dev/null
+++
b/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
@@ -0,0 +1,618 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.util.function.SupplierWithException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.celeborn.common.CelebornConf;
+import org.apache.celeborn.plugin.flink.buffer.BufferPacker;
+import org.apache.celeborn.plugin.flink.buffer.DataBuffer;
+import org.apache.celeborn.plugin.flink.readclient.FlinkShuffleClientImpl;
+import org.apache.celeborn.plugin.flink.utils.BufferUtils;
+
+public class RemoteShuffleResultPartitionSuiteJ {
+ private final int networkBufferSize = 32 * 1024;
+ private BufferCompressor bufferCompressor = new
BufferCompressor(networkBufferSize, "lz4");
+ private RemoteShuffleOutputGate remoteShuffleOutputGate =
mock(RemoteShuffleOutputGate.class);
+ private final String compressCodec = "LZ4";
+ private final CelebornConf conf = new CelebornConf();
+ BufferDecompressor bufferDecompressor = new
BufferDecompressor(networkBufferSize, "LZ4");
+
+ private static final int totalBuffers = 1000;
+
+ private static final int bufferSize = 1024;
+
+ private NetworkBufferPool globalBufferPool;
+
+ private BufferPool dataBufferPool;
+
+ private BufferPool nettyBufferPool;
+
+ private RemoteShuffleResultPartition partitionWriter;
+
+ private FakedRemoteShuffleOutputGate outputGate;
+
+ @Before
+ public void setup() {
+ globalBufferPool = new NetworkBufferPool(totalBuffers, bufferSize);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (outputGate != null) {
+ outputGate.release();
+ }
+
+ if (dataBufferPool != null) {
+ dataBufferPool.lazyDestroy();
+ }
+ if (nettyBufferPool != null) {
+ nettyBufferPool.lazyDestroy();
+ }
+ assertEquals(totalBuffers,
globalBufferPool.getNumberOfAvailableMemorySegments());
+ globalBufferPool.destroy();
+ }
+
+ @Test
+ public void tesSimpleFlush() throws IOException, InterruptedException {
+ List<SupplierWithException<BufferPool, IOException>> bufferPool =
createBufferPoolFactory();
+ RemoteShuffleResultPartition remoteShuffleResultPartition =
+ new RemoteShuffleResultPartition(
+ "test",
+ 0,
+ new ResultPartitionID(),
+ ResultPartitionType.BLOCKING,
+ 2,
+ 2,
+ 32 * 1024,
+ new ResultPartitionManager(),
+ bufferCompressor,
+ bufferPool.get(0),
+ remoteShuffleOutputGate);
+ remoteShuffleResultPartition.setup();
+ doNothing().when(remoteShuffleOutputGate).regionStart(anyBoolean());
+ doNothing().when(remoteShuffleOutputGate).regionFinish();
+
when(remoteShuffleOutputGate.getBufferPool()).thenReturn(bufferPool.get(1).get());
+ DataBuffer dataBuffer =
remoteShuffleResultPartition.getDelegation().getUnicastDataBuffer();
+ ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[] {1, 2, 3});
+ dataBuffer.append(byteBuffer, 0, Buffer.DataType.DATA_BUFFER);
+ remoteShuffleResultPartition.getDelegation().flushDataBuffer(dataBuffer,
true);
+ }
+
+ private List<SupplierWithException<BufferPool, IOException>>
createBufferPoolFactory() {
+ NetworkBufferPool networkBufferPool =
+ new NetworkBufferPool(256 * 8, 32 * 1024, Duration.ofSeconds(1));
+
+ int numBuffersPerPartition = 64 * 1024 / 32;
+ int numForResultPartition = numBuffersPerPartition * 7 / 8;
+ int numForOutputGate = numBuffersPerPartition - numForResultPartition;
+
+ List<SupplierWithException<BufferPool, IOException>> factories = new
ArrayList<>();
+ factories.add(
+ () -> networkBufferPool.createBufferPool(numForResultPartition,
numForResultPartition));
+ factories.add(() -> networkBufferPool.createBufferPool(numForOutputGate,
numForOutputGate));
+ return factories;
+ }
+
+ @Test
+ public void testWriteNormalRecordWithCompressionEnabled() throws Exception {
+ testWriteNormalRecord(true);
+ }
+
+ @Test
+ public void testWriteNormalRecordWithCompressionDisabled() throws Exception {
+ testWriteNormalRecord(false);
+ }
+
+ @Test
+ public void testWriteLargeRecord() throws Exception {
+ int numSubpartitions = 2;
+ int numBuffers = 100;
+ initResultPartitionWriter(numSubpartitions, 10, 200, false, conf, 10);
+
+ partitionWriter.setup();
+
+ byte[] dataWritten = new byte[bufferSize * numBuffers];
+ Random random = new Random();
+ random.nextBytes(dataWritten);
+ ByteBuffer recordWritten = ByteBuffer.wrap(dataWritten);
+ partitionWriter.emitRecord(recordWritten, 0);
+ assertEquals(0, dataBufferPool.bestEffortGetNumOfUsedBuffers());
+
+ partitionWriter.finish();
+ partitionWriter.close();
+
+ List<Buffer> receivedBuffers = outputGate.getReceivedBuffers()[0];
+
+ ByteBuffer recordRead = ByteBuffer.allocate(bufferSize * numBuffers);
+ for (Buffer buffer : receivedBuffers) {
+ if (buffer.isBuffer()) {
+ recordRead.put(
+ buffer.getNioBuffer(
+ BufferUtils.HEADER_LENGTH, buffer.readableBytes() -
BufferUtils.HEADER_LENGTH));
+ }
+ }
+ recordWritten.rewind();
+ recordRead.flip();
+ assertEquals(recordWritten, recordRead);
+ }
+
+ @Test
+ public void testBroadcastLargeRecord() throws Exception {
+ int numSubpartitions = 2;
+ int numBuffers = 100;
+ initResultPartitionWriter(numSubpartitions, 10, 200, false, conf, 10);
+
+ partitionWriter.setup();
+
+ byte[] dataWritten = new byte[bufferSize * numBuffers];
+ Random random = new Random();
+ random.nextBytes(dataWritten);
+ ByteBuffer recordWritten = ByteBuffer.wrap(dataWritten);
+ partitionWriter.broadcastRecord(recordWritten);
+ assertEquals(0, dataBufferPool.bestEffortGetNumOfUsedBuffers());
+
+ partitionWriter.finish();
+ partitionWriter.close();
+
+ ByteBuffer recordRead0 = ByteBuffer.allocate(bufferSize * numBuffers);
+ for (Buffer buffer : outputGate.getReceivedBuffers()[0]) {
+ if (buffer.isBuffer()) {
+ recordRead0.put(
+ buffer.getNioBuffer(
+ BufferUtils.HEADER_LENGTH, buffer.readableBytes() -
BufferUtils.HEADER_LENGTH));
+ }
+ }
+ recordWritten.rewind();
+ recordRead0.flip();
+ assertEquals(recordWritten, recordRead0);
+
+ ByteBuffer recordRead1 = ByteBuffer.allocate(bufferSize * numBuffers);
+ for (Buffer buffer : outputGate.getReceivedBuffers()[1]) {
+ if (buffer.isBuffer()) {
+ recordRead1.put(
+ buffer.getNioBuffer(
+ BufferUtils.HEADER_LENGTH, buffer.readableBytes() -
BufferUtils.HEADER_LENGTH));
+ }
+ }
+ recordWritten.rewind();
+ recordRead1.flip();
+ assertEquals(recordWritten, recordRead0);
+ }
+
+ @Test
+ public void testFlush() throws Exception {
+ int numSubpartitions = 10;
+
+ initResultPartitionWriter(numSubpartitions, 10, 20, false, conf, 10);
+ partitionWriter.setup();
+
+ partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 0);
+ partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 1);
+ assertEquals(3, dataBufferPool.bestEffortGetNumOfUsedBuffers());
+
+ partitionWriter.broadcastRecord(ByteBuffer.allocate(bufferSize));
+ assertEquals(2, dataBufferPool.bestEffortGetNumOfUsedBuffers());
+
+ partitionWriter.flush(0);
+ assertEquals(0, dataBufferPool.bestEffortGetNumOfUsedBuffers());
+
+ partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 2);
+ partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 3);
+ assertEquals(3, dataBufferPool.bestEffortGetNumOfUsedBuffers());
+
+ partitionWriter.flushAll();
+ assertEquals(0, dataBufferPool.bestEffortGetNumOfUsedBuffers());
+
+ partitionWriter.finish();
+ partitionWriter.close();
+ }
+
+ private void testWriteNormalRecord(boolean compressionEnabled) throws
Exception {
+ int numSubpartitions = 4;
+ int numRecords = 100;
+ Random random = new Random();
+
+ initResultPartitionWriter(numSubpartitions, 100, 500, compressionEnabled,
conf, 10);
+ partitionWriter.setup();
+ assertTrue(outputGate.isSetup());
+
+ Queue<DataAndType>[] dataWritten = new Queue[numSubpartitions];
+ IntStream.range(0, numSubpartitions).forEach(i -> dataWritten[i] = new
ArrayDeque<>());
+ int[] numBytesWritten = new int[numSubpartitions];
+ Arrays.fill(numBytesWritten, 0);
+
+ for (int i = 0; i < numRecords; i++) {
+ byte[] data = new byte[random.nextInt(2 * bufferSize) + 1];
+ if (compressionEnabled) {
+ byte randomByte = (byte) random.nextInt();
+ Arrays.fill(data, randomByte);
+ } else {
+ random.nextBytes(data);
+ }
+ ByteBuffer record = ByteBuffer.wrap(data);
+ boolean isBroadCast = random.nextBoolean();
+
+ if (isBroadCast) {
+ partitionWriter.broadcastRecord(record);
+ IntStream.range(0, numSubpartitions)
+ .forEach(
+ subpartition ->
+ recordDataWritten(
+ record,
+ Buffer.DataType.DATA_BUFFER,
+ subpartition,
+ dataWritten,
+ numBytesWritten));
+ } else {
+ int subpartition = random.nextInt(numSubpartitions);
+ partitionWriter.emitRecord(record, subpartition);
+ recordDataWritten(
+ record, Buffer.DataType.DATA_BUFFER, subpartition, dataWritten,
numBytesWritten);
+ }
+ }
+
+ partitionWriter.finish();
+ assertTrue(outputGate.isFinished());
+ partitionWriter.close();
+ assertTrue(outputGate.isClosed());
+
+ for (int subpartition = 0; subpartition < numSubpartitions;
++subpartition) {
+ ByteBuffer record =
EventSerializer.toSerializedEvent(EndOfPartitionEvent.INSTANCE);
+ recordDataWritten(
+ record, Buffer.DataType.EVENT_BUFFER, subpartition, dataWritten,
numBytesWritten);
+ }
+
+ outputGate
+ .getFinishedRegions()
+ .forEach(
+ regionIndex ->
assertTrue(outputGate.getNumBuffersByRegion().containsKey(regionIndex)));
+
+ int[] numBytesRead = new int[numSubpartitions];
+ List<Buffer>[] receivedBuffers = outputGate.getReceivedBuffers();
+ List<Buffer>[] validateTarget = new List[numSubpartitions];
+ Arrays.fill(numBytesRead, 0);
+ for (int i = 0; i < numSubpartitions; i++) {
+ validateTarget[i] = new ArrayList<>();
+ for (Buffer buffer : receivedBuffers[i]) {
+ for (Buffer unpackedBuffer : BufferPacker.unpack(buffer.asByteBuf())) {
+ if (compressionEnabled && unpackedBuffer.isCompressed()) {
+ Buffer decompressedBuffer =
+
bufferDecompressor.decompressToIntermediateBuffer(unpackedBuffer);
+ ByteBuffer decompressed =
decompressedBuffer.getNioBufferReadable();
+ int numBytes = decompressed.remaining();
+ MemorySegment segment =
MemorySegmentFactory.allocateUnpooledSegment(numBytes);
+ segment.put(0, decompressed, numBytes);
+ decompressedBuffer.recycleBuffer();
+ validateTarget[i].add(
+ new NetworkBuffer(segment, buf -> {},
unpackedBuffer.getDataType(), numBytes));
+ numBytesRead[i] += numBytes;
+ } else {
+ numBytesRead[i] += buffer.readableBytes();
+ validateTarget[i].add(buffer);
+ }
+ }
+ }
+ }
+ IntStream.range(0, numSubpartitions).forEach(subpartitions -> {});
+ checkWriteReadResult(
+ numSubpartitions, numBytesWritten, numBytesWritten, dataWritten,
validateTarget);
+ }
+
+ private void initResultPartitionWriter(
+ int numSubpartitions,
+ int dataBufferPoolSize,
+ int nettyBufferPoolSize,
+ boolean compressionEnabled,
+ CelebornConf conf,
+ int numMappers)
+ throws Exception {
+
+ dataBufferPool = globalBufferPool.createBufferPool(dataBufferPoolSize,
dataBufferPoolSize);
+ nettyBufferPool = globalBufferPool.createBufferPool(nettyBufferPoolSize,
nettyBufferPoolSize);
+
+ outputGate =
+ new FakedRemoteShuffleOutputGate(
+ getShuffleDescriptor(), numSubpartitions, () -> nettyBufferPool,
conf, numMappers);
+ outputGate.setup();
+
+ if (compressionEnabled) {
+ partitionWriter =
+ new RemoteShuffleResultPartition(
+ "RemoteShuffleResultPartitionWriterTest",
+ 0,
+ new ResultPartitionID(),
+ ResultPartitionType.BLOCKING,
+ numSubpartitions,
+ numSubpartitions,
+ bufferSize,
+ new ResultPartitionManager(),
+ bufferCompressor,
+ () -> dataBufferPool,
+ outputGate);
+ } else {
+ partitionWriter =
+ new RemoteShuffleResultPartition(
+ "RemoteShuffleResultPartitionWriterTest",
+ 0,
+ new ResultPartitionID(),
+ ResultPartitionType.BLOCKING,
+ numSubpartitions,
+ numSubpartitions,
+ bufferSize,
+ new ResultPartitionManager(),
+ null,
+ () -> dataBufferPool,
+ outputGate);
+ }
+ }
+
+ private void recordDataWritten(
+ ByteBuffer record,
+ Buffer.DataType dataType,
+ int subpartition,
+ Queue<DataAndType>[] dataWritten,
+ int[] numBytesWritten) {
+
+ record.rewind();
+ dataWritten[subpartition].add(new DataAndType(record, dataType));
+ numBytesWritten[subpartition] += record.remaining();
+ }
+
+ private static class FakedRemoteShuffleOutputGate extends
RemoteShuffleOutputGate {
+
+ private boolean isSetup;
+ private boolean isFinished;
+ private boolean isClosed;
+ private final List<Buffer>[] receivedBuffers;
+ private final Map<Integer, Integer> numBuffersByRegion;
+ private final Set<Integer> finishedRegions;
+ private int currentRegionIndex;
+ private boolean currentIsBroadcast;
+
+ FakedRemoteShuffleOutputGate(
+ RemoteShuffleDescriptor shuffleDescriptor,
+ int numSubpartitions,
+ SupplierWithException<BufferPool, IOException> bufferPoolFactory,
+ CelebornConf celebornConf,
+ int numMappers) {
+
+ super(
+ shuffleDescriptor,
+ numSubpartitions,
+ bufferSize,
+ bufferPoolFactory,
+ celebornConf,
+ numMappers);
+ isSetup = false;
+ isFinished = false;
+ isClosed = false;
+ numBuffersByRegion = new HashMap<>();
+ finishedRegions = new HashSet<>();
+ currentRegionIndex = -1;
+ receivedBuffers = new ArrayList[numSubpartitions];
+ IntStream.range(0, numSubpartitions).forEach(i -> receivedBuffers[i] =
new ArrayList<>());
+ currentIsBroadcast = false;
+ }
+
+ @Override
+ FlinkShuffleClientImpl getShuffleClient() {
+ FlinkShuffleClientImpl client = mock(FlinkShuffleClientImpl.class);
+ doNothing().when(client).cleanup(anyInt(), anyInt(), anyInt());
+ return client;
+ }
+
+ @Override
+ public void setup() throws IOException, InterruptedException {
+ bufferPool = bufferPoolFactory.get();
+ isSetup = true;
+ }
+
+ @Override
+ public void write(Buffer buffer, int subIdx) {
+ if (currentIsBroadcast) {
+ assertEquals(0, subIdx);
+ ByteBuffer byteBuffer = buffer.getNioBufferReadable();
+ for (int i = 0; i < numSubs; i++) {
+ int numBytes = buffer.readableBytes();
+ MemorySegment segment =
MemorySegmentFactory.allocateUnpooledSegment(numBytes);
+ byteBuffer.rewind();
+ segment.put(0, byteBuffer, numBytes);
+ receivedBuffers[i].add(
+ new NetworkBuffer(
+ segment, buf -> {}, buffer.getDataType(),
buffer.isCompressed(), numBytes));
+ }
+ buffer.recycleBuffer();
+ } else {
+ receivedBuffers[subIdx].add(buffer);
+ }
+ if (numBuffersByRegion.containsKey(currentRegionIndex)) {
+ int prev = numBuffersByRegion.get(currentRegionIndex);
+ numBuffersByRegion.put(currentRegionIndex, prev + 1);
+ } else {
+ numBuffersByRegion.put(currentRegionIndex, 1);
+ }
+ }
+
+ @Override
+ public void regionStart(boolean isBroadcast) {
+ currentIsBroadcast = isBroadcast;
+ currentRegionIndex++;
+ }
+
+ @Override
+ public void regionFinish() {
+ if (finishedRegions.contains(currentRegionIndex)) {
+ throw new IllegalStateException("Unexpected region: " +
currentRegionIndex);
+ }
+ finishedRegions.add(currentRegionIndex);
+ }
+
+ @Override
+ public void finish() throws InterruptedException {
+ isFinished = true;
+ }
+
+ @Override
+ public void close() {
+ isClosed = true;
+ }
+
+ public List<Buffer>[] getReceivedBuffers() {
+ return receivedBuffers;
+ }
+
+ public Map<Integer, Integer> getNumBuffersByRegion() {
+ return numBuffersByRegion;
+ }
+
+ public Set<Integer> getFinishedRegions() {
+ return finishedRegions;
+ }
+
+ public boolean isSetup() {
+ return isSetup;
+ }
+
+ public boolean isFinished() {
+ return isFinished;
+ }
+
+ public boolean isClosed() {
+ return isClosed;
+ }
+
+ public void release() throws Exception {
+ IntStream.range(0, numSubs)
+ .forEach(
+ subpartitionIndex -> {
+
receivedBuffers[subpartitionIndex].forEach(Buffer::recycleBuffer);
+ receivedBuffers[subpartitionIndex].clear();
+ });
+ numBuffersByRegion.clear();
+ finishedRegions.clear();
+ super.close();
+ }
+ }
+
+ private RemoteShuffleDescriptor getShuffleDescriptor() throws Exception {
+ Random random = new Random();
+ byte[] bytes = new byte[16];
+ random.nextBytes(bytes);
+ return new RemoteShuffleDescriptor(
+ new JobID(bytes).toString(),
+ new JobID(bytes),
+ new JobID(bytes).toString(),
+ new ResultPartitionID(),
+ new RemoteShuffleResource(
+ "1", 2, System.currentTimeMillis(), new
ShuffleResourceDescriptor(1, 1, 1, 0)));
+ }
+
+ /** Data written and its {@link Buffer.DataType}. */
+ public static class DataAndType {
+ private final ByteBuffer data;
+ private final Buffer.DataType dataType;
+
+ DataAndType(ByteBuffer data, Buffer.DataType dataType) {
+ this.data = data;
+ this.dataType = dataType;
+ }
+ }
+
+ public static void checkWriteReadResult(
+ int numSubpartitions,
+ int[] numBytesWritten,
+ int[] numBytesRead,
+ Queue<DataAndType>[] dataWritten,
+ Collection<Buffer>[] buffersRead) {
+ for (int subpartitionIndex = 0; subpartitionIndex < numSubpartitions;
++subpartitionIndex) {
+ assertEquals(numBytesWritten[subpartitionIndex],
numBytesRead[subpartitionIndex]);
+
+ List<DataAndType> eventsWritten = new ArrayList<>();
+ List<Buffer> eventsRead = new ArrayList<>();
+
+ ByteBuffer subpartitionDataWritten =
ByteBuffer.allocate(numBytesWritten[subpartitionIndex]);
+ for (DataAndType dataAndType : dataWritten[subpartitionIndex]) {
+ subpartitionDataWritten.put(dataAndType.data);
+ dataAndType.data.rewind();
+ if (dataAndType.dataType.isEvent()) {
+ eventsWritten.add(dataAndType);
+ }
+ }
+
+ ByteBuffer subpartitionDataRead =
ByteBuffer.allocate(numBytesRead[subpartitionIndex]);
+ for (Buffer buffer : buffersRead[subpartitionIndex]) {
+ subpartitionDataRead.put(buffer.getNioBufferReadable());
+ if (!buffer.isBuffer()) {
+ eventsRead.add(buffer);
+ }
+ }
+
+ subpartitionDataWritten.flip();
+ subpartitionDataRead.flip();
+ assertEquals(subpartitionDataWritten, subpartitionDataRead);
+
+ assertEquals(eventsWritten.size(), eventsRead.size());
+ for (int i = 0; i < eventsWritten.size(); ++i) {
+ assertEquals(eventsWritten.get(i).dataType,
eventsRead.get(i).getDataType());
+ assertEquals(eventsWritten.get(i).data,
eventsRead.get(i).getNioBufferReadable());
+ }
+ }
+ }
+}
diff --git
a/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactorySuitJ.java
b/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactorySuitJ.java
new file mode 100644
index 000000000..73dbb6a9d
--- /dev/null
+++
b/client-flink/flink-1.16/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleServiceFactorySuitJ.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.plugin.flink;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
+import org.apache.flink.runtime.shuffle.ShuffleEnvironmentContext;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RemoteShuffleServiceFactorySuitJ {
+ @Test
+ public void testCreateShuffleEnvironment() {
+ RemoteShuffleServiceFactory remoteShuffleServiceFactory = new
RemoteShuffleServiceFactory();
+ ShuffleEnvironmentContext shuffleEnvironmentContext =
mock(ShuffleEnvironmentContext.class);
+ when(shuffleEnvironmentContext.getConfiguration()).thenReturn(new
Configuration());
+ when(shuffleEnvironmentContext.getNetworkMemorySize())
+ .thenReturn(new MemorySize(64 * 1024 * 1024));
+ MetricGroup parentMeric = mock(MetricGroup.class);
+
when(shuffleEnvironmentContext.getParentMetricGroup()).thenReturn(parentMeric);
+ MetricGroup childGroup = mock(MetricGroup.class);
+ MetricGroup childChildGroup = mock(MetricGroup.class);
+ when(parentMeric.addGroup(anyString())).thenReturn(childGroup);
+ when(childGroup.addGroup(any())).thenReturn(childChildGroup);
+ when(childChildGroup.gauge(any(), any())).thenReturn(null);
+ ShuffleEnvironment<ResultPartitionWriter, IndexedInputGate>
shuffleEnvironment =
+
remoteShuffleServiceFactory.createShuffleEnvironment(shuffleEnvironmentContext);
+ Assert.assertEquals(
+ 32 * 1024,
+ ((RemoteShuffleEnvironment) shuffleEnvironment)
+ .getResultPartitionFactory()
+ .getNetworkBufferSize());
+ }
+}
diff --git a/dev/dependencies.sh b/dev/dependencies.sh
index 11fada9ef..1e2888962 100755
--- a/dev/dependencies.sh
+++ b/dev/dependencies.sh
@@ -182,6 +182,10 @@ case "$MODULE" in
MVN_MODULES="client-flink/flink-1.15"
SBT_PROJECT="celeborn-client-flink-1_15"
;;
+ "flink-1.16")
+ MVN_MODULES="client-flink/flink-1.16"
+ SBT_PROJECT="celeborn-client-flink-1_16"
+ ;;
"flink-1.17")
MVN_MODULES="client-flink/flink-1.17"
SBT_PROJECT="celeborn-client-flink-1_17"
diff --git a/dev/reformat b/dev/reformat
index 6cca2ecbf..e848c189d 100755
--- a/dev/reformat
+++ b/dev/reformat
@@ -25,6 +25,7 @@ if [ "$1" == "--web" ]; then
else
${PROJECT_DIR}/build/mvn spotless:apply -Pflink-1.14
${PROJECT_DIR}/build/mvn spotless:apply -Pflink-1.15
+ ${PROJECT_DIR}/build/mvn spotless:apply -Pflink-1.16
${PROJECT_DIR}/build/mvn spotless:apply -Pflink-1.17
${PROJECT_DIR}/build/mvn spotless:apply -Pflink-1.18
${PROJECT_DIR}/build/mvn spotless:apply -Pflink-1.19
diff --git a/docs/developers/sbt.md b/docs/developers/sbt.md
index 638e3cfa6..0233a5426 100644
--- a/docs/developers/sbt.md
+++ b/docs/developers/sbt.md
@@ -37,6 +37,7 @@ The following table indicates the compatibility of Celeborn
Spark and Flink clie
| Spark 3.5 | ❌ | ✔ | ✔ |
✔ | ✔ | ✔ | ✔
|
| Flink 1.14 | ❌ | ✔ | ✔ |
❌ | ❌ | ❌ | ❌
|
| Flink 1.15 | ❌ | ✔ | ✔ |
❌ | ❌ | ❌ | ❌
|
+| Flink 1.16 | ❌ | ✔ | ✔ |
❌ | ❌ | ❌ | ❌
|
| Flink 1.17 | ❌ | ✔ | ✔ |
❌ | ❌ | ❌ | ❌
|
| Flink 1.18 | ❌ | ✔ | ✔ |
❌ | ❌ | ❌ | ❌
|
| Flink 1.19 | ❌ | ✔ | ✔ |
❌ | ❌ | ❌ | ❌
|
diff --git a/pom.xml b/pom.xml
index 8ada82b3f..eae2f1dbc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1565,7 +1565,7 @@
<flink.version>1.14.6</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<celeborn.flink.plugin.artifact>celeborn-client-flink-1.14_${scala.binary.version}</celeborn.flink.plugin.artifact>
-
<flink.streamig.artifact>flink-streaming-java_${scala.binary.version}</flink.streamig.artifact>
+
<flink.streaming.artifact>flink-streaming-java_${scala.binary.version}</flink.streaming.artifact>
<flink.clients.artifact>flink-clients_${scala.binary.version}</flink.clients.artifact>
<flink.scala.artifact>flink-scala_${scala.binary.version}</flink.scala.artifact>
<flink.runtime.web.artifact>flink-runtime-web_${scala.binary.version}</flink.runtime.web.artifact>
@@ -1585,7 +1585,27 @@
<flink.binary.version>1.15</flink.binary.version>
<scala.binary.version>2.12</scala.binary.version>
<celeborn.flink.plugin.artifact>celeborn-client-flink-1.15_${scala.binary.version}</celeborn.flink.plugin.artifact>
- <flink.streamig.artifact>flink-streaming-java</flink.streamig.artifact>
+
<flink.streaming.artifact>flink-streaming-java</flink.streaming.artifact>
+ <flink.clients.artifact>flink-clients</flink.clients.artifact>
+
<flink.scala.artifact>flink-scala_${scala.binary.version}</flink.scala.artifact>
+
<flink.runtime.web.artifact>flink-runtime-web</flink.runtime.web.artifact>
+ </properties>
+ </profile>
+
+ <profile>
+ <id>flink-1.16</id>
+ <modules>
+ <module>client-flink/common</module>
+ <module>client-flink/flink-1.16</module>
+ <module>client-flink/flink-1.16-shaded</module>
+ <module>tests/flink-it</module>
+ </modules>
+ <properties>
+ <flink.version>1.16.3</flink.version>
+ <flink.binary.version>1.16</flink.binary.version>
+ <scala.binary.version>2.12</scala.binary.version>
+
<celeborn.flink.plugin.artifact>celeborn-client-flink-1.16_${scala.binary.version}</celeborn.flink.plugin.artifact>
+
<flink.streaming.artifact>flink-streaming-java</flink.streaming.artifact>
<flink.clients.artifact>flink-clients</flink.clients.artifact>
<flink.scala.artifact>flink-scala_${scala.binary.version}</flink.scala.artifact>
<flink.runtime.web.artifact>flink-runtime-web</flink.runtime.web.artifact>
@@ -1605,7 +1625,7 @@
<flink.binary.version>1.17</flink.binary.version>
<scala.binary.version>2.12</scala.binary.version>
<celeborn.flink.plugin.artifact>celeborn-client-flink-1.17_${scala.binary.version}</celeborn.flink.plugin.artifact>
- <flink.streamig.artifact>flink-streaming-java</flink.streamig.artifact>
+
<flink.streaming.artifact>flink-streaming-java</flink.streaming.artifact>
<flink.clients.artifact>flink-clients</flink.clients.artifact>
<flink.scala.artifact>flink-scala_${scala.binary.version}</flink.scala.artifact>
<flink.runtime.web.artifact>flink-runtime-web</flink.runtime.web.artifact>
@@ -1625,7 +1645,7 @@
<flink.binary.version>1.18</flink.binary.version>
<scala.binary.version>2.12</scala.binary.version>
<celeborn.flink.plugin.artifact>celeborn-client-flink-1.18_${scala.binary.version}</celeborn.flink.plugin.artifact>
- <flink.streamig.artifact>flink-streaming-java</flink.streamig.artifact>
+
<flink.streaming.artifact>flink-streaming-java</flink.streaming.artifact>
<flink.clients.artifact>flink-clients</flink.clients.artifact>
<flink.scala.artifact>flink-scala_${scala.binary.version}</flink.scala.artifact>
<flink.runtime.web.artifact>flink-runtime-web</flink.runtime.web.artifact>
@@ -1645,7 +1665,7 @@
<flink.binary.version>1.19</flink.binary.version>
<scala.binary.version>2.12</scala.binary.version>
<celeborn.flink.plugin.artifact>celeborn-client-flink-1.19_${scala.binary.version}</celeborn.flink.plugin.artifact>
- <flink.streamig.artifact>flink-streaming-java</flink.streamig.artifact>
+
<flink.streaming.artifact>flink-streaming-java</flink.streaming.artifact>
<flink.clients.artifact>flink-clients</flink.clients.artifact>
<flink.scala.artifact>flink-scala_${scala.binary.version}</flink.scala.artifact>
<flink.runtime.web.artifact>flink-runtime-web</flink.runtime.web.artifact>
diff --git a/project/CelebornBuild.scala b/project/CelebornBuild.scala
index 15d45f09c..c73651b59 100644
--- a/project/CelebornBuild.scala
+++ b/project/CelebornBuild.scala
@@ -395,6 +395,7 @@ object Utils {
lazy val flinkClientProjects = FLINK_VERSION match {
case Some("flink-1.14") => Some(Flink114)
case Some("flink-1.15") => Some(Flink115)
+ case Some("flink-1.16") => Some(Flink116)
case Some("flink-1.17") => Some(Flink117)
case Some("flink-1.18") => Some(Flink118)
case Some("flink-1.19") => Some(Flink119)
@@ -918,6 +919,16 @@ object Flink115 extends FlinkClientProjects {
val flinkClientShadedProjectName: String =
"celeborn-client-flink-1_15-shaded"
}
+object Flink116 extends FlinkClientProjects {
+ val flinkVersion = "1.16.3"
+
+ // note that SBT does not allow using the period symbol (.) in project names.
+ val flinkClientProjectPath = "client-flink/flink-1.16"
+ val flinkClientProjectName = "celeborn-client-flink-1_16"
+ val flinkClientShadedProjectPath: String = "client-flink/flink-1.16-shaded"
+ val flinkClientShadedProjectName: String =
"celeborn-client-flink-1_16-shaded"
+}
+
object Flink117 extends FlinkClientProjects {
val flinkVersion = "1.17.2"
diff --git a/tests/flink-it/pom.xml b/tests/flink-it/pom.xml
index 7067feb4a..28c506cc1 100644
--- a/tests/flink-it/pom.xml
+++ b/tests/flink-it/pom.xml
@@ -75,7 +75,7 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>${flink.streamig.artifact}</artifactId>
+ <artifactId>${flink.streaming.artifact}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>