This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6435dd205 [core][spark] check column nullability when write (#3842)
6435dd205 is described below

commit 6435dd2053f7b094a33c0358615d7a3ddf1c4e92
Author: Yann Byron <[email protected]>
AuthorDate: Tue Jul 30 19:39:05 2024 +0800

    [core][spark] check column nullability when write (#3842)
---
 .../paimon/table/AppendOnlyFileStoreTable.java     |  1 +
 .../paimon/table/PrimaryKeyFileStoreTable.java     |  1 +
 .../apache/paimon/table/sink/TableWriteImpl.java   | 27 +++++++++
 .../spark/catalyst/analysis/PaimonAnalysis.scala   | 20 ++-----
 .../org/apache/paimon/spark/sql/DDLTestBase.scala  | 64 +++++++++++++++++-----
 5 files changed, 84 insertions(+), 29 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 40eeb4d28..0af78a5da 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -139,6 +139,7 @@ class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
         AppendOnlyFileStoreWrite writer =
                 store().newWrite(commitUser, manifestFilter).withBucketMode(bucketMode());
         return new TableWriteImpl<>(
+                rowType(),
                 writer,
                 createRowKeyExtractor(),
                 (record, rowKind) -> {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index 6ac2763ac..b1e5b5366 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -160,6 +160,7 @@ class PrimaryKeyFileStoreTable extends AbstractFileStoreTable {
             String commitUser, ManifestCacheFilter manifestFilter) {
         KeyValue kv = new KeyValue();
         return new TableWriteImpl<>(
+                rowType(),
                 store().newWrite(commitUser, manifestFilter),
                 createRowKeyExtractor(),
                 (record, rowKind) ->
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index 6e2194646..580d7f4c4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -30,13 +30,16 @@ import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.operation.FileStoreWrite;
 import org.apache.paimon.operation.FileStoreWrite.State;
 import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Restorable;
 
 import javax.annotation.Nullable;
 
 import java.util.List;
 import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
 
 import static org.apache.paimon.utils.Preconditions.checkState;
 
@@ -47,6 +50,7 @@ import static org.apache.paimon.utils.Preconditions.checkState;
  */
 public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State<T>>> {
 
+    private final RowType rowType;
     private final FileStoreWrite<T> write;
     private final KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor;
     private final RecordExtractor<T> recordExtractor;
@@ -56,17 +60,28 @@ public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State
     private boolean batchCommitted = false;
     private BucketMode bucketMode;
 
+    private final int[] notNullFieldIndex;
+
     public TableWriteImpl(
+            RowType rowType,
             FileStoreWrite<T> write,
             KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor,
             RecordExtractor<T> recordExtractor,
             @Nullable RowKindGenerator rowKindGenerator,
             boolean ignoreDelete) {
+        this.rowType = rowType;
         this.write = write;
         this.keyAndBucketExtractor = keyAndBucketExtractor;
         this.recordExtractor = recordExtractor;
         this.rowKindGenerator = rowKindGenerator;
         this.ignoreDelete = ignoreDelete;
+
+        List<String> notNullColumnNames =
+                rowType.getFields().stream()
+                        .filter(field -> !field.type().isNullable())
+                        .map(DataField::name)
+                        .collect(Collectors.toList());
+        this.notNullFieldIndex = rowType.getFieldIndices(notNullColumnNames);
     }
 
     @Override
@@ -137,6 +152,7 @@ public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State
 
     @Nullable
     public SinkRecord writeAndReturn(InternalRow row) throws Exception {
+        checkNullability(row);
         RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row);
         if (ignoreDelete && rowKind.isRetract()) {
             return null;
@@ -148,6 +164,7 @@ public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State
 
     @Nullable
     public SinkRecord writeAndReturn(InternalRow row, int bucket) throws Exception {
+        checkNullability(row);
         RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row);
         if (ignoreDelete && rowKind.isRetract()) {
             return null;
@@ -157,6 +174,16 @@ public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State
         return record;
     }
 
+    private void checkNullability(InternalRow row) {
+        for (int idx : notNullFieldIndex) {
+            if (row.isNullAt(idx)) {
+                String columnName = rowType.getFields().get(idx).name();
+                throw new RuntimeException(
+                        String.format("Cannot write null to non-null column(%s)", columnName));
+            }
+        }
+    }
+
     private SinkRecord toSinkRecord(InternalRow row) {
         keyAndBucketExtractor.setRecord(row);
         return new SinkRecord(
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
index 676856126..3dc0e40c9 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
@@ -26,11 +26,11 @@ import org.apache.paimon.table.FileStoreTable
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.analysis.ResolvedTable
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, NamedExpression}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType}
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
 
 import scala.collection.JavaConverters._
 
@@ -58,8 +58,8 @@ class PaimonAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
   }
 
   private def schemaCompatible(
-      tableSchema: StructType,
       dataSchema: StructType,
+      tableSchema: StructType,
       partitionCols: Seq[String],
       parent: Array[String] = Array.empty): Boolean = {
 
@@ -82,9 +82,8 @@ class PaimonAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
       }
     }
 
-    tableSchema.zip(dataSchema).forall {
+    dataSchema.zip(tableSchema).forall {
       case (f1, f2) =>
-        checkNullability(f1, f2, partitionCols, parent)
         f1.name == f2.name && dataTypeCompatible(f1.name, f1.dataType, f2.dataType)
     }
   }
@@ -115,17 +114,6 @@ class PaimonAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
     cast.setTagValue(Compatibility.castByTableInsertionTag, ())
     cast
   }
-
-  private def checkNullability(
-      input: StructField,
-      expected: StructField,
-      partitionCols: Seq[String],
-      parent: Array[String] = Array.empty): Unit = {
-    val fullColumnName = (parent ++ Array(input.name)).mkString(".")
-    if (!partitionCols.contains(fullColumnName) && input.nullable && !expected.nullable) {
-      throw new RuntimeException("Cannot write nullable values to non-null column")
-    }
-  }
 }
 
 case class PaimonPostHocResolutionRules(session: SparkSession) extends Rule[LogicalPlan] {
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
index da4017104..db749a636 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
@@ -23,6 +23,7 @@ import org.apache.paimon.schema.Schema
 import org.apache.paimon.spark.PaimonSparkTestBase
 import org.apache.paimon.types.DataTypes
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.Row
 import org.junit.jupiter.api.Assertions
 
@@ -33,33 +34,70 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
 
   import testImplicits._
 
-  test("Paimon DDL: create table with not null") {
+  test("Paimon DDL: create append table with not null") {
     withTable("T") {
-      sql("""
-            |CREATE TABLE T (id INT NOT NULL, name STRING)
-            |""".stripMargin)
+      sql("CREATE TABLE T (id INT NOT NULL, name STRING)")
 
-      val exception = intercept[RuntimeException] {
-        sql("""
-              |INSERT INTO T VALUES (1, "a"), (2, "b"), (null, "c")
-              |""".stripMargin)
+      val e1 = intercept[SparkException] {
+        sql("""INSERT INTO T VALUES (1, "a"), (2, "b"), (null, "c")""")
       }
-      Assertions.assertTrue(
-        exception.getMessage().contains("Cannot write nullable values to non-null column"))
+      Assertions.assertTrue(e1.getMessage().contains("Cannot write null to non-null column"))
+
+      sql("""INSERT INTO T VALUES (1, "a"), (2, "b"), (3, null)""")
+      checkAnswer(
+        sql("SELECT * FROM T ORDER BY id"),
+        Seq((1, "a"), (2, "b"), (3, null)).toDF()
+      )
 
+      val schema = spark.table("T").schema
+      Assertions.assertEquals(schema.size, 2)
+      Assertions.assertFalse(schema("id").nullable)
+      Assertions.assertTrue(schema("name").nullable)
+    }
+  }
+  test("Paimon DDL: create primary-key table with not null") {
+    withTable("T") {
       sql("""
-            |INSERT INTO T VALUES (1, "a"), (2, "b"), (3, null)
+            |CREATE TABLE T (id INT, name STRING, pt STRING)
+            |TBLPROPERTIES ('primary-key' = 'id,pt')
             |""".stripMargin)
 
+      val e1 = intercept[SparkException] {
+        sql("""INSERT INTO T VALUES (1, "a", "pt1"), (2, "b", null)""")
+      }
+      Assertions.assertTrue(e1.getMessage().contains("Cannot write null to non-null column"))
+
+      val e2 = intercept[SparkException] {
+        sql("""INSERT INTO T VALUES (1, "a", "pt1"), (null, "b", "pt2")""")
+      }
+      Assertions.assertTrue(e2.getMessage().contains("Cannot write null to non-null column"))
+
+      sql("""INSERT INTO T VALUES (1, "a", "pt1"), (2, "b", "pt1"), (3, null, "pt2")""")
       checkAnswer(
         sql("SELECT * FROM T ORDER BY id"),
-        Seq((1, "a"), (2, "b"), (3, null)).toDF()
+        Seq((1, "a", "pt1"), (2, "b", "pt1"), (3, null, "pt2")).toDF()
       )
 
       val schema = spark.table("T").schema
-      Assertions.assertEquals(schema.size, 2)
+      Assertions.assertEquals(schema.size, 3)
       Assertions.assertFalse(schema("id").nullable)
       Assertions.assertTrue(schema("name").nullable)
+      Assertions.assertFalse(schema("pt").nullable)
+    }
+  }
+
+  test("Paimon DDL: write nullable expression to non-null column") {
+    withTable("T") {
+      sql("""
+            |CREATE TABLE T (id INT NOT NULL, ts TIMESTAMP NOT NULL)
+            |""".stripMargin)
+
+      sql("INSERT INTO T SELECT 1, TO_TIMESTAMP('2024-07-01 16:00:00')")
+
+      checkAnswer(
+        sql("SELECT * FROM T ORDER BY id"),
+        Row(1, Timestamp.valueOf("2024-07-01 16:00:00")) :: Nil
+      )
     }
   }
 

Reply via email to