This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d8082193 [spark] convert non-null to nullable directly (#1462)
7d8082193 is described below

commit 7d80821936f297d1265be49e623ba08994dd29c8
Author: Yann Byron <[email protected]>
AuthorDate: Fri Jun 30 20:13:47 2023 +0800

    [spark] convert non-null to nullable directly (#1462)
---
 .../org/apache/paimon/spark/SparkTypeTest.java     |  6 +--
 .../org/apache/paimon/spark/SparkTypeUtils.java    |  7 ++-
 .../org/apache/paimon/spark/SparkReadITCase.java   | 12 ++----
 .../org/apache/paimon/spark/SparkReadTestBase.java |  2 +-
 .../paimon/spark/SparkSchemaEvolutionITCase.java   | 50 +++++++++-------------
 .../org/apache/paimon/spark/SparkTypeTest.java     | 10 ++---
 .../org/apache/paimon/spark/SparkWriteITCase.java  | 14 ++++++
 7 files changed, 52 insertions(+), 49 deletions(-)

diff --git 
a/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
 
b/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
index f5cbf8db9..c3e5658e5 100644
--- 
a/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
+++ 
b/paimon-spark/paimon-spark-2/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
@@ -69,12 +69,12 @@ public class SparkTypeTest {
         String nestedRowMapType =
                 "StructField(locations,MapType("
                         + "StringType,"
-                        + "StructType(StructField(posX,DoubleType,false), 
StructField(posY,DoubleType,false)),true),true)";
+                        + "StructType(StructField(posX,DoubleType,true), 
StructField(posY,DoubleType,true)),true),true)";
         String expected =
                 "StructType("
-                        + "StructField(id,IntegerType,false), "
+                        + "StructField(id,IntegerType,true), "
                         + "StructField(name,StringType,true), "
-                        + "StructField(salary,DoubleType,false), "
+                        + "StructField(salary,DoubleType,true), "
                         + nestedRowMapType
                         + ", "
                         + 
"StructField(strArray,ArrayType(StringType,true),true), "
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java
index 75d39c22c..c127a053b 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTypeUtils.java
@@ -168,13 +168,16 @@ public class SparkTypeUtils {
                     mapType.getValueType().isNullable());
         }
 
+        /**
+         * For simplicity, as a temporary solution, we directly convert the 
non-null attribute to
+         * nullable on the Spark side.
+         */
         @Override
         public DataType visit(RowType rowType) {
             List<StructField> fields = new 
ArrayList<>(rowType.getFieldCount());
             for (DataField field : rowType.getFields()) {
                 StructField structField =
-                        DataTypes.createStructField(
-                                field.name(), field.type().accept(this), 
field.type().isNullable());
+                        DataTypes.createStructField(field.name(), 
field.type().accept(this), true);
                 structField =
                         Optional.ofNullable(field.description())
                                 .map(structField::withComment)
diff --git 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
index 1ffb0d9ab..3d51e3933 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
+++ 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadITCase.java
@@ -223,8 +223,7 @@ public class SparkReadITCase extends SparkReadTestBase {
                 .isEqualTo(
                         String.format(
                                 "[[%sTBLPROPERTIES (\n  'path' = '%s')\n]]",
-                                showCreateString(
-                                        "t_pk_as", "a BIGINT NOT NULL", "b 
STRING", "c STRING"),
+                                showCreateString("t_pk_as", "a BIGINT", "b 
STRING", "c STRING"),
                                 new Path(warehousePath, 
"default.db/t_pk_as")));
         List<Row> resultPk = spark.sql("SELECT * FROM 
t_pk_as").collectAsList();
 
@@ -257,8 +256,8 @@ public class SparkReadITCase extends SparkReadTestBase {
                                         "user_id BIGINT",
                                         "item_id BIGINT",
                                         "behavior STRING",
-                                        "dt STRING NOT NULL",
-                                        "hh STRING NOT NULL"),
+                                        "dt STRING",
+                                        "hh STRING"),
                                 new Path(warehousePath, 
"default.db/t_all_as")));
         List<Row> resultAll = spark.sql("SELECT * FROM 
t_all_as").collectAsList();
         assertThat(resultAll.stream().map(Row::toString))
@@ -336,10 +335,7 @@ public class SparkReadITCase extends SparkReadTestBase {
                                         + "TBLPROPERTIES (\n"
                                         + "  'k1' = 'v1',\n"
                                         + "  'path' = '%s')\n]]",
-                                showCreateString(
-                                        "tbl",
-                                        "a INT NOT NULL COMMENT 'a comment'",
-                                        "b STRING NOT NULL"),
+                                showCreateString("tbl", "a INT COMMENT 'a 
comment'", "b STRING"),
                                 new Path(warehousePath, "default.db/tbl")));
     }
 
diff --git 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
index 128401b71..fb7f151eb 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
+++ 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java
@@ -203,6 +203,6 @@ public abstract class SparkReadTestBase {
 
     // default schema
     protected String defaultShowCreateString(String table) {
-        return showCreateString(table, "a INT NOT NULL", "b BIGINT", "c 
STRING");
+        return showCreateString(table, "a INT", "b BIGINT", "c STRING");
     }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
index 770dd2195..7ed2ed2eb 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
+++ 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
@@ -34,7 +34,11 @@ import java.util.stream.Collectors;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** ITCase for schema evolution in spark. */
+/**
+ * ITCase for schema eqvolution in spark.
+ *
+ * <p>NOTICE: For these tests, we remove
+ */
 public class SparkSchemaEvolutionITCase extends SparkReadTestBase {
 
     @Test
@@ -76,11 +80,7 @@ public class SparkSchemaEvolutionITCase extends 
SparkReadTestBase {
         assertThat(afterAdd.toString())
                 .contains(
                         showCreateString(
-                                "testAddColumn",
-                                "a INT NOT NULL",
-                                "b BIGINT",
-                                "c STRING",
-                                "d STRING"));
+                                "testAddColumn", "a INT", "b BIGINT", "c 
STRING", "d STRING"));
 
         assertThat(spark.table("testAddColumn").collectAsList().toString())
                 .isEqualTo("[[1,2,1,null], [5,6,3,null]]");
@@ -113,7 +113,7 @@ public class SparkSchemaEvolutionITCase extends 
SparkReadTestBase {
                         showCreateString(
                                 "testAddColumnPositionFirst",
                                 "d INT",
-                                "a INT NOT NULL",
+                                "a INT",
                                 "b BIGINT",
                                 "c STRING"));
 
@@ -124,7 +124,7 @@ public class SparkSchemaEvolutionITCase extends 
SparkReadTestBase {
                 .contains(
                         showCreateString(
                                 "testAddColumnPositionAfter",
-                                "a INT NOT NULL",
+                                "a INT",
                                 "b BIGINT",
                                 "d INT",
                                 "c STRING"));
@@ -168,9 +168,7 @@ public class SparkSchemaEvolutionITCase extends 
SparkReadTestBase {
         spark.sql("ALTER TABLE testRenameColumn RENAME COLUMN a to aa");
         List<Row> afterRename = spark.sql("SHOW CREATE TABLE 
testRenameColumn").collectAsList();
         assertThat(afterRename.toString())
-                .contains(
-                        showCreateString(
-                                "testRenameColumn", "aa INT NOT NULL", "b 
BIGINT", "c STRING"));
+                .contains(showCreateString("testRenameColumn", "aa INT", "b 
BIGINT", "c STRING"));
         Dataset<Row> table = spark.table("testRenameColumn");
         results = table.select("aa", "c").collectAsList();
         assertThat(results.toString()).isEqualTo("[[1,1], [5,3]]");
@@ -260,9 +258,7 @@ public class SparkSchemaEvolutionITCase extends 
SparkReadTestBase {
 
         List<Row> beforeDrop = spark.sql("SHOW CREATE TABLE 
testDropPrimaryKey").collectAsList();
         assertThat(beforeDrop.toString())
-                .contains(
-                        showCreateString(
-                                "testDropPrimaryKey", "a BIGINT NOT NULL", "b 
STRING NOT NULL"));
+                .contains(showCreateString("testDropPrimaryKey", "a BIGINT", 
"b STRING"));
 
         assertThatThrownBy(() -> spark.sql("ALTER TABLE testDropPrimaryKey 
DROP COLUMN b"))
                 .isInstanceOf(RuntimeException.class)
@@ -277,26 +273,22 @@ public class SparkSchemaEvolutionITCase extends 
SparkReadTestBase {
         spark.sql("ALTER TABLE tableFirst ALTER COLUMN b FIRST");
         List<Row> result = spark.sql("SHOW CREATE TABLE 
tableFirst").collectAsList();
         assertThat(result.toString())
-                .contains(showCreateString("tableFirst", "b BIGINT", "a INT 
NOT NULL", "c STRING"));
+                .contains(showCreateString("tableFirst", "b BIGINT", "a INT", 
"c STRING"));
 
         // move after
         createTable("tableAfter");
         spark.sql("ALTER TABLE tableAfter ALTER COLUMN c AFTER a");
         result = spark.sql("SHOW CREATE TABLE tableAfter").collectAsList();
         assertThat(result.toString())
-                .contains(showCreateString("tableAfter", "a INT NOT NULL", "c 
STRING", "b BIGINT"));
+                .contains(showCreateString("tableAfter", "a INT", "c STRING", 
"b BIGINT"));
 
-        spark.sql("CREATE TABLE tableAfter1 (a INT NOT NULL, b BIGINT, c 
STRING, d DOUBLE)");
+        spark.sql("CREATE TABLE tableAfter1 (a INT, b BIGINT, c STRING, d 
DOUBLE)");
         spark.sql("ALTER TABLE tableAfter1 ALTER COLUMN b AFTER c");
         result = spark.sql("SHOW CREATE TABLE tableAfter1").collectAsList();
         assertThat(result.toString())
                 .contains(
                         showCreateString(
-                                "tableAfter1",
-                                "a INT NOT NULL",
-                                "c STRING",
-                                "b BIGINT",
-                                "d DOUBLE"));
+                                "tableAfter1", "a INT", "c STRING", "b 
BIGINT", "d DOUBLE"));
 
         //  move self to first test
         createTable("tableFirstSelf");
@@ -338,9 +330,7 @@ public class SparkSchemaEvolutionITCase extends 
SparkReadTestBase {
 
         List<Row> afterAlter = spark.sql("SHOW CREATE TABLE 
testAlterColumnType").collectAsList();
         assertThat(afterAlter.toString())
-                .contains(
-                        showCreateString(
-                                "testAlterColumnType", "a INT NOT NULL", "b 
DOUBLE", "c STRING"));
+                .contains(showCreateString("testAlterColumnType", "a INT", "b 
DOUBLE", "c STRING"));
     }
 
     @Test
@@ -449,12 +439,12 @@ public class SparkSchemaEvolutionITCase extends 
SparkReadTestBase {
         // Create table with fields [a, b, c] and insert 2 records
         spark.sql(
                 "CREATE TABLE testSchemaEvolution(\n"
-                        + "a INT NOT NULL, \n"
-                        + "b BIGINT NOT NULL, \n"
+                        + "a INT, \n"
+                        + "b BIGINT, \n"
                         + "c VARCHAR(10), \n"
-                        + "d INT NOT NULL, \n"
-                        + "e INT NOT NULL, \n"
-                        + "f INT NOT NULL) \n"
+                        + "d INT, \n"
+                        + "e INT, \n"
+                        + "f INT) \n"
                         + "TBLPROPERTIES ('file.format'='avro')");
         writeTable("testSchemaEvolution", "(1, 2L, '3', 4, 5, 6)", "(7, 8L, 
'9', 10, 11, 12)");
 
diff --git 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
index d72c4415f..670099045 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
+++ 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTypeTest.java
@@ -27,7 +27,6 @@ import org.junit.jupiter.api.Test;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.paimon.spark.SparkTypeUtils.fromPaimonRowType;
-import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link SparkTypeUtils}. */
@@ -75,12 +74,12 @@ public class SparkTypeTest {
         String nestedRowMapType =
                 "StructField(locations,MapType("
                         + "StringType,"
-                        + 
"StructType(StructField(posX,DoubleType,false),StructField(posY,DoubleType,false)),true),true)";
+                        + 
"StructType(StructField(posX,DoubleType,true),StructField(posY,DoubleType,true)),true),true)";
         String expected =
                 "StructType("
-                        + "StructField(id,IntegerType,false),"
+                        + "StructField(id,IntegerType,true),"
                         + "StructField(name,StringType,true),"
-                        + "StructField(salary,DoubleType,false),"
+                        + "StructField(salary,DoubleType,true),"
                         + nestedRowMapType
                         + ","
                         + 
"StructField(strArray,ArrayType(StringType,true),true),"
@@ -99,6 +98,7 @@ public class SparkTypeTest {
         StructType sparkType = fromPaimonRowType(ALL_TYPES);
         assertThat(sparkType.toString().replace(", ", 
",")).isEqualTo(expected);
 
-        assertThat(toPaimonType(sparkType)).isEqualTo(ALL_TYPES);
+        // Ignore the assertion below, since we force to make all the fields 
nullable.
+        // assertThat(toPaimonType(sparkType)).isEqualTo(ALL_TYPES);
     }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
index e214adb5f..401a56337 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
+++ 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
@@ -156,4 +156,18 @@ public class SparkWriteITCase {
                         .collectAsList();
         assertThat(rows.toString()).isEqualTo("[[[1],2], [[2],0]]");
     }
+
+    @Test
+    public void testNonnull() {
+        try {
+            spark.sql("CREATE TABLE S AS SELECT 1 as a, 2 as b, 'yann' as c");
+
+            spark.sql("CREATE TABLE T (a INT NOT NULL, b INT, c STRING)");
+            spark.sql("INSERT INTO T SELECT * FROM S");
+            List<Row> rows = spark.sql("SELECT * FROM T").collectAsList();
+            assertThat(rows.toString()).isEqualTo("[[1,2,yann]]");
+        } finally {
+            spark.sql("DROP TABLE IF EXISTS S");
+        }
+    }
 }

Reply via email to