This is an automated email from the ASF dual-hosted git repository.
zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 01d08bfa9 [GLUTEN-3582][CH] Using ParquetBlockInputFormat instead of
VectorizedParquetBlockInputFormat for complex type (#5995)
01d08bfa9 is described below
commit 01d08bfa9d92fa47035f6d66f269d67a64aa4826
Author: Chang chen <[email protected]>
AuthorDate: Wed Jun 5 18:23:21 2024 +0800
[GLUTEN-3582][CH] Using ParquetBlockInputFormat instead of
VectorizedParquetBlockInputFormat for complex type (#5995)
[CH] Using ParquetBlockInputFormat instead of
VectorizedParquetBlockInputFormat for complex type
---
.../execution/GlutenClickHouseHiveTableSuite.scala | 1 +
.../Storages/SubstraitSource/ParquetFormatFile.cpp | 20 ++++++++++++---
.../Storages/SubstraitSource/ParquetFormatFile.h | 4 ++-
cpp-ch/local-engine/tests/gtest_parquet_read.cpp | 29 ++++++++++++++++++++++
4 files changed, 50 insertions(+), 4 deletions(-)
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseHiveTableSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseHiveTableSuite.scala
index 3c993b622..9b52f6a8c 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseHiveTableSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseHiveTableSuite.scala
@@ -111,6 +111,7 @@ class GlutenClickHouseHiveTableSuite
getClass.getResource("/").getPath +
"tests-working-home/spark-warehouse")
.set("spark.hive.exec.dynamic.partition.mode", "nonstrict")
.set("spark.gluten.supported.hive.udfs", "my_add")
+ .set("spark.gluten.sql.columnar.backend.ch.runtime_config.use_local_format", "true")
.setMaster("local[*]")
}
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
index 2e0f00045..f557df5b2 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.cpp
@@ -22,7 +22,7 @@
#include <numeric>
#include <utility>
-#include <DataTypes/DataTypesNumber.h>
+#include <DataTypes/DataTypeNullable.h>
#include <Formats/FormatFactory.h>
#include <Formats/FormatSettings.h>
#include <IO/SeekableReadBuffer.h>
@@ -46,12 +46,13 @@ extern const int UNKNOWN_TYPE;
namespace local_engine
{
+
ParquetFormatFile::ParquetFormatFile(
const DB::ContextPtr & context_,
const substrait::ReadRel::LocalFiles::FileOrFiles & file_info_,
const ReadBufferBuilderPtr & read_buffer_builder_,
bool use_local_format_)
- : FormatFile(context_, file_info_, read_buffer_builder_), use_local_format(use_local_format_)
+ : FormatFile(context_, file_info_, read_buffer_builder_), use_pageindex_reader(use_local_format_)
{
}
@@ -85,7 +86,7 @@ FormatFile::InputFormatPtr ParquetFormatFile::createInputFormat(const DB::Block
std::ranges::set_difference(total_row_group_indices, required_row_group_indices, std::back_inserter(skip_row_group_indices));
format_settings.parquet.skip_row_groups = std::unordered_set<int>(skip_row_group_indices.begin(), skip_row_group_indices.end());
- if (use_local_format)
+ if (use_pageindex_reader && pageindex_reader_support(header))
res->input = std::make_shared<VectorizedParquetBlockInputFormat>(*(res->read_buffer), header, format_settings);
else
res->input = std::make_shared<DB::ParquetBlockInputFormat>(*(res->read_buffer), header, format_settings, 1, 8192);
@@ -112,6 +113,19 @@ std::optional<size_t> ParquetFormatFile::getTotalRows()
return total_rows;
}
}
+bool ParquetFormatFile::pageindex_reader_support(const DB::Block & header)
+{
+ const auto result = std::ranges::find_if(
+ header,
+ [](DB::ColumnWithTypeAndName const & col)
+ {
+ const DB::DataTypePtr type_not_nullable = DB::removeNullable(col.type);
+ const DB::WhichDataType which(type_not_nullable);
+ return DB::isArray(which) || DB::isMap(which) || DB::isTuple(which);
+ });
+
+ return result == header.end();
+}
std::vector<RowGroupInformation> ParquetFormatFile::collectRequiredRowGroups(int & total_row_groups) const
{
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.h b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.h
index 045f0049d..ba7f28883 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/ParquetFormatFile.h
@@ -55,8 +55,10 @@ public:
String getFileFormat() const override { return "Parquet"; }
+ static bool pageindex_reader_support(const DB::Block & header);
+
private:
- bool use_local_format;
+ bool use_pageindex_reader;
std::mutex mutex;
std::optional<size_t> total_rows;
diff --git a/cpp-ch/local-engine/tests/gtest_parquet_read.cpp b/cpp-ch/local-engine/tests/gtest_parquet_read.cpp
index 94f28763e..9623ffa98 100644
--- a/cpp-ch/local-engine/tests/gtest_parquet_read.cpp
+++ b/cpp-ch/local-engine/tests/gtest_parquet_read.cpp
@@ -15,6 +15,9 @@
* limitations under the License.
*/
+#include <Storages/SubstraitSource/ParquetFormatFile.h>
+
+
#include "config.h"
#if USE_PARQUET
@@ -139,6 +142,32 @@ TEST(ParquetRead, ReadSchema)
readSchema("alltypes/alltypes_null.parquet");
}
+TEST(ParquetRead, VerifyPageindexReaderSupport)
+{
+ EXPECT_FALSE(local_engine::ParquetFormatFile::pageindex_reader_support(
+ toBlockRowType(local_engine::test::readParquetSchema(local_engine::test::data_file("alltypes/alltypes_notnull.parquet")))));
+ EXPECT_FALSE(local_engine::ParquetFormatFile::pageindex_reader_support(
+ toBlockRowType(local_engine::test::readParquetSchema(local_engine::test::data_file("alltypes/alltypes_null.parquet")))));
+
+
+ EXPECT_FALSE(local_engine::ParquetFormatFile::pageindex_reader_support(
+ toBlockRowType(local_engine::test::readParquetSchema(local_engine::test::data_file("array.parquet")))));
+ EXPECT_TRUE(local_engine::ParquetFormatFile::pageindex_reader_support(
+ toBlockRowType(local_engine::test::readParquetSchema(local_engine::test::data_file("date.parquet")))));
+ EXPECT_TRUE(local_engine::ParquetFormatFile::pageindex_reader_support(
+ toBlockRowType(local_engine::test::readParquetSchema(local_engine::test::data_file("datetime64.parquet")))));
+ EXPECT_TRUE(local_engine::ParquetFormatFile::pageindex_reader_support(
+ toBlockRowType(local_engine::test::readParquetSchema(local_engine::test::data_file("decimal.parquet")))));
+ EXPECT_TRUE(local_engine::ParquetFormatFile::pageindex_reader_support(
+ toBlockRowType(local_engine::test::readParquetSchema(local_engine::test::data_file("iris.parquet")))));
+ EXPECT_FALSE(local_engine::ParquetFormatFile::pageindex_reader_support(
+ toBlockRowType(local_engine::test::readParquetSchema(local_engine::test::data_file("map.parquet")))));
+ EXPECT_TRUE(local_engine::ParquetFormatFile::pageindex_reader_support(
+ toBlockRowType(local_engine::test::readParquetSchema(local_engine::test::data_file("sample.parquet")))));
+ EXPECT_FALSE(local_engine::ParquetFormatFile::pageindex_reader_support(
+ toBlockRowType(local_engine::test::readParquetSchema(local_engine::test::data_file("struct.parquet")))));
+}
+
TEST(ParquetRead, ReadDataNotNull)
{
const std::map<String, Field> fields{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]