This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7d8082193 [spark] convert non-null to nullable directly (#1462)
7d8082193 is described below
commit 7d80821936f297d1265be49e623ba08994dd29c8
Author: Yann Byron <[email protected]>
AuthorDate: Fri Jun 30 20:13:47 2023 +0800
[spark] convert non-null to nullable directly (#1462)
---
.../org/apache/paimon/spark/SparkTypeTest.java | 6 +--
.../org/apache/paimon/spark/SparkTypeUtils.java | 7 ++-
.../org/apache/paimon/spark/SparkReadITCase.java | 12 ++----
.../org/apache/paimon/spark/SparkReadTestBase.java | 2 +-
.../paimon/spark/SparkSchemaEvolutionITCase.java | 50 +++++++++-------------
.../org/apache/paimon/spark/SparkTypeTest.java | 10 ++---
.../org/apache/paimon/spark/SparkWriteITCase.java | 14 ++++++
7 files changed, 52 insertions(+), 49 deletions(-)
diff --git
a/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
b/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
index f5cbf8db9..c3e5658e5 100644
---
a/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
+++
b/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
@@ -69,12 +69,12 @@ public class SparkTypeTest {
String nestedRowMapType =
"StructField(locations,MapType("
+ "StringType,"
- + "StructType(StructField(posX,DoubleType,false),
StructField(posY,DoubleType,false)),true),true)";
+ + "StructType(StructField(posX,DoubleType,true),
StructField(posY,DoubleType,true)),true),true)";
String expected =
"StructType("
- + "StructField(id,IntegerType,false), "
+ + "StructField(id,IntegerType,true), "
+ "StructField(name,StringType,true), "
- + "StructField(salary,DoubleType,false), "
+ + "StructField(salary,DoubleType,true), "
+ nestedRowMapType
+ ", "
+
"StructField(strArray,ArrayType(StringType,true),true), "
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java
index 75d39c22c..c127a053b 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java
@@ -168,13 +168,16 @@ public class SparkTypeUtils {
mapType.getValueType().isNullable());
}
+ /**
+ * For simplicity, as a temporary solution, we directly convert the
non-null attribute to
+ * nullable on the Spark side.
+ */
@Override
public DataType visit(RowType rowType) {
List<StructField> fields = new
ArrayList<>(rowType.getFieldCount());
for (DataField field : rowType.getFields()) {
StructField structField =
- DataTypes.createStructField(
- field.name(), field.type().accept(this),
field.type().isNullable());
+ DataTypes.createStructField(field.name(),
field.type().accept(this), true);
structField =
Optional.ofNullable(field.description())
.map(structField::withComment)
diff --git
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
index 1ffb0d9ab..3d51e3933 100644
---
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
+++
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
@@ -223,8 +223,7 @@ public class SparkReadITCase extends SparkReadTestBase {
.isEqualTo(
String.format(
"[[%sTBLPROPERTIES (\n 'path' = '%s')\n]]",
- showCreateString(
- "t_pk_as", "a BIGINT NOT NULL", "b
STRING", "c STRING"),
+ showCreateString("t_pk_as", "a BIGINT", "b
STRING", "c STRING"),
new Path(warehousePath,
"default.db/t_pk_as")));
List<Row> resultPk = spark.sql("SELECT * FROM
t_pk_as").collectAsList();
@@ -257,8 +256,8 @@ public class SparkReadITCase extends SparkReadTestBase {
"user_id BIGINT",
"item_id BIGINT",
"behavior STRING",
- "dt STRING NOT NULL",
- "hh STRING NOT NULL"),
+ "dt STRING",
+ "hh STRING"),
new Path(warehousePath,
"default.db/t_all_as")));
List<Row> resultAll = spark.sql("SELECT * FROM
t_all_as").collectAsList();
assertThat(resultAll.stream().map(Row::toString))
@@ -336,10 +335,7 @@ public class SparkReadITCase extends SparkReadTestBase {
+ "TBLPROPERTIES (\n"
+ " 'k1' = 'v1',\n"
+ " 'path' = '%s')\n]]",
- showCreateString(
- "tbl",
- "a INT NOT NULL COMMENT 'a comment'",
- "b STRING NOT NULL"),
+ showCreateString("tbl", "a INT COMMENT 'a
comment'", "b STRING"),
new Path(warehousePath, "default.db/tbl")));
}
diff --git
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
index 128401b71..fb7f151eb 100644
---
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
+++
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
@@ -203,6 +203,6 @@ public abstract class SparkReadTestBase {
// default schema
protected String defaultShowCreateString(String table) {
- return showCreateString(table, "a INT NOT NULL", "b BIGINT", "c
STRING");
+ return showCreateString(table, "a INT", "b BIGINT", "c STRING");
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
index 770dd2195..7ed2ed2eb 100644
---
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
+++
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
@@ -34,7 +34,11 @@ import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-/** ITCase for schema evolution in spark. */
+/**
+ * ITCase for schema evolution in spark.
+ *
+ * <p>NOTICE: For these tests, we remove the NOT NULL constraints, since all fields are converted to nullable on the Spark side.
+ */
public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
@Test
@@ -76,11 +80,7 @@ public class SparkSchemaEvolutionITCase extends
SparkReadTestBase {
assertThat(afterAdd.toString())
.contains(
showCreateString(
- "testAddColumn",
- "a INT NOT NULL",
- "b BIGINT",
- "c STRING",
- "d STRING"));
+ "testAddColumn", "a INT", "b BIGINT", "c
STRING", "d STRING"));
assertThat(spark.table("testAddColumn").collectAsList().toString())
.isEqualTo("[[1,2,1,null], [5,6,3,null]]");
@@ -113,7 +113,7 @@ public class SparkSchemaEvolutionITCase extends
SparkReadTestBase {
showCreateString(
"testAddColumnPositionFirst",
"d INT",
- "a INT NOT NULL",
+ "a INT",
"b BIGINT",
"c STRING"));
@@ -124,7 +124,7 @@ public class SparkSchemaEvolutionITCase extends
SparkReadTestBase {
.contains(
showCreateString(
"testAddColumnPositionAfter",
- "a INT NOT NULL",
+ "a INT",
"b BIGINT",
"d INT",
"c STRING"));
@@ -168,9 +168,7 @@ public class SparkSchemaEvolutionITCase extends
SparkReadTestBase {
spark.sql("ALTER TABLE testRenameColumn RENAME COLUMN a to aa");
List<Row> afterRename = spark.sql("SHOW CREATE TABLE
testRenameColumn").collectAsList();
assertThat(afterRename.toString())
- .contains(
- showCreateString(
- "testRenameColumn", "aa INT NOT NULL", "b
BIGINT", "c STRING"));
+ .contains(showCreateString("testRenameColumn", "aa INT", "b
BIGINT", "c STRING"));
Dataset<Row> table = spark.table("testRenameColumn");
results = table.select("aa", "c").collectAsList();
assertThat(results.toString()).isEqualTo("[[1,1], [5,3]]");
@@ -260,9 +258,7 @@ public class SparkSchemaEvolutionITCase extends
SparkReadTestBase {
List<Row> beforeDrop = spark.sql("SHOW CREATE TABLE
testDropPrimaryKey").collectAsList();
assertThat(beforeDrop.toString())
- .contains(
- showCreateString(
- "testDropPrimaryKey", "a BIGINT NOT NULL", "b
STRING NOT NULL"));
+ .contains(showCreateString("testDropPrimaryKey", "a BIGINT",
"b STRING"));
assertThatThrownBy(() -> spark.sql("ALTER TABLE testDropPrimaryKey
DROP COLUMN b"))
.isInstanceOf(RuntimeException.class)
@@ -277,26 +273,22 @@ public class SparkSchemaEvolutionITCase extends
SparkReadTestBase {
spark.sql("ALTER TABLE tableFirst ALTER COLUMN b FIRST");
List<Row> result = spark.sql("SHOW CREATE TABLE
tableFirst").collectAsList();
assertThat(result.toString())
- .contains(showCreateString("tableFirst", "b BIGINT", "a INT
NOT NULL", "c STRING"));
+ .contains(showCreateString("tableFirst", "b BIGINT", "a INT",
"c STRING"));
// move after
createTable("tableAfter");
spark.sql("ALTER TABLE tableAfter ALTER COLUMN c AFTER a");
result = spark.sql("SHOW CREATE TABLE tableAfter").collectAsList();
assertThat(result.toString())
- .contains(showCreateString("tableAfter", "a INT NOT NULL", "c
STRING", "b BIGINT"));
+ .contains(showCreateString("tableAfter", "a INT", "c STRING",
"b BIGINT"));
- spark.sql("CREATE TABLE tableAfter1 (a INT NOT NULL, b BIGINT, c
STRING, d DOUBLE)");
+ spark.sql("CREATE TABLE tableAfter1 (a INT, b BIGINT, c STRING, d
DOUBLE)");
spark.sql("ALTER TABLE tableAfter1 ALTER COLUMN b AFTER c");
result = spark.sql("SHOW CREATE TABLE tableAfter1").collectAsList();
assertThat(result.toString())
.contains(
showCreateString(
- "tableAfter1",
- "a INT NOT NULL",
- "c STRING",
- "b BIGINT",
- "d DOUBLE"));
+ "tableAfter1", "a INT", "c STRING", "b
BIGINT", "d DOUBLE"));
// move self to first test
createTable("tableFirstSelf");
@@ -338,9 +330,7 @@ public class SparkSchemaEvolutionITCase extends
SparkReadTestBase {
List<Row> afterAlter = spark.sql("SHOW CREATE TABLE
testAlterColumnType").collectAsList();
assertThat(afterAlter.toString())
- .contains(
- showCreateString(
- "testAlterColumnType", "a INT NOT NULL", "b
DOUBLE", "c STRING"));
+ .contains(showCreateString("testAlterColumnType", "a INT", "b
DOUBLE", "c STRING"));
}
@Test
@@ -449,12 +439,12 @@ public class SparkSchemaEvolutionITCase extends
SparkReadTestBase {
// Create table with fields [a, b, c] and insert 2 records
spark.sql(
"CREATE TABLE testSchemaEvolution(\n"
- + "a INT NOT NULL, \n"
- + "b BIGINT NOT NULL, \n"
+ + "a INT, \n"
+ + "b BIGINT, \n"
+ "c VARCHAR(10), \n"
- + "d INT NOT NULL, \n"
- + "e INT NOT NULL, \n"
- + "f INT NOT NULL) \n"
+ + "d INT, \n"
+ + "e INT, \n"
+ + "f INT) \n"
+ "TBLPROPERTIES ('file.format'='avro')");
writeTable("testSchemaEvolution", "(1, 2L, '3', 4, 5, 6)", "(7, 8L,
'9', 10, 11, 12)");
diff --git
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
index d72c4415f..670099045 100644
---
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
+++
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
@@ -27,7 +27,6 @@ import org.junit.jupiter.api.Test;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.paimon.spark.SparkTypeUtils.fromPaimonRowType;
-import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType;
import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link SparkTypeUtils}. */
@@ -75,12 +74,12 @@ public class SparkTypeTest {
String nestedRowMapType =
"StructField(locations,MapType("
+ "StringType,"
- +
"StructType(StructField(posX,DoubleType,false),StructField(posY,DoubleType,false)),true),true)";
+ +
"StructType(StructField(posX,DoubleType,true),StructField(posY,DoubleType,true)),true),true)";
String expected =
"StructType("
- + "StructField(id,IntegerType,false),"
+ + "StructField(id,IntegerType,true),"
+ "StructField(name,StringType,true),"
- + "StructField(salary,DoubleType,false),"
+ + "StructField(salary,DoubleType,true),"
+ nestedRowMapType
+ ","
+
"StructField(strArray,ArrayType(StringType,true),true),"
@@ -99,6 +98,7 @@ public class SparkTypeTest {
StructType sparkType = fromPaimonRowType(ALL_TYPES);
assertThat(sparkType.toString().replace(", ",
",")).isEqualTo(expected);
- assertThat(toPaimonType(sparkType)).isEqualTo(ALL_TYPES);
+ // The assertion below is disabled, since we force all the fields to be
nullable.
+ // assertThat(toPaimonType(sparkType)).isEqualTo(ALL_TYPES);
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
index e214adb5f..401a56337 100644
---
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
+++
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
@@ -156,4 +156,18 @@ public class SparkWriteITCase {
.collectAsList();
assertThat(rows.toString()).isEqualTo("[[[1],2], [[2],0]]");
}
+
+ @Test
+ public void testNonnull() {
+ try {
+ spark.sql("CREATE TABLE S AS SELECT 1 as a, 2 as b, 'yann' as c");
+
+ spark.sql("CREATE TABLE T (a INT NOT NULL, b INT, c STRING)");
+ spark.sql("INSERT INTO T SELECT * FROM S");
+ List<Row> rows = spark.sql("SELECT * FROM T").collectAsList();
+ assertThat(rows.toString()).isEqualTo("[[1,2,yann]]");
+ } finally {
+ spark.sql("DROP TABLE IF EXISTS S");
+ }
+ }
}