This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
commit bf97a2227be2657d778d488f8a918b15b452e793 Author: Cheng Pan <[email protected]> AuthorDate: Wed Dec 21 23:52:11 2022 +0800 [CELEBORN-163][BUILD] Rename Flink modules and enable Flink CI (#1110) --- .github/workflows/license.yml | 1 + .github/workflows/maven.yml | 32 ++++++ .github/workflows/style.yml | 1 + client-flink/{flink-common => common}/pom.xml | 14 +-- .../celeborn/plugin/flink/buffer/BufferHeader.java | 0 .../celeborn/plugin/flink/buffer/BufferPacker.java | 0 .../plugin/flink/buffer/PartitionSortedBuffer.java | 0 .../celeborn/plugin/flink/buffer/SortBuffer.java | 0 .../celeborn/plugin/flink/utils/BufferUtils.java | 0 .../apache/celeborn/plugin/flink/utils/Utils.java | 0 .../{flink-shaded => flink-1.14-shaded}/pom.xml | 6 +- client-flink/flink-1.14/pom.xml | 25 +++-- .../flink/RemoteShuffleResultPartitionSuiteJ.java | 108 ++++++++++----------- pom.xml | 13 ++- 14 files changed, 116 insertions(+), 84 deletions(-) diff --git a/.github/workflows/license.yml b/.github/workflows/license.yml index 098f3046..be166aff 100644 --- a/.github/workflows/license.yml +++ b/.github/workflows/license.yml @@ -43,6 +43,7 @@ jobs: cache: 'maven' check-latest: false - run: | + build/mvn org.apache.rat:apache-rat-plugin:check -Pgoogle-mirror,flink-1.14 build/mvn org.apache.rat:apache-rat-plugin:check -Pgoogle-mirror,spark-2.4 build/mvn org.apache.rat:apache-rat-plugin:check -Pgoogle-mirror,spark-3.3 - name: Upload rat report diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index 5d3660ef..d24db220 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -76,3 +76,35 @@ jobs: name: spark-${{ matrix.spark }}-unit-test-log path: | **/target/unit-tests.log + + flink: + runs-on: ubuntu-22.04 + strategy: + fail-fast: false + matrix: + java: + - 8 + flink: + - '1.14' + steps: + - uses: actions/checkout@v2 + - name: Setup JDK ${{ matrix.java }} + uses: actions/setup-java@v2 + with: + distribution: zulu + java-version: ${{ matrix.java }} + 
cache: maven + check-latest: false + - name: Test with Maven + run: | + PROFILES="-Pgoogle-mirror,flink-${{ matrix.flink }}" + TEST_MODULES="client-flink/common,client-flink/flink-${{ matrix.flink }},client-flink/flink-${{ matrix.flink }}-shaded" + build/mvn $PROFILES -pl $TEST_MODULES -am clean install -DskipTests + build/mvn $PROFILES -pl $TEST_MODULES test + - name: Upload test log + if: failure() + uses: actions/upload-artifact@v3 + with: + name: flink-${{ matrix.flink }}-unit-test-log + path: | + **/target/unit-tests.log diff --git a/.github/workflows/style.yml b/.github/workflows/style.yml index d5ec23a8..e7c886e0 100644 --- a/.github/workflows/style.yml +++ b/.github/workflows/style.yml @@ -43,5 +43,6 @@ jobs: cache: 'maven' check-latest: false - run: | + build/mvn spotless:check -Pgoogle-mirror,flink-1.14 build/mvn spotless:check -Pgoogle-mirror,spark-2.4 build/mvn spotless:check -Pgoogle-mirror,spark-3.3 diff --git a/client-flink/flink-common/pom.xml b/client-flink/common/pom.xml similarity index 91% rename from client-flink/flink-common/pom.xml rename to client-flink/common/pom.xml index 3b37b482..5a4b6505 100644 --- a/client-flink/flink-common/pom.xml +++ b/client-flink/common/pom.xml @@ -24,14 +24,11 @@ <relativePath>../../pom.xml</relativePath> </parent> - <artifactId>celeborn-client-flink-common-${flink.version}_${scala.binary.version}</artifactId> + <artifactId>celeborn-client-flink-common-${flink.binary.version}_${scala.binary.version}</artifactId> + <packaging>jar</packaging> + <name>Celeborn Client for Flink Common</name> <dependencies> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-runtime</artifactId> - <scope>provided</scope> - </dependency> <dependency> <groupId>org.apache.celeborn</groupId> <artifactId>celeborn-common_${scala.binary.version}</artifactId> @@ -42,5 +39,10 @@ <artifactId>celeborn-client_${scala.binary.version}</artifactId> <version>${project.version}</version> </dependency> + <dependency> + 
<groupId>org.apache.flink</groupId> + <artifactId>flink-runtime</artifactId> + <scope>provided</scope> + </dependency> </dependencies> </project> diff --git a/client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferHeader.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferHeader.java similarity index 100% rename from client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferHeader.java rename to client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferHeader.java diff --git a/client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferPacker.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferPacker.java similarity index 100% rename from client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferPacker.java rename to client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferPacker.java diff --git a/client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java similarity index 100% rename from client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java rename to client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java diff --git a/client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBuffer.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBuffer.java similarity index 100% rename from client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBuffer.java rename to client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBuffer.java diff --git 
a/client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/utils/BufferUtils.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/BufferUtils.java similarity index 100% rename from client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/utils/BufferUtils.java rename to client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/BufferUtils.java diff --git a/client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/utils/Utils.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/Utils.java similarity index 100% rename from client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/utils/Utils.java rename to client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/Utils.java diff --git a/client-flink/flink-shaded/pom.xml b/client-flink/flink-1.14-shaded/pom.xml similarity index 95% rename from client-flink/flink-shaded/pom.xml rename to client-flink/flink-1.14-shaded/pom.xml index 47800c07..d380ab8d 100644 --- a/client-flink/flink-shaded/pom.xml +++ b/client-flink/flink-1.14-shaded/pom.xml @@ -24,14 +24,14 @@ <relativePath>../../pom.xml</relativePath> </parent> - <artifactId>celeborn-client-flink-${flink.version}-shaded_${scala.binary.version}</artifactId> + <artifactId>celeborn-client-flink-${flink.binary.version}-shaded_${scala.binary.version}</artifactId> <packaging>jar</packaging> - <name>Celeborn Shaded Client for flink</name> + <name>Celeborn Shaded Client for Flink 1.14</name> <dependencies> <dependency> <groupId>org.apache.celeborn</groupId> - <artifactId>celeborn-client-flink-${flink.version}_${scala.binary.version}</artifactId> + <artifactId>celeborn-client-flink-${flink.binary.version}_${scala.binary.version}</artifactId> <version>${project.version}</version> </dependency> </dependencies> diff --git a/client-flink/flink-1.14/pom.xml b/client-flink/flink-1.14/pom.xml index 6b88888f..8645add4 100644 --- 
a/client-flink/flink-1.14/pom.xml +++ b/client-flink/flink-1.14/pom.xml @@ -24,16 +24,11 @@ <relativePath>../../pom.xml</relativePath> </parent> - <artifactId>celeborn-client-flink-${flink.version}_${scala.binary.version}</artifactId> + <artifactId>celeborn-client-flink-${flink.binary.version}_${scala.binary.version}</artifactId> <packaging>jar</packaging> - <name>Celeborn Client for flink</name> + <name>Celeborn Client for Flink 1.14</name> <dependencies> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-runtime</artifactId> - <scope>provided</scope> - </dependency> <dependency> <groupId>org.apache.celeborn</groupId> <artifactId>celeborn-common_${scala.binary.version}</artifactId> @@ -44,17 +39,21 @@ <artifactId>celeborn-client_${scala.binary.version}</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.celeborn</groupId> + <artifactId>celeborn-client-flink-common-${flink.binary.version}_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime</artifactId> + <scope>provided</scope> + </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> </dependency> - <dependency> - <groupId>org.apache.celeborn</groupId> - <artifactId>celeborn-client-flink-common-${flink.version}_${scala.binary.version}</artifactId> - <version>${project.version}</version> - <scope>compile</scope> - </dependency> <dependency> <groupId>org.mockito</groupId> <artifactId>mockito-core</artifactId> diff --git a/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java b/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java index a0b620a5..384adf9d 100644 --- 
a/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java +++ b/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java @@ -17,7 +17,17 @@ package org.apache.celeborn.plugin.flink; -import org.apache.celeborn.plugin.flink.buffer.SortBuffer; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; + import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.BufferCompressor; import org.apache.flink.runtime.io.network.buffer.BufferPool; @@ -29,65 +39,53 @@ import org.apache.flink.util.function.SupplierWithException; import org.junit.Before; import org.junit.Test; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; - -import static org.mockito.Matchers.anyBoolean; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import org.apache.celeborn.plugin.flink.buffer.SortBuffer; public class RemoteShuffleResultPartitionSuiteJ { - private BufferCompressor bufferCompressor = - new BufferCompressor(32 * 1024, "lz4"); - private RemoteShuffleOutputGate remoteShuffleOutputGate = mock(RemoteShuffleOutputGate.class); - - @Before - public void setup() { - - } - - @Test - public void tesSimpleFlush() throws IOException, InterruptedException { - List<SupplierWithException<BufferPool, IOException>> bufferPool = createBufferPoolFactory(); - RemoteShuffleResultPartition remoteShuffleResultPartition = new RemoteShuffleResultPartition("test", - 0, - new ResultPartitionID(), - ResultPartitionType.BLOCKING, - 2, - 2, - 
32 * 1024, - new ResultPartitionManager(), - bufferCompressor, - bufferPool.get(0), - remoteShuffleOutputGate); - remoteShuffleResultPartition.setup(); - doNothing().when(remoteShuffleOutputGate).regionStart(anyBoolean()); - doNothing().when(remoteShuffleOutputGate).regionFinish(); - when(remoteShuffleOutputGate.getBufferPool()).thenReturn(bufferPool.get(1).get()); - SortBuffer sortBuffer = remoteShuffleResultPartition.getUnicastSortBuffer(); - ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[] {1, 2, 3}); - sortBuffer.append(byteBuffer, 0, Buffer.DataType.DATA_BUFFER); - remoteShuffleResultPartition.flushSortBuffer(sortBuffer, true); - } + private BufferCompressor bufferCompressor = new BufferCompressor(32 * 1024, "lz4"); + private RemoteShuffleOutputGate remoteShuffleOutputGate = mock(RemoteShuffleOutputGate.class); - private List<SupplierWithException<BufferPool, IOException>> createBufferPoolFactory() { - NetworkBufferPool networkBufferPool = - new NetworkBufferPool(256 * 8, 32 * 1024, Duration.ofMillis(1000)); + @Before + public void setup() {} - int numBuffersPerPartition = 64 * 1024 / 32; - int numForResultPartition = numBuffersPerPartition * 7 / 8; - int numForOutputGate = numBuffersPerPartition - numForResultPartition; + @Test + public void tesSimpleFlush() throws IOException, InterruptedException { + List<SupplierWithException<BufferPool, IOException>> bufferPool = createBufferPoolFactory(); + RemoteShuffleResultPartition remoteShuffleResultPartition = + new RemoteShuffleResultPartition( + "test", + 0, + new ResultPartitionID(), + ResultPartitionType.BLOCKING, + 2, + 2, + 32 * 1024, + new ResultPartitionManager(), + bufferCompressor, + bufferPool.get(0), + remoteShuffleOutputGate); + remoteShuffleResultPartition.setup(); + doNothing().when(remoteShuffleOutputGate).regionStart(anyBoolean()); + doNothing().when(remoteShuffleOutputGate).regionFinish(); + when(remoteShuffleOutputGate.getBufferPool()).thenReturn(bufferPool.get(1).get()); + SortBuffer 
sortBuffer = remoteShuffleResultPartition.getUnicastSortBuffer(); + ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[] {1, 2, 3}); + sortBuffer.append(byteBuffer, 0, Buffer.DataType.DATA_BUFFER); + remoteShuffleResultPartition.flushSortBuffer(sortBuffer, true); + } - List<SupplierWithException<BufferPool, IOException>> factories = new ArrayList<>(); - factories.add( - () -> networkBufferPool.createBufferPool(numForResultPartition, numForResultPartition)); - factories.add(() -> networkBufferPool.createBufferPool(numForOutputGate, numForOutputGate)); - return factories; - } + private List<SupplierWithException<BufferPool, IOException>> createBufferPoolFactory() { + NetworkBufferPool networkBufferPool = + new NetworkBufferPool(256 * 8, 32 * 1024, Duration.ofMillis(1000)); + int numBuffersPerPartition = 64 * 1024 / 32; + int numForResultPartition = numBuffersPerPartition * 7 / 8; + int numForOutputGate = numBuffersPerPartition - numForResultPartition; + List<SupplierWithException<BufferPool, IOException>> factories = new ArrayList<>(); + factories.add( + () -> networkBufferPool.createBufferPool(numForResultPartition, numForResultPartition)); + factories.add(() -> networkBufferPool.createBufferPool(numForOutputGate, numForOutputGate)); + return factories; + } } diff --git a/pom.xml b/pom.xml index c7ffcf1a..0b736c2f 100644 --- a/pom.xml +++ b/pom.xml @@ -63,6 +63,7 @@ <maven.compiler.target>${java.version}</maven.compiler.target> <maven.version>3.6.3</maven.version> + <flink.version>1.14.0</flink.version> <hadoop.version>3.2.1</hadoop.version> <spark.version>3.3.1</spark.version> @@ -90,9 +91,6 @@ <snakeyaml.version>1.30</snakeyaml.version> <zstd-jni.version>1.5.2-1</zstd-jni.version> - <!-- default flink version --> - <flink.version>1.14.0</flink.version> - <shading.prefix>org.apache.celeborn.shaded</shading.prefix> <maven.plugin.antrun.version>3.0.0</maven.plugin.antrun.version> @@ -1016,18 +1014,19 @@ <profile> <id>flink-1.14</id> <modules> + 
<module>client-flink/common</module> <module>client-flink/flink-1.14</module> - <module>client-flink/flink-common</module> - <module>client-flink/flink-shaded</module> + <module>client-flink/flink-1.14-shaded</module> </modules> <properties> + <flink.version>1.14.0</flink.version> + <flink.binary.version>1.14</flink.binary.version> <jackson.version>2.6.7</jackson.version> <jackson.databind.version>2.6.7.3</jackson.databind.version> <lz4-java.version>1.4.0</lz4-java.version> + <scala.version>2.12.15</scala.version> <scala.binary.version>2.12</scala.binary.version> <zstd-jni.version>1.4.4-3</zstd-jni.version> - <scala.version>2.12.15</scala.version> - <flink.version>1.14.0</flink.version> </properties> </profile>
