This is an automated email from the ASF dual-hosted git repository.
lwz9103 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 76a7ff8ec6 [GLUTEN-9044][CH] Fix virtual columns in mergetree table
(#9047)
76a7ff8ec6 is described below
commit 76a7ff8ec6cfc39138b68524ea9cab32898af54f
Author: Wenzheng Liu <[email protected]>
AuthorDate: Fri Mar 21 13:23:27 2025 +0800
[GLUTEN-9044][CH] Fix virtual columns in mergetree table (#9047)
---
.../spark/sql/delta/commands/DeleteCommand.scala | 5 +-
.../sql/delta/commands/MergeIntoCommand.scala | 2 -
.../spark/sql/delta/commands/UpdateCommand.scala | 5 +-
.../spark/sql/delta/commands/DeleteCommand.scala | 7 +-
.../sql/delta/commands/MergeIntoCommand.scala | 4 +-
.../spark/sql/delta/commands/UpdateCommand.scala | 5 +-
.../spark/sql/delta/commands/DeleteCommand.scala | 7 +-
.../spark/sql/delta/commands/UpdateCommand.scala | 7 +-
.../commands/merge/ClassicMergeExecutor.scala | 4 +-
...tenClickHouseMergeTreePathBasedWriteSuite.scala | 12 +-
.../GlutenClickHouseMergetreeWriteStatsSuite.scala | 140 +++++++++++++++
cpp-ch/local-engine/Parser/InputFileNameParser.cpp | 197 ---------------------
cpp-ch/local-engine/Parser/InputFileNameParser.h | 55 ------
.../Parser/RelParsers/MergeTreeRelParser.cpp | 85 ++++++---
.../Parser/RelParsers/MergeTreeRelParser.h | 4 +
.../Storages/Output/NormalFileWriter.h | 5 +-
16 files changed, 225 insertions(+), 319 deletions(-)
diff --git
a/backends-clickhouse/src-delta-20/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala
b/backends-clickhouse/src-delta-20/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala
index 61b0330723..9feac0e0b4 100644
---
a/backends-clickhouse/src-delta-20/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala
+++
b/backends-clickhouse/src-delta-20/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.command.LeafRunnableCommand
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.execution.metric.SQLMetrics.createMetric
-import org.apache.spark.sql.functions.{col, explode, input_file_name, lit,
split, typedLit, udf}
+import org.apache.spark.sql.functions.{input_file_name, lit, typedLit, udf}
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
@@ -39,8 +39,6 @@ import
com.fasterxml.jackson.databind.annotation.JsonDeserialize
* Gluten overwrite Delta:
*
- * This file is copied from Delta 2.2.0. It is modified to overcome the
following issues:
- * 1. In Clickhouse backend, we can't implement input_file_name() correctly,
we can only implement
- * it so that it return a a list of filenames (concated by ',').
+ * This file is copied from Delta 2.2.0.
*/
trait DeleteCommandMetrics { self: LeafRunnableCommand =>
@@ -221,7 +219,6 @@ case class DeleteCommand(deltaLog: DeltaLog, target:
LogicalPlan, condition: Opt
.filter(new Column(cond))
.select(input_file_name().as("input_files"))
.filter(deletedRowUdf())
- .select(explode(split(col("input_files"), ",")))
.distinct()
.as[String]
.collect()
diff --git
a/backends-clickhouse/src-delta-20/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala
b/backends-clickhouse/src-delta-20/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala
index 4b26d5d5d9..4882def7b2 100644
---
a/backends-clickhouse/src-delta-20/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala
+++
b/backends-clickhouse/src-delta-20/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala
@@ -49,8 +49,6 @@ import scala.collection.mutable
* Gluten overwrite Delta:
*
- * This file is copied from Delta 2.2.0. It is modified to overcome the
following issues:
- * 1. In Clickhouse backend, we can't implement input_file_name() correctly,
we can only implement
- * it so that it return a a list of filenames (concated by ',').
+ * This file is copied from Delta 2.2.0.
*/
case class MergeDataSizes(
diff --git
a/backends-clickhouse/src-delta-20/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala
b/backends-clickhouse/src-delta-20/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala
index 42a081788e..ce6a7a474e 100644
---
a/backends-clickhouse/src-delta-20/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala
+++
b/backends-clickhouse/src-delta-20/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala
@@ -38,9 +38,7 @@ import org.apache.hadoop.fs.Path
/**
* Gluten overwrite Delta:
*
- * This file is copied from Delta 2.2.0. It is modified to overcome the
following issues:
- * 1. In Clickhouse backend, we can't implement input_file_name() correctly,
we can only implement
- * it so that it return a a list of filenames (concated by ',').
+ * This file is copied from Delta 2.2.0.
*/
/**
@@ -149,7 +147,6 @@ case class UpdateCommand(
.filter(new Column(updateCondition))
.filter(updatedRowUdf())
.select(input_file_name().as("input_files"))
- .select(explode(split(col("input_files"), ",")))
.distinct()
.as[String]
.collect()
diff --git
a/backends-clickhouse/src-delta-23/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala
b/backends-clickhouse/src-delta-23/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala
index 5f9c2953ba..88f2b208af 100644
---
a/backends-clickhouse/src-delta-23/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala
+++
b/backends-clickhouse/src-delta-23/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala
@@ -35,15 +35,13 @@ import
org.apache.spark.sql.catalyst.plans.logical.{DeltaDelete, LogicalPlan}
import org.apache.spark.sql.execution.command.LeafRunnableCommand
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.metric.SQLMetrics.{createMetric,
createTimingMetric}
-import org.apache.spark.sql.functions.{col, explode, input_file_name, split}
+import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.types.LongType
/**
* Gluten overwrite Delta:
*
- * This file is copied from Delta 2.3.0. It is modified to overcome the
following issues:
- * 1. In Clickhouse backend, we can't implement input_file_name() correctly,
we can only implement
- * it so that it return a a list of filenames (concated by ',').
+ * This file is copied from Delta 2.3.0.
*/
trait DeleteCommandMetrics { self: LeafRunnableCommand =>
@@ -293,7 +291,6 @@ case class DeleteCommand(
data.filter(new Column(cond))
.select(input_file_name().as("input_files"))
.filter(deletedRowUdf())
- .select(explode(split(col("input_files"), ",")))
.distinct()
.as[String]
.collect()
diff --git
a/backends-clickhouse/src-delta-23/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala
b/backends-clickhouse/src-delta-23/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala
index bb4d668975..86bd9a4233 100644
---
a/backends-clickhouse/src-delta-23/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala
+++
b/backends-clickhouse/src-delta-23/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala
@@ -49,9 +49,7 @@ import org.apache.spark.sql.types.{DataTypes, LongType,
StructType}
/**
* Gluten overwrite Delta:
*
- * This file is copied from Delta 2.3.0. It is modified to overcome the
following issues:
- * 1. In Clickhouse backend, we can't implement input_file_name() correctly,
we can only implement
- * it so that it return a a list of filenames (concated by ',').
+ * This file is copied from Delta 2.3.0.
*/
case class MergeDataSizes(
diff --git
a/backends-clickhouse/src-delta-23/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala
b/backends-clickhouse/src-delta-23/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala
index 94ccef961c..b39bcd5ba8 100644
---
a/backends-clickhouse/src-delta-23/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala
+++
b/backends-clickhouse/src-delta-23/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala
@@ -39,9 +39,7 @@ import org.apache.spark.sql.types.LongType
/**
* Gluten overwrite Delta:
*
- * This file is copied from Delta 2.3.0. It is modified to overcome the
following issues:
- * 1. In Clickhouse backend, we can't implement input_file_name() correctly,
we can only implement
- * it so that it return a a list of filenames (concated by ',').
+ * This file is copied from Delta 2.3.0.
*/
/**
@@ -156,7 +154,6 @@ case class UpdateCommand(
data.filter(new Column(updateCondition))
.select(input_file_name().as("input_files"))
.filter(updatedRowUdf())
- .select(explode(split(col("input_files"), ",")))
.distinct()
.as[String]
.collect()
diff --git
a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala
b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala
index 0a25346fc6..a820da4f9f 100644
---
a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala
+++
b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/DeleteCommand.scala
@@ -39,15 +39,13 @@ import
org.apache.spark.sql.catalyst.plans.logical.{DeltaDelete, LogicalPlan}
import org.apache.spark.sql.execution.command.LeafRunnableCommand
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.metric.SQLMetrics.{createMetric,
createTimingMetric}
-import org.apache.spark.sql.functions.{col, explode, input_file_name, split}
+import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.types.LongType
/**
* Gluten overwrite Delta:
*
- * This file is copied from Delta 3.2.1. It is modified to overcome the
following issues:
- * 1. In Clickhouse backend, we can't implement input_file_name() correctly,
we can only implement
- * it so that it return a a list of filenames (concated by ',').
+ * This file is copied from Delta 3.2.1.
*/
trait DeleteCommandMetrics { self: LeafRunnableCommand =>
@@ -314,7 +312,6 @@ case class DeleteCommand(
data.filter(new Column(cond))
.select(input_file_name().as("input_files"))
.filter(new Column(incrDeletedCountExpr))
- .select(explode(split(col("input_files"), ",")))
.distinct()
.as[String]
.collect()
diff --git
a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala
b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala
index 4e75b84619..692067d97d 100644
---
a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala
+++
b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala
@@ -39,15 +39,13 @@ import
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.LeafRunnableCommand
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.metric.SQLMetrics.{createMetric,
createTimingMetric}
-import org.apache.spark.sql.functions.{array, col, explode, input_file_name,
lit, split, struct}
+import org.apache.spark.sql.functions.{array, col, input_file_name, explode,
lit, struct}
import org.apache.spark.sql.types.LongType
/**
* Gluten overwrite Delta:
*
- * This file is copied from Delta 3.2.1. It is modified to overcome the
following issues:
- * 1. In Clickhouse backend, we can't implement input_file_name() correctly,
we can only implement
- * it so that it return a a list of filenames (concated by ',').
+ * This file is copied from Delta 3.2.1.
*/
/**
@@ -195,7 +193,6 @@ case class UpdateCommand(
data.filter(new Column(updateCondition))
.select(input_file_name().as("input_files"))
.filter(new Column(incrUpdatedCountExpr))
- .select(explode(split(col("input_files"), ",")))
.distinct()
.as[String]
.collect()
diff --git
a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/merge/ClassicMergeExecutor.scala
b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/merge/ClassicMergeExecutor.scala
index aa1f94c5c9..cef996e7ba 100644
---
a/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/merge/ClassicMergeExecutor.scala
+++
b/backends-clickhouse/src-delta-32/main/scala/org/apache/spark/sql/delta/commands/merge/ClassicMergeExecutor.scala
@@ -35,9 +35,7 @@ import org.apache.spark.sql.functions.{coalesce, col, count,
input_file_name, li
/**
* Gluten overwrite Delta:
*
- * This file is copied from Delta 3.2.1. It is modified to overcome the
following issues:
- * 1. In Clickhouse backend, we can't implement input_file_name() correctly,
we can only implement
- * it so that it return a a list of filenames (concated by ','). In
findTouchedFiles func.
+ * This file is copied from Delta 3.2.1.
*/
/**
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
index 1f3aa89901..4c6de7a2af 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
@@ -351,13 +351,13 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f =>
f.asInstanceOf[AddMergeTreeParts])
assertResult(600572)(addFiles.map(_.rows).sum)
- // 4 parts belong to the first batch
- // 2 parts belong to the second batch (1 actual updated part, 1
passively updated).
+ // 5 parts belong to the first batch
+ // 1 part belongs to the second batch (the actually updated part).
assertResult(6)(addFiles.size)
val filePaths =
addFiles.map(_.path).groupBy(name => name.substring(0,
name.lastIndexOf("_")))
assertResult(2)(filePaths.size)
- assertResult(Array(2, 4))(filePaths.values.map(paths =>
paths.size).toArray.sorted)
+ assertResult(Array(1, 5))(filePaths.values.map(paths =>
paths.size).toArray.sorted)
}
val clickhouseTable = ClickhouseTable.forPath(spark, dataPath)
@@ -428,12 +428,12 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite
val mergetreeScan = scanExec.head
val fileIndex =
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f =>
f.asInstanceOf[AddMergeTreeParts])
- // 4 parts belong to the first batch
- // 2 parts belong to the second batch (1 actual updated part, 1
passively updated).
+ // 5 parts belong to the first batch
+ // 1 part belongs to the second batch (the actually updated part).
assertResult(6)(addFiles.size)
val filePaths = addFiles.map(_.path).groupBy(name => name.substring(0,
name.lastIndexOf("_")))
assertResult(2)(filePaths.size)
- assertResult(Array(2, 4))(filePaths.values.map(paths =>
paths.size).toArray.sorted)
+ assertResult(Array(1, 5))(filePaths.values.map(paths =>
paths.size).toArray.sorted)
val clickhouseTable = ClickhouseTable.forPath(spark, dataPath)
clickhouseTable.delete("mod(l_orderkey, 3) = 2")
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergetreeWriteStatsSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergetreeWriteStatsSuite.scala
new file mode 100644
index 0000000000..a07960da8e
--- /dev/null
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergetreeWriteStatsSuite.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution.mergetree
+
+import org.apache.gluten.backendsapi.clickhouse.{CHConfig, RuntimeConfig}
+import org.apache.gluten.backendsapi.clickhouse.CHConfig.GlutenCHConf
+import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.execution.{FileSourceScanExecTransformer,
GlutenClickHouseTPCDSAbstractSuite}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.delta.Snapshot
+import org.apache.spark.sql.delta.files.TahoeLogFileIndex
+import org.apache.spark.sql.delta.stats.PreparedDeltaFileIndex
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
+
+import java.io.File
+
+class GlutenClickHouseMergetreeWriteStatsSuite
+ extends GlutenClickHouseTPCDSAbstractSuite
+ with AdaptiveSparkPlanHelper {
+
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf
+ .set("spark.shuffle.manager",
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+ .set("spark.io.compression.codec", "LZ4")
+ .set("spark.sql.shuffle.partitions", "5")
+ .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
+ .set("spark.sql.adaptive.enabled", "true")
+ .set("spark.driver.memory", "2G")
+ .set("spark.memory.offHeap.size", "4G")
+ .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
+ .set("spark.databricks.delta.stats.enabled", "true")
+ .set("spark.databricks.delta.optimizeWrite.enabled", "true")
+ .set(RuntimeConfig.LOGGER_LEVEL.key, "error")
+ .set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true")
+ .setCHSettings("mergetree.merge_after_insert", false)
+ .set(CHConfig.ENABLE_ONEPIPELINE_MERGETREE_WRITE.key, spark35.toString)
+ }
+
+ override protected def beforeEach(): Unit = {
+ super.beforeEach()
+ val conf = new Configuration
+ conf.set("fs.defaultFS", HDFS_URL)
+ val fs = FileSystem.get(conf)
+ fs.delete(new org.apache.hadoop.fs.Path(HDFS_URL), true)
+ FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
+ FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
+ }
+
+ override protected def afterEach(): Unit = {
+ super.afterEach()
+ FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
+ }
+
+ test("test mergetree virtual columns") {
+ spark.sql("create database if not exists mergetree")
+ spark.sql("use mergetree")
+ spark.sql("drop table if exists store_sales")
+ spark.sql(s"""
+ |CREATE EXTERNAL TABLE IF NOT EXISTS mergetree.store_sales
+ |(
+ | ss_sold_time_sk INT,
+ | ss_item_sk INT,
+ | ss_customer_sk INT,
+ | ss_cdemo_sk INT,
+ | ss_hdemo_sk INT,
+ | ss_addr_sk INT,
+ | ss_store_sk INT,
+ | ss_promo_sk INT,
+ | ss_ticket_number bigint,
+ | ss_quantity INT,
+ | ss_wholesale_cost DECIMAL(7,2),
+ | ss_list_price DECIMAL(7,2),
+ | ss_sales_price DECIMAL(7,2),
+ | ss_ext_discount_amt DECIMAL(7,2),
+ | ss_ext_sales_price DECIMAL(7,2),
+ | ss_ext_wholesale_cost DECIMAL(7,2),
+ | ss_ext_list_price DECIMAL(7,2),
+ | ss_ext_tax DECIMAL(7,2),
+ | ss_coupon_amt DECIMAL(7,2),
+ | ss_net_paid DECIMAL(7,2),
+ | ss_net_paid_inc_tax DECIMAL(7,2),
+ | ss_net_profit DECIMAL(7,2),
+ | ss_sold_date_sk INT
+ |)
+ |USING clickhouse
+ |LOCATION '$HDFS_URL/test/store_sales'
+ |TBLPROPERTIES (storage_policy='__hdfs_main')
+ |""".stripMargin)
+
+ // scalastyle:off line.size.limit
+ spark.sql(
+ "insert into mergetree.store_sales select /*+ REPARTITION(3) */ * from
tpcdsdb.store_sales")
+ val df = spark.sql(
+ "select input_file_name(), count(*) from mergetree.store_sales group by
input_file_name()")
+ val inputFiles = df.collect().map(_.getString(0))
+ val snapshot = getDeltaSnapshot(df)
+ val deltaLogFiles = snapshot.allFiles
+ .collect()
+ .map(addFile => snapshot.path.getParent.toString + "/" + addFile.path)
+ assertResult(inputFiles.toSet)(deltaLogFiles.toSet)
+ // scalastyle:on line.size.limit
+ }
+
+ def getDeltaSnapshot(df: DataFrame): Snapshot = {
+ val scanExec = collect(df.queryExecution.sparkPlan) {
+ case nf: FileSourceScanExecTransformer => nf
+ case f: FileSourceScanExec => f
+ }
+ assertResult(1)(scanExec.size)
+ val mergetreeScan = scanExec.head
+ val snapshot: Snapshot = mergetreeScan.relation.location match {
+ case pdf: PreparedDeltaFileIndex => pdf.preparedScan.scannedSnapshot
+ case tlf: TahoeLogFileIndex => tlf.getSnapshot
+ }
+ assert(snapshot != null)
+ snapshot
+ }
+
+}
diff --git a/cpp-ch/local-engine/Parser/InputFileNameParser.cpp
b/cpp-ch/local-engine/Parser/InputFileNameParser.cpp
deleted file mode 100644
index ebb707ada4..0000000000
--- a/cpp-ch/local-engine/Parser/InputFileNameParser.cpp
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "InputFileNameParser.h"
-
-#include <ranges>
-
-#include <DataTypes/DataTypeString.h>
-#include <DataTypes/DataTypesNumber.h>
-#include <Processors/ISimpleTransform.h>
-#include <Processors/QueryPlan/QueryPlan.h>
-#include <QueryPipeline/QueryPipelineBuilder.h>
-
-
-namespace local_engine
-{
-static DB::ITransformingStep::Traits getTraits()
-{
- return DB::ITransformingStep::Traits{
- {
- .returns_single_stream = false,
- .preserves_number_of_streams = true,
- .preserves_sorting = true,
- },
- {
- .preserves_number_of_rows = true,
- }};
-}
-
-static DB::Block createOutputHeader(
- const DB::Block & header,
- const std::optional<String> & file_name,
- const std::optional<Int64> & block_start,
- const std::optional<Int64> & block_length)
-{
- DB::Block output_header{header};
- if (file_name.has_value())
-
output_header.insert(DB::ColumnWithTypeAndName{std::make_shared<DB::DataTypeString>(),
FileMetaColumns::INPUT_FILE_NAME});
- if (block_start.has_value())
-
output_header.insert(DB::ColumnWithTypeAndName{std::make_shared<DB::DataTypeInt64>(),
FileMetaColumns::INPUT_FILE_BLOCK_START});
- if (block_length.has_value())
-
output_header.insert(DB::ColumnWithTypeAndName{std::make_shared<DB::DataTypeInt64>(),
FileMetaColumns::INPUT_FILE_BLOCK_LENGTH});
- return output_header;
-}
-
-class InputFileExprProjectTransform : public DB::ISimpleTransform
-{
-public:
- InputFileExprProjectTransform(
- const DB::Block & input_header_,
- const DB::Block & output_header_,
- const std::optional<String> & file_name,
- const std::optional<Int64> & block_start,
- const std::optional<Int64> & block_length)
- : ISimpleTransform(input_header_, output_header_, true),
file_name(file_name), block_start(block_start), block_length(block_length)
- {
- }
-
- String getName() const override { return "InputFileExprProjectTransform"; }
-
- void transform(DB::Chunk & chunk) override
- {
- InputFileNameParser::addInputFileColumnsToChunk(output.getHeader(),
chunk, file_name, block_start, block_length);
- }
-
-private:
- std::optional<String> file_name;
- std::optional<Int64> block_start;
- std::optional<Int64> block_length;
-};
-
-class InputFileExprProjectStep : public DB::ITransformingStep
-{
-public:
- InputFileExprProjectStep(
- const DB::Block & input_header,
- const std::optional<String> & file_name,
- const std::optional<Int64> & block_start,
- const std::optional<Int64> & block_length)
- : ITransformingStep(input_header, createOutputHeader(input_header,
file_name, block_start, block_length), getTraits(), true)
- , file_name(file_name)
- , block_start(block_start)
- , block_length(block_length)
- {
- }
-
- String getName() const override { return "InputFileExprProjectStep"; }
-
- void transformPipeline(DB::QueryPipelineBuilder & pipeline, const
DB::BuildQueryPipelineSettings & /*settings*/) override
- {
- pipeline.addSimpleTransform(
- [&](const DB::Block & header)
- { return std::make_shared<InputFileExprProjectTransform>(header,
*output_header, file_name, block_start, block_length); });
- }
-
-protected:
- void updateOutputHeader() override
- {
- // do nothing
- }
-
-private:
- std::optional<String> file_name;
- std::optional<Int64> block_start;
- std::optional<Int64> block_length;
-};
-
-bool InputFileNameParser::hasInputFileNameColumn(const DB::Block & block)
-{
- return block.findByName(FileMetaColumns::INPUT_FILE_NAME) != nullptr;
-}
-
-bool InputFileNameParser::hasInputFileBlockStartColumn(const DB::Block & block)
-{
- return block.findByName(FileMetaColumns::INPUT_FILE_BLOCK_START) !=
nullptr;
-}
-
-bool InputFileNameParser::hasInputFileBlockLengthColumn(const DB::Block &
block)
-{
- return block.findByName(FileMetaColumns::INPUT_FILE_BLOCK_LENGTH) !=
nullptr;
-}
-
-void InputFileNameParser::addInputFileColumnsToChunk(
- const DB::Block & header,
- DB::Chunk & chunk,
- const std::optional<String> & file_name,
- const std::optional<Int64> & block_start,
- const std::optional<Int64> & block_length)
-{
- auto output_columns = chunk.getColumns();
- for (size_t i = 0; i < header.columns(); ++i)
- {
- const auto & column = header.getByPosition(i);
- if (column.name == FileMetaColumns::INPUT_FILE_NAME)
- {
- if (!file_name.has_value())
- throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Input file
name is not set");
- auto type_string = std::make_shared<DB::DataTypeString>();
- auto file_name_column =
type_string->createColumnConst(chunk.getNumRows(), file_name.value());
- output_columns.insert(output_columns.begin() + i,
std::move(file_name_column));
- }
- else if (column.name == FileMetaColumns::INPUT_FILE_BLOCK_START)
- {
- if (!block_start.has_value())
- throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR,
"block_start is not set");
- auto type_int64 = std::make_shared<DB::DataTypeInt64>();
- auto block_start_column =
type_int64->createColumnConst(chunk.getNumRows(), block_start.value());
- output_columns.insert(output_columns.begin() + i,
std::move(block_start_column));
- }
- else if (column.name == FileMetaColumns::INPUT_FILE_BLOCK_LENGTH)
- {
- if (!block_length.has_value())
- throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR,
"block_length is not set");
- auto type_int64 = std::make_shared<DB::DataTypeInt64>();
- auto block_length_column =
type_int64->createColumnConst(chunk.getNumRows(), block_length.value());
- output_columns.insert(output_columns.begin() + i,
std::move(block_length_column));
- }
- }
- chunk.setColumns(output_columns, chunk.getNumRows());
-}
-
-bool InputFileNameParser::containsInputFileColumns(const DB::Block & block)
-{
- return FileMetaColumns::hasVirtualColumns(block);
-}
-
-DB::Block InputFileNameParser::removeInputFileColumn(const DB::Block & block)
-{
- return FileMetaColumns::removeVirtualColumns(block);
-}
-
-std::optional<DB::IQueryPlanStep *>
InputFileNameParser::addInputFileProjectStep(DB::QueryPlan & plan)
-{
- if (!file_name.has_value() && !block_start.has_value() &&
!block_length.has_value())
- return std::nullopt;
- auto step =
std::make_unique<InputFileExprProjectStep>(plan.getCurrentHeader(), file_name,
block_start, block_length);
- step->setStepDescription("Input file expression project");
- std::optional<DB::IQueryPlanStep *> result = step.get();
- plan.addStep(std::move(step));
- return result;
-}
-
-}
diff --git a/cpp-ch/local-engine/Parser/InputFileNameParser.h
b/cpp-ch/local-engine/Parser/InputFileNameParser.h
deleted file mode 100644
index a716f91c7c..0000000000
--- a/cpp-ch/local-engine/Parser/InputFileNameParser.h
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#pragma once
-#include <Processors/QueryPlan/ExpressionStep.h>
-#include <Storages/SubstraitSource/FormatFile.h>
-
-namespace DB
-{
-class Chunk;
-}
-
-namespace local_engine
-{
-class InputFileNameParser
-{
-public:
- static bool hasInputFileNameColumn(const DB::Block & block);
- static bool hasInputFileBlockStartColumn(const DB::Block & block);
- static bool hasInputFileBlockLengthColumn(const DB::Block & block);
- static bool containsInputFileColumns(const DB::Block & block);
- static DB::Block removeInputFileColumn(const DB::Block & block);
- static void addInputFileColumnsToChunk(
- const DB::Block & header,
- DB::Chunk & chunk,
- const std::optional<String> & file_name,
- const std::optional<Int64> & block_start,
- const std::optional<Int64> & block_length);
-
-
- void setFileName(const String & file_name) { this->file_name = file_name; }
- void setBlockStart(const Int64 block_start) { this->block_start =
block_start; }
- void setBlockLength(const Int64 block_length) { this->block_length =
block_length; }
-
- [[nodiscard]] std::optional<DB::IQueryPlanStep *>
addInputFileProjectStep(DB::QueryPlan & plan);
-
-private:
- std::optional<String> file_name;
- std::optional<Int64> block_start;
- std::optional<Int64> block_length;
-};
-} // local_engine
diff --git a/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.cpp
b/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.cpp
index 4c8c59fe21..428313ecc7 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.cpp
@@ -19,10 +19,11 @@
#include <Core/Settings.h>
#include <Parser/ExpressionParser.h>
#include <Parser/FunctionParser.h>
-#include <Parser/InputFileNameParser.h>
#include <Parser/SubstraitParserUtils.h>
#include <Parser/TypeParser.h>
+#include <Processors/QueryPlan/ExpressionStep.h>
#include <Storages/MergeTree/StorageMergeTreeFactory.h>
+#include <Storages/SubstraitSource/FormatFile.h>
#include <boost/algorithm/string/case_conv.hpp>
#include <boost/algorithm/string/predicate.hpp>
#include <google/protobuf/wrappers.pb.h>
@@ -73,33 +74,18 @@ DB::QueryPlanPtr MergeTreeRelParser::parseReadRel(
merge_tree_table.snapshot_id = "";
auto storage =
merge_tree_table.restoreStorage(QueryContext::globalMutableContext());
- InputFileNameParser input_file_name_parser;
DB::Block input;
DB::Block original_input;
if (rel.has_base_schema() && rel.base_schema().names_size())
{
input = TypeParser::buildBlockFromNamedStruct(rel.base_schema());
- if (InputFileNameParser::hasInputFileNameColumn(input))
+ if (input.findByName(FileMetaColumns::INPUT_FILE_NAME) != nullptr)
{
- std::vector<String> parts;
- for (const auto & part : merge_tree_table.parts)
- {
- parts.push_back(merge_tree_table.absolute_path + "/" +
part.name);
- }
- auto name = Poco::cat<String>(",", parts.begin(), parts.end());
- input_file_name_parser.setFileName(name);
- }
- if (InputFileNameParser::hasInputFileBlockStartColumn(input))
- {
- // mergetree doesn't support block start
- input_file_name_parser.setBlockStart(0);
- }
- if (InputFileNameParser::hasInputFileBlockLengthColumn(input))
- {
- // mergetree doesn't support block length
- input_file_name_parser.setBlockLength(0);
+ // mergetree use concat(path, _part) instead of input_file_name
+
input.insert(ColumnWithTypeAndName(ColumnWithTypeAndName(std::make_shared<DataTypeString>(),
VIRTUAL_COLUMN_PART)));
}
- input = InputFileNameParser::removeInputFileColumn(input);
+ // remove input_file_name, input_file_block_start,
input_file_block_size due to mergetree doesn't have them
+ input = FileMetaColumns::removeVirtualColumns(input);
SparkSQLConfig sql_config = SparkSQLConfig::loadFromContext(context);
// case_insensitive_matching
@@ -186,11 +172,66 @@ DB::QueryPlanPtr MergeTreeRelParser::parseReadRel(
"Rename MergeTree Output"));
}
- if (auto step =
input_file_name_parser.addInputFileProjectStep(*query_plan); step.has_value())
+ // add virtual columns step
+ if (auto step =
MergeTreeRelParser::addVirtualColumnsProjectStep(*query_plan, rel,
merge_tree_table.absolute_path); step.has_value())
steps.emplace_back(step.value());
return query_plan;
}
+std::optional<DB::IQueryPlanStep *>
MergeTreeRelParser::addVirtualColumnsProjectStep(DB::QueryPlan & plan, const
substrait::ReadRel & rel, const std::string & path)
+{ // Appends an ExpressionStep that materializes Spark's input_file_name / input_file_block_start / input_file_block_length columns for a MergeTree read; returns the added step, or nullopt when none of them is requested.
+ if (!rel.has_base_schema() || rel.base_schema().names_size() < 1) // no requested schema -> nothing to add
+ return std::nullopt;
+ bool contains_input_file_name = false;
+ bool contains_input_file_block_start = false;
+ bool contains_input_file_block_length = false;
+ for (const auto & name : rel.base_schema().names()) // detect which virtual columns the read schema asks for
+ {
+ if (name == FileMetaColumns::INPUT_FILE_NAME)
+ contains_input_file_name = true;
+ if (name == FileMetaColumns::INPUT_FILE_BLOCK_START)
+ contains_input_file_block_start = true;
+ if (name == FileMetaColumns::INPUT_FILE_BLOCK_LENGTH)
+ contains_input_file_block_length = true;
+ }
+ if (!contains_input_file_name && !contains_input_file_block_start &&
!contains_input_file_block_length)
+ return std::nullopt;
+
+ const auto & header = plan.getCurrentHeader();
+ DB::ActionsDAG actions_dag(header.getNamesAndTypesList()); // DAG starts from the plan's current columns as inputs
+ if (contains_input_file_name)
+ {
+ auto concat_func = FunctionFactory::instance().get("concat", context); // input_file_name := concat(path + "/", _part)
+ DB::ActionsDAG::NodeRawConstPtrs args;
+ const auto string_type = std::make_shared<DB::DataTypeString>();
+ const auto * path_node =
&actions_dag.addColumn(DB::ColumnWithTypeAndName(string_type->createColumnConst(1,
path + "/"), string_type, "path"));
+ args.emplace_back(path_node);
+ const auto & part_name =
actions_dag.findInOutputs(VIRTUAL_COLUMN_PART); // relies on _part being in the read output; parseReadRel inserts it when input_file_name is requested
+ args.emplace_back(&part_name);
+ actions_dag.addOrReplaceInOutputs(actions_dag.addFunction(concat_func,
args, FileMetaColumns::INPUT_FILE_NAME));
+ }
+ if (contains_input_file_block_start)
+ {
+ const auto int64_type = std::make_shared<DB::DataTypeInt64>(); // MergeTree has no block offsets: emit constant -1 (Spark's value when the offset is unavailable)
+
actions_dag.addOrReplaceInOutputs(actions_dag.addColumn(DB::ColumnWithTypeAndName(int64_type->createColumnConst(1,
-1), int64_type, FileMetaColumns::INPUT_FILE_BLOCK_START)));
+ }
+ if (contains_input_file_block_length)
+ {
+ const auto int64_type = std::make_shared<DB::DataTypeInt64>(); // likewise no block length: constant -1
+
actions_dag.addOrReplaceInOutputs(actions_dag.addColumn(DB::ColumnWithTypeAndName(int64_type->createColumnConst(1,
-1), int64_type, FileMetaColumns::INPUT_FILE_BLOCK_LENGTH)));
+ }
+
+ if (contains_input_file_name)
+ actions_dag.removeUnusedResult(VIRTUAL_COLUMN_PART); // drop the helper _part column from the result once input_file_name is derived from it
+ auto step = std::make_unique<DB::ExpressionStep>(plan.getCurrentHeader(),
std::move(actions_dag));
+ step->setStepDescription("Add virtual columns");
+ std::optional<DB::IQueryPlanStep *> result = step.get(); // keep a raw pointer; ownership moves into the plan on the next line
+ plan.addStep(std::move(step));
+ return result;
+}
+
+
+
PrewhereInfoPtr MergeTreeRelParser::parsePreWhereInfo(const
substrait::Expression & rel, const Block & input)
{
std::string filter_name;
diff --git a/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.h
b/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.h
index 93159311a7..d3bf29e1f6 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.h
+++ b/cpp-ch/local-engine/Parser/RelParsers/MergeTreeRelParser.h
@@ -36,6 +36,8 @@ namespace local_engine
class MergeTreeRelParser : public RelParser
{
public:
+ inline static const std::string VIRTUAL_COLUMN_PART = "_part";
+
explicit MergeTreeRelParser(ParserContextPtr parser_context_, const
DB::ContextPtr & context_)
: RelParser(parser_context_), context(context_)
{
@@ -56,6 +58,8 @@ public:
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "MergeTreeRelParser
can't call getSingleInput().");
}
+ std::optional<DB::IQueryPlanStep *>
addVirtualColumnsProjectStep(DB::QueryPlan & plan, const substrait::ReadRel &
rel, const std::string & path);
+
String filterRangesOnDriver(const substrait::ReadRel & read_rel);
struct Condition
diff --git a/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h
b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h
index 77096f3f49..506beb2c41 100644
--- a/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h
+++ b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h
@@ -134,10 +134,7 @@ struct DeltaStats
this->null_count[i] += null_count;
DB::Field min_value, max_value;
- if (const auto * column_nullable = typeid_cast<const
DB::ColumnNullable *>(column.get()))
- column_nullable->getExtremesNullLast(min_value, max_value);
- else
- column->getExtremes(min_value, max_value);
+ column->getExtremes(min_value, max_value);
assert(min[i].isNull() || min_value.getType() == min[i].getType());
assert(max[i].isNull() || max_value.getType() == max[i].getType());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]