This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new e45d9e53 Rename Flink modules and enable Flink CI (#1110)
e45d9e53 is described below
commit e45d9e53a80a2cb54c642372d95f3e47f0575be8
Author: Cheng Pan <[email protected]>
AuthorDate: Wed Dec 21 23:52:11 2022 +0800
Rename Flink modules and enable Flink CI (#1110)
---
.github/workflows/license.yml | 1 +
.github/workflows/maven.yml | 32 ++++++
.github/workflows/style.yml | 1 +
client-flink/{flink-common => common}/pom.xml | 14 +--
.../celeborn/plugin/flink/buffer/BufferHeader.java | 0
.../celeborn/plugin/flink/buffer/BufferPacker.java | 0
.../plugin/flink/buffer/PartitionSortedBuffer.java | 0
.../celeborn/plugin/flink/buffer/SortBuffer.java | 0
.../celeborn/plugin/flink/utils/BufferUtils.java | 0
.../apache/celeborn/plugin/flink/utils/Utils.java | 0
.../{flink-shaded => flink-1.14-shaded}/pom.xml | 6 +-
client-flink/flink-1.14/pom.xml | 25 +++--
.../flink/RemoteShuffleResultPartitionSuiteJ.java | 108 ++++++++++-----------
pom.xml | 13 ++-
14 files changed, 116 insertions(+), 84 deletions(-)
diff --git a/.github/workflows/license.yml b/.github/workflows/license.yml
index 098f3046..be166aff 100644
--- a/.github/workflows/license.yml
+++ b/.github/workflows/license.yml
@@ -43,6 +43,7 @@ jobs:
cache: 'maven'
check-latest: false
- run: |
+ build/mvn org.apache.rat:apache-rat-plugin:check -Pgoogle-mirror,flink-1.14
build/mvn org.apache.rat:apache-rat-plugin:check -Pgoogle-mirror,spark-2.4
build/mvn org.apache.rat:apache-rat-plugin:check -Pgoogle-mirror,spark-3.3
- name: Upload rat report
diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml
index 5d3660ef..d24db220 100644
--- a/.github/workflows/maven.yml
+++ b/.github/workflows/maven.yml
@@ -76,3 +76,35 @@ jobs:
name: spark-${{ matrix.spark }}-unit-test-log
path: |
**/target/unit-tests.log
+
+ flink:
+ runs-on: ubuntu-22.04
+ strategy:
+ fail-fast: false
+ matrix:
+ java:
+ - 8
+ flink:
+ - '1.14'
+ steps:
+ - uses: actions/checkout@v2
+ - name: Setup JDK ${{ matrix.java }}
+ uses: actions/setup-java@v2
+ with:
+ distribution: zulu
+ java-version: ${{ matrix.java }}
+ cache: maven
+ check-latest: false
+ - name: Test with Maven
+ run: |
+ PROFILES="-Pgoogle-mirror,flink-${{ matrix.flink }}"
+ TEST_MODULES="client-flink/common,client-flink/flink-${{ matrix.flink }},client-flink/flink-${{ matrix.flink }}-shaded"
+ build/mvn $PROFILES -pl $TEST_MODULES -am clean install -DskipTests
+ build/mvn $PROFILES -pl $TEST_MODULES test
+ - name: Upload test log
+ if: failure()
+ uses: actions/upload-artifact@v3
+ with:
+ name: flink-${{ matrix.flink }}-unit-test-log
+ path: |
+ **/target/unit-tests.log
diff --git a/.github/workflows/style.yml b/.github/workflows/style.yml
index d5ec23a8..e7c886e0 100644
--- a/.github/workflows/style.yml
+++ b/.github/workflows/style.yml
@@ -43,5 +43,6 @@ jobs:
cache: 'maven'
check-latest: false
- run: |
+ build/mvn spotless:check -Pgoogle-mirror,flink-1.14
build/mvn spotless:check -Pgoogle-mirror,spark-2.4
build/mvn spotless:check -Pgoogle-mirror,spark-3.3
diff --git a/client-flink/flink-common/pom.xml b/client-flink/common/pom.xml
similarity index 91%
rename from client-flink/flink-common/pom.xml
rename to client-flink/common/pom.xml
index 3b37b482..5a4b6505 100644
--- a/client-flink/flink-common/pom.xml
+++ b/client-flink/common/pom.xml
@@ -24,14 +24,11 @@
<relativePath>../../pom.xml</relativePath>
</parent>
- <artifactId>celeborn-client-flink-common-${flink.version}_${scala.binary.version}</artifactId>
+ <artifactId>celeborn-client-flink-common-${flink.binary.version}_${scala.binary.version}</artifactId>
+ <packaging>jar</packaging>
+ <name>Celeborn Client for Flink Common</name>
<dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-runtime</artifactId>
- <scope>provided</scope>
- </dependency>
<dependency>
<groupId>org.apache.celeborn</groupId>
<artifactId>celeborn-common_${scala.binary.version}</artifactId>
@@ -42,5 +39,10 @@
<artifactId>celeborn-client_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime</artifactId>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
</project>
diff --git
a/client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferHeader.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferHeader.java
similarity index 100%
rename from
client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferHeader.java
rename to
client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferHeader.java
diff --git
a/client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferPacker.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferPacker.java
similarity index 100%
rename from
client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferPacker.java
rename to
client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferPacker.java
diff --git
a/client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java
similarity index 100%
rename from
client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java
rename to
client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java
diff --git
a/client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBuffer.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBuffer.java
similarity index 100%
rename from
client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBuffer.java
rename to
client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBuffer.java
diff --git
a/client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/utils/BufferUtils.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/BufferUtils.java
similarity index 100%
rename from
client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/utils/BufferUtils.java
rename to
client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/BufferUtils.java
diff --git
a/client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/utils/Utils.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/Utils.java
similarity index 100%
rename from
client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/utils/Utils.java
rename to
client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/Utils.java
diff --git a/client-flink/flink-shaded/pom.xml
b/client-flink/flink-1.14-shaded/pom.xml
similarity index 95%
rename from client-flink/flink-shaded/pom.xml
rename to client-flink/flink-1.14-shaded/pom.xml
index 47800c07..d380ab8d 100644
--- a/client-flink/flink-shaded/pom.xml
+++ b/client-flink/flink-1.14-shaded/pom.xml
@@ -24,14 +24,14 @@
<relativePath>../../pom.xml</relativePath>
</parent>
- <artifactId>celeborn-client-flink-${flink.version}-shaded_${scala.binary.version}</artifactId>
+ <artifactId>celeborn-client-flink-${flink.binary.version}-shaded_${scala.binary.version}</artifactId>
<packaging>jar</packaging>
- <name>Celeborn Shaded Client for flink</name>
+ <name>Celeborn Shaded Client for Flink 1.14</name>
<dependencies>
<dependency>
<groupId>org.apache.celeborn</groupId>
- <artifactId>celeborn-client-flink-${flink.version}_${scala.binary.version}</artifactId>
+ <artifactId>celeborn-client-flink-${flink.binary.version}_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
diff --git a/client-flink/flink-1.14/pom.xml b/client-flink/flink-1.14/pom.xml
index 6b88888f..8645add4 100644
--- a/client-flink/flink-1.14/pom.xml
+++ b/client-flink/flink-1.14/pom.xml
@@ -24,16 +24,11 @@
<relativePath>../../pom.xml</relativePath>
</parent>
- <artifactId>celeborn-client-flink-${flink.version}_${scala.binary.version}</artifactId>
+ <artifactId>celeborn-client-flink-${flink.binary.version}_${scala.binary.version}</artifactId>
<packaging>jar</packaging>
- <name>Celeborn Client for flink</name>
+ <name>Celeborn Client for Flink 1.14</name>
<dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-runtime</artifactId>
- <scope>provided</scope>
- </dependency>
<dependency>
<groupId>org.apache.celeborn</groupId>
<artifactId>celeborn-common_${scala.binary.version}</artifactId>
@@ -44,17 +39,21 @@
<artifactId>celeborn-client_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.celeborn</groupId>
+ <artifactId>celeborn-client-flink-common-${flink.binary.version}_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime</artifactId>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.celeborn</groupId>
- <artifactId>celeborn-client-flink-common-${flink.version}_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>compile</scope>
- </dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
diff --git
a/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
b/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
index a0b620a5..384adf9d 100644
---
a/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
+++
b/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
@@ -17,7 +17,17 @@
package org.apache.celeborn.plugin.flink;
-import org.apache.celeborn.plugin.flink.buffer.SortBuffer;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -29,65 +39,53 @@ import org.apache.flink.util.function.SupplierWithException;
import org.junit.Before;
import org.junit.Test;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import org.apache.celeborn.plugin.flink.buffer.SortBuffer;
public class RemoteShuffleResultPartitionSuiteJ {
- private BufferCompressor bufferCompressor =
- new BufferCompressor(32 * 1024, "lz4");
- private RemoteShuffleOutputGate remoteShuffleOutputGate =
mock(RemoteShuffleOutputGate.class);
-
- @Before
- public void setup() {
-
- }
-
- @Test
- public void tesSimpleFlush() throws IOException, InterruptedException {
- List<SupplierWithException<BufferPool, IOException>> bufferPool =
createBufferPoolFactory();
- RemoteShuffleResultPartition remoteShuffleResultPartition = new
RemoteShuffleResultPartition("test",
- 0,
- new ResultPartitionID(),
- ResultPartitionType.BLOCKING,
- 2,
- 2,
- 32 * 1024,
- new ResultPartitionManager(),
- bufferCompressor,
- bufferPool.get(0),
- remoteShuffleOutputGate);
- remoteShuffleResultPartition.setup();
- doNothing().when(remoteShuffleOutputGate).regionStart(anyBoolean());
- doNothing().when(remoteShuffleOutputGate).regionFinish();
-
when(remoteShuffleOutputGate.getBufferPool()).thenReturn(bufferPool.get(1).get());
- SortBuffer sortBuffer =
remoteShuffleResultPartition.getUnicastSortBuffer();
- ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[] {1, 2, 3});
- sortBuffer.append(byteBuffer, 0, Buffer.DataType.DATA_BUFFER);
- remoteShuffleResultPartition.flushSortBuffer(sortBuffer, true);
- }
+ private BufferCompressor bufferCompressor = new BufferCompressor(32 * 1024,
"lz4");
+ private RemoteShuffleOutputGate remoteShuffleOutputGate =
mock(RemoteShuffleOutputGate.class);
- private List<SupplierWithException<BufferPool, IOException>>
createBufferPoolFactory() {
- NetworkBufferPool networkBufferPool =
- new NetworkBufferPool(256 * 8, 32 * 1024,
Duration.ofMillis(1000));
+ @Before
+ public void setup() {}
- int numBuffersPerPartition = 64 * 1024 / 32;
- int numForResultPartition = numBuffersPerPartition * 7 / 8;
- int numForOutputGate = numBuffersPerPartition - numForResultPartition;
+ @Test
+ public void tesSimpleFlush() throws IOException, InterruptedException {
+ List<SupplierWithException<BufferPool, IOException>> bufferPool =
createBufferPoolFactory();
+ RemoteShuffleResultPartition remoteShuffleResultPartition =
+ new RemoteShuffleResultPartition(
+ "test",
+ 0,
+ new ResultPartitionID(),
+ ResultPartitionType.BLOCKING,
+ 2,
+ 2,
+ 32 * 1024,
+ new ResultPartitionManager(),
+ bufferCompressor,
+ bufferPool.get(0),
+ remoteShuffleOutputGate);
+ remoteShuffleResultPartition.setup();
+ doNothing().when(remoteShuffleOutputGate).regionStart(anyBoolean());
+ doNothing().when(remoteShuffleOutputGate).regionFinish();
+
when(remoteShuffleOutputGate.getBufferPool()).thenReturn(bufferPool.get(1).get());
+ SortBuffer sortBuffer =
remoteShuffleResultPartition.getUnicastSortBuffer();
+ ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[] {1, 2, 3});
+ sortBuffer.append(byteBuffer, 0, Buffer.DataType.DATA_BUFFER);
+ remoteShuffleResultPartition.flushSortBuffer(sortBuffer, true);
+ }
- List<SupplierWithException<BufferPool, IOException>> factories = new
ArrayList<>();
- factories.add(
- () ->
networkBufferPool.createBufferPool(numForResultPartition,
numForResultPartition));
- factories.add(() ->
networkBufferPool.createBufferPool(numForOutputGate, numForOutputGate));
- return factories;
- }
+ private List<SupplierWithException<BufferPool, IOException>>
createBufferPoolFactory() {
+ NetworkBufferPool networkBufferPool =
+ new NetworkBufferPool(256 * 8, 32 * 1024, Duration.ofMillis(1000));
+ int numBuffersPerPartition = 64 * 1024 / 32;
+ int numForResultPartition = numBuffersPerPartition * 7 / 8;
+ int numForOutputGate = numBuffersPerPartition - numForResultPartition;
+ List<SupplierWithException<BufferPool, IOException>> factories = new
ArrayList<>();
+ factories.add(
+ () -> networkBufferPool.createBufferPool(numForResultPartition,
numForResultPartition));
+ factories.add(() -> networkBufferPool.createBufferPool(numForOutputGate,
numForOutputGate));
+ return factories;
+ }
}
diff --git a/pom.xml b/pom.xml
index c7ffcf1a..0b736c2f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -63,6 +63,7 @@
<maven.compiler.target>${java.version}</maven.compiler.target>
<maven.version>3.6.3</maven.version>
+ <flink.version>1.14.0</flink.version>
<hadoop.version>3.2.1</hadoop.version>
<spark.version>3.3.1</spark.version>
@@ -90,9 +91,6 @@
<snakeyaml.version>1.30</snakeyaml.version>
<zstd-jni.version>1.5.2-1</zstd-jni.version>
- <!-- default flink version -->
- <flink.version>1.14.0</flink.version>
-
<shading.prefix>org.apache.celeborn.shaded</shading.prefix>
<maven.plugin.antrun.version>3.0.0</maven.plugin.antrun.version>
@@ -1016,18 +1014,19 @@
<profile>
<id>flink-1.14</id>
<modules>
+ <module>client-flink/common</module>
<module>client-flink/flink-1.14</module>
- <module>client-flink/flink-common</module>
- <module>client-flink/flink-shaded</module>
+ <module>client-flink/flink-1.14-shaded</module>
</modules>
<properties>
+ <flink.version>1.14.0</flink.version>
+ <flink.binary.version>1.14</flink.binary.version>
<jackson.version>2.6.7</jackson.version>
<jackson.databind.version>2.6.7.3</jackson.databind.version>
<lz4-java.version>1.4.0</lz4-java.version>
+ <scala.version>2.12.15</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<zstd-jni.version>1.4.4-3</zstd-jni.version>
- <scala.version>2.12.15</scala.version>
- <flink.version>1.14.0</flink.version>
</properties>
</profile>