This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new e45d9e53 Rename Flink modules and enable Flink CI (#1110)
e45d9e53 is described below

commit e45d9e53a80a2cb54c642372d95f3e47f0575be8
Author: Cheng Pan <[email protected]>
AuthorDate: Wed Dec 21 23:52:11 2022 +0800

    Rename Flink modules and enable Flink CI (#1110)
---
 .github/workflows/license.yml                      |   1 +
 .github/workflows/maven.yml                        |  32 ++++++
 .github/workflows/style.yml                        |   1 +
 client-flink/{flink-common => common}/pom.xml      |  14 +--
 .../celeborn/plugin/flink/buffer/BufferHeader.java |   0
 .../celeborn/plugin/flink/buffer/BufferPacker.java |   0
 .../plugin/flink/buffer/PartitionSortedBuffer.java |   0
 .../celeborn/plugin/flink/buffer/SortBuffer.java   |   0
 .../celeborn/plugin/flink/utils/BufferUtils.java   |   0
 .../apache/celeborn/plugin/flink/utils/Utils.java  |   0
 .../{flink-shaded => flink-1.14-shaded}/pom.xml    |   6 +-
 client-flink/flink-1.14/pom.xml                    |  25 +++--
 .../flink/RemoteShuffleResultPartitionSuiteJ.java  | 108 ++++++++++-----------
 pom.xml                                            |  13 ++-
 14 files changed, 116 insertions(+), 84 deletions(-)

diff --git a/.github/workflows/license.yml b/.github/workflows/license.yml
index 098f3046..be166aff 100644
--- a/.github/workflows/license.yml
+++ b/.github/workflows/license.yml
@@ -43,6 +43,7 @@ jobs:
           cache: 'maven'
           check-latest: false
       - run: |
+          build/mvn org.apache.rat:apache-rat-plugin:check -Pgoogle-mirror,flink-1.14
           build/mvn org.apache.rat:apache-rat-plugin:check -Pgoogle-mirror,spark-2.4
           build/mvn org.apache.rat:apache-rat-plugin:check -Pgoogle-mirror,spark-3.3
       - name: Upload rat report
diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml
index 5d3660ef..d24db220 100644
--- a/.github/workflows/maven.yml
+++ b/.github/workflows/maven.yml
@@ -76,3 +76,35 @@ jobs:
         name: spark-${{ matrix.spark }}-unit-test-log
         path: |
           **/target/unit-tests.log
+
+  flink:
+    runs-on: ubuntu-22.04
+    strategy:
+      fail-fast: false
+      matrix:
+        java:
+          - 8
+        flink:
+          - '1.14'
+    steps:
+      - uses: actions/checkout@v2
+      - name: Setup JDK ${{ matrix.java }}
+        uses: actions/setup-java@v2
+        with:
+          distribution: zulu
+          java-version: ${{ matrix.java }}
+          cache: maven
+          check-latest: false
+      - name: Test with Maven
+        run: |
+          PROFILES="-Pgoogle-mirror,flink-${{ matrix.flink }}"
+          TEST_MODULES="client-flink/common,client-flink/flink-${{ matrix.flink }},client-flink/flink-${{ matrix.flink }}-shaded"
+          build/mvn $PROFILES -pl $TEST_MODULES -am clean install -DskipTests
+          build/mvn $PROFILES -pl $TEST_MODULES test
+      - name: Upload test log
+        if: failure()
+        uses: actions/upload-artifact@v3
+        with:
+          name: flink-${{ matrix.flink }}-unit-test-log
+          path: |
+            **/target/unit-tests.log
diff --git a/.github/workflows/style.yml b/.github/workflows/style.yml
index d5ec23a8..e7c886e0 100644
--- a/.github/workflows/style.yml
+++ b/.github/workflows/style.yml
@@ -43,5 +43,6 @@ jobs:
           cache: 'maven'
           check-latest: false
       - run: |
+          build/mvn spotless:check -Pgoogle-mirror,flink-1.14
           build/mvn spotless:check -Pgoogle-mirror,spark-2.4
           build/mvn spotless:check -Pgoogle-mirror,spark-3.3
diff --git a/client-flink/flink-common/pom.xml b/client-flink/common/pom.xml
similarity index 91%
rename from client-flink/flink-common/pom.xml
rename to client-flink/common/pom.xml
index 3b37b482..5a4b6505 100644
--- a/client-flink/flink-common/pom.xml
+++ b/client-flink/common/pom.xml
@@ -24,14 +24,11 @@
     <relativePath>../../pom.xml</relativePath>
   </parent>
 
-  <artifactId>celeborn-client-flink-common-${flink.version}_${scala.binary.version}</artifactId>
+  <artifactId>celeborn-client-flink-common-${flink.binary.version}_${scala.binary.version}</artifactId>
+  <packaging>jar</packaging>
+  <name>Celeborn Client for Flink Common</name>
 
   <dependencies>
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-runtime</artifactId>
-      <scope>provided</scope>
-    </dependency>
     <dependency>
       <groupId>org.apache.celeborn</groupId>
       <artifactId>celeborn-common_${scala.binary.version}</artifactId>
@@ -42,5 +39,10 @@
       <artifactId>celeborn-client_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-runtime</artifactId>
+      <scope>provided</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git 
a/client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferHeader.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferHeader.java
similarity index 100%
rename from 
client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferHeader.java
rename to 
client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferHeader.java
diff --git 
a/client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferPacker.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferPacker.java
similarity index 100%
rename from 
client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferPacker.java
rename to 
client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/BufferPacker.java
diff --git 
a/client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java
similarity index 100%
rename from 
client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java
rename to 
client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/PartitionSortedBuffer.java
diff --git 
a/client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBuffer.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBuffer.java
similarity index 100%
rename from 
client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBuffer.java
rename to 
client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/buffer/SortBuffer.java
diff --git 
a/client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/utils/BufferUtils.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/BufferUtils.java
similarity index 100%
rename from 
client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/utils/BufferUtils.java
rename to 
client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/BufferUtils.java
diff --git 
a/client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/utils/Utils.java
 
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/Utils.java
similarity index 100%
rename from 
client-flink/flink-common/src/main/java/org/apache/celeborn/plugin/flink/utils/Utils.java
rename to 
client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/Utils.java
diff --git a/client-flink/flink-shaded/pom.xml 
b/client-flink/flink-1.14-shaded/pom.xml
similarity index 95%
rename from client-flink/flink-shaded/pom.xml
rename to client-flink/flink-1.14-shaded/pom.xml
index 47800c07..d380ab8d 100644
--- a/client-flink/flink-shaded/pom.xml
+++ b/client-flink/flink-1.14-shaded/pom.xml
@@ -24,14 +24,14 @@
     <relativePath>../../pom.xml</relativePath>
   </parent>
 
-  <artifactId>celeborn-client-flink-${flink.version}-shaded_${scala.binary.version}</artifactId>
+  <artifactId>celeborn-client-flink-${flink.binary.version}-shaded_${scala.binary.version}</artifactId>
   <packaging>jar</packaging>
-  <name>Celeborn Shaded Client for flink</name>
+  <name>Celeborn Shaded Client for Flink 1.14</name>
 
   <dependencies>
     <dependency>
       <groupId>org.apache.celeborn</groupId>
-      <artifactId>celeborn-client-flink-${flink.version}_${scala.binary.version}</artifactId>
+      <artifactId>celeborn-client-flink-${flink.binary.version}_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
   </dependencies>
diff --git a/client-flink/flink-1.14/pom.xml b/client-flink/flink-1.14/pom.xml
index 6b88888f..8645add4 100644
--- a/client-flink/flink-1.14/pom.xml
+++ b/client-flink/flink-1.14/pom.xml
@@ -24,16 +24,11 @@
     <relativePath>../../pom.xml</relativePath>
   </parent>
 
-  <artifactId>celeborn-client-flink-${flink.version}_${scala.binary.version}</artifactId>
+  <artifactId>celeborn-client-flink-${flink.binary.version}_${scala.binary.version}</artifactId>
   <packaging>jar</packaging>
-  <name>Celeborn Client for flink</name>
+  <name>Celeborn Client for Flink 1.14</name>
 
   <dependencies>
-    <dependency>
-      <groupId>org.apache.flink</groupId>
-      <artifactId>flink-runtime</artifactId>
-      <scope>provided</scope>
-    </dependency>
     <dependency>
       <groupId>org.apache.celeborn</groupId>
       <artifactId>celeborn-common_${scala.binary.version}</artifactId>
@@ -44,17 +39,21 @@
       <artifactId>celeborn-client_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.celeborn</groupId>
+      <artifactId>celeborn-client-flink-common-${flink.binary.version}_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-runtime</artifactId>
+      <scope>provided</scope>
+    </dependency>
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.apache.celeborn</groupId>
-      <artifactId>celeborn-client-flink-common-${flink.version}_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-      <scope>compile</scope>
-    </dependency>
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-core</artifactId>
diff --git 
a/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
 
b/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
index a0b620a5..384adf9d 100644
--- 
a/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
+++ 
b/client-flink/flink-1.14/src/test/java/org/apache/celeborn/plugin/flink/RemoteShuffleResultPartitionSuiteJ.java
@@ -17,7 +17,17 @@
 
 package org.apache.celeborn.plugin.flink;
 
-import org.apache.celeborn.plugin.flink.buffer.SortBuffer;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -29,65 +39,53 @@ import org.apache.flink.util.function.SupplierWithException;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import org.apache.celeborn.plugin.flink.buffer.SortBuffer;
 
 public class RemoteShuffleResultPartitionSuiteJ {
-    private BufferCompressor bufferCompressor =
-            new BufferCompressor(32 * 1024, "lz4");
-    private RemoteShuffleOutputGate remoteShuffleOutputGate = 
mock(RemoteShuffleOutputGate.class);
-
-    @Before
-    public void setup() {
-
-    }
-
-    @Test
-    public void tesSimpleFlush() throws IOException, InterruptedException {
-        List<SupplierWithException<BufferPool, IOException>> bufferPool = 
createBufferPoolFactory();
-        RemoteShuffleResultPartition remoteShuffleResultPartition = new 
RemoteShuffleResultPartition("test",
-                0,
-                new ResultPartitionID(),
-                ResultPartitionType.BLOCKING,
-                2,
-                2,
-                32 * 1024,
-                new ResultPartitionManager(),
-                bufferCompressor,
-                bufferPool.get(0),
-                remoteShuffleOutputGate);
-        remoteShuffleResultPartition.setup();
-        doNothing().when(remoteShuffleOutputGate).regionStart(anyBoolean());
-        doNothing().when(remoteShuffleOutputGate).regionFinish();
-        
when(remoteShuffleOutputGate.getBufferPool()).thenReturn(bufferPool.get(1).get());
-        SortBuffer sortBuffer = 
remoteShuffleResultPartition.getUnicastSortBuffer();
-        ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[] {1, 2, 3});
-        sortBuffer.append(byteBuffer, 0, Buffer.DataType.DATA_BUFFER);
-        remoteShuffleResultPartition.flushSortBuffer(sortBuffer, true);
-    }
+  private BufferCompressor bufferCompressor = new BufferCompressor(32 * 1024, 
"lz4");
+  private RemoteShuffleOutputGate remoteShuffleOutputGate = 
mock(RemoteShuffleOutputGate.class);
 
-    private List<SupplierWithException<BufferPool, IOException>> 
createBufferPoolFactory() {
-        NetworkBufferPool networkBufferPool =
-                new NetworkBufferPool(256 * 8, 32 * 1024, 
Duration.ofMillis(1000));
+  @Before
+  public void setup() {}
 
-        int numBuffersPerPartition = 64 * 1024 / 32;
-        int numForResultPartition = numBuffersPerPartition * 7 / 8;
-        int numForOutputGate = numBuffersPerPartition - numForResultPartition;
+  @Test
+  public void tesSimpleFlush() throws IOException, InterruptedException {
+    List<SupplierWithException<BufferPool, IOException>> bufferPool = 
createBufferPoolFactory();
+    RemoteShuffleResultPartition remoteShuffleResultPartition =
+        new RemoteShuffleResultPartition(
+            "test",
+            0,
+            new ResultPartitionID(),
+            ResultPartitionType.BLOCKING,
+            2,
+            2,
+            32 * 1024,
+            new ResultPartitionManager(),
+            bufferCompressor,
+            bufferPool.get(0),
+            remoteShuffleOutputGate);
+    remoteShuffleResultPartition.setup();
+    doNothing().when(remoteShuffleOutputGate).regionStart(anyBoolean());
+    doNothing().when(remoteShuffleOutputGate).regionFinish();
+    
when(remoteShuffleOutputGate.getBufferPool()).thenReturn(bufferPool.get(1).get());
+    SortBuffer sortBuffer = 
remoteShuffleResultPartition.getUnicastSortBuffer();
+    ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[] {1, 2, 3});
+    sortBuffer.append(byteBuffer, 0, Buffer.DataType.DATA_BUFFER);
+    remoteShuffleResultPartition.flushSortBuffer(sortBuffer, true);
+  }
 
-        List<SupplierWithException<BufferPool, IOException>> factories = new 
ArrayList<>();
-        factories.add(
-                () -> 
networkBufferPool.createBufferPool(numForResultPartition, 
numForResultPartition));
-        factories.add(() -> 
networkBufferPool.createBufferPool(numForOutputGate, numForOutputGate));
-        return factories;
-    }
+  private List<SupplierWithException<BufferPool, IOException>> 
createBufferPoolFactory() {
+    NetworkBufferPool networkBufferPool =
+        new NetworkBufferPool(256 * 8, 32 * 1024, Duration.ofMillis(1000));
 
+    int numBuffersPerPartition = 64 * 1024 / 32;
+    int numForResultPartition = numBuffersPerPartition * 7 / 8;
+    int numForOutputGate = numBuffersPerPartition - numForResultPartition;
 
+    List<SupplierWithException<BufferPool, IOException>> factories = new 
ArrayList<>();
+    factories.add(
+        () -> networkBufferPool.createBufferPool(numForResultPartition, 
numForResultPartition));
+    factories.add(() -> networkBufferPool.createBufferPool(numForOutputGate, 
numForOutputGate));
+    return factories;
+  }
 }
diff --git a/pom.xml b/pom.xml
index c7ffcf1a..0b736c2f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -63,6 +63,7 @@
     <maven.compiler.target>${java.version}</maven.compiler.target>
     <maven.version>3.6.3</maven.version>
 
+    <flink.version>1.14.0</flink.version>
     <hadoop.version>3.2.1</hadoop.version>
     <spark.version>3.3.1</spark.version>
 
@@ -90,9 +91,6 @@
     <snakeyaml.version>1.30</snakeyaml.version>
     <zstd-jni.version>1.5.2-1</zstd-jni.version>
 
-    <!--  default flink version  -->
-    <flink.version>1.14.0</flink.version>
-
     <shading.prefix>org.apache.celeborn.shaded</shading.prefix>
 
     <maven.plugin.antrun.version>3.0.0</maven.plugin.antrun.version>
@@ -1016,18 +1014,19 @@
     <profile>
       <id>flink-1.14</id>
       <modules>
+        <module>client-flink/common</module>
         <module>client-flink/flink-1.14</module>
-        <module>client-flink/flink-common</module>
-        <module>client-flink/flink-shaded</module>
+        <module>client-flink/flink-1.14-shaded</module>
       </modules>
       <properties>
+        <flink.version>1.14.0</flink.version>
+        <flink.binary.version>1.14</flink.binary.version>
         <jackson.version>2.6.7</jackson.version>
         <jackson.databind.version>2.6.7.3</jackson.databind.version>
         <lz4-java.version>1.4.0</lz4-java.version>
+        <scala.version>2.12.15</scala.version>
         <scala.binary.version>2.12</scala.binary.version>
         <zstd-jni.version>1.4.4-3</zstd-jni.version>
-        <scala.version>2.12.15</scala.version>
-        <flink.version>1.14.0</flink.version>
       </properties>
     </profile>
 

Reply via email to