This is an automated email from the ASF dual-hosted git repository.

lwz9103 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 76a7ff8ec6 [GLUTEN-9044][CH] Fix virtual columns in mergetree table 
(#9047)
76a7ff8ec6 is described below

commit 76a7ff8ec6cfc39138b68524ea9cab32898af54f
Author: Wenzheng Liu <[email protected]>
AuthorDate: Fri Mar 21 13:23:27 2025 +0800

    [GLUTEN-9044][CH] Fix virtual columns in mergetree table (#9047)
---
 .../spark/sql/delta/commands/DeleteCommand.scala   |   5 +-
 .../sql/delta/commands/MergeIntoCommand.scala      |   2 -
 .../spark/sql/delta/commands/UpdateCommand.scala   |   5 +-
 .../spark/sql/delta/commands/DeleteCommand.scala   |   7 +-
 .../sql/delta/commands/MergeIntoCommand.scala      |   4 +-
 .../spark/sql/delta/commands/UpdateCommand.scala   |   5 +-
 .../spark/sql/delta/commands/DeleteCommand.scala   |   7 +-
 .../spark/sql/delta/commands/UpdateCommand.scala   |   7 +-
 .../commands/merge/ClassicMergeExecutor.scala      |   4 +-
 ...tenClickHouseMergeTreePathBasedWriteSuite.scala |  12 +-
 .../GlutenClickHouseMergetreeWriteStatsSuite.scala | 140 +++++++++++++++
 cpp-ch/local-engine/Parser/InputFileNameParser.cpp | 197 ---------------------
 cpp-ch/local-engine/Parser/InputFileNameParser.h   |  55 ------
 .../Parser/RelParsers/MergeTreeRelParser.cpp       |  85 ++++++---
 .../Parser/RelParsers/MergeTreeRelParser.h         |   4 +
 .../Storages/Output/NormalFileWriter.h             |   5 +-
 16 files changed, 225 insertions(+), 319 deletions(-)

diff --git 
a/backends-clickhouse/src-delta-20/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala
 
b/backends-clickhouse/src-delta-20/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala
index 61b0330723..9feac0e0b4 100644
--- 
a/backends-clickhouse/src-delta-20/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala
+++ 
b/backends-clickhouse/src-delta-20/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.command.LeafRunnableCommand
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.metric.SQLMetrics.createMetric
-import org.apache.spark.sql.functions.{col, explode, input_file_name, lit, 
split, typedLit, udf}
+import org.apache.spark.sql.functions.{input_file_name, lit, typedLit, udf}
 
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize
 
@@ -39,8 +39,6 @@ import 
com.fasterxml.jackson.databind.annotation.JsonDeserialize
  * Gluten overwrite Delta:
  *
  * This file is copied from Delta 2.2.0. It is modified to overcome the 
following issues:
- *   1. In Clickhouse backend, we can't implement input_file_name() correctly, 
we can only implement
- *      it so that it return a a list of filenames (concated by ',').
  */
 
 trait DeleteCommandMetrics { self: LeafRunnableCommand =>
@@ -221,7 +219,6 @@ case class DeleteCommand(deltaLog: DeltaLog, target: 
LogicalPlan, condition: Opt
                   .filter(new Column(cond))
                   .select(input_file_name().as("input_files"))
                   .filter(deletedRowUdf())
-                  .select(explode(split(col("input_files"), ",")))
                   .distinct()
                   .as[String]
                   .collect()
diff --git 
a/backends-clickhouse/src-delta-20/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala
 
b/backends-clickhouse/src-delta-20/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala
index 4b26d5d5d9..4882def7b2 100644
--- 
a/backends-clickhouse/src-delta-20/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala
+++ 
b/backends-clickhouse/src-delta-20/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala
@@ -49,8 +49,6 @@ import scala.collection.mutable
  * Gluten overwrite Delta:
  *
  * This file is copied from Delta 2.2.0. It is modified to overcome the 
following issues:
- *   1. In Clickhouse backend, we can't implement input_file_name() correctly, 
we can only implement
- *      it so that it return a a list of filenames (concated by ',').
  */
 
 case class MergeDataSizes(
diff --git 
a/backends-clickhouse/src-delta-20/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala
 
b/backends-clickhouse/src-delta-20/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala
index 42a081788e..ce6a7a474e 100644
--- 
a/backends-clickhouse/src-delta-20/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala
+++ 
b/backends-clickhouse/src-delta-20/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala
@@ -38,9 +38,7 @@ import org.apache.hadoop.fs.Path
 /**
  * Gluten overwrite Delta:
  *
- * This file is copied from Delta 2.2.0. It is modified to overcome the 
following issues:
- *   1. In Clickhouse backend, we can't implement input_file_name() correctly, 
we can only implement
- *      it so that it return a a list of filenames (concated by ',').
+ * This file is copied from Delta 2.2.0.
  */
 
 /**
@@ -149,7 +147,6 @@ case class UpdateCommand(
             .filter(new Column(updateCondition))
             .filter(updatedRowUdf())
             .select(input_file_name().as("input_files"))
-            .select(explode(split(col("input_files"), ",")))
             .distinct()
             .as[String]
             .collect()
diff --git 
a/backends-clickhouse/src-delta-23/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala
 
b/backends-clickhouse/src-delta-23/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala
index 5f9c2953ba..88f2b208af 100644
--- 
a/backends-clickhouse/src-delta-23/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala
+++ 
b/backends-clickhouse/src-delta-23/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala
@@ -35,15 +35,13 @@ import 
org.apache.spark.sql.catalyst.plans.logical.{DeltaDelete, LogicalPlan}
 import org.apache.spark.sql.execution.command.LeafRunnableCommand
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.metric.SQLMetrics.{createMetric, 
createTimingMetric}
-import org.apache.spark.sql.functions.{col, explode, input_file_name, split}
+import org.apache.spark.sql.functions.input_file_name
 import org.apache.spark.sql.types.LongType
 
 /**
  * Gluten overwrite Delta:
  *
- * This file is copied from Delta 2.3.0. It is modified to overcome the 
following issues:
- *   1. In Clickhouse backend, we can't implement input_file_name() correctly, 
we can only implement
- *      it so that it return a a list of filenames (concated by ',').
+ * This file is copied from Delta 2.3.0.
  */
 
 trait DeleteCommandMetrics { self: LeafRunnableCommand =>
@@ -293,7 +291,6 @@ case class DeleteCommand(
                   data.filter(new Column(cond))
                     .select(input_file_name().as("input_files"))
                     .filter(deletedRowUdf())
-                    .select(explode(split(col("input_files"), ",")))
                     .distinct()
                     .as[String]
                     .collect()
diff --git 
a/backends-clickhouse/src-delta-23/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala
 
b/backends-clickhouse/src-delta-23/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala
index bb4d668975..86bd9a4233 100644
--- 
a/backends-clickhouse/src-delta-23/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala
+++ 
b/backends-clickhouse/src-delta-23/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala
@@ -49,9 +49,7 @@ import org.apache.spark.sql.types.{DataTypes, LongType, 
StructType}
 /**
  * Gluten overwrite Delta:
  *
- * This file is copied from Delta 2.3.0. It is modified to overcome the 
following issues:
- *   1. In Clickhouse backend, we can't implement input_file_name() correctly, 
we can only implement
- *      it so that it return a a list of filenames (concated by ',').
+ * This file is copied from Delta 2.3.0.
  */
 
 case class MergeDataSizes(
diff --git 
a/backends-clickhouse/src-delta-23/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala
 
b/backends-clickhouse/src-delta-23/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala
index 94ccef961c..b39bcd5ba8 100644
--- 
a/backends-clickhouse/src-delta-23/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala
+++ 
b/backends-clickhouse/src-delta-23/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala
@@ -39,9 +39,7 @@ import org.apache.spark.sql.types.LongType
 /**
  * Gluten overwrite Delta:
  *
- * This file is copied from Delta 2.3.0. It is modified to overcome the 
following issues:
- *   1. In Clickhouse backend, we can't implement input_file_name() correctly, 
we can only implement
- *      it so that it return a a list of filenames (concated by ',').
+ * This file is copied from Delta 2.3.0.
  */
 
 /**
@@ -156,7 +154,6 @@ case class UpdateCommand(
           data.filter(new Column(updateCondition))
             .select(input_file_name().as("input_files"))
             .filter(updatedRowUdf())
-            .select(explode(split(col("input_files"), ",")))
             .distinct()
             .as[String]
             .collect()
diff --git 
a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala
 
b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala
index 0a25346fc6..a820da4f9f 100644
--- 
a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala
+++ 
b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala
@@ -39,15 +39,13 @@ import 
org.apache.spark.sql.catalyst.plans.logical.{DeltaDelete, LogicalPlan}
 import org.apache.spark.sql.execution.command.LeafRunnableCommand
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.metric.SQLMetrics.{createMetric, 
createTimingMetric}
-import org.apache.spark.sql.functions.{col, explode, input_file_name, split}
+import org.apache.spark.sql.functions.input_file_name
 import org.apache.spark.sql.types.LongType
 
 /**
  * Gluten overwrite Delta:
  *
- * This file is copied from Delta 3.2.1. It is modified to overcome the 
following issues:
- *   1. In Clickhouse backend, we can't implement input_file_name() correctly, 
we can only implement
- *      it so that it return a a list of filenames (concated by ',').
+ * This file is copied from Delta 3.2.1.
  */
 
 trait DeleteCommandMetrics { self: LeafRunnableCommand =>
@@ -314,7 +312,6 @@ case class DeleteCommand(
                   data.filter(new Column(cond))
                     .select(input_file_name().as("input_files"))
                     .filter(new Column(incrDeletedCountExpr))
-                    .select(explode(split(col("input_files"), ",")))
                     .distinct()
                     .as[String]
                     .collect()
diff --git 
a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala
 
b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala
index 4e75b84619..692067d97d 100644
--- 
a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala
+++ 
b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala
@@ -39,15 +39,13 @@ import 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.LeafRunnableCommand
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.metric.SQLMetrics.{createMetric, 
createTimingMetric}
-import org.apache.spark.sql.functions.{array, col, explode, input_file_name, 
lit, split, struct}
+import org.apache.spark.sql.functions.{array, col, input_file_name, explode, 
lit, struct}
 import org.apache.spark.sql.types.LongType
 
 /**
  * Gluten overwrite Delta:
  *
- * This file is copied from Delta 3.2.1. It is modified to overcome the 
following issues:
- *   1. In Clickhouse backend, we can't implement input_file_name() correctly, 
we can only implement
- *      it so that it return a a list of filenames (concated by ',').
+ * This file is copied from Delta 3.2.1.
  */
 
 /**
@@ -195,7 +193,6 @@ case class UpdateCommand(
             data.filter(new Column(updateCondition))
               .select(input_file_name().as("input_files"))
               .filter(new Column(incrUpdatedCountExpr))
-              .select(explode(split(col("input_files"), ",")))
               .distinct()
               .as[String]
               .collect()
diff --git 
a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/merge/ClassicMergeExecutor.scala
 
b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/merge/ClassicMergeExecutor.scala
index aa1f94c5c9..cef996e7ba 100644
--- 
a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/merge/ClassicMergeExecutor.scala
+++ 
b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/merge/ClassicMergeExecutor.scala
@@ -35,9 +35,7 @@ import org.apache.spark.sql.functions.{coalesce, col, count, 
input_file_name, li
 /**
  * Gluten overwrite Delta:
  *
- * This file is copied from Delta 3.2.1. It is modified to overcome the 
following issues:
- *   1. In Clickhouse backend, we can't implement input_file_name() correctly, 
we can only implement
- *      it so that it return a a list of filenames (concated by ','). In 
findTouchedFiles func.
+ * This file is copied from Delta 3.2.1.
  */
 
 /**
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
index 1f3aa89901..4c6de7a2af 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
@@ -351,13 +351,13 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite
         
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
         val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => 
f.asInstanceOf[AddMergeTreeParts])
         assertResult(600572)(addFiles.map(_.rows).sum)
-        // 4 parts belong to the first batch
-        // 2 parts belong to the second batch (1 actual updated part, 1 
passively updated).
+        // 5 parts belong to the first batch
+        // 1 parts belong to the second batch (1 actual updated part).
         assertResult(6)(addFiles.size)
         val filePaths =
           addFiles.map(_.path).groupBy(name => name.substring(0, 
name.lastIndexOf("_")))
         assertResult(2)(filePaths.size)
-        assertResult(Array(2, 4))(filePaths.values.map(paths => 
paths.size).toArray.sorted)
+        assertResult(Array(1, 5))(filePaths.values.map(paths => 
paths.size).toArray.sorted)
       }
 
       val clickhouseTable = ClickhouseTable.forPath(spark, dataPath)
@@ -428,12 +428,12 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite
       val mergetreeScan = scanExec.head
       val fileIndex = 
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
       val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => 
f.asInstanceOf[AddMergeTreeParts])
-      // 4 parts belong to the first batch
-      // 2 parts belong to the second batch (1 actual updated part, 1 
passively updated).
+      // 5 parts belong to the first batch
+      // 1 parts belong to the second batch (1 actual updated part).
       assertResult(6)(addFiles.size)
       val filePaths = addFiles.map(_.path).groupBy(name => name.substring(0, 
name.lastIndexOf("_")))
       assertResult(2)(filePaths.size)
-      assertResult(Array(2, 4))(filePaths.values.map(paths => 
paths.size).toArray.sorted)
+      assertResult(Array(1, 5))(filePaths.values.map(paths => 
paths.size).toArray.sorted)
 
       val clickhouseTable = ClickhouseTable.forPath(spark, dataPath)
       clickhouseTable.delete("mod(l_orderkey, 3) = 2")
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergetreeWriteStatsSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergetreeWriteStatsSuite.scala
new file mode 100644
index 0000000000..a07960da8e
--- /dev/null
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergetreeWriteStatsSuite.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution.mergetree
+
+import org.apache.gluten.backendsapi.clickhouse.{CHConfig, RuntimeConfig}
+import org.apache.gluten.backendsapi.clickhouse.CHConfig.GlutenCHConf
+import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.execution.{FileSourceScanExecTransformer, 
GlutenClickHouseTPCDSAbstractSuite}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.delta.Snapshot
+import org.apache.spark.sql.delta.files.TahoeLogFileIndex
+import org.apache.spark.sql.delta.stats.PreparedDeltaFileIndex
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
+
+import java.io.File
+
+class GlutenClickHouseMergetreeWriteStatsSuite
+  extends GlutenClickHouseTPCDSAbstractSuite
+  with AdaptiveSparkPlanHelper {
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+      .set("spark.io.compression.codec", "LZ4")
+      .set("spark.sql.shuffle.partitions", "5")
+      .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
+      .set("spark.sql.adaptive.enabled", "true")
+      .set("spark.driver.memory", "2G")
+      .set("spark.memory.offHeap.size", "4G")
+      .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
+      .set("spark.databricks.delta.stats.enabled", "true")
+      .set("spark.databricks.delta.optimizeWrite.enabled", "true")
+      .set(RuntimeConfig.LOGGER_LEVEL.key, "error")
+      .set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true")
+      .setCHSettings("mergetree.merge_after_insert", false)
+      .set(CHConfig.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, spark35.toString)
+  }
+
+  override protected def beforeEach(): Unit = {
+    super.beforeEach()
+    val conf = new Configuration
+    conf.set("fs.defaultFS", HDFS_URL)
+    val fs = FileSystem.get(conf)
+    fs.delete(new org.apache.hadoop.fs.Path(HDFS_URL), true)
+    FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
+    FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
+  }
+
+  override protected def afterEach(): Unit = {
+    super.afterEach()
+    FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
+  }
+
+  test("test mergetree virtual columns") {
+    spark.sql("create database if not exists mergetree")
+    spark.sql("use mergetree")
+    spark.sql("drop table if exists store_sales")
+    spark.sql(s"""
+                 |CREATE EXTERNAL TABLE IF NOT EXISTS mergetree.store_sales
+                 |(
+                 |     ss_sold_time_sk INT,
+                 |     ss_item_sk INT,
+                 |     ss_customer_sk INT,
+                 |     ss_cdemo_sk INT,
+                 |     ss_hdemo_sk INT,
+                 |     ss_addr_sk INT,
+                 |     ss_store_sk INT,
+                 |     ss_promo_sk INT,
+                 |     ss_ticket_number bigint,
+                 |     ss_quantity INT,
+                 |     ss_wholesale_cost DECIMAL(7,2),
+                 |     ss_list_price DECIMAL(7,2),
+                 |     ss_sales_price DECIMAL(7,2),
+                 |     ss_ext_discount_amt DECIMAL(7,2),
+                 |     ss_ext_sales_price DECIMAL(7,2),
+                 |     ss_ext_wholesale_cost DECIMAL(7,2),
+                 |     ss_ext_list_price DECIMAL(7,2),
+                 |     ss_ext_tax DECIMAL(7,2),
+                 |     ss_coupon_amt DECIMAL(7,2),
+                 |     ss_net_paid DECIMAL(7,2),
+                 |     ss_net_paid_inc_tax DECIMAL(7,2),
+                 |     ss_net_profit DECIMAL(7,2),
+                 |     ss_sold_date_sk INT
+                 |)
+                 |USING clickhouse
+                 |LOCATION '$HDFS_URL/test/store_sales'
+                 |TBLPROPERTIES (storage_policy='__hdfs_main')
+                 |""".stripMargin)
+
+    // scalastyle:off line.size.limit
+    spark.sql(
+      "insert into mergetree.store_sales select /*+ REPARTITION(3) */ * from 
tpcdsdb.store_sales")
+    val df = spark.sql(
+      "select input_file_name(), count(*) from mergetree.store_sales group by 
input_file_name()")
+    val inputFiles = df.collect().map(_.getString(0))
+    val snapshot = getDeltaSnapshot(df)
+    val deltaLogFiles = snapshot.allFiles
+      .collect()
+      .map(addFile => snapshot.path.getParent.toString + "/" + addFile.path)
+    assertResult(inputFiles.toSet)(deltaLogFiles.toSet)
+    // scalastyle:on line.size.limit
+  }
+
+  def getDeltaSnapshot(df: DataFrame): Snapshot = {
+    val scanExec = collect(df.queryExecution.sparkPlan) {
+      case nf: FileSourceScanExecTransformer => nf
+      case f: FileSourceScanExec => f
+    }
+    assertResult(1)(scanExec.size)
+    val mergetreeScan = scanExec.head
+    val snapshot: Snapshot = mergetreeScan.relation.location match {
+      case pdf: PreparedDeltaFileIndex => pdf.preparedScan.scannedSnapshot
+      case tlf: TahoeLogFileIndex => tlf.getSnapshot
+    }
+    assert(snapshot != null)
+    snapshot
+  }
+
+}
diff --git a/cpp-ch/local-engine/Parser/InputFileNameParser.cpp 
b/cpp-ch/local-engine/Parser/InputFileNameParser.cpp
deleted file mode 100644
index ebb707ada4..0000000000
--- a/cpp-ch/local-engine/Parser/InputFileNameParser.cpp
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "InputFileNameParser.h"
-
-#include <ranges>
-
-#include <DataTypes/DataTypeString.h>
-#include <DataTypes/DataTypesNumber.h>
-#include <Processors/ISimpleTransform.h>
-#include <Processors/QueryPlan/QueryPlan.h>
-#include <QueryPipeline/QueryPipelineBuilder.h>
-
-
-namespace local_engine
-{
-static DB::ITransformingStep::Traits getTraits()
-{
-    return DB::ITransformingStep::Traits{
-        {
-            .returns_single_stream = false,
-            .preserves_number_of_streams = true,
-            .preserves_sorting = true,
-        },
-        {
-            .preserves_number_of_rows = true,
-        }};
-}
-
-static DB::Block createOutputHeader(
-    const DB::Block & header,
-    const std::optional<String> & file_name,
-    const std::optional<Int64> & block_start,
-    const std::optional<Int64> & block_length)
-{
-    DB::Block output_header{header};
-    if (file_name.has_value())
-        
output_header.insert(DB::ColumnWithTypeAndName{std::make_shared<DB::DataTypeString>(),
 FileMetaColumns::INPUT_FILE_NAME});
-    if (block_start.has_value())
-        
output_header.insert(DB::ColumnWithTypeAndName{std::make_shared<DB::DataTypeInt64>(),
 FileMetaColumns::INPUT_FILE_BLOCK_START});
-    if (block_length.has_value())
-        
output_header.insert(DB::ColumnWithTypeAndName{std::make_shared<DB::DataTypeInt64>(),
 FileMetaColumns::INPUT_FILE_BLOCK_LENGTH});
-    return output_header;
-}
-
-class InputFileExprProjectTransform : public DB::ISimpleTransform
-{
-public:
-    InputFileExprProjectTransform(
-        const DB::Block & input_header_,
-        const DB::Block & output_header_,
-        const std::optional<String> & file_name,
-        const std::optional<Int64> & block_start,
-        const std::optional<Int64> & block_length)
-        : ISimpleTransform(input_header_, output_header_, true), 
file_name(file_name), block_start(block_start), block_length(block_length)
-    {
-    }
-
-    String getName() const override { return "InputFileExprProjectTransform"; }
-
-    void transform(DB::Chunk & chunk) override
-    {
-        InputFileNameParser::addInputFileColumnsToChunk(output.getHeader(), 
chunk, file_name, block_start, block_length);
-    }
-
-private:
-    std::optional<String> file_name;
-    std::optional<Int64> block_start;
-    std::optional<Int64> block_length;
-};
-
-class InputFileExprProjectStep : public DB::ITransformingStep
-{
-public:
-    InputFileExprProjectStep(
-        const DB::Block & input_header,
-        const std::optional<String> & file_name,
-        const std::optional<Int64> & block_start,
-        const std::optional<Int64> & block_length)
-        : ITransformingStep(input_header, createOutputHeader(input_header, 
file_name, block_start, block_length), getTraits(), true)
-        , file_name(file_name)
-        , block_start(block_start)
-        , block_length(block_length)
-    {
-    }
-
-    String getName() const override { return "InputFileExprProjectStep"; }
-
-    void transformPipeline(DB::QueryPipelineBuilder & pipeline, const 
DB::BuildQueryPipelineSettings & /*settings*/) override
-    {
-        pipeline.addSimpleTransform(
-            [&](const DB::Block & header)
-            { return std::make_shared<InputFileExprProjectTransform>(header, 
*output_header, file_name, block_start, block_length); });
-    }
-
-protected:
-    void updateOutputHeader() override
-    {
-        // do nothing
-    }
-
-private:
-    std::optional<String> file_name;
-    std::optional<Int64> block_start;
-    std::optional<Int64> block_length;
-};
-
-bool InputFileNameParser::hasInputFileNameColumn(const DB::Block & block)
-{
-    return block.findByName(FileMetaColumns::INPUT_FILE_NAME) != nullptr;
-}
-
-bool InputFileNameParser::hasInputFileBlockStartColumn(const DB::Block & block)
-{
-    return block.findByName(FileMetaColumns::INPUT_FILE_BLOCK_START) != 
nullptr;
-}
-
-bool InputFileNameParser::hasInputFileBlockLengthColumn(const DB::Block & 
block)
-{
-    return block.findByName(FileMetaColumns::INPUT_FILE_BLOCK_LENGTH) != 
nullptr;
-}
-
-void InputFileNameParser::addInputFileColumnsToChunk(
-    const DB::Block & header,
-    DB::Chunk & chunk,
-    const std::optional<String> & file_name,
-    const std::optional<Int64> & block_start,
-    const std::optional<Int64> & block_length)
-{
-    auto output_columns = chunk.getColumns();
-    for (size_t i = 0; i < header.columns(); ++i)
-    {
-        const auto & column = header.getByPosition(i);
-        if (column.name == FileMetaColumns::INPUT_FILE_NAME)
-        {
-            if (!file_name.has_value())
-                throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Input file 
name is not set");
-            auto type_string = std::make_shared<DB::DataTypeString>();
-            auto file_name_column = 
type_string->createColumnConst(chunk.getNumRows(), file_name.value());
-            output_columns.insert(output_columns.begin() + i, 
std::move(file_name_column));
-        }
-        else if (column.name == FileMetaColumns::INPUT_FILE_BLOCK_START)
-        {
-            if (!block_start.has_value())
-                throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, 
"block_start is not set");
-            auto type_int64 = std::make_shared<DB::DataTypeInt64>();
-            auto block_start_column = 
type_int64->createColumnConst(chunk.getNumRows(), block_start.value());
-            output_columns.insert(output_columns.begin() + i, 
std::move(block_start_column));
-        }
-        else if (column.name == FileMetaColumns::INPUT_FILE_BLOCK_LENGTH)
-        {
-            if (!block_length.has_value())
-                throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, 
"block_length is not set");
-            auto type_int64 = std::make_shared<DB::DataTypeInt64>();
-            auto block_length_column = 
type_int64->createColumnConst(chunk.getNumRows(), block_length.value());
-            output_columns.insert(output_columns.begin() + i, 
std::move(block_length_column));
-        }
-    }
-    chunk.setColumns(output_columns, chunk.getNumRows());
-}
-
-bool InputFileNameParser::containsInputFileColumns(const DB::Block & block)
-{
-    return FileMetaColumns::hasVirtualColumns(block);
-}
-
-DB::Block InputFileNameParser::removeInputFileColumn(const DB::Block & block)
-{
-    return FileMetaColumns::removeVirtualColumns(block);
-}
-
-std::optional<DB::IQueryPlanStep *> 
InputFileNameParser::addInputFileProjectStep(DB::QueryPlan & plan)
-{
-    if (!file_name.has_value() && !block_start.has_value() && 
!block_length.has_value())
-        return std::nullopt;
-    auto step = 
std::make_unique<InputFileExprProjectStep>(plan.getCurrentHeader(), file_name, 
block_start, block_length);
-    step->setStepDescription("Input file expression project");
-    std::optional<DB::IQueryPlanStep *> result = step.get();
-    plan.addStep(std::move(step));
-    return result;
-}
-
-}
diff --git a/cpp-ch/local-engine/Parser/InputFileNameParser.h 
b/cpp-ch/local-engine/Parser/InputFileNameParser.h
deleted file mode 100644
index a716f91c7c..0000000000
--- a/cpp-ch/local-engine/Parser/InputFileNameParser.h
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-#include <Processors/QueryPlan/ExpressionStep.h>
-#include <Storages/SubstraitSource/FormatFile.h>
-
-namespace DB
-{
-class Chunk;
-}
-
-namespace local_engine
-{
-class InputFileNameParser
-{
-public:
-    static bool hasInputFileNameColumn(const DB::Block & block);
-    static bool hasInputFileBlockStartColumn(const DB::Block & block);
-    static bool hasInputFileBlockLengthColumn(const DB::Block & block);
-    static bool containsInputFileColumns(const DB::Block & block);
-    static DB::Block removeInputFileColumn(const DB::Block & block);
-    static void addInputFileColumnsToChunk(
-        const DB::Block & header,
-        DB::Chunk & chunk,
-        const std::optional<String> & file_name,
-        const std::optional<Int64> & block_start,
-        const std::optional<Int64> & block_length);
-
-
-    void setFileName(const String & file_name) { this->file_name = file_name; }
-    void setBlockStart(const Int64 block_start) { this->block_start = 
block_start; }
-    void setBlockLength(const Int64 block_length) { this->block_length = 
block_length; }
-
-    [[nodiscard]] std::optional<DB::IQueryPlanStep *> 
addInputFileProjectStep(DB::QueryPlan & plan);
-
-private:
-    std::optional<String> file_name;
-    std::optional<Int64> block_start;
-    std::optional<Int64> block_length;
-};
-} // local_engine
diff --git a/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.cpp 
b/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.cpp
index 4c8c59fe21..428313ecc7 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.cpp
@@ -19,10 +19,11 @@
 #include <Core/Settings.h>
 #include <Parser/ExpressionParser.h>
 #include <Parser/FunctionParser.h>
-#include <Parser/InputFileNameParser.h>
 #include <Parser/SubstraitParserUtils.h>
 #include <Parser/TypeParser.h>
+#include <Processors/QueryPlan/ExpressionStep.h>
 #include <Storages/MergeTree/StorageMergeTreeFactory.h>
+#include <Storages/SubstraitSource/FormatFile.h>
 #include <boost/algorithm/string/case_conv.hpp>
 #include <boost/algorithm/string/predicate.hpp>
 #include <google/protobuf/wrappers.pb.h>
@@ -73,33 +74,18 @@ DB::QueryPlanPtr MergeTreeRelParser::parseReadRel(
     merge_tree_table.snapshot_id = "";
     auto storage = 
merge_tree_table.restoreStorage(QueryContext::globalMutableContext());
 
-    InputFileNameParser input_file_name_parser;
     DB::Block input;
     DB::Block original_input;
     if (rel.has_base_schema() && rel.base_schema().names_size())
     {
         input = TypeParser::buildBlockFromNamedStruct(rel.base_schema());
-        if (InputFileNameParser::hasInputFileNameColumn(input))
+        if (input.findByName(FileMetaColumns::INPUT_FILE_NAME) != nullptr)
         {
-            std::vector<String> parts;
-            for (const auto & part : merge_tree_table.parts)
-            {
-                parts.push_back(merge_tree_table.absolute_path + "/" + 
part.name);
-            }
-            auto name = Poco::cat<String>(",", parts.begin(), parts.end());
-            input_file_name_parser.setFileName(name);
-        }
-        if (InputFileNameParser::hasInputFileBlockStartColumn(input))
-        {
-            // mergetree doesn't support block start
-            input_file_name_parser.setBlockStart(0);
-        }
-        if (InputFileNameParser::hasInputFileBlockLengthColumn(input))
-        {
-            // mergetree doesn't support block length
-            input_file_name_parser.setBlockLength(0);
+            // mergetree use concat(path, _part) instead of input_file_name
+            
input.insert(ColumnWithTypeAndName(ColumnWithTypeAndName(std::make_shared<DataTypeString>(),
 VIRTUAL_COLUMN_PART)));
         }
-        input = InputFileNameParser::removeInputFileColumn(input);
+        // remove input_file_name, input_file_block_start, 
input_file_block_size due to mergetree doesn't have them
+        input = FileMetaColumns::removeVirtualColumns(input);
 
         SparkSQLConfig sql_config = SparkSQLConfig::loadFromContext(context);
         // case_insensitive_matching
@@ -186,11 +172,66 @@ DB::QueryPlanPtr MergeTreeRelParser::parseReadRel(
             "Rename MergeTree Output"));
     }
 
-    if (auto step = 
input_file_name_parser.addInputFileProjectStep(*query_plan); step.has_value())
+    // add virtual columns step
+    if (auto step = 
MergeTreeRelParser::addVirtualColumnsProjectStep(*query_plan, rel, 
merge_tree_table.absolute_path); step.has_value())
         steps.emplace_back(step.value());
     return query_plan;
 }
 
+std::optional<DB::IQueryPlanStep *> 
MergeTreeRelParser::addVirtualColumnsProjectStep(DB::QueryPlan & plan, const 
substrait::ReadRel & rel, const std::string & path)
+{
+    if (!rel.has_base_schema() || rel.base_schema().names_size() < 1)
+        return std::nullopt;
+    bool contains_input_file_name = false;
+    bool contains_input_file_block_start = false;
+    bool contains_input_file_block_length = false;
+    for (const auto & name : rel.base_schema().names())
+    {
+        if (name == FileMetaColumns::INPUT_FILE_NAME)
+            contains_input_file_name = true;
+        if (name == FileMetaColumns::INPUT_FILE_BLOCK_START)
+            contains_input_file_block_start = true;
+        if (name == FileMetaColumns::INPUT_FILE_BLOCK_LENGTH)
+            contains_input_file_block_length = true;
+    }
+    if (!contains_input_file_name && !contains_input_file_block_start && 
!contains_input_file_block_length)
+        return std::nullopt;
+
+    const auto & header = plan.getCurrentHeader();
+    DB::ActionsDAG actions_dag(header.getNamesAndTypesList());
+    if (contains_input_file_name)
+    {
+        auto concat_func = FunctionFactory::instance().get("concat", context);
+        DB::ActionsDAG::NodeRawConstPtrs args;
+        const auto string_type = std::make_shared<DB::DataTypeString>();
+        const auto * path_node = 
&actions_dag.addColumn(DB::ColumnWithTypeAndName(string_type->createColumnConst(1,
 path + "/"), string_type, "path"));
+        args.emplace_back(path_node);
+        const auto & part_name = 
actions_dag.findInOutputs(VIRTUAL_COLUMN_PART);
+        args.emplace_back(&part_name);
+        actions_dag.addOrReplaceInOutputs(actions_dag.addFunction(concat_func, 
args, FileMetaColumns::INPUT_FILE_NAME));
+    }
+    if (contains_input_file_block_start)
+    {
+        const auto int64_type = std::make_shared<DB::DataTypeInt64>();
+        
actions_dag.addOrReplaceInOutputs(actions_dag.addColumn(DB::ColumnWithTypeAndName(int64_type->createColumnConst(1,
 -1), int64_type, FileMetaColumns::INPUT_FILE_BLOCK_START)));
+    }
+    if (contains_input_file_block_length)
+    {
+        const auto int64_type = std::make_shared<DB::DataTypeInt64>();
+        
actions_dag.addOrReplaceInOutputs(actions_dag.addColumn(DB::ColumnWithTypeAndName(int64_type->createColumnConst(1,
 -1), int64_type, FileMetaColumns::INPUT_FILE_BLOCK_LENGTH)));
+    }
+
+    if (contains_input_file_name)
+        actions_dag.removeUnusedResult(VIRTUAL_COLUMN_PART);
+    auto step = std::make_unique<DB::ExpressionStep>(plan.getCurrentHeader(), 
std::move(actions_dag));
+    step->setStepDescription("Add virtual columns");
+    std::optional<DB::IQueryPlanStep *> result = step.get();
+    plan.addStep(std::move(step));
+    return result;
+}
+
+
+
 PrewhereInfoPtr MergeTreeRelParser::parsePreWhereInfo(const 
substrait::Expression & rel, const Block & input)
 {
     std::string filter_name;
diff --git a/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.h 
b/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.h
index 93159311a7..d3bf29e1f6 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.h
+++ b/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.h
@@ -36,6 +36,8 @@ namespace local_engine
 class MergeTreeRelParser : public RelParser
 {
 public:
+    inline static const std::string VIRTUAL_COLUMN_PART = "_part";
+
     explicit MergeTreeRelParser(ParserContextPtr parser_context_, const 
DB::ContextPtr & context_)
         : RelParser(parser_context_), context(context_)
     {
@@ -56,6 +58,8 @@ public:
         throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "MergeTreeRelParser 
can't call getSingleInput().");
     }
 
+    std::optional<DB::IQueryPlanStep *> 
addVirtualColumnsProjectStep(DB::QueryPlan & plan, const substrait::ReadRel & 
rel, const std::string & path);
+
     String filterRangesOnDriver(const substrait::ReadRel & read_rel);
 
     struct Condition
diff --git a/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h 
b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h
index 77096f3f49..506beb2c41 100644
--- a/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h
+++ b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h
@@ -134,10 +134,7 @@ struct DeltaStats
             this->null_count[i] += null_count;
 
             DB::Field min_value, max_value;
-            if (const auto * column_nullable = typeid_cast<const 
DB::ColumnNullable *>(column.get()))
-                column_nullable->getExtremesNullLast(min_value, max_value);
-            else
-                column->getExtremes(min_value, max_value);
+            column->getExtremes(min_value, max_value);
 
             assert(min[i].isNull() || min_value.getType() == min[i].getType());
             assert(max[i].isNull() || max_value.getType() == max[i].getType());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to