This is an automated email from the ASF dual-hosted git repository.
changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 2f312ead50 [GLUTEN-8846][CH] [Part 3] Add benchmark for Iceberg
Delete (#9192)
2f312ead50 is described below
commit 2f312ead50dedc1f9fd962a9fa88953de4a2d991
Author: Chang chen <[email protected]>
AuthorDate: Tue Apr 1 17:58:58 2025 +0800
[GLUTEN-8846][CH] [Part 3] Add benchmark for Iceberg Delete (#9192)
* a simple alias for NamesAndTypesList
* If SparkEnv.get returns null, it means something went wrong at beforeAll()
* If the 'lineitem' directory doesn't exist in HDFS, upload the 'lineitem'
data from the local system.
* Add BM_IcebergReadWithEqualityDeletes and
BM_IcebergReadWithPositionDeletes
---
.../GlutenClickHouseTPCHAbstractSuite.scala | 31 +--
.../execution/tpch/GlutenClickHouseHDFSSuite.scala | 19 +-
cpp-ch/local-engine/Common/BlockTypeUtils.cpp | 6 +-
cpp-ch/local-engine/Common/BlockTypeUtils.h | 11 +-
cpp-ch/local-engine/Common/PlanUtil.cpp | 2 +-
cpp-ch/local-engine/Parser/FunctionExecutor.cpp | 2 +-
.../Parser/RelParsers/ReadRelParser.cpp | 4 +-
.../Parser/RelParsers/WriteRelParser.cpp | 2 +-
.../Storages/SubstraitSource/CMakeLists.txt | 2 +-
.../Storages/SubstraitSource/FileReader.cpp | 12 +-
.../EqualityDeleteFileReader.cpp | 2 +-
.../EqualityDeleteFileReader.h | 0
.../{iceberg => Iceberg}/IcebergMetadataColumn.h | 0
.../{iceberg => Iceberg}/IcebergReader.cpp | 15 +-
.../{iceberg => Iceberg}/IcebergReader.h | 0
.../PositionalDeleteFileReader.cpp | 4 +-
.../PositionalDeleteFileReader.h | 0
.../{iceberg => Iceberg}/SimpleParquetReader.cpp | 0
.../{iceberg => Iceberg}/SimpleParquetReader.h | 0
.../Storages/SubstraitSource/ParquetFormatFile.cpp | 5 +-
.../local-engine/tests/benchmark_parquet_read.cpp | 260 ++++++++++++++++++++-
cpp-ch/local-engine/tests/gtest_iceberge_test.cpp | 17 +-
.../tests/gtest_parquet_columnindex.cpp | 24 +-
.../local-engine/tests/utils/gluten_test_util.cpp | 4 +-
cpp-ch/local-engine/tests/utils/gluten_test_util.h | 23 +-
25 files changed, 369 insertions(+), 76 deletions(-)
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHAbstractSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHAbstractSuite.scala
index 1529a867d3..a299795ad7 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHAbstractSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHAbstractSuite.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.execution
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.delta.{ClickhouseSnapshot, DeltaLog}
@@ -38,6 +38,8 @@ abstract class GlutenClickHouseTPCHAbstractSuite
protected val parquetTableDataPath: String =
"../../../../gluten-core/src/test/resources/tpch-data"
+ final protected lazy val absoluteParquetPath = rootPath +
parquetTableDataPath
+
protected val tablesPath: String
protected val tpchQueries: String
protected val queriesResults: String
@@ -47,7 +49,7 @@ abstract class GlutenClickHouseTPCHAbstractSuite
super.beforeAll()
if (needCopyParquetToTablePath) {
- val sourcePath = new File(rootPath + parquetTableDataPath)
+ val sourcePath = new File(absoluteParquetPath)
FileUtils.copyDirectory(sourcePath, new File(tablesPath))
}
@@ -68,7 +70,7 @@ abstract class GlutenClickHouseTPCHAbstractSuite
spark.sql(s"use $parquetSourceDB")
val parquetTablePath = basePath + "/tpch-data"
- FileUtils.copyDirectory(new File(rootPath + parquetTableDataPath), new
File(parquetTablePath))
+ FileUtils.copyDirectory(new File(absoluteParquetPath), new
File(parquetTablePath))
createNotNullTPCHTablesInParquet(parquetTablePath)
@@ -236,7 +238,7 @@ abstract class GlutenClickHouseTPCHAbstractSuite
spark.sql(s"use $parquetSourceDB")
val parquetTablePath = basePath + "/tpch-data"
- FileUtils.copyDirectory(new File(rootPath + parquetTableDataPath), new
File(parquetTablePath))
+ FileUtils.copyDirectory(new File(absoluteParquetPath), new
File(parquetTablePath))
createNotNullTPCHTablesInParquet(parquetTablePath)
@@ -575,16 +577,19 @@ abstract class GlutenClickHouseTPCHAbstractSuite
}
override protected def afterAll(): Unit = {
- // guava cache invalidate event trigger remove operation may in seconds
delay, so wait a bit
- // normally this doesn't take more than 1s
- eventually(timeout(60.seconds), interval(1.seconds)) {
- // Spark listener message was not sent in time with ci env.
- // In tpch case, there are more then 10 hbj data has build.
- // Let's just verify it was cleaned ever.
- assert(CHBroadcastBuildSideCache.size() <= 10)
- }
- ClickhouseSnapshot.clearAllFileStatusCache()
+ // if SparkEnv.get returns null which means something wrong at beforeAll()
+ if (SparkEnv.get != null) {
+ // guava cache invalidate event trigger remove operation may in seconds
delay, so wait a bit
+ // normally this doesn't take more than 1s
+ eventually(timeout(60.seconds), interval(1.seconds)) {
+ // Spark listener message was not sent in time with ci env.
+ // In tpch case, there are more than 10 hbj data has built.
+ // Let's just verify it was cleaned ever.
+ assert(CHBroadcastBuildSideCache.size() <= 10)
+ }
+ ClickhouseSnapshot.clearAllFileStatusCache()
+ }
DeltaLog.clearCache()
super.afterAll()
}
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala
index b16ec5fe57..31dbe38f10 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.commons.io.IOUtils
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileUtil, Path}
import java.nio.charset.Charset
@@ -70,6 +70,23 @@ class GlutenClickHouseHDFSSuite
override def beforeAll(): Unit = {
super.beforeAll()
+ val targetFile = new Path(s"$tablesPath/lineitem")
+ val fs = targetFile.getFileSystem(spark.sessionState.newHadoopConf())
+ val existed = fs.exists(targetFile)
+ // If the 'lineitem' directory doesn't exist in HDFS,
+ // upload the 'lineitem' data from the local system.
+ if (!existed) {
+ val localDataDir = new Path(s"$absoluteParquetPath/lineitem")
+ val localFs =
localDataDir.getFileSystem(spark.sessionState.newHadoopConf())
+ FileUtil.copy(
+ localFs,
+ localDataDir,
+ fs,
+ targetFile,
+ false,
+ true,
+ spark.sessionState.newHadoopConf())
+ }
}
override protected def beforeEach(): Unit = {
diff --git a/cpp-ch/local-engine/Common/BlockTypeUtils.cpp
b/cpp-ch/local-engine/Common/BlockTypeUtils.cpp
index b065f2a11b..e669e84a1b 100644
--- a/cpp-ch/local-engine/Common/BlockTypeUtils.cpp
+++ b/cpp-ch/local-engine/Common/BlockTypeUtils.cpp
@@ -22,7 +22,7 @@
namespace local_engine
{
-DB::Block toSampleBlock(const DB::NamesAndTypesList & type)
+DB::Block toSampleBlock(const RowType & type)
{
DB::ColumnsWithTypeAndName result;
result.reserve(type.size());
@@ -31,9 +31,9 @@ DB::Block toSampleBlock(const DB::NamesAndTypesList & type)
return result;
}
-DB::NamesAndTypesList blockToNameAndTypeList(const DB::Block & header)
+RowType blockToRowType(const DB::Block & header)
{
- DB::NamesAndTypesList types;
+ RowType types;
for (const auto & name : header.getNames())
{
const auto * column = header.findByName(name);
diff --git a/cpp-ch/local-engine/Common/BlockTypeUtils.h
b/cpp-ch/local-engine/Common/BlockTypeUtils.h
index ea7bb9f7e6..d0829a3fb0 100644
--- a/cpp-ch/local-engine/Common/BlockTypeUtils.h
+++ b/cpp-ch/local-engine/Common/BlockTypeUtils.h
@@ -28,6 +28,11 @@
namespace local_engine
{
+/**
+ * A simple alias for NamesAndTypesList
+ */
+using RowType = DB::NamesAndTypesList;
+
inline DB::DataTypePtr BIGINT()
{
return std::make_shared<DB::DataTypeInt64>();
@@ -87,8 +92,8 @@ inline DB::ColumnWithTypeAndName toColumnType(const
DB::NameAndTypePair & type)
return DB::ColumnWithTypeAndName(type.type, type.name);
}
-DB::Block toSampleBlock(const DB::NamesAndTypesList & type);
-DB::NamesAndTypesList blockToNameAndTypeList(const DB::Block & header);
+DB::Block toSampleBlock(const RowType & type);
+RowType blockToRowType(const DB::Block & header);
DB::DataTypePtr wrapNullableType(bool nullable, DB::DataTypePtr nested_type);
inline DB::DataTypePtr wrapNullableType(DB::DataTypePtr nested_type)
@@ -128,7 +133,7 @@ inline DB::NamesWithAliases buildNamesWithAliases(const
DB::Block & input, const
return aliases;
}
-// begion of CppToDataType
+// begin of CppToDataType
template <typename T>
struct CppToDataType;
diff --git a/cpp-ch/local-engine/Common/PlanUtil.cpp
b/cpp-ch/local-engine/Common/PlanUtil.cpp
index ebc3b6da3d..1efe063383 100644
--- a/cpp-ch/local-engine/Common/PlanUtil.cpp
+++ b/cpp-ch/local-engine/Common/PlanUtil.cpp
@@ -112,7 +112,7 @@ DB::IQueryPlanStep * addRemoveNullableStep(DB::QueryPlan &
plan, const DB::Conte
DB::IQueryPlanStep * renamePlanHeader(DB::QueryPlan & plan, const
BuildNamesWithAliases & buildAliases, const String & step_desc)
{
- DB::ActionsDAG
actions_dag{blockToNameAndTypeList(plan.getCurrentHeader())};
+ DB::ActionsDAG actions_dag{blockToRowType(plan.getCurrentHeader())};
DB::NamesWithAliases aliases;
buildAliases(plan.getCurrentHeader(), aliases);
actions_dag.project(aliases);
diff --git a/cpp-ch/local-engine/Parser/FunctionExecutor.cpp
b/cpp-ch/local-engine/Parser/FunctionExecutor.cpp
index 93f30d55d5..a6b795350f 100644
--- a/cpp-ch/local-engine/Parser/FunctionExecutor.cpp
+++ b/cpp-ch/local-engine/Parser/FunctionExecutor.cpp
@@ -73,7 +73,7 @@ void FunctionExecutor::buildHeader()
void FunctionExecutor::parseExpression()
{
- DB::ActionsDAG actions_dag{blockToNameAndTypeList(header)};
+ DB::ActionsDAG actions_dag{blockToRowType(header)};
/// Notice keep_result must be true, because result_node of current
function must be output node in actions_dag
const auto * node =
expression_parser->parseFunction(expression.scalar_function(), actions_dag,
true);
result_name = node->result_name;
diff --git a/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp
b/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp
index 8ae7db7d75..6f6bd2d3a4 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp
@@ -28,13 +28,13 @@
#include <Parser/TypeParser.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Storages/SourceFromJavaIter.h>
+#include <Storages/SourceFromRange.h>
#include <Storages/SubstraitSource/SubstraitFileSource.h>
#include <Storages/SubstraitSource/SubstraitFileSourceStep.h>
#include <google/protobuf/wrappers.pb.h>
#include <rapidjson/document.h>
#include <Common/BlockTypeUtils.h>
#include <Common/DebugUtils.h>
-#include <Storages/SourceFromRange.h>
namespace DB
{
@@ -198,7 +198,7 @@ QueryPlanStepPtr
ReadRelParser::parseReadRelWithLocalFile(const substrait::ReadR
if (rel.has_filter())
{
- DB::ActionsDAG actions_dag{blockToNameAndTypeList(header)};
+ DB::ActionsDAG actions_dag{blockToRowType(header)};
const DB::ActionsDAG::Node * filter_node =
parseExpression(actions_dag, rel.filter());
actions_dag.addOrReplaceInOutputs(*filter_node);
assert(filter_node ==
&(actions_dag.findInOutputs(filter_node->result_name)));
diff --git a/cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.cpp
b/cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.cpp
index d5a8dba1f5..73640da238 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.cpp
@@ -70,7 +70,7 @@ DB::ProcessorPtr make_sink(
DB::ExpressionActionsPtr create_rename_action(const DB::Block & input, const
DB::Block & output)
{
- ActionsDAG actions_dag{blockToNameAndTypeList(input)};
+ ActionsDAG actions_dag{blockToRowType(input)};
actions_dag.project(buildNamesWithAliases(input, output));
return std::make_shared<DB::ExpressionActions>(std::move(actions_dag));
}
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/CMakeLists.txt
b/cpp-ch/local-engine/Storages/SubstraitSource/CMakeLists.txt
index 4248723417..639c1f043f 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/CMakeLists.txt
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/CMakeLists.txt
@@ -18,7 +18,7 @@ set(ARROW_INCLUDE_DIR
"${ClickHouse_SOURCE_DIR}/contrib/arrow/cpp/src")
add_headers_and_sources(substrait_source .)
add_headers_and_sources(substrait_source Delta)
add_headers_and_sources(substrait_source Delta/Bitmap)
-add_headers_and_sources(substrait_source iceberg)
+add_headers_and_sources(substrait_source Iceberg)
add_library(substrait_source ${substrait_source_sources})
target_compile_options(
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp
b/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp
index b9e425a210..fedd043500 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp
@@ -21,15 +21,15 @@
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesDecimal.h>
#include <IO/ReadBufferFromString.h>
+#include <Parser/SubstraitParserUtils.h>
+#include <Storages/SubstraitSource/Delta/DeltaParquetMeta.h>
+#include <Storages/SubstraitSource/Delta/DeltaReader.h>
+#include <Storages/SubstraitSource/Iceberg/IcebergReader.h>
#include <Storages/SubstraitSource/ParquetFormatFile.h>
-#include <Storages/SubstraitSource/iceberg/IcebergReader.h>
#include <boost/algorithm/string/case_conv.hpp>
#include <Common/CHUtil.h>
#include <Common/Exception.h>
#include <Common/GlutenStringUtils.h>
-#include <Parser/SubstraitParserUtils.h>
-#include <Storages/SubstraitSource/Delta/DeltaParquetMeta.h>
-#include <Storages/SubstraitSource/Delta/DeltaReader.h>
namespace DB
{
@@ -243,6 +243,7 @@ NormalFileReader::NormalFileReader(
const FormatFile::InputFormatPtr & input_format_)
: BaseReader(file_, to_read_header_, output_header_),
input_format(input_format_)
{
+ assert(input_format);
}
bool NormalFileReader::pull(DB::Chunk & chunk)
@@ -308,7 +309,8 @@ std::unique_ptr<NormalFileReader> createNormalFileReader(
row_index_filter_type = toString(column.value());
}
}
- return delta::DeltaReader::create(file, to_read_header_,
output_header_, input_format, row_index_ids_encoded, row_index_filter_type);
+ return delta::DeltaReader::create(
+ file, to_read_header_, output_header_, input_format,
row_index_ids_encoded, row_index_filter_type);
}
return std::make_unique<NormalFileReader>(file, to_read_header_,
output_header_, input_format);
diff --git
a/cpp-ch/local-engine/Storages/SubstraitSource/iceberg/EqualityDeleteFileReader.cpp
b/cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/EqualityDeleteFileReader.cpp
similarity index 99%
rename from
cpp-ch/local-engine/Storages/SubstraitSource/iceberg/EqualityDeleteFileReader.cpp
rename to
cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/EqualityDeleteFileReader.cpp
index 8f3ddb9910..8455d60bc4 100644
---
a/cpp-ch/local-engine/Storages/SubstraitSource/iceberg/EqualityDeleteFileReader.cpp
+++
b/cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/EqualityDeleteFileReader.cpp
@@ -23,7 +23,7 @@
#include <Functions/FunctionFactory.h>
#include <Interpreters/ExpressionActions.h>
#include <Processors/Formats/Impl/ParquetBlockInputFormat.h>
-#include <Storages/SubstraitSource/iceberg/SimpleParquetReader.h>
+#include <Storages/SubstraitSource/Iceberg/SimpleParquetReader.h>
#include <Common/BlockTypeUtils.h>
using namespace DB;
diff --git
a/cpp-ch/local-engine/Storages/SubstraitSource/iceberg/EqualityDeleteFileReader.h
b/cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/EqualityDeleteFileReader.h
similarity index 100%
rename from
cpp-ch/local-engine/Storages/SubstraitSource/iceberg/EqualityDeleteFileReader.h
rename to
cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/EqualityDeleteFileReader.h
diff --git
a/cpp-ch/local-engine/Storages/SubstraitSource/iceberg/IcebergMetadataColumn.h
b/cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/IcebergMetadataColumn.h
similarity index 100%
rename from
cpp-ch/local-engine/Storages/SubstraitSource/iceberg/IcebergMetadataColumn.h
rename to
cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/IcebergMetadataColumn.h
diff --git
a/cpp-ch/local-engine/Storages/SubstraitSource/iceberg/IcebergReader.cpp
b/cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/IcebergReader.cpp
similarity index 96%
rename from
cpp-ch/local-engine/Storages/SubstraitSource/iceberg/IcebergReader.cpp
rename to cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/IcebergReader.cpp
index 09bc4e0b7b..3de8520703 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/iceberg/IcebergReader.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/IcebergReader.cpp
@@ -20,8 +20,8 @@
#include <DataTypes/DataTypeNullable.h>
#include <Storages/Parquet/ParquetMeta.h>
#include <Storages/SubstraitSource/Delta/Bitmap/DeltaDVRoaringBitmapArray.h>
-#include <Storages/SubstraitSource/iceberg/EqualityDeleteFileReader.h>
-#include <Storages/SubstraitSource/iceberg/PositionalDeleteFileReader.h>
+#include <Storages/SubstraitSource/Iceberg/EqualityDeleteFileReader.h>
+#include <Storages/SubstraitSource/Iceberg/PositionalDeleteFileReader.h>
#include <Common/BlockTypeUtils.h>
using namespace DB;
@@ -60,14 +60,11 @@ std::unique_ptr<IcebergReader> IcebergReader::create(
? nullptr
: EqualityDeleteFileReader::createDeleteExpr(context,
file_->getFileSchema(), delete_files, it_equal->second, new_header);
+ auto input = input_format_callback(new_header);
+ if (!input)
+ return nullptr;
return std::make_unique<IcebergReader>(
- file_,
- new_header,
- output_header_,
- input_format_callback(new_header),
- delete_expr,
- std::move(delete_bitmap_array),
- to_read_header_.columns());
+ file_, new_header, output_header_, input, delete_expr,
std::move(delete_bitmap_array), to_read_header_.columns());
}
IcebergReader::IcebergReader(
diff --git
a/cpp-ch/local-engine/Storages/SubstraitSource/iceberg/IcebergReader.h
b/cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/IcebergReader.h
similarity index 100%
rename from cpp-ch/local-engine/Storages/SubstraitSource/iceberg/IcebergReader.h
rename to cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/IcebergReader.h
diff --git
a/cpp-ch/local-engine/Storages/SubstraitSource/iceberg/PositionalDeleteFileReader.cpp
b/cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/PositionalDeleteFileReader.cpp
similarity index 96%
rename from
cpp-ch/local-engine/Storages/SubstraitSource/iceberg/PositionalDeleteFileReader.cpp
rename to
cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/PositionalDeleteFileReader.cpp
index 235a907603..3db44699dc 100644
---
a/cpp-ch/local-engine/Storages/SubstraitSource/iceberg/PositionalDeleteFileReader.cpp
+++
b/cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/PositionalDeleteFileReader.cpp
@@ -20,8 +20,8 @@
#include <Functions/FunctionFactory.h>
#include <Storages/Parquet/ParquetMeta.h>
#include <Storages/SubstraitSource/Delta/Bitmap/DeltaDVRoaringBitmapArray.h>
-#include <Storages/SubstraitSource/iceberg/IcebergMetadataColumn.h>
-#include <Storages/SubstraitSource/iceberg/SimpleParquetReader.h>
+#include <Storages/SubstraitSource/Iceberg/IcebergMetadataColumn.h>
+#include <Storages/SubstraitSource/Iceberg/SimpleParquetReader.h>
#include <Common/BlockTypeUtils.h>
using namespace DB;
diff --git
a/cpp-ch/local-engine/Storages/SubstraitSource/iceberg/PositionalDeleteFileReader.h
b/cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/PositionalDeleteFileReader.h
similarity index 100%
rename from
cpp-ch/local-engine/Storages/SubstraitSource/iceberg/PositionalDeleteFileReader.h
rename to
cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/PositionalDeleteFileReader.h
diff --git
a/cpp-ch/local-engine/Storages/SubstraitSource/iceberg/SimpleParquetReader.cpp
b/cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/SimpleParquetReader.cpp
similarity index 100%
rename from
cpp-ch/local-engine/Storages/SubstraitSource/iceberg/SimpleParquetReader.cpp
rename to
cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/SimpleParquetReader.cpp
diff --git
a/cpp-ch/local-engine/Storages/SubstraitSource/iceberg/SimpleParquetReader.h
b/cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/SimpleParquetReader.h
similarity index 100%
rename from
cpp-ch/local-engine/Storages/SubstraitSource/iceberg/SimpleParquetReader.h
rename to
cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/SimpleParquetReader.h
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
index 71abd0eb84..afa349743a 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
@@ -230,9 +230,8 @@ std::optional<size_t> ParquetFormatFile::getTotalRows()
auto in = read_buffer_builder->build(file_info);
auto result = collectRequiredRowGroups(*in, file_info);
- size_t rows = 0;
- for (const auto & rowgroup : result.readRowGroups)
- rows += rowgroup.num_rows;
+ size_t rows = std::ranges::fold_left(
+ result.readRowGroups, static_cast<size_t>(0), [](size_t sum, const
auto & row_group) { return sum + row_group.num_rows; });
{
std::lock_guard lock(mutex);
diff --git a/cpp-ch/local-engine/tests/benchmark_parquet_read.cpp
b/cpp-ch/local-engine/tests/benchmark_parquet_read.cpp
index 0ffa2fb7e4..a1b72dba51 100644
--- a/cpp-ch/local-engine/tests/benchmark_parquet_read.cpp
+++ b/cpp-ch/local-engine/tests/benchmark_parquet_read.cpp
@@ -23,14 +23,21 @@
#include <Processors/Formats/Impl/ParquetBlockInputFormat.h>
#include <QueryPipeline/QueryPipeline.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
+#include <Storages/Output/NormalFileWriter.h>
#include <Storages/Parquet/ParquetMeta.h>
#include <Storages/Parquet/VectorizedParquetRecordReader.h>
+#include <Storages/SubstraitSource/Iceberg/IcebergMetadataColumn.h>
#include <Storages/SubstraitSource/SubstraitFileSource.h>
+#include <Storages/SubstraitSource/substrait_fwd.h>
#include <benchmark/benchmark.h>
#include <parquet/arrow/reader.h>
#include <substrait/plan.pb.h>
+#include <tests/utils/TempFilePath.h>
#include <tests/utils/gluten_test_util.h>
+#include <Poco/Path.h>
+#include <Poco/URI.h>
#include <Poco/Util/MapConfiguration.h>
+#include <Common/BlockTypeUtils.h>
#include <Common/DebugUtils.h>
#include <Common/QueryContext.h>
@@ -219,7 +226,7 @@ void
BM_ColumnIndexRead_Filter_ReturnAllResult(benchmark::State & state)
"benchmark/column_index/lineitem/part-00000-9395e12a-3620-4085-9677-c63b920353f4-c000.snappy.parquet");
const std::string filter1 = "l_shipdate is not null AND l_shipdate <=
toDate32('1998-09-01')";
const substrait::ReadRel::LocalFiles files = createLocalFiles(filename,
true);
- const AnotherRowType schema =
local_engine::test::readParquetSchema(filename);
+ const local_engine::RowType schema =
local_engine::test::readParquetSchema(filename);
auto pushDown = local_engine::test::parseFilter(filter1, schema);
const Block header = {local_engine::toSampleBlock(schema)};
@@ -236,7 +243,7 @@ void
BM_ColumnIndexRead_Filter_ReturnHalfResult(benchmark::State & state)
"benchmark/column_index/lineitem/part-00000-9395e12a-3620-4085-9677-c63b920353f4-c000.snappy.parquet");
const std::string filter1 = "l_orderkey is not null AND l_orderkey >
300977829";
const substrait::ReadRel::LocalFiles files = createLocalFiles(filename,
true);
- const AnotherRowType schema =
local_engine::test::readParquetSchema(filename);
+ const local_engine::RowType schema =
local_engine::test::readParquetSchema(filename);
auto pushDown = local_engine::test::parseFilter(filter1, schema);
const Block header = {local_engine::toSampleBlock(schema)};
@@ -245,6 +252,253 @@ void
BM_ColumnIndexRead_Filter_ReturnHalfResult(benchmark::State & state)
local_engine::QueryContext::globalMutableContext()->setConfig(Poco::AutoPtr(new
Poco::Util::MapConfiguration()));
}
+//// Iceberg perf test
+///
+
+size_t readFileWithDeletesAndGetRowCount(
+ const std::string & file, const DB::Block & header, const
local_engine::SubstraitIcebergDeleteFile * delete_file)
+{
+ using namespace DB;
+ using namespace local_engine;
+ using namespace local_engine::test;
+
+ substrait::ReadRel::LocalFiles files;
+ substrait::ReadRel::LocalFiles::FileOrFiles * file_item =
files.add_items();
+ file_item->set_uri_file("file://" + file);
+ file_item->set_start(0);
+ file_item->set_length(std::filesystem::file_size(file));
+
+ substrait::ReadRel::LocalFiles::FileOrFiles::IcebergReadOptions
iceberg_options;
+ substrait::ReadRel::LocalFiles::FileOrFiles::ParquetReadOptions
parquet_format;
+ iceberg_options.mutable_parquet()->CopyFrom(parquet_format);
+
+ if (delete_file)
+ iceberg_options.add_delete_files()->CopyFrom(*delete_file);
+
+ file_item->mutable_iceberg()->CopyFrom(iceberg_options);
+
+
+ auto builder = std::make_unique<QueryPipelineBuilder>();
+
builder->init(Pipe(std::make_shared<SubstraitFileSource>(QueryContext::globalContext(),
header, files)));
+ auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
+ auto reader = PullingPipelineExecutor(pipeline);
+
+
+ size_t total_read_rows = 0;
+ DB::Block res;
+ while (reader.pull(res))
+ total_read_rows += res.rows();
+ return total_read_rows;
+}
+
+std::pair<size_t, int64_t> calculateRowsAndDeleteCount(benchmark::State &
state, const std::string & file_path, const DB::Block & header)
+{
+ using namespace DB;
+ using namespace local_engine;
+
+ FormatSettings format_settings;
+
+ ParquetMetaBuilder metaBuilder{
+ .case_insensitive =
format_settings.parquet.case_insensitive_column_matching,
+ .allow_missing_columns =
format_settings.parquet.allow_missing_columns};
+ ReadBufferFromFilePRead fileReader(file_path);
+ metaBuilder.build(fileReader, header);
+
+ size_t total_rows = std::ranges::fold_left(
+ metaBuilder.readRowGroups, static_cast<size_t>(0), [](size_t sum,
const auto & row_group) { return sum + row_group.num_rows; });
+
+ // Calculate delete count based on percentage
+ int64_t delete_percentage = state.range(0);
+ int64_t delete_count = total_rows * delete_percentage / 100;
+
+ return {total_rows, delete_count};
+}
+
+// Helper function to write a Parquet file with position deletes
+std::shared_ptr<local_engine::test::TempFilePath>
+writePositionDeleteFile(const std::string & base_file_path, const
std::vector<int64_t> & delete_positions)
+{
+ using namespace local_engine;
+ using namespace local_engine::test;
+ using namespace local_engine::iceberg;
+
+ auto delete_file_path = TempFilePath::tmp("parquet");
+
+ // Create the file path column with base file path repeated
+ std::string uri_file_path = "file://" + base_file_path;
+ auto file_path_vector = createColumn<std::string>(delete_positions.size(),
[&](size_t /*row*/) { return uri_file_path; });
+
+ // Create the position column with delete positions
+ auto deletePosVector = createColumn<int64_t>(delete_positions);
+
+ // Create the block with both columns
+ DB::Block delete_block{
+ {file_path_vector,
+ IcebergMetadataColumn::icebergDeleteFilePathColumn()->type,
+ IcebergMetadataColumn::icebergDeleteFilePathColumn()->name},
+ {deletePosVector,
IcebergMetadataColumn::icebergDeletePosColumn()->type,
IcebergMetadataColumn::icebergDeletePosColumn()->name}};
+
+ // Write the block to the delete file
+ const DB::ContextPtr context = QueryContext::globalContext();
+ const Poco::Path file{delete_file_path->string()};
+ const Poco::URI fileUri{file};
+
+ auto writer = NormalFileWriter::create(context, fileUri.toString(),
delete_block, "parquet");
+ writer->write(delete_block);
+ writer->close();
+
+ return delete_file_path;
+}
+std::pair<local_engine::SubstraitIcebergDeleteFile,
std::shared_ptr<local_engine::test::TempFilePath>>
+createPositionDeleteFile(int64_t delete_count, size_t total_rows, const
std::string & base_file_path)
+{
+ if (delete_count == 0)
+ return {{}, nullptr};
+
+ assert(delete_count > 0);
+ using namespace local_engine;
+ using namespace local_engine::test;
+
+ std::vector<int64_t> delete_positions;
+ delete_positions.reserve(delete_count);
+
+ std::vector<int64_t> all_positions(total_rows);
+ std::iota(all_positions.begin(), all_positions.end(), 0);
+
+ std::mt19937 g(std::random_device{}());
+ std::ranges::shuffle(all_positions, g);
+
+ delete_positions.assign(all_positions.begin(), all_positions.begin() +
delete_count);
+ std::ranges::sort(delete_positions);
+
+ std::shared_ptr<TempFilePath> delete_file_path =
writePositionDeleteFile(base_file_path, delete_positions);
+
+ SubstraitIcebergDeleteFile delete_file;
+ delete_file.set_filecontent(IcebergReadOptions::POSITION_DELETES);
+ delete_file.set_filepath("file://" + delete_file_path->string());
+ delete_file.set_recordcount(delete_count);
+
delete_file.set_filesize(std::filesystem::file_size(delete_file_path->string()));
+
+ substrait::ReadRel::LocalFiles::FileOrFiles::ParquetReadOptions
parquet_format;
+ delete_file.mutable_parquet()->CopyFrom(parquet_format);
+
+ return {delete_file, delete_file_path};
+}
+
+// Helper function to write a Parquet file with equality deletes
+std::shared_ptr<local_engine::test::TempFilePath>
+writeEqualityDeleteFile(const std::vector<int64_t> & delete_values, const
std::string & column_name = "l_shipdate")
+{
+ using namespace local_engine;
+ using namespace local_engine::test;
+
+ auto delete_file_path = TempFilePath::tmp("parquet");
+
+ // Create the column with values to be deleted
+ auto delete_values_vector = createColumn<int64_t>(delete_values);
+
+ // Create the block with the delete values
+ DB::Block delete_block{{delete_values_vector,
std::make_shared<DB::DataTypeInt64>(), column_name}};
+
+ // Write the block to the delete file
+ const DB::ContextPtr context = QueryContext::globalContext();
+ const Poco::Path file{delete_file_path->string()};
+ const Poco::URI fileUri{file};
+
+ auto writer = NormalFileWriter::create(context, fileUri.toString(),
delete_block, "parquet");
+ writer->write(delete_block);
+ writer->close();
+
+ return delete_file_path;
+}
+
+// Builds an Iceberg EQUALITY_DELETES file (and its Substrait descriptor) with
+// `delete_count` randomly chosen l_orderkey values.
+//
+// delete_count: number of delete values to generate; 0 returns an empty
+//               descriptor and a null path (caller then passes no delete
+//               file to the reader).
+// Returns {descriptor, temp-file handle}; the handle must outlive any reader
+// that consumes the descriptor, since the descriptor only stores the path.
+std::pair<local_engine::SubstraitIcebergDeleteFile,
std::shared_ptr<local_engine::test::TempFilePath>>
+createEqualityDeleteFile(int64_t delete_count)
+{
+    if (delete_count == 0)
+        return {{}, nullptr};
+
+    assert(delete_count > 0);
+    using namespace local_engine;
+    using namespace local_engine::test;
+
+    std::vector<int64_t> delete_values;
+    delete_values.reserve(delete_count);
+
+    // For simplicity in the benchmark, we uniformly sample deletion values
+    // between the observed minimum and maximum l_orderkey. Some sampled
+    // values therefore do not exist in the data, so the exact number of
+    // deleted rows cannot be predicted from `delete_count`.
+    //
+    // This is acceptable for identifying performance issues with
+    // EqualityDeletes.
+
+    // Statistics of the benchmark lineitem file (source of min/max below):
+    // +-------+-------------------------+---------------+---------------+
+    // |count()|countDistinct(l_orderkey)|min(l_orderkey)|max(l_orderkey)|
+    // +-------+-------------------------+---------------+---------------+
+    // |8333867|                  8105793|              7|      599999972|
+    // +-------+-------------------------+---------------+---------------+
+    constexpr int64_t min_orderkey = 7;
+    constexpr int64_t max_orderkey = 599999972;
+    std::uniform_int_distribution<int64_t> distrib(min_orderkey, max_orderkey);
+
+    // NOTE(review): random_device seeding makes every benchmark run use a
+    // different delete set; a fixed seed would give reproducible timings.
+    std::mt19937 gen(std::random_device{}());
+    for (int64_t i = 0; i < delete_count; ++i)
+        delete_values.push_back(distrib(gen));
+    std::ranges::sort(delete_values);
+
+
+    std::shared_ptr<TempFilePath> delete_file_path =
writeEqualityDeleteFile(delete_values, "l_orderkey");
+
+    // Fill in the Substrait descriptor pointing at the file just written.
+    SubstraitIcebergDeleteFile delete_file;
+    delete_file.set_filecontent(IcebergReadOptions::EQUALITY_DELETES);
+    delete_file.set_filepath("file://" + delete_file_path->string());
+    delete_file.set_recordcount(delete_count);
+
delete_file.set_filesize(std::filesystem::file_size(delete_file_path->string()));
+    delete_file.add_equalityfieldids(1); // field id 1 = l_orderkey
+
+    // Mark the delete file as Parquet-formatted (default options).
+    substrait::ReadRel::LocalFiles::FileOrFiles::ParquetReadOptions
parquet_format;
+    delete_file.mutable_parquet()->CopyFrom(parquet_format);
+
+    return {delete_file, delete_file_path};
+}
+
+// Benchmark body shared by the position-delete and equality-delete cases.
+//
+// Reads the benchmark lineitem Parquet file with a generated Iceberg delete
+// file attached, and validates the surviving row count each iteration:
+//  - position deletes: the count is exact (total_rows - delete_count);
+//  - equality deletes: only a lower bound, because sampled delete values may
+//    not exist in the data (see createEqualityDeleteFile).
+// NOTE(review): delete_count presumably derives from state's Arg via
+// calculateRowsAndDeleteCount — its definition is outside this view; confirm.
+template <bool is_position_delete>
+void BM_IcebergReadWithDeletes(benchmark::State & state)
+{
+    using namespace DB;
+    using namespace local_engine;
+    using namespace local_engine::test;
+
+    std::string file
+        =
third_party_data("benchmark/column_index/lineitem/part-00000-9395e12a-3620-4085-9677-c63b920353f4-c000.snappy.parquet");
+    // Read only l_shipdate; one column is enough to exercise delete filtering.
+    Block header{ColumnWithTypeAndName(DataTypeDate32().createColumn(),
std::make_shared<DataTypeDate32>(), "l_shipdate")};
+    auto [total_rows, delete_count] = calculateRowsAndDeleteCount(state, file,
header);
+    // NOTE(review): `res` is never used below — candidate for removal.
+    Block res;
+
+    // Build the delete file once, outside the timed loop.
+    auto [delete_file, delete_file_path]
+        = is_position_delete ? createPositionDeleteFile(delete_count,
total_rows, file) : createEqualityDeleteFile(delete_count);
+
+    for (auto _ : state)
+    {
+        // delete_count == 0 means "no delete file" (plain read baseline).
+        size_t total_read_rows = readFileWithDeletesAndGetRowCount(file,
header, delete_count ? &delete_file : nullptr);
+
+        if (is_position_delete && total_read_rows != total_rows - delete_count)
+            throw Exception(DB::ErrorCodes::LOGICAL_ERROR, "Expected {}, but
got {} ", total_rows - delete_count, total_read_rows);
+
+        // see `createEqualityDeleteFile` for `total_read_rows < total_rows -
delete_count`
+        if (!is_position_delete && total_read_rows < total_rows - delete_count)
+            throw Exception(DB::ErrorCodes::LOGICAL_ERROR, "Read at least {},
but got {} ", total_rows - delete_count, total_read_rows);
+    }
+}
+
+// Non-template entry points: the BENCHMARK() macro needs a plain function
+// name, so each instantiation of BM_IcebergReadWithDeletes gets a wrapper.
+void BM_IcebergReadWithPositionDeletes(benchmark::State & state)
+{
+    return BM_IcebergReadWithDeletes<true>(state);
+}
+void BM_IcebergReadWithEqualityDeletes(benchmark::State & state)
+{
+    return BM_IcebergReadWithDeletes<false>(state);
+}
+
}
BENCHMARK(BM_ColumnIndexRead_Old)->Unit(benchmark::kMillisecond)->Iterations(20);
@@ -254,3 +508,5 @@
BENCHMARK(BM_ColumnIndexRead_Filter_ReturnHalfResult)->Unit(benchmark::kMillisec
BENCHMARK(BM_ParquetReadDate32)->Unit(benchmark::kMillisecond)->Iterations(10);
BENCHMARK(BM_OptimizedParquetReadString)->Unit(benchmark::kMillisecond)->Iterations(10);
BENCHMARK(BM_OptimizedParquetReadDate32)->Unit(benchmark::kMillisecond)->Iterations(200);
+BENCHMARK(BM_IcebergReadWithPositionDeletes)->Unit(benchmark::kMillisecond)->Iterations(10)->Arg(0)->Arg(1)->Arg(10)->Arg(50)->Arg(100);
+BENCHMARK(BM_IcebergReadWithEqualityDeletes)->Unit(benchmark::kMillisecond)->Iterations(10)->Arg(0)->Arg(1)->Arg(5)->Arg(10)->Arg(50);
diff --git a/cpp-ch/local-engine/tests/gtest_iceberge_test.cpp
b/cpp-ch/local-engine/tests/gtest_iceberge_test.cpp
index b9fe953a16..5d9e45aab7 100644
--- a/cpp-ch/local-engine/tests/gtest_iceberge_test.cpp
+++ b/cpp-ch/local-engine/tests/gtest_iceberge_test.cpp
@@ -25,17 +25,16 @@
#include <Storages/Output/NormalFileWriter.h>
#include <Storages/SubstraitSource/FileReader.h>
#include <Storages/SubstraitSource/FormatFile.h>
+#include <Storages/SubstraitSource/Iceberg/EqualityDeleteFileReader.h>
+#include <Storages/SubstraitSource/Iceberg/IcebergMetadataColumn.h>
#include <Storages/SubstraitSource/ReadBufferBuilder.h>
#include <Storages/SubstraitSource/SubstraitFileSource.h>
-#include <Storages/SubstraitSource/iceberg/EqualityDeleteFileReader.h>
-#include <Storages/SubstraitSource/iceberg/IcebergMetadataColumn.h>
#include <gtest/gtest.h>
+#include <tests/utils/QueryAssertions.h>
#include <tests/utils/ReaderTestBase.h>
#include <tests/utils/TempFilePath.h>
-#include <utils/QueryAssertions.h>
-#include <utils/gluten_test_util.h>
+#include <tests/utils/gluten_test_util.h>
#include <Common/DebugUtils.h>
-#include <Common/QueryContext.h>
namespace local_engine
{
@@ -969,7 +968,7 @@ TEST_F(IcebergTest, positionalDeletesMultipleSplits)
assertMultipleSplits({}, 10, 3);
}
-TEST_F(IcebergTest, tmp2)
+TEST_F(IcebergTest, basic_utils_test)
{
{
@@ -1000,6 +999,12 @@ TEST_F(IcebergTest, tmp2)
context_->setSetting("input_format_parquet_use_native_reader_with_filter_push_down",
DB::Field(false));
}
+ {
+ std::string file
+ =
third_party_data("benchmark/column_index/lineitem/part-00000-9395e12a-3620-4085-9677-c63b920353f4-c000.snappy.parquet");
+ auto y = runClickhouseSQL(fmt::format("select count(*),
count(distinct l_orderkey), min(l_orderkey), max(l_orderkey) from file('{}')",
file));
+ headBlock(y, 100 , 100);
+ }
{
std::shared_ptr<TempFilePath> dataFilePath = writeDataFiles(rowCount,
4)[0];
diff --git a/cpp-ch/local-engine/tests/gtest_parquet_columnindex.cpp
b/cpp-ch/local-engine/tests/gtest_parquet_columnindex.cpp
index d65a0dc265..06a1045a97 100644
--- a/cpp-ch/local-engine/tests/gtest_parquet_columnindex.cpp
+++ b/cpp-ch/local-engine/tests/gtest_parquet_columnindex.cpp
@@ -346,15 +346,15 @@ local_engine::ColumnIndexStore buildTestColumnIndexStore()
return result;
}
-AnotherRowType buildTestRowType()
+local_engine::RowType buildTestRowType()
{
- AnotherRowType result;
- result.emplace_back(toAnotherFieldType(d1));
- result.emplace_back(toAnotherFieldType(d2));
- result.emplace_back(toAnotherFieldType(d3));
- result.emplace_back(toAnotherFieldType(d4));
- result.emplace_back(toAnotherFieldType(d5));
- result.emplace_back(toAnotherFieldType(d6));
+ local_engine::RowType result;
+ result.emplace_back(toNameTypePair(d1));
+ result.emplace_back(toNameTypePair(d2));
+ result.emplace_back(toNameTypePair(d3));
+ result.emplace_back(toNameTypePair(d4));
+ result.emplace_back(toNameTypePair(d5));
+ result.emplace_back(toNameTypePair(d6));
return result;
}
@@ -395,7 +395,7 @@ void assertRows(const local_engine::RowRanges & ranges,
const std::vector<size_t
local_engine::RowRanges calculateRowRangesForTest(const std::string & exp)
{
- static const AnotherRowType name_and_types = buildTestRowType();
+ static const local_engine::RowType name_and_types = buildTestRowType();
static const local_engine::ColumnIndexStore column_index_store =
buildTestColumnIndexStore();
const local_engine::ColumnIndexFilter filter(
local_engine::test::parseFilter(exp, name_and_types).value(),
local_engine::QueryContext::globalContext());
@@ -550,7 +550,7 @@ TEST(ColumnIndex, FilteringWithNotFoundColumnName)
{
// COLUMN5 is not found in the column_index_store,
- const AnotherRowType upper_name_and_types{{"COLUMN5", BIGINT()}};
+ const RowType upper_name_and_types{{"COLUMN5", BIGINT()}};
const local_engine::ColumnIndexFilter filter_upper(
local_engine::test::parseFilter("COLUMN5 in (7, 20)",
upper_name_and_types).value(),
local_engine::QueryContext::globalContext());
@@ -560,7 +560,7 @@ TEST(ColumnIndex, FilteringWithNotFoundColumnName)
}
{
- const AnotherRowType lower_name_and_types{{"column5", BIGINT()}};
+ const RowType lower_name_and_types{{"column5", BIGINT()}};
const local_engine::ColumnIndexFilter filter_lower(
local_engine::test::parseFilter("column5 in (7, 20)",
lower_name_and_types).value(),
local_engine::QueryContext::globalContext());
@@ -1124,7 +1124,7 @@ TEST(ColumnIndex, VectorizedParquetRecordReader)
const FormatSettings format_settings{};
- static const AnotherRowType name_and_types{{"11", BIGINT()}};
+ static const RowType name_and_types{{"11", BIGINT()}};
const auto filterAction = test::parseFilter("`11` = 10 or `11` = 50",
name_and_types);
auto column_index_filter =
std::make_shared<ColumnIndexFilter>(filterAction.value(),
local_engine::QueryContext::globalContext());
diff --git a/cpp-ch/local-engine/tests/utils/gluten_test_util.cpp
b/cpp-ch/local-engine/tests/utils/gluten_test_util.cpp
index 22f88724fc..063dd4c991 100644
--- a/cpp-ch/local-engine/tests/utils/gluten_test_util.cpp
+++ b/cpp-ch/local-engine/tests/utils/gluten_test_util.cpp
@@ -45,7 +45,7 @@ extern const int LOGICAL_ERROR;
namespace local_engine::test
{
using namespace DB;
-std::optional<ActionsDAG> parseFilter(const std::string & filter, const
AnotherRowType & name_and_types)
+std::optional<ActionsDAG> parseFilter(const std::string & filter, const
RowType & name_and_types)
{
using namespace DB;
@@ -212,7 +212,7 @@ DB::DataTypePtr toDataType(const parquet::ColumnDescriptor
& type)
assert(false);
}
-AnotherRowType readParquetSchema(const std::string & file, const
FormatSettings & settings)
+RowType readParquetSchema(const std::string & file, const FormatSettings &
settings)
{
const auto in = std::make_shared<DB::ReadBufferFromFile>(file);
DB::ParquetSchemaReader schema_reader(*in, settings);
diff --git a/cpp-ch/local-engine/tests/utils/gluten_test_util.h
b/cpp-ch/local-engine/tests/utils/gluten_test_util.h
index 3752c5bf8f..05ec901846 100644
--- a/cpp-ch/local-engine/tests/utils/gluten_test_util.h
+++ b/cpp-ch/local-engine/tests/utils/gluten_test_util.h
@@ -38,8 +38,7 @@ class LocalExecutor;
}
using BlockRowType = DB::ColumnsWithTypeAndName;
using BlockFieldType = DB::ColumnWithTypeAndName;
-using AnotherRowType = DB::NamesAndTypesList;
-using AnotherFieldType = DB::NameAndTypePair;
+using FieldType = DB::NameAndTypePair;
namespace parquet
{
@@ -69,9 +68,9 @@ std::shared_ptr<arrow::io::RandomAccessFile>
asArrowFileForParquet(DB::ReadBuffe
DB::DataTypePtr toDataType(const parquet::ColumnDescriptor & type);
-AnotherRowType readParquetSchema(const std::string & file, const
DB::FormatSettings & settings = DB::FormatSettings{});
+RowType readParquetSchema(const std::string & file, const DB::FormatSettings &
settings = DB::FormatSettings{});
-std::optional<DB::ActionsDAG> parseFilter(const std::string & filter, const
AnotherRowType & name_and_types);
+std::optional<DB::ActionsDAG> parseFilter(const std::string & filter, const
RowType & name_and_types);
std::pair<substrait::Plan, std::unique_ptr<LocalExecutor>>
create_plan_and_executor(std::string_view json_plan, std::string_view split,
const std::optional<DB::ContextPtr> & context = std::nullopt);
@@ -99,14 +98,14 @@ inline std::string replaceLocalFilesWithTPCH(const
std::string_view haystack)
return boost::replace_all_copy(std::string{haystack}, wildcard, replaced);
}
-inline AnotherFieldType toAnotherFieldType(const parquet::ColumnDescriptor &
type)
+inline FieldType toNameTypePair(const parquet::ColumnDescriptor & type)
{
return {type.name(), local_engine::test::toDataType(type)};
}
-inline AnotherRowType toAnotherRowType(const DB::Block & header)
+inline local_engine::RowType toRowType(const DB::Block & header)
{
- AnotherRowType types;
+ local_engine::RowType types;
for (const auto & name : header.getNames())
{
const auto * column = header.findByName(name);
@@ -115,8 +114,16 @@ inline AnotherRowType toAnotherRowType(const DB::Block &
header)
return types;
}
+// Test helper: builds a RowType (alias of DB::NamesAndTypesList) by zipping
+// column names with their types, e.g. ROW({"a", "b"}, {BIGINT(), BIGINT()}).
+// NOTE(review): assumes input.size() == type.size() — a longer `input`
+// indexes `type` out of range; also the rvalue parameters are copied, not
+// moved, so the && signatures buy nothing. Consider validating/moving.
+inline local_engine::RowType ROW(std::vector<std::string> && input,
std::vector<DB::DataTypePtr> && type)
+{
+    DB::NamesAndTypesList result;
+    for (size_t i = 0; i < input.size(); ++i)
+        result.emplace_back(input[i], type[i]);
+    return result;
+}
+
template <class Predicate>
-BlockRowType toBlockRowType(const AnotherRowType & type, Predicate predicate)
+BlockRowType toBlockRowType(const local_engine::RowType & type, Predicate
predicate)
{
BlockRowType result;
result.reserve(type.size());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]