danny0405 commented on code in PR #14183:
URL: https://github.com/apache/hudi/pull/14183#discussion_r2480262614


##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/insert/TestInsertTable.scala:
##########
@@ -777,5 +779,88 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
       }
     })
   }
+
+  test("Test Query With PK Filter") {
+    withTable(generateTableName) { tableName =>
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long,
+           |  dt string
+           |) using hudi
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id,name',
+           |  preCombineField = 'ts',
+           |  'hoodie.index.type' = 'BUCKET',
+           |  'hoodie.bucket.index.num.buckets' = '1',
+           |  '${HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key()}' = 
'parquet'
+           | )
+           | partitioned by (dt)
+          """.stripMargin
+      )
+      spark.conf.unset("hoodie.datasource.insert.dup.policy")
+
+      withSQLConf("hoodie.datasource.overwrite.mode" -> "dynamic") {
+        spark.sql(
+          s"""
+             | insert overwrite table $tableName partition(dt) values
+             | (0, 'a0', 10, 1000, '2023-12-06'),
+             | (1, 'a1', 10, 1000, '2023-12-06'),
+             | (2, 'a2', 10, 1000, '2023-12-06'),
+             | (3, 'a3', 10, 1000, '2023-12-06')
+          """.stripMargin)
+        checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+          Seq(0, "a0", 10.0, 1000, "2023-12-06"),
+          Seq(1, "a1", 10.0, 1000, "2023-12-06"),
+          Seq(2, "a2", 10.0, 1000, "2023-12-06"),
+          Seq(3, "a3", 10.0, 1000, "2023-12-06")
+        )
+      }
+      withSQLConf("hoodie.datasource.write.operation" -> "upsert") {
+        spark.sql(
+          s"""
+             | insert into table $tableName partition (dt='2023-12-06') values
+             | (1, 'a1', 11, 2000),
+             | (4, 'a4', 10, 1000)
+            """.stripMargin)
+      }
+
+      checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+        Seq(0, "a0", 10.0, 1000, "2023-12-06"),
+        Seq(1, "a1", 11.0, 2000, "2023-12-06"),
+        Seq(2, "a2", 10.0, 1000, "2023-12-06"),
+        Seq(3, "a3", 10.0, 1000, "2023-12-06"),
+        Seq(4, "a4", 10.0, 1000, "2023-12-06")
+      )
+
+      withSQLConf(s"${SQLConf.PARQUET_RECORD_FILTER_ENABLED.key}" -> "true") {
+        checkAnswer(s"select price, ts, dt from $tableName where (id = 1 or 
name = 'a3') and price <> 10")(
+          Seq(11.0, 2000, "2023-12-06")
+        )
+        // Filter(id = 1) and Filter(name = 'a3') can be push down, 
Filter(price <> 10) can't be push down since it's not primary key
+        val df = spark.sql(s"select price, ts, dt from $tableName where (id = 
1 or name = 'a3') and price <> 10")
+        // only execute file scan physical plan
+        // expected in file scan only (id: 1), (id: 3) and (id: 4) matched, 
(id: 3) and (id: 4)  matched but will be filtered later
+        
assertResult(3)(df.queryExecution.sparkPlan.children(0).children(0).executeCollect().length)
+      }
+
+      withSQLConf(s"${SQLConf.PARQUET_RECORD_FILTER_ENABLED.key}" -> "false") {
+        spark.sql(s"set ${SQLConf.PARQUET_RECORD_FILTER_ENABLED.key}=false")
+        checkAnswer(s"select price, ts, dt from $tableName where (id = 1 or 
name = 'a3') and price <> 10")(
+          Seq(11.0, 2000, "2023-12-06")
+        )
+        val df = spark.sql(s"select price, ts, dt from $tableName where (id = 
1 or name = 'a3') and price <> 10")

Review Comment:
   does it work for id > 1 ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to