This is an automated email from the ASF dual-hosted git repository.

zouxxyy pushed a commit to branch dev/fix-char
in repository https://gitbox.apache.org/repos/asf/paimon.git
commit a518f9d36c3d2d3cecf245d793c78860f0d281d0
Author: zouxxyy <[email protected]>
AuthorDate: Fri Aug 30 17:47:31 2024 +0800

    1
---
 .../spark/catalyst/analysis/PaimonAnalysis.scala   | 28 +++++++++++------
 .../org/apache/paimon/spark/sql/DDLTestBase.scala  | 36 +++++++++++++++++++++-
 .../spark/sql/InsertOverwriteTableTestBase.scala   |  2 +-
 3 files changed, 55 insertions(+), 11 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
index 1b28fa9b6..47b4001a0 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLId
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.types.{ArrayType, DataType, IntegerType, MapType, StructField, StructType}
+import org.apache.spark.sql.types.{ArrayType, DataType, IntegerType, MapType, Metadata, StructField, StructType}

 import scala.collection.mutable

@@ -107,10 +107,7 @@ class PaimonAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
       } else {
         matchedCols += matched.head.name
         val matchedCol = matched.head
-        val actualExpectedCol = expectedCol.withDataType {
-          CharVarcharUtils.getRawType(expectedCol.metadata).getOrElse(expectedCol.dataType)
-        }
-        addCastToColumn(matchedCol, actualExpectedCol, isByName = true)
+        addCastToColumn(matchedCol, expectedCol, isByName = true)
       }
     }

@@ -185,7 +182,8 @@ class PaimonAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
       case _ =>
         cast(attr, targetAttr.dataType)
     }
-    Alias(expr, targetAttr.name)(explicitMetadata = Option(targetAttr.metadata))
+    Alias(withStrLenCheck(expr, targetAttr.metadata), targetAttr.name)(explicitMetadata =
+      Option(targetAttr.metadata))
   }

   private def addCastToStructByName(
@@ -248,9 +246,10 @@ class PaimonAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
       sourceFieldName: String,
       targetField: StructField): NamedExpression = {
     Alias(
-      cast(GetStructField(parent, i, Option(sourceFieldName)), targetField.dataType),
-      targetField.name
-    )(explicitMetadata = Option(targetField.metadata))
+      withStrLenCheck(
+        cast(GetStructField(parent, i, Option(sourceFieldName)), targetField.dataType),
+        targetField.metadata),
+      targetField.name)(explicitMetadata = Option(targetField.metadata))
   }

   private def castToArrayStruct(
@@ -275,6 +274,17 @@ class PaimonAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
     cast.setTagValue(Compatibility.castByTableInsertionTag, ())
     cast
   }
+
+  private def withStrLenCheck(expr: Expression, metadata: Metadata): Expression = {
+    if (!conf.charVarcharAsString) {
+      CharVarcharUtils
+        .getRawType(metadata)
+        .map(rawType => CharVarcharUtils.stringLengthCheck(expr, rawType))
+        .getOrElse(expr)
+    } else {
+      expr
+    }
+  }
 }

 case class PaimonPostHocResolutionRules(session: SparkSession) extends Rule[LogicalPlan] {
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
index f7b540981..f99bff2af 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
@@ -197,7 +197,7 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
         )

         // check select
-        if (!gteqSpark3_4) {
+        if (format == "orc" && !gteqSpark3_4) {
           // Orc reader will right trim the char type, e.g. "Friday " => "Friday" (see orc's `CharTreeReader`)
           // and Spark has a conf `spark.sql.readSideCharPadding` to auto padding char only since 3.4 (default true)
           // So when using orc with Spark3.4-, here will return "Friday"
@@ -233,6 +233,40 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
     }
   }

+  test("Paimon DDL: write with char") {
+    withTable("paimon_tbl") {
+      spark.sql(s"""
+                   |CREATE TABLE paimon_tbl (id int, c char(6))
+                   |USING PAIMON
+                   |""".stripMargin)
+
+      withSQLConf("spark.sql.legacy.charVarcharAsString" -> "true") {
+        sql("INSERT INTO paimon_tbl VALUES (1, 'ab')")
+      }
+
+      withSQLConf("spark.sql.legacy.charVarcharAsString" -> "false") {
+        sql("INSERT INTO paimon_tbl VALUES (2, 'ab')")
+      }
+
+      if (gteqSpark3_4) {
+        withSQLConf("spark.sql.readSideCharPadding" -> "true") {
+          checkAnswer(
+            spark.sql("SELECT c FROM paimon_tbl ORDER BY id"),
+            Row("ab    ") :: Row("ab    ") :: Nil)
+        }
+        withSQLConf("spark.sql.readSideCharPadding" -> "false") {
+          checkAnswer(
+            spark.sql("SELECT c FROM paimon_tbl ORDER BY id"),
+            Row("ab") :: Row("ab    ") :: Nil)
+        }
+      } else {
+        checkAnswer(
+          spark.sql("SELECT c FROM paimon_tbl ORDER BY id"),
+          Row("ab") :: Row("ab    ") :: Nil)
+      }
+    }
+  }
+
   test("Paimon DDL: create table with timestamp/timestamp_ntz") {
     Seq("orc", "parquet", "avro").foreach {
       format =>
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala
index 6d9ceb187..674b45fda 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala
@@ -73,7 +73,7 @@ abstract class InsertOverwriteTableTestBase extends PaimonSparkTestBase {
       Row(3, 3.3d, "Paimon", "pt2") :: Nil
     )

-    // BY NAME statementis supported since Spark3.5
+    // BY NAME statement is supported since Spark 3.5
     if (gteqSpark3_5) {
       sql("INSERT OVERWRITE TABLE t1 BY NAME SELECT col3, col2, col4, col1 FROM t1")
       // null for non-specified column
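
For context, the sketch below illustrates the write-side behavior the new
withStrLenCheck helper enforces. It is not part of the commit: the table
name `t` and the spark-shell session are hypothetical, and it assumes a
Spark session with a Paimon catalog configured.

  // Hypothetical spark-shell session, not part of this commit.
  spark.sql("CREATE TABLE t (id INT, c CHAR(6)) USING paimon")

  // Default path (spark.sql.legacy.charVarcharAsString = false): the analyzer
  // wraps the written value in CharVarcharUtils.stringLengthCheck, so an
  // over-length value now fails at write time instead of being stored silently.
  spark.sql("INSERT INTO t VALUES (1, 'toolong')") // 7 chars > CHAR(6): runtime error

  // Legacy path: with the check skipped, CHAR(6) behaves like STRING and the
  // value is stored as-is, without write-side padding.
  spark.sql("SET spark.sql.legacy.charVarcharAsString=true")
  spark.sql("INSERT INTO t VALUES (2, 'ab')") // stored as "ab", not "ab    "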
