This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
new c74dd0908 [#4386] Fix(spark-connector): spark connector build failed with scala2.13 (#4486)
c74dd0908 is described below
commit c74dd0908811a7640ae5537f98b526698d1eb4e1
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Aug 12 17:33:57 2024 +0800
[#4386] Fix(spark-connector): spark connector build failed with scala2.13 (#4486)
### What changes were proposed in this pull request?
Fixed the spark connector build failure with Scala 2.13.
### Why are the changes needed?
Fix: https://github.com/apache/gravitino/issues/4386
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
1. Built with Scala 2.12 and Scala 2.13 locally.
2. Added a new job to the GitHub pipeline that builds the spark-connector with Scala 2.13.
Co-authored-by: cai can <[email protected]>
Co-authored-by: caican <[email protected]>
---
.github/workflows/build.yml | 33 +++++++++++++++++++
.../IcebergExtendedDataSourceV2Strategy.java | 37 ++++++++++++++--------
.../spark/connector/utils/ConnectorUtil.java | 12 +++++++
.../integration/test/sql/SQLQueryTestHelper.java | 8 +++--
4 files changed, 73 insertions(+), 17 deletions(-)
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index cbb9eaffb..0cf17683a 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -68,6 +68,39 @@ jobs:
       - name: Build with Gradle
         run: ./gradlew build -x test -PjdkVersion=8
 
+  # Check that the spark-connector is compatible with Scala 2.13
+  spark-connector-build:
+    runs-on: ubuntu-latest
+    timeout-minutes: 30
+    needs: changes
+    if: needs.changes.outputs.source_changes == 'true'
+    steps:
+      - uses: actions/checkout@v3
+
+      - uses: actions/setup-java@v4
+        with:
+          java-version: 8
+          distribution: 'temurin'
+          cache: 'gradle'
+
+      - name: Free up disk space
+        run: |
+          dev/ci/util_free_space.sh
+
+      - name: Build with Scala 2.13
+        run: |
+          ./gradlew :spark-connector:spark-3.4:build -PscalaVersion=2.13 -PskipITs -PskipDockerTests=false
+          ./gradlew :spark-connector:spark-3.5:build -PscalaVersion=2.13 -PskipITs -PskipDockerTests=false
+
+      - name: Upload unit tests report
+        uses: actions/upload-artifact@v3
+        if: failure()
+        with:
+          name: unit test report
+          path: |
+            build/reports
+            spark-connector/**/*.log
+
   build:
     # The type of runner that the job will run on
     runs-on: ubuntu-latest
diff --git a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/extensions/IcebergExtendedDataSourceV2Strategy.java b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/extensions/IcebergExtendedDataSourceV2Strategy.java
index 9132ac3c4..3bf263156 100644
--- a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/extensions/IcebergExtendedDataSourceV2Strategy.java
+++ b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/extensions/IcebergExtendedDataSourceV2Strategy.java
@@ -18,6 +18,8 @@
*/
package org.apache.gravitino.spark.connector.iceberg.extensions;
+import static org.apache.gravitino.spark.connector.utils.ConnectorUtil.toJavaList;
+
import java.util.Collections;
import org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalog;
import org.apache.iceberg.spark.Spark3Util;
@@ -51,7 +53,7 @@ import org.apache.spark.sql.execution.datasources.v2.SetWriteDistributionAndOrde
import scala.Option;
import scala.Some;
import scala.collection.JavaConverters;
-import scala.collection.Seq;
+import scala.collection.immutable.Seq;
public class IcebergExtendedDataSourceV2Strategy extends ExtendedDataSourceV2Strategy {
@@ -66,7 +68,8 @@ public class IcebergExtendedDataSourceV2Strategy extends ExtendedDataSourceV2Str
public Seq<SparkPlan> apply(LogicalPlan plan) {
if (plan instanceof AddPartitionField) {
AddPartitionField addPartitionField = (AddPartitionField) plan;
- return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(spark, addPartitionField.table())
+ return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
+ spark, addPartitionField.table().toIndexedSeq())
.map(
catalogAndIdentifier -> {
AddPartitionFieldExec addPartitionFieldExec =
@@ -81,7 +84,7 @@ public class IcebergExtendedDataSourceV2Strategy extends ExtendedDataSourceV2Str
} else if (plan instanceof CreateOrReplaceBranch) {
CreateOrReplaceBranch createOrReplaceBranch = (CreateOrReplaceBranch) plan;
return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
- spark, createOrReplaceBranch.table())
+ spark, createOrReplaceBranch.table().toIndexedSeq())
.map(
catalogAndIdentifier -> {
CreateOrReplaceBranchExec createOrReplaceBranchExec =
@@ -99,7 +102,7 @@ public class IcebergExtendedDataSourceV2Strategy extends ExtendedDataSourceV2Str
} else if (plan instanceof CreateOrReplaceTag) {
CreateOrReplaceTag createOrReplaceTag = (CreateOrReplaceTag) plan;
return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
- spark, createOrReplaceTag.table())
+ spark, createOrReplaceTag.table().toIndexedSeq())
.map(
catalogAndIdentifier -> {
CreateOrReplaceTagExec createOrReplaceTagExec =
@@ -116,7 +119,8 @@ public class IcebergExtendedDataSourceV2Strategy extends ExtendedDataSourceV2Str
.get();
} else if (plan instanceof DropBranch) {
DropBranch dropBranch = (DropBranch) plan;
- return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(spark, dropBranch.table())
+ return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
+ spark, dropBranch.table().toIndexedSeq())
.map(
catalogAndIdentifier -> {
DropBranchExec dropBranchExec =
@@ -130,7 +134,8 @@ public class IcebergExtendedDataSourceV2Strategy extends ExtendedDataSourceV2Str
.get();
} else if (plan instanceof DropTag) {
DropTag dropTag = (DropTag) plan;
- return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(spark, dropTag.table())
+ return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
+ spark, dropTag.table().toIndexedSeq())
.map(
catalogAndIdentifier -> {
DropTagExec dropTagExec =
@@ -145,7 +150,7 @@ public class IcebergExtendedDataSourceV2Strategy extends ExtendedDataSourceV2Str
} else if (plan instanceof DropPartitionField) {
DropPartitionField dropPartitionField = (DropPartitionField) plan;
return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
- spark, dropPartitionField.table())
+ spark, dropPartitionField.table().toIndexedSeq())
.map(
catalogAndIdentifier -> {
DropPartitionFieldExec dropPartitionFieldExec =
@@ -159,7 +164,7 @@ public class IcebergExtendedDataSourceV2Strategy extends ExtendedDataSourceV2Str
} else if (plan instanceof ReplacePartitionField) {
ReplacePartitionField replacePartitionField = (ReplacePartitionField) plan;
return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
- spark, replacePartitionField.table())
+ spark, replacePartitionField.table().toIndexedSeq())
.map(
catalogAndIdentifier -> {
ReplacePartitionFieldExec replacePartitionFieldExec =
@@ -175,7 +180,7 @@ public class IcebergExtendedDataSourceV2Strategy extends ExtendedDataSourceV2Str
} else if (plan instanceof SetIdentifierFields) {
SetIdentifierFields setIdentifierFields = (SetIdentifierFields) plan;
return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
- spark, setIdentifierFields.table())
+ spark, setIdentifierFields.table().toIndexedSeq())
.map(
catalogAndIdentifier -> {
SetIdentifierFieldsExec setIdentifierFieldsExec =
@@ -189,7 +194,7 @@ public class IcebergExtendedDataSourceV2Strategy extends ExtendedDataSourceV2Str
} else if (plan instanceof DropIdentifierFields) {
DropIdentifierFields dropIdentifierFields = (DropIdentifierFields) plan;
return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
- spark, dropIdentifierFields.table())
+ spark, dropIdentifierFields.table().toIndexedSeq())
.map(
catalogAndIdentifier -> {
DropIdentifierFieldsExec dropIdentifierFieldsExec =
@@ -204,7 +209,7 @@ public class IcebergExtendedDataSourceV2Strategy extends ExtendedDataSourceV2Str
SetWriteDistributionAndOrdering setWriteDistributionAndOrdering =
(SetWriteDistributionAndOrdering) plan;
return IcebergCatalogAndIdentifier.buildCatalogAndIdentifier(
- spark, setWriteDistributionAndOrdering.table())
+ spark, setWriteDistributionAndOrdering.table().toIndexedSeq())
.map(
catalogAndIdentifier -> {
SetWriteDistributionAndOrderingExec setWriteDistributionAndOrderingExec =
@@ -217,14 +222,18 @@ public class IcebergExtendedDataSourceV2Strategy extends ExtendedDataSourceV2Str
})
.get();
} else {
- return super.apply(plan);
+ scala.collection.Seq<SparkPlan> sparkPlans = super.apply(plan);
+ if (sparkPlans != null) {
+ return sparkPlans.toIndexedSeq();
+ }
+ return null;
}
}
private Seq<SparkPlan> toSeq(SparkPlan plan) {
return JavaConverters.asScalaIteratorConverter(Collections.singletonList(plan).listIterator())
.asScala()
- .toSeq();
+ .toIndexedSeq();
}
static class IcebergCatalogAndIdentifier {
@@ -244,7 +253,7 @@ public class IcebergExtendedDataSourceV2Strategy extends ExtendedDataSourceV2Str
static Option<IcebergCatalogAndIdentifier> buildCatalogAndIdentifier(
SparkSession spark, Seq<String> identifiers) {
Spark3Util.CatalogAndIdentifier catalogAndIdentifier =
- Spark3Util.catalogAndIdentifier(spark, JavaConverters.<String>seqAsJavaList(identifiers));
+ Spark3Util.catalogAndIdentifier(spark, toJavaList(identifiers));
CatalogPlugin catalog = catalogAndIdentifier.catalog();
if (catalog instanceof GravitinoIcebergCatalog) {
return new Some<>(
diff --git a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/utils/ConnectorUtil.java b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/utils/ConnectorUtil.java
index 550333155..672369854 100644
--- a/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/utils/ConnectorUtil.java
+++ b/spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/utils/ConnectorUtil.java
@@ -19,11 +19,14 @@
package org.apache.gravitino.spark.connector.utils;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
+import java.util.List;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.spark.connector.ConnectorConstants;
+import scala.collection.immutable.Seq;
public class ConnectorUtil {
@@ -37,4 +40,13 @@ public class ConnectorUtil {
.reduce((element1, element2) -> element1 + ConnectorConstants.COMMA + element2)
.orElse("");
}
+
+ public static List<String> toJavaList(Seq<String> seq) {
+ List<String> javaList = new ArrayList<>();
+ if (seq == null || seq.isEmpty()) {
+ return javaList;
+ }
+ seq.foreach(javaList::add);
+ return javaList;
+ }
}
diff --git a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/sql/SQLQueryTestHelper.java b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/sql/SQLQueryTestHelper.java
index d86cc686c..39c29a4a0 100644
--- a/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/sql/SQLQueryTestHelper.java
+++ b/spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/integration/test/sql/SQLQueryTestHelper.java
@@ -18,6 +18,8 @@
*/
package org.apache.gravitino.spark.connector.integration.test.sql;
+import static org.apache.gravitino.spark.connector.utils.ConnectorUtil.toJavaList;
+
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -31,7 +33,6 @@ import org.apache.spark.sql.execution.HiveResult;
import org.apache.spark.sql.execution.SQLExecution;
import org.apache.spark.sql.types.StructType;
import scala.Option;
-import scala.collection.JavaConverters;
public class SQLQueryTestHelper {
private static final String notIncludedMsg = "[not included in comparison]";
@@ -62,8 +63,9 @@ public class SQLQueryTestHelper {
df.queryExecution(),
Option.apply(""),
() ->
- JavaConverters.seqAsJavaList(
- HiveResult.hiveResultString(df.queryExecution().executedPlan()))
+ toJavaList(
+ HiveResult.hiveResultString(df.queryExecution().executedPlan())
+ .toIndexedSeq())
.stream()
.map(s -> replaceNotIncludedMsg(s))
.filter(s -> !s.isEmpty())