This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 01d08bfa9 [GLUTEN-3582][CH] Using ParquetBlockInputFormat instead of VectorizedParquetBlockInputFormat for complex type (#5995)
01d08bfa9 is described below

commit 01d08bfa9d92fa47035f6d66f269d67a64aa4826
Author: Chang chen <[email protected]>
AuthorDate: Wed Jun 5 18:23:21 2024 +0800

    [GLUTEN-3582][CH] Using ParquetBlockInputFormat instead of VectorizedParquetBlockInputFormat for complex type (#5995)
    
    [CH] Using ParquetBlockInputFormat instead of VectorizedParquetBlockInputFormat for complex type
---
 .../execution/GlutenClickHouseHiveTableSuite.scala |  1 +
 .../Storages/SubstraitSource/ParquetFormatFile.cpp | 20 ++++++++++++---
 .../Storages/SubstraitSource/ParquetFormatFile.h   |  4 ++-
 cpp-ch/local-engine/tests/gtest_parquet_read.cpp   | 29 ++++++++++++++++++++++
 4 files changed, 50 insertions(+), 4 deletions(-)

diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseHiveTableSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseHiveTableSuite.scala
index 3c993b622..9b52f6a8c 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseHiveTableSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseHiveTableSuite.scala
@@ -111,6 +111,7 @@ class GlutenClickHouseHiveTableSuite
         getClass.getResource("/").getPath + 
"tests-working-home/spark-warehouse")
       .set("spark.hive.exec.dynamic.partition.mode", "nonstrict")
       .set("spark.gluten.supported.hive.udfs", "my_add")
+      
.set("spark.gluten.sql.columnar.backend.ch.runtime_config.use_local_format", 
"true")
       .setMaster("local[*]")
   }
 
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
index 2e0f00045..f557df5b2 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
@@ -22,7 +22,7 @@
 #include <numeric>
 #include <utility>
 
-#include <DataTypes/DataTypesNumber.h>
+#include <DataTypes/DataTypeNullable.h>
 #include <Formats/FormatFactory.h>
 #include <Formats/FormatSettings.h>
 #include <IO/SeekableReadBuffer.h>
@@ -46,12 +46,13 @@ extern const int UNKNOWN_TYPE;
 
 namespace local_engine
 {
+
 ParquetFormatFile::ParquetFormatFile(
     const DB::ContextPtr & context_,
     const substrait::ReadRel::LocalFiles::FileOrFiles & file_info_,
     const ReadBufferBuilderPtr & read_buffer_builder_,
     bool use_local_format_)
-    : FormatFile(context_, file_info_, read_buffer_builder_), 
use_local_format(use_local_format_)
+    : FormatFile(context_, file_info_, read_buffer_builder_), 
use_pageindex_reader(use_local_format_)
 {
 }
 
@@ -85,7 +86,7 @@ FormatFile::InputFormatPtr ParquetFormatFile::createInputFormat(const DB::Block
     std::ranges::set_difference(total_row_group_indices, 
required_row_group_indices, std::back_inserter(skip_row_group_indices));
 
     format_settings.parquet.skip_row_groups = 
std::unordered_set<int>(skip_row_group_indices.begin(), 
skip_row_group_indices.end());
-    if (use_local_format)
+    if (use_pageindex_reader && pageindex_reader_support(header))
         res->input = 
std::make_shared<VectorizedParquetBlockInputFormat>(*(res->read_buffer), 
header, format_settings);
     else
         res->input = 
std::make_shared<DB::ParquetBlockInputFormat>(*(res->read_buffer), header, 
format_settings, 1, 8192);
@@ -112,6 +113,19 @@ std::optional<size_t> ParquetFormatFile::getTotalRows()
         return total_rows;
     }
 }
+bool ParquetFormatFile::pageindex_reader_support(const DB::Block & header)
+{
+    const auto result = std::ranges::find_if(
+        header,
+        [](DB::ColumnWithTypeAndName const & col)
+        {
+            const DB::DataTypePtr type_not_nullable = 
DB::removeNullable(col.type);
+            const DB::WhichDataType which(type_not_nullable);
+            return DB::isArray(which) || DB::isMap(which) || 
DB::isTuple(which);
+        });
+
+    return result == header.end();
+}
 
 std::vector<RowGroupInformation> 
ParquetFormatFile::collectRequiredRowGroups(int & total_row_groups) const
 {
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.h b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.h
index 045f0049d..ba7f28883 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.h
@@ -55,8 +55,10 @@ public:
 
     String getFileFormat() const override { return "Parquet"; }
 
+    static bool pageindex_reader_support(const DB::Block & header);
+
 private:
-    bool use_local_format;
+    bool use_pageindex_reader;
     std::mutex mutex;
     std::optional<size_t> total_rows;
 
diff --git a/cpp-ch/local-engine/tests/gtest_parquet_read.cpp b/cpp-ch/local-engine/tests/gtest_parquet_read.cpp
index 94f28763e..9623ffa98 100644
--- a/cpp-ch/local-engine/tests/gtest_parquet_read.cpp
+++ b/cpp-ch/local-engine/tests/gtest_parquet_read.cpp
@@ -15,6 +15,9 @@
  * limitations under the License.
  */
 
+#include <Storages/SubstraitSource/ParquetFormatFile.h>
+
+
 #include "config.h"
 
 #if USE_PARQUET
@@ -139,6 +142,32 @@ TEST(ParquetRead, ReadSchema)
     readSchema("alltypes/alltypes_null.parquet");
 }
 
+TEST(ParquetRead, VerifyPageindexReaderSupport)
+{
+    EXPECT_FALSE(local_engine::ParquetFormatFile::pageindex_reader_support(
+        
toBlockRowType(local_engine::test::readParquetSchema(local_engine::test::data_file("alltypes/alltypes_notnull.parquet")))));
+    EXPECT_FALSE(local_engine::ParquetFormatFile::pageindex_reader_support(
+        
toBlockRowType(local_engine::test::readParquetSchema(local_engine::test::data_file("alltypes/alltypes_null.parquet")))));
+
+
+    EXPECT_FALSE(local_engine::ParquetFormatFile::pageindex_reader_support(
+        
toBlockRowType(local_engine::test::readParquetSchema(local_engine::test::data_file("array.parquet")))));
+    EXPECT_TRUE(local_engine::ParquetFormatFile::pageindex_reader_support(
+        
toBlockRowType(local_engine::test::readParquetSchema(local_engine::test::data_file("date.parquet")))));
+    EXPECT_TRUE(local_engine::ParquetFormatFile::pageindex_reader_support(
+        
toBlockRowType(local_engine::test::readParquetSchema(local_engine::test::data_file("datetime64.parquet")))));
+    EXPECT_TRUE(local_engine::ParquetFormatFile::pageindex_reader_support(
+        
toBlockRowType(local_engine::test::readParquetSchema(local_engine::test::data_file("decimal.parquet")))));
+    EXPECT_TRUE(local_engine::ParquetFormatFile::pageindex_reader_support(
+        
toBlockRowType(local_engine::test::readParquetSchema(local_engine::test::data_file("iris.parquet")))));
+    EXPECT_FALSE(local_engine::ParquetFormatFile::pageindex_reader_support(
+        
toBlockRowType(local_engine::test::readParquetSchema(local_engine::test::data_file("map.parquet")))));
+    EXPECT_TRUE(local_engine::ParquetFormatFile::pageindex_reader_support(
+        
toBlockRowType(local_engine::test::readParquetSchema(local_engine::test::data_file("sample.parquet")))));
+    EXPECT_FALSE(local_engine::ParquetFormatFile::pageindex_reader_support(
+        
toBlockRowType(local_engine::test::readParquetSchema(local_engine::test::data_file("struct.parquet")))));
+}
+
 TEST(ParquetRead, ReadDataNotNull)
 {
     const std::map<String, Field> fields{


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to