This is an automated email from the ASF dual-hosted git repository.

changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 2f312ead50 [GLUTEN-8846][CH] [Part 3] Add benchmark for Iceberg 
Delete (#9192)
2f312ead50 is described below

commit 2f312ead50dedc1f9fd962a9fa88953de4a2d991
Author: Chang chen <[email protected]>
AuthorDate: Tue Apr 1 17:58:58 2025 +0800

    [GLUTEN-8846][CH] [Part 3] Add benchmark for Iceberg Delete (#9192)
    
    * a simple alias for NamesAndTypesList
    
    * If SparkEnv.get returns null, it means something went wrong in beforeAll()
    
    * If the 'lineitem' directory doesn't exist in HDFS, upload the 'lineitem' 
data from the local system.
    
    * Add BM_IcebergReadWithEqualityDeletes and 
BM_IcebergReadWithPositionDeletes
---
 .../GlutenClickHouseTPCHAbstractSuite.scala        |  31 +--
 .../execution/tpch/GlutenClickHouseHDFSSuite.scala |  19 +-
 cpp-ch/local-engine/Common/BlockTypeUtils.cpp      |   6 +-
 cpp-ch/local-engine/Common/BlockTypeUtils.h        |  11 +-
 cpp-ch/local-engine/Common/PlanUtil.cpp            |   2 +-
 cpp-ch/local-engine/Parser/FunctionExecutor.cpp    |   2 +-
 .../Parser/RelParsers/ReadRelParser.cpp            |   4 +-
 .../Parser/RelParsers/WriteRelParser.cpp           |   2 +-
 .../Storages/SubstraitSource/CMakeLists.txt        |   2 +-
 .../Storages/SubstraitSource/FileReader.cpp        |  12 +-
 .../EqualityDeleteFileReader.cpp                   |   2 +-
 .../EqualityDeleteFileReader.h                     |   0
 .../{iceberg => Iceberg}/IcebergMetadataColumn.h   |   0
 .../{iceberg => Iceberg}/IcebergReader.cpp         |  15 +-
 .../{iceberg => Iceberg}/IcebergReader.h           |   0
 .../PositionalDeleteFileReader.cpp                 |   4 +-
 .../PositionalDeleteFileReader.h                   |   0
 .../{iceberg => Iceberg}/SimpleParquetReader.cpp   |   0
 .../{iceberg => Iceberg}/SimpleParquetReader.h     |   0
 .../Storages/SubstraitSource/ParquetFormatFile.cpp |   5 +-
 .../local-engine/tests/benchmark_parquet_read.cpp  | 260 ++++++++++++++++++++-
 cpp-ch/local-engine/tests/gtest_iceberge_test.cpp  |  17 +-
 .../tests/gtest_parquet_columnindex.cpp            |  24 +-
 .../local-engine/tests/utils/gluten_test_util.cpp  |   4 +-
 cpp-ch/local-engine/tests/utils/gluten_test_util.h |  23 +-
 25 files changed, 369 insertions(+), 76 deletions(-)

diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHAbstractSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHAbstractSuite.scala
index 1529a867d3..a299795ad7 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHAbstractSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHAbstractSuite.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.delta.{ClickhouseSnapshot, DeltaLog}
@@ -38,6 +38,8 @@ abstract class GlutenClickHouseTPCHAbstractSuite
   protected val parquetTableDataPath: String =
     "../../../../gluten-core/src/test/resources/tpch-data"
 
+  final protected lazy val absoluteParquetPath = rootPath + 
parquetTableDataPath
+
   protected val tablesPath: String
   protected val tpchQueries: String
   protected val queriesResults: String
@@ -47,7 +49,7 @@ abstract class GlutenClickHouseTPCHAbstractSuite
     super.beforeAll()
 
     if (needCopyParquetToTablePath) {
-      val sourcePath = new File(rootPath + parquetTableDataPath)
+      val sourcePath = new File(absoluteParquetPath)
       FileUtils.copyDirectory(sourcePath, new File(tablesPath))
     }
 
@@ -68,7 +70,7 @@ abstract class GlutenClickHouseTPCHAbstractSuite
     spark.sql(s"use $parquetSourceDB")
 
     val parquetTablePath = basePath + "/tpch-data"
-    FileUtils.copyDirectory(new File(rootPath + parquetTableDataPath), new 
File(parquetTablePath))
+    FileUtils.copyDirectory(new File(absoluteParquetPath), new 
File(parquetTablePath))
 
     createNotNullTPCHTablesInParquet(parquetTablePath)
 
@@ -236,7 +238,7 @@ abstract class GlutenClickHouseTPCHAbstractSuite
     spark.sql(s"use $parquetSourceDB")
 
     val parquetTablePath = basePath + "/tpch-data"
-    FileUtils.copyDirectory(new File(rootPath + parquetTableDataPath), new 
File(parquetTablePath))
+    FileUtils.copyDirectory(new File(absoluteParquetPath), new 
File(parquetTablePath))
 
     createNotNullTPCHTablesInParquet(parquetTablePath)
 
@@ -575,16 +577,19 @@ abstract class GlutenClickHouseTPCHAbstractSuite
   }
 
   override protected def afterAll(): Unit = {
-    // guava cache invalidate event trigger remove operation may in seconds 
delay, so wait a bit
-    // normally this doesn't take more than 1s
-    eventually(timeout(60.seconds), interval(1.seconds)) {
-      // Spark listener message was not sent in time with ci env.
-      // In tpch case, there are more then 10 hbj data has build.
-      // Let's just verify it was cleaned ever.
-      assert(CHBroadcastBuildSideCache.size() <= 10)
-    }
 
-    ClickhouseSnapshot.clearAllFileStatusCache()
+    // if SparkEnv.get returns null which means something wrong at beforeAll()
+    if (SparkEnv.get != null) {
+      // guava cache invalidate event trigger remove operation may in seconds 
delay, so wait a bit
+      // normally this doesn't take more than 1s
+      eventually(timeout(60.seconds), interval(1.seconds)) {
+        // Spark listener message was not sent in time with ci env.
+        // In tpch case, there are more than 10 hbj data has built.
+        // Let's just verify it was cleaned ever.
+        assert(CHBroadcastBuildSideCache.size() <= 10)
+      }
+      ClickhouseSnapshot.clearAllFileStatusCache()
+    }
     DeltaLog.clearCache()
     super.afterAll()
   }
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala
index b16ec5fe57..31dbe38f10 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseHDFSSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 
 import org.apache.commons.io.IOUtils
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileUtil, Path}
 
 import java.nio.charset.Charset
 
@@ -70,6 +70,23 @@ class GlutenClickHouseHDFSSuite
 
   override def beforeAll(): Unit = {
     super.beforeAll()
+    val targetFile = new Path(s"$tablesPath/lineitem")
+    val fs = targetFile.getFileSystem(spark.sessionState.newHadoopConf())
+    val existed = fs.exists(targetFile)
+    // If the 'lineitem' directory doesn't exist in HDFS,
+    // upload the 'lineitem' data from the local system.
+    if (!existed) {
+      val localDataDir = new Path(s"$absoluteParquetPath/lineitem")
+      val localFs = 
localDataDir.getFileSystem(spark.sessionState.newHadoopConf())
+      FileUtil.copy(
+        localFs,
+        localDataDir,
+        fs,
+        targetFile,
+        false,
+        true,
+        spark.sessionState.newHadoopConf())
+    }
   }
 
   override protected def beforeEach(): Unit = {
diff --git a/cpp-ch/local-engine/Common/BlockTypeUtils.cpp 
b/cpp-ch/local-engine/Common/BlockTypeUtils.cpp
index b065f2a11b..e669e84a1b 100644
--- a/cpp-ch/local-engine/Common/BlockTypeUtils.cpp
+++ b/cpp-ch/local-engine/Common/BlockTypeUtils.cpp
@@ -22,7 +22,7 @@
 
 namespace local_engine
 {
-DB::Block toSampleBlock(const DB::NamesAndTypesList & type)
+DB::Block toSampleBlock(const RowType & type)
 {
     DB::ColumnsWithTypeAndName result;
     result.reserve(type.size());
@@ -31,9 +31,9 @@ DB::Block toSampleBlock(const DB::NamesAndTypesList & type)
     return result;
 }
 
-DB::NamesAndTypesList blockToNameAndTypeList(const DB::Block & header)
+RowType blockToRowType(const DB::Block & header)
 {
-    DB::NamesAndTypesList types;
+    RowType types;
     for (const auto & name : header.getNames())
     {
         const auto * column = header.findByName(name);
diff --git a/cpp-ch/local-engine/Common/BlockTypeUtils.h 
b/cpp-ch/local-engine/Common/BlockTypeUtils.h
index ea7bb9f7e6..d0829a3fb0 100644
--- a/cpp-ch/local-engine/Common/BlockTypeUtils.h
+++ b/cpp-ch/local-engine/Common/BlockTypeUtils.h
@@ -28,6 +28,11 @@
 
 namespace local_engine
 {
+/**
+ * A simple alias for NamesAndTypesList
+ */
+using RowType = DB::NamesAndTypesList;
+
 inline DB::DataTypePtr BIGINT()
 {
     return std::make_shared<DB::DataTypeInt64>();
@@ -87,8 +92,8 @@ inline DB::ColumnWithTypeAndName toColumnType(const 
DB::NameAndTypePair & type)
     return DB::ColumnWithTypeAndName(type.type, type.name);
 }
 
-DB::Block toSampleBlock(const DB::NamesAndTypesList & type);
-DB::NamesAndTypesList blockToNameAndTypeList(const DB::Block & header);
+DB::Block toSampleBlock(const RowType & type);
+RowType blockToRowType(const DB::Block & header);
 DB::DataTypePtr wrapNullableType(bool nullable, DB::DataTypePtr nested_type);
 
 inline DB::DataTypePtr wrapNullableType(DB::DataTypePtr nested_type)
@@ -128,7 +133,7 @@ inline DB::NamesWithAliases buildNamesWithAliases(const 
DB::Block & input, const
     return aliases;
 }
 
-// begion of CppToDataType
+// begin of CppToDataType
 template <typename T>
 struct CppToDataType;
 
diff --git a/cpp-ch/local-engine/Common/PlanUtil.cpp 
b/cpp-ch/local-engine/Common/PlanUtil.cpp
index ebc3b6da3d..1efe063383 100644
--- a/cpp-ch/local-engine/Common/PlanUtil.cpp
+++ b/cpp-ch/local-engine/Common/PlanUtil.cpp
@@ -112,7 +112,7 @@ DB::IQueryPlanStep * addRemoveNullableStep(DB::QueryPlan & 
plan, const DB::Conte
 
 DB::IQueryPlanStep * renamePlanHeader(DB::QueryPlan & plan, const 
BuildNamesWithAliases & buildAliases, const String & step_desc)
 {
-    DB::ActionsDAG 
actions_dag{blockToNameAndTypeList(plan.getCurrentHeader())};
+    DB::ActionsDAG actions_dag{blockToRowType(plan.getCurrentHeader())};
     DB::NamesWithAliases aliases;
     buildAliases(plan.getCurrentHeader(), aliases);
     actions_dag.project(aliases);
diff --git a/cpp-ch/local-engine/Parser/FunctionExecutor.cpp 
b/cpp-ch/local-engine/Parser/FunctionExecutor.cpp
index 93f30d55d5..a6b795350f 100644
--- a/cpp-ch/local-engine/Parser/FunctionExecutor.cpp
+++ b/cpp-ch/local-engine/Parser/FunctionExecutor.cpp
@@ -73,7 +73,7 @@ void FunctionExecutor::buildHeader()
 
 void FunctionExecutor::parseExpression()
 {
-    DB::ActionsDAG actions_dag{blockToNameAndTypeList(header)};
+    DB::ActionsDAG actions_dag{blockToRowType(header)};
     /// Notice keep_result must be true, because result_node of current 
function must be output node in actions_dag
     const auto * node = 
expression_parser->parseFunction(expression.scalar_function(), actions_dag, 
true);
     result_name = node->result_name;
diff --git a/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp 
b/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp
index 8ae7db7d75..6f6bd2d3a4 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/ReadRelParser.cpp
@@ -28,13 +28,13 @@
 #include <Parser/TypeParser.h>
 #include <Processors/QueryPlan/ReadFromPreparedSource.h>
 #include <Storages/SourceFromJavaIter.h>
+#include <Storages/SourceFromRange.h>
 #include <Storages/SubstraitSource/SubstraitFileSource.h>
 #include <Storages/SubstraitSource/SubstraitFileSourceStep.h>
 #include <google/protobuf/wrappers.pb.h>
 #include <rapidjson/document.h>
 #include <Common/BlockTypeUtils.h>
 #include <Common/DebugUtils.h>
-#include <Storages/SourceFromRange.h>
 
 namespace DB
 {
@@ -198,7 +198,7 @@ QueryPlanStepPtr 
ReadRelParser::parseReadRelWithLocalFile(const substrait::ReadR
 
     if (rel.has_filter())
     {
-        DB::ActionsDAG actions_dag{blockToNameAndTypeList(header)};
+        DB::ActionsDAG actions_dag{blockToRowType(header)};
         const DB::ActionsDAG::Node * filter_node = 
parseExpression(actions_dag, rel.filter());
         actions_dag.addOrReplaceInOutputs(*filter_node);
         assert(filter_node == 
&(actions_dag.findInOutputs(filter_node->result_name)));
diff --git a/cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.cpp 
b/cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.cpp
index d5a8dba1f5..73640da238 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.cpp
@@ -70,7 +70,7 @@ DB::ProcessorPtr make_sink(
 
 DB::ExpressionActionsPtr create_rename_action(const DB::Block & input, const 
DB::Block & output)
 {
-    ActionsDAG actions_dag{blockToNameAndTypeList(input)};
+    ActionsDAG actions_dag{blockToRowType(input)};
     actions_dag.project(buildNamesWithAliases(input, output));
     return std::make_shared<DB::ExpressionActions>(std::move(actions_dag));
 }
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/CMakeLists.txt 
b/cpp-ch/local-engine/Storages/SubstraitSource/CMakeLists.txt
index 4248723417..639c1f043f 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/CMakeLists.txt
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/CMakeLists.txt
@@ -18,7 +18,7 @@ set(ARROW_INCLUDE_DIR 
"${ClickHouse_SOURCE_DIR}/contrib/arrow/cpp/src")
 add_headers_and_sources(substrait_source .)
 add_headers_and_sources(substrait_source Delta)
 add_headers_and_sources(substrait_source Delta/Bitmap)
-add_headers_and_sources(substrait_source iceberg)
+add_headers_and_sources(substrait_source Iceberg)
 
 add_library(substrait_source ${substrait_source_sources})
 target_compile_options(
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp 
b/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp
index b9e425a210..fedd043500 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp
@@ -21,15 +21,15 @@
 #include <DataTypes/DataTypeNullable.h>
 #include <DataTypes/DataTypesDecimal.h>
 #include <IO/ReadBufferFromString.h>
+#include <Parser/SubstraitParserUtils.h>
+#include <Storages/SubstraitSource/Delta/DeltaParquetMeta.h>
+#include <Storages/SubstraitSource/Delta/DeltaReader.h>
+#include <Storages/SubstraitSource/Iceberg/IcebergReader.h>
 #include <Storages/SubstraitSource/ParquetFormatFile.h>
-#include <Storages/SubstraitSource/iceberg/IcebergReader.h>
 #include <boost/algorithm/string/case_conv.hpp>
 #include <Common/CHUtil.h>
 #include <Common/Exception.h>
 #include <Common/GlutenStringUtils.h>
-#include <Parser/SubstraitParserUtils.h>
-#include <Storages/SubstraitSource/Delta/DeltaParquetMeta.h>
-#include <Storages/SubstraitSource/Delta/DeltaReader.h>
 
 namespace DB
 {
@@ -243,6 +243,7 @@ NormalFileReader::NormalFileReader(
     const FormatFile::InputFormatPtr & input_format_)
     : BaseReader(file_, to_read_header_, output_header_), 
input_format(input_format_)
 {
+    assert(input_format);
 }
 
 bool NormalFileReader::pull(DB::Chunk & chunk)
@@ -308,7 +309,8 @@ std::unique_ptr<NormalFileReader> createNormalFileReader(
                     row_index_filter_type = toString(column.value());
             }
         }
-        return delta::DeltaReader::create(file, to_read_header_, 
output_header_, input_format, row_index_ids_encoded, row_index_filter_type);
+        return delta::DeltaReader::create(
+            file, to_read_header_, output_header_, input_format, 
row_index_ids_encoded, row_index_filter_type);
     }
 
     return std::make_unique<NormalFileReader>(file, to_read_header_, 
output_header_, input_format);
diff --git 
a/cpp-ch/local-engine/Storages/SubstraitSource/iceberg/EqualityDeleteFileReader.cpp
 
b/cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/EqualityDeleteFileReader.cpp
similarity index 99%
rename from 
cpp-ch/local-engine/Storages/SubstraitSource/iceberg/EqualityDeleteFileReader.cpp
rename to 
cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/EqualityDeleteFileReader.cpp
index 8f3ddb9910..8455d60bc4 100644
--- 
a/cpp-ch/local-engine/Storages/SubstraitSource/iceberg/EqualityDeleteFileReader.cpp
+++ 
b/cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/EqualityDeleteFileReader.cpp
@@ -23,7 +23,7 @@
 #include <Functions/FunctionFactory.h>
 #include <Interpreters/ExpressionActions.h>
 #include <Processors/Formats/Impl/ParquetBlockInputFormat.h>
-#include <Storages/SubstraitSource/iceberg/SimpleParquetReader.h>
+#include <Storages/SubstraitSource/Iceberg/SimpleParquetReader.h>
 #include <Common/BlockTypeUtils.h>
 using namespace DB;
 
diff --git 
a/cpp-ch/local-engine/Storages/SubstraitSource/iceberg/EqualityDeleteFileReader.h
 
b/cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/EqualityDeleteFileReader.h
similarity index 100%
rename from 
cpp-ch/local-engine/Storages/SubstraitSource/iceberg/EqualityDeleteFileReader.h
rename to 
cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/EqualityDeleteFileReader.h
diff --git 
a/cpp-ch/local-engine/Storages/SubstraitSource/iceberg/IcebergMetadataColumn.h 
b/cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/IcebergMetadataColumn.h
similarity index 100%
rename from 
cpp-ch/local-engine/Storages/SubstraitSource/iceberg/IcebergMetadataColumn.h
rename to 
cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/IcebergMetadataColumn.h
diff --git 
a/cpp-ch/local-engine/Storages/SubstraitSource/iceberg/IcebergReader.cpp 
b/cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/IcebergReader.cpp
similarity index 96%
rename from 
cpp-ch/local-engine/Storages/SubstraitSource/iceberg/IcebergReader.cpp
rename to cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/IcebergReader.cpp
index 09bc4e0b7b..3de8520703 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/iceberg/IcebergReader.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/IcebergReader.cpp
@@ -20,8 +20,8 @@
 #include <DataTypes/DataTypeNullable.h>
 #include <Storages/Parquet/ParquetMeta.h>
 #include <Storages/SubstraitSource/Delta/Bitmap/DeltaDVRoaringBitmapArray.h>
-#include <Storages/SubstraitSource/iceberg/EqualityDeleteFileReader.h>
-#include <Storages/SubstraitSource/iceberg/PositionalDeleteFileReader.h>
+#include <Storages/SubstraitSource/Iceberg/EqualityDeleteFileReader.h>
+#include <Storages/SubstraitSource/Iceberg/PositionalDeleteFileReader.h>
 #include <Common/BlockTypeUtils.h>
 
 using namespace DB;
@@ -60,14 +60,11 @@ std::unique_ptr<IcebergReader> IcebergReader::create(
         ? nullptr
         : EqualityDeleteFileReader::createDeleteExpr(context, 
file_->getFileSchema(), delete_files, it_equal->second, new_header);
 
+    auto input = input_format_callback(new_header);
+    if (!input)
+        return nullptr;
     return std::make_unique<IcebergReader>(
-        file_,
-        new_header,
-        output_header_,
-        input_format_callback(new_header),
-        delete_expr,
-        std::move(delete_bitmap_array),
-        to_read_header_.columns());
+        file_, new_header, output_header_, input, delete_expr, 
std::move(delete_bitmap_array), to_read_header_.columns());
 }
 
 IcebergReader::IcebergReader(
diff --git 
a/cpp-ch/local-engine/Storages/SubstraitSource/iceberg/IcebergReader.h 
b/cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/IcebergReader.h
similarity index 100%
rename from cpp-ch/local-engine/Storages/SubstraitSource/iceberg/IcebergReader.h
rename to cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/IcebergReader.h
diff --git 
a/cpp-ch/local-engine/Storages/SubstraitSource/iceberg/PositionalDeleteFileReader.cpp
 
b/cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/PositionalDeleteFileReader.cpp
similarity index 96%
rename from 
cpp-ch/local-engine/Storages/SubstraitSource/iceberg/PositionalDeleteFileReader.cpp
rename to 
cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/PositionalDeleteFileReader.cpp
index 235a907603..3db44699dc 100644
--- 
a/cpp-ch/local-engine/Storages/SubstraitSource/iceberg/PositionalDeleteFileReader.cpp
+++ 
b/cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/PositionalDeleteFileReader.cpp
@@ -20,8 +20,8 @@
 #include <Functions/FunctionFactory.h>
 #include <Storages/Parquet/ParquetMeta.h>
 #include <Storages/SubstraitSource/Delta/Bitmap/DeltaDVRoaringBitmapArray.h>
-#include <Storages/SubstraitSource/iceberg/IcebergMetadataColumn.h>
-#include <Storages/SubstraitSource/iceberg/SimpleParquetReader.h>
+#include <Storages/SubstraitSource/Iceberg/IcebergMetadataColumn.h>
+#include <Storages/SubstraitSource/Iceberg/SimpleParquetReader.h>
 #include <Common/BlockTypeUtils.h>
 
 using namespace DB;
diff --git 
a/cpp-ch/local-engine/Storages/SubstraitSource/iceberg/PositionalDeleteFileReader.h
 
b/cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/PositionalDeleteFileReader.h
similarity index 100%
rename from 
cpp-ch/local-engine/Storages/SubstraitSource/iceberg/PositionalDeleteFileReader.h
rename to 
cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/PositionalDeleteFileReader.h
diff --git 
a/cpp-ch/local-engine/Storages/SubstraitSource/iceberg/SimpleParquetReader.cpp 
b/cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/SimpleParquetReader.cpp
similarity index 100%
rename from 
cpp-ch/local-engine/Storages/SubstraitSource/iceberg/SimpleParquetReader.cpp
rename to 
cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/SimpleParquetReader.cpp
diff --git 
a/cpp-ch/local-engine/Storages/SubstraitSource/iceberg/SimpleParquetReader.h 
b/cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/SimpleParquetReader.h
similarity index 100%
rename from 
cpp-ch/local-engine/Storages/SubstraitSource/iceberg/SimpleParquetReader.h
rename to 
cpp-ch/local-engine/Storages/SubstraitSource/Iceberg/SimpleParquetReader.h
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp 
b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
index 71abd0eb84..afa349743a 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
@@ -230,9 +230,8 @@ std::optional<size_t> ParquetFormatFile::getTotalRows()
     auto in = read_buffer_builder->build(file_info);
     auto result = collectRequiredRowGroups(*in, file_info);
 
-    size_t rows = 0;
-    for (const auto & rowgroup : result.readRowGroups)
-        rows += rowgroup.num_rows;
+    size_t rows = std::ranges::fold_left(
+        result.readRowGroups, static_cast<size_t>(0), [](size_t sum, const 
auto & row_group) { return sum + row_group.num_rows; });
 
     {
         std::lock_guard lock(mutex);
diff --git a/cpp-ch/local-engine/tests/benchmark_parquet_read.cpp 
b/cpp-ch/local-engine/tests/benchmark_parquet_read.cpp
index 0ffa2fb7e4..a1b72dba51 100644
--- a/cpp-ch/local-engine/tests/benchmark_parquet_read.cpp
+++ b/cpp-ch/local-engine/tests/benchmark_parquet_read.cpp
@@ -23,14 +23,21 @@
 #include <Processors/Formats/Impl/ParquetBlockInputFormat.h>
 #include <QueryPipeline/QueryPipeline.h>
 #include <QueryPipeline/QueryPipelineBuilder.h>
+#include <Storages/Output/NormalFileWriter.h>
 #include <Storages/Parquet/ParquetMeta.h>
 #include <Storages/Parquet/VectorizedParquetRecordReader.h>
+#include <Storages/SubstraitSource/Iceberg/IcebergMetadataColumn.h>
 #include <Storages/SubstraitSource/SubstraitFileSource.h>
+#include <Storages/SubstraitSource/substrait_fwd.h>
 #include <benchmark/benchmark.h>
 #include <parquet/arrow/reader.h>
 #include <substrait/plan.pb.h>
+#include <tests/utils/TempFilePath.h>
 #include <tests/utils/gluten_test_util.h>
+#include <Poco/Path.h>
+#include <Poco/URI.h>
 #include <Poco/Util/MapConfiguration.h>
+#include <Common/BlockTypeUtils.h>
 #include <Common/DebugUtils.h>
 #include <Common/QueryContext.h>
 
@@ -219,7 +226,7 @@ void 
BM_ColumnIndexRead_Filter_ReturnAllResult(benchmark::State & state)
         
"benchmark/column_index/lineitem/part-00000-9395e12a-3620-4085-9677-c63b920353f4-c000.snappy.parquet");
     const std::string filter1 = "l_shipdate is not null AND l_shipdate <= 
toDate32('1998-09-01')";
     const substrait::ReadRel::LocalFiles files = createLocalFiles(filename, 
true);
-    const AnotherRowType schema = 
local_engine::test::readParquetSchema(filename);
+    const local_engine::RowType schema = 
local_engine::test::readParquetSchema(filename);
     auto pushDown = local_engine::test::parseFilter(filter1, schema);
     const Block header = {local_engine::toSampleBlock(schema)};
 
@@ -236,7 +243,7 @@ void 
BM_ColumnIndexRead_Filter_ReturnHalfResult(benchmark::State & state)
         
"benchmark/column_index/lineitem/part-00000-9395e12a-3620-4085-9677-c63b920353f4-c000.snappy.parquet");
     const std::string filter1 = "l_orderkey is not null AND l_orderkey > 
300977829";
     const substrait::ReadRel::LocalFiles files = createLocalFiles(filename, 
true);
-    const AnotherRowType schema = 
local_engine::test::readParquetSchema(filename);
+    const local_engine::RowType schema = 
local_engine::test::readParquetSchema(filename);
     auto pushDown = local_engine::test::parseFilter(filter1, schema);
     const Block header = {local_engine::toSampleBlock(schema)};
 
@@ -245,6 +252,253 @@ void 
BM_ColumnIndexRead_Filter_ReturnHalfResult(benchmark::State & state)
     
local_engine::QueryContext::globalMutableContext()->setConfig(Poco::AutoPtr(new 
Poco::Util::MapConfiguration()));
 }
 
+//// Iceberg perf test
+///
+
+size_t readFileWithDeletesAndGetRowCount(
+    const std::string & file, const DB::Block & header, const 
local_engine::SubstraitIcebergDeleteFile * delete_file)
+{
+    using namespace DB;
+    using namespace local_engine;
+    using namespace local_engine::test;
+
+    substrait::ReadRel::LocalFiles files;
+    substrait::ReadRel::LocalFiles::FileOrFiles * file_item = 
files.add_items();
+    file_item->set_uri_file("file://" + file);
+    file_item->set_start(0);
+    file_item->set_length(std::filesystem::file_size(file));
+
+    substrait::ReadRel::LocalFiles::FileOrFiles::IcebergReadOptions 
iceberg_options;
+    substrait::ReadRel::LocalFiles::FileOrFiles::ParquetReadOptions 
parquet_format;
+    iceberg_options.mutable_parquet()->CopyFrom(parquet_format);
+
+    if (delete_file)
+        iceberg_options.add_delete_files()->CopyFrom(*delete_file);
+
+    file_item->mutable_iceberg()->CopyFrom(iceberg_options);
+
+
+    auto builder = std::make_unique<QueryPipelineBuilder>();
+    
builder->init(Pipe(std::make_shared<SubstraitFileSource>(QueryContext::globalContext(),
 header, files)));
+    auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
+    auto reader = PullingPipelineExecutor(pipeline);
+
+
+    size_t total_read_rows = 0;
+    DB::Block res;
+    while (reader.pull(res))
+        total_read_rows += res.rows();
+    return total_read_rows;
+}
+
+std::pair<size_t, int64_t> calculateRowsAndDeleteCount(benchmark::State & 
state, const std::string & file_path, const DB::Block & header)
+{
+    using namespace DB;
+    using namespace local_engine;
+
+    FormatSettings format_settings;
+
+    ParquetMetaBuilder metaBuilder{
+        .case_insensitive = 
format_settings.parquet.case_insensitive_column_matching,
+        .allow_missing_columns = 
format_settings.parquet.allow_missing_columns};
+    ReadBufferFromFilePRead fileReader(file_path);
+    metaBuilder.build(fileReader, header);
+
+    size_t total_rows = std::ranges::fold_left(
+        metaBuilder.readRowGroups, static_cast<size_t>(0), [](size_t sum, 
const auto & row_group) { return sum + row_group.num_rows; });
+
+    // Calculate delete count based on percentage
+    int64_t delete_percentage = state.range(0);
+    int64_t delete_count = total_rows * delete_percentage / 100;
+
+    return {total_rows, delete_count};
+}
+
+// Helper function to write a Parquet file with position deletes
+std::shared_ptr<local_engine::test::TempFilePath>
+writePositionDeleteFile(const std::string & base_file_path, const 
std::vector<int64_t> & delete_positions)
+{
+    using namespace local_engine;
+    using namespace local_engine::test;
+    using namespace local_engine::iceberg;
+
+    auto delete_file_path = TempFilePath::tmp("parquet");
+
+    // Create the file path column with base file path repeated
+    std::string uri_file_path = "file://" + base_file_path;
+    auto file_path_vector = createColumn<std::string>(delete_positions.size(), 
[&](size_t /*row*/) { return uri_file_path; });
+
+    // Create the position column with delete positions
+    auto deletePosVector = createColumn<int64_t>(delete_positions);
+
+    // Create the block with both columns
+    DB::Block delete_block{
+        {file_path_vector,
+         IcebergMetadataColumn::icebergDeleteFilePathColumn()->type,
+         IcebergMetadataColumn::icebergDeleteFilePathColumn()->name},
+        {deletePosVector, 
IcebergMetadataColumn::icebergDeletePosColumn()->type, 
IcebergMetadataColumn::icebergDeletePosColumn()->name}};
+
+    // Write the block to the delete file
+    const DB::ContextPtr context = QueryContext::globalContext();
+    const Poco::Path file{delete_file_path->string()};
+    const Poco::URI fileUri{file};
+
+    auto writer = NormalFileWriter::create(context, fileUri.toString(), 
delete_block, "parquet");
+    writer->write(delete_block);
+    writer->close();
+
+    return delete_file_path;
+}
+std::pair<local_engine::SubstraitIcebergDeleteFile, 
std::shared_ptr<local_engine::test::TempFilePath>>
+createPositionDeleteFile(int64_t delete_count, size_t total_rows, const 
std::string & base_file_path)
+{
+    if (delete_count == 0)
+        return {{}, nullptr};
+
+    assert(delete_count > 0);
+    using namespace local_engine;
+    using namespace local_engine::test;
+
+    std::vector<int64_t> delete_positions;
+    delete_positions.reserve(delete_count);
+
+    std::vector<int64_t> all_positions(total_rows);
+    std::iota(all_positions.begin(), all_positions.end(), 0);
+
+    std::mt19937 g(std::random_device{}());
+    std::ranges::shuffle(all_positions, g);
+
+    delete_positions.assign(all_positions.begin(), all_positions.begin() + 
delete_count);
+    std::ranges::sort(delete_positions);
+
+    std::shared_ptr<TempFilePath> delete_file_path = 
writePositionDeleteFile(base_file_path, delete_positions);
+
+    SubstraitIcebergDeleteFile delete_file;
+    delete_file.set_filecontent(IcebergReadOptions::POSITION_DELETES);
+    delete_file.set_filepath("file://" + delete_file_path->string());
+    delete_file.set_recordcount(delete_count);
+    
delete_file.set_filesize(std::filesystem::file_size(delete_file_path->string()));
+
+    substrait::ReadRel::LocalFiles::FileOrFiles::ParquetReadOptions 
parquet_format;
+    delete_file.mutable_parquet()->CopyFrom(parquet_format);
+
+    return {delete_file, delete_file_path};
+}
+
+// Helper function to write a Parquet file with equality deletes
+std::shared_ptr<local_engine::test::TempFilePath>
+writeEqualityDeleteFile(const std::vector<int64_t> & delete_values, const 
std::string & column_name = "l_shipdate")
+{
+    using namespace local_engine;
+    using namespace local_engine::test;
+
+    auto delete_file_path = TempFilePath::tmp("parquet");
+
+    // Create the column with values to be deleted
+    auto delete_values_vector = createColumn<int64_t>(delete_values);
+
+    // Create the block with the delete values
+    DB::Block delete_block{{delete_values_vector, 
std::make_shared<DB::DataTypeInt64>(), column_name}};
+
+    // Write the block to the delete file
+    const DB::ContextPtr context = QueryContext::globalContext();
+    const Poco::Path file{delete_file_path->string()};
+    const Poco::URI fileUri{file};
+
+    auto writer = NormalFileWriter::create(context, fileUri.toString(), 
delete_block, "parquet");
+    writer->write(delete_block);
+    writer->close();
+
+    return delete_file_path;
+}
+
+// Builds an Iceberg EQUALITY_DELETES descriptor together with its backing
+// parquet file containing `delete_count` randomly chosen l_orderkey values.
+// Returns {descriptor, temp-file handle}; the caller must keep the handle
+// alive while the descriptor's filepath is in use.
+// Returns {empty descriptor, nullptr} when delete_count == 0.
+std::pair<local_engine::SubstraitIcebergDeleteFile, 
std::shared_ptr<local_engine::test::TempFilePath>>
+createEqualityDeleteFile(int64_t delete_count)
+{
+    if (delete_count == 0)
+        return {{}, nullptr};
+
+    assert(delete_count > 0);
+    using namespace local_engine;
+    using namespace local_engine::test;
+
+    std::vector<int64_t> delete_values;
+    delete_values.reserve(delete_count);
+
+    // For simplicity in the benchmark, deletion values are drawn uniformly
+    // between the minimum and maximum observed l_orderkey. Some of the drawn
+    // values therefore do not exist in the data, so the benchmark cannot
+    // verify correctness from the exact number of deleted rows (see the
+    // lower-bound check in BM_IcebergReadWithDeletes).
+    //
+    // This is acceptable for identifying performance issues with
+    // EqualityDeletes.
+
+    // Statistics of the benchmark lineitem file used to pick the bounds:
+    // +-------+-------------------------+---------------+---------------+
+    // |count()|countDistinct(l_orderkey)|min(l_orderkey)|max(l_orderkey)|
+    // +-------+-------------------------+---------------+---------------+
+    // |8333867|                  8105793|              7|      599999972|
+    // +-------+-------------------------+---------------+---------------+
+    constexpr int64_t min_orderkey = 7;
+    constexpr int64_t max_orderkey = 599999972;
+    std::uniform_int_distribution<int64_t> distrib(min_orderkey, max_orderkey);
+
+    // Non-deterministic seed: each benchmark run deletes a different key set.
+    std::mt19937 gen(std::random_device{}());
+    for (int64_t i = 0; i < delete_count; ++i)
+        delete_values.push_back(distrib(gen));
+    // NOTE(review): sorting the keys before writing — presumably to mimic how
+    // real equality-delete files are laid out; confirm whether the reader
+    // relies on ordering.
+    std::ranges::sort(delete_values);
+
+
+    std::shared_ptr<TempFilePath> delete_file_path = 
writeEqualityDeleteFile(delete_values, "l_orderkey");
+
+    // Fill in the substrait/Iceberg delete-file descriptor pointing at the
+    // parquet file we just wrote.
+    SubstraitIcebergDeleteFile delete_file;
+    delete_file.set_filecontent(IcebergReadOptions::EQUALITY_DELETES);
+    delete_file.set_filepath("file://" + delete_file_path->string());
+    delete_file.set_recordcount(delete_count);
+    
delete_file.set_filesize(std::filesystem::file_size(delete_file_path->string()));
+    delete_file.add_equalityfieldids(1); // l_orderkey
+
+    // Mark the delete file as parquet-formatted (default read options).
+    substrait::ReadRel::LocalFiles::FileOrFiles::ParquetReadOptions 
parquet_format;
+    delete_file.mutable_parquet()->CopyFrom(parquet_format);
+
+    return {delete_file, delete_file_path};
+}
+
+// Shared benchmark body for Iceberg reads with delete files applied.
+// `is_position_delete` selects POSITION_DELETES vs EQUALITY_DELETES; the
+// delete file is created once outside the timed loop, so each iteration
+// measures only the read path. calculateRowsAndDeleteCount presumably derives
+// delete_count from state.range(0) (the ->Arg(...) values at registration) —
+// confirm against its definition.
+template <bool is_position_delete>
+void BM_IcebergReadWithDeletes(benchmark::State & state)
+{
+    using namespace DB;
+    using namespace local_engine;
+    using namespace local_engine::test;
+
+    std::string file
+        = 
third_party_data("benchmark/column_index/lineitem/part-00000-9395e12a-3620-4085-9677-c63b920353f4-c000.snappy.parquet");
+    // Read schema: a single Date32 column, l_shipdate.
+    Block header{ColumnWithTypeAndName(DataTypeDate32().createColumn(), 
std::make_shared<DataTypeDate32>(), "l_shipdate")};
+    auto [total_rows, delete_count] = calculateRowsAndDeleteCount(state, file, 
header);
+    // NOTE(review): `res` appears unused in this function — candidate for
+    // removal.
+    Block res;
+
+    auto [delete_file, delete_file_path]
+        = is_position_delete ? createPositionDeleteFile(delete_count, 
total_rows, file) : createEqualityDeleteFile(delete_count);
+
+    for (auto _ : state)
+    {
+        // delete_count == 0 passes nullptr: baseline read with no deletes.
+        size_t total_read_rows = readFileWithDeletesAndGetRowCount(file, 
header, delete_count ? &delete_file : nullptr);
+
+        // Position deletes target known rows, so the result is exact.
+        if (is_position_delete && total_read_rows != total_rows - delete_count)
+            throw Exception(DB::ErrorCodes::LOGICAL_ERROR, "Expected {}, but 
got {} ", total_rows - delete_count, total_read_rows);
+
+        // Equality deletes use random keys that may not exist, so only a
+        // lower bound can be checked;
+        // see `createEqualityDeleteFile` for `total_read_rows < total_rows - 
delete_count`
+        if (!is_position_delete && total_read_rows < total_rows - delete_count)
+            throw Exception(DB::ErrorCodes::LOGICAL_ERROR, "Read at least {}, 
but got {} ", total_rows - delete_count, total_read_rows);
+    }
+}
+
+// Non-template entry point for the POSITION_DELETES variant, as required by
+// the BENCHMARK() registration macro.
+void BM_IcebergReadWithPositionDeletes(benchmark::State & state)
+{
+    return BM_IcebergReadWithDeletes<true>(state);
+}
+// Non-template entry point for the EQUALITY_DELETES variant, as required by
+// the BENCHMARK() registration macro.
+void BM_IcebergReadWithEqualityDeletes(benchmark::State & state)
+{
+    return BM_IcebergReadWithDeletes<false>(state);
+}
+
 }
 
 
BENCHMARK(BM_ColumnIndexRead_Old)->Unit(benchmark::kMillisecond)->Iterations(20);
@@ -254,3 +508,5 @@ 
BENCHMARK(BM_ColumnIndexRead_Filter_ReturnHalfResult)->Unit(benchmark::kMillisec
 BENCHMARK(BM_ParquetReadDate32)->Unit(benchmark::kMillisecond)->Iterations(10);
 
BENCHMARK(BM_OptimizedParquetReadString)->Unit(benchmark::kMillisecond)->Iterations(10);
 
BENCHMARK(BM_OptimizedParquetReadDate32)->Unit(benchmark::kMillisecond)->Iterations(200);
+BENCHMARK(BM_IcebergReadWithPositionDeletes)->Unit(benchmark::kMillisecond)->Iterations(10)->Arg(0)->Arg(1)->Arg(10)->Arg(50)->Arg(100);
+BENCHMARK(BM_IcebergReadWithEqualityDeletes)->Unit(benchmark::kMillisecond)->Iterations(10)->Arg(0)->Arg(1)->Arg(5)->Arg(10)->Arg(50);
diff --git a/cpp-ch/local-engine/tests/gtest_iceberge_test.cpp 
b/cpp-ch/local-engine/tests/gtest_iceberge_test.cpp
index b9fe953a16..5d9e45aab7 100644
--- a/cpp-ch/local-engine/tests/gtest_iceberge_test.cpp
+++ b/cpp-ch/local-engine/tests/gtest_iceberge_test.cpp
@@ -25,17 +25,16 @@
 #include <Storages/Output/NormalFileWriter.h>
 #include <Storages/SubstraitSource/FileReader.h>
 #include <Storages/SubstraitSource/FormatFile.h>
+#include <Storages/SubstraitSource/Iceberg/EqualityDeleteFileReader.h>
+#include <Storages/SubstraitSource/Iceberg/IcebergMetadataColumn.h>
 #include <Storages/SubstraitSource/ReadBufferBuilder.h>
 #include <Storages/SubstraitSource/SubstraitFileSource.h>
-#include <Storages/SubstraitSource/iceberg/EqualityDeleteFileReader.h>
-#include <Storages/SubstraitSource/iceberg/IcebergMetadataColumn.h>
 #include <gtest/gtest.h>
+#include <tests/utils/QueryAssertions.h>
 #include <tests/utils/ReaderTestBase.h>
 #include <tests/utils/TempFilePath.h>
-#include <utils/QueryAssertions.h>
-#include <utils/gluten_test_util.h>
+#include <tests/utils/gluten_test_util.h>
 #include <Common/DebugUtils.h>
-#include <Common/QueryContext.h>
 
 namespace local_engine
 {
@@ -969,7 +968,7 @@ TEST_F(IcebergTest, positionalDeletesMultipleSplits)
     assertMultipleSplits({}, 10, 3);
 }
 
-TEST_F(IcebergTest, tmp2)
+TEST_F(IcebergTest, basic_utils_test)
 {
 
     {
@@ -1000,6 +999,12 @@ TEST_F(IcebergTest, tmp2)
         
context_->setSetting("input_format_parquet_use_native_reader_with_filter_push_down",
 DB::Field(false));
     }
 
+    {
+        std::string file
+        = 
third_party_data("benchmark/column_index/lineitem/part-00000-9395e12a-3620-4085-9677-c63b920353f4-c000.snappy.parquet");
+         auto y = runClickhouseSQL(fmt::format("select count(*), 
count(distinct l_orderkey), min(l_orderkey), max(l_orderkey) from file('{}')", 
file));
+        headBlock(y, 100 , 100);
+    }
 
     {
         std::shared_ptr<TempFilePath> dataFilePath = writeDataFiles(rowCount, 
4)[0];
diff --git a/cpp-ch/local-engine/tests/gtest_parquet_columnindex.cpp 
b/cpp-ch/local-engine/tests/gtest_parquet_columnindex.cpp
index d65a0dc265..06a1045a97 100644
--- a/cpp-ch/local-engine/tests/gtest_parquet_columnindex.cpp
+++ b/cpp-ch/local-engine/tests/gtest_parquet_columnindex.cpp
@@ -346,15 +346,15 @@ local_engine::ColumnIndexStore buildTestColumnIndexStore()
     return result;
 }
 
-AnotherRowType buildTestRowType()
+local_engine::RowType buildTestRowType()
 {
-    AnotherRowType result;
-    result.emplace_back(toAnotherFieldType(d1));
-    result.emplace_back(toAnotherFieldType(d2));
-    result.emplace_back(toAnotherFieldType(d3));
-    result.emplace_back(toAnotherFieldType(d4));
-    result.emplace_back(toAnotherFieldType(d5));
-    result.emplace_back(toAnotherFieldType(d6));
+    local_engine::RowType result;
+    result.emplace_back(toNameTypePair(d1));
+    result.emplace_back(toNameTypePair(d2));
+    result.emplace_back(toNameTypePair(d3));
+    result.emplace_back(toNameTypePair(d4));
+    result.emplace_back(toNameTypePair(d5));
+    result.emplace_back(toNameTypePair(d6));
     return result;
 }
 
@@ -395,7 +395,7 @@ void assertRows(const local_engine::RowRanges & ranges, 
const std::vector<size_t
 
 local_engine::RowRanges calculateRowRangesForTest(const std::string & exp)
 {
-    static const AnotherRowType name_and_types = buildTestRowType();
+    static const local_engine::RowType name_and_types = buildTestRowType();
     static const local_engine::ColumnIndexStore column_index_store = 
buildTestColumnIndexStore();
     const local_engine::ColumnIndexFilter filter(
         local_engine::test::parseFilter(exp, name_and_types).value(), 
local_engine::QueryContext::globalContext());
@@ -550,7 +550,7 @@ TEST(ColumnIndex, FilteringWithNotFoundColumnName)
 
     {
         // COLUMN5 is not found in the column_index_store,
-        const AnotherRowType upper_name_and_types{{"COLUMN5", BIGINT()}};
+        const RowType upper_name_and_types{{"COLUMN5", BIGINT()}};
         const local_engine::ColumnIndexFilter filter_upper(
             local_engine::test::parseFilter("COLUMN5 in (7, 20)", 
upper_name_and_types).value(),
             local_engine::QueryContext::globalContext());
@@ -560,7 +560,7 @@ TEST(ColumnIndex, FilteringWithNotFoundColumnName)
     }
 
     {
-        const AnotherRowType lower_name_and_types{{"column5", BIGINT()}};
+        const RowType lower_name_and_types{{"column5", BIGINT()}};
         const local_engine::ColumnIndexFilter filter_lower(
             local_engine::test::parseFilter("column5 in (7, 20)", 
lower_name_and_types).value(),
             local_engine::QueryContext::globalContext());
@@ -1124,7 +1124,7 @@ TEST(ColumnIndex, VectorizedParquetRecordReader)
     const FormatSettings format_settings{};
 
 
-    static const AnotherRowType name_and_types{{"11", BIGINT()}};
+    static const RowType name_and_types{{"11", BIGINT()}};
     const auto filterAction = test::parseFilter("`11` = 10 or `11` = 50", 
name_and_types);
     auto column_index_filter = 
std::make_shared<ColumnIndexFilter>(filterAction.value(), 
local_engine::QueryContext::globalContext());
 
diff --git a/cpp-ch/local-engine/tests/utils/gluten_test_util.cpp 
b/cpp-ch/local-engine/tests/utils/gluten_test_util.cpp
index 22f88724fc..063dd4c991 100644
--- a/cpp-ch/local-engine/tests/utils/gluten_test_util.cpp
+++ b/cpp-ch/local-engine/tests/utils/gluten_test_util.cpp
@@ -45,7 +45,7 @@ extern const int LOGICAL_ERROR;
 namespace local_engine::test
 {
 using namespace DB;
-std::optional<ActionsDAG> parseFilter(const std::string & filter, const 
AnotherRowType & name_and_types)
+std::optional<ActionsDAG> parseFilter(const std::string & filter, const 
RowType & name_and_types)
 {
     using namespace DB;
 
@@ -212,7 +212,7 @@ DB::DataTypePtr toDataType(const parquet::ColumnDescriptor 
& type)
     assert(false);
 }
 
-AnotherRowType readParquetSchema(const std::string & file, const 
FormatSettings & settings)
+RowType readParquetSchema(const std::string & file, const FormatSettings & 
settings)
 {
     const auto in = std::make_shared<DB::ReadBufferFromFile>(file);
     DB::ParquetSchemaReader schema_reader(*in, settings);
diff --git a/cpp-ch/local-engine/tests/utils/gluten_test_util.h 
b/cpp-ch/local-engine/tests/utils/gluten_test_util.h
index 3752c5bf8f..05ec901846 100644
--- a/cpp-ch/local-engine/tests/utils/gluten_test_util.h
+++ b/cpp-ch/local-engine/tests/utils/gluten_test_util.h
@@ -38,8 +38,7 @@ class LocalExecutor;
 }
 using BlockRowType = DB::ColumnsWithTypeAndName;
 using BlockFieldType = DB::ColumnWithTypeAndName;
-using AnotherRowType = DB::NamesAndTypesList;
-using AnotherFieldType = DB::NameAndTypePair;
+using FieldType = DB::NameAndTypePair;
 
 namespace parquet
 {
@@ -69,9 +68,9 @@ std::shared_ptr<arrow::io::RandomAccessFile> 
asArrowFileForParquet(DB::ReadBuffe
 
 DB::DataTypePtr toDataType(const parquet::ColumnDescriptor & type);
 
-AnotherRowType readParquetSchema(const std::string & file, const 
DB::FormatSettings & settings = DB::FormatSettings{});
+RowType readParquetSchema(const std::string & file, const DB::FormatSettings & 
settings = DB::FormatSettings{});
 
-std::optional<DB::ActionsDAG> parseFilter(const std::string & filter, const 
AnotherRowType & name_and_types);
+std::optional<DB::ActionsDAG> parseFilter(const std::string & filter, const 
RowType & name_and_types);
 
 std::pair<substrait::Plan, std::unique_ptr<LocalExecutor>>
 create_plan_and_executor(std::string_view json_plan, std::string_view split, 
const std::optional<DB::ContextPtr> & context = std::nullopt);
@@ -99,14 +98,14 @@ inline std::string replaceLocalFilesWithTPCH(const 
std::string_view haystack)
     return boost::replace_all_copy(std::string{haystack}, wildcard, replaced);
 }
 
-inline AnotherFieldType toAnotherFieldType(const parquet::ColumnDescriptor & 
type)
+inline FieldType toNameTypePair(const parquet::ColumnDescriptor & type)
 {
     return {type.name(), local_engine::test::toDataType(type)};
 }
 
-inline AnotherRowType toAnotherRowType(const DB::Block & header)
+inline local_engine::RowType toRowType(const DB::Block & header)
 {
-    AnotherRowType types;
+    local_engine::RowType types;
     for (const auto & name : header.getNames())
     {
         const auto * column = header.findByName(name);
@@ -115,8 +114,16 @@ inline AnotherRowType toAnotherRowType(const DB::Block & 
header)
     return types;
 }
 
+inline local_engine::RowType ROW(std::vector<std::string> && input, 
std::vector<DB::DataTypePtr> && type)
+{
+    DB::NamesAndTypesList result;
+    for (size_t i = 0; i < input.size(); ++i)
+        result.emplace_back(input[i], type[i]);
+    return result;
+}
+
 template <class Predicate>
-BlockRowType toBlockRowType(const AnotherRowType & type, Predicate predicate)
+BlockRowType toBlockRowType(const local_engine::RowType & type, Predicate 
predicate)
 {
     BlockRowType result;
     result.reserve(type.size());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to