This is an automated email from the ASF dual-hosted git repository.

changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new b2706cba5a [GLUTEN-1632][CH]Daily Update Clickhouse Version (20241129) 
(#8087)
b2706cba5a is described below

commit b2706cba5a9efa95d249a32523bd03eb3b3290fe
Author: Kyligence Git <[email protected]>
AuthorDate: Fri Nov 29 18:56:01 2024 -0600

    [GLUTEN-1632][CH]Daily Update Clickhouse Version (20241129) (#8087)
    
    * [GLUTEN-1632][CH]Daily Update Clickhouse Version (20241129)
    
    * Fix Build and UT due to [Added cache for primary 
index](https://github.com/ClickHouse/ClickHouse/pull/72102)
    
    * Fix Build and UT due to [no auto write buffer finalization in 
destructors](https://github.com/ClickHouse/ClickHouse/pull/68800)
    
    - Ensure LocalPartitionWriter::evictPartitions is actually exercised in tests, e.g. via 
set(GlutenConfig.COLUMNAR_CH_SHUFFLE_SPILL_THRESHOLD.key, (1024*1024).toString)
    
    * Fix Build due to [Save several minutes of build 
time](https://github.com/ClickHouse/ClickHouse/pull/72046)
    
    * Fix Benchmark Build due to [Scatter blocks in hash join without 
copying](https://github.com/ClickHouse/ClickHouse/pull/67782)
    
    (cherry picked from commit 8d566d6a8b8785e4072ffd6f774eb83b07ac3d8d)
    
    * Fix Benchmark Build
    
    * Fix endless loop due to 
https://github.com/ClickHouse/ClickHouse/pull/70598
    
    * [Refactor #8100] Use CHConf.setCHConfig()
    
    * fix style
    
    ---------
    
    Co-authored-by: kyligence-git <[email protected]>
    Co-authored-by: Chang Chen <[email protected]>
---
 .../GlutenClickHouseTPCHBucketSuite.scala          |  5 ++---
 .../tpcds/GlutenClickHouseTPCDSParquetSuite.scala  |  5 ++---
 ...ckHouseTPCHColumnarShuffleParquetAQESuite.scala |  1 +
 .../GlutenClickHouseTPCHParquetBucketSuite.scala   |  5 ++---
 cpp-ch/clickhouse.version                          |  4 ++--
 cpp-ch/local-engine/Common/CHUtil.cpp              | 26 ++++++++++++++++++----
 cpp-ch/local-engine/Common/GlutenSignalHandler.cpp |  4 +---
 .../local-engine/Functions/FunctionGreatestLeast.h |  9 +++++---
 .../Functions/SparkFunctionArrayJoin.cpp           | 13 ++++++-----
 .../Functions/SparkFunctionArraysOverlap.cpp       | 11 ++++-----
 .../SparkFunctionCheckDecimalOverflow.cpp          |  1 +
 .../SparkFunctionDecimalBinaryArithmetic.cpp       |  3 +--
 .../Functions/SparkFunctionMakeDecimal.cpp         |  4 ++--
 .../Functions/SparkFunctionMapToString.h           |  7 +++---
 .../Functions/SparkFunctionSplitByRegexp.cpp       |  2 +-
 .../local-engine/Parser/SerializedPlanParser.cpp   | 11 +++++----
 .../Parser/scalar_function_parser/arrayExcept.cpp  |  1 +
 .../Parser/scalar_function_parser/arrayRemove.cpp  |  2 +-
 .../Parser/scalar_function_parser/arrayRepeat.cpp  |  3 ++-
 .../Parser/scalar_function_parser/bitLength.cpp    |  5 +++--
 .../Parser/scalar_function_parser/elementAt.cpp    |  2 ++
 .../Parser/scalar_function_parser/length.cpp       |  5 +++--
 .../Parser/scalar_function_parser/locate.cpp       |  3 ++-
 .../Parser/scalar_function_parser/octetLength.cpp  |  5 +++--
 cpp-ch/local-engine/Shuffle/PartitionWriter.cpp    |  6 +++++
 cpp-ch/local-engine/Shuffle/SelectorBuilder.cpp    |  1 +
 cpp-ch/local-engine/Shuffle/ShuffleWriter.cpp      | 17 ++++++--------
 cpp-ch/local-engine/Shuffle/ShuffleWriter.h        |  2 +-
 cpp-ch/local-engine/Shuffle/SparkExchangeSink.cpp  | 20 ++++++++---------
 .../Storages/MergeTree/MergeSparkMergeTreeTask.cpp |  8 ++++++-
 .../Storages/MergeTree/MergeSparkMergeTreeTask.h   |  3 ++-
 .../Storages/MergeTree/MetaDataHelper.cpp          |  1 +
 .../Storages/MergeTree/SparkStorageMergeTree.cpp   |  1 -
 .../Storages/Output/NormalFileWriter.cpp           |  4 ++++
 .../Storages/Output/NormalFileWriter.h             |  4 +---
 .../Storages/Output/OutputFormatFile.h             |  7 +++++-
 .../tests/benchmark_cast_float_function.cpp        |  4 ++--
 .../local-engine/tests/benchmark_local_engine.cpp  |  2 +-
 .../tests/benchmark_spark_floor_function.cpp       |  8 +++----
 .../tests/benchmark_to_datetime_function.cpp       |  4 ++--
 .../tests/benchmark_unix_timestamp_function.cpp    |  8 +++----
 cpp-ch/local-engine/tests/gluten_test_util.h       |  2 +-
 cpp-ch/local-engine/tests/gtest_ch_functions.cpp   | 10 ++++-----
 cpp-ch/local-engine/tests/gtest_local_engine.cpp   |  1 +
 44 files changed, 151 insertions(+), 99 deletions(-)

diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHBucketSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHBucketSuite.scala
index 7bccb6dfb5..90e09e75f1 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHBucketSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHBucketSuite.scala
@@ -40,6 +40,7 @@ class GlutenClickHouseTPCHBucketSuite
   override protected val queriesResults: String = rootPath + 
"bucket-queries-output"
 
   override protected def sparkConf: SparkConf = {
+    import org.apache.gluten.backendsapi.clickhouse.CHConf._
     super.sparkConf
       .set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
       .set("spark.io.compression.codec", "LZ4")
@@ -47,9 +48,7 @@ class GlutenClickHouseTPCHBucketSuite
       .set("spark.sql.autoBroadcastJoinThreshold", "-1") // for test bucket 
join
       .set("spark.sql.adaptive.enabled", "true")
       .set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm", 
"sparkMurmurHash3_32")
-      .set(
-        
"spark.gluten.sql.columnar.backend.ch.runtime_config.enable_grace_aggregate_spill_test",
-        "true")
+      .setCHConfig("enable_grace_aggregate_spill_test", "true")
   }
 
   override protected val createNullableTables = true
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetSuite.scala
index 0ba7de90c6..f0025cf30c 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpcds/GlutenClickHouseTPCDSParquetSuite.scala
@@ -30,6 +30,7 @@ class GlutenClickHouseTPCDSParquetSuite extends 
GlutenClickHouseTPCDSAbstractSui
 
   /** Run Gluten + ClickHouse Backend with SortShuffleManager */
   override protected def sparkConf: SparkConf = {
+    import org.apache.gluten.backendsapi.clickhouse.CHConf._
     super.sparkConf
       .set("spark.shuffle.manager", "sort")
       .set("spark.io.compression.codec", "snappy")
@@ -38,9 +39,7 @@ class GlutenClickHouseTPCDSParquetSuite extends 
GlutenClickHouseTPCDSAbstractSui
       .set("spark.memory.offHeap.size", "4g")
       .set("spark.gluten.sql.validation.logLevel", "ERROR")
       .set("spark.gluten.sql.validation.printStackOnFailure", "true")
-      .set(
-        
"spark.gluten.sql.columnar.backend.ch.runtime_config.enable_grace_aggregate_spill_test",
-        "true")
+      .setCHConfig("enable_grace_aggregate_spill_test", "true")
   }
 
   executeTPCDSTest(false)
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala
index ad9cb854d9..b4186fee66 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala
@@ -51,6 +51,7 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite
       .set("spark.sql.adaptive.enabled", "true")
       .set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm", 
"sparkMurmurHash3_32")
       .setCHConfig("enable_streaming_aggregating", true)
+      .set(GlutenConfig.COLUMNAR_CH_SHUFFLE_SPILL_THRESHOLD.key, (1024 * 
1024).toString)
   }
 
   override protected def createTPCHNotNullTables(): Unit = {
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetBucketSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetBucketSuite.scala
index a257e2ed50..7a927bf23a 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetBucketSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHParquetBucketSuite.scala
@@ -46,6 +46,7 @@ class GlutenClickHouseTPCHParquetBucketSuite
   protected val bucketTableDataPath: String = basePath + "/tpch-parquet-bucket"
 
   override protected def sparkConf: SparkConf = {
+    import org.apache.gluten.backendsapi.clickhouse.CHConf._
     super.sparkConf
       .set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
       .set("spark.io.compression.codec", "LZ4")
@@ -53,9 +54,7 @@ class GlutenClickHouseTPCHParquetBucketSuite
       .set("spark.sql.autoBroadcastJoinThreshold", "-1") // for test bucket 
join
       .set("spark.sql.adaptive.enabled", "true")
       .set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm", 
"sparkMurmurHash3_32")
-      .set(
-        
"spark.gluten.sql.columnar.backend.ch.runtime_config.enable_grace_aggregate_spill_test",
-        "true")
+      .setCHConfig("enable_grace_aggregate_spill_test", "true")
   }
 
   override protected val createNullableTables = true
diff --git a/cpp-ch/clickhouse.version b/cpp-ch/clickhouse.version
index 003f111133..edb13fdc57 100644
--- a/cpp-ch/clickhouse.version
+++ b/cpp-ch/clickhouse.version
@@ -1,3 +1,3 @@
 CH_ORG=Kyligence
-CH_BRANCH=rebase_ch/20241118
-CH_COMMIT=a5944dfb7b3
+CH_BRANCH=rebase_ch/20241129
+CH_COMMIT=101ba3f944d1
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp 
b/cpp-ch/local-engine/Common/CHUtil.cpp
index 88c5303c50..310b39d3e5 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -76,6 +76,12 @@
 
 namespace DB
 {
+namespace ServerSetting
+{
+extern const ServerSettingsString primary_index_cache_policy;
+extern const ServerSettingsUInt64 primary_index_cache_size;
+extern const ServerSettingsDouble primary_index_cache_size_ratio;
+}
 namespace Setting
 {
 extern const SettingsUInt64 prefer_external_sort_block_bytes;
@@ -712,11 +718,13 @@ void BackendInitializerUtil::initSettings(const 
SparkConfigs::ConfigMap & spark_
     settings.set("input_format_parquet_enable_row_group_prefetch", false);
     settings.set("output_format_parquet_use_custom_encoder", false);
 
-    /// update per https://github.com/ClickHouse/ClickHouse/pull/71539
+    /// Set false after https://github.com/ClickHouse/ClickHouse/pull/71539
     /// if true, we can't get correct metrics for the query
     settings[Setting::query_plan_merge_filters] = false;
+
     /// We now set BuildQueryPipelineSettings according to config.
-    settings[Setting::compile_expressions] = true;
+    // TODO: FIXME. Set false after 
https://github.com/ClickHouse/ClickHouse/pull/70598.
+    settings[Setting::compile_expressions] = false;
     settings[Setting::short_circuit_function_evaluation] = 
ShortCircuitFunctionEvaluation::DISABLE;
     ///
 
@@ -820,6 +828,10 @@ void 
BackendInitializerUtil::initContexts(DB::Context::ConfigurationPtr config)
     /// Make sure global_context and shared_context are constructed only once.
     if (auto global_context = QueryContext::globalMutableContext(); 
!global_context)
     {
+        ServerSettings server_settings;
+        server_settings.loadSettingsFromConfig(*config);
+
+        auto log = getLogger("CHUtil");
         global_context = QueryContext::createGlobal();
         global_context->makeGlobalContext();
         global_context->setConfig(config);
@@ -844,10 +856,16 @@ void 
BackendInitializerUtil::initContexts(DB::Context::ConfigurationPtr config)
         size_t mark_cache_size = config->getUInt64("mark_cache_size", 
DEFAULT_MARK_CACHE_MAX_SIZE);
         double mark_cache_size_ratio = 
config->getDouble("mark_cache_size_ratio", DEFAULT_MARK_CACHE_SIZE_RATIO);
         if (!mark_cache_size)
-            LOG_ERROR(&Poco::Logger::get("CHUtil"), "Too low mark cache size 
will lead to severe performance degradation.");
-
+            LOG_ERROR(log, "Mark cache is disabled, it will lead to severe 
performance degradation.");
+        LOG_INFO(log, "mark cache size to {}.", 
formatReadableSizeWithBinarySuffix(mark_cache_size));
         global_context->setMarkCache(mark_cache_policy, mark_cache_size, 
mark_cache_size_ratio);
 
+        String primary_index_cache_policy = 
server_settings[ServerSetting::primary_index_cache_policy];
+        size_t primary_index_cache_size = 
server_settings[ServerSetting::primary_index_cache_size];
+        double primary_index_cache_size_ratio = 
server_settings[ServerSetting::primary_index_cache_size_ratio];
+        LOG_INFO(log, "Primary index cache size to {}.", 
formatReadableSizeWithBinarySuffix(primary_index_cache_size));
+        global_context->setPrimaryIndexCache(primary_index_cache_policy, 
primary_index_cache_size, primary_index_cache_size_ratio);
+
         String index_uncompressed_cache_policy
             = config->getString("index_uncompressed_cache_policy", 
DEFAULT_INDEX_UNCOMPRESSED_CACHE_POLICY);
         size_t index_uncompressed_cache_size
diff --git a/cpp-ch/local-engine/Common/GlutenSignalHandler.cpp 
b/cpp-ch/local-engine/Common/GlutenSignalHandler.cpp
index 44c43fcb65..712d8ddcf5 100644
--- a/cpp-ch/local-engine/Common/GlutenSignalHandler.cpp
+++ b/cpp-ch/local-engine/Common/GlutenSignalHandler.cpp
@@ -104,7 +104,7 @@ static void writeSignalIDtoSignalPipe(int sig)
     char buf[signal_pipe_buf_size];
     WriteBufferFromFileDescriptor out(writeFD(), signal_pipe_buf_size, buf);
     writeBinary(sig, out);
-    out.next();
+    out.finalize();
     errno = saved_errno;
 }
 
@@ -251,9 +251,7 @@ private:
             query = thread_ptr->getQueryForLog();
 
             if (auto logs_queue = thread_ptr->getInternalTextLogsQueue())
-            {
                 CurrentThread::attachInternalTextLogsQueue(logs_queue, 
LogsLevel::trace);
-            }
         }
         std::string signal_description = "Unknown signal";
 
diff --git a/cpp-ch/local-engine/Functions/FunctionGreatestLeast.h 
b/cpp-ch/local-engine/Functions/FunctionGreatestLeast.h
index 6930c1d75b..e9b66df84e 100644
--- a/cpp-ch/local-engine/Functions/FunctionGreatestLeast.h
+++ b/cpp-ch/local-engine/Functions/FunctionGreatestLeast.h
@@ -14,9 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include <Functions/LeastGreatestGeneric.h>
-#include <DataTypes/getLeastSupertype.h>
+#pragma once
 #include <DataTypes/DataTypeNullable.h>
+#include <DataTypes/getLeastSupertype.h>
+#include <Functions/FunctionBinaryArithmetic.h>
+#include <Functions/FunctionFactory.h>
+#include <Functions/LeastGreatestGeneric.h>
 
 namespace DB
 {
@@ -64,7 +67,7 @@ private:
                 else
                 {
                     auto cmp_result = 
converted_columns[arg]->compareAt(row_num, row_num, 
*converted_columns[best_arg], 1);
-                    if (cmp_result <  0)
+                    if (cmp_result < 0)
                         best_arg = arg;
                 }
             }
diff --git a/cpp-ch/local-engine/Functions/SparkFunctionArrayJoin.cpp 
b/cpp-ch/local-engine/Functions/SparkFunctionArrayJoin.cpp
index 4c2847d9f9..bf65b25347 100644
--- a/cpp-ch/local-engine/Functions/SparkFunctionArrayJoin.cpp
+++ b/cpp-ch/local-engine/Functions/SparkFunctionArrayJoin.cpp
@@ -14,13 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include <base/StringRef.h>
-#include <Interpreters/Context_fwd.h>
-#include <Columns/ColumnString.h>
+#include <Columns/ColumnArray.h>
+#include <Columns/ColumnConst.h>
 #include <Columns/ColumnNullable.h>
-#include <Functions/IFunction.h>
-#include <Functions/FunctionFactory.h>
+#include <Columns/ColumnString.h>
+#include <DataTypes/DataTypeNullable.h>
 #include <DataTypes/DataTypeString.h>
+#include <Functions/FunctionFactory.h>
+#include <Functions/IFunction.h>
+#include <Interpreters/Context_fwd.h>
+#include <base/StringRef.h>
 
 using namespace DB;
 
diff --git a/cpp-ch/local-engine/Functions/SparkFunctionArraysOverlap.cpp 
b/cpp-ch/local-engine/Functions/SparkFunctionArraysOverlap.cpp
index e43b528231..ea841632a9 100644
--- a/cpp-ch/local-engine/Functions/SparkFunctionArraysOverlap.cpp
+++ b/cpp-ch/local-engine/Functions/SparkFunctionArraysOverlap.cpp
@@ -14,12 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include <Columns/ColumnString.h>
+#include <Columns/ColumnArray.h>
 #include <Columns/ColumnNullable.h>
-#include <Functions/IFunction.h>
-#include <Functions/FunctionFactory.h>
-#include <DataTypes/DataTypeString.h>
+#include <Columns/ColumnString.h>
+#include <DataTypes/DataTypeNullable.h>
 #include <DataTypes/DataTypesNumber.h>
+#include <Functions/FunctionFactory.h>
+#include <Functions/IFunction.h>
 
 using namespace DB;
 
@@ -92,7 +93,7 @@ public:
                         {
                             res_data[i] = 1;
                             null_map_data[i] = 0;
-                            break;  
+                            break;
                         }
                     }
                 }
diff --git 
a/cpp-ch/local-engine/Functions/SparkFunctionCheckDecimalOverflow.cpp 
b/cpp-ch/local-engine/Functions/SparkFunctionCheckDecimalOverflow.cpp
index c75d25b6ef..8b5a7eff65 100644
--- a/cpp-ch/local-engine/Functions/SparkFunctionCheckDecimalOverflow.cpp
+++ b/cpp-ch/local-engine/Functions/SparkFunctionCheckDecimalOverflow.cpp
@@ -19,6 +19,7 @@
 #include <Columns/ColumnDecimal.h>
 #include <Columns/ColumnNullable.h>
 #include <Columns/ColumnsNumber.h>
+#include <Core/DecimalFunctions.h>
 #include <DataTypes/DataTypeNullable.h>
 #include <DataTypes/DataTypesDecimal.h>
 #include <Functions/FunctionFactory.h>
diff --git 
a/cpp-ch/local-engine/Functions/SparkFunctionDecimalBinaryArithmetic.cpp 
b/cpp-ch/local-engine/Functions/SparkFunctionDecimalBinaryArithmetic.cpp
index 830fc0e652..f89943fc7a 100644
--- a/cpp-ch/local-engine/Functions/SparkFunctionDecimalBinaryArithmetic.cpp
+++ b/cpp-ch/local-engine/Functions/SparkFunctionDecimalBinaryArithmetic.cpp
@@ -19,6 +19,7 @@
 #include <Columns/ColumnDecimal.h>
 #include <Columns/ColumnNullable.h>
 #include <Columns/ColumnsNumber.h>
+#include <Core/DecimalFunctions.h>
 #include <DataTypes/DataTypeNullable.h>
 #include <DataTypes/DataTypesDecimal.h>
 #include <Functions/FunctionFactory.h>
@@ -26,8 +27,6 @@
 #include <Functions/IFunction.h>
 #include <Functions/castTypeToEither.h>
 #include <Common/CurrentThread.h>
-#include <Common/GlutenDecimalUtils.h>
-#include <Common/ProfileEvents.h>
 #include <Common/Stopwatch.h>
 
 namespace DB
diff --git a/cpp-ch/local-engine/Functions/SparkFunctionMakeDecimal.cpp 
b/cpp-ch/local-engine/Functions/SparkFunctionMakeDecimal.cpp
index 795e2b0be3..f136f587c5 100644
--- a/cpp-ch/local-engine/Functions/SparkFunctionMakeDecimal.cpp
+++ b/cpp-ch/local-engine/Functions/SparkFunctionMakeDecimal.cpp
@@ -15,12 +15,12 @@
  * limitations under the License.
  */
 #include <Columns/ColumnNullable.h>
+#include <Core/DecimalFunctions.h>
 #include <DataTypes/DataTypesDecimal.h>
 #include <Functions/FunctionFactory.h>
 #include <Functions/FunctionHelpers.h>
 #include <Functions/IFunction.h>
-#include "SparkFunctionCheckDecimalOverflow.h"
-
+#include <Functions/SparkFunctionCheckDecimalOverflow.h>
 
 namespace DB
 {
diff --git a/cpp-ch/local-engine/Functions/SparkFunctionMapToString.h 
b/cpp-ch/local-engine/Functions/SparkFunctionMapToString.h
index 3f8a0c97dc..5541245244 100644
--- a/cpp-ch/local-engine/Functions/SparkFunctionMapToString.h
+++ b/cpp-ch/local-engine/Functions/SparkFunctionMapToString.h
@@ -14,17 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
+#pragma once
 #include <memory>
+#include <Columns/ColumnMap.h>
 #include <Columns/ColumnNullable.h>
 #include <Columns/ColumnStringHelpers.h>
-#include <Columns/ColumnMap.h>
 #include <Columns/ColumnsNumber.h>
 #include <DataTypes/DataTypeNullable.h>
-#include <DataTypes/DataTypesNumber.h>
 #include <DataTypes/DataTypeString.h>
+#include <DataTypes/DataTypesNumber.h>
 #include <Formats/FormatFactory.h>
 #include <Functions/FunctionFactory.h>
+#include <Functions/FunctionHelpers.h>
 #include <IO/WriteHelpers.h>
 
 namespace DB
diff --git a/cpp-ch/local-engine/Functions/SparkFunctionSplitByRegexp.cpp 
b/cpp-ch/local-engine/Functions/SparkFunctionSplitByRegexp.cpp
index 66f37c6203..1868c40c0f 100644
--- a/cpp-ch/local-engine/Functions/SparkFunctionSplitByRegexp.cpp
+++ b/cpp-ch/local-engine/Functions/SparkFunctionSplitByRegexp.cpp
@@ -20,8 +20,8 @@
 #include <Functions/FunctionFactory.h>
 #include <Functions/FunctionHelpers.h>
 #include <Functions/FunctionTokens.h>
+#include <Functions/IFunctionAdaptors.h>
 #include <Functions/Regexps.h>
-#include <Common/StringUtils.h>
 #include <base/map.h>
 #include <Common/assert_cast.h>
 
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp 
b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
index 95086121d4..820a99ad3b 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
@@ -20,6 +20,7 @@
 #include <string>
 #include <string_view>
 #include <AggregateFunctions/AggregateFunctionFactory.h>
+#include <Columns/ColumnConst.h>
 #include <Core/Block.h>
 #include <Core/ColumnWithTypeAndName.h>
 #include <Core/Names.h>
@@ -160,15 +161,17 @@ void SerializedPlanParser::adjustOutput(const 
DB::QueryPlanPtr & query_plan, con
             else
             {
                 need_final_project = true;
-                bool need_const = origin_column.column && 
isColumnConst(*origin_column.column);
-                if (need_const)
+                if (origin_column.column && 
isColumnConst(*origin_column.column))
                 {
+                    /// For const column, we need to cast it individually. 
Otherwise, the const column will be converted to full column in
+                    /// ActionsDAG::makeConvertingActions.
+                    /// Note: creating fianl_column with Field of 
origin_column will cause Exception in some case.
                     const DB::ContextPtr context = 
DB::CurrentThread::get().getQueryContext();
                     const FunctionOverloadResolverPtr & cast_resolver = 
FunctionFactory::instance().get("CAST", context);
                     const DataTypePtr string_type = 
std::make_shared<DataTypeString>();
                     ColumnWithTypeAndName to_type_column = 
{string_type->createColumnConst(1, final_type->getName()), string_type, 
"__cast_const__"};
                     FunctionBasePtr cast_function = 
cast_resolver->build({origin_column, to_type_column});
-                    ColumnPtr const_col = 
ColumnConst::create(cast_function->execute({origin_column, to_type_column}, 
final_type, 1), 1);
+                    ColumnPtr const_col = 
ColumnConst::create(cast_function->execute({origin_column, to_type_column}, 
final_type, 1, false), 1);
                     ColumnWithTypeAndName final_column(const_col, final_type, 
origin_column.name);
                     final_columns.emplace_back(std::move(final_column));
                 }
@@ -310,7 +313,7 @@ DB::QueryPipelineBuilderPtr 
SerializedPlanParser::buildQueryPipeline(DB::QueryPl
     BuildQueryPipelineSettings build_settings = 
BuildQueryPipelineSettings::fromContext(context);
     build_settings.process_list_element = query_status;
     build_settings.progress_callback = nullptr;
-    return query_plan.buildQueryPipeline(optimization_settings,build_settings);
+    return query_plan.buildQueryPipeline(optimization_settings, 
build_settings);
 }
 
 std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(const 
std::string_view plan)
diff --git a/cpp-ch/local-engine/Parser/scalar_function_parser/arrayExcept.cpp 
b/cpp-ch/local-engine/Parser/scalar_function_parser/arrayExcept.cpp
index 8857fbbf5d..a9a0f305a0 100644
--- a/cpp-ch/local-engine/Parser/scalar_function_parser/arrayExcept.cpp
+++ b/cpp-ch/local-engine/Parser/scalar_function_parser/arrayExcept.cpp
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 #include <DataTypes/DataTypeArray.h>
+#include <DataTypes/DataTypeNullable.h>
 #include <Functions/FunctionsMiscellaneous.h>
 #include <Parser/FunctionParser.h>
 #include <Common/Exception.h>
diff --git a/cpp-ch/local-engine/Parser/scalar_function_parser/arrayRemove.cpp 
b/cpp-ch/local-engine/Parser/scalar_function_parser/arrayRemove.cpp
index f45544cfa4..27bd9f84a9 100644
--- a/cpp-ch/local-engine/Parser/scalar_function_parser/arrayRemove.cpp
+++ b/cpp-ch/local-engine/Parser/scalar_function_parser/arrayRemove.cpp
@@ -15,11 +15,11 @@
  * limitations under the License.
  */
 #include <DataTypes/DataTypeArray.h>
+#include <DataTypes/DataTypeNullable.h>
 #include <Functions/FunctionsMiscellaneous.h>
 #include <Parser/FunctionParser.h>
 #include <Common/Exception.h>
 #include <Common/assert_cast.h>
-
 namespace DB
 {
 namespace ErrorCodes
diff --git a/cpp-ch/local-engine/Parser/scalar_function_parser/arrayRepeat.cpp 
b/cpp-ch/local-engine/Parser/scalar_function_parser/arrayRepeat.cpp
index 581fd6f665..04f4f64e7b 100644
--- a/cpp-ch/local-engine/Parser/scalar_function_parser/arrayRepeat.cpp
+++ b/cpp-ch/local-engine/Parser/scalar_function_parser/arrayRepeat.cpp
@@ -16,6 +16,7 @@
  * limitations under the License.
  */
 #include <DataTypes/DataTypeArray.h>
+#include <DataTypes/DataTypeNullable.h>
 #include <Functions/FunctionsMiscellaneous.h>
 #include <Parser/FunctionParser.h>
 #include <Common/Exception.h>
@@ -56,7 +57,7 @@ public:
         const auto * const_zero_node = addColumnToActionsDAG(actions_dag, 
n_not_null_arg->result_type, {0});
         const auto * greatest_node = toFunctionNode(actions_dag, "greatest", 
{n_not_null_arg, const_zero_node});
         const auto * range_node = toFunctionNode(actions_dag, "range", 
{greatest_node});
-        const auto & range_type = assert_cast<const DataTypeArray & 
>(*removeNullable(range_node->result_type));
+        const auto & range_type = assert_cast<const DataTypeArray 
&>(*removeNullable(range_node->result_type));
 
         // Create lambda function x -> elem
         ActionsDAG lambda_actions_dag;
diff --git a/cpp-ch/local-engine/Parser/scalar_function_parser/bitLength.cpp 
b/cpp-ch/local-engine/Parser/scalar_function_parser/bitLength.cpp
index ebd89f8fa8..d663052a8e 100644
--- a/cpp-ch/local-engine/Parser/scalar_function_parser/bitLength.cpp
+++ b/cpp-ch/local-engine/Parser/scalar_function_parser/bitLength.cpp
@@ -15,8 +15,9 @@
  * limitations under the License.
  */
 
-#include <Parser/FunctionParser.h>
+#include <DataTypes/DataTypeNullable.h>
 #include <DataTypes/IDataType.h>
+#include <Parser/FunctionParser.h>
 
 namespace DB
 {
@@ -57,7 +58,7 @@ public:
         const auto * const_eight_node = addColumnToActionsDAG(actions_dag, 
std::make_shared<DataTypeInt32>(), 8);
         const auto * result_node = toFunctionNode(actions_dag, "multiply", 
{octet_length_node, const_eight_node});
 
-        return convertNodeTypeIfNeeded(substrait_func, result_node, 
actions_dag);;
+        return convertNodeTypeIfNeeded(substrait_func, result_node, 
actions_dag);
     }
 };
 
diff --git a/cpp-ch/local-engine/Parser/scalar_function_parser/elementAt.cpp 
b/cpp-ch/local-engine/Parser/scalar_function_parser/elementAt.cpp
index 1586870541..80797dedca 100644
--- a/cpp-ch/local-engine/Parser/scalar_function_parser/elementAt.cpp
+++ b/cpp-ch/local-engine/Parser/scalar_function_parser/elementAt.cpp
@@ -16,7 +16,9 @@
  */
 
 #include <DataTypes/DataTypeMap.h>
+#include <DataTypes/DataTypeNullable.h>
 #include <DataTypes/IDataType.h>
+#include <Functions/FunctionHelpers.h>
 #include <Parser/FunctionParser.h>
 
 namespace DB
diff --git a/cpp-ch/local-engine/Parser/scalar_function_parser/length.cpp 
b/cpp-ch/local-engine/Parser/scalar_function_parser/length.cpp
index 34a7348b9a..b4e4ad119f 100644
--- a/cpp-ch/local-engine/Parser/scalar_function_parser/length.cpp
+++ b/cpp-ch/local-engine/Parser/scalar_function_parser/length.cpp
@@ -15,9 +15,10 @@
  * limitations under the License.
  */
 
-#include <Parser/FunctionParser.h>
+#include <DataTypes/DataTypeNullable.h>
 #include <DataTypes/IDataType.h>
 #include <Parser/ExpressionParser.h>
+#include <Parser/FunctionParser.h>
 
 namespace DB
 {
@@ -71,7 +72,7 @@ public:
         else
             result_node = toFunctionNode(actions_dag, "char_length", 
{new_arg});
 
-        return convertNodeTypeIfNeeded(substrait_func, result_node, 
actions_dag);;
+        return convertNodeTypeIfNeeded(substrait_func, result_node, 
actions_dag);
     }
 };
 
diff --git a/cpp-ch/local-engine/Parser/scalar_function_parser/locate.cpp 
b/cpp-ch/local-engine/Parser/scalar_function_parser/locate.cpp
index c4f0234957..a9ad4d9aee 100644
--- a/cpp-ch/local-engine/Parser/scalar_function_parser/locate.cpp
+++ b/cpp-ch/local-engine/Parser/scalar_function_parser/locate.cpp
@@ -15,10 +15,11 @@
  * limitations under the License.
  */
 
+#include <DataTypes/DataTypeNullable.h>
+#include <Parser/ExpressionParser.h>
 #include <Parser/FunctionParser.h>
 #include <Common/BlockTypeUtils.h>
 #include <Common/CHUtil.h>
-#include <Parser/ExpressionParser.h>
 
 namespace DB
 {
diff --git a/cpp-ch/local-engine/Parser/scalar_function_parser/octetLength.cpp 
b/cpp-ch/local-engine/Parser/scalar_function_parser/octetLength.cpp
index 1dd61587e0..d57d8d661b 100644
--- a/cpp-ch/local-engine/Parser/scalar_function_parser/octetLength.cpp
+++ b/cpp-ch/local-engine/Parser/scalar_function_parser/octetLength.cpp
@@ -15,8 +15,9 @@
  * limitations under the License.
  */
 
-#include <Parser/FunctionParser.h>
+#include <DataTypes/DataTypeNullable.h>
 #include <DataTypes/IDataType.h>
+#include <Parser/FunctionParser.h>
 
 namespace DB
 {
@@ -52,7 +53,7 @@ public:
             new_arg = toFunctionNode(actions_dag, "CAST", {arg, 
string_type_node});
         }
         const auto * octet_length_node = toFunctionNode(actions_dag, 
"octet_length", {new_arg});
-        return convertNodeTypeIfNeeded(substrait_func, octet_length_node, 
actions_dag);;
+        return convertNodeTypeIfNeeded(substrait_func, octet_length_node, 
actions_dag);
     }
 };
 
diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp 
b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
index aa9e15b9f0..43459f20c5 100644
--- a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
+++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
@@ -172,6 +172,8 @@ size_t LocalPartitionWriter::evictPartitions()
         split_result->total_compress_time += 
compressed_output.getCompressTime();
         split_result->total_write_time += compressed_output.getWriteTime();
         split_result->total_serialize_time += 
serialization_time_watch.elapsedNanoseconds();
+        compressed_output.finalize();
+        output.finalize();
     };
 
     Stopwatch spill_time_watch;
@@ -342,6 +344,8 @@ size_t MemorySortLocalPartitionWriter::evictPartitions()
         split_result->total_compress_time += 
compressed_output.getCompressTime();
         split_result->total_io_time += compressed_output.getWriteTime();
         split_result->total_serialize_time += 
serialization_time_watch.elapsedNanoseconds();
+        compressed_output.finalize();
+        output.finalize();
     };
 
     Stopwatch spill_time_watch;
@@ -428,6 +432,8 @@ size_t MemorySortCelebornPartitionWriter::evictPartitions()
         split_result->total_compress_time += 
compressed_output.getCompressTime();
         split_result->total_io_time += compressed_output.getWriteTime();
         split_result->total_serialize_time += 
serialization_time_watch.elapsedNanoseconds();
+        compressed_output.finalize();
+        output.finalize();
     };
 
     Stopwatch spill_time_watch;
diff --git a/cpp-ch/local-engine/Shuffle/SelectorBuilder.cpp 
b/cpp-ch/local-engine/Shuffle/SelectorBuilder.cpp
index 02baa4a9c0..ab4cfc18c8 100644
--- a/cpp-ch/local-engine/Shuffle/SelectorBuilder.cpp
+++ b/cpp-ch/local-engine/Shuffle/SelectorBuilder.cpp
@@ -17,6 +17,7 @@
 #include "SelectorBuilder.h"
 #include <limits>
 #include <memory>
+#include <Columns/ColumnConst.h>
 #include <Columns/ColumnMap.h>
 #include <Columns/ColumnNullable.h>
 #include <DataTypes/DataTypeArray.h>
diff --git a/cpp-ch/local-engine/Shuffle/ShuffleWriter.cpp 
b/cpp-ch/local-engine/Shuffle/ShuffleWriter.cpp
index 8aa624ff99..7167dabfad 100644
--- a/cpp-ch/local-engine/Shuffle/ShuffleWriter.cpp
+++ b/cpp-ch/local-engine/Shuffle/ShuffleWriter.cpp
@@ -53,23 +53,20 @@ void ShuffleWriter::write(const Block & block)
         native_writer->write(block);
     }
 }
-void ShuffleWriter::flush()
+void ShuffleWriter::flush() const
 {
     if (native_writer)
-    {
         native_writer->flush();
-    }
 }
+
 ShuffleWriter::~ShuffleWriter()
 {
     if (native_writer)
-    {
         native_writer->flush();
-        if (compression_enable)
-        {
-            compressed_out->finalize();
-        }
-        write_buffer->finalize();
-    }
+
+    if (compression_enable)
+        compressed_out->finalize();
+
+    write_buffer->finalize();
 }
 }
diff --git a/cpp-ch/local-engine/Shuffle/ShuffleWriter.h 
b/cpp-ch/local-engine/Shuffle/ShuffleWriter.h
index 541e93e034..94886210c1 100644
--- a/cpp-ch/local-engine/Shuffle/ShuffleWriter.h
+++ b/cpp-ch/local-engine/Shuffle/ShuffleWriter.h
@@ -27,7 +27,7 @@ public:
         jobject output_stream, jbyteArray buffer, const std::string & 
codecStr, jint level, bool enable_compression, size_t customize_buffer_size);
     virtual ~ShuffleWriter();
     void write(const DB::Block & block);
-    void flush();
+    void flush() const;
 
 private:
     std::unique_ptr<DB::WriteBuffer> compressed_out;
diff --git a/cpp-ch/local-engine/Shuffle/SparkExchangeSink.cpp 
b/cpp-ch/local-engine/Shuffle/SparkExchangeSink.cpp
index a78d615be6..c40b474e7a 100644
--- a/cpp-ch/local-engine/Shuffle/SparkExchangeSink.cpp
+++ b/cpp-ch/local-engine/Shuffle/SparkExchangeSink.cpp
@@ -16,15 +16,15 @@
  */
 #include "SparkExchangeSink.h"
 
+#include <Processors/Sinks/NullSink.h>
 #include <Processors/Transforms/AggregatingTransform.h>
+#include <QueryPipeline/QueryPipelineBuilder.h>
 #include <Shuffle/PartitionWriter.h>
-#include <jni/jni_common.h>
+#include <Storages/IO/AggregateSerializationUtils.h>
+#include <boost/algorithm/string/case_conv.hpp>
 #include <jni/CelebornClient.h>
+#include <jni/jni_common.h>
 #include <Poco/StringTokenizer.h>
-#include <Processors/Sinks/NullSink.h>
-#include <QueryPipeline/QueryPipelineBuilder.h>
-#include <boost/algorithm/string/case_conv.hpp>
-#include <Storages/IO/AggregateSerializationUtils.h>
 
 
 namespace DB
@@ -74,7 +74,7 @@ void SparkExchangeSink::consume(Chunk chunk)
 void SparkExchangeSink::onFinish()
 {
     Stopwatch wall_time;
-    if (!dynamic_cast<LocalPartitionWriter*>(partition_writer.get()))
+    if (!dynamic_cast<LocalPartitionWriter *>(partition_writer.get()))
     {
         partition_writer->evictPartitions();
     }
@@ -222,8 +222,7 @@ void SparkExchangeManager::finish()
         std::vector<Spillable::ExtraData> extra_datas;
         for (const auto & writer : partition_writers)
         {
-            LocalPartitionWriter * local_partition_writer = 
dynamic_cast<LocalPartitionWriter *>(writer.get());
-            if (local_partition_writer)
+            if (LocalPartitionWriter * local_partition_writer = 
dynamic_cast<LocalPartitionWriter *>(writer.get()))
             {
                 
extra_datas.emplace_back(local_partition_writer->getExtraData());
             }
@@ -232,12 +231,13 @@ void SparkExchangeManager::finish()
             chassert(extra_datas.size() == partition_writers.size());
         WriteBufferFromFile output(options.data_file, options.io_buffer_size);
         split_result.partition_lengths = mergeSpills(output, infos, 
extra_datas);
+        output.finalize();
     }
 
     split_result.wall_time += wall_time.elapsedNanoseconds();
 }
 
-void checkPartitionLengths(const std::vector<UInt64> & partition_length,size_t 
partition_num)
+void checkPartitionLengths(const std::vector<UInt64> & partition_length, 
size_t partition_num)
 {
     if (partition_num != partition_length.size())
     {
@@ -284,7 +284,7 @@ void SparkExchangeManager::mergeSplitResult()
 std::vector<SpillInfo> SparkExchangeManager::gatherAllSpillInfo() const
 {
     std::vector<SpillInfo> res;
-    for (const auto& writer : partition_writers)
+    for (const auto & writer : partition_writers)
     {
         if (Spillable * spillable = dynamic_cast<Spillable *>(writer.get()))
         {
diff --git a/cpp-ch/local-engine/Storages/MergeTree/MergeSparkMergeTreeTask.cpp 
b/cpp-ch/local-engine/Storages/MergeTree/MergeSparkMergeTreeTask.cpp
index cecb630874..ee6930e4de 100644
--- a/cpp-ch/local-engine/Storages/MergeTree/MergeSparkMergeTreeTask.cpp
+++ b/cpp-ch/local-engine/Storages/MergeTree/MergeSparkMergeTreeTask.cpp
@@ -15,11 +15,11 @@
  * limitations under the License.
  */
 #include "MergeSparkMergeTreeTask.h"
-#include <Storages/MergeTree/SparkStorageMergeTree.h>
 
 #include <Interpreters/TransactionLog.h>
 #include <Storages/MergeTree/MergeTreeData.h>
 #include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
+#include <Storages/MergeTree/SparkStorageMergeTree.h>
 #include <Common/ProfileEvents.h>
 #include <Common/ProfileEventsScope.h>
 #include <Common/ThreadFuzzer.h>
@@ -94,6 +94,12 @@ bool MergeSparkMergeTreeTask::executeStep()
 }
 
 
+void MergeSparkMergeTreeTask::cancel() noexcept
+{
+    if (merge_task)
+        merge_task->cancel();
+}
+
 void MergeSparkMergeTreeTask::prepare()
 {
     future_part = merge_mutate_entry->future_part;
diff --git a/cpp-ch/local-engine/Storages/MergeTree/MergeSparkMergeTreeTask.h 
b/cpp-ch/local-engine/Storages/MergeTree/MergeSparkMergeTreeTask.h
index ac167da3fb..60b3328f0d 100644
--- a/cpp-ch/local-engine/Storages/MergeTree/MergeSparkMergeTreeTask.h
+++ b/cpp-ch/local-engine/Storages/MergeTree/MergeSparkMergeTreeTask.h
@@ -65,6 +65,7 @@ public:
         txn_holder = std::move(txn_holder_);
         txn = std::move(txn_);
     }
+    void cancel() noexcept override;
 
 private:
     void prepare();
@@ -116,7 +117,7 @@ private:
 using MergeSparkMergeTreeTaskPtr = std::shared_ptr<MergeSparkMergeTreeTask>;
 
 
-[[ maybe_unused ]] static void executeHere(MergeSparkMergeTreeTaskPtr task)
+[[maybe_unused]] static void executeHere(MergeSparkMergeTreeTaskPtr task)
 {
     while (task->executeStep()) {}
 }
diff --git a/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.cpp 
b/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.cpp
index 958421022b..84dbc3a8d3 100644
--- a/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.cpp
+++ b/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.cpp
@@ -190,6 +190,7 @@ void restoreMetaData<LOCAL>(
                     auto item_path = part_path / item.first;
                     auto out = metadata_disk->writeFile(item_path);
                     out->write(item.second.data(), item.second.size());
+                    out->finalize();
                 }
             };
             thread_pool.scheduleOrThrow(job);
diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp 
b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp
index 17587e5200..5669489f54 100644
--- a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp
+++ b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp
@@ -504,7 +504,6 @@ MergeTreeDataWriter::TemporaryPart 
SparkMergeTreeDataWriter::writeTempPart(
         txn ? txn->tid : Tx::PrehistoricTID,
         false,
         false,
-        false,
         context->getWriteSettings());
 
     out->writeWithPermutation(block, perm_ptr);
diff --git a/cpp-ch/local-engine/Storages/Output/NormalFileWriter.cpp 
b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.cpp
index e5a2d89f26..ad2e3abf7b 100644
--- a/cpp-ch/local-engine/Storages/Output/NormalFileWriter.cpp
+++ b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.cpp
@@ -191,7 +191,11 @@ void NormalFileWriter::close()
     /// When insert into a table with empty dataset, NormalFileWriter::consume 
would be never called.
     /// So we need to skip when writer is nullptr.
     if (writer)
+    {
         writer->finish();
+        assert(output_format);
+        output_format->finalizeOutput();
+    }
 }
 
 OutputFormatFilePtr createOutputFormatFile(
diff --git a/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h 
b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h
index d557037418..8cfe079d92 100644
--- a/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h
+++ b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h
@@ -293,9 +293,7 @@ protected:
     {
         if (output_format_) [[unlikely]]
         {
-            output_format_->output->finalize();
-            output_format_->output->flush();
-            output_format_->write_buffer->finalize();
+            output_format_->finalizeOutput();
             assert(delta_stats_.row_count > 0);
             if (stats_)
                 stats_->collectStats(relative_path_, partition_id_, 
delta_stats_);
diff --git a/cpp-ch/local-engine/Storages/Output/OutputFormatFile.h 
b/cpp-ch/local-engine/Storages/Output/OutputFormatFile.h
index e94923f77a..915f9a7e7e 100644
--- a/cpp-ch/local-engine/Storages/Output/OutputFormatFile.h
+++ b/cpp-ch/local-engine/Storages/Output/OutputFormatFile.h
@@ -29,9 +29,14 @@ class OutputFormatFile
 public:
     struct OutputFormat
     {
-    public:
         DB::OutputFormatPtr output;
         std::unique_ptr<DB::WriteBuffer> write_buffer;
+        void finalizeOutput() const
+        {
+            output->finalize();
+            output->flush();
+            write_buffer->finalize();
+        }
     };
     using OutputFormatPtr = std::shared_ptr<OutputFormat>;
 
diff --git a/cpp-ch/local-engine/tests/benchmark_cast_float_function.cpp 
b/cpp-ch/local-engine/tests/benchmark_cast_float_function.cpp
index 4ef9b5771a..a50bcf170e 100644
--- a/cpp-ch/local-engine/tests/benchmark_cast_float_function.cpp
+++ b/cpp-ch/local-engine/tests/benchmark_cast_float_function.cpp
@@ -52,7 +52,7 @@ static void BM_CHCastFloatToInt(benchmark::State & state)
     args.emplace_back(type_name_col);
     auto executable = function->build(args);
     for (auto _ : state) [[maybe_unused]]
-        auto result = executable->execute(block.getColumnsWithTypeAndName(), 
executable->getResultType(), block.rows());
+        auto result = executable->execute(block.getColumnsWithTypeAndName(), 
executable->getResultType(), block.rows(), false);
 }
 
 static void BM_SparkCastFloatToInt(benchmark::State & state)
@@ -63,7 +63,7 @@ static void BM_SparkCastFloatToInt(benchmark::State & state)
     Block block = createDataBlock(30000000);
     auto executable = function->build(block.getColumnsWithTypeAndName());
     for (auto _ : state) [[maybe_unused]]
-        auto result = executable->execute(block.getColumnsWithTypeAndName(), 
executable->getResultType(), block.rows());
+        auto result = executable->execute(block.getColumnsWithTypeAndName(), 
executable->getResultType(), block.rows(), false);
 }
 
 BENCHMARK(BM_CHCastFloatToInt)->Unit(benchmark::kMillisecond)->Iterations(100);
diff --git a/cpp-ch/local-engine/tests/benchmark_local_engine.cpp 
b/cpp-ch/local-engine/tests/benchmark_local_engine.cpp
index 13e74abaee..eacfb1781b 100644
--- a/cpp-ch/local-engine/tests/benchmark_local_engine.cpp
+++ b/cpp-ch/local-engine/tests/benchmark_local_engine.cpp
@@ -846,7 +846,7 @@ QueryPlanPtr joinPlan(QueryPlanPtr left, QueryPlanPtr 
right, String left_key, St
     auto hash_join = std::make_shared<HashJoin>(join, 
right->getCurrentHeader());
 
     QueryPlanStepPtr join_step
-        = std::make_unique<JoinStep>(left->getCurrentHeader(), 
right->getCurrentHeader(), hash_join, block_size, 1, false);
+        = std::make_unique<JoinStep>(left->getCurrentHeader(), 
right->getCurrentHeader(), hash_join, block_size, 0, 1, false);
 
     std::vector<QueryPlanPtr> plans;
     plans.emplace_back(std::move(left));
diff --git a/cpp-ch/local-engine/tests/benchmark_spark_floor_function.cpp 
b/cpp-ch/local-engine/tests/benchmark_spark_floor_function.cpp
index ef961f21cb..a672fdee35 100644
--- a/cpp-ch/local-engine/tests/benchmark_spark_floor_function.cpp
+++ b/cpp-ch/local-engine/tests/benchmark_spark_floor_function.cpp
@@ -66,7 +66,7 @@ static void BM_CHFloorFunction_For_Int64(benchmark::State & 
state)
     auto executable = function->build(int64_block.getColumnsWithTypeAndName());
     for (auto _ : state)
     {
-        auto result = 
executable->execute(int64_block.getColumnsWithTypeAndName(), 
executable->getResultType(), int64_block.rows());
+        auto result = 
executable->execute(int64_block.getColumnsWithTypeAndName(), 
executable->getResultType(), int64_block.rows(), false);
         benchmark::DoNotOptimize(result);
     }
 }
@@ -80,7 +80,7 @@ static void BM_CHFloorFunction_For_Float64(benchmark::State & 
state)
     auto executable = 
function->build(float64_block.getColumnsWithTypeAndName());
     for (auto _ : state)
     {
-        auto result = 
executable->execute(float64_block.getColumnsWithTypeAndName(), 
executable->getResultType(), float64_block.rows());
+        auto result = 
executable->execute(float64_block.getColumnsWithTypeAndName(), 
executable->getResultType(), float64_block.rows(), false);
         benchmark::DoNotOptimize(result);
     }
 }
@@ -94,7 +94,7 @@ static void BM_SparkFloorFunction_For_Int64(benchmark::State 
& state)
     auto executable = function->build(int64_block.getColumnsWithTypeAndName());
     for (auto _ : state)
     {
-        auto result = 
executable->execute(int64_block.getColumnsWithTypeAndName(), 
executable->getResultType(), int64_block.rows());
+        auto result = 
executable->execute(int64_block.getColumnsWithTypeAndName(), 
executable->getResultType(), int64_block.rows(), false);
         benchmark::DoNotOptimize(result);
     }
 }
@@ -108,7 +108,7 @@ static void 
BM_SparkFloorFunction_For_Float64(benchmark::State & state)
     auto executable = 
function->build(float64_block.getColumnsWithTypeAndName());
     for (auto _ : state)
     {
-        auto result = 
executable->execute(float64_block.getColumnsWithTypeAndName(), 
executable->getResultType(), float64_block.rows());
+        auto result = 
executable->execute(float64_block.getColumnsWithTypeAndName(), 
executable->getResultType(), float64_block.rows(), false);
         benchmark::DoNotOptimize(result);
     }
 }
diff --git a/cpp-ch/local-engine/tests/benchmark_to_datetime_function.cpp 
b/cpp-ch/local-engine/tests/benchmark_to_datetime_function.cpp
index c721251633..49f9dde989 100644
--- a/cpp-ch/local-engine/tests/benchmark_to_datetime_function.cpp
+++ b/cpp-ch/local-engine/tests/benchmark_to_datetime_function.cpp
@@ -45,7 +45,7 @@ static void BM_CHParseDateTime64(benchmark::State & state)
     Block block = createDataBlock(30000000);
     auto executable = function->build(block.getColumnsWithTypeAndName());
     for (auto _ : state) [[maybe_unused]]
-        auto result = executable->execute(block.getColumnsWithTypeAndName(), 
executable->getResultType(), block.rows());
+        auto result = executable->execute(block.getColumnsWithTypeAndName(), 
executable->getResultType(), block.rows(), false);
 }
 
 
@@ -57,7 +57,7 @@ static void BM_SparkParseDateTime64(benchmark::State & state)
     Block block = createDataBlock(30000000);
     auto executable = function->build(block.getColumnsWithTypeAndName());
     for (auto _ : state) [[maybe_unused]]
-        auto result = executable->execute(block.getColumnsWithTypeAndName(), 
executable->getResultType(), block.rows());
+        auto result = executable->execute(block.getColumnsWithTypeAndName(), 
executable->getResultType(), block.rows(), false);
 }
 
 BENCHMARK(BM_CHParseDateTime64)->Unit(benchmark::kMillisecond)->Iterations(50);
diff --git a/cpp-ch/local-engine/tests/benchmark_unix_timestamp_function.cpp 
b/cpp-ch/local-engine/tests/benchmark_unix_timestamp_function.cpp
index e7abfda7a2..a7dc3ffa2b 100644
--- a/cpp-ch/local-engine/tests/benchmark_unix_timestamp_function.cpp
+++ b/cpp-ch/local-engine/tests/benchmark_unix_timestamp_function.cpp
@@ -49,7 +49,7 @@ static void BM_CHUnixTimestamp_For_Date32(benchmark::State & 
state)
     Block block = createDataBlock("Date32", 30000000);
     auto executable = function->build(block.getColumnsWithTypeAndName());
     for (auto _ : state) [[maybe_unused]]
-        auto result = executable->execute(block.getColumnsWithTypeAndName(), 
executable->getResultType(), block.rows());
+        auto result = executable->execute(block.getColumnsWithTypeAndName(), 
executable->getResultType(), block.rows(), false);
 }
 
 static void BM_CHUnixTimestamp_For_Date(benchmark::State & state)
@@ -60,7 +60,7 @@ static void BM_CHUnixTimestamp_For_Date(benchmark::State & 
state)
     Block block = createDataBlock("Date", 30000000);
     auto executable = function->build(block.getColumnsWithTypeAndName());
     for (auto _ : state) [[maybe_unused]]
-        auto result = executable->execute(block.getColumnsWithTypeAndName(), 
executable->getResultType(), block.rows());
+        auto result = executable->execute(block.getColumnsWithTypeAndName(), 
executable->getResultType(), block.rows(), false);
 }
 
 static void BM_SparkUnixTimestamp_For_Date32(benchmark::State & state)
@@ -71,7 +71,7 @@ static void BM_SparkUnixTimestamp_For_Date32(benchmark::State 
& state)
     Block block = createDataBlock("Date32", 30000000);
     auto executable = function->build(block.getColumnsWithTypeAndName());
     for (auto _ : state) [[maybe_unused]]
-        auto result = executable->execute(block.getColumnsWithTypeAndName(), 
executable->getResultType(), block.rows());
+        auto result = executable->execute(block.getColumnsWithTypeAndName(), 
executable->getResultType(), block.rows(), false);
 }
 
 static void BM_SparkUnixTimestamp_For_Date(benchmark::State & state)
@@ -82,7 +82,7 @@ static void BM_SparkUnixTimestamp_For_Date(benchmark::State & 
state)
     Block block = createDataBlock("Date", 30000000);
     auto executable = function->build(block.getColumnsWithTypeAndName());
     for (auto _ : state) [[maybe_unused]]
-        auto result = executable->execute(block.getColumnsWithTypeAndName(), 
executable->getResultType(), block.rows());
+        auto result = executable->execute(block.getColumnsWithTypeAndName(), 
executable->getResultType(), block.rows(), false);
 }
 
 
BENCHMARK(BM_CHUnixTimestamp_For_Date32)->Unit(benchmark::kMillisecond)->Iterations(100);
diff --git a/cpp-ch/local-engine/tests/gluten_test_util.h 
b/cpp-ch/local-engine/tests/gluten_test_util.h
index 799a6d7967..a616126629 100644
--- a/cpp-ch/local-engine/tests/gluten_test_util.h
+++ b/cpp-ch/local-engine/tests/gluten_test_util.h
@@ -16,9 +16,9 @@
  */
 
 #pragma once
+#include "testConfig.h"
 
 #include <string>
-#include <testConfig.h>
 #include <Core/Block.h>
 #include <Core/ColumnsWithTypeAndName.h>
 #include <Core/NamesAndTypes.h>
diff --git a/cpp-ch/local-engine/tests/gtest_ch_functions.cpp 
b/cpp-ch/local-engine/tests/gtest_ch_functions.cpp
index e905bc1787..3b91e07994 100644
--- a/cpp-ch/local-engine/tests/gtest_ch_functions.cpp
+++ b/cpp-ch/local-engine/tests/gtest_ch_functions.cpp
@@ -47,7 +47,7 @@ TEST(TestFuntion, Hash)
     std::cerr << "input:\n";
     debug::headBlock(block);
     auto executable = function->build(block.getColumnsWithTypeAndName());
-    auto result = executable->execute(block.getColumnsWithTypeAndName(), 
executable->getResultType(), block.rows());
+    auto result = executable->execute(block.getColumnsWithTypeAndName(), 
executable->getResultType(), block.rows(), false);
     std::cerr << "output:\n";
     debug::headColumn(result);
     ASSERT_EQ(result->getUInt(0), result->getUInt(1));
@@ -89,7 +89,7 @@ TEST(TestFunction, In)
     std::cerr << "input:\n";
     debug::headBlock(block);
     auto executable = function->build(block.getColumnsWithTypeAndName());
-    auto result = executable->execute(block.getColumnsWithTypeAndName(), 
executable->getResultType(), block.rows());
+    auto result = executable->execute(block.getColumnsWithTypeAndName(), 
executable->getResultType(), block.rows(), false);
     std::cerr << "output:\n";
     debug::headColumn(result);
     ASSERT_EQ(result->getUInt(3), 0);
@@ -133,7 +133,7 @@ TEST(TestFunction, NotIn1)
     std::cerr << "input:\n";
     debug::headBlock(block);
     auto executable = function->build(block.getColumnsWithTypeAndName());
-    auto result = executable->execute(block.getColumnsWithTypeAndName(), 
executable->getResultType(), block.rows());
+    auto result = executable->execute(block.getColumnsWithTypeAndName(), 
executable->getResultType(), block.rows(), false);
     std::cerr << "output:\n";
     debug::headColumn(result);
     ASSERT_EQ(result->getUInt(3), 1);
@@ -176,14 +176,14 @@ TEST(TestFunction, NotIn2)
     std::cerr << "input:\n";
     debug::headBlock(block);
     auto executable = function->build(block.getColumnsWithTypeAndName());
-    auto result = executable->execute(block.getColumnsWithTypeAndName(), 
executable->getResultType(), block.rows());
+    auto result = executable->execute(block.getColumnsWithTypeAndName(), 
executable->getResultType(), block.rows(), false);
 
     auto function_not = factory.get("not", 
local_engine::QueryContext::globalContext());
     auto type_bool = DataTypeFactory::instance().get("UInt8");
     ColumnsWithTypeAndName columns2 = {ColumnWithTypeAndName(result, 
type_bool, "string0")};
     Block block2(columns2);
     auto executable2 = function_not->build(block2.getColumnsWithTypeAndName());
-    auto result2 = executable2->execute(block2.getColumnsWithTypeAndName(), 
executable2->getResultType(), block2.rows());
+    auto result2 = executable2->execute(block2.getColumnsWithTypeAndName(), 
executable2->getResultType(), block2.rows(), false);
     std::cerr << "output:\n";
     debug::headColumn(result2);
     ASSERT_EQ(result2->getUInt(3), 1);
diff --git a/cpp-ch/local-engine/tests/gtest_local_engine.cpp 
b/cpp-ch/local-engine/tests/gtest_local_engine.cpp
index 5f9b6f280e..06e94e051b 100644
--- a/cpp-ch/local-engine/tests/gtest_local_engine.cpp
+++ b/cpp-ch/local-engine/tests/gtest_local_engine.cpp
@@ -67,6 +67,7 @@ TEST(ReadBufferFromFile, seekBackwards)
         WriteBufferFromFile out(tmp_file->path());
         for (size_t i = 0; i < N; ++i)
             writeIntBinary(i, out);
+        out.finalize();
     }
 
     ReadBufferFromFile in(tmp_file->path(), BUF_SIZE);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to