This is an automated email from the ASF dual-hosted git repository.

liuneng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 8efe3e4e5 [GLUTEN-5476][CH] Trigger merge on insert task (#5529)
8efe3e4e5 is described below

commit 8efe3e4e551694f1596b1b76758f81ddffd14807
Author: Shuai li <[email protected]>
AuthorDate: Mon May 6 14:48:50 2024 +0800

    [GLUTEN-5476][CH] Trigger merge on insert task (#5529)
    
    What changes were proposed in this pull request?
    Trigger a merge of the newly written MergeTree parts inside the insert task.
    (Fixes: #5476)
    
    How was this patch tested?
    Tested by unit tests (UT).
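    
    A minimal usage sketch (illustrative only; the table and path are hypothetical,
    the setting keys come from this patch):
    
        // enable merge-after-insert for the ClickHouse backend and set the
        // file-size threshold used when selecting parts to merge
        spark.conf.set(
          "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert",
          "true")
        spark.conf.set("spark.databricks.delta.optimize.minFileSize", "200000000")
        // inserts into a clickhouse/mergetree table now also merge the small parts
        // they produce inside the same insert task, e.g.:
        spark.sql(
          "CREATE TABLE t USING clickhouse LOCATION '/tmp/t' AS SELECT * FROM lineitem")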
---
 .../datasources/CHDatasourceJniWrapper.java        |   4 +-
 .../delta/ClickhouseOptimisticTransaction.scala    |  13 +-
 .../datasources/v1/CHMergeTreeWriterInjects.scala  |  25 +-
 .../GlutenClickHouseMergeTreeOptimizeSuite.scala   |  31 +++
 ...tenClickHouseMergeTreePathBasedWriteSuite.scala |   7 +-
 ...GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala |   3 +
 ...ergeTreeWriteOnObjectStorageAbstractSuite.scala |  93 -------
 .../GlutenClickHouseMergeTreeWriteSuite.scala      |   3 +
 .../GlutenClickHouseTableAfterRestart.scala        |   3 +
 ...lutenClickHouseWholeStageTransformerSuite.scala |   3 +
 cpp-ch/local-engine/Common/CHUtil.cpp              |  48 ++--
 cpp-ch/local-engine/Common/CHUtil.h                |  71 ++++-
 cpp-ch/local-engine/Common/QueryContext.cpp        |   1 +
 .../Storages/CustomStorageMergeTree.cpp            |  48 +++-
 .../local-engine/Storages/CustomStorageMergeTree.h |   5 +-
 .../Storages/Mergetree/MetaDataHelper.cpp          |  61 +++++
 .../Storages/Mergetree/MetaDataHelper.h            |  10 +
 .../Storages/Mergetree/SparkMergeTreeWriter.cpp    | 289 +++++++++++++++++++--
 .../Storages/Mergetree/SparkMergeTreeWriter.h      |  48 ++--
 .../jni/ReservationListenerWrapper.cpp             |   9 +
 .../local-engine/jni/ReservationListenerWrapper.h  |   4 +
 cpp-ch/local-engine/local_engine_jni.cpp           |  76 +++---
 22 files changed, 603 insertions(+), 252 deletions(-)

diff --git a/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/CHDatasourceJniWrapper.java b/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/CHDatasourceJniWrapper.java
index 9ca301efb..c041ee352 100644
--- a/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/CHDatasourceJniWrapper.java
+++ b/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/CHDatasourceJniWrapper.java
@@ -27,7 +27,9 @@ public class CHDatasourceJniWrapper {
       String uuid,
       String taskId,
       String partition_dir,
-      String bucket_dir);
+      String bucket_dir,
+      byte[] confArray,
+      long allocId);
 
   public native String nativeMergeMTParts(
       byte[] plan,
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
index c73b7e7af..e31560259 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.sql.delta
 
+import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings
 import org.apache.gluten.execution.ColumnarToRowExecBase
 
 import org.apache.spark.SparkException
@@ -109,7 +110,7 @@ class ClickhouseOptimisticTransaction(
 
       // Retain only a minimal selection of Spark writer options to avoid any potential
       // compatibility issues
-      val options = writeOptions match {
+      var options = writeOptions match {
         case None => Map.empty[String, String]
         case Some(writeOptions) =>
           writeOptions.options.filterKeys {
@@ -119,6 +120,16 @@ class ClickhouseOptimisticTransaction(
           }.toMap
       }
 
+      spark.conf.getAll.foreach(
+        entry => {
+          if (
+            entry._1.startsWith(s"${CHBackendSettings.getBackendConfigPrefix}.runtime_settings")
+            || entry._1.equalsIgnoreCase(DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE.key)
+          ) {
+            options += (entry._1 -> entry._2)
+          }
+        })
+
       try {
         val tableV2 = ClickHouseTableV2.getTable(deltaLog)
         MergeTreeFileFormatWriter.write(
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
index 64aa8863b..8a61385fc 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
@@ -16,10 +16,13 @@
  */
 package org.apache.spark.sql.execution.datasources.v1
 
+import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.expression.ConverterUtils
+import org.apache.gluten.memory.alloc.CHNativeMemoryAllocators
 import org.apache.gluten.substrait.`type`.ColumnTypeNode
 import org.apache.gluten.substrait.SubstraitContext
-import org.apache.gluten.substrait.extensions.ExtensionBuilder
+import org.apache.gluten.substrait.expression.{ExpressionBuilder, StringMapNode}
+import org.apache.gluten.substrait.extensions.{AdvancedExtensionNode, ExtensionBuilder}
 import org.apache.gluten.substrait.plan.PlanBuilder
 import org.apache.gluten.substrait.rel.{ExtensionTableBuilder, RelBuilder}
 
@@ -39,7 +42,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext
 import java.util.{ArrayList => JList, Map => JMap, UUID}
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable
+
 case class PlanWithSplitInfo(plan: Array[Byte], splitInfo: Array[Byte])
 
 class CHMergeTreeWriterInjects extends GlutenFormatWriterInjectsBase {
@@ -47,10 +50,7 @@ class CHMergeTreeWriterInjects extends GlutenFormatWriterInjectsBase {
   override def nativeConf(
       options: Map[String, String],
       compressionCodec: String): JMap[String, String] = {
-    // pass options to native so that velox can take user-specified conf to write parquet,
-    // i.e., compression, block size, block rows.
-    val sparkOptions = new mutable.HashMap[String, String]()
-    sparkOptions.asJava
+    options.asJava
   }
 
   override def createOutputWriter(
@@ -95,7 +95,7 @@ class CHMergeTreeWriterInjects extends GlutenFormatWriterInjectsBase {
       clickhouseTableConfigs,
       tableSchema.toAttributes // use table schema instead of data schema
     )
-
+    val allocId = CHNativeMemoryAllocators.contextInstance.getNativeInstanceId
     val datasourceJniWrapper = new CHDatasourceJniWrapper()
     val instance =
       datasourceJniWrapper.nativeInitMergeTreeWriterWrapper(
@@ -104,7 +104,9 @@ class CHMergeTreeWriterInjects extends GlutenFormatWriterInjectsBase {
         uuid,
         context.getTaskAttemptID.getTaskID.getId.toString,
         context.getConfiguration.get("mapreduce.task.gluten.mergetree.partition.dir"),
-        context.getConfiguration.get("mapreduce.task.gluten.mergetree.bucketid.str")
+        context.getConfiguration.get("mapreduce.task.gluten.mergetree.bucketid.str"),
+        buildNativeConf(nativeConf),
+        allocId
       )
 
     new MergeTreeOutputWriter(database, tableName, datasourceJniWrapper, instance, path)
@@ -121,6 +123,13 @@ class CHMergeTreeWriterInjects extends GlutenFormatWriterInjectsBase {
   override def getFormatName(): String = {
     "mergetree"
   }
+
+  private def buildNativeConf(confs: JMap[String, String]): Array[Byte] = {
+    val stringMapNode: StringMapNode = ExpressionBuilder.makeStringMap(confs)
+    val extensionNode: AdvancedExtensionNode = ExtensionBuilder.makeAdvancedExtension(
+      BackendsApiManager.getTransformerApiInstance.packPBMessage(stringMapNode.toProtobuf))
+    PlanBuilder.makePlan(extensionNode).toProtobuf.toByteArray
+  }
 }
 
 object CHMergeTreeWriterInjects {
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala
index 9635b9958..ae0cd170d 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala
@@ -24,6 +24,8 @@ import io.delta.tables.ClickhouseTable
 
 import java.io.File
 
+import scala.concurrent.duration.DurationInt
+
 // Some sqls' line length exceeds 100
 // scalastyle:off line.size.limit
 
@@ -54,6 +56,9 @@ class GlutenClickHouseMergeTreeOptimizeSuite
         "spark.databricks.delta.retentionDurationCheck.enabled",
         "false"
       ) // otherwise RETAIN 0 HOURS will fail
+      .set(
+        "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert",
+        "false")
   }
 
   override protected def createTPCHNotNullTables(): Unit = {
@@ -426,5 +431,31 @@ class GlutenClickHouseMergeTreeOptimizeSuite
     val ret = spark.sql(s"select count(*) from clickhouse.`$dataPath`").collect()
     assert(ret.apply(0).get(0) == 600572)
   }
+
+  test("test mergetree insert with optimize basic") {
+    withSQLConf(
+      ("spark.databricks.delta.optimize.minFileSize" -> "200000000"),
+      ("spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert" -> "true")
+    ) {
+      spark.sql(s"""
+                   |DROP TABLE IF EXISTS lineitem_mergetree_insert_optimize_basic;
+                   |""".stripMargin)
+
+      spark.sql(s"""
+                   |CREATE TABLE IF NOT EXISTS lineitem_mergetree_insert_optimize_basic
+                   |USING clickhouse
+                   |LOCATION '$basePath/lineitem_mergetree_insert_optimize_basic'
+                   | as select * from lineitem
+                   |""".stripMargin)
+
+      val ret = spark.sql("select count(*) from lineitem_mergetree_insert_optimize_basic").collect()
+      assert(ret.apply(0).get(0) == 600572)
+      eventually(timeout(60.seconds), interval(3.seconds)) {
+        assert(
+          new File(s"$basePath/lineitem_mergetree_insert_optimize_basic").listFiles().length == 2
+        )
+      }
+    }
+  }
 }
 // scalastyle:off line.size.limit
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
index d85217536..93f22baa2 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
@@ -57,6 +57,9 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite
       .set(
         "spark.gluten.sql.columnar.backend.ch.runtime_settings.min_insert_block_size_rows",
         "100000")
+      .set(
+        "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert",
+        "false")
   }
 
   override protected def createTPCHNotNullTables(): Unit = {
@@ -170,8 +173,8 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite
       .format("clickhouse")
       .load(dataPath)
       .where("l_shipdate = date'1998-09-02'")
-      .collect()
-    assert(result.apply(0).get(0) == 110501)
+      .count()
+    assert(result == 183)
   }
 
   test("test mergetree path based write with dataframe api") {
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
index 49011e031..ca5b39fff 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
@@ -54,6 +54,9 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
       .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
       .set("spark.sql.adaptive.enabled", "true")
       .set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level", 
"error")
+      .set(
+        
"spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert",
+        "false")
   }
 
   override protected def beforeEach(): Unit = {
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite.scala
deleted file mode 100644
index 90db0f5d8..000000000
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.execution
-
-import org.apache.gluten.GlutenConfig
-
-import org.apache.spark.sql.SparkSession
-
-import _root_.org.apache.spark.SparkConf
-import _root_.org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
-import org.apache.commons.io.FileUtils
-
-import java.io.File
-
-// Some sqls' line length exceeds 100
-// scalastyle:off line.size.limit
-
-class GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite
-  extends GlutenClickHouseTPCHAbstractSuite
-  with AdaptiveSparkPlanHelper {
-  private var _spark: SparkSession = _
-
-  override protected def spark: SparkSession = _spark
-
-  override protected val needCopyParquetToTablePath = true
-
-  override protected val tablesPath: String = basePath + "/tpch-data"
-  override protected val tpchQueries: String = rootPath + "queries/tpch-queries-ch"
-  override protected val queriesResults: String = rootPath + "mergetree-queries-output"
-
-  override protected def initializeSession(): Unit = {
-    if (_spark == null) {
-      _spark = SparkSession
-        .builder()
-        .appName("Gluten-UT-RemoteHS")
-        .config(sparkConf)
-        .getOrCreate()
-    }
-  }
-
-  override protected def sparkConf: SparkConf = {
-    super.sparkConf
-      .setMaster("local[2]")
-      .set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
-      .set("spark.io.compression.codec", "LZ4")
-      .set("spark.sql.shuffle.partitions", "5")
-      .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
-      .set("spark.sql.adaptive.enabled", "true")
-      .set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level", 
"error")
-  }
-  override protected def createTPCHNotNullTables(): Unit = {
-    createNotNullTPCHTablesInParquet(tablesPath)
-  }
-
-  override protected def afterAll(): Unit = {
-    try {
-      super.afterAll()
-    } finally {
-      try {
-        if (_spark != null) {
-          try {
-            _spark.sessionState.catalog.reset()
-          } finally {
-            _spark.stop()
-            _spark = null
-          }
-        }
-      } finally {
-        SparkSession.clearActiveSession()
-        SparkSession.clearDefaultSession()
-      }
-    }
-
-    FileUtils.forceDelete(new File(basePath))
-    // init GlutenConfig in the next beforeAll
-    GlutenConfig.ins = null
-  }
-}
-// scalastyle:off line.size.limit
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala
index 6f10035fa..439a1b58f 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala
@@ -54,6 +54,9 @@ class GlutenClickHouseMergeTreeWriteSuite
       .set(
         "spark.gluten.sql.columnar.backend.ch.runtime_settings.min_insert_block_size_rows",
         "100000")
+      .set(
+        "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert",
+        "false")
   }
 
   override protected def createTPCHNotNullTables(): Unit = {
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala
index 36002b7e5..a673d4ba3 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala
@@ -58,6 +58,9 @@ class GlutenClickHouseTableAfterRestart
       .set(
         "spark.gluten.sql.columnar.backend.ch.runtime_settings.min_insert_block_size_rows",
         "100000")
+      .set(
+        "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert",
+        "false")
   }
 
   override protected def createTPCHNotNullTables(): Unit = {
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
index e455c956d..a891d6d10 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
@@ -161,6 +161,9 @@ class GlutenClickHouseWholeStageTransformerSuite extends WholeStageTransformerSu
   }
 
   override def beforeAll(): Unit = {
+    // if /data does not exist, some UTs may fail
+    assert(new File("/data").exists())
+
     // prepare working paths
     val basePathDir = new File(basePath)
     if (basePathDir.exists()) {
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp b/cpp-ch/local-engine/Common/CHUtil.cpp
index ba86fe306..9704b3041 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -323,34 +323,6 @@ std::string PlanUtil::explainPlan(DB::QueryPlan & plan)
     return plan_str;
 }
 
-std::vector<MergeTreeUtil::Path> MergeTreeUtil::getAllMergeTreeParts(const 
Path & storage_path)
-{
-    if (!fs::exists(storage_path))
-        throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Invalid merge tree store path:{}", storage_path.string());
-
-    // TODO: May need to check the storage format version
-    std::vector<fs::path> res;
-    for (const auto & entry : fs::directory_iterator(storage_path))
-    {
-        auto filename = entry.path().filename();
-        if (filename == "format_version.txt" || filename == "detached" || filename == "_delta_log")
-            continue;
-        res.push_back(entry.path());
-    }
-    return res;
-}
-
-DB::NamesAndTypesList MergeTreeUtil::getSchemaFromMergeTreePart(const fs::path & part_path)
-{
-    DB::NamesAndTypesList names_types_list;
-    if (!fs::exists(part_path))
-        throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Invalid merge tree store path:{}", part_path.string());
-    DB::ReadBufferFromFile readbuffer((part_path / "columns.txt").string());
-    names_types_list.readText(readbuffer);
-    return names_types_list;
-}
-
-
 NestedColumnExtractHelper::NestedColumnExtractHelper(const DB::Block & block_, bool case_insentive_)
     : block(block_), case_insentive(case_insentive_)
 {
@@ -594,10 +566,21 @@ void BackendInitializerUtil::initEnvs(DB::Context::ConfigurationPtr config)
         spark_user = spark_user_c_str;
 }
 
+DB::Field BackendInitializerUtil::toField(const String key, const String value)
+{
+    if (BOOL_VALUE_SETTINGS.contains(key))
+        return DB::Field(value == "true" || value == "1");
+    else if (LONG_VALUE_SETTINGS.contains(key))
+        return DB::Field(std::strtoll(value.c_str(), NULL, 10));
+    else
+        return DB::Field(value);
+}
+
 void BackendInitializerUtil::initSettings(std::map<std::string, std::string> & backend_conf_map, DB::Settings & settings)
 {
     /// Initialize default setting.
     settings.set("date_time_input_format", "best_effort");
+    settings.set("mergetree.merge_after_insert", true);
 
     for (const auto & [key, value] : backend_conf_map)
     {
@@ -609,7 +592,8 @@ void BackendInitializerUtil::initSettings(std::map<std::string, std::string> & b
         }
         else if (key.starts_with(CH_RUNTIME_SETTINGS_PREFIX))
         {
-            settings.set(key.substr(CH_RUNTIME_SETTINGS_PREFIX.size()), value);
+            auto k = key.substr(CH_RUNTIME_SETTINGS_PREFIX.size());
+            settings.set(k, toField(k, value));
             LOG_DEBUG(&Poco::Logger::get("CHUtil"), "Set settings key:{} 
value:{}", key, value);
         }
         else if (key.starts_with(SPARK_HADOOP_PREFIX + S3A_PREFIX))
@@ -624,6 +608,12 @@ void BackendInitializerUtil::initSettings(std::map<std::string, std::string> & b
             // 4. fs.s3a.bucket.bucket_name.assumed.role.externalId (non hadoop official)
             settings.set(key.substr(SPARK_HADOOP_PREFIX.length()), value);
         }
+        else if (key.starts_with(SPARK_DELTA_PREFIX))
+        {
+            auto k = key.substr(SPARK_DELTA_PREFIX.size());
+            settings.set(k, toField(k, value));
+            LOG_DEBUG(&Poco::Logger::get("CHUtil"), "Set settings key:{} value:{}", key, value);
+        }
     }
 
     /// Finally apply some fixed kvs to settings.
diff --git a/cpp-ch/local-engine/Common/CHUtil.h b/cpp-ch/local-engine/Common/CHUtil.h
index 308e22422..574cdbe4c 100644
--- a/cpp-ch/local-engine/Common/CHUtil.h
+++ b/cpp-ch/local-engine/Common/CHUtil.h
@@ -32,6 +32,10 @@
 
 namespace local_engine
 {
+static const std::unordered_set<String> 
BOOL_VALUE_SETTINGS{"mergetree.merge_after_insert"};
+static const std::unordered_set<String> LONG_VALUE_SETTINGS{
+    "optimize.maxfilesize", "optimize.minFileSize", 
"mergetree.max_num_part_per_merge_task"};
+
 class BlockUtil
 {
 public:
@@ -98,14 +102,6 @@ public:
     static std::string explainPlan(DB::QueryPlan & plan);
 };
 
-class MergeTreeUtil
-{
-public:
-    using Path = std::filesystem::path;
-    static std::vector<Path> getAllMergeTreeParts(const Path & storage_path);
-    static DB::NamesAndTypesList getSchemaFromMergeTreePart(const Path & part_path);
-};
-
 class ActionsDAGUtil
 {
 public:
@@ -131,6 +127,8 @@ class JNIUtils;
 class BackendInitializerUtil
 {
 public:
+    static DB::Field toField(const String key, const String value);
+
     /// Initialize two kinds of resources
     /// 1. global level resources like global_context/shared_context, notice that they can only be initialized once in process lifetime
     /// 2. session level resources like settings/configs, they can be initialized multiple times following the lifetime of executor/driver
@@ -166,6 +164,7 @@ public:
     inline static const std::string HADOOP_S3_CLIENT_CACHE_IGNORE = "fs.s3a.client.cached.ignore";
     inline static const std::string SPARK_HADOOP_PREFIX = "spark.hadoop.";
     inline static const std::string S3A_PREFIX = "fs.s3a.";
+    inline static const std::string SPARK_DELTA_PREFIX = "spark.databricks.delta.";
 
     /// On yarn mode, native writing on hdfs cluster takes yarn container user as the user passed to libhdfs3, which
     /// will cause permission issue because yarn container user is not the owner of the hdfs dir to be written.
@@ -228,4 +227,60 @@ public:
     static UInt64 getMemoryRSS();
 };
 
+template <typename T>
+class ConcurrentDeque
+{
+public:
+    std::optional<T> pop_front()
+    {
+        std::lock_guard<std::mutex> lock(mtx);
+
+        if (deq.empty())
+            return {};
+
+        T t = deq.front();
+        deq.pop_front();
+        return t;
+    }
+
+    void emplace_back(T value)
+    {
+        std::lock_guard<std::mutex> lock(mtx);
+        deq.emplace_back(value);
+    }
+
+    void emplace_back(std::vector<T> values)
+    {
+        std::lock_guard<std::mutex> lock(mtx);
+        deq.insert(deq.end(), values.begin(), values.end());
+    }
+
+    void emplace_front(T value)
+    {
+        std::lock_guard<std::mutex> lock(mtx);
+        deq.emplace_front(value);
+    }
+
+    size_t size()
+    {
+        std::lock_guard<std::mutex> lock(mtx);
+        return deq.size();
+    }
+
+    bool empty()
+    {
+        std::lock_guard<std::mutex> lock(mtx);
+        return deq.empty();
+    }
+
+    std::deque<T> unsafeGet()
+    {
+        return deq;
+    }
+
+private:
+    std::deque<T> deq;
+    mutable std::mutex mtx;
+};
+
 }
diff --git a/cpp-ch/local-engine/Common/QueryContext.cpp b/cpp-ch/local-engine/Common/QueryContext.cpp
index 9d2ea7f26..c659e6f34 100644
--- a/cpp-ch/local-engine/Common/QueryContext.cpp
+++ b/cpp-ch/local-engine/Common/QueryContext.cpp
@@ -68,6 +68,7 @@ int64_t initializeQuery(ReservationListenerWrapperPtr listener)
             listener->reserve(size);
     };
     CurrentMemoryTracker::before_free = [listener](Int64 size) -> void { listener->free(size); };
+    CurrentMemoryTracker::current_memory = [listener]() -> Int64 { return listener->currentMemory(); };
     allocator_map.insert(allocator_id, allocator_context);
     return allocator_id;
 }
diff --git a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp
index cff32a83f..368015fb9 100644
--- a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp
+++ b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp
@@ -26,6 +26,8 @@ namespace DB
 namespace ErrorCodes
 {
 extern const int DUPLICATE_DATA_PART;
+extern const int NO_SUCH_DATA_PART;
+
 }
 }
 
@@ -146,9 +148,9 @@ CustomStorageMergeTree::CustomStorageMergeTree(
 
 std::atomic<int> CustomStorageMergeTree::part_num;
 
-MergeTreeData::MutableDataPartsVector CustomStorageMergeTree::loadDataPartsWithNames(std::unordered_set<std::string> parts)
+std::vector<MergeTreeDataPartPtr> CustomStorageMergeTree::loadDataPartsWithNames(std::unordered_set<std::string> parts)
 {
-    MutableDataPartsVector data_parts;
+    std::vector<MergeTreeDataPartPtr> data_parts;
     const auto disk = getStoragePolicy()->getDisks().at(0);
     for (const auto& name : parts)
     {
@@ -158,14 +160,8 @@ MergeTreeData::MutableDataPartsVector CustomStorageMergeTree::loadDataPartsWithN
         data_parts.emplace_back(res.part);
     }
 
-    if(getStorageID().hasUUID())
-    {
-        // the following lines will modify storage's member.
-        // So when current storage is shared (when UUID is default Nil value),
-        // we should avoid modify because we don't have locks here
-
-        calculateColumnAndSecondaryIndexSizesImpl(); // without it "test mergetree optimize partitioned by one low card column" will log ERROR
-    }
+
+    // without it "test mergetree optimize partitioned by one low card column" will log ERROR
     return data_parts;
 }
 
@@ -246,6 +242,38 @@ MergeTreeData::LoadPartResult CustomStorageMergeTree::loadDataPart(
     return res;
 }
 
+void CustomStorageMergeTree::removePartFromMemory(const MergeTreeData::DataPartPtr & part_to_detach)
+{
+    auto lock = lockParts();
+    bool removed_active_part = false;
+    bool restored_active_part = false;
+
+    auto it_part = data_parts_by_info.find(part_to_detach->info);
+    if (it_part == data_parts_by_info.end())
+    {
+        LOG_DEBUG(log, "No such data part {}", part_to_detach->getNameWithState());
+        return;
+    }
+
+    /// What if part_to_detach is a reference to *it_part? Make a new owner just in case.
+    /// Important to own part pointer here (not const reference), because it will be removed from data_parts_indexes
+    /// few lines below.
+    DataPartPtr part = *it_part; // NOLINT
+
+    if (part->getState() == DataPartState::Active)
+    {
+        removePartContributionToColumnAndSecondaryIndexSizes(part);
+        removed_active_part = true;
+    }
+
+    modifyPartState(it_part, DataPartState::Deleting);
+    LOG_TEST(log, "removePartFromMemory: removing {} from data_parts_indexes", part->getNameWithState());
+    data_parts_indexes.erase(it_part);
+
+    if (removed_active_part || restored_active_part)
+        resetObjectColumnsFromActiveParts(lock);
+}
+
 void CustomStorageMergeTree::dropPartNoWaitNoThrow(const String & /*part_name*/)
 {
     throw std::runtime_error("not implement");
diff --git a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h
index 0aeee4ef9..cd507a3ac 100644
--- a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h
+++ b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h
@@ -53,8 +53,8 @@ public:
     std::vector<MergeTreeMutationStatus> getMutationsStatus() const override;
     bool scheduleDataProcessingJob(BackgroundJobsAssignee & executor) override;
     std::map<std::string, MutationCommands> getUnfinishedMutationCommands() 
const override;
-    MutableDataPartsVector 
loadDataPartsWithNames(std::unordered_set<std::string> parts);
-
+    std::vector<MergeTreeDataPartPtr> 
loadDataPartsWithNames(std::unordered_set<std::string> parts);
+    void removePartFromMemory(const MergeTreeData::DataPartPtr & 
part_to_detach);
 
     MergeTreeDataWriter writer;
     MergeTreeDataSelectExecutor reader;
@@ -84,6 +84,7 @@ protected:
     bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override;
     MutationCommands getAlterMutationCommandsForPart(const DataPartPtr & /*part*/) const override { return {}; }
     void attachRestoredParts(MutableDataPartsVector && /*parts*/) override { throw std::runtime_error("not implement"); }
+
 };
 
 }
diff --git a/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp
index 21c0fc968..57bb804fa 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp
+++ b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp
@@ -15,9 +15,13 @@
  * limitations under the License.
  */
 #include "MetaDataHelper.h"
+
 #include <filesystem>
 
 #include <Disks/ObjectStorages/MetadataStorageFromDisk.h>
+#include <Parser/MergeTreeRelParser.h>
+#include <Storages/Mergetree/MergeSparkMergeTreeTask.h>
+#include <Poco/StringTokenizer.h>
 
 namespace CurrentMetrics
 {
@@ -123,6 +127,7 @@ void restoreMetaData(CustomStorageMergeTreePtr & storage, const MergeTreeTable &
 void saveFileStatus(
     const DB::MergeTreeData & storage,
     const DB::ContextPtr& context,
+    const String & part_name,
     IDataPartStorage & data_part_storage)
 {
     const DiskPtr disk = storage.getStoragePolicy()->getAnyDisk();
@@ -142,5 +147,61 @@ void saveFileStatus(
         }
         out->finalize();
     }
+
+    LOG_DEBUG(&Poco::Logger::get("MetaDataHelper"), "Save part {} metadata 
success.", part_name);
+}
+
+
+std::vector<MergeTreeDataPartPtr> mergeParts(
+    std::vector<DB::DataPartPtr> selected_parts,
+    std::unordered_map<String, String> & partition_values,
+    const String & new_part_uuid,
+    CustomStorageMergeTreePtr storage,
+    const String  & partition_dir,
+    const String & bucket_dir)
+{
+    auto future_part = std::make_shared<DB::FutureMergedMutatedPart>();
+    future_part->uuid = UUIDHelpers::generateV4();
+
+    future_part->assign(std::move(selected_parts));
+
+    future_part->name = "";
+    if(!partition_dir.empty())
+    {
+        future_part->name =  partition_dir + "/";
+        extractPartitionValues(partition_dir, partition_values);
+    }
+    if(!bucket_dir.empty())
+    {
+        future_part->name = future_part->name + bucket_dir + "/";
+    }
+    future_part->name = future_part->name +  new_part_uuid + "-merged";
+
+    auto entry = std::make_shared<DB::MergeMutateSelectedEntry>(future_part, DB::CurrentlyMergingPartsTaggerPtr{}, std::make_shared<DB::MutationCommands>());
+
+    // Copying a vector of columns `deduplicate by columns`.
+    DB::IExecutableTask::TaskResultCallback f = [](bool) {};
+    auto task = std::make_shared<local_engine::MergeSparkMergeTreeTask>(
+        *storage, storage->getInMemoryMetadataPtr(), false,  
std::vector<std::string>{}, false, entry,
+        DB::TableLockHolder{}, f);
+
+    task->setCurrentTransaction(DB::MergeTreeTransactionHolder{}, 
DB::MergeTreeTransactionPtr{});
+
+    executeHere(task);
+
+    std::unordered_set<std::string> to_load{future_part->name};
+    std::vector<MergeTreeDataPartPtr> merged = 
storage->loadDataPartsWithNames(to_load);
+    return merged;
+}
+
+void extractPartitionValues(const String & partition_dir, std::unordered_map<String, String> & partition_values)
+{
+    Poco::StringTokenizer partitions(partition_dir, "/");
+    for (const auto & partition : partitions)
+    {
+        Poco::StringTokenizer key_value(partition, "=");
+        chassert(key_value.count() == 2);
+        partition_values.emplace(key_value[0], key_value[1]);
+    }
 }
 }
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h
index b15a15322..7163ee02c 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h
+++ b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h
@@ -28,6 +28,16 @@ void restoreMetaData(CustomStorageMergeTreePtr & storage, const MergeTreeTable &
 void saveFileStatus(
     const DB::MergeTreeData & storage,
     const DB::ContextPtr& context,
+    const String & part_name,
     IDataPartStorage & data_part_storage);
 
+std::vector<MergeTreeDataPartPtr> mergeParts(
+    std::vector<DB::DataPartPtr> selected_parts,
+    std::unordered_map<String, String> & partition_values,
+    const String & new_part_uuid,
+    CustomStorageMergeTreePtr storage,
+    const String & partition_dir,
+    const String & bucket_dir);
+
+void extractPartitionValues(const String & partition_dir, std::unordered_map<String, String> & partition_values);
 }
diff --git a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
index 40e716a56..c709a5f24 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
+++ b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
@@ -16,12 +16,24 @@
  */
 #include "SparkMergeTreeWriter.h"
 
-#include <Disks/createVolume.h>
 #include <Disks/ObjectStorages/MetadataStorageFromDisk.h>
+#include <Disks/createVolume.h>
 #include <Interpreters/ActionsDAG.h>
 #include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
-#include <rapidjson/prettywriter.h>
 #include <Storages/Mergetree/MetaDataHelper.h>
+#include <rapidjson/prettywriter.h>
+#include <Common/CHUtil.h>
+
+
+namespace CurrentMetrics
+{
+extern const Metric LocalThread;
+extern const Metric LocalThreadActive;
+extern const Metric LocalThreadScheduled;
+extern const Metric GlobalThread;
+extern const Metric GlobalThreadActive;
+extern const Metric GlobalThreadScheduled;
+}
 
 using namespace DB;
 
@@ -42,6 +54,43 @@ Block removeColumnSuffix(const DB::Block & block)
     return Block(columns);
 }
 
+SparkMergeTreeWriter::SparkMergeTreeWriter(
+    CustomStorageMergeTreePtr storage_,
+    const DB::StorageMetadataPtr & metadata_snapshot_,
+    const DB::ContextPtr & context_,
+    const String & uuid_,
+    const String & partition_dir_,
+    const String & bucket_dir_)
+    : storage(storage_)
+    , metadata_snapshot(metadata_snapshot_)
+    , context(context_)
+    , uuid(uuid_)
+    , partition_dir(partition_dir_)
+    , bucket_dir(bucket_dir_)
+    , thread_pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, CurrentMetrics::LocalThreadScheduled, 1, 1, 100000)
+{
+    const DB::Settings & settings = context->getSettingsRef();
+    squashing_transform
+        = std::make_unique<DB::SquashingTransform>(settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes);
+    if (!partition_dir.empty())
+    {
+        extractPartitionValues(partition_dir, partition_values);
+    }
+    header = metadata_snapshot->getSampleBlock();
+
+    Field is_merge;
+    if (context->getSettings().tryGet("mergetree.merge_after_insert", 
is_merge))
+        merge_after_insert = is_merge.get<bool>();
+
+    Field limit_size_field;
+    if (context->getSettings().tryGet("optimize.minFileSize", 
limit_size_field))
+        merge_min_size = limit_size_field.get<Int64>() <= 0 ? merge_min_size : 
limit_size_field.get<Int64>();
+
+    Field limit_cnt_field;
+    if (context->getSettings().tryGet("mergetree.max_num_part_per_merge_task", 
limit_cnt_field))
+        merge_limit_parts = limit_cnt_field.get<Int64>() <= 0 ? 
merge_limit_parts : limit_cnt_field.get<Int64>();
+}
+
 void SparkMergeTreeWriter::write(DB::Block & block)
 {
     auto new_block = removeColumnSuffix(block);
@@ -52,11 +101,57 @@ void SparkMergeTreeWriter::write(DB::Block & block)
         do_convert.execute(new_block);
     }
 
-    auto blocks_with_partition = MergeTreeDataWriter::splitBlockIntoParts(squashing_transform->add(new_block), 10, metadata_snapshot, context);
-        for (auto & item : blocks_with_partition)
+    auto blocks_with_partition
+        = MergeTreeDataWriter::splitBlockIntoParts(squashing_transform->add(new_block), 10, metadata_snapshot, context);
+    for (auto & item : blocks_with_partition)
+    {
+        size_t before_write_memory = 0;
+        if (auto * memory_tracker = CurrentThread::getMemoryTracker())
         {
-            new_parts.emplace_back(writeTempPartAndFinalize(item, metadata_snapshot).part);
+            CurrentThread::flushUntrackedMemory();
+            before_write_memory = memory_tracker->get();
+        }
+
+        new_parts.emplace_back(writeTempPartAndFinalize(item, metadata_snapshot).part);
         part_num++;
+        manualFreeMemory(before_write_memory);
+        /// Reset earlier to free memory
+        item.block.clear();
+        item.partition.clear();
+    }
+
+    if (!blocks_with_partition.empty() && merge_after_insert)
+        checkAndMerge();
+}
+
+void SparkMergeTreeWriter::manualFreeMemory(size_t before_write_memory)
+{
+    // If the mergetree disk is not a local fs (e.g. a remote fs such as s3 or hdfs),
+    // memory may be allocated in the current thread but freed on a global thread.
+    // We currently have no way to clear that global memory via the Spark thread tracker,
+    // so we manually correct the memory usage here.
+    auto disk = storage->getStoragePolicy()->getAnyDisk();
+    if (!disk->isRemote())
+        return;
+
+    std::lock_guard lock(memory_mutex);
+    auto * memory_tracker = CurrentThread::getMemoryTracker();
+    if (memory_tracker && CurrentMemoryTracker::before_free)
+    {
+        CurrentThread::flushUntrackedMemory();
+        const size_t ch_alloc = memory_tracker->get();
+        if (disk->getName().contains("s3") && 
context->getSettings().s3_allow_parallel_part_upload && ch_alloc > 
before_write_memory)
+        {
+            const size_t diff_ch_alloc = before_write_memory - ch_alloc;
+            memory_tracker->adjustWithUntrackedMemory(diff_ch_alloc);
+        }
+
+        const size_t a = memory_tracker->get();
+        const size_t spark_alloc = CurrentMemoryTracker::current_memory();
+        const size_t diff_alloc = spark_alloc - memory_tracker->get();
+
+        if (diff_alloc > 0)
+            CurrentMemoryTracker::before_free(diff_alloc);
     }
 }
 
@@ -67,7 +162,61 @@ void SparkMergeTreeWriter::finalize()
     {
         auto blocks_with_partition = MergeTreeDataWriter::splitBlockIntoParts(std::move(block), 10, metadata_snapshot, context);
         for (auto & item : blocks_with_partition)
+        {
+            size_t before_write_memory = 0;
+            if (auto * memory_tracker = CurrentThread::getMemoryTracker())
+            {
+                CurrentThread::flushUntrackedMemory();
+                before_write_memory = memory_tracker->get();
+            }
+
             new_parts.emplace_back(writeTempPartAndFinalize(item, metadata_snapshot).part);
+            part_num++;
+            manualFreeMemory(before_write_memory);
+            /// Reset earlier to free memory
+            item.block.clear();
+            item.partition.clear();
+        }
+    }
+
+    SCOPE_EXIT({
+        for (auto merge_tree_data_part : new_parts.unsafeGet())
+            saveFileStatus(
+                *storage, context, merge_tree_data_part->name, 
const_cast<IDataPartStorage &>(merge_tree_data_part->getDataPartStorage()));
+    });
+
+    if (!merge_after_insert)
+        return;
+
+    // wait all merge task end and do final merge
+    thread_pool.wait();
+
+    size_t before_merge_size;
+    do
+    {
+        before_merge_size = new_parts.size();
+        checkAndMerge(true);
+        thread_pool.wait();
+    } while (before_merge_size != new_parts.size());
+
+    std::unordered_set<String> final_parts;
+    for (auto merge_tree_data_part : new_parts.unsafeGet())
+        final_parts.emplace(merge_tree_data_part->name);
+
+    for (const auto & tmp_part : tmp_parts)
+    {
+        if (final_parts.contains(tmp_part))
+            continue;
+
+        GlobalThreadPool::instance().scheduleOrThrow(
+            [&]() -> void
+            {
+                for (auto disk : storage->getDisks())
+                {
+                    auto full_path = storage->getFullPathOnDisk(disk);
+                    disk->removeRecursive(full_path + "/" + tmp_part);
+                }
+            });
     }
 }
 
@@ -76,16 +225,15 @@ SparkMergeTreeWriter::writeTempPartAndFinalize(
     DB::BlockWithPartition & block_with_partition,
     const DB::StorageMetadataPtr & metadata_snapshot)
 {
-    auto temp_part = writeTempPart(block_with_partition, metadata_snapshot);
+    MergeTreeDataWriter::TemporaryPart temp_part;
+    writeTempPart(temp_part, block_with_partition, metadata_snapshot);
     temp_part.finalize();
-    saveFileStatus(storage, context, temp_part.part->getDataPartStorage());
     return temp_part;
 }
 
-MergeTreeDataWriter::TemporaryPart SparkMergeTreeWriter::writeTempPart(
+void SparkMergeTreeWriter::writeTempPart(MergeTreeDataWriter::TemporaryPart & temp_part,
     BlockWithPartition & block_with_partition, const StorageMetadataPtr & metadata_snapshot)
 {
-    MergeTreeDataWriter::TemporaryPart temp_part;
     Block & block = block_with_partition.block;
 
     auto columns = metadata_snapshot->getColumns().getAllPhysical().filter(block.getNames());
@@ -95,7 +243,7 @@ MergeTreeDataWriter::TemporaryPart SparkMergeTreeWriter::writeTempPart(
             column.type = block.getByName(column.name).type;
 
     auto minmax_idx = std::make_shared<IMergeTreeDataPart::MinMaxIndex>();
-    minmax_idx->update(block, 
storage.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));
+    minmax_idx->update(block, 
storage->getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));
 
     MergeTreePartition partition(block_with_partition.partition);
 
@@ -121,13 +269,13 @@ MergeTreeDataWriter::TemporaryPart SparkMergeTreeWriter::writeTempPart(
 
     String part_name = part_dir;
 
-    temp_part.temporary_directory_lock = storage.getTemporaryPartDirectoryHolder(part_dir);
+    temp_part.temporary_directory_lock = storage->getTemporaryPartDirectoryHolder(part_dir);
 
     auto indices = MergeTreeIndexFactory::instance().getMany(metadata_snapshot->getSecondaryIndices());
 
     /// If we need to calculate some columns to sort.
     if (metadata_snapshot->hasSortingKey() || 
metadata_snapshot->hasSecondaryIndices())
-        storage.getSortingKeyAndSkipIndicesExpression(metadata_snapshot, 
indices)->execute(block);
+        storage->getSortingKeyAndSkipIndicesExpression(metadata_snapshot, 
indices)->execute(block);
 
     Names sort_columns = metadata_snapshot->getSortingKeyColumns();
     SortDescription sort_description;
@@ -157,19 +305,19 @@ MergeTreeDataWriter::TemporaryPart SparkMergeTreeWriter::writeTempPart(
     /// If optimize_on_insert is true, block may become empty after merge.
     /// There is no need to create empty part.
     if (expected_size == 0)
-        return temp_part;
+        return;
 
-    VolumePtr volume = storage.getStoragePolicy()->getVolume(0);
+    VolumePtr volume = storage->getStoragePolicy()->getVolume(0);
     VolumePtr data_part_volume = std::make_shared<SingleDiskVolume>(volume->getName(), volume->getDisk(), volume->max_data_part_size);
-    auto new_data_part = storage.getDataPartBuilder(part_name, data_part_volume, part_dir)
-                             .withPartFormat(storage.choosePartFormat(expected_size, block.rows()))
+    auto new_data_part = storage->getDataPartBuilder(part_name, data_part_volume, part_dir)
+                             .withPartFormat(storage->choosePartFormat(expected_size, block.rows()))
                              .withPartInfo(new_part_info)
                              .build();
 
     auto data_part_storage = new_data_part->getDataPartStoragePtr();
 
 
-    const auto & data_settings = storage.getSettings();
+    const auto & data_settings = storage->getSettings();
 
     SerializationInfo::Settings settings{data_settings->ratio_of_defaults_for_sparse_serialization, true};
     SerializationInfoByName infos(columns, settings);
@@ -194,7 +342,7 @@ MergeTreeDataWriter::TemporaryPart SparkMergeTreeWriter::writeTempPart(
 
         data_part_storage->createDirectories();
 
-        if (storage.getSettings()->fsync_part_directory)
+        if (storage->getSettings()->fsync_part_directory)
         {
             const auto disk = data_part_volume->getDisk();
             sync_guard = disk->getDirectorySyncGuard(full_path);
@@ -203,7 +351,7 @@ MergeTreeDataWriter::TemporaryPart SparkMergeTreeWriter::writeTempPart(
 
     /// This effectively chooses minimal compression method:
     ///  either default lz4 or compression method with zero thresholds on absolute and relative part size.
-    auto compression_codec = storage.getContext()->chooseCompressionCodec(0, 0);
+    auto compression_codec = storage->getContext()->chooseCompressionCodec(0, 0);
 
     auto out = std::make_unique<MergedBlockOutputStream>(
         new_data_part,
@@ -218,21 +366,24 @@ MergeTreeDataWriter::TemporaryPart SparkMergeTreeWriter::writeTempPart(
         context->getWriteSettings());
 
     out->writeWithPermutation(block, perm_ptr);
-
-
     auto finalizer = out->finalizePartAsync(new_data_part, 
data_settings->fsync_after_insert, nullptr, nullptr);
 
     temp_part.part = new_data_part;
-    temp_part.streams.emplace_back(MergeTreeDataWriter::TemporaryPart::Stream{.stream = std::move(out), .finalizer = std::move(finalizer)});
-
-    return temp_part;
+    temp_part.streams.emplace_back(
+        MergeTreeDataWriter::TemporaryPart::Stream{.stream = std::move(out), 
.finalizer = std::move(finalizer)});
 }
 
 std::vector<PartInfo> SparkMergeTreeWriter::getAllPartInfo()
 {
     std::vector<PartInfo> res;
-    for (const MergeTreeDataPartPtr & part : new_parts)
-        res.emplace_back(PartInfo{part->name, part->getMarksCount(), 
part->getBytesOnDisk(), part->rows_count, partition_values, bucket_dir});
+    res.reserve(new_parts.size());
+
+    for (auto part : new_parts.unsafeGet())
+    {
+        res.emplace_back(
+            PartInfo{part->name, part->getMarksCount(), 
part->getBytesOnDisk(), part->rows_count, partition_values, bucket_dir});
+    }
+
     return res;
 }
 
@@ -268,4 +419,90 @@ String SparkMergeTreeWriter::partInfosToJson(const std::vector<PartInfo> & part_
     return result.GetString();
 }
 
+void SparkMergeTreeWriter::checkAndMerge(bool force)
+{
+    // Only finalize should force merge.
+    if (!force && new_parts.size() < merge_limit_parts)
+        return;
+
+    auto doTask = [this](
+                      const ThreadGroupPtr & thread_group,
+                      const std::vector<MergeTreeDataPartPtr> 
prepare_merge_parts,
+                      CustomStorageMergeTreePtr & storage,
+                      String & partition_dir,
+                      String & bucket_dir) -> std::vector<MergeTreeDataPartPtr>
+    {
+        setThreadName("InsertWithMerge");
+        ThreadStatus thread_status;
+        thread_status.attachToGroup(thread_group);
+
+        size_t before_size = 0;
+        size_t after_size = 0;
+        for (const auto & prepare_merge_part : prepare_merge_parts)
+            before_size += prepare_merge_part->getBytesOnDisk();
+
+        std::unordered_map<String, String> partition_values;
+        auto merged_parts
+            = mergeParts(prepare_merge_parts, partition_values, toString(UUIDHelpers::generateV4()), storage, partition_dir, bucket_dir);
+        for (const auto & merge_tree_data_part : merged_parts)
+            after_size += merge_tree_data_part->getBytesOnDisk();
+
+        LOG_DEBUG(
+            &Poco::Logger::get("SparkMergeTreeWriter"),
+            "Mergetree merge on insert finished, before merge part size {}, 
part count {}, after part size {}, part count {}.",
+            before_size,
+            prepare_merge_parts.size(),
+            after_size,
+            merged_parts.size());
+
+        return merged_parts;
+    };
+
+    std::vector<MergeTreeDataPartPtr> selected_parts;
+    selected_parts.reserve(merge_limit_parts);
+    size_t totol_size = 0;
+    std::vector<MergeTreeDataPartPtr> skip_parts;
+
+    while (const auto merge_tree_data_part_option = new_parts.pop_front())
+    {
+        auto merge_tree_data_part = merge_tree_data_part_option.value();
+        if (merge_tree_data_part->getBytesOnDisk() >= merge_min_size)
+        {
+            skip_parts.emplace_back(merge_tree_data_part);
+            continue;
+        }
+
+        selected_parts.emplace_back(merge_tree_data_part);
+        totol_size += merge_tree_data_part->getBytesOnDisk();
+        if (merge_min_size > totol_size && merge_limit_parts > 
selected_parts.size())
+            continue;
+
+        for (auto selected_part : selected_parts)
+        {
+            tmp_parts.emplace(selected_part->name);
+        }
+
+        thread_pool.scheduleOrThrow([this, doTask, selected_parts, thread_group = CurrentThread::getGroup()]() -> void
+                   { new_parts.emplace_back(doTask(thread_group, selected_parts, storage, partition_dir, bucket_dir)); });
+        selected_parts.clear();
+        totol_size = 0;
+    }
+
+    if (!selected_parts.empty())
+    {
+        if (force && selected_parts.size() > 1)
+        {
+            for (auto selected_part : selected_parts)
+                tmp_parts.emplace(selected_part->name);
+            thread_pool.scheduleOrThrow(
+                [this, doTask, selected_parts, thread_group = 
CurrentThread::getGroup()]() -> void
+                { new_parts.emplace_back(doTask(thread_group, selected_parts, 
storage, partition_dir, bucket_dir)); });
+        }
+        else
+            new_parts.emplace_back(selected_parts);
+    }
+
+    new_parts.emplace_back(skip_parts);
+}
+
 }
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h
index 000d009fe..5251d4cc4 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h
+++ b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h
@@ -19,7 +19,9 @@
 #include <Interpreters/SquashingTransform.h>
 #include <Storages/MergeTree/MergeTreeDataWriter.h>
 #include <Storages/MergeTree/IMergeTreeDataPart.h>
+#include <Storages/StorageMergeTreeFactory.h>
 #include <Poco/StringTokenizer.h>
+#include <Common/CHUtil.h>
 
 namespace DB
 {
@@ -40,6 +42,8 @@ struct PartInfo
     size_t row_count;
     std::unordered_map<String, String> partition_values;
     String bucket_id;
+
+    bool operator<(const PartInfo & rhs) const { return disk_size < 
rhs.disk_size; }
 };
 
 class SparkMergeTreeWriter
@@ -47,56 +51,44 @@ class SparkMergeTreeWriter
 public:
     static String partInfosToJson(const std::vector<PartInfo> & part_infos);
     SparkMergeTreeWriter(
-        DB::MergeTreeData & storage_,
+        CustomStorageMergeTreePtr storage_,
         const DB::StorageMetadataPtr & metadata_snapshot_,
         const DB::ContextPtr & context_,
         const String & uuid_,
         const String & partition_dir_ = "",
-        const String & bucket_dir_ = "")
-        : storage(storage_)
-        , metadata_snapshot(metadata_snapshot_)
-        , context(context_)
-        , uuid(uuid_)
-        , partition_dir(partition_dir_)
-        , bucket_dir(bucket_dir_)
-    {
-        const DB::Settings & settings = context->getSettingsRef();
-        squashing_transform
-            = 
std::make_unique<DB::SquashingTransform>(settings.min_insert_block_size_rows, 
settings.min_insert_block_size_bytes);
-        if (!partition_dir.empty())
-        {
-            Poco::StringTokenizer partitions(partition_dir, "/");
-            for (const auto & partition : partitions)
-            {
-                Poco::StringTokenizer key_value(partition, "=");
-                chassert(key_value.count() == 2);
-                partition_values.emplace(key_value[0], key_value[1]);
-            }
-        }
-        header = metadata_snapshot->getSampleBlock();
-    }
+        const String & bucket_dir_ = "");
 
     void write(DB::Block & block);
     void finalize();
     std::vector<PartInfo> getAllPartInfo();
 
 private:
-    DB::MergeTreeDataWriter::TemporaryPart
-    writeTempPart(DB::BlockWithPartition & block_with_partition, const DB::StorageMetadataPtr & metadata_snapshot);
+    void
+    writeTempPart(MergeTreeDataWriter::TemporaryPart & temp_part, DB::BlockWithPartition & block_with_partition, const DB::StorageMetadataPtr & metadata_snapshot);
     DB::MergeTreeDataWriter::TemporaryPart
     writeTempPartAndFinalize(DB::BlockWithPartition & block_with_partition, const DB::StorageMetadataPtr & metadata_snapshot);
+    void checkAndMerge(bool force = false);
+    void safeEmplaceBackPart(DB::MergeTreeDataPartPtr);
+    void safeAddPart(DB::MergeTreeDataPartPtr);
+    void manualFreeMemory(size_t before_write_memory);
 
     String uuid;
     String partition_dir;
     String bucket_dir;
-    DB::MergeTreeData & storage;
+    CustomStorageMergeTreePtr storage;
     DB::StorageMetadataPtr metadata_snapshot;
     DB::ContextPtr context;
     std::unique_ptr<DB::SquashingTransform> squashing_transform;
     int part_num = 1;
-    std::vector<DB::MergeTreeDataPartPtr> new_parts;
+    ConcurrentDeque<DB::MergeTreeDataPartPtr> new_parts;
     std::unordered_map<String, String> partition_values;
+    std::unordered_set<String> tmp_parts;
     DB::Block header;
+    bool merge_after_insert;
+    FreeThreadPool thread_pool;
+    size_t merge_min_size = 1024 * 1024 * 1024;
+    size_t merge_limit_parts = 10;
+    std::mutex memory_mutex;
 };
 
 }
diff --git a/cpp-ch/local-engine/jni/ReservationListenerWrapper.cpp b/cpp-ch/local-engine/jni/ReservationListenerWrapper.cpp
index b5c5f8cd0..65b29c2a2 100644
--- a/cpp-ch/local-engine/jni/ReservationListenerWrapper.cpp
+++ b/cpp-ch/local-engine/jni/ReservationListenerWrapper.cpp
@@ -24,6 +24,7 @@ jclass ReservationListenerWrapper::reservation_listener_class = nullptr;
 jmethodID ReservationListenerWrapper::reservation_listener_reserve = nullptr;
 jmethodID ReservationListenerWrapper::reservation_listener_reserve_or_throw = nullptr;
 jmethodID ReservationListenerWrapper::reservation_listener_unreserve = nullptr;
+jmethodID ReservationListenerWrapper::reservation_listener_currentMemory = nullptr;
 
 ReservationListenerWrapper::ReservationListenerWrapper(jobject listener_) : 
listener(listener_)
 {
@@ -56,4 +57,12 @@ void ReservationListenerWrapper::free(int64_t size)
     safeCallVoidMethod(env, listener, reservation_listener_unreserve, size);
     CLEAN_JNIENV
 }
+
+size_t ReservationListenerWrapper::currentMemory()
+{
+    GET_JNIENV(env)
+    int64_t res = safeCallLongMethod(env, listener, 
reservation_listener_currentMemory);
+    return res;
+    CLEAN_JNIENV
+}
 }
diff --git a/cpp-ch/local-engine/jni/ReservationListenerWrapper.h b/cpp-ch/local-engine/jni/ReservationListenerWrapper.h
index 93efd497d..1dfb3671f 100644
--- a/cpp-ch/local-engine/jni/ReservationListenerWrapper.h
+++ b/cpp-ch/local-engine/jni/ReservationListenerWrapper.h
@@ -28,12 +28,16 @@ public:
     static jmethodID reservation_listener_reserve;
     static jmethodID reservation_listener_reserve_or_throw;
     static jmethodID reservation_listener_unreserve;
+    static jmethodID reservation_listener_currentMemory;
 
     explicit ReservationListenerWrapper(jobject listener);
     ~ReservationListenerWrapper();
     void reserve(int64_t size);
     void reserveOrThrow(int64_t size);
     void free(int64_t size);
+    size_t currentMemory();
+
+
 
 private:
     jobject listener;
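With currentMemory(), the native side can now ask the Spark-side listener how much memory it currently holds, complementing reserve()/free(). Below is a minimal sketch of how a writer-side caller might use such a reading to give back growth it no longer needs; the Listener interface is a hypothetical stand-in for ReservationListenerWrapper, and the real call sites (including manualFreeMemory(size_t before_write_memory)) live in SparkMergeTreeWriter.cpp.

    #include <cstddef>
    #include <cstdint>

    // Hypothetical listener interface mirroring reserve/free/currentMemory.
    struct Listener
    {
        virtual void reserve(int64_t size) = 0;
        virtual void free(int64_t size) = 0;
        virtual size_t currentMemory() = 0;
        virtual ~Listener() = default;
    };

    // Sketch: release whatever the reservation grew beyond the level observed
    // before a write, so the Spark-side accounting does not keep holding memory
    // the native side has already given back.
    void releaseGrowth(Listener & listener, size_t before_write_memory)
    {
        const size_t now = listener.currentMemory();
        if (now > before_write_memory)
            listener.free(static_cast<int64_t>(now - before_write_memory));
    }
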
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp
index 34b81d7a1..7baad210e 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -186,6 +186,9 @@ JNIEXPORT jint JNI_OnLoad(JavaVM * vm, void * /*reserved*/)
         = local_engine::GetMethodID(env, local_engine::ReservationListenerWrapper::reservation_listener_class, "reserveOrThrow", "(J)V");
     local_engine::ReservationListenerWrapper::reservation_listener_unreserve
         = local_engine::GetMethodID(env, local_engine::ReservationListenerWrapper::reservation_listener_class, "unreserve", "(J)J");
+    local_engine::ReservationListenerWrapper::reservation_listener_currentMemory
+        = local_engine::GetMethodID(env, local_engine::ReservationListenerWrapper::reservation_listener_class, "currentMemory", "()J");
+
 
     native_metrics_class = local_engine::CreateGlobalClassReference(env, "Lorg/apache/gluten/metrics/NativeMetrics;");
     native_metrics_constructor = local_engine::GetMethodID(env, native_metrics_class, "<init>", "(Ljava/lang/String;)V");
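The descriptor "()J" passed above denotes a no-argument Java method returning long, matching currentMemory() on the Spark-side listener. A generic sketch of invoking such a method through raw JNI, with explicit exception handling (this commit itself routes the call through the existing safeCallLongMethod helper), is:

    #include <jni.h>

    // Sketch: call a no-arg, long-returning method ("()J") and surface any
    // pending Java exception instead of silently continuing with a bad value.
    long callCurrentMemory(JNIEnv * env, jobject listener, jmethodID current_memory_method)
    {
        const jlong value = env->CallLongMethod(listener, current_memory_method);
        if (env->ExceptionCheck())
        {
            env->ExceptionDescribe();
            env->ExceptionClear();
            return 0;
        }
        return static_cast<long>(value);
    }
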
@@ -997,9 +1000,26 @@ JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniW
 }
 
 JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_nativeInitMergeTreeWriterWrapper(
-    JNIEnv * env, jobject, jbyteArray plan_, jbyteArray split_info_, jstring uuid_, jstring task_id_, jstring partition_dir_, jstring bucket_dir_)
+    JNIEnv * env,
+    jobject,
+    jbyteArray plan_,
+    jbyteArray split_info_,
+    jstring uuid_,
+    jstring task_id_,
+    jstring partition_dir_,
+    jstring bucket_dir_,
+    jbyteArray conf_plan,
+    jlong allocator_id)
 {
     LOCAL_ENGINE_JNI_METHOD_START
+    auto query_context = local_engine::getAllocator(allocator_id)->query_context;
+    // refresh the task-level configs (in case of dynamic config updates)
+    jsize conf_plan_buf_size = env->GetArrayLength(conf_plan);
+    jbyte * conf_plan_buf_addr = env->GetByteArrayElements(conf_plan, nullptr);
+    std::string conf_plan_str;
+    conf_plan_str.assign(reinterpret_cast<const char *>(conf_plan_buf_addr), conf_plan_buf_size);
+    local_engine::BackendInitializerUtil::updateConfig(query_context, &conf_plan_str);
+
     const auto uuid_str = jstring2string(env, uuid_);
     const auto task_id = jstring2string(env, task_id_);
     const auto partition_dir = jstring2string(env, partition_dir_);
@@ -1035,10 +1055,11 @@ JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniW
         extension_table, local_engine::SerializedPlanParser::global_context);
     auto uuid = uuid_str + "_" + task_id;
     auto * writer = new local_engine::SparkMergeTreeWriter(
-        *storage, storage->getInMemoryMetadataPtr(), local_engine::SerializedPlanParser::global_context, uuid, partition_dir, bucket_dir);
+        storage, storage->getInMemoryMetadataPtr(), query_context, uuid, partition_dir, bucket_dir);
 
     env->ReleaseByteArrayElements(plan_, plan_buf_addr, JNI_ABORT);
     env->ReleaseByteArrayElements(split_info_, split_info_addr, JNI_ABORT);
+    env->ReleaseByteArrayElements(conf_plan, conf_plan_buf_addr, JNI_ABORT);
     return reinterpret_cast<jlong>(writer);
     LOCAL_ENGINE_JNI_METHOD_END(env, 0)
 }
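The writer is now constructed against the per-task query context obtained from the allocator id, after the serialized conf plan has been applied, so task-level settings (for example the merge-on-insert thresholds) reach the native writer. Note also that every GetByteArrayElements call in this function is paired with a manual ReleaseByteArrayElements(..., JNI_ABORT); a small RAII guard, sketched below with standard JNI calls only (it is not part of this commit), is one common way to keep such pairings exception-safe:

    #include <jni.h>
    #include <string>

    // Sketch of an RAII guard for a jbyteArray borrowed via GetByteArrayElements.
    // JNI_ABORT on release: the buffer was only read, so no copy-back is needed.
    class ByteArrayGuard
    {
    public:
        ByteArrayGuard(JNIEnv * env_, jbyteArray array_)
            : env(env_), array(array_), data(env_->GetByteArrayElements(array_, nullptr))
        {
        }

        ~ByteArrayGuard() { env->ReleaseByteArrayElements(array, data, JNI_ABORT); }

        std::string toString() const
        {
            return std::string(reinterpret_cast<const char *>(data), env->GetArrayLength(array));
        }

    private:
        JNIEnv * env;
        jbyteArray array;
        jbyte * data;
    };
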
@@ -1180,53 +1201,20 @@ JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn
     auto storage_factory = local_engine::StorageMergeTreeFactory::instance();
     std::vector<DB::DataPartPtr> selected_parts = storage_factory.getDataParts(table_id, merge_tree_table.snapshot_id, merge_tree_table.getPartNames());
 
-    auto future_part = std::make_shared<DB::FutureMergedMutatedPart>();
-    future_part->uuid = DB::UUIDHelpers::generateV4();
-
-    future_part->assign(std::move(selected_parts));
-
-    future_part->name = "";
     std::unordered_map<String, String> partition_values;
-    if(!partition_dir.empty())
-    {
-        future_part->name =  partition_dir + "/";
-        Poco::StringTokenizer partitions(partition_dir, "/");
-        for (const auto & partition : partitions)
-        {
-            Poco::StringTokenizer key_value(partition, "=");
-            chassert(key_value.count() == 2);
-            partition_values.emplace(key_value[0], key_value[1]);
-        }
-    }
-    if(!bucket_dir.empty())
-    {
-        future_part->name = future_part->name + bucket_dir + "/";
-    }
-    future_part->name = future_part->name +  uuid_str + "-merged";
-
-    auto entry = std::make_shared<DB::MergeMutateSelectedEntry>(future_part, DB::CurrentlyMergingPartsTaggerPtr{}, std::make_shared<DB::MutationCommands>());
-
-
-    // Copying a vector of columns `deduplicate by columns.
-    DB::IExecutableTask::TaskResultCallback f = [](bool) {};
-    auto task = std::make_shared<local_engine::MergeSparkMergeTreeTask>(
-        *storage, storage->getInMemoryMetadataPtr(), false,  std::vector<std::string>{}, false, entry,
-        DB::TableLockHolder{}, f);
-
-    task->setCurrentTransaction(DB::MergeTreeTransactionHolder{}, DB::MergeTreeTransactionPtr{});
-
-    executeHere(task);
+    std::vector<MergeTreeDataPartPtr> loaded =
+        local_engine::mergeParts(selected_parts, partition_values, uuid_str, storage, partition_dir, bucket_dir);
 
-    std::unordered_set<std::string> to_load{future_part->name};
-    std::vector<std::shared_ptr<DB::IMergeTreeDataPart>> loaded = storage->loadDataPartsWithNames(to_load);
     std::vector<local_engine::PartInfo> res;
     for (auto & partPtr : loaded)
     {
-        local_engine::saveFileStatus(*storage, local_engine::SerializedPlanParser::global_context, partPtr->getDataPartStorage());
-        res.emplace_back(
-            local_engine::PartInfo{partPtr->name, partPtr->getMarksCount(), partPtr->getBytesOnDisk(), partPtr->rows_count,
-                                   /*partition_value*/ partition_values,
-                                   bucket_dir});
+        saveFileStatus(
+            *storage,
+            local_engine::SerializedPlanParser::global_context,
+            partPtr->name,
+            const_cast<IDataPartStorage &>(partPtr->getDataPartStorage()));
+        res.emplace_back(local_engine::PartInfo{
+            partPtr->name, partPtr->getMarksCount(), partPtr->getBytesOnDisk(), partPtr->rows_count, partition_values, bucket_dir});
     }
 
     auto json_info = local_engine::SparkMergeTreeWriter::partInfosToJson(res);
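The inline merge logic removed above is replaced by a call to local_engine::mergeParts (a helper added elsewhere in this commit), and the Hive-style partition_dir parsing that used to be inlined here, and in the writer constructor removed at the top of this section, presumably now happens inside the shared helpers. For reference, that parsing turns a path such as col1=a/col2=b into key/value pairs; a standard-library sketch of the same rule (the committed code uses Poco::StringTokenizer and chassert) is:

    #include <cassert>
    #include <sstream>
    #include <string>
    #include <unordered_map>

    // Sketch: split a Hive-style partition directory ("k1=v1/k2=v2") into a map.
    std::unordered_map<std::string, std::string> parsePartitionDir(const std::string & partition_dir)
    {
        std::unordered_map<std::string, std::string> partition_values;
        std::istringstream dirs(partition_dir);
        std::string segment;
        while (std::getline(dirs, segment, '/'))
        {
            const auto eq = segment.find('=');
            assert(eq != std::string::npos); // each segment must be key=value
            partition_values.emplace(segment.substr(0, eq), segment.substr(eq + 1));
        }
        return partition_values;
    }
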


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
