coderfender commented on code in PR #3351:
URL: https://github.com/apache/datafusion-comet/pull/3351#discussion_r2797196426
##########
spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala:
##########
@@ -377,6 +378,276 @@ class CometParquetWriterSuite extends CometTestBase {
}
}
+// NATIVE COMET WRITER TESTS WHICH FAIL IN SPARK
+ // https://github.com/apache/datafusion-comet/issues/3417
+ ignore("Spark compat: empty file should be skipped while write to file") {
+ withSQLConf(
+ CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
+ CometConf.COMET_EXEC_ENABLED.key -> "true",
+ CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
+ withTempPath { path =>
+ checkCometNativeWriter {
+ spark.range(100).repartition(10).where("id = 50").write.parquet(path.toString)
+ }
+ val partFiles = path
+ .listFiles()
+ .filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
+ assert(partFiles.length === 2)
+ }
+ }
+ }
+
+ // https://github.com/apache/datafusion-comet/issues/3418
+ ignore("Spark compat: SPARK-33901 ctas should not change table's schema") {
+ withSQLConf(
+ CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
+ CometConf.COMET_EXEC_ENABLED.key -> "true",
+ CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
+ withTable("t1", "t2") {
+ sql("CREATE TABLE t1(i CHAR(5), c VARCHAR(4)) USING parquet")
+ checkCometNativeWriter {
+ sql("CREATE TABLE t2 USING parquet AS SELECT * FROM t1")
+ }
+ checkAnswer(
+ sql("desc t2").selectExpr("data_type").where("data_type like
'%char%'"),
+ Seq(Row("char(5)"), Row("varchar(4)")))
+ }
+ }
+ }
+
+ // https://github.com/apache/datafusion-comet/issues/3419
+ ignore("Spark compat: SPARK-37160 CREATE TABLE AS SELECT with
CHAR_AS_VARCHAR") {
+ withSQLConf(
+ CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
+ CometConf.COMET_EXEC_ENABLED.key -> "true",
+ CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
+ withTable("t1", "t2") {
+ sql("CREATE TABLE t1(col CHAR(5)) USING parquet")
+ withSQLConf(SQLConf.CHAR_AS_VARCHAR.key -> "true") {
+ checkCometNativeWriter {
+ sql("CREATE TABLE t2 USING parquet AS SELECT * FROM t1")
+ }
+ checkAnswer(
+ sql("desc t2").selectExpr("data_type").where("data_type like
'%char%'"),
+ Seq(Row("varchar(5)")))
+ }
+ }
+ }
+ }
+
+ // https://github.com/apache/datafusion-comet/issues/3420
+ ignore("Spark compat: SPARK-29174 Support LOCAL in INSERT OVERWRITE
DIRECTORY") {
+ withSQLConf(
+ CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
+ CometConf.COMET_EXEC_ENABLED.key -> "true",
+ CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
+ withTempPath { dir =>
+ val path = dir.toURI.getPath
+ withTable("tab1", "tab2") {
+ sql(s"""create table tab1 (a int) using parquet location '$path'""")
+ checkCometNativeWriter {
+ sql("insert into tab1 values(1)")
+ }
+ checkAnswer(sql("select * from tab1"), Seq(Row(1)))
+ sql("create table tab2 (a int) using parquet")
+ checkCometNativeWriter {
+ sql("insert into tab2 values(2)")
+ }
+ checkCometNativeWriter {
+ sql(s"""insert overwrite local directory '$path' using parquet
select * from tab2""")
+ }
+ sql("refresh table tab1")
+ checkAnswer(sql("select * from tab1"), Seq(Row(2)))
+ }
+ }
+ }
+ }
+
+ // https://github.com/apache/datafusion-comet/issues/3421
+ ignore("Spark compat: SPARK-38336 INSERT INTO with default columns positive
tests") {
+ withSQLConf(
+ CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
+ CometConf.COMET_EXEC_ENABLED.key -> "true",
+ CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
+ withTable("t") {
+ sql("create table t(i boolean, s bigint) using parquet")
+ checkCometNativeWriter {
+ sql("insert into t(i) values(true)")
+ }
+ checkAnswer(spark.table("t"), Row(true, null))
+ }
+ }
+ }
+
+ // https://github.com/apache/datafusion-comet/issues/3422
+ ignore("Spark compat: SPARK-38811 INSERT INTO on ALTER TABLE ADD COLUMNS
positive tests") {
+ withSQLConf(
+ CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
+ CometConf.COMET_EXEC_ENABLED.key -> "true",
+ CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
+ withTable("t") {
+ sql("create table t(i boolean) using parquet")
+ sql("alter table t add column s string default concat('abc', 'def')")
+ checkCometNativeWriter {
+ sql("insert into t values(true, default)")
+ }
+ checkAnswer(spark.table("t"), Row(true, "abcdef"))
+ }
+ }
+ }
+
+ // https://github.com/apache/datafusion-comet/issues/3423
+ ignore("Spark compat: SPARK-43071 INSERT INTO from non-projection queries") {
+ withSQLConf(
+ CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
+ CometConf.COMET_EXEC_ENABLED.key -> "true",
+ CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
+ withTable("t1", "t2") {
+ sql("create table t1(i boolean, s bigint default 42) using parquet")
+ checkCometNativeWriter {
+ sql("insert into t1 values (true, 41), (false, default)")
+ }
+ sql("create table t2(i boolean, s bigint) using parquet")
+ checkCometNativeWriter {
+ sql("insert into t2 select * from t1 order by s")
+ }
+ checkAnswer(spark.table("t2"), Seq(Row(true, 41), Row(false, 42)))
+ }
+ }
+ }
+
+ // https://github.com/apache/datafusion-comet/issues/3424
+ ignore("Spark compat: Insert overwrite table command should output correct
schema basic") {
+ withSQLConf(
+ CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
+ CometConf.COMET_EXEC_ENABLED.key -> "true",
+ CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
+ withTable("tbl", "tbl2") {
+ withView("view1") {
+ val df = spark.range(10).toDF("id")
+ checkCometNativeWriter {
+ df.write.format("parquet").saveAsTable("tbl")
+ }
+ spark.sql("CREATE VIEW view1 AS SELECT id FROM tbl")
+ spark.sql("CREATE TABLE tbl2(ID long) USING parquet")
+ checkCometNativeWriter {
+ spark.sql("INSERT OVERWRITE TABLE tbl2 SELECT ID FROM view1")
+ }
+ checkAnswer(spark.table("tbl2"), (0 until 10).map(Row(_)))
+ }
+ }
+ }
+ }
+
+ // https://github.com/apache/datafusion-comet/issues/3425
+ ignore("Spark compat: parquet timestamp conversion") {
+ withSQLConf(
+ CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
+ CometConf.COMET_EXEC_ENABLED.key -> "true",
+ CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
+ withTempPath { dir =>
+ checkCometNativeWriter {
+ spark
+ .range(1)
+ .selectExpr("current_timestamp() as ts")
+ .write
+ .parquet(dir.toString + "/spark")
+ }
+ val result = spark.read.parquet(dir.toString + "/spark").collect()
+ assert(result.length == 1)
+ }
+ }
+ }
+
+ // https://github.com/apache/datafusion-comet/issues/3426
+ ignore("Spark compat: INSERT INTO TABLE - complex type but different names")
{
+ withSQLConf(
+ CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
+ CometConf.COMET_EXEC_ENABLED.key -> "true",
+ CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
+ withTable("tab1", "tab2") {
+ sql("""CREATE TABLE tab1 (s struct<a: string, b: string>) USING
parquet""")
+ sql("""CREATE TABLE tab2 (s struct<c: string, d: string>) USING
parquet""")
+ checkCometNativeWriter {
+ sql("INSERT INTO tab1 VALUES (named_struct('a', 'x', 'b', 'y'))")
+ }
+ checkCometNativeWriter {
+ sql("INSERT INTO tab2 SELECT * FROM tab1")
+ }
+ checkAnswer(spark.table("tab2"), Row(Row("x", "y")))
+ }
+ }
+ }
+
+ // https://github.com/apache/datafusion-comet/issues/3427
+ ignore("Spark compat: Write Spark version into Parquet metadata") {
+ withSQLConf(
+ CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
+ CometConf.COMET_EXEC_ENABLED.key -> "true",
+ CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
+ withTempPath { dir =>
+ checkCometNativeWriter {
+ spark.range(1).repartition(1).write.parquet(dir.getAbsolutePath)
+ }
+ val files = dir.listFiles().filter(_.getName.endsWith(".parquet"))
+ assert(files.nonEmpty, "Expected parquet files to be written")
+ }
+ }
+ }
+
+ // https://github.com/apache/datafusion-comet/issues/3428
+ ignore("Spark compat: write path implements onTaskCommit API correctly") {
+ withSQLConf(
+ CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
+ CometConf.COMET_EXEC_ENABLED.key -> "true",
+ CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
+ withTempDir { dir =>
+ val path = dir.getCanonicalPath
+ checkCometNativeWriter {
+ spark.range(10).repartition(10).write.mode("overwrite").parquet(path)
+ }
+ val files = new File(path).listFiles().filter(_.getName.startsWith("part-"))
+ assert(files.length > 0, "Expected part files to be written")
+ }
+ }
+ }
+
+ // COMET NATIVE WRITER Spark 4.x test failures
+ // https://github.com/apache/datafusion-comet/issues/3429
+ ignore("Spark compat: ctas with union") {
+ assume(isSpark40Plus)
+ withSQLConf(
+ CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
+ CometConf.COMET_EXEC_ENABLED.key -> "true",
+ CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
+ withTable("t") {
+ checkCometNativeWriter {
+ sql("CREATE TABLE t USING parquet AS SELECT 1 AS c UNION ALL SELECT
2")
+ }
+ checkAnswer(spark.table("t"), Seq(Row(1), Row(2)))
+ }
+ }
+ }
+
+ // https://github.com/apache/datafusion-comet/issues/3430
+ ignore("Spark compat: SPARK-48817 test multi insert") {
+ assume(isSpark40Plus)
+ withSQLConf(
+ CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
+ CometConf.COMET_EXEC_ENABLED.key -> "true",
+ CometConf.getOperatorAllowIncompatConfigKey(classOf[DataWritingCommandExec]) -> "true") {
+ withTable("t1", "t2") {
+ sql("CREATE TABLE t1(a INT) USING parquet")
+ sql("CREATE TABLE t2(a INT) USING parquet")
+ checkCometNativeWriter {
+ sql("FROM (SELECT 1 AS a) src INSERT INTO t1 SELECT a INSERT INTO t2
SELECT a")
Review Comment:
Yeah, I followed the same approach as the test we are copying. Multi-table inserts are supported in Spark (the syntax is carried over from Hive). Additional details here:
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-Syntax.1
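For reference, a minimal sketch of the multi-insert form (table names and data here are illustrative, not from this PR): a single FROM clause scans the source once and fans it out to several INSERT branches, each with its own optional filter.

    // Hypothetical sketch: one scan of src feeding two target tables.
    sql("CREATE TABLE evens(a INT) USING parquet")
    sql("CREATE TABLE odds(a INT) USING parquet")
    // Single source, two INSERT branches, each independently filtered.
    sql("""FROM (SELECT id AS a FROM range(10)) src
          |INSERT INTO evens SELECT a WHERE a % 2 = 0
          |INSERT INTO odds SELECT a WHERE a % 2 = 1""".stripMargin)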
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]