This is an automated email from the ASF dual-hosted git repository.

lwz9103 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 6f9c1f1b1f [GLUTEN-9085][CH] Add UT for mergetree write stats (#9089)
6f9c1f1b1f is described below

commit 6f9c1f1b1f8e1464c1b507f9c4e27eeaf3879885
Author: Wenzheng Liu <[email protected]>
AuthorDate: Tue Mar 25 10:26:11 2025 +0800

    [GLUTEN-9085][CH] Add UT for mergetree write stats (#9089)
---
 .../GlutenClickHouseMergetreeWriteStatsSuite.scala | 302 ++++++++++++++++++---
 cpp-ch/local-engine/Common/GlutenConfig.cpp        |   3 +
 cpp-ch/local-engine/Common/GlutenConfig.h          |   2 +
 .../Storages/Output/NormalFileWriter.h             |  51 +++-
 4 files changed, 317 insertions(+), 41 deletions(-)

diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergetreeWriteStatsSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergetreeWriteStatsSuite.scala
index a07960da8e..cd8e07dd01 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergetreeWriteStatsSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergetreeWriteStatsSuite.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.delta.files.TahoeLogFileIndex
 import org.apache.spark.sql.delta.stats.PreparedDeltaFileIndex
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.functions.input_file_name
 
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
@@ -51,6 +52,7 @@ class GlutenClickHouseMergetreeWriteStatsSuite
       .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
       .set("spark.databricks.delta.stats.enabled", "true")
       .set("spark.databricks.delta.optimizeWrite.enabled", "true")
+      .set("spark.sql.storeAssignmentPolicy", "LEGACY")
       .set(RuntimeConfig.LOGGER_LEVEL.key, "error")
       .set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true")
       .setCHSettings("mergetree.merge_after_insert", false)
@@ -72,42 +74,246 @@ class GlutenClickHouseMergetreeWriteStatsSuite
     FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
   }
 
+  def tpcdsMergetreeTables: Map[String, String] = {
+    Map(
+      "store_sales" ->
+        s"""
+           |CREATE EXTERNAL TABLE IF NOT EXISTS store_sales
+           |(
+           |     ss_sold_time_sk INT,
+           |     ss_item_sk INT,
+           |     ss_customer_sk INT,
+           |     ss_cdemo_sk INT,
+           |     ss_hdemo_sk INT,
+           |     ss_addr_sk INT,
+           |     ss_store_sk INT,
+           |     ss_promo_sk INT,
+           |     ss_ticket_number bigint,
+           |     ss_quantity INT,
+           |     ss_wholesale_cost DECIMAL(7,2),
+           |     ss_list_price DECIMAL(7,2),
+           |     ss_sales_price DECIMAL(7,2),
+           |     ss_ext_discount_amt DECIMAL(7,2),
+           |     ss_ext_sales_price DECIMAL(7,2),
+           |     ss_ext_wholesale_cost DECIMAL(7,2),
+           |     ss_ext_list_price DECIMAL(7,2),
+           |     ss_ext_tax DECIMAL(7,2),
+           |     ss_coupon_amt DECIMAL(7,2),
+           |     ss_net_paid DECIMAL(7,2),
+           |     ss_net_paid_inc_tax DECIMAL(7,2),
+           |     ss_net_profit DECIMAL(7,2),
+           |     ss_sold_date_sk INT
+           |)
+           |USING clickhouse
+           |LOCATION '$HDFS_URL/stats/store_sales'
+           |TBLPROPERTIES (storage_policy='__hdfs_main')
+           |""".stripMargin,
+      "store_returns" ->
+        s"""
+           |CREATE EXTERNAL TABLE IF NOT EXISTS store_returns
+           |(
+           |     sr_return_time_sk INT,
+           |     sr_item_sk INT,
+           |     sr_customer_sk INT,
+           |     sr_cdemo_sk INT,
+           |     sr_hdemo_sk INT,
+           |     sr_addr_sk INT,
+           |     sr_store_sk INT,
+           |     sr_reason_sk INT,
+           |     sr_ticket_number INT,
+           |     sr_return_quantity INT,
+           |     sr_return_amt DECIMAL(7, 2),
+           |     sr_return_tax DECIMAL(7, 2),
+           |     sr_return_amt_inc_tax DECIMAL(7, 2),
+           |     sr_fee DECIMAL(7, 2),
+           |     sr_return_ship_cost DECIMAL(7, 2),
+           |     sr_refunded_cash DECIMAL(7, 2),
+           |     sr_reversed_charge DECIMAL(7, 2),
+           |     sr_store_credit DECIMAL(7, 2),
+           |     sr_net_loss DECIMAL(7, 2),
+           |     sr_returned_date_sk INT
+           |)
+           |USING clickhouse
+           |LOCATION '$HDFS_URL/stats/store_returns'
+           |TBLPROPERTIES (storage_policy='__hdfs_main')
+           |""".stripMargin,
+      "catalog_sales" ->
+        s"""
+           |CREATE EXTERNAL TABLE IF NOT EXISTS catalog_sales
+           |(
+           |     cs_sold_time_sk INT,
+           |     cs_ship_date_sk INT,
+           |     cs_bill_customer_sk INT,
+           |     cs_bill_cdemo_sk INT,
+           |     cs_bill_hdemo_sk INT,
+           |     cs_bill_addr_sk INT,
+           |     cs_ship_customer_sk INT,
+           |     cs_ship_cdemo_sk INT,
+           |     cs_ship_hdemo_sk INT,
+           |     cs_ship_addr_sk INT,
+           |     cs_call_center_sk INT,
+           |     cs_catalog_page_sk INT,
+           |     cs_ship_mode_sk INT,
+           |     cs_warehouse_sk INT,
+           |     cs_item_sk INT,
+           |     cs_promo_sk INT,
+           |     cs_order_number INT,
+           |     cs_quantity INT,
+           |     cs_wholesale_cost DECIMAL(7, 2),
+           |     cs_list_price DECIMAL(7, 2),
+           |     cs_sales_price DECIMAL(7, 2),
+           |     cs_ext_discount_amt DECIMAL(7, 2),
+           |     cs_ext_sales_price DECIMAL(7, 2),
+           |     cs_ext_wholesale_cost DECIMAL(7, 2),
+           |     cs_ext_list_price DECIMAL(7, 2),
+           |     cs_ext_tax DECIMAL(7, 2),
+           |     cs_coupon_amt DECIMAL(7, 2),
+           |     cs_ext_ship_cost DECIMAL(7, 2),
+           |     cs_net_paid DECIMAL(7, 2),
+           |     cs_net_paid_inc_tax DECIMAL(7, 2),
+           |     cs_net_paid_inc_ship DECIMAL(7, 2),
+           |     cs_net_paid_inc_ship_tax DECIMAL(7, 2),
+           |     cs_net_profit DECIMAL(7, 2),
+           |     cs_sold_date_sk INT
+           |)
+           |USING clickhouse
+           |LOCATION '$HDFS_URL/stats/catalog_sales'
+           |TBLPROPERTIES (storage_policy='__hdfs_main')
+           |""".stripMargin,
+      "catalog_returns" ->
+        s"""
+           |CREATE EXTERNAL TABLE IF NOT EXISTS catalog_returns
+           |(
+           |     cr_returned_time_sk INT,
+           |     cr_item_sk INT,
+           |     cr_refunded_customer_sk INT,
+           |     cr_refunded_cdemo_sk INT,
+           |     cr_refunded_hdemo_sk INT,
+           |     cr_refunded_addr_sk INT,
+           |     cr_returning_customer_sk INT,
+           |     cr_returning_cdemo_sk INT,
+           |     cr_returning_hdemo_sk INT,
+           |     cr_returning_addr_sk INT,
+           |     cr_call_center_sk INT,
+           |     cr_catalog_page_sk INT,
+           |     cr_ship_mode_sk INT,
+           |     cr_warehouse_sk INT,
+           |     cr_reason_sk INT,
+           |     cr_order_number INT,
+           |     cr_return_quantity INT,
+           |     cr_return_amount DECIMAL(7, 2),
+           |     cr_return_tax DECIMAL(7, 2),
+           |     cr_return_amt_inc_tax DECIMAL(7, 2),
+           |     cr_fee DECIMAL(7, 2),
+           |     cr_return_ship_cost DECIMAL(7, 2),
+           |     cr_refunded_cash DECIMAL(7, 2),
+           |     cr_reversed_charge DECIMAL(7, 2),
+           |     cr_store_credit DECIMAL(7, 2),
+           |     cr_net_loss DECIMAL(7, 2),
+           |     cr_returned_date_sk INT
+           |)
+           |USING clickhouse
+           |LOCATION '$HDFS_URL/stats/catalog_returns'
+           |TBLPROPERTIES (storage_policy='__hdfs_main')
+           |""".stripMargin,
+      "web_sales" ->
+        s"""
+           |CREATE EXTERNAL TABLE IF NOT EXISTS web_sales
+           |(
+           |     ws_sold_time_sk INT,
+           |     ws_ship_date_sk INT,
+           |     ws_item_sk INT,
+           |     ws_bill_customer_sk INT,
+           |     ws_bill_cdemo_sk INT,
+           |     ws_bill_hdemo_sk INT,
+           |     ws_bill_addr_sk INT,
+           |     ws_ship_customer_sk INT,
+           |     ws_ship_cdemo_sk INT,
+           |     ws_ship_hdemo_sk INT,
+           |     ws_ship_addr_sk INT,
+           |     ws_web_page_sk INT,
+           |     ws_web_site_sk INT,
+           |     ws_ship_mode_sk INT,
+           |     ws_warehouse_sk INT,
+           |     ws_promo_sk INT,
+           |     ws_order_number INT,
+           |     ws_quantity INT,
+           |     ws_wholesale_cost DECIMAL(7, 2),
+           |     ws_list_price DECIMAL(7, 2),
+           |     ws_sales_price DECIMAL(7, 2),
+           |     ws_ext_discount_amt DECIMAL(7, 2),
+           |     ws_ext_sales_price DECIMAL(7, 2),
+           |     ws_ext_wholesale_cost DECIMAL(7, 2),
+           |     ws_ext_list_price DECIMAL(7, 2),
+           |     ws_ext_tax DECIMAL(7, 2),
+           |     ws_coupon_amt DECIMAL(7, 2),
+           |     ws_ext_ship_cost DECIMAL(7, 2),
+           |     ws_net_paid DECIMAL(7, 2),
+           |     ws_net_paid_inc_tax DECIMAL(7, 2),
+           |     ws_net_paid_inc_ship DECIMAL(7, 2),
+           |     ws_net_paid_inc_ship_tax DECIMAL(7, 2),
+           |     ws_net_profit DECIMAL(7, 2),
+           |     ws_sold_date_sk INT
+           |)
+           |USING clickhouse
+           |LOCATION '$HDFS_URL/stats/web_sales'
+           |TBLPROPERTIES (storage_policy='__hdfs_main')
+           |""".stripMargin,
+      "web_returns" ->
+        s"""
+           |CREATE EXTERNAL TABLE IF NOT EXISTS web_returns
+           |(
+           |     wr_returned_time_sk INT,
+           |     wr_item_sk INT,
+           |     wr_refunded_customer_sk INT,
+           |     wr_refunded_cdemo_sk INT,
+           |     wr_refunded_hdemo_sk INT,
+           |     wr_refunded_addr_sk INT,
+           |     wr_returning_customer_sk INT,
+           |     wr_returning_cdemo_sk INT,
+           |     wr_returning_hdemo_sk INT,
+           |     wr_returning_addr_sk INT,
+           |     wr_web_page_sk INT,
+           |     wr_reason_sk INT,
+           |     wr_order_number INT,
+           |     wr_return_quantity INT,
+           |     wr_return_amt DECIMAL(7, 2),
+           |     wr_return_tax DECIMAL(7, 2),
+           |     wr_return_amt_inc_tax DECIMAL(7, 2),
+           |     wr_fee DECIMAL(7, 2),
+           |     wr_return_ship_cost DECIMAL(7, 2),
+           |     wr_refunded_cash DECIMAL(7, 2),
+           |     wr_reversed_charge DECIMAL(7, 2),
+           |     wr_account_credit DECIMAL(7, 2),
+           |     wr_net_loss DECIMAL(7, 2),
+           |     wr_returned_date_sk INT
+           |)
+           |USING clickhouse
+           |LOCATION '$HDFS_URL/stats/web_returns'
+           |TBLPROPERTIES (storage_policy='__hdfs_main')
+           |""".stripMargin,
+      "inventory" ->
+        s"""
+           |CREATE EXTERNAL TABLE IF NOT EXISTS inventory
+           |(
+           |     inv_item_sk INT,
+           |     inv_warehouse_sk INT,
+           |     inv_quantity_on_hand INT,
+           |     inv_date_sk INT
+           |)
+           |USING clickhouse
+           |LOCATION '$HDFS_URL/stats/inventory'
+           |TBLPROPERTIES (storage_policy='__hdfs_main')
+           |""".stripMargin
+    )
+
+  }
+
   test("test mergetree virtual columns") {
     spark.sql("create database if not exists mergetree")
     spark.sql("use mergetree")
     spark.sql("drop table if exists store_sales")
-    spark.sql(s"""
-                 |CREATE EXTERNAL TABLE IF NOT EXISTS mergetree.store_sales
-                 |(
-                 |     ss_sold_time_sk INT,
-                 |     ss_item_sk INT,
-                 |     ss_customer_sk INT,
-                 |     ss_cdemo_sk INT,
-                 |     ss_hdemo_sk INT,
-                 |     ss_addr_sk INT,
-                 |     ss_store_sk INT,
-                 |     ss_promo_sk INT,
-                 |     ss_ticket_number bigint,
-                 |     ss_quantity INT,
-                 |     ss_wholesale_cost DECIMAL(7,2),
-                 |     ss_list_price DECIMAL(7,2),
-                 |     ss_sales_price DECIMAL(7,2),
-                 |     ss_ext_discount_amt DECIMAL(7,2),
-                 |     ss_ext_sales_price DECIMAL(7,2),
-                 |     ss_ext_wholesale_cost DECIMAL(7,2),
-                 |     ss_ext_list_price DECIMAL(7,2),
-                 |     ss_ext_tax DECIMAL(7,2),
-                 |     ss_coupon_amt DECIMAL(7,2),
-                 |     ss_net_paid DECIMAL(7,2),
-                 |     ss_net_paid_inc_tax DECIMAL(7,2),
-                 |     ss_net_profit DECIMAL(7,2),
-                 |     ss_sold_date_sk INT
-                 |)
-                 |USING clickhouse
-                 |LOCATION '$HDFS_URL/test/store_sales'
-                 |TBLPROPERTIES (storage_policy='__hdfs_main')
-                 |""".stripMargin)
-
+    spark.sql(tpcdsMergetreeTables("store_sales"))
     // scalastyle:off line.size.limit
     spark.sql(
       "insert into mergetree.store_sales select /*+ REPARTITION(3) */ * from 
tpcdsdb.store_sales")
@@ -122,6 +328,22 @@ class GlutenClickHouseMergetreeWriteStatsSuite
     // scalastyle:on line.size.limit
   }
 
+  test("verify mergetree delta stats") {
+    if (isSparkVersionGE("3.5")) {
+      spark.sql("create database if not exists mergetree")
+      spark.sql("use mergetree")
+      val tables = Seq(
+        "store_sales",
+        "store_returns",
+        "web_sales",
+        "web_returns",
+        "catalog_sales",
+        "catalog_returns",
+        "inventory")
+      tables.foreach(writeAndCompareDeltaStats(_))
+    }
+  }
+
   def getDeltaSnapshot(df: DataFrame): Snapshot = {
     val scanExec = collect(df.queryExecution.sparkPlan) {
       case nf: FileSourceScanExecTransformer => nf
@@ -137,4 +359,20 @@ class GlutenClickHouseMergetreeWriteStatsSuite
     snapshot
   }
 
+  def writeAndCompareDeltaStats(table: String, partNum: Int = 3): Unit = {
+    spark.sql(s"drop table if exists $table")
+    spark.sql(tpcdsMergetreeTables(table))
+    spark.sql(s"insert into $table select /*+ REPARTITION($partNum) */ * from 
tpcdsdb.$table")
+    val tableDf = spark.table(table)
+    val snapshot = getDeltaSnapshot(tableDf)
+    val tableStats = tableDf
+      .groupBy(input_file_name().as("path"))
+      .agg(snapshot.statsCollector)
+      .orderBy("path")
+    val deltaLogStats = snapshot.withStats
+      .selectExpr(s"concat('${snapshot.path.getParent}/', path)", "stats")
+      .orderBy("path")
+    checkAnswer(tableStats, deltaLogStats)
+  }
+
 }
diff --git a/cpp-ch/local-engine/Common/GlutenConfig.cpp 
b/cpp-ch/local-engine/Common/GlutenConfig.cpp
index bcb6c96de9..383e8d34f8 100644
--- a/cpp-ch/local-engine/Common/GlutenConfig.cpp
+++ b/cpp-ch/local-engine/Common/GlutenConfig.cpp
@@ -156,6 +156,9 @@ SparkSQLConfig SparkSQLConfig::loadFromContext(const 
DB::ContextPtr & context)
 {
     SparkSQLConfig sql_config;
     sql_config.caseSensitive = 
context->getConfigRef().getBool("spark.sql.caseSensitive", false);
+    // TODO support transfer spark settings from spark session to native engine
+    sql_config.deltaDataSkippingNumIndexedCols = 
context->getConfigRef().getUInt64("spark.databricks.delta.properties.defaults.dataSkippingNumIndexedCols",
 32);
+    sql_config.deltaDataSkippingStatsColumns = 
context->getConfigRef().getString("spark.databricks.delta.properties.defaults.dataSkippingStatsColumns",
 "");
 
     return sql_config;
 }
diff --git a/cpp-ch/local-engine/Common/GlutenConfig.h 
b/cpp-ch/local-engine/Common/GlutenConfig.h
index 74ccc441fc..62e0d228e0 100644
--- a/cpp-ch/local-engine/Common/GlutenConfig.h
+++ b/cpp-ch/local-engine/Common/GlutenConfig.h
@@ -182,6 +182,8 @@ inline constexpr auto DEFAULT_TEMP_FILE_PATH = "/tmp/libch";
 struct SparkSQLConfig
 {
     bool caseSensitive = false; // spark.sql.caseSensitive
+    size_t deltaDataSkippingNumIndexedCols = 32;
+    String deltaDataSkippingStatsColumns;
 
     static SparkSQLConfig loadFromContext(const DB::ContextPtr & context);
 };
diff --git a/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h 
b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h
index 506beb2c41..9d740eb4ee 100644
--- a/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h
+++ b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h
@@ -68,6 +68,8 @@ OutputFormatFilePtr createOutputFormatFile(
 struct DeltaStats
 {
     size_t row_count;
+    // TODO Support delta.dataSkippingStatsColumns, detail see 
https://docs.databricks.com/aws/en/delta/data-skipping
+    size_t n_stats_cols;
     std::vector<DB::Field> min;
     std::vector<DB::Field> max;
     std::vector<Int64> null_count;
@@ -91,23 +93,55 @@ struct DeltaStats
 
         assert(partition_index.size() == partition.size());
 
+        size_t num_stats_cols = numStatsCols(output.columns() - 
partition.size());
         auto appendBase = [&](const std::string & prefix)
         {
-            for (const auto & column : output.getColumnsWithTypeAndName())
+            for (size_t i = 0, n = 0; i < output.columns() && n < 
num_stats_cols; i++)
+            {
+                const auto & column = output.getByPosition(i);
                 if (!partition_index.contains(column.name))
+                {
                     
statsHeaderBase.emplace_back(wrapNullableType(column.type), prefix + 
column.name);
+                    ++n;
+                }
+            }
         };
         appendBase("min_");
         appendBase("max_");
-        for (const auto & column : output.getColumnsWithTypeAndName())
+        for (size_t i = 0, n = 0; i < output.columns() && n < num_stats_cols; 
i++)
+        {
+            const auto & column = output.getByPosition(i);
             if (!partition_index.contains(column.name))
+            {
                 statsHeaderBase.emplace_back(BIGINT(), "null_count_" + 
column.name);
+                ++n;
+            }
+        }
 
         return DB::Block{statsHeaderBase};
     }
 
+    static size_t numStatsCols(size_t origin)
+    {
+        if (DB::CurrentThread::isInitialized())
+        {
+            const DB::ContextPtr query_context = 
DB::CurrentThread::get().getQueryContext();
+            if (query_context)
+            {
+                SparkSQLConfig config = 
SparkSQLConfig::loadFromContext(query_context);
+                return std::min(config.deltaDataSkippingNumIndexedCols, 
origin);
+            }
+        }
+        return origin;
+    }
+
     explicit DeltaStats(size_t size, const std::set<size_t> & partition_index_ 
= {})
-        : row_count(0), min(size), max(size), null_count(size, 0), 
partition_index(partition_index_)
+        : row_count(0)
+        , n_stats_cols(numStatsCols(size))
+        , min(n_stats_cols)
+        , max(n_stats_cols)
+        , null_count(n_stats_cols, 0)
+        , partition_index(partition_index_)
     {
         assert(size > 0);
     }
@@ -118,8 +152,8 @@ struct DeltaStats
     {
         assert(chunk.getNumRows() > 0);
         const auto & columns = chunk.getColumns();
-        assert(columns.size() == min.size() + partition_index.size());
-        for (size_t i = 0, col = 0; col < columns.size(); ++col)
+        assert(columns.size() - partition_index.size() >= n_stats_cols);
+        for (size_t i = 0, col = 0; i < n_stats_cols && col < columns.size(); 
++col)
         {
             if (partition_index.contains(col))
                 continue;
@@ -157,10 +191,10 @@ struct DeltaStats
 
     void merge(const DeltaStats & right)
     {
-        assert(min.size() == right.min.size());
+        assert(n_stats_cols == right.n_stats_cols);
         assert(partition_index == right.partition_index);
 
-        for (size_t i = 0; i < min.size(); ++i)
+        for (size_t i = 0; i < n_stats_cols; ++i)
         {
             null_count[i] += right.null_count[i];
             min[i] = std::min(min[i], right.min[i]);
@@ -244,9 +278,8 @@ public:
     void collectStats(const String & filename, const String & partition_dir, 
const DeltaStats & stats) const
     {
         const std::string & partition = partition_dir.empty() ? 
WriteStatsBase::NO_PARTITION_ID : partition_dir;
-        size_t columnSize = stats.min.size();
+        size_t columnSize = stats.n_stats_cols;
         assert(columns_.size() == stats_column_start + columnSize * 3);
-        assert(stats.min.size() == stats.max.size() && stats.min.size() == 
stats.null_count.size());
 
         columns_[ColumnIndex::filename]->insertData(filename.c_str(), 
filename.size());
         columns_[partition_id]->insertData(partition.c_str(), 
partition.size());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to