This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
     new f033cee04f [spark][infra] run spark integration tests in CI. (#5590)
f033cee04f is described below

commit f033cee04ff557102f3c6eb0cae4d1fefed09916
Author: Yujiang Zhong <42907416+zhongyuji...@users.noreply.github.com>
AuthorDate: Fri Jul 25 16:28:00 2025 +0800

    [spark][infra] run spark integration tests in CI. (#5590)
---
 .github/workflows/utitcase-spark-3.x.yml           |  2 +-
 .github/workflows/utitcase-spark-4.x.yml           |  2 +-
 .../table/source/snapshot/TimeTravelUtil.java      |  2 +
 .../org/apache/paimon/spark/SparkReadITCase.java   | 91 +++++++++++-----------
 .../org/apache/paimon/spark/SparkReadTestBase.java | 12 +++
 .../paimon/spark/SparkSchemaEvolutionITCase.java   | 71 +++++++++--------
 .../apache/paimon/spark/SparkTimeTravelITCase.java |  3 +-
 .../org/apache/paimon/spark/RowTestHelper.scala    | 53 +++++++++++++
 8 files changed, 154 insertions(+), 82 deletions(-)

diff --git a/.github/workflows/utitcase-spark-3.x.yml b/.github/workflows/utitcase-spark-3.x.yml
index de8b3deaeb..1a23b55b58 100644
--- a/.github/workflows/utitcase-spark-3.x.yml
+++ b/.github/workflows/utitcase-spark-3.x.yml
@@ -59,6 +59,6 @@ jobs:
             test_modules+="org.apache.paimon:paimon-spark-${suffix},"
           done
           test_modules="${test_modules%,}"
-          mvn -T 2C -B test -pl "${test_modules}" -Duser.timezone=$jvm_timezone
+          mvn -T 2C -B verify -pl "${test_modules}" -Duser.timezone=$jvm_timezone
         env:
           MAVEN_OPTS: -Xmx4096m
\ No newline at end of file
diff --git a/.github/workflows/utitcase-spark-4.x.yml b/.github/workflows/utitcase-spark-4.x.yml
index 06495b6975..fd1365aabe 100644
--- a/.github/workflows/utitcase-spark-4.x.yml
+++ b/.github/workflows/utitcase-spark-4.x.yml
@@ -59,6 +59,6 @@ jobs:
             test_modules+="org.apache.paimon:paimon-spark-${suffix},"
           done
           test_modules="${test_modules%,}"
-          mvn -T 2C -B test -pl "${test_modules}" -Duser.timezone=$jvm_timezone -Pspark4,flink1
+          mvn -T 2C -B verify -pl "${test_modules}" -Duser.timezone=$jvm_timezone -Pspark4,flink1
         env:
           MAVEN_OPTS: -Xmx4096m
\ No newline at end of file
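The only change in the two workflow files above is "test" becoming "verify" in the Maven
invocation. With the standard plugin bindings, Surefire runs unit tests during the test phase
while Failsafe runs integration tests during integration-test/verify; assuming the paimon-spark
modules rely on Failsafe's default includes (**/IT*.java, **/*IT.java, **/*ITCase.java), a class
takes part in the integration-test run purely by its name. A minimal, hypothetical sketch (the
class name is illustrative, not part of this commit):

    // Skipped by "mvn test" (Surefire's default includes only match *Test-style names),
    // but executed by "mvn verify" through maven-failsafe-plugin's *ITCase include.
    public class ExampleReadITCase {
        @org.junit.jupiter.api.Test
        void smoke() {
            org.junit.jupiter.api.Assertions.assertTrue(true);
        }
    }

This is why the Spark *ITCase suites touched below only start running in CI once the command
becomes "mvn verify".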
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
index 20802f961f..bdef244230 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
@@ -147,6 +147,8 @@ public class TimeTravelUtil {
         } else if (version.chars().allMatch(Character::isDigit)) {
             options.set(SCAN_SNAPSHOT_ID.key(), version);
         } else {
+            // by here, the scan version should be a tag.
+            options.set(SCAN_TAG_NAME.key(), version);
             throw new RuntimeException("Cannot find a time travel version for " + version);
         }
     }
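Read together with the SparkTimeTravelITCase change further down (which now expects an
IllegalArgumentException "Tag 'unknown' doesn't exist" instead of the RuntimeException shown
above), the intent of the added lines is that a travel version which is not a digits-only
snapshot id is recorded as a tag name and validated later, when the tag is actually resolved.
A rough sketch of that dispatch, simplified to a plain Map and string constants standing in
for Paimon's Options object and CoreOptions keys:

    import java.util.HashMap;
    import java.util.Map;

    public class VersionDispatchSketch {
        // Stand-ins for CoreOptions SCAN_SNAPSHOT_ID.key() / SCAN_TAG_NAME.key().
        static final String SCAN_SNAPSHOT_ID = "scan.snapshot-id";
        static final String SCAN_TAG_NAME = "scan.tag-name";

        static Map<String, String> parse(String version) {
            Map<String, String> options = new HashMap<>();
            if (version.chars().allMatch(Character::isDigit)) {
                options.put(SCAN_SNAPSHOT_ID, version); // e.g. VERSION AS OF 3
            } else {
                // by here, the scan version should be a tag
                options.put(SCAN_TAG_NAME, version); // e.g. VERSION AS OF 'my-tag'
            }
            return options;
        }

        public static void main(String[] args) {
            System.out.println(parse("3"));      // {scan.snapshot-id=3}
            System.out.println(parse("my-tag")); // {scan.tag-name=my-tag}
        }
    }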
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
index a96954dcdf..25d7a39622 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
@@ -592,8 +592,9 @@ public class SparkReadITCase extends SparkReadTestBase {
         Dataset<Row> dataset = spark.read().format("paimon").load(tablePath.toString());
         assertThat(dataset.select("order_id", "buyer_id", "dt").collectAsList().toString())
                 .isEqualTo("[[1,10,2022-07-20]]");
-        assertThat(dataset.select("coupon_info").collectAsList().toString())
-                .isEqualTo("[[WrappedArray(loyalty_discount, shipping_discount)]]");
+
+        RowTestHelper.checkRowEquals(
+                dataset.select("coupon_info"), row(array("loyalty_discount", "shipping_discount")));
 
         // test drop table
         assertThat(
@@ -647,37 +648,38 @@ public class SparkReadITCase extends SparkReadTestBase {
     }
 
     private void innerTestNestedType(Dataset<Row> dataset) {
-        List<Row> results = dataset.collectAsList();
-        assertThat(results.toString())
-                .isEqualTo(
-                        "[[1,WrappedArray(AAA, BBB),[[1.0,WrappedArray(null)],1]], "
-                                + "[2,WrappedArray(CCC, DDD),[[null,WrappedArray(true)],null]], "
-                                + "[3,WrappedArray(null, null),[[2.0,WrappedArray(true, false)],2]], "
-                                + "[4,WrappedArray(null, EEE),[[3.0,WrappedArray(true, false, true)],3]]]");
+        RowTestHelper.checkRowEquals(
+                dataset,
+                Arrays.asList(
+                        row(1, array("AAA", "BBB"), row(row(1.0, array(null)), 1L)),
+                        row(2, array("CCC", "DDD"), row(row(null, array(true)), null)),
+                        row(3, array(null, null), row(row(2.0, array(true, false)), 2L)),
+                        row(4, array(null, "EEE"), row(row(3.0, array(true, false, true)), 3L))));
 
-        results = dataset.select("a").collectAsList();
-        assertThat(results.toString()).isEqualTo("[[1], [2], [3], [4]]");
+        RowTestHelper.checkRowEquals(
+                dataset.select("a"), Arrays.asList(row(1), row(2), row(3), row(4)));
 
-        results = dataset.select("c.c1").collectAsList();
-        assertThat(results.toString())
-                .isEqualTo(
-                        "[[[1.0,WrappedArray(null)]], [[null,WrappedArray(true)]], "
-                                + "[[2.0,WrappedArray(true, false)]], "
-                                + "[[3.0,WrappedArray(true, false, true)]]]");
+        RowTestHelper.checkRowEquals(
+                dataset.select("c.c1"),
+                Arrays.asList(
+                        row(row(1.0, array(null))),
+                        row(row(null, array(true))),
+                        row(row(2.0, array(true, false))),
+                        row(row(3.0, array(true, false, true)))));
 
-        results = dataset.select("c.c2").collectAsList();
-        assertThat(results.toString()).isEqualTo("[[1], [null], [2], [3]]");
+        RowTestHelper.checkRowEquals(
+                dataset.select("c.c2"), Arrays.asList(row(1), row(null), row(2), row(3)));
 
-        results = dataset.select("c.c1.c11").collectAsList();
-        assertThat(results.toString()).isEqualTo("[[1.0], [null], [2.0], [3.0]]");
+        RowTestHelper.checkRowEquals(
+                dataset.select("c.c1.c11"), Arrays.asList(row(1.0), row(null), row(2.0), row(3.0)));
 
-        results = dataset.select("c.c1.c12").collectAsList();
-        assertThat(results.toString())
-                .isEqualTo(
-                        "[[WrappedArray(null)], "
-                                + "[WrappedArray(true)], "
-                                + "[WrappedArray(true, false)], "
-                                + "[WrappedArray(true, false, true)]]");
+        RowTestHelper.checkRowEquals(
+                dataset.select("c.c1.c12"),
+                Arrays.asList(
+                        row(array(null)),
+                        row(array(true)),
+                        row(array(true, false)),
+                        row(array(true, false, true))));
     }
 
     private void innerTestSimpleTypeFilterPushDown(Dataset<Row> dataset) {
@@ -689,28 +691,27 @@ public class SparkReadITCase extends SparkReadTestBase {
     }
 
     private void innerTestNestedTypeFilterPushDown(Dataset<Row> dataset) {
-        List<Row> results = dataset.filter("a < 4").select("a").collectAsList();
-        assertThat(results.toString()).isEqualTo("[[1], [2], [3]]");
+        RowTestHelper.checkRowEquals(
+                dataset.filter("a < 4").select("a"), Arrays.asList(row(1), row(2), row(3)));
 
-        results = dataset.filter("array_contains(b, 'AAA')").select("b").collectAsList();
-        assertThat(results.toString()).isEqualTo("[[WrappedArray(AAA, BBB)]]");
+        RowTestHelper.checkRowEquals(
+                dataset.filter("array_contains(b, 'AAA')").select("b"), row(array("AAA", "BBB")));
 
-        results = dataset.filter("c.c1.c11 is null").select("a", "c").collectAsList();
-        assertThat(results.toString()).isEqualTo("[[2,[[null,WrappedArray(true)],null]]]");
+        RowTestHelper.checkRowEquals(
+                dataset.filter("c.c1.c11 is null").select("a", "c"),
+                row(2, row(row(null, array(true)), null)));
 
-        results = dataset.filter("c.c1.c11 = 1.0").select("a", "c.c1").collectAsList();
-        assertThat(results.toString()).isEqualTo("[[1,[1.0,WrappedArray(null)]]]");
+        RowTestHelper.checkRowEquals(
+                dataset.filter("c.c1.c11 = 1.0").select("a", "c.c1"),
+                row(1, row(1.0, array(null))));
 
-        results = dataset.filter("c.c2 is null").select("a", "c").collectAsList();
-        assertThat(results.toString()).isEqualTo("[[2,[[null,WrappedArray(true)],null]]]");
+        RowTestHelper.checkRowEquals(
+                dataset.filter("c.c2 is null").select("a", "c"),
+                row(2, row(row(null, array(true)), null)));
 
-        results =
-                dataset.filter("array_contains(c.c1.c12, false)")
-                        .select("a", "c.c1.c12", "c.c2")
-                        .collectAsList();
-        assertThat(results.toString())
-                .isEqualTo(
-                        "[[3,WrappedArray(true, false),2], [4,WrappedArray(true, false, true),3]]");
+        RowTestHelper.checkRowEquals(
+                dataset.filter("array_contains(c.c1.c12, false)").select("a", "c.c1.c12", "c.c2"),
+                Arrays.asList(row(3, array(true, false), 2), row(4, array(true, false, true), 3)));
     }
 
     @Test
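The rewrites in SparkReadITCase all follow one pattern: assertions that compared Row.toString()
output are replaced by structural comparisons through RowTestHelper. The string form bakes in
Scala's collection rendering (array values print as WrappedArray(...) on the Scala 2.12 builds
but as ArraySeq(...) on the Scala 2.13 build that Spark 4 requires), so such assertions cannot
hold on both Spark lines at once. A small before/after sketch, written as a hypothetical test
method assumed to sit in a class extending SparkReadTestBase (which supplies spark, row(...)
and array(...)):

    @Test
    public void testArrayColumnComparedStructurally() {
        Dataset<Row> df =
                spark.sql("SELECT array('loyalty_discount', 'shipping_discount') AS coupon_info");

        // Old style, tied to one Scala version's rendering:
        //   assertThat(df.collectAsList().toString())
        //           .isEqualTo("[[WrappedArray(loyalty_discount, shipping_discount)]]");

        // New style, independent of how Scala prints the backing collection:
        RowTestHelper.checkRowEquals(df, row(array("loyalty_discount", "shipping_discount")));
    }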
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
index a4983325c4..27a7557ab7 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
@@ -51,6 +51,8 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
+import scala.collection.Seq;
+
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Base tests for spark read. */
@@ -251,4 +253,14 @@ public abstract class SparkReadTestBase {
     protected String defaultShowCreateStringWithNonNullColumn(String table) {
         return showCreateString(table, "a INT NOT NULL", "b BIGINT NOT NULL", "c STRING");
     }
+
+    protected static Row row(Object... values) {
+        Object[] array = values != null ? values : new Object[] {null};
+        return RowTestHelper.row(array);
+    }
+
+    protected static Seq array(Object... values) {
+        Object[] array = values != null ? values : new Object[] {null};
+        return RowTestHelper.seq(array);
+    }
 }
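The null guard in the new row(...) and array(...) helpers handles a Java varargs corner case:
a bare null argument (as in the array(null) and row(null) expected values used in the tests
above) arrives as a null array rather than as a one-element array containing null, so without
the guard the helpers could not express "a single null cell". A self-contained illustration of
the language rule (not Paimon code):

    public class VarargsNullDemo {
        static int length(Object... values) {
            Object[] array = values != null ? values : new Object[] {null};
            return array.length;
        }

        public static void main(String[] args) {
            System.out.println(length("a", "b"));      // 2
            System.out.println(length((Object) null)); // 1: one-element array {null}
            System.out.println(length(null));          // 1: values itself was null, guarded
        }
    }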
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
index 933658b004..ea837960cc 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
@@ -27,6 +27,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -183,10 +184,16 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
         Dataset<Row> table = spark.table("testRenameColumn");
         results = table.select("bb", "c").collectAsList();
         assertThat(results.toString()).isEqualTo("[[2,1], [6,3]]");
+
         assertThatThrownBy(() -> table.select("b", "c"))
                 .isInstanceOf(AnalysisException.class)
+                // Messages vary across different Spark versions, only validating the common parts.
+                // Spark 4: A column, variable, or function parameter with name `b` cannot be
+                // resolved. Did you mean one of the following? [`a`, `bb`, `c`]
+                // Spark 3.5 and earlier versions: A column or function parameter with name `b`
+                // cannot be resolved. Did you mean one of the following? [`a`, `bb`, `c`]
                 .hasMessageContaining(
-                        "A column or function parameter with name `b` cannot be resolved. Did you mean one of the following?");
+                        "name `b` cannot be resolved. Did you mean one of the following? [`a`, `bb`, `c`]");
     }
 
     @Test
@@ -388,13 +395,15 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
                                 "Cannot move itself for column b"));
 
         // missing column
+        // Messages vary across different Spark versions and there are no common parts, only
+        // validate the exception class
         createTable("tableMissing");
         assertThatThrownBy(() -> spark.sql("ALTER TABLE tableMissing ALTER COLUMN d FIRST"))
-                .hasMessageContaining("Missing field d in table paimon.default.tableMissing");
+                .isInstanceOf(AnalysisException.class);
 
         createTable("tableMissingAfter");
         assertThatThrownBy(() -> spark.sql("ALTER TABLE tableMissingAfter ALTER COLUMN a AFTER d"))
-                .hasMessageContaining("Missing field d in table paimon.default.tableMissingAfter");
+                .isInstanceOf(AnalysisException.class);
     }
 
     @Test
@@ -806,13 +815,12 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
                         + tableName
                         + " VALUES (1, ARRAY(STRUCT('apple', 100), STRUCT('banana', 101))), "
                         + "(2, ARRAY(STRUCT('cat', 200), STRUCT('dog', 201)))");
-        assertThat(
-                        spark.sql("SELECT * FROM paimon.default." + tableName).collectAsList()
-                                .stream()
-                                .map(Row::toString))
-                .containsExactlyInAnyOrder(
-                        "[1,WrappedArray([apple,100], [banana,101])]",
-                        "[2,WrappedArray([cat,200], [dog,201])]");
+
+        RowTestHelper.checkRowEquals(
+                spark.sql("SELECT * FROM paimon.default." + tableName),
+                Arrays.asList(
+                        row(1, array(row("apple", 100), row("banana", 101))),
+                        row(2, array(row("cat", 200), row("dog", 201)))));
 
         spark.sql(
                 "ALTER TABLE paimon.default."
@@ -824,14 +832,13 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
                         + tableName
                         + " VALUES (1, ARRAY(STRUCT(110, 'APPLE'), STRUCT(111, 'BANANA'))), "
                        + "(3, ARRAY(STRUCT(310, 'FLOWER')))");
-        assertThat(
-                        spark.sql("SELECT * FROM paimon.default." + tableName).collectAsList()
-                                .stream()
-                                .map(Row::toString))
-                .containsExactlyInAnyOrder(
-                        "[1,WrappedArray([110,APPLE], [111,BANANA])]",
-                        "[2,WrappedArray([200,null], [201,null])]",
-                        "[3,WrappedArray([310,FLOWER])]");
+
+        RowTestHelper.checkRowEquals(
+                spark.sql("SELECT * FROM paimon.default." + tableName),
+                Arrays.asList(
+                        row(1, array(row(110, "APPLE"), row(111, "BANANA"))),
+                        row(2, array(row(200, null), row(201, null))),
+                        row(3, array(row(310, "FLOWER")))));
     }
 
     @ParameterizedTest()
@@ -1012,13 +1019,12 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
                         + tableName
                         + " VALUES (1, ARRAY(STRUCT('apple', 100), STRUCT('banana', 101))), "
                         + "(2, ARRAY(STRUCT('cat', 200), STRUCT('dog', 201)))");
-        assertThat(
-                        spark.sql("SELECT * FROM paimon.default." + tableName).collectAsList()
-                                .stream()
-                                .map(Row::toString))
-                .containsExactlyInAnyOrder(
-                        "[1,WrappedArray([apple,100], [banana,101])]",
-                        "[2,WrappedArray([cat,200], [dog,201])]");
+
+        RowTestHelper.checkRowEquals(
+                spark.sql("SELECT * FROM paimon.default." + tableName),
+                Arrays.asList(
+                        row(1, array(row("apple", 100), row("banana", 101))),
+                        row(2, array(row("cat", 200), row("dog", 201)))));
 
         spark.sql(
                 "ALTER TABLE paimon.default."
@@ -1029,14 +1035,13 @@ public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
                         + tableName
                         + " VALUES (1, ARRAY(STRUCT('APPLE', 1000000000000), STRUCT('BANANA', 111))), "
                         + "(3, ARRAY(STRUCT('FLOWER', 3000000000000)))");
-        assertThat(
-                        spark.sql("SELECT * FROM paimon.default." + tableName).collectAsList()
-                                .stream()
-                                .map(Row::toString))
-                .containsExactlyInAnyOrder(
-                        "[1,WrappedArray([APPLE,1000000000000], [BANANA,111])]",
-                        "[2,WrappedArray([cat,200], [dog,201])]",
-                        "[3,WrappedArray([FLOWER,3000000000000])]");
+
+        RowTestHelper.checkRowEquals(
+                spark.sql("SELECT * FROM paimon.default." + tableName),
+                Arrays.asList(
+                        row(1, array(row("APPLE", 1000000000000L), row("BANANA", 111))),
+                        row(2, array(row("cat", 200), row("dog", 201))),
+                        row(3, array(row("FLOWER", 3000000000000L)))));
     }
 
     @ParameterizedTest()
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
index 8e13cb18f5..d08142995f 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java
@@ -245,8 +245,7 @@ public class SparkTimeTravelITCase extends SparkReadTestBase {
                         () -> spark.sql("SELECT * FROM t VERSION AS OF 'unknown'").collectAsList())
                 .satisfies(
                         anyCauseMatches(
-                                RuntimeException.class,
-                                "Cannot find a time travel version for unknown"));
+                                IllegalArgumentException.class, "Tag 'unknown' doesn't exist"));
     }
 
     @Test
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/RowTestHelper.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/RowTestHelper.scala
new file mode 100644
index 0000000000..bfab0ee7c9
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/RowTestHelper.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark
+
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.QueryTest.checkAnswer
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.JavaConverters._
+
+/**
+ * A helper class for facilitating the comparison of Spark Row objects in Java unit tests, which
+ * leverages QueryTest.checkAnswer for the comparison.
+ */
+class RowTestHelper extends QueryTest {
+  override protected def spark: SparkSession = {
+    throw new UnsupportedOperationException("Not supported")
+  }
+}
+
+object RowTestHelper {
+  def checkRowEquals(df: DataFrame, expectedRows: java.util.List[Row]): Unit = {
+    checkAnswer(df, expectedRows)
+  }
+
+  def checkRowEquals(df: DataFrame, expectedRow: Row): Unit = {
+    checkAnswer(df, Seq(expectedRow))
+  }
+
+  def row(values: Array[Any]): Row = {
+    Row.fromSeq(values)
+  }
+
+  def seq(values: Array[Any]): Seq[Any] = values.toSeq
+}
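RowTestHelper exists because the Java ITCase code cannot conveniently build the
scala.collection.Seq values Spark uses for array columns, nor conveniently reuse Spark's own
QueryTest.checkAnswer, which sorts and normalizes rows (including nested structs and arrays)
before comparing. The object's row/seq factories are what the row(...) and array(...) helpers
added to SparkReadTestBase delegate to, so Java tests can state nested expectations directly.
A hypothetical usage snippet, assumed to run inside a Java test extending SparkReadTestBase:

    Dataset<Row> df =
            spark.sql(
                    "SELECT 1 AS id, "
                            + "array(named_struct('name', 'apple', 'price', 100)) AS items");

    // Expect id = 1 and items = one struct('apple', 100); compared structurally,
    // not via toString, by QueryTest.checkAnswer under the hood.
    RowTestHelper.checkRowEquals(df, row(1, array(row("apple", 100))));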