This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f033cee04f [spark][infra] run spark integration tests in CI. (#5590)
f033cee04f is described below

commit f033cee04ff557102f3c6eb0cae4d1fefed09916
Author: Yujiang Zhong <42907416+zhongyuji...@users.noreply.github.com>
AuthorDate: Fri Jul 25 16:28:00 2025 +0800

    [spark][infra] run spark integration tests in CI. (#5590)
---
 .github/workflows/utitcase-spark-3.x.yml           |  2 +-
 .github/workflows/utitcase-spark-4.x.yml           |  2 +-
 .../table/source/snapshot/TimeTravelUtil.java      |  2 +
 .../org/apache/paimon/spark/SparkReadITCase.java   | 91 +++++++++++-----------
 .../org/apache/paimon/spark/SparkReadTestBase.java | 12 +++
 .../paimon/spark/SparkSchemaEvolutionITCase.java   | 71 +++++++++--------
 .../apache/paimon/spark/SparkTimeTravelITCase.java |  3 +-
 .../org/apache/paimon/spark/RowTestHelper.scala    | 53 +++++++++++++
 8 files changed, 154 insertions(+), 82 deletions(-)

diff --git a/.github/workflows/utitcase-spark-3.x.yml b/.github/workflows/utitcase-spark-3.x.yml
index de8b3deaeb..1a23b55b58 100644
--- a/.github/workflows/utitcase-spark-3.x.yml
+++ b/.github/workflows/utitcase-spark-3.x.yml
@@ -59,6 +59,6 @@ jobs:
           test_modules+="org.apache.paimon:paimon-spark-${suffix},"
           done
           test_modules="${test_modules%,}"
-          mvn -T 2C -B test -pl "${test_modules}" -Duser.timezone=$jvm_timezone
+          mvn -T 2C -B verify -pl "${test_modules}" -Duser.timezone=$jvm_timezone
         env:
           MAVEN_OPTS: -Xmx4096m
\ No newline at end of file
diff --git a/.github/workflows/utitcase-spark-4.x.yml b/.github/workflows/utitcase-spark-4.x.yml
index 06495b6975..fd1365aabe 100644
--- a/.github/workflows/utitcase-spark-4.x.yml
+++ b/.github/workflows/utitcase-spark-4.x.yml
@@ -59,6 +59,6 @@ jobs:
           test_modules+="org.apache.paimon:paimon-spark-${suffix},"
           done
           test_modules="${test_modules%,}"
-          mvn -T 2C -B test -pl "${test_modules}" -Duser.timezone=$jvm_timezone -Pspark4,flink1
+          mvn -T 2C -B verify -pl "${test_modules}" -Duser.timezone=$jvm_timezone -Pspark4,flink1
         env:
           MAVEN_OPTS: -Xmx4096m
\ No newline at end of file
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
index 20802f961f..bdef244230 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
@@ -147,6 +147,8 @@ public class TimeTravelUtil {
         } else if (version.chars().allMatch(Character::isDigit)) {
             options.set(SCAN_SNAPSHOT_ID.key(), version);
         } else {
+            // by here, the scan version should be a tag.
+            options.set(SCAN_TAG_NAME.key(), version);
             throw new RuntimeException("Cannot find a time travel version for " + version);
         }
     }
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
index a96954dcdf..25d7a39622 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
+++ 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
@@ -592,8 +592,9 @@ public class SparkReadITCase extends SparkReadTestBase {
         Dataset<Row> dataset = 
spark.read().format("paimon").load(tablePath.toString());
         assertThat(dataset.select("order_id", "buyer_id", 
"dt").collectAsList().toString())
                 .isEqualTo("[[1,10,2022-07-20]]");
-        assertThat(dataset.select("coupon_info").collectAsList().toString())
-                .isEqualTo("[[WrappedArray(loyalty_discount, 
shipping_discount)]]");
+
+        RowTestHelper.checkRowEquals(
+                dataset.select("coupon_info"), row(array("loyalty_discount", 
"shipping_discount")));
 
         // test drop table
         assertThat(
@@ -647,37 +648,38 @@ public class SparkReadITCase extends SparkReadTestBase {
     }
 
     private void innerTestNestedType(Dataset<Row> dataset) {
-        List<Row> results = dataset.collectAsList();
-        assertThat(results.toString())
-                .isEqualTo(
-                        "[[1,WrappedArray(AAA, 
BBB),[[1.0,WrappedArray(null)],1]], "
-                                + "[2,WrappedArray(CCC, 
DDD),[[null,WrappedArray(true)],null]], "
-                                + "[3,WrappedArray(null, 
null),[[2.0,WrappedArray(true, false)],2]], "
-                                + "[4,WrappedArray(null, 
EEE),[[3.0,WrappedArray(true, false, true)],3]]]");
+        RowTestHelper.checkRowEquals(
+                dataset,
+                Arrays.asList(
+                        row(1, array("AAA", "BBB"), row(row(1.0, array(null)), 
1L)),
+                        row(2, array("CCC", "DDD"), row(row(null, 
array(true)), null)),
+                        row(3, array(null, null), row(row(2.0, array(true, 
false)), 2L)),
+                        row(4, array(null, "EEE"), row(row(3.0, array(true, 
false, true)), 3L))));
 
-        results = dataset.select("a").collectAsList();
-        assertThat(results.toString()).isEqualTo("[[1], [2], [3], [4]]");
+        RowTestHelper.checkRowEquals(
+                dataset.select("a"), Arrays.asList(row(1), row(2), row(3), 
row(4)));
 
-        results = dataset.select("c.c1").collectAsList();
-        assertThat(results.toString())
-                .isEqualTo(
-                        "[[[1.0,WrappedArray(null)]], 
[[null,WrappedArray(true)]], "
-                                + "[[2.0,WrappedArray(true, false)]], "
-                                + "[[3.0,WrappedArray(true, false, true)]]]");
+        RowTestHelper.checkRowEquals(
+                dataset.select("c.c1"),
+                Arrays.asList(
+                        row(row(1.0, array(null))),
+                        row(row(null, array(true))),
+                        row(row(2.0, array(true, false))),
+                        row(row(3.0, array(true, false, true)))));
 
-        results = dataset.select("c.c2").collectAsList();
-        assertThat(results.toString()).isEqualTo("[[1], [null], [2], [3]]");
+        RowTestHelper.checkRowEquals(
+                dataset.select("c.c2"), Arrays.asList(row(1), row(null), 
row(2), row(3)));
 
-        results = dataset.select("c.c1.c11").collectAsList();
-        assertThat(results.toString()).isEqualTo("[[1.0], [null], [2.0], 
[3.0]]");
+        RowTestHelper.checkRowEquals(
+                dataset.select("c.c1.c11"), Arrays.asList(row(1.0), row(null), 
row(2.0), row(3.0)));
 
-        results = dataset.select("c.c1.c12").collectAsList();
-        assertThat(results.toString())
-                .isEqualTo(
-                        "[[WrappedArray(null)], "
-                                + "[WrappedArray(true)], "
-                                + "[WrappedArray(true, false)], "
-                                + "[WrappedArray(true, false, true)]]");
+        RowTestHelper.checkRowEquals(
+                dataset.select("c.c1.c12"),
+                Arrays.asList(
+                        row(array(null)),
+                        row(array(true)),
+                        row(array(true, false)),
+                        row(array(true, false, true))));
     }
 
     private void innerTestSimpleTypeFilterPushDown(Dataset<Row> dataset) {
@@ -689,28 +691,27 @@ public class SparkReadITCase extends SparkReadTestBase {
     }
 
     private void innerTestNestedTypeFilterPushDown(Dataset<Row> dataset) {
-        List<Row> results = dataset.filter("a < 
4").select("a").collectAsList();
-        assertThat(results.toString()).isEqualTo("[[1], [2], [3]]");
+        RowTestHelper.checkRowEquals(
+                dataset.filter("a < 4").select("a"), Arrays.asList(row(1), 
row(2), row(3)));
 
-        results = dataset.filter("array_contains(b, 
'AAA')").select("b").collectAsList();
-        assertThat(results.toString()).isEqualTo("[[WrappedArray(AAA, BBB)]]");
+        RowTestHelper.checkRowEquals(
+                dataset.filter("array_contains(b, 'AAA')").select("b"), 
row(array("AAA", "BBB")));
 
-        results = dataset.filter("c.c1.c11 is null").select("a", 
"c").collectAsList();
-        
assertThat(results.toString()).isEqualTo("[[2,[[null,WrappedArray(true)],null]]]");
+        RowTestHelper.checkRowEquals(
+                dataset.filter("c.c1.c11 is null").select("a", "c"),
+                row(2, row(row(null, array(true)), null)));
 
-        results = dataset.filter("c.c1.c11 = 1.0").select("a", 
"c.c1").collectAsList();
-        
assertThat(results.toString()).isEqualTo("[[1,[1.0,WrappedArray(null)]]]");
+        RowTestHelper.checkRowEquals(
+                dataset.filter("c.c1.c11 = 1.0").select("a", "c.c1"),
+                row(1, row(1.0, array(null))));
 
-        results = dataset.filter("c.c2 is null").select("a", 
"c").collectAsList();
-        
assertThat(results.toString()).isEqualTo("[[2,[[null,WrappedArray(true)],null]]]");
+        RowTestHelper.checkRowEquals(
+                dataset.filter("c.c2 is null").select("a", "c"),
+                row(2, row(row(null, array(true)), null)));
 
-        results =
-                dataset.filter("array_contains(c.c1.c12, false)")
-                        .select("a", "c.c1.c12", "c.c2")
-                        .collectAsList();
-        assertThat(results.toString())
-                .isEqualTo(
-                        "[[3,WrappedArray(true, false),2], 
[4,WrappedArray(true, false, true),3]]");
+        RowTestHelper.checkRowEquals(
+                dataset.filter("array_contains(c.c1.c12, false)").select("a", 
"c.c1.c12", "c.c2"),
+                Arrays.asList(row(3, array(true, false), 2), row(4, 
array(true, false, true), 3)));
     }
 
     @Test
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
index a4983325c4..27a7557ab7 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
+++ 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
@@ -51,6 +51,8 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
+import scala.collection.Seq;
+
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Base tests for spark read. */
@@ -251,4 +253,14 @@ public abstract class SparkReadTestBase {
     protected String defaultShowCreateStringWithNonNullColumn(String table) {
         return showCreateString(table, "a INT NOT NULL", "b BIGINT NOT NULL", 
"c STRING");
     }
+
+    protected static Row row(Object... values) {
+        Object[] array = values != null ? values : new Object[] {null};
+        return RowTestHelper.row(array);
+    }
+
+    protected static Seq array(Object... values) {
+        Object[] array = values != null ? values : new Object[] {null};
+        return RowTestHelper.seq(array);
+    }
 }
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
index 933658b004..ea837960cc 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
+++ 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
@@ -27,6 +27,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -183,10 +184,16 @@ public class SparkSchemaEvolutionITCase extends 
SparkReadTestBase {
         Dataset<Row> table = spark.table("testRenameColumn");
         results = table.select("bb", "c").collectAsList();
         assertThat(results.toString()).isEqualTo("[[2,1], [6,3]]");
+
         assertThatThrownBy(() -> table.select("b", "c"))
                 .isInstanceOf(AnalysisException.class)
+                // Messages vary across different Spark versions, only 
validating the common parts.
+                // Spark 4: A column, variable, or function parameter with 
name `b` cannot be
+                // resolved. Did you mean one of the following? [`a`, `bb`, 
`c`]
+                // Spark 3.5 and earlier versions: A column or function 
parameter with name `b`
+                // cannot be resolved. Did you mean one of the following? 
[`a`, `bb`, `c`]
                 .hasMessageContaining(
-                        "A column or function parameter with name `b` cannot 
be resolved. Did you mean one of the following?");
+                        "name `b` cannot be resolved. Did you mean one of the 
following? [`a`, `bb`, `c`]");
     }
 
     @Test
@@ -388,13 +395,15 @@ public class SparkSchemaEvolutionITCase extends 
SparkReadTestBase {
                                 "Cannot move itself for column b"));
 
         // missing column
+        // Messages vary across different Spark versions and there are no 
common parts, only
+        // validate the exception class
         createTable("tableMissing");
         assertThatThrownBy(() -> spark.sql("ALTER TABLE tableMissing ALTER 
COLUMN d FIRST"))
-                .hasMessageContaining("Missing field d in table 
paimon.default.tableMissing");
+                .isInstanceOf(AnalysisException.class);
 
         createTable("tableMissingAfter");
         assertThatThrownBy(() -> spark.sql("ALTER TABLE tableMissingAfter 
ALTER COLUMN a AFTER d"))
-                .hasMessageContaining("Missing field d in table 
paimon.default.tableMissingAfter");
+                .isInstanceOf(AnalysisException.class);
     }
 
     @Test
@@ -806,13 +815,12 @@ public class SparkSchemaEvolutionITCase extends 
SparkReadTestBase {
                         + tableName
                         + " VALUES (1, ARRAY(STRUCT('apple', 100), 
STRUCT('banana', 101))), "
                         + "(2, ARRAY(STRUCT('cat', 200), STRUCT('dog', 
201)))");
-        assertThat(
-                        spark.sql("SELECT * FROM paimon.default." + 
tableName).collectAsList()
-                                .stream()
-                                .map(Row::toString))
-                .containsExactlyInAnyOrder(
-                        "[1,WrappedArray([apple,100], [banana,101])]",
-                        "[2,WrappedArray([cat,200], [dog,201])]");
+
+        RowTestHelper.checkRowEquals(
+                spark.sql("SELECT * FROM paimon.default." + tableName),
+                Arrays.asList(
+                        row(1, array(row("apple", 100), row("banana", 101))),
+                        row(2, array(row("cat", 200), row("dog", 201)))));
 
         spark.sql(
                 "ALTER TABLE paimon.default."
@@ -824,14 +832,13 @@ public class SparkSchemaEvolutionITCase extends 
SparkReadTestBase {
                         + tableName
                         + " VALUES (1, ARRAY(STRUCT(110, 'APPLE'), STRUCT(111, 
'BANANA'))), "
                         + "(3, ARRAY(STRUCT(310, 'FLOWER')))");
-        assertThat(
-                        spark.sql("SELECT * FROM paimon.default." + 
tableName).collectAsList()
-                                .stream()
-                                .map(Row::toString))
-                .containsExactlyInAnyOrder(
-                        "[1,WrappedArray([110,APPLE], [111,BANANA])]",
-                        "[2,WrappedArray([200,null], [201,null])]",
-                        "[3,WrappedArray([310,FLOWER])]");
+
+        RowTestHelper.checkRowEquals(
+                spark.sql("SELECT * FROM paimon.default." + tableName),
+                Arrays.asList(
+                        row(1, array(row(110, "APPLE"), row(111, "BANANA"))),
+                        row(2, array(row(200, null), row(201, null))),
+                        row(3, array(row(310, "FLOWER")))));
     }
 
     @ParameterizedTest()
@@ -1012,13 +1019,12 @@ public class SparkSchemaEvolutionITCase extends 
SparkReadTestBase {
                         + tableName
                         + " VALUES (1, ARRAY(STRUCT('apple', 100), 
STRUCT('banana', 101))), "
                         + "(2, ARRAY(STRUCT('cat', 200), STRUCT('dog', 
201)))");
-        assertThat(
-                        spark.sql("SELECT * FROM paimon.default." + 
tableName).collectAsList()
-                                .stream()
-                                .map(Row::toString))
-                .containsExactlyInAnyOrder(
-                        "[1,WrappedArray([apple,100], [banana,101])]",
-                        "[2,WrappedArray([cat,200], [dog,201])]");
+
+        RowTestHelper.checkRowEquals(
+                spark.sql("SELECT * FROM paimon.default." + tableName),
+                Arrays.asList(
+                        row(1, array(row("apple", 100), row("banana", 101))),
+                        row(2, array(row("cat", 200), row("dog", 201)))));
 
         spark.sql(
                 "ALTER TABLE paimon.default."
@@ -1029,14 +1035,13 @@ public class SparkSchemaEvolutionITCase extends 
SparkReadTestBase {
                         + tableName
                         + " VALUES (1, ARRAY(STRUCT('APPLE', 1000000000000), 
STRUCT('BANANA', 111))), "
                         + "(3, ARRAY(STRUCT('FLOWER', 3000000000000)))");
-        assertThat(
-                        spark.sql("SELECT * FROM paimon.default." + 
tableName).collectAsList()
-                                .stream()
-                                .map(Row::toString))
-                .containsExactlyInAnyOrder(
-                        "[1,WrappedArray([APPLE,1000000000000], 
[BANANA,111])]",
-                        "[2,WrappedArray([cat,200], [dog,201])]",
-                        "[3,WrappedArray([FLOWER,3000000000000])]");
+
+        RowTestHelper.checkRowEquals(
+                spark.sql("SELECT * FROM paimon.default." + tableName),
+                Arrays.asList(
+                        row(1, array(row("APPLE", 1000000000000L), 
row("BANANA", 111))),
+                        row(2, array(row("cat", 200), row("dog", 201))),
+                        row(3, array(row("FLOWER", 3000000000000L)))));
     }
 
     @ParameterizedTest()
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
index 8e13cb18f5..d08142995f 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
+++ 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
@@ -245,8 +245,7 @@ public class SparkTimeTravelITCase extends 
SparkReadTestBase {
                         () -> spark.sql("SELECT * FROM t VERSION AS OF 
'unknown'").collectAsList())
                 .satisfies(
                         anyCauseMatches(
-                                RuntimeException.class,
-                                "Cannot find a time travel version for 
unknown"));
+                                IllegalArgumentException.class, "Tag 'unknown' 
doesn't exist"));
     }
 
     @Test
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/RowTestHelper.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/RowTestHelper.scala
new file mode 100644
index 0000000000..bfab0ee7c9
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/RowTestHelper.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark
+
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.QueryTest.checkAnswer
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.JavaConverters._
+
+/**
+ * A helper class for facilitating the comparison of Spark Row objects in Java unit tests, which
+ * leverages QueryTest.checkAnswer for the comparison.
+ */
+class RowTestHelper extends QueryTest {
+  override protected def spark: SparkSession = {
+    throw new UnsupportedOperationException("Not supported")
+  }
+}
+
+object RowTestHelper {
+  def checkRowEquals(df: DataFrame, expectedRows: java.util.List[Row]): Unit = {
+    checkAnswer(df, expectedRows)
+  }
+
+  def checkRowEquals(df: DataFrame, expectedRow: Row): Unit = {
+    checkAnswer(df, Seq(expectedRow))
+  }
+
+  def row(values: Array[Any]): Row = {
+    Row.fromSeq(values)
+  }
+
+  def seq(values: Array[Any]): Seq[Any] = values.toSeq
+}

Reply via email to