This is an automated email from the ASF dual-hosted git repository.

changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new ac8e03a1d6 [GLUTEN-1632][CH]Daily Update Clickhouse Version (20250116) 
(#8544)
ac8e03a1d6 is described below

commit ac8e03a1d6dd3327416779f5c4986ee4fe73bb29
Author: Kyligence Git <[email protected]>
AuthorDate: Thu Jan 16 07:28:03 2025 -0600

    [GLUTEN-1632][CH]Daily Update Clickhouse Version (20250116) (#8544)
    
    * [GLUTEN-1632][CH]Daily Update Clickhouse Version (20250116)
    
    * Fix ut due to https://github.com/ClickHouse/ClickHouse/pull/73651
    
    * In C++, The declaration order determines the construction order: member 
variables are constructed in the order they are declared, and the order in the 
initialization list does not affect this. The destruction order is the reverse 
of the construction order: the last constructed member is the first to be 
destructed.
    
    since output depends on write_buffer, we need declare write_buffer first.
    
    * Another way to fix https://github.com/ClickHouse/ClickHouse/pull/73651, 
this also fix "write into hdfs in spark 3.5"
    
    * fix style
    
    ---------
    
    Co-authored-by: kyligence-git <[email protected]>
    Co-authored-by: Chang Chen <[email protected]>
---
 .../backendsapi/clickhouse/CHIteratorApi.scala     |  8 +--
 .../backendsapi/clickhouse/RuntimeSettings.scala   | 11 ++++
 .../sql/execution/CHColumnarWriteFilesExec.scala   |  1 -
 .../GlutenClickHouseExcelFormatSuite.scala         | 58 +++++++++++++++++++---
 cpp-ch/clickhouse.version                          |  4 +-
 cpp-ch/local-engine/Common/CHUtil.cpp              | 22 +++++---
 .../Storages/Output/NormalFileWriter.h             | 12 ++++-
 .../Storages/Output/OutputFormatFile.h             |  7 ++-
 .../Storages/Output/ParquetOutputFormatFile.cpp    |  1 -
 9 files changed, 100 insertions(+), 24 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index c6ec95ded5..0b20e5aeea 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -106,12 +106,8 @@ class CHIteratorApi extends IteratorApi with Logging with 
LogLevelUtil {
       updateInputMetrics,
       updateInputMetrics.map(_ => context.taskMetrics().inputMetrics).orNull)
 
-    context.addTaskFailureListener(
-      (ctx, _) => {
-        if (ctx.isInterrupted()) {
-          iter.cancel()
-        }
-      })
+    context.addTaskFailureListener((ctx, _) => { iter.cancel() })
+
     context.addTaskCompletionListener[Unit](_ => iter.close())
     new CloseableCHColumnBatchIterator(iter, Some(pipelineTime))
   }
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/RuntimeSettings.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/RuntimeSettings.scala
index 4365b2987c..412ce145ed 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/RuntimeSettings.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/RuntimeSettings.scala
@@ -36,6 +36,17 @@ object RuntimeSettings {
       
.doc("https://clickhouse.com/docs/en/operations/settings/query-complexity#settings-max_bytes_before_external_sort";)
       .longConf
       .createWithDefault(0)
+
+  // TODO: support check value
+  val OUTPUT_FORMAT_COMPRESSION_LEVEL =
+    buildConf(runtimeSettings("output_format_compression_level"))
+      
.doc(s"""https://clickhouse.com/docs/en/operations/settings/settings#output_format_compression_level
+              | Notes: we always use Snappy compression, and Snappy doesn't 
support compression level.
+              | Currently, we ONLY set it in UT.
+              |""".stripMargin)
+      .longConf
+      .createWithDefault(Integer.MIN_VALUE & 0xffffffffL)
+  // .checkValue(v => v >= 0, "COMPRESSION LEVEL must be greater than 0")
   // scalastyle:on line.size.limit
 
   /** Gluten Configuration */
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWriteFilesExec.scala
 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWriteFilesExec.scala
index d43d23bfc8..efc7138fe9 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWriteFilesExec.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWriteFilesExec.scala
@@ -104,7 +104,6 @@ class CHColumnarWriteFilesRDD(
          * otherwise, we need to access ColumnarBatch row by row, which is not 
efficient.
          */
         val writeResults = 
CHExecUtil.c2r(resultColumnarBatch).map(_.copy()).toSeq
-        // TODO: we need close iterator here before processing the result.
         // TODO: task commit time
         // TODO: get the schema from result ColumnarBatch and verify it.
         assert(!iter.hasNext)
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseExcelFormatSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseExcelFormatSuite.scala
index 76afd602ce..495e896150 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseExcelFormatSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseExcelFormatSuite.scala
@@ -16,10 +16,11 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.gluten.backendsapi.clickhouse.CHConf
+import org.apache.gluten.backendsapi.clickhouse.{CHConf, RuntimeSettings}
 import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.exception.GlutenException
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.sql.{functions, DataFrame, Row}
 import org.apache.spark.sql.execution.LocalTableScanExec
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
@@ -1464,16 +1465,14 @@ class GlutenClickHouseExcelFormatSuite
     fileName
   }
 
-  /** TODO: fix the issue and test in spark 3.5 */
-  testSparkVersionLE33("write into hdfs") {
+  test("write into hdfs") {
 
     /**
      * There is a bug in pipeline write to HDFS; when a pipeline returns 
column batch, it doesn't
      * close the hdfs file, and hence the file is not flushed.HDFS file is 
closed when LocalExecutor
      * is destroyed, but before that, the file moved by spark committer.
      */
-    val tableName = "write_into_hdfs"
-    val tablePath = s"$HDFS_URL_ENDPOINT/$SPARK_DIR_NAME/$tableName/"
+    val tablePath = s"$HDFS_URL_ENDPOINT/$SPARK_DIR_NAME/write_into_hdfs/"
     val format = "parquet"
     val sql =
       s"""
@@ -1485,4 +1484,51 @@ class GlutenClickHouseExcelFormatSuite
       testFileFormatBase(tablePath, format, sql, df => {})
     }
   }
+
+  // TODO: pass spark configuration to FileFormatWriter in Spark 3.3 and 3.2
+  testWithSpecifiedSparkVersion(
+    "write failed if set wrong snappy compression codec level",
+    Some("3.5")) {
+    // TODO: remove duplicated test codes
+    val tablePath = s"$HDFS_URL_ENDPOINT/$SPARK_DIR_NAME/failed_test/"
+    val format = "parquet"
+    val sql =
+      s"""
+         | select *
+         | from $format.`$tablePath`
+         | where long_field > 30
+         |""".stripMargin
+
+    withSQLConf(
+      (GlutenConfig.NATIVE_WRITER_ENABLED.key, "true"),
+      (
+        RuntimeSettings.OUTPUT_FORMAT_COMPRESSION_LEVEL.key,
+        
RuntimeSettings.OUTPUT_FORMAT_COMPRESSION_LEVEL.defaultValue.get.toString)
+    ) {
+      testFileFormatBase(tablePath, format, sql, df => {})
+    }
+
+    // we can't pass the configuration to FileFormatWriter in Spark 3.3 and 3.2
+    withSQLConf(
+      (GlutenConfig.NATIVE_WRITER_ENABLED.key, "true"),
+      (RuntimeSettings.OUTPUT_FORMAT_COMPRESSION_LEVEL.key, "3")
+    ) {
+      val sparkError = intercept[SparkException] {
+        testFileFormatBase(tablePath, format, sql, df => {})
+      }
+
+      // throw at org.apache.spark.sql.execution.CHColumnarWriteFilesRDD
+      val causeOuter = sparkError.getCause
+      assert(causeOuter.isInstanceOf[SparkException])
+      assert(causeOuter.getMessage.contains("Task failed while writing rows to 
output path: hdfs"))
+
+      // throw at the writing file
+      val causeInner = causeOuter.getCause
+      assert(causeInner.isInstanceOf[GlutenException])
+      assert(
+        causeInner.getMessage.contains(
+          "Invalid: Codec 'snappy' doesn't support setting a compression 
level"))
+    }
+
+  }
 }
diff --git a/cpp-ch/clickhouse.version b/cpp-ch/clickhouse.version
index 45287fab51..dfeb3287e3 100644
--- a/cpp-ch/clickhouse.version
+++ b/cpp-ch/clickhouse.version
@@ -1,3 +1,3 @@
 CH_ORG=Kyligence
-CH_BRANCH=rebase_ch/20250115
-CH_COMMIT=8e0d5eaf0fc
+CH_BRANCH=rebase_ch/20250116
+CH_COMMIT=a260339b40c
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp 
b/cpp-ch/local-engine/Common/CHUtil.cpp
index b2dfa3cf44..37d1a1bbd7 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -58,6 +58,7 @@
 #include <Storages/MergeTree/StorageMergeTreeFactory.h>
 #include <Storages/Output/WriteBufferBuilder.h>
 #include <Storages/SubstraitSource/ReadBufferBuilder.h>
+#include <arrow/util/compression.h>
 #include <boost/algorithm/string/case_conv.hpp>
 #include <sys/resource.h>
 #include <Poco/Logger.h>
@@ -91,6 +92,7 @@ extern const SettingsDouble 
max_bytes_ratio_before_external_sort;
 extern const SettingsBool query_plan_merge_filters;
 extern const SettingsBool compile_expressions;
 extern const SettingsShortCircuitFunctionEvaluation 
short_circuit_function_evaluation;
+extern const SettingsUInt64 output_format_compression_level;
 }
 namespace ErrorCodes
 {
@@ -640,23 +642,31 @@ void BackendInitializerUtil::initSettings(const 
SparkConfigs::ConfigMap & spark_
     settings.set("input_format_parquet_enable_row_group_prefetch", false);
     settings.set("output_format_parquet_use_custom_encoder", false);
 
+    //1.
     // TODO: we need set Setting::max_threads to 1 by default, but now we 
can't get correct metrics for the some query if we set it to 1.
     // settings[Setting::max_threads] = 1;
 
-    /// Set false after https://github.com/ClickHouse/ClickHouse/pull/71539
-    /// if true, we can't get correct metrics for the query
+    /// 2. After https://github.com/ClickHouse/ClickHouse/pull/71539
+    /// Set false to query_plan_merge_filters.
+    /// If true, we can't get correct metrics for the query
     settings[Setting::query_plan_merge_filters] = false;
 
+    /// 3. After https://github.com/ClickHouse/ClickHouse/pull/70598.
+    /// Set false to compile_expressions to avoid dead loop.
+    /// TODO: FIXME set true again.
     /// We now set BuildQueryPipelineSettings according to config.
-    // TODO: FIXME. Set false after 
https://github.com/ClickHouse/ClickHouse/pull/70598.
     settings[Setting::compile_expressions] = false;
     settings[Setting::short_circuit_function_evaluation] = 
ShortCircuitFunctionEvaluation::DISABLE;
-    ///
 
-    // After https://github.com/ClickHouse/ClickHouse/pull/73422
-    // Since we already set max_bytes_before_external_sort, set 
max_bytes_ratio_before_external_sort to 0
+    /// 4. After https://github.com/ClickHouse/ClickHouse/pull/73422
+    /// Since we already set max_bytes_before_external_sort, set 
max_bytes_ratio_before_external_sort to 0
     settings[Setting::max_bytes_ratio_before_external_sort] = 0.;
 
+    /// 5. After https://github.com/ClickHouse/ClickHouse/pull/73651.
+    /// See following settings, we always use Snappy compression for Parquet, 
however after https://github.com/ClickHouse/ClickHouse/pull/73651,
+    /// output_format_compression_level is set to 3, which is wrong, since 
snappy does not support it.
+    settings[Setting::output_format_compression_level] = 
arrow::util::kUseDefaultCompressionLevel;
+
     for (const auto & [key, value] : spark_conf_map)
     {
         // Firstly apply 
spark.gluten.sql.columnar.backend.ch.runtime_config.local_engine.settings.* to 
settings
diff --git a/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h 
b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h
index 9e848f82d7..d12ccaae60 100644
--- a/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h
+++ b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h
@@ -371,14 +371,24 @@ protected:
     }
     void onFinish() override
     {
-        if (output_format_) [[unlikely]]
+        if (output_format_)
         {
             output_format_->finalizeOutput();
+            /// We need close reset output_format_ here before return to 
spark, because the file is closed in ~WriteBufferFromHDFSImpl().
+            /// So that Spark Commit protocol can move the file safely.
+            output_format_.reset();
             assert(delta_stats_.row_count > 0);
             if (stats_)
                 stats_->collectStats(relative_path_, partition_id_, 
delta_stats_);
         }
     }
+    void onCancel() noexcept override
+    {
+        if (output_format_) {
+            output_format_->cancel();
+            output_format_.reset();
+        }
+    }
 };
 
 class SparkPartitionedBaseSink : public DB::PartitionedSink
diff --git a/cpp-ch/local-engine/Storages/Output/OutputFormatFile.h 
b/cpp-ch/local-engine/Storages/Output/OutputFormatFile.h
index 915f9a7e7e..a2a7ab052e 100644
--- a/cpp-ch/local-engine/Storages/Output/OutputFormatFile.h
+++ b/cpp-ch/local-engine/Storages/Output/OutputFormatFile.h
@@ -29,14 +29,19 @@ class OutputFormatFile
 public:
     struct OutputFormat
     {
-        DB::OutputFormatPtr output;
         std::unique_ptr<DB::WriteBuffer> write_buffer;
+        DB::OutputFormatPtr output;
         void finalizeOutput() const
         {
             output->finalize();
             output->flush();
             write_buffer->finalize();
         }
+        void cancel()
+        {
+            output.reset();
+            write_buffer->finalize();
+        }
     };
     using OutputFormatPtr = std::shared_ptr<OutputFormat>;
 
diff --git a/cpp-ch/local-engine/Storages/Output/ParquetOutputFormatFile.cpp 
b/cpp-ch/local-engine/Storages/Output/ParquetOutputFormatFile.cpp
index f3ac41c19a..684c433db3 100644
--- a/cpp-ch/local-engine/Storages/Output/ParquetOutputFormatFile.cpp
+++ b/cpp-ch/local-engine/Storages/Output/ParquetOutputFormatFile.cpp
@@ -25,7 +25,6 @@
 #    include <Processors/Formats/Impl/ArrowBufferedStreams.h>
 #    include <Processors/Formats/Impl/CHColumnToArrowColumn.h>
 #    include <Processors/Formats/Impl/ParquetBlockOutputFormat.h>
-#    include <parquet/arrow/writer.h>
 
 namespace local_engine
 {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to