This is an automated email from the ASF dual-hosted git repository.
lwz9103 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 6f9c1f1b1f [GLUTEN-9085][CH] Add UT for mergetree write stats (#9089)
6f9c1f1b1f is described below
commit 6f9c1f1b1f8e1464c1b507f9c4e27eeaf3879885
Author: Wenzheng Liu <[email protected]>
AuthorDate: Tue Mar 25 10:26:11 2025 +0800
[GLUTEN-9085][CH] Add UT for mergetree write stats (#9089)
---
.../GlutenClickHouseMergetreeWriteStatsSuite.scala | 302 ++++++++++++++++++---
cpp-ch/local-engine/Common/GlutenConfig.cpp | 3 +
cpp-ch/local-engine/Common/GlutenConfig.h | 2 +
.../Storages/Output/NormalFileWriter.h | 51 +++-
4 files changed, 317 insertions(+), 41 deletions(-)
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergetreeWriteStatsSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergetreeWriteStatsSuite.scala
index a07960da8e..cd8e07dd01 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergetreeWriteStatsSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergetreeWriteStatsSuite.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.delta.files.TahoeLogFileIndex
import org.apache.spark.sql.delta.stats.PreparedDeltaFileIndex
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.functions.input_file_name
import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
@@ -51,6 +52,7 @@ class GlutenClickHouseMergetreeWriteStatsSuite
.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.set("spark.databricks.delta.stats.enabled", "true")
.set("spark.databricks.delta.optimizeWrite.enabled", "true")
+ .set("spark.sql.storeAssignmentPolicy", "LEGACY")
.set(RuntimeConfig.LOGGER_LEVEL.key, "error")
.set(GlutenConfig.NATIVE_WRITER_ENABLED.key, "true")
.setCHSettings("mergetree.merge_after_insert", false)
@@ -72,42 +74,246 @@ class GlutenClickHouseMergetreeWriteStatsSuite
FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
}
+ def tpcdsMergetreeTables: Map[String, String] = {
+ Map(
+ "store_sales" ->
+ s"""
+ |CREATE EXTERNAL TABLE IF NOT EXISTS store_sales
+ |(
+ | ss_sold_time_sk INT,
+ | ss_item_sk INT,
+ | ss_customer_sk INT,
+ | ss_cdemo_sk INT,
+ | ss_hdemo_sk INT,
+ | ss_addr_sk INT,
+ | ss_store_sk INT,
+ | ss_promo_sk INT,
+ | ss_ticket_number bigint,
+ | ss_quantity INT,
+ | ss_wholesale_cost DECIMAL(7,2),
+ | ss_list_price DECIMAL(7,2),
+ | ss_sales_price DECIMAL(7,2),
+ | ss_ext_discount_amt DECIMAL(7,2),
+ | ss_ext_sales_price DECIMAL(7,2),
+ | ss_ext_wholesale_cost DECIMAL(7,2),
+ | ss_ext_list_price DECIMAL(7,2),
+ | ss_ext_tax DECIMAL(7,2),
+ | ss_coupon_amt DECIMAL(7,2),
+ | ss_net_paid DECIMAL(7,2),
+ | ss_net_paid_inc_tax DECIMAL(7,2),
+ | ss_net_profit DECIMAL(7,2),
+ | ss_sold_date_sk INT
+ |)
+ |USING clickhouse
+ |LOCATION '$HDFS_URL/stats/store_sales'
+ |TBLPROPERTIES (storage_policy='__hdfs_main')
+ |""".stripMargin,
+ "store_returns" ->
+ s"""
+ |CREATE EXTERNAL TABLE IF NOT EXISTS store_returns
+ |(
+ | sr_return_time_sk INT,
+ | sr_item_sk INT,
+ | sr_customer_sk INT,
+ | sr_cdemo_sk INT,
+ | sr_hdemo_sk INT,
+ | sr_addr_sk INT,
+ | sr_store_sk INT,
+ | sr_reason_sk INT,
+ | sr_ticket_number INT,
+ | sr_return_quantity INT,
+ | sr_return_amt DECIMAL(7, 2),
+ | sr_return_tax DECIMAL(7, 2),
+ | sr_return_amt_inc_tax DECIMAL(7, 2),
+ | sr_fee DECIMAL(7, 2),
+ | sr_return_ship_cost DECIMAL(7, 2),
+ | sr_refunded_cash DECIMAL(7, 2),
+ | sr_reversed_charge DECIMAL(7, 2),
+ | sr_store_credit DECIMAL(7, 2),
+ | sr_net_loss DECIMAL(7, 2),
+ | sr_returned_date_sk INT
+ |)
+ |USING clickhouse
+ |LOCATION '$HDFS_URL/stats/store_returns'
+ |TBLPROPERTIES (storage_policy='__hdfs_main')
+ |""".stripMargin,
+ "catalog_sales" ->
+ s"""
+ |CREATE EXTERNAL TABLE IF NOT EXISTS catalog_sales
+ |(
+ | cs_sold_time_sk INT,
+ | cs_ship_date_sk INT,
+ | cs_bill_customer_sk INT,
+ | cs_bill_cdemo_sk INT,
+ | cs_bill_hdemo_sk INT,
+ | cs_bill_addr_sk INT,
+ | cs_ship_customer_sk INT,
+ | cs_ship_cdemo_sk INT,
+ | cs_ship_hdemo_sk INT,
+ | cs_ship_addr_sk INT,
+ | cs_call_center_sk INT,
+ | cs_catalog_page_sk INT,
+ | cs_ship_mode_sk INT,
+ | cs_warehouse_sk INT,
+ | cs_item_sk INT,
+ | cs_promo_sk INT,
+ | cs_order_number INT,
+ | cs_quantity INT,
+ | cs_wholesale_cost DECIMAL(7, 2),
+ | cs_list_price DECIMAL(7, 2),
+ | cs_sales_price DECIMAL(7, 2),
+ | cs_ext_discount_amt DECIMAL(7, 2),
+ | cs_ext_sales_price DECIMAL(7, 2),
+ | cs_ext_wholesale_cost DECIMAL(7, 2),
+ | cs_ext_list_price DECIMAL(7, 2),
+ | cs_ext_tax DECIMAL(7, 2),
+ | cs_coupon_amt DECIMAL(7, 2),
+ | cs_ext_ship_cost DECIMAL(7, 2),
+ | cs_net_paid DECIMAL(7, 2),
+ | cs_net_paid_inc_tax DECIMAL(7, 2),
+ | cs_net_paid_inc_ship DECIMAL(7, 2),
+ | cs_net_paid_inc_ship_tax DECIMAL(7, 2),
+ | cs_net_profit DECIMAL(7, 2),
+ | cs_sold_date_sk INT
+ |)
+ |USING clickhouse
+ |LOCATION '$HDFS_URL/stats/catalog_sales'
+ |TBLPROPERTIES (storage_policy='__hdfs_main')
+ |""".stripMargin,
+ "catalog_returns" ->
+ s"""
+ |CREATE EXTERNAL TABLE IF NOT EXISTS catalog_returns
+ |(
+ | cr_returned_time_sk INT,
+ | cr_item_sk INT,
+ | cr_refunded_customer_sk INT,
+ | cr_refunded_cdemo_sk INT,
+ | cr_refunded_hdemo_sk INT,
+ | cr_refunded_addr_sk INT,
+ | cr_returning_customer_sk INT,
+ | cr_returning_cdemo_sk INT,
+ | cr_returning_hdemo_sk INT,
+ | cr_returning_addr_sk INT,
+ | cr_call_center_sk INT,
+ | cr_catalog_page_sk INT,
+ | cr_ship_mode_sk INT,
+ | cr_warehouse_sk INT,
+ | cr_reason_sk INT,
+ | cr_order_number INT,
+ | cr_return_quantity INT,
+ | cr_return_amount DECIMAL(7, 2),
+ | cr_return_tax DECIMAL(7, 2),
+ | cr_return_amt_inc_tax DECIMAL(7, 2),
+ | cr_fee DECIMAL(7, 2),
+ | cr_return_ship_cost DECIMAL(7, 2),
+ | cr_refunded_cash DECIMAL(7, 2),
+ | cr_reversed_charge DECIMAL(7, 2),
+ | cr_store_credit DECIMAL(7, 2),
+ | cr_net_loss DECIMAL(7, 2),
+ | cr_returned_date_sk INT
+ |)
+ |USING clickhouse
+ |LOCATION '$HDFS_URL/stats/catalog_returns'
+ |TBLPROPERTIES (storage_policy='__hdfs_main')
+ |""".stripMargin,
+ "web_sales" ->
+ s"""
+ |CREATE EXTERNAL TABLE IF NOT EXISTS web_sales
+ |(
+ | ws_sold_time_sk INT,
+ | ws_ship_date_sk INT,
+ | ws_item_sk INT,
+ | ws_bill_customer_sk INT,
+ | ws_bill_cdemo_sk INT,
+ | ws_bill_hdemo_sk INT,
+ | ws_bill_addr_sk INT,
+ | ws_ship_customer_sk INT,
+ | ws_ship_cdemo_sk INT,
+ | ws_ship_hdemo_sk INT,
+ | ws_ship_addr_sk INT,
+ | ws_web_page_sk INT,
+ | ws_web_site_sk INT,
+ | ws_ship_mode_sk INT,
+ | ws_warehouse_sk INT,
+ | ws_promo_sk INT,
+ | ws_order_number INT,
+ | ws_quantity INT,
+ | ws_wholesale_cost DECIMAL(7, 2),
+ | ws_list_price DECIMAL(7, 2),
+ | ws_sales_price DECIMAL(7, 2),
+ | ws_ext_discount_amt DECIMAL(7, 2),
+ | ws_ext_sales_price DECIMAL(7, 2),
+ | ws_ext_wholesale_cost DECIMAL(7, 2),
+ | ws_ext_list_price DECIMAL(7, 2),
+ | ws_ext_tax DECIMAL(7, 2),
+ | ws_coupon_amt DECIMAL(7, 2),
+ | ws_ext_ship_cost DECIMAL(7, 2),
+ | ws_net_paid DECIMAL(7, 2),
+ | ws_net_paid_inc_tax DECIMAL(7, 2),
+ | ws_net_paid_inc_ship DECIMAL(7, 2),
+ | ws_net_paid_inc_ship_tax DECIMAL(7, 2),
+ | ws_net_profit DECIMAL(7, 2),
+ | ws_sold_date_sk INT
+ |)
+ |USING clickhouse
+ |LOCATION '$HDFS_URL/stats/web_sales'
+ |TBLPROPERTIES (storage_policy='__hdfs_main')
+ |""".stripMargin,
+ "web_returns" ->
+ s"""
+ |CREATE EXTERNAL TABLE IF NOT EXISTS web_returns
+ |(
+ | wr_returned_time_sk INT,
+ | wr_item_sk INT,
+ | wr_refunded_customer_sk INT,
+ | wr_refunded_cdemo_sk INT,
+ | wr_refunded_hdemo_sk INT,
+ | wr_refunded_addr_sk INT,
+ | wr_returning_customer_sk INT,
+ | wr_returning_cdemo_sk INT,
+ | wr_returning_hdemo_sk INT,
+ | wr_returning_addr_sk INT,
+ | wr_web_page_sk INT,
+ | wr_reason_sk INT,
+ | wr_order_number INT,
+ | wr_return_quantity INT,
+ | wr_return_amt DECIMAL(7, 2),
+ | wr_return_tax DECIMAL(7, 2),
+ | wr_return_amt_inc_tax DECIMAL(7, 2),
+ | wr_fee DECIMAL(7, 2),
+ | wr_return_ship_cost DECIMAL(7, 2),
+ | wr_refunded_cash DECIMAL(7, 2),
+ | wr_reversed_charge DECIMAL(7, 2),
+ | wr_account_credit DECIMAL(7, 2),
+ | wr_net_loss DECIMAL(7, 2),
+ | wr_returned_date_sk INT
+ |)
+ |USING clickhouse
+ |LOCATION '$HDFS_URL/stats/web_returns'
+ |TBLPROPERTIES (storage_policy='__hdfs_main')
+ |""".stripMargin,
+ "inventory" ->
+ s"""
+ |CREATE EXTERNAL TABLE IF NOT EXISTS inventory
+ |(
+ | inv_item_sk INT,
+ | inv_warehouse_sk INT,
+ | inv_quantity_on_hand INT,
+ | inv_date_sk INT
+ |)
+ |USING clickhouse
+ |LOCATION '$HDFS_URL/stats/inventory'
+ |TBLPROPERTIES (storage_policy='__hdfs_main')
+ |""".stripMargin
+ )
+
+ }
+
test("test mergetree virtual columns") {
spark.sql("create database if not exists mergetree")
spark.sql("use mergetree")
spark.sql("drop table if exists store_sales")
- spark.sql(s"""
- |CREATE EXTERNAL TABLE IF NOT EXISTS mergetree.store_sales
- |(
- | ss_sold_time_sk INT,
- | ss_item_sk INT,
- | ss_customer_sk INT,
- | ss_cdemo_sk INT,
- | ss_hdemo_sk INT,
- | ss_addr_sk INT,
- | ss_store_sk INT,
- | ss_promo_sk INT,
- | ss_ticket_number bigint,
- | ss_quantity INT,
- | ss_wholesale_cost DECIMAL(7,2),
- | ss_list_price DECIMAL(7,2),
- | ss_sales_price DECIMAL(7,2),
- | ss_ext_discount_amt DECIMAL(7,2),
- | ss_ext_sales_price DECIMAL(7,2),
- | ss_ext_wholesale_cost DECIMAL(7,2),
- | ss_ext_list_price DECIMAL(7,2),
- | ss_ext_tax DECIMAL(7,2),
- | ss_coupon_amt DECIMAL(7,2),
- | ss_net_paid DECIMAL(7,2),
- | ss_net_paid_inc_tax DECIMAL(7,2),
- | ss_net_profit DECIMAL(7,2),
- | ss_sold_date_sk INT
- |)
- |USING clickhouse
- |LOCATION '$HDFS_URL/test/store_sales'
- |TBLPROPERTIES (storage_policy='__hdfs_main')
- |""".stripMargin)
-
+ spark.sql(tpcdsMergetreeTables("store_sales"))
// scalastyle:off line.size.limit
spark.sql(
"insert into mergetree.store_sales select /*+ REPARTITION(3) */ * from
tpcdsdb.store_sales")
@@ -122,6 +328,22 @@ class GlutenClickHouseMergetreeWriteStatsSuite
// scalastyle:on line.size.limit
}
+ test("verify mergetree delta stats") {
+ if (isSparkVersionGE("3.5")) {
+ spark.sql("create database if not exists mergetree")
+ spark.sql("use mergetree")
+ val tables = Seq(
+ "store_sales",
+ "store_returns",
+ "web_sales",
+ "web_returns",
+ "catalog_sales",
+ "catalog_returns",
+ "inventory")
+ tables.foreach(writeAndCompareDeltaStats(_))
+ }
+ }
+
def getDeltaSnapshot(df: DataFrame): Snapshot = {
val scanExec = collect(df.queryExecution.sparkPlan) {
case nf: FileSourceScanExecTransformer => nf
@@ -137,4 +359,20 @@ class GlutenClickHouseMergetreeWriteStatsSuite
snapshot
}
+ def writeAndCompareDeltaStats(table: String, partNum: Int = 3): Unit = {
+ spark.sql(s"drop table if exists $table")
+ spark.sql(tpcdsMergetreeTables(table))
+ spark.sql(s"insert into $table select /*+ REPARTITION($partNum) */ * from
tpcdsdb.$table")
+ val tableDf = spark.table(table)
+ val snapshot = getDeltaSnapshot(tableDf)
+ val tableStats = tableDf
+ .groupBy(input_file_name().as("path"))
+ .agg(snapshot.statsCollector)
+ .orderBy("path")
+ val deltaLogStats = snapshot.withStats
+ .selectExpr(s"concat('${snapshot.path.getParent}/', path)", "stats")
+ .orderBy("path")
+ checkAnswer(tableStats, deltaLogStats)
+ }
+
}
diff --git a/cpp-ch/local-engine/Common/GlutenConfig.cpp
b/cpp-ch/local-engine/Common/GlutenConfig.cpp
index bcb6c96de9..383e8d34f8 100644
--- a/cpp-ch/local-engine/Common/GlutenConfig.cpp
+++ b/cpp-ch/local-engine/Common/GlutenConfig.cpp
@@ -156,6 +156,9 @@ SparkSQLConfig SparkSQLConfig::loadFromContext(const
DB::ContextPtr & context)
{
SparkSQLConfig sql_config;
sql_config.caseSensitive =
context->getConfigRef().getBool("spark.sql.caseSensitive", false);
+ // TODO: support transferring Spark settings from the Spark session to the native engine
+ sql_config.deltaDataSkippingNumIndexedCols =
context->getConfigRef().getUInt64("spark.databricks.delta.properties.defaults.dataSkippingNumIndexedCols",
32);
+ sql_config.deltaDataSkippingStatsColumns =
context->getConfigRef().getString("spark.databricks.delta.properties.defaults.dataSkippingStatsColumns",
"");
return sql_config;
}
diff --git a/cpp-ch/local-engine/Common/GlutenConfig.h
b/cpp-ch/local-engine/Common/GlutenConfig.h
index 74ccc441fc..62e0d228e0 100644
--- a/cpp-ch/local-engine/Common/GlutenConfig.h
+++ b/cpp-ch/local-engine/Common/GlutenConfig.h
@@ -182,6 +182,8 @@ inline constexpr auto DEFAULT_TEMP_FILE_PATH = "/tmp/libch";
struct SparkSQLConfig
{
bool caseSensitive = false; // spark.sql.caseSensitive
+ size_t deltaDataSkippingNumIndexedCols = 32;
+ String deltaDataSkippingStatsColumns;
static SparkSQLConfig loadFromContext(const DB::ContextPtr & context);
};
diff --git a/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h
b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h
index 506beb2c41..9d740eb4ee 100644
--- a/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h
+++ b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h
@@ -68,6 +68,8 @@ OutputFormatFilePtr createOutputFormatFile(
struct DeltaStats
{
size_t row_count;
+ // TODO: support delta.dataSkippingStatsColumns; for details see
https://docs.databricks.com/aws/en/delta/data-skipping
+ size_t n_stats_cols;
std::vector<DB::Field> min;
std::vector<DB::Field> max;
std::vector<Int64> null_count;
@@ -91,23 +93,55 @@ struct DeltaStats
assert(partition_index.size() == partition.size());
+ size_t num_stats_cols = numStatsCols(output.columns() -
partition.size());
auto appendBase = [&](const std::string & prefix)
{
- for (const auto & column : output.getColumnsWithTypeAndName())
+ for (size_t i = 0, n = 0; i < output.columns() && n <
num_stats_cols; i++)
+ {
+ const auto & column = output.getByPosition(i);
if (!partition_index.contains(column.name))
+ {
statsHeaderBase.emplace_back(wrapNullableType(column.type), prefix +
column.name);
+ ++n;
+ }
+ }
};
appendBase("min_");
appendBase("max_");
- for (const auto & column : output.getColumnsWithTypeAndName())
+ for (size_t i = 0, n = 0; i < output.columns() && n < num_stats_cols;
i++)
+ {
+ const auto & column = output.getByPosition(i);
if (!partition_index.contains(column.name))
+ {
statsHeaderBase.emplace_back(BIGINT(), "null_count_" +
column.name);
+ ++n;
+ }
+ }
return DB::Block{statsHeaderBase};
}
+ static size_t numStatsCols(size_t origin)
+ {
+ if (DB::CurrentThread::isInitialized())
+ {
+ const DB::ContextPtr query_context =
DB::CurrentThread::get().getQueryContext();
+ if (query_context)
+ {
+ SparkSQLConfig config =
SparkSQLConfig::loadFromContext(query_context);
+ return std::min(config.deltaDataSkippingNumIndexedCols,
origin);
+ }
+ }
+ return origin;
+ }
+
explicit DeltaStats(size_t size, const std::set<size_t> & partition_index_
= {})
- : row_count(0), min(size), max(size), null_count(size, 0),
partition_index(partition_index_)
+ : row_count(0)
+ , n_stats_cols(numStatsCols(size))
+ , min(n_stats_cols)
+ , max(n_stats_cols)
+ , null_count(n_stats_cols, 0)
+ , partition_index(partition_index_)
{
assert(size > 0);
}
@@ -118,8 +152,8 @@ struct DeltaStats
{
assert(chunk.getNumRows() > 0);
const auto & columns = chunk.getColumns();
- assert(columns.size() == min.size() + partition_index.size());
- for (size_t i = 0, col = 0; col < columns.size(); ++col)
+ assert(columns.size() - partition_index.size() >= n_stats_cols);
+ for (size_t i = 0, col = 0; i < n_stats_cols && col < columns.size();
++col)
{
if (partition_index.contains(col))
continue;
@@ -157,10 +191,10 @@ struct DeltaStats
void merge(const DeltaStats & right)
{
- assert(min.size() == right.min.size());
+ assert(n_stats_cols == right.n_stats_cols);
assert(partition_index == right.partition_index);
- for (size_t i = 0; i < min.size(); ++i)
+ for (size_t i = 0; i < n_stats_cols; ++i)
{
null_count[i] += right.null_count[i];
min[i] = std::min(min[i], right.min[i]);
@@ -244,9 +278,8 @@ public:
void collectStats(const String & filename, const String & partition_dir,
const DeltaStats & stats) const
{
const std::string & partition = partition_dir.empty() ?
WriteStatsBase::NO_PARTITION_ID : partition_dir;
- size_t columnSize = stats.min.size();
+ size_t columnSize = stats.n_stats_cols;
assert(columns_.size() == stats_column_start + columnSize * 3);
- assert(stats.min.size() == stats.max.size() && stats.min.size() ==
stats.null_count.size());
columns_[ColumnIndex::filename]->insertData(filename.c_str(),
filename.size());
columns_[partition_id]->insertData(partition.c_str(),
partition.size());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]