This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new c74dd0908 [#4386] Fix(spark-connector): spark connector build failed with scala2.13 (#4486)
c74dd0908 is described below

commit c74dd0908811a7640ae5537f98b526698d1eb4e1
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Aug 12 17:33:57 2024 +0800

    [#4386] Fix(spark-connector): spark connector build failed with scala2.13 (#4486)
    
    ### What changes were proposed in this pull request?
    Fixed the spark connector build failure with Scala 2.13.
    
    ### Why are the changes needed?
    
    Fix:  https://github.com/apache/gravitino/issues/4386
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    1. Built with Scala 2.12 and Scala 2.13 locally.
    2. Added a new job to the GitHub pipeline that builds the spark-connector
    against Scala 2.13.
    
    Co-authored-by: cai can <[email protected]>
    Co-authored-by: caican <[email protected]>
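    
    Background for this fix: scala.Seq aliases scala.collection.Seq on
    Scala 2.12 but scala.collection.immutable.Seq on Scala 2.13, so Java
    code overriding Spark's strategy API must hand back the immutable
    variant to compile against 2.13 artifacts. A minimal sketch of the
    bridging pattern this patch applies (the class and method names here
    are illustrative, not part of the commit):
    
        import java.util.Collections;
        import org.apache.spark.sql.execution.SparkPlan;
        import scala.collection.JavaConverters;
    
        class SeqBridge {
          // toIndexedSeq() yields scala.collection.immutable.IndexedSeq on
          // both 2.12 and 2.13, so the same Java source compiles either way.
          static scala.collection.immutable.Seq<SparkPlan> singleton(SparkPlan plan) {
            return JavaConverters.asScalaIteratorConverter(
                    Collections.singletonList(plan).listIterator())
                .asScala()
                .toIndexedSeq();
          }
        }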
---
 .github/workflows/build.yml                        | 33 +++++++++++++++++++
 .../IcebergExtendedDataSourceV2Strategy.java       | 37 ++++++++++++++--------
 .../spark/connector/utils/ConnectorUtil.java       | 12 +++++++
 .../integration/test/sql/SQLQueryTestHelper.java   |  8 +++--
 4 files changed, 73 insertions(+), 17 deletions(-)

diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index cbb9eaffb..0cf17683a 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -68,6 +68,39 @@ jobs:
       - name: Build with Gradle
         run: ./gradlew build -x test -PjdkVersion=8
 
+  # To check that the spark-connector is compatible with Scala 2.13
+  spark-connector-build:
+    runs-on: ubuntu-latest
+    timeout-minutes: 30
+    needs: changes
+    if: needs.changes.outputs.source_changes == 'true'
+    steps:
+      - uses: actions/checkout@v3
+
+      - uses: actions/setup-java@v4
+        with:
+          java-version: 8
+          distribution: 'temurin'
+          cache: 'gradle'
+
+      - name: Free up disk space
+        run: |
+          dev/ci/util_free_space.sh
+
+      - name: Build with Scala2.13
+        run: |
+          ./gradlew :spark-connector:spark-3.4:build -PscalaVersion=2.13 -PskipITs -PskipDockerTests=false
+          ./gradlew :spark-connector:spark-3.5:build -PscalaVersion=2.13 -PskipITs -PskipDockerTests=false
+
+      - name: Upload unit tests report
+        uses: actions/upload-artifact@v3
+        if: failure()
+        with:
+          name: unit test report
+          path: |
+            build/reports
+            spark-connector/**/*.log
+
   build:
     # The type of runner that the job will run on
     runs-on: ubuntu-latest
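
Note: the check above can be reproduced locally with the same commands the
job runs, for example:

    ./gradlew :spark-connector:spark-3.4:build -PscalaVersion=2.13 -PskipITs -PskipDockerTests=false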
diff --git a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/extensions/IcebergExtendedDataSourceV2Strategy.java b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/extensions/IcebergExtendedDataSourceV2Strategy.java
index 9132ac3c4..3bf263156 100644
--- a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/extensions/IcebergExtendedDataSourceV2Strategy.java
+++ b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/extensions/IcebergExtendedDataSourceV2Strategy.java
@@ -18,6 +18,8 @@
  */
 package org.apache.gravitino.spark.connector.iceberg.extensions;
 
+import static org.apache.gravitino.spark.connector.utils.ConnectorUtil.toJavaList;
+
 import java.util.Collections;
 import org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalog;
 import org.apache.iceberg.spark.Spark3Util;
@@ -51,7 +53,7 @@ import org.apache.spark.sql.execution.datasources.v2.SetWriteDistributionAndOrde
 import scala.Option;
 import scala.Some;
 import scala.collection.JavaConverters;
-import scala.collection.Seq;
+import scala.collection.immutable.Seq;
 
 public class IcebergExtendedDataSourceV2Strategy extends ExtendedDataSourceV2Strategy {
 
@@ -66,7 +68,8 @@ public class IcebergExtendedDataSourceV2Strategy extends ExtendedDataSourceV2Str
   public Seq<SparkPlan> apply(LogicalPlan plan) {
     if (plan instanceof AddPartitionField) {
       AddPartitionField addPartitionField = (AddPartitionField) plan;
-      return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(spark, addPartitionField.table())
+      return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
+              spark, addPartitionField.table().toIndexedSeq())
           .map(
               catalogAndIdentifier -> {
                 AddPartitionFieldExec addPartitionFieldExec =
@@ -81,7 +84,7 @@ public class IcebergExtendedDataSourceV2Strategy extends ExtendedDataSourceV2Str
     } else if (plan instanceof CreateOrReplaceBranch) {
       CreateOrReplaceBranch createOrReplaceBranch = (CreateOrReplaceBranch) plan;
       return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
-              spark, createOrReplaceBranch.table())
+              spark, createOrReplaceBranch.table().toIndexedSeq())
           .map(
               catalogAndIdentifier -> {
                 CreateOrReplaceBranchExec createOrReplaceBranchExec =
@@ -99,7 +102,7 @@ public class IcebergExtendedDataSourceV2Strategy extends ExtendedDataSourceV2Str
     } else if (plan instanceof CreateOrReplaceTag) {
       CreateOrReplaceTag createOrReplaceTag = (CreateOrReplaceTag) plan;
       return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
-              spark, createOrReplaceTag.table())
+              spark, createOrReplaceTag.table().toIndexedSeq())
           .map(
               catalogAndIdentifier -> {
                 CreateOrReplaceTagExec createOrReplaceTagExec =
@@ -116,7 +119,8 @@ public class IcebergExtendedDataSourceV2Strategy extends ExtendedDataSourceV2Str
           .get();
     } else if (plan instanceof DropBranch) {
       DropBranch dropBranch = (DropBranch) plan;
-      return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(spark, dropBranch.table())
+      return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
+              spark, dropBranch.table().toIndexedSeq())
           .map(
               catalogAndIdentifier -> {
                 DropBranchExec dropBranchExec =
@@ -130,7 +134,8 @@ public class IcebergExtendedDataSourceV2Strategy extends ExtendedDataSourceV2Str
           .get();
     } else if (plan instanceof DropTag) {
       DropTag dropTag = (DropTag) plan;
-      return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(spark, dropTag.table())
+      return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
+              spark, dropTag.table().toIndexedSeq())
           .map(
               catalogAndIdentifier -> {
                 DropTagExec dropTagExec =
@@ -145,7 +150,7 @@ public class IcebergExtendedDataSourceV2Strategy extends ExtendedDataSourceV2Str
     } else if (plan instanceof DropPartitionField) {
       DropPartitionField dropPartitionField = (DropPartitionField) plan;
       return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
-              spark, dropPartitionField.table())
+              spark, dropPartitionField.table().toIndexedSeq())
           .map(
               catalogAndIdentifier -> {
                 DropPartitionFieldExec dropPartitionFieldExec =
@@ -159,7 +164,7 @@ public class IcebergExtendedDataSourceV2Strategy extends ExtendedDataSourceV2Str
     } else if (plan instanceof ReplacePartitionField) {
       ReplacePartitionField replacePartitionField = (ReplacePartitionField) plan;
       return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
-              spark, replacePartitionField.table())
+              spark, replacePartitionField.table().toIndexedSeq())
           .map(
               catalogAndIdentifier -> {
                 ReplacePartitionFieldExec replacePartitionFieldExec =
@@ -175,7 +180,7 @@ public class IcebergExtendedDataSourceV2Strategy extends ExtendedDataSourceV2Str
     } else if (plan instanceof SetIdentifierFields) {
       SetIdentifierFields setIdentifierFields = (SetIdentifierFields) plan;
       return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
-              spark, setIdentifierFields.table())
+              spark, setIdentifierFields.table().toIndexedSeq())
           .map(
               catalogAndIdentifier -> {
                 SetIdentifierFieldsExec setIdentifierFieldsExec =
@@ -189,7 +194,7 @@ public class IcebergExtendedDataSourceV2Strategy extends ExtendedDataSourceV2Str
     } else if (plan instanceof DropIdentifierFields) {
       DropIdentifierFields dropIdentifierFields = (DropIdentifierFields) plan;
       return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
-              spark, dropIdentifierFields.table())
+              spark, dropIdentifierFields.table().toIndexedSeq())
           .map(
               catalogAndIdentifier -> {
                 DropIdentifierFieldsExec dropIdentifierFieldsExec =
@@ -204,7 +209,7 @@ public class IcebergExtendedDataSourceV2Strategy extends ExtendedDataSourceV2Str
       SetWriteDistributionAndOrdering setWriteDistributionAndOrdering =
           (SetWriteDistributionAndOrdering) plan;
       return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
-              spark, setWriteDistributionAndOrdering.table())
+              spark, setWriteDistributionAndOrdering.table().toIndexedSeq())
           .map(
               catalogAndIdentifier -> {
                 SetWriteDistributionAndOrderingExec setWriteDistributionAndOrderingExec =
@@ -217,14 +222,18 @@ public class IcebergExtendedDataSourceV2Strategy extends ExtendedDataSourceV2Str
               })
           .get();
     } else {
-      return super.apply(plan);
+      scala.collection.Seq<SparkPlan> sparkPlans = super.apply(plan);
+      if (sparkPlans != null) {
+        return sparkPlans.toIndexedSeq();
+      }
+      return null;
     }
   }
 
   private Seq<SparkPlan> toSeq(SparkPlan plan) {
     return JavaConverters.asScalaIteratorConverter(Collections.singletonList(plan).listIterator())
         .asScala()
-        .toSeq();
+        .toIndexedSeq();
   }
 
   static class IcebergCatalogAndIdentifier {
@@ -244,7 +253,7 @@ public class IcebergExtendedDataSourceV2Strategy extends ExtendedDataSourceV2Str
     static Option<IcebergCatalogAndIdentifier> buildCatalogAndIdentifier(
         SparkSession spark, Seq<String> identifiers) {
       Spark3Util.CatalogAndIdentifier catalogAndIdentifier =
-          Spark3Util.catalogAndIdentifier(spark, JavaConverters.<String>seqAsJavaList(identifiers));
+          Spark3Util.catalogAndIdentifier(spark, toJavaList(identifiers));
       CatalogPlugin catalog = catalogAndIdentifier.catalog();
       if (catalog instanceof GravitinoIcebergCatalog) {
         return new Some<>(
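
The hunks above all apply one pattern: buildCatalogAndIdentifier now takes a
scala.collection.immutable.Seq<String>, so each call site bridges whatever
Seq it holds via toIndexedSeq(). A standalone sketch of that pattern (the
wrapper class name is illustrative):

    import scala.collection.Seq;

    class IdentifierBridge {
      // Works against both Scala versions: toIndexedSeq() is defined on the
      // general Seq and returns the immutable variant the new signature needs.
      static scala.collection.immutable.Seq<String> bridge(Seq<String> identifiers) {
        return identifiers.toIndexedSeq();
      }
    }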
diff --git a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/utils/ConnectorUtil.java b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/utils/ConnectorUtil.java
index 550333155..672369854 100644
--- a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/utils/ConnectorUtil.java
+++ b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/utils/ConnectorUtil.java
@@ -19,11 +19,14 @@
 
 package org.apache.gravitino.spark.connector.utils;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Set;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.spark.connector.ConnectorConstants;
+import scala.collection.immutable.Seq;
 
 public class ConnectorUtil {
 
@@ -37,4 +40,13 @@ public class ConnectorUtil {
         .reduce((element1, element2) -> element1 + ConnectorConstants.COMMA + element2)
         .orElse("");
   }
+
+  public static List<String> toJavaList(Seq<String> seq) {
+    List<String> javaList = new ArrayList<>();
+    if (seq == null || seq.isEmpty()) {
+      return javaList;
+    }
+    seq.foreach(javaList::add);
+    return javaList;
+  }
 }
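
A minimal usage sketch of the helper added above (the wrapper class
ToJavaListDemo is illustrative, not part of the commit):

    import java.util.List;
    import org.apache.gravitino.spark.connector.utils.ConnectorUtil;
    import scala.collection.immutable.Seq;

    class ToJavaListDemo {
      // Null and empty inputs both come back as an empty java.util.List,
      // so callers no longer need their own guards around JavaConverters.
      static List<String> names(Seq<String> scalaNames) {
        return ConnectorUtil.toJavaList(scalaNames);
      }
    }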
diff --git a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/sql/SQLQueryTestHelper.java b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/sql/SQLQueryTestHelper.java
index d86cc686c..39c29a4a0 100644
--- a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/sql/SQLQueryTestHelper.java
+++ b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/sql/SQLQueryTestHelper.java
@@ -18,6 +18,8 @@
  */
 package org.apache.gravitino.spark.connector.integration.test.sql;
 
+import static org.apache.gravitino.spark.connector.utils.ConnectorUtil.toJavaList;
+
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -31,7 +33,6 @@ import org.apache.spark.sql.execution.HiveResult;
 import org.apache.spark.sql.execution.SQLExecution;
 import org.apache.spark.sql.types.StructType;
 import scala.Option;
-import scala.collection.JavaConverters;
 
 public class SQLQueryTestHelper {
   private static final String notIncludedMsg = "[not included in comparison]";
@@ -62,8 +63,9 @@ public class SQLQueryTestHelper {
             df.queryExecution(),
             Option.apply(""),
             () ->
-                JavaConverters.seqAsJavaList(
-                        HiveResult.hiveResultString(df.queryExecution().executedPlan()))
+                toJavaList(
+                        HiveResult.hiveResultString(df.queryExecution().executedPlan())
+                            .toIndexedSeq())
                     .stream()
                     .map(s -> replaceNotIncludedMsg(s))
                     .filter(s -> !s.isEmpty())
