This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6435dd205 [core][spark] check column nullability when write (#3842)
6435dd205 is described below
commit 6435dd2053f7b094a33c0358615d7a3ddf1c4e92
Author: Yann Byron <[email protected]>
AuthorDate: Tue Jul 30 19:39:05 2024 +0800
[core][spark] check column nullability when write (#3842)
---
.../paimon/table/AppendOnlyFileStoreTable.java | 1 +
.../paimon/table/PrimaryKeyFileStoreTable.java | 1 +
.../apache/paimon/table/sink/TableWriteImpl.java | 27 +++++++++
.../spark/catalyst/analysis/PaimonAnalysis.scala | 20 ++-----
.../org/apache/paimon/spark/sql/DDLTestBase.scala | 64 +++++++++++++++++-----
5 files changed, 84 insertions(+), 29 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 40eeb4d28..0af78a5da 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -139,6 +139,7 @@ class AppendOnlyFileStoreTable extends
AbstractFileStoreTable {
AppendOnlyFileStoreWrite writer =
store().newWrite(commitUser,
manifestFilter).withBucketMode(bucketMode());
return new TableWriteImpl<>(
+ rowType(),
writer,
createRowKeyExtractor(),
(record, rowKind) -> {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index 6ac2763ac..b1e5b5366 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -160,6 +160,7 @@ class PrimaryKeyFileStoreTable extends
AbstractFileStoreTable {
String commitUser, ManifestCacheFilter manifestFilter) {
KeyValue kv = new KeyValue();
return new TableWriteImpl<>(
+ rowType(),
store().newWrite(commitUser, manifestFilter),
createRowKeyExtractor(),
(record, rowKind) ->
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index 6e2194646..580d7f4c4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -30,13 +30,16 @@ import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.FileStoreWrite;
import org.apache.paimon.operation.FileStoreWrite.State;
import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Restorable;
import javax.annotation.Nullable;
import java.util.List;
import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
import static org.apache.paimon.utils.Preconditions.checkState;
@@ -47,6 +50,7 @@ import static
org.apache.paimon.utils.Preconditions.checkState;
*/
public class TableWriteImpl<T> implements InnerTableWrite,
Restorable<List<State<T>>> {
+ private final RowType rowType;
private final FileStoreWrite<T> write;
private final KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor;
private final RecordExtractor<T> recordExtractor;
@@ -56,17 +60,28 @@ public class TableWriteImpl<T> implements InnerTableWrite,
Restorable<List<State
private boolean batchCommitted = false;
private BucketMode bucketMode;
+ private final int[] notNullFieldIndex;
+
public TableWriteImpl(
+ RowType rowType,
FileStoreWrite<T> write,
KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor,
RecordExtractor<T> recordExtractor,
@Nullable RowKindGenerator rowKindGenerator,
boolean ignoreDelete) {
+ this.rowType = rowType;
this.write = write;
this.keyAndBucketExtractor = keyAndBucketExtractor;
this.recordExtractor = recordExtractor;
this.rowKindGenerator = rowKindGenerator;
this.ignoreDelete = ignoreDelete;
+
+ List<String> notNullColumnNames =
+ rowType.getFields().stream()
+ .filter(field -> !field.type().isNullable())
+ .map(DataField::name)
+ .collect(Collectors.toList());
+ this.notNullFieldIndex = rowType.getFieldIndices(notNullColumnNames);
}
@Override
@@ -137,6 +152,7 @@ public class TableWriteImpl<T> implements InnerTableWrite,
Restorable<List<State
@Nullable
public SinkRecord writeAndReturn(InternalRow row) throws Exception {
+ checkNullability(row);
RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row);
if (ignoreDelete && rowKind.isRetract()) {
return null;
@@ -148,6 +164,7 @@ public class TableWriteImpl<T> implements InnerTableWrite,
Restorable<List<State
@Nullable
public SinkRecord writeAndReturn(InternalRow row, int bucket) throws
Exception {
+ checkNullability(row);
RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row);
if (ignoreDelete && rowKind.isRetract()) {
return null;
@@ -157,6 +174,16 @@ public class TableWriteImpl<T> implements InnerTableWrite,
Restorable<List<State
return record;
}
+ private void checkNullability(InternalRow row) {
+ for (int idx : notNullFieldIndex) {
+ if (row.isNullAt(idx)) {
+ String columnName = rowType.getFields().get(idx).name();
+ throw new RuntimeException(
+ String.format("Cannot write null to non-null column(%s)", columnName));
+ }
+ }
+ }
+
private SinkRecord toSinkRecord(InternalRow row) {
keyAndBucketExtractor.setRecord(row);
return new SinkRecord(
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
index 676856126..3dc0e40c9 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
@@ -26,11 +26,11 @@ import org.apache.paimon.table.FileStoreTable
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.ResolvedTable
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast,
Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute,
Expression, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField,
StructType}
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
import scala.collection.JavaConverters._
@@ -58,8 +58,8 @@ class PaimonAnalysis(session: SparkSession) extends
Rule[LogicalPlan] {
}
private def schemaCompatible(
- tableSchema: StructType,
dataSchema: StructType,
+ tableSchema: StructType,
partitionCols: Seq[String],
parent: Array[String] = Array.empty): Boolean = {
@@ -82,9 +82,8 @@ class PaimonAnalysis(session: SparkSession) extends
Rule[LogicalPlan] {
}
}
- tableSchema.zip(dataSchema).forall {
+ dataSchema.zip(tableSchema).forall {
case (f1, f2) =>
- checkNullability(f1, f2, partitionCols, parent)
f1.name == f2.name && dataTypeCompatible(f1.name, f1.dataType,
f2.dataType)
}
}
@@ -115,17 +114,6 @@ class PaimonAnalysis(session: SparkSession) extends
Rule[LogicalPlan] {
cast.setTagValue(Compatibility.castByTableInsertionTag, ())
cast
}
-
- private def checkNullability(
- input: StructField,
- expected: StructField,
- partitionCols: Seq[String],
- parent: Array[String] = Array.empty): Unit = {
- val fullColumnName = (parent ++ Array(input.name)).mkString(".")
- if (!partitionCols.contains(fullColumnName) && input.nullable &&
!expected.nullable) {
- throw new RuntimeException("Cannot write nullable values to non-null column")
- }
- }
}
case class PaimonPostHocResolutionRules(session: SparkSession) extends
Rule[LogicalPlan] {
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
index da4017104..db749a636 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
@@ -23,6 +23,7 @@ import org.apache.paimon.schema.Schema
import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.paimon.types.DataTypes
+import org.apache.spark.SparkException
import org.apache.spark.sql.Row
import org.junit.jupiter.api.Assertions
@@ -33,33 +34,70 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
import testImplicits._
- test("Paimon DDL: create table with not null") {
+ test("Paimon DDL: create append table with not null") {
withTable("T") {
- sql("""
- |CREATE TABLE T (id INT NOT NULL, name STRING)
- |""".stripMargin)
+ sql("CREATE TABLE T (id INT NOT NULL, name STRING)")
- val exception = intercept[RuntimeException] {
- sql("""
- |INSERT INTO T VALUES (1, "a"), (2, "b"), (null, "c")
- |""".stripMargin)
+ val e1 = intercept[SparkException] {
+ sql("""INSERT INTO T VALUES (1, "a"), (2, "b"), (null, "c")""")
}
- Assertions.assertTrue(
- exception.getMessage().contains("Cannot write nullable values to non-null column"))
+ Assertions.assertTrue(e1.getMessage().contains("Cannot write null to non-null column"))
+
+ sql("""INSERT INTO T VALUES (1, "a"), (2, "b"), (3, null)""")
+ checkAnswer(
+ sql("SELECT * FROM T ORDER BY id"),
+ Seq((1, "a"), (2, "b"), (3, null)).toDF()
+ )
+ val schema = spark.table("T").schema
+ Assertions.assertEquals(schema.size, 2)
+ Assertions.assertFalse(schema("id").nullable)
+ Assertions.assertTrue(schema("name").nullable)
+ }
+ }
+ test("Paimon DDL: create primary-key table with not null") {
+ withTable("T") {
sql("""
- |INSERT INTO T VALUES (1, "a"), (2, "b"), (3, null)
+ |CREATE TABLE T (id INT, name STRING, pt STRING)
+ |TBLPROPERTIES ('primary-key' = 'id,pt')
|""".stripMargin)
+ val e1 = intercept[SparkException] {
+ sql("""INSERT INTO T VALUES (1, "a", "pt1"), (2, "b", null)""")
+ }
+ Assertions.assertTrue(e1.getMessage().contains("Cannot write null to non-null column"))
+
+ val e2 = intercept[SparkException] {
+ sql("""INSERT INTO T VALUES (1, "a", "pt1"), (null, "b", "pt2")""")
+ }
+ Assertions.assertTrue(e2.getMessage().contains("Cannot write null to non-null column"))
+
+ sql("""INSERT INTO T VALUES (1, "a", "pt1"), (2, "b", "pt1"), (3, null, "pt2")""")
checkAnswer(
sql("SELECT * FROM T ORDER BY id"),
- Seq((1, "a"), (2, "b"), (3, null)).toDF()
+ Seq((1, "a", "pt1"), (2, "b", "pt1"), (3, null, "pt2")).toDF()
)
val schema = spark.table("T").schema
- Assertions.assertEquals(schema.size, 2)
+ Assertions.assertEquals(schema.size, 3)
Assertions.assertFalse(schema("id").nullable)
Assertions.assertTrue(schema("name").nullable)
+ Assertions.assertFalse(schema("pt").nullable)
+ }
+ }
+
+ test("Paimon DDL: write nullable expression to non-null column") {
+ withTable("T") {
+ sql("""
+ |CREATE TABLE T (id INT NOT NULL, ts TIMESTAMP NOT NULL)
+ |""".stripMargin)
+
+ sql("INSERT INTO T SELECT 1, TO_TIMESTAMP('2024-07-01 16:00:00')")
+
+ checkAnswer(
+ sql("SELECT * FROM T ORDER BY id"),
+ Row(1, Timestamp.valueOf("2024-07-01 16:00:00")) :: Nil
+ )
}
}