This is an automated email from the ASF dual-hosted git repository.

taiyangli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 504815fdae [GLUTEN-8021][CH] Fix ORC read/write mismatch and parquet 
read failure when column with complex types contains null (#8023)
504815fdae is described below

commit 504815fdaec5ee3779e68a00daa5f7e3f51741f2
Author: 李扬 <[email protected]>
AuthorDate: Fri Nov 29 17:41:53 2024 +0800

    [GLUTEN-8021][CH] Fix ORC read/write mismatch and parquet read failure when 
column with complex types contains null (#8023)
    
    * fix issue https://github.com/apache/incubator-gluten/issues/8022 and 
https://github.com/apache/incubator-gluten/issues/8021
    
    * fix bugs
    
    * fix issue https://github.com/apache/incubator-gluten/issues/8032
    
    * update ch version
    
    * fix failed uts
    
    * fix failure in spark3.5
    
    * adjust style
---
 .../GlutenClickHouseNativeWriteTableSuite.scala    |  61 +++++++++-
 cpp-ch/clickhouse.version                          |   2 +-
 cpp-ch/local-engine/Common/CHUtil.cpp              |   1 +
 .../Storages/Output/NormalFileWriter.cpp           | 123 +++++++++++++++++++++
 4 files changed, 185 insertions(+), 2 deletions(-)

diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseNativeWriteTableSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseNativeWriteTableSuite.scala
index d0d03eaf1e..03d27f33b1 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseNativeWriteTableSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseNativeWriteTableSuite.scala
@@ -903,7 +903,7 @@ class GlutenClickHouseNativeWriteTableSuite
   }
 
   test("GLUTEN-2584: fix native write and read mismatch about complex types") {
-    def table(format: String): String = s"t_$format"
+    def table(format: String): String = s"t_2584_$format"
     def create(format: String, table_name: Option[String] = None): String =
       s"""CREATE TABLE ${table_name.getOrElse(table(format))}(
          |  id INT,
@@ -934,4 +934,63 @@ class GlutenClickHouseNativeWriteTableSuite
       }
     )
   }
+
+  test(
+    "GLUTEN-8021/8022/8032: fix orc read/write mismatch and parquet" +
+      "read exception when written complex column contains null") {
+    def table(format: String): String = s"t_8021_$format"
+    def create(format: String, table_name: Option[String] = None): String =
+      s"""CREATE TABLE ${table_name.getOrElse(table(format))}(
+         |id int,
+         |x int,
+         |y int,
+         |mp map<string, string>,
+         |arr array<int>,
+         |tup struct<x:int, y:int>,
+         |arr_mp array<map<string, string>>,
+         |mp_arr map<string, array<int>>,
+         |tup_arr struct<a: array<int>>,
+         |tup_map struct<m: map<string, string>>
+         |) stored as $format""".stripMargin
+    def insert(format: String, table_name: Option[String] = None): String =
+      s"""INSERT OVERWRITE TABLE ${table_name.getOrElse(table(format))}
+         |SELECT
+         |  id, x, y,
+         |  str_to_map(concat('x:', x, ',y:', y)) AS mp,
+         |  IF(id % 4 = 0, NULL, array(x, y)) AS arr,
+         |  IF(id % 4 = 1, NULL, struct(x, y)) AS tup,
+         |  IF(id % 4 = 2, NULL, array(str_to_map(concat('x:', x, ',y:', y)))) 
AS arr_mp,
+         |  IF(id % 4 = 3, NULL, map('x', array(x), 'y', array(y))) AS mp_arr,
+         |  IF(id % 4 = 0, NULL, named_struct('a', array(x, y))) AS tup_arr,
+         |  IF(id % 4 = 1, NULL, named_struct('m',
+         |  str_to_map(concat('x:', x, ',y:', y)))) AS tup_map
+         |FROM (
+         |  SELECT
+         |    id,
+         |    IF(id % 3 = 1, NULL, id + 1) AS x,
+         |    IF(id % 3 = 1, NULL, id + 2) AS y
+         |  FROM range(100)
+         |) AS data_source;""".stripMargin
+
+    // TODO fix it in spark3.5
+    if (!isSparkVersionGE("3.5")) {
+      nativeWrite2(
+        format => (table(format), create(format), insert(format)),
+        (table_name, format) => {
+          val vanilla_table = s"${table_name}_v"
+          val vanilla_create = create(format, Some(vanilla_table))
+          vanillaWrite {
+            withDestinationTable(vanilla_table, Option(vanilla_create)) {
+              checkInsertQuery(insert(format, Some(vanilla_table)), 
checkNative = false)
+            }
+          }
+          val rowsFromOriginTable =
+            spark.sql(s"select * from $vanilla_table").collect()
+          val dfFromWriteTable =
+            spark.sql(s"select * from $table_name")
+          checkAnswer(dfFromWriteTable, rowsFromOriginTable)
+        }
+      )
+    }
+  }
 }
diff --git a/cpp-ch/clickhouse.version b/cpp-ch/clickhouse.version
index 62a70f06c1..003f111133 100644
--- a/cpp-ch/clickhouse.version
+++ b/cpp-ch/clickhouse.version
@@ -1,3 +1,3 @@
 CH_ORG=Kyligence
 CH_BRANCH=rebase_ch/20241118
-CH_COMMIT=7f22fe487c88d3b988ea82a5c34882da23ea6289
+CH_COMMIT=a5944dfb7b3
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp 
b/cpp-ch/local-engine/Common/CHUtil.cpp
index de1a263218..88c5303c50 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -710,6 +710,7 @@ void BackendInitializerUtil::initSettings(const 
SparkConfigs::ConfigMap & spark_
     settings.set("max_parsing_threads", 1);
     settings.set("max_download_threads", 1);
     settings.set("input_format_parquet_enable_row_group_prefetch", false);
+    settings.set("output_format_parquet_use_custom_encoder", false);
 
     /// update per https://github.com/ClickHouse/ClickHouse/pull/71539
     /// if true, we can't get correct metrics for the query
diff --git a/cpp-ch/local-engine/Storages/Output/NormalFileWriter.cpp 
b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.cpp
index e9fd4f358a..e5a2d89f26 100644
--- a/cpp-ch/local-engine/Storages/Output/NormalFileWriter.cpp
+++ b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.cpp
@@ -19,13 +19,134 @@
 #include <QueryPipeline/QueryPipeline.h>
 #include <Poco/URI.h>
 #include <Common/DebugUtils.h>
+#include <Columns/ColumnConst.h>
+#include <Columns/ColumnArray.h>
+#include <Columns/ColumnMap.h>
 
 namespace local_engine
 {
 
+using namespace DB;
+
 const std::string SubstraitFileSink::NO_PARTITION_ID{"__NO_PARTITION_ID__"};
 const std::string 
SparkPartitionedBaseSink::DEFAULT_PARTITION_NAME{"__HIVE_DEFAULT_PARTITION__"};
 
+/// For Nullable(Map(K, V)) or Nullable(Array(T)), if the i-th row is null, we 
must make sure its nested data is empty.
+/// It is for ORC/Parquet writing compatibility. For more details, refer to
+/// https://github.com/apache/incubator-gluten/issues/8022 and 
https://github.com/apache/incubator-gluten/issues/8021
+static ColumnPtr truncateNestedDataIfNull(const ColumnPtr & column)
+{
+    if (const auto * col_const = checkAndGetColumn<ColumnConst>(column.get()))
+    {
+        size_t s = col_const->size();
+        auto new_data = 
truncateNestedDataIfNull(col_const->getDataColumnPtr());
+        return ColumnConst::create(std::move(new_data), s);
+    }
+    else if (const auto * col_array = 
checkAndGetColumn<ColumnArray>(column.get()))
+    {
+        auto new_data = truncateNestedDataIfNull(col_array->getDataPtr());
+        return ColumnArray::create(std::move(new_data), 
col_array->getOffsetsPtr());
+    }
+    else if (const auto * col_map = checkAndGetColumn<ColumnMap>(column.get()))
+    {
+        auto new_nested = 
truncateNestedDataIfNull(col_map->getNestedColumnPtr());
+        return ColumnMap::create(std::move(new_nested));
+    }
+    else if (const auto * col_tuple = 
checkAndGetColumn<ColumnTuple>(column.get()))
+    {
+        Columns new_columns;
+        for (size_t i = 0; i < col_tuple->tupleSize(); ++i)
+            
new_columns.emplace_back(truncateNestedDataIfNull(col_tuple->getColumnPtr(i)));
+        return ColumnTuple::create(std::move(new_columns));
+    }
+    else if (const auto * col_nullable = 
checkAndGetColumn<ColumnNullable>(column.get()))
+    {
+        const auto & null_map = col_nullable->getNullMapData();
+        auto nested = 
truncateNestedDataIfNull(col_nullable->getNestedColumnPtr());
+        const auto * nested_array = 
checkAndGetColumn<ColumnArray>(nested.get());
+        const auto * nested_map = checkAndGetColumn<ColumnMap>(nested.get());
+        const auto * nested_tuple = 
checkAndGetColumn<ColumnTuple>(nested.get());
+
+        if (!memoryIsZero(null_map.data(), 0, null_map.size()) && 
(nested_array || nested_map || nested_tuple))
+        {
+            /// Process Nullable(Array) or Nullable(Map)
+            if (nested_array || nested_map)
+            {
+                if (!nested_array)
+                    nested_array = 
checkAndGetColumn<ColumnArray>(&nested_map->getNestedColumn());
+
+                const auto & offsets = nested_array->getOffsets();
+                size_t total_data_size = 0;
+                for (size_t i = 0; i < null_map.size(); ++i)
+                    total_data_size += (offsets[i] - offsets[i - 1]) * 
(!null_map[i]);
+
+                auto new_nested_array = nested_array->cloneEmpty();
+                new_nested_array->reserve(nested_array->size());
+                auto & new_nested_array_data = assert_cast<ColumnArray 
&>(*new_nested_array).getData();
+                new_nested_array_data.reserve(total_data_size);
+
+                for (size_t i = 0; i < null_map.size(); ++i)
+                    if (null_map[i])
+                        new_nested_array->insertDefault();
+                    else
+                        new_nested_array->insertFrom(*nested_array, i);
+
+                if (nested_map)
+                {
+                    auto new_nested_map = 
ColumnMap::create(std::move(new_nested_array));
+                    return ColumnNullable::create(std::move(new_nested_map), 
col_nullable->getNullMapColumnPtr());
+                }
+                else
+                {
+                    return ColumnNullable::create(std::move(new_nested_array), 
col_nullable->getNullMapColumnPtr());
+                }
+            }
+            else
+            {
+                /// Process Nullable(Tuple)
+                const auto & nested_columns = nested_tuple->getColumns();
+                Columns new_nested_columns(nested_columns.size());
+                for (size_t i = 0; i < nested_columns.size(); ++i)
+                {
+                    const auto & nested_column = nested_columns[i];
+                    TypeIndex type_index = nested_column->getDataType();
+                    if (const auto * nullable_nested_column = 
checkAndGetColumn<ColumnNullable>(nested_column.get()))
+                        type_index = 
nullable_nested_column->getNestedColumnPtr()->getDataType();
+
+                    bool should_truncate = type_index == TypeIndex::Array || 
type_index == TypeIndex::Map || type_index == TypeIndex::Tuple;
+                    if (should_truncate)
+                    {
+                        auto new_nested_column = nested_column->cloneEmpty();
+                        new_nested_column->reserve(nested_column->size());
+                        for (size_t j = 0; j < null_map.size(); ++j)
+                        {
+                            if (null_map[j])
+                                new_nested_column->insertDefault();
+                            else
+                                new_nested_column->insertFrom(*nested_column, 
j);
+                        }
+                        new_nested_columns[i] = std::move(new_nested_column);
+                    }
+                    else
+                    {
+                        new_nested_columns[i] = nested_column;
+                    }
+                }
+
+                auto new_nested_tuple = 
ColumnTuple::create(std::move(new_nested_columns));
+                return ColumnNullable::create(std::move(new_nested_tuple), 
col_nullable->getNullMapColumnPtr());
+            }
+        }
+        else
+        {
+            auto new_nested = truncateNestedDataIfNull(nested);
+            return ColumnNullable::create(std::move(new_nested), 
col_nullable->getNullMapColumnPtr());
+        }
+    }
+    else
+        return column;
+}
+
 NormalFileWriter::NormalFileWriter(const OutputFormatFilePtr & file_, const 
DB::ContextPtr & context_) : file(file_), context(context_)
 {
 }
@@ -51,6 +172,8 @@ void NormalFileWriter::write(DB::Block & block)
             continue;
 
         const auto & preferred_column = 
preferred_schema.getByPosition(index++);
+        /// Make sure nested array or map data is empty when the row is null 
in Nullable(Map(K, V)) or Nullable(Array(T)).
+        column.column = truncateNestedDataIfNull(column.column);
         column.column = DB::castColumn(column, preferred_column.type);
         column.name = preferred_column.name;
         column.type = preferred_column.type;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to