This is an automated email from the ASF dual-hosted git repository.
taiyangli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 504815fdae [GLUTEN-8021][CH] Fix ORC read/write mismatch and parquet read failure when column with complex types contains null (#8023)
504815fdae is described below
commit 504815fdaec5ee3779e68a00daa5f7e3f51741f2
Author: 李扬 <[email protected]>
AuthorDate: Fri Nov 29 17:41:53 2024 +0800
[GLUTEN-8021][CH] Fix ORC read/write mismatch and parquet read failure when column with complex types contains null (#8023)
* fix issues https://github.com/apache/incubator-gluten/issues/8022 and https://github.com/apache/incubator-gluten/issues/8021
* fix bugs
* fix issue https://github.com/apache/incubator-gluten/issues/8032
* update CH version
* fix failed UTs
* fix failure in Spark 3.5
* adjust style
---
.../GlutenClickHouseNativeWriteTableSuite.scala | 61 +++++++++-
cpp-ch/clickhouse.version | 2 +-
cpp-ch/local-engine/Common/CHUtil.cpp | 1 +
.../Storages/Output/NormalFileWriter.cpp | 123 +++++++++++++++++++++
4 files changed, 185 insertions(+), 2 deletions(-)
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseNativeWriteTableSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseNativeWriteTableSuite.scala
index d0d03eaf1e..03d27f33b1 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseNativeWriteTableSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseNativeWriteTableSuite.scala
@@ -903,7 +903,7 @@ class GlutenClickHouseNativeWriteTableSuite
}
test("GLUTEN-2584: fix native write and read mismatch about complex types") {
- def table(format: String): String = s"t_$format"
+ def table(format: String): String = s"t_2584_$format"
def create(format: String, table_name: Option[String] = None): String =
s"""CREATE TABLE ${table_name.getOrElse(table(format))}(
| id INT,
@@ -934,4 +934,63 @@ class GlutenClickHouseNativeWriteTableSuite
}
)
}
+
+ test(
+ "GLUTEN-8021/8022/8032: fix orc read/write mismatch and parquet" +
+ "read exception when written complex column contains null") {
+ def table(format: String): String = s"t_8021_$format"
+ def create(format: String, table_name: Option[String] = None): String =
+ s"""CREATE TABLE ${table_name.getOrElse(table(format))}(
+ |id int,
+ |x int,
+ |y int,
+ |mp map<string, string>,
+ |arr array<int>,
+ |tup struct<x:int, y:int>,
+ |arr_mp array<map<string, string>>,
+ |mp_arr map<string, array<int>>,
+ |tup_arr struct<a: array<int>>,
+ |tup_map struct<m: map<string, string>>
+ |) stored as $format""".stripMargin
+ def insert(format: String, table_name: Option[String] = None): String =
+ s"""INSERT OVERWRITE TABLE ${table_name.getOrElse(table(format))}
+ |SELECT
+ | id, x, y,
+ | str_to_map(concat('x:', x, ',y:', y)) AS mp,
+ | IF(id % 4 = 0, NULL, array(x, y)) AS arr,
+ | IF(id % 4 = 1, NULL, struct(x, y)) AS tup,
+ | IF(id % 4 = 2, NULL, array(str_to_map(concat('x:', x, ',y:', y)))) AS arr_mp,
+ | IF(id % 4 = 3, NULL, map('x', array(x), 'y', array(y))) AS mp_arr,
+ | IF(id % 4 = 0, NULL, named_struct('a', array(x, y))) AS tup_arr,
+ | IF(id % 4 = 1, NULL, named_struct('m',
+ | str_to_map(concat('x:', x, ',y:', y)))) AS tup_map
+ |FROM (
+ | SELECT
+ | id,
+ | IF(id % 3 = 1, NULL, id + 1) AS x,
+ | IF(id % 3 = 1, NULL, id + 2) AS y
+ | FROM range(100)
+ |) AS data_source;""".stripMargin
+
+ // TODO fix it in spark3.5
+ if (!isSparkVersionGE("3.5")) {
+ nativeWrite2(
+ format => (table(format), create(format), insert(format)),
+ (table_name, format) => {
+ val vanilla_table = s"${table_name}_v"
+ val vanilla_create = create(format, Some(vanilla_table))
+ vanillaWrite {
+ withDestinationTable(vanilla_table, Option(vanilla_create)) {
+ checkInsertQuery(insert(format, Some(vanilla_table)), checkNative = false)
+ }
+ }
+ val rowsFromOriginTable =
+ spark.sql(s"select * from $vanilla_table").collect()
+ val dfFromWriteTable =
+ spark.sql(s"select * from $table_name")
+ checkAnswer(dfFromWriteTable, rowsFromOriginTable)
+ }
+ )
+ }
+ }
}
diff --git a/cpp-ch/clickhouse.version b/cpp-ch/clickhouse.version
index 62a70f06c1..003f111133 100644
--- a/cpp-ch/clickhouse.version
+++ b/cpp-ch/clickhouse.version
@@ -1,3 +1,3 @@
CH_ORG=Kyligence
CH_BRANCH=rebase_ch/20241118
-CH_COMMIT=7f22fe487c88d3b988ea82a5c34882da23ea6289
+CH_COMMIT=a5944dfb7b3
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp b/cpp-ch/local-engine/Common/CHUtil.cpp
index de1a263218..88c5303c50 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -710,6 +710,7 @@ void BackendInitializerUtil::initSettings(const SparkConfigs::ConfigMap & spark_
settings.set("max_parsing_threads", 1);
settings.set("max_download_threads", 1);
settings.set("input_format_parquet_enable_row_group_prefetch", false);
+ settings.set("output_format_parquet_use_custom_encoder", false);
/// update per https://github.com/ClickHouse/ClickHouse/pull/71539
/// if true, we can't get correct metrics for the query
diff --git a/cpp-ch/local-engine/Storages/Output/NormalFileWriter.cpp b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.cpp
index e9fd4f358a..e5a2d89f26 100644
--- a/cpp-ch/local-engine/Storages/Output/NormalFileWriter.cpp
+++ b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.cpp
@@ -19,13 +19,134 @@
#include <QueryPipeline/QueryPipeline.h>
#include <Poco/URI.h>
#include <Common/DebugUtils.h>
+#include <Columns/ColumnConst.h>
+#include <Columns/ColumnArray.h>
+#include <Columns/ColumnMap.h>
namespace local_engine
{
+using namespace DB;
+
const std::string SubstraitFileSink::NO_PARTITION_ID{"__NO_PARTITION_ID__"};
const std::string SparkPartitionedBaseSink::DEFAULT_PARTITION_NAME{"__HIVE_DEFAULT_PARTITION__"};
+/// For Nullable(Map(K, V)) or Nullable(Array(T)), if the i-th row is null, we must make sure its nested data is empty.
+/// It is for ORC/Parquet writing compatibility. For more details, refer to
+/// https://github.com/apache/incubator-gluten/issues/8022 and https://github.com/apache/incubator-gluten/issues/8021
+static ColumnPtr truncateNestedDataIfNull(const ColumnPtr & column)
+{
+ if (const auto * col_const = checkAndGetColumn<ColumnConst>(column.get()))
+ {
+ size_t s = col_const->size();
+ auto new_data = truncateNestedDataIfNull(col_const->getDataColumnPtr());
+ return ColumnConst::create(std::move(new_data), s);
+ }
+ else if (const auto * col_array = checkAndGetColumn<ColumnArray>(column.get()))
+ {
+ auto new_data = truncateNestedDataIfNull(col_array->getDataPtr());
+ return ColumnArray::create(std::move(new_data), col_array->getOffsetsPtr());
+ }
+ else if (const auto * col_map = checkAndGetColumn<ColumnMap>(column.get()))
+ {
+ auto new_nested = truncateNestedDataIfNull(col_map->getNestedColumnPtr());
+ return ColumnMap::create(std::move(new_nested));
+ }
+ else if (const auto * col_tuple = checkAndGetColumn<ColumnTuple>(column.get()))
+ {
+ Columns new_columns;
+ for (size_t i = 0; i < col_tuple->tupleSize(); ++i)
+ new_columns.emplace_back(truncateNestedDataIfNull(col_tuple->getColumnPtr(i)));
+ return ColumnTuple::create(std::move(new_columns));
+ }
+ else if (const auto * col_nullable = checkAndGetColumn<ColumnNullable>(column.get()))
+ {
+ const auto & null_map = col_nullable->getNullMapData();
+ auto nested = truncateNestedDataIfNull(col_nullable->getNestedColumnPtr());
+ const auto * nested_array = checkAndGetColumn<ColumnArray>(nested.get());
+ const auto * nested_map = checkAndGetColumn<ColumnMap>(nested.get());
+ const auto * nested_tuple = checkAndGetColumn<ColumnTuple>(nested.get());
+
+ if (!memoryIsZero(null_map.data(), 0, null_map.size()) && (nested_array || nested_map || nested_tuple))
+ {
+ /// Process Nullable(Array) or Nullable(Map)
+ if (nested_array || nested_map)
+ {
+ if (!nested_array)
+ nested_array = checkAndGetColumn<ColumnArray>(&nested_map->getNestedColumn());
+
+ const auto & offsets = nested_array->getOffsets();
+ size_t total_data_size = 0;
+ for (size_t i = 0; i < null_map.size(); ++i)
+ total_data_size += (offsets[i] - offsets[i - 1]) * (!null_map[i]);
+
+ auto new_nested_array = nested_array->cloneEmpty();
+ new_nested_array->reserve(nested_array->size());
+ auto & new_nested_array_data = assert_cast<ColumnArray
&>(*new_nested_array).getData();
+ new_nested_array_data.reserve(total_data_size);
+
+ for (size_t i = 0; i < null_map.size(); ++i)
+ if (null_map[i])
+ new_nested_array->insertDefault();
+ else
+ new_nested_array->insertFrom(*nested_array, i);
+
+ if (nested_map)
+ {
+ auto new_nested_map = ColumnMap::create(std::move(new_nested_array));
+ return ColumnNullable::create(std::move(new_nested_map), col_nullable->getNullMapColumnPtr());
+ }
+ else
+ {
+ return ColumnNullable::create(std::move(new_nested_array), col_nullable->getNullMapColumnPtr());
+ }
+ }
+ else
+ {
+ /// Process Nullable(Tuple)
+ const auto & nested_columns = nested_tuple->getColumns();
+ Columns new_nested_columns(nested_columns.size());
+ for (size_t i = 0; i < nested_columns.size(); ++i)
+ {
+ const auto & nested_column = nested_columns[i];
+ TypeIndex type_index = nested_column->getDataType();
+ if (const auto * nullable_nested_column = checkAndGetColumn<ColumnNullable>(nested_column.get()))
+ type_index = nullable_nested_column->getNestedColumnPtr()->getDataType();
+
+ bool should_truncate = type_index == TypeIndex::Array || type_index == TypeIndex::Map || type_index == TypeIndex::Tuple;
+ if (should_truncate)
+ {
+ auto new_nested_column = nested_column->cloneEmpty();
+ new_nested_column->reserve(nested_column->size());
+ for (size_t j = 0; j < null_map.size(); ++j)
+ {
+ if (null_map[j])
+ new_nested_column->insertDefault();
+ else
+ new_nested_column->insertFrom(*nested_column,
j);
+ }
+ new_nested_columns[i] = std::move(new_nested_column);
+ }
+ else
+ {
+ new_nested_columns[i] = nested_column;
+ }
+ }
+
+ auto new_nested_tuple = ColumnTuple::create(std::move(new_nested_columns));
+ return ColumnNullable::create(std::move(new_nested_tuple), col_nullable->getNullMapColumnPtr());
+ }
+ }
+ else
+ {
+ auto new_nested = truncateNestedDataIfNull(nested);
+ return ColumnNullable::create(std::move(new_nested), col_nullable->getNullMapColumnPtr());
+ }
+ }
+ else
+ return column;
+}
+
NormalFileWriter::NormalFileWriter(const OutputFormatFilePtr & file_, const DB::ContextPtr & context_) : file(file_), context(context_)
{
}
@@ -51,6 +172,8 @@ void NormalFileWriter::write(DB::Block & block)
continue;
const auto & preferred_column =
preferred_schema.getByPosition(index++);
+ /// Make sure nested array or map data is empty when the row is null in Nullable(Map(K, V)) or Nullable(Array(T)).
+ column.column = truncateNestedDataIfNull(column.column);
column.column = DB::castColumn(column, preferred_column.type);
column.name = preferred_column.name;
column.type = preferred_column.type;
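
As context for the NormalFileWriter change above, the following is a minimal standalone sketch, using plain STL containers rather than ClickHouse's column classes, of the invariant that truncateNestedDataIfNull enforces for Nullable(Array(T)): a row flagged null must own zero nested elements, otherwise the ORC/Parquet writers emit payload under null rows and read-back mismatches. All names in the sketch are illustrative, not taken from the patch.

    // Sketch only: models Nullable(Array(Int32)) as null_map + cumulative offsets + flat data.
    #include <cstdint>
    #include <iostream>
    #include <vector>

    struct NullableArrayColumn
    {
        std::vector<uint8_t> null_map;   // 1 = row is NULL
        std::vector<uint64_t> offsets;   // cumulative end offset of each row's nested data
        std::vector<int32_t> data;       // flattened nested elements
    };

    // Rebuild the column so that null rows own no nested data, mirroring the intent of
    // truncateNestedDataIfNull for ColumnNullable(ColumnArray).
    NullableArrayColumn truncateNullRows(const NullableArrayColumn & src)
    {
        NullableArrayColumn dst;
        dst.null_map = src.null_map;
        uint64_t prev = 0, written = 0;
        for (size_t i = 0; i < src.null_map.size(); ++i)
        {
            uint64_t row_size = src.offsets[i] - prev;
            if (!src.null_map[i])
            {
                for (uint64_t j = 0; j < row_size; ++j)
                    dst.data.push_back(src.data[prev + j]);
                written += row_size;
            }
            dst.offsets.push_back(written);   // null rows become empty ranges
            prev = src.offsets[i];
        }
        return dst;
    }

    int main()
    {
        // Row 1 is NULL but still carries nested payload {7, 8}; after truncation it is empty,
        // which is what ORC/Parquet writers expect for null rows.
        NullableArrayColumn col{{0, 1, 0}, {2, 4, 5}, {1, 2, 7, 8, 9}};
        NullableArrayColumn fixed = truncateNullRows(col);
        for (uint64_t off : fixed.offsets)
            std::cout << off << ' ';          // prints "2 2 3"
        std::cout << '\n';
    }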
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]