This is an automated email from the ASF dual-hosted git repository.
liuneng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 8efe3e4e5 [GLUTEN-5476][CH] Trigger merge on insert task (#5529)
8efe3e4e5 is described below
commit 8efe3e4e551694f1596b1b76758f81ddffd14807
Author: Shuai li <[email protected]>
AuthorDate: Mon May 6 14:48:50 2024 +0800
[GLUTEN-5476][CH] Trigger merge on insert task (#5529)
What changes were proposed in this pull request?
(Fixes: #5476)
How was this patch tested?
Tested by unit tests.
---
.../datasources/CHDatasourceJniWrapper.java | 4 +-
.../delta/ClickhouseOptimisticTransaction.scala | 13 +-
.../datasources/v1/CHMergeTreeWriterInjects.scala | 25 +-
.../GlutenClickHouseMergeTreeOptimizeSuite.scala | 31 +++
...tenClickHouseMergeTreePathBasedWriteSuite.scala | 7 +-
...GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala | 3 +
...ergeTreeWriteOnObjectStorageAbstractSuite.scala | 93 -------
.../GlutenClickHouseMergeTreeWriteSuite.scala | 3 +
.../GlutenClickHouseTableAfterRestart.scala | 3 +
...lutenClickHouseWholeStageTransformerSuite.scala | 3 +
cpp-ch/local-engine/Common/CHUtil.cpp | 48 ++--
cpp-ch/local-engine/Common/CHUtil.h | 71 ++++-
cpp-ch/local-engine/Common/QueryContext.cpp | 1 +
.../Storages/CustomStorageMergeTree.cpp | 48 +++-
.../local-engine/Storages/CustomStorageMergeTree.h | 5 +-
.../Storages/Mergetree/MetaDataHelper.cpp | 61 +++++
.../Storages/Mergetree/MetaDataHelper.h | 10 +
.../Storages/Mergetree/SparkMergeTreeWriter.cpp | 289 +++++++++++++++++++--
.../Storages/Mergetree/SparkMergeTreeWriter.h | 48 ++--
.../jni/ReservationListenerWrapper.cpp | 9 +
.../local-engine/jni/ReservationListenerWrapper.h | 4 +
cpp-ch/local-engine/local_engine_jni.cpp | 76 +++---
22 files changed, 603 insertions(+), 252 deletions(-)
diff --git
a/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/CHDatasourceJniWrapper.java
b/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/CHDatasourceJniWrapper.java
index 9ca301efb..c041ee352 100644
---
a/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/CHDatasourceJniWrapper.java
+++
b/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/CHDatasourceJniWrapper.java
@@ -27,7 +27,9 @@ public class CHDatasourceJniWrapper {
String uuid,
String taskId,
String partition_dir,
- String bucket_dir);
+ String bucket_dir,
+ byte[] confArray,
+ long allocId);
public native String nativeMergeMTParts(
byte[] plan,
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
index c73b7e7af..e31560259 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
@@ -16,6 +16,7 @@
*/
package org.apache.spark.sql.delta
+import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings
import org.apache.gluten.execution.ColumnarToRowExecBase
import org.apache.spark.SparkException
@@ -109,7 +110,7 @@ class ClickhouseOptimisticTransaction(
// Retain only a minimal selection of Spark writer options to avoid any
potential
// compatibility issues
- val options = writeOptions match {
+ var options = writeOptions match {
case None => Map.empty[String, String]
case Some(writeOptions) =>
writeOptions.options.filterKeys {
@@ -119,6 +120,16 @@ class ClickhouseOptimisticTransaction(
}.toMap
}
+ spark.conf.getAll.foreach(
+ entry => {
+ if (
+
entry._1.startsWith(s"${CHBackendSettings.getBackendConfigPrefix}.runtime_settings")
+ ||
entry._1.equalsIgnoreCase(DeltaSQLConf.DELTA_OPTIMIZE_MIN_FILE_SIZE.key)
+ ) {
+ options += (entry._1 -> entry._2)
+ }
+ })
+
try {
val tableV2 = ClickHouseTableV2.getTable(deltaLog)
MergeTreeFileFormatWriter.write(
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
index 64aa8863b..8a61385fc 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
@@ -16,10 +16,13 @@
*/
package org.apache.spark.sql.execution.datasources.v1
+import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.expression.ConverterUtils
+import org.apache.gluten.memory.alloc.CHNativeMemoryAllocators
import org.apache.gluten.substrait.`type`.ColumnTypeNode
import org.apache.gluten.substrait.SubstraitContext
-import org.apache.gluten.substrait.extensions.ExtensionBuilder
+import org.apache.gluten.substrait.expression.{ExpressionBuilder,
StringMapNode}
+import org.apache.gluten.substrait.extensions.{AdvancedExtensionNode,
ExtensionBuilder}
import org.apache.gluten.substrait.plan.PlanBuilder
import org.apache.gluten.substrait.rel.{ExtensionTableBuilder, RelBuilder}
@@ -39,7 +42,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext
import java.util.{ArrayList => JList, Map => JMap, UUID}
import scala.collection.JavaConverters._
-import scala.collection.mutable
+
case class PlanWithSplitInfo(plan: Array[Byte], splitInfo: Array[Byte])
class CHMergeTreeWriterInjects extends GlutenFormatWriterInjectsBase {
@@ -47,10 +50,7 @@ class CHMergeTreeWriterInjects extends
GlutenFormatWriterInjectsBase {
override def nativeConf(
options: Map[String, String],
compressionCodec: String): JMap[String, String] = {
- // pass options to native so that velox can take user-specified conf to
write parquet,
- // i.e., compression, block size, block rows.
- val sparkOptions = new mutable.HashMap[String, String]()
- sparkOptions.asJava
+ options.asJava
}
override def createOutputWriter(
@@ -95,7 +95,7 @@ class CHMergeTreeWriterInjects extends
GlutenFormatWriterInjectsBase {
clickhouseTableConfigs,
tableSchema.toAttributes // use table schema instead of data schema
)
-
+ val allocId = CHNativeMemoryAllocators.contextInstance.getNativeInstanceId
val datasourceJniWrapper = new CHDatasourceJniWrapper()
val instance =
datasourceJniWrapper.nativeInitMergeTreeWriterWrapper(
@@ -104,7 +104,9 @@ class CHMergeTreeWriterInjects extends
GlutenFormatWriterInjectsBase {
uuid,
context.getTaskAttemptID.getTaskID.getId.toString,
context.getConfiguration.get("mapreduce.task.gluten.mergetree.partition.dir"),
-
context.getConfiguration.get("mapreduce.task.gluten.mergetree.bucketid.str")
+
context.getConfiguration.get("mapreduce.task.gluten.mergetree.bucketid.str"),
+ buildNativeConf(nativeConf),
+ allocId
)
new MergeTreeOutputWriter(database, tableName, datasourceJniWrapper,
instance, path)
@@ -121,6 +123,13 @@ class CHMergeTreeWriterInjects extends
GlutenFormatWriterInjectsBase {
override def getFormatName(): String = {
"mergetree"
}
+
+ private def buildNativeConf(confs: JMap[String, String]): Array[Byte] = {
+ val stringMapNode: StringMapNode = ExpressionBuilder.makeStringMap(confs)
+ val extensionNode: AdvancedExtensionNode =
ExtensionBuilder.makeAdvancedExtension(
+
BackendsApiManager.getTransformerApiInstance.packPBMessage(stringMapNode.toProtobuf))
+ PlanBuilder.makePlan(extensionNode).toProtobuf.toByteArray
+ }
}
object CHMergeTreeWriterInjects {
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala
index 9635b9958..ae0cd170d 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala
@@ -24,6 +24,8 @@ import io.delta.tables.ClickhouseTable
import java.io.File
+import scala.concurrent.duration.DurationInt
+
// Some sqls' line length exceeds 100
// scalastyle:off line.size.limit
@@ -54,6 +56,9 @@ class GlutenClickHouseMergeTreeOptimizeSuite
"spark.databricks.delta.retentionDurationCheck.enabled",
"false"
) // otherwise RETAIN 0 HOURS will fail
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert",
+ "false")
}
override protected def createTPCHNotNullTables(): Unit = {
@@ -426,5 +431,31 @@ class GlutenClickHouseMergeTreeOptimizeSuite
val ret = spark.sql(s"select count(*) from
clickhouse.`$dataPath`").collect()
assert(ret.apply(0).get(0) == 600572)
}
+
+ test("test mergetree insert with optimize basic") {
+ withSQLConf(
+ ("spark.databricks.delta.optimize.minFileSize" -> "200000000"),
+
("spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert"
-> "true")
+ ) {
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS
lineitem_mergetree_insert_optimize_basic;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS
lineitem_mergetree_insert_optimize_basic
+ |USING clickhouse
+ |LOCATION
'$basePath/lineitem_mergetree_insert_optimize_basic'
+ | as select * from lineitem
+ |""".stripMargin)
+
+ val ret = spark.sql("select count(*) from
lineitem_mergetree_insert_optimize_basic").collect()
+ assert(ret.apply(0).get(0) == 600572)
+ eventually(timeout(60.seconds), interval(3.seconds)) {
+ assert(
+ new
File(s"$basePath/lineitem_mergetree_insert_optimize_basic").listFiles().length
== 2
+ )
+ }
+ }
+ }
}
// scalastyle:off line.size.limit
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
index d85217536..93f22baa2 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreePathBasedWriteSuite.scala
@@ -57,6 +57,9 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_settings.min_insert_block_size_rows",
"100000")
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert",
+ "false")
}
override protected def createTPCHNotNullTables(): Unit = {
@@ -170,8 +173,8 @@ class GlutenClickHouseMergeTreePathBasedWriteSuite
.format("clickhouse")
.load(dataPath)
.where("l_shipdate = date'1998-09-02'")
- .collect()
- assert(result.apply(0).get(0) == 110501)
+ .count()
+ assert(result == 183)
}
test("test mergetree path based write with dataframe api") {
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
index 49011e031..ca5b39fff 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
@@ -54,6 +54,9 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
.set("spark.sql.adaptive.enabled", "true")
.set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level",
"error")
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert",
+ "false")
}
override protected def beforeEach(): Unit = {
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite.scala
deleted file mode 100644
index 90db0f5d8..000000000
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.execution
-
-import org.apache.gluten.GlutenConfig
-
-import org.apache.spark.sql.SparkSession
-
-import _root_.org.apache.spark.SparkConf
-import _root_.org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
-import org.apache.commons.io.FileUtils
-
-import java.io.File
-
-// Some sqls' line length exceeds 100
-// scalastyle:off line.size.limit
-
-class GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite
- extends GlutenClickHouseTPCHAbstractSuite
- with AdaptiveSparkPlanHelper {
- private var _spark: SparkSession = _
-
- override protected def spark: SparkSession = _spark
-
- override protected val needCopyParquetToTablePath = true
-
- override protected val tablesPath: String = basePath + "/tpch-data"
- override protected val tpchQueries: String = rootPath +
"queries/tpch-queries-ch"
- override protected val queriesResults: String = rootPath +
"mergetree-queries-output"
-
- override protected def initializeSession(): Unit = {
- if (_spark == null) {
- _spark = SparkSession
- .builder()
- .appName("Gluten-UT-RemoteHS")
- .config(sparkConf)
- .getOrCreate()
- }
- }
-
- override protected def sparkConf: SparkConf = {
- super.sparkConf
- .setMaster("local[2]")
- .set("spark.shuffle.manager",
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
- .set("spark.io.compression.codec", "LZ4")
- .set("spark.sql.shuffle.partitions", "5")
- .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
- .set("spark.sql.adaptive.enabled", "true")
- .set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level",
"error")
- }
- override protected def createTPCHNotNullTables(): Unit = {
- createNotNullTPCHTablesInParquet(tablesPath)
- }
-
- override protected def afterAll(): Unit = {
- try {
- super.afterAll()
- } finally {
- try {
- if (_spark != null) {
- try {
- _spark.sessionState.catalog.reset()
- } finally {
- _spark.stop()
- _spark = null
- }
- }
- } finally {
- SparkSession.clearActiveSession()
- SparkSession.clearDefaultSession()
- }
- }
-
- FileUtils.forceDelete(new File(basePath))
- // init GlutenConfig in the next beforeAll
- GlutenConfig.ins = null
- }
-}
-// scalastyle:off line.size.limit
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala
index 6f10035fa..439a1b58f 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala
@@ -54,6 +54,9 @@ class GlutenClickHouseMergeTreeWriteSuite
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_settings.min_insert_block_size_rows",
"100000")
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert",
+ "false")
}
override protected def createTPCHNotNullTables(): Unit = {
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala
index 36002b7e5..a673d4ba3 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala
@@ -58,6 +58,9 @@ class GlutenClickHouseTableAfterRestart
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_settings.min_insert_block_size_rows",
"100000")
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert",
+ "false")
}
override protected def createTPCHNotNullTables(): Unit = {
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
index e455c956d..a891d6d10 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
@@ -161,6 +161,9 @@ class GlutenClickHouseWholeStageTransformerSuite extends
WholeStageTransformerSu
}
override def beforeAll(): Unit = {
+ // is not exist may cause some ut error
+ assert(new File("/data").exists())
+
// prepare working paths
val basePathDir = new File(basePath)
if (basePathDir.exists()) {
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp
b/cpp-ch/local-engine/Common/CHUtil.cpp
index ba86fe306..9704b3041 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -323,34 +323,6 @@ std::string PlanUtil::explainPlan(DB::QueryPlan & plan)
return plan_str;
}
-std::vector<MergeTreeUtil::Path> MergeTreeUtil::getAllMergeTreeParts(const
Path & storage_path)
-{
- if (!fs::exists(storage_path))
- throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Invalid merge tree
store path:{}", storage_path.string());
-
- // TODO: May need to check the storage format version
- std::vector<fs::path> res;
- for (const auto & entry : fs::directory_iterator(storage_path))
- {
- auto filename = entry.path().filename();
- if (filename == "format_version.txt" || filename == "detached" ||
filename == "_delta_log")
- continue;
- res.push_back(entry.path());
- }
- return res;
-}
-
-DB::NamesAndTypesList MergeTreeUtil::getSchemaFromMergeTreePart(const fs::path
& part_path)
-{
- DB::NamesAndTypesList names_types_list;
- if (!fs::exists(part_path))
- throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Invalid merge tree
store path:{}", part_path.string());
- DB::ReadBufferFromFile readbuffer((part_path / "columns.txt").string());
- names_types_list.readText(readbuffer);
- return names_types_list;
-}
-
-
NestedColumnExtractHelper::NestedColumnExtractHelper(const DB::Block & block_,
bool case_insentive_)
: block(block_), case_insentive(case_insentive_)
{
@@ -594,10 +566,21 @@ void
BackendInitializerUtil::initEnvs(DB::Context::ConfigurationPtr config)
spark_user = spark_user_c_str;
}
+DB::Field BackendInitializerUtil::toField(const String key, const String value)
+{
+ if (BOOL_VALUE_SETTINGS.contains(key))
+ return DB::Field(value == "true" || value == "1");
+ else if (LONG_VALUE_SETTINGS.contains(key))
+ return DB::Field(std::strtoll(value.c_str(), NULL, 10));
+ else
+ return DB::Field(value);
+}
+
void BackendInitializerUtil::initSettings(std::map<std::string, std::string> &
backend_conf_map, DB::Settings & settings)
{
/// Initialize default setting.
settings.set("date_time_input_format", "best_effort");
+ settings.set("mergetree.merge_after_insert", true);
for (const auto & [key, value] : backend_conf_map)
{
@@ -609,7 +592,8 @@ void
BackendInitializerUtil::initSettings(std::map<std::string, std::string> & b
}
else if (key.starts_with(CH_RUNTIME_SETTINGS_PREFIX))
{
- settings.set(key.substr(CH_RUNTIME_SETTINGS_PREFIX.size()), value);
+ auto k = key.substr(CH_RUNTIME_SETTINGS_PREFIX.size());
+ settings.set(k, toField(k, value));
LOG_DEBUG(&Poco::Logger::get("CHUtil"), "Set settings key:{}
value:{}", key, value);
}
else if (key.starts_with(SPARK_HADOOP_PREFIX + S3A_PREFIX))
@@ -624,6 +608,12 @@ void
BackendInitializerUtil::initSettings(std::map<std::string, std::string> & b
// 4. fs.s3a.bucket.bucket_name.assumed.role.externalId (non
hadoop official)
settings.set(key.substr(SPARK_HADOOP_PREFIX.length()), value);
}
+ else if (key.starts_with(SPARK_DELTA_PREFIX))
+ {
+ auto k = key.substr(SPARK_DELTA_PREFIX.size());
+ settings.set(k, toField(k, value));
+ LOG_DEBUG(&Poco::Logger::get("CHUtil"), "Set settings key:{}
value:{}", key, value);
+ }
}
/// Finally apply some fixed kvs to settings.
diff --git a/cpp-ch/local-engine/Common/CHUtil.h
b/cpp-ch/local-engine/Common/CHUtil.h
index 308e22422..574cdbe4c 100644
--- a/cpp-ch/local-engine/Common/CHUtil.h
+++ b/cpp-ch/local-engine/Common/CHUtil.h
@@ -32,6 +32,10 @@
namespace local_engine
{
+static const std::unordered_set<String>
BOOL_VALUE_SETTINGS{"mergetree.merge_after_insert"};
+static const std::unordered_set<String> LONG_VALUE_SETTINGS{
+ "optimize.maxfilesize", "optimize.minFileSize",
"mergetree.max_num_part_per_merge_task"};
+
class BlockUtil
{
public:
@@ -98,14 +102,6 @@ public:
static std::string explainPlan(DB::QueryPlan & plan);
};
-class MergeTreeUtil
-{
-public:
- using Path = std::filesystem::path;
- static std::vector<Path> getAllMergeTreeParts(const Path & storage_path);
- static DB::NamesAndTypesList getSchemaFromMergeTreePart(const Path &
part_path);
-};
-
class ActionsDAGUtil
{
public:
@@ -131,6 +127,8 @@ class JNIUtils;
class BackendInitializerUtil
{
public:
+ static DB::Field toField(const String key, const String value);
+
/// Initialize two kinds of resources
/// 1. global level resources like global_context/shared_context, notice
that they can only be initialized once in process lifetime
/// 2. session level resources like settings/configs, they can be
initialized multiple times following the lifetime of executor/driver
@@ -166,6 +164,7 @@ public:
inline static const std::string HADOOP_S3_CLIENT_CACHE_IGNORE =
"fs.s3a.client.cached.ignore";
inline static const std::string SPARK_HADOOP_PREFIX = "spark.hadoop.";
inline static const std::string S3A_PREFIX = "fs.s3a.";
+ inline static const std::string SPARK_DELTA_PREFIX =
"spark.databricks.delta.";
/// On yarn mode, native writing on hdfs cluster takes yarn container user
as the user passed to libhdfs3, which
/// will cause permission issue because yarn container user is not the
owner of the hdfs dir to be written.
@@ -228,4 +227,60 @@ public:
static UInt64 getMemoryRSS();
};
+template <typename T>
+class ConcurrentDeque
+{
+public:
+ std::optional<T> pop_front()
+ {
+ std::lock_guard<std::mutex> lock(mtx);
+
+ if (deq.empty())
+ return {};
+
+ T t = deq.front();
+ deq.pop_front();
+ return t;
+ }
+
+ void emplace_back(T value)
+ {
+ std::lock_guard<std::mutex> lock(mtx);
+ deq.emplace_back(value);
+ }
+
+ void emplace_back(std::vector<T> values)
+ {
+ std::lock_guard<std::mutex> lock(mtx);
+ deq.insert(deq.end(), values.begin(), values.end());
+ }
+
+ void emplace_front(T value)
+ {
+ std::lock_guard<std::mutex> lock(mtx);
+ deq.emplace_front(value);
+ }
+
+ size_t size()
+ {
+ std::lock_guard<std::mutex> lock(mtx);
+ return deq.size();
+ }
+
+ bool empty()
+ {
+ std::lock_guard<std::mutex> lock(mtx);
+ return deq.empty();
+ }
+
+ std::deque<T> unsafeGet()
+ {
+ return deq;
+ }
+
+private:
+ std::deque<T> deq;
+ mutable std::mutex mtx;
+};
+
}
diff --git a/cpp-ch/local-engine/Common/QueryContext.cpp
b/cpp-ch/local-engine/Common/QueryContext.cpp
index 9d2ea7f26..c659e6f34 100644
--- a/cpp-ch/local-engine/Common/QueryContext.cpp
+++ b/cpp-ch/local-engine/Common/QueryContext.cpp
@@ -68,6 +68,7 @@ int64_t initializeQuery(ReservationListenerWrapperPtr
listener)
listener->reserve(size);
};
CurrentMemoryTracker::before_free = [listener](Int64 size) -> void {
listener->free(size); };
+ CurrentMemoryTracker::current_memory = [listener]() -> Int64 { return
listener->currentMemory(); };
allocator_map.insert(allocator_id, allocator_context);
return allocator_id;
}
diff --git a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp
b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp
index cff32a83f..368015fb9 100644
--- a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp
+++ b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp
@@ -26,6 +26,8 @@ namespace DB
namespace ErrorCodes
{
extern const int DUPLICATE_DATA_PART;
+extern const int NO_SUCH_DATA_PART;
+
}
}
@@ -146,9 +148,9 @@ CustomStorageMergeTree::CustomStorageMergeTree(
std::atomic<int> CustomStorageMergeTree::part_num;
-MergeTreeData::MutableDataPartsVector
CustomStorageMergeTree::loadDataPartsWithNames(std::unordered_set<std::string>
parts)
+std::vector<MergeTreeDataPartPtr>
CustomStorageMergeTree::loadDataPartsWithNames(std::unordered_set<std::string>
parts)
{
- MutableDataPartsVector data_parts;
+ std::vector<MergeTreeDataPartPtr> data_parts;
const auto disk = getStoragePolicy()->getDisks().at(0);
for (const auto& name : parts)
{
@@ -158,14 +160,8 @@ MergeTreeData::MutableDataPartsVector
CustomStorageMergeTree::loadDataPartsWithN
data_parts.emplace_back(res.part);
}
- if(getStorageID().hasUUID())
- {
- // the following lines will modify storage's member.
- // So when current storage is shared (when UUID is default Nil value),
- // we should avoid modify because we don't have locks here
-
- calculateColumnAndSecondaryIndexSizesImpl(); // without it "test
mergetree optimize partitioned by one low card column" will log ERROR
- }
+ // without it "test mergetree optimize partitioned by one low card column"
will log ERROR
+ calculateColumnAndSecondaryIndexSizesImpl();
return data_parts;
}
@@ -246,6 +242,38 @@ MergeTreeData::LoadPartResult
CustomStorageMergeTree::loadDataPart(
return res;
}
+void CustomStorageMergeTree::removePartFromMemory(const
MergeTreeData::DataPartPtr & part_to_detach)
+{
+ auto lock = lockParts();
+ bool removed_active_part = false;
+ bool restored_active_part = false;
+
+ auto it_part = data_parts_by_info.find(part_to_detach->info);
+ if (it_part == data_parts_by_info.end())
+ {
+ LOG_DEBUG(log, "No such data part {}",
part_to_detach->getNameWithState());
+ return;
+ }
+
+ /// What if part_to_detach is a reference to *it_part? Make a new owner
just in case.
+ /// Important to own part pointer here (not const reference), because it
will be removed from data_parts_indexes
+ /// few lines below.
+ DataPartPtr part = *it_part; // NOLINT
+
+ if (part->getState() == DataPartState::Active)
+ {
+ removePartContributionToColumnAndSecondaryIndexSizes(part);
+ removed_active_part = true;
+ }
+
+ modifyPartState(it_part, DataPartState::Deleting);
+ LOG_TEST(log, "removePartFromMemory: removing {} from data_parts_indexes",
part->getNameWithState());
+ data_parts_indexes.erase(it_part);
+
+ if (removed_active_part || restored_active_part)
+ resetObjectColumnsFromActiveParts(lock);
+}
+
void CustomStorageMergeTree::dropPartNoWaitNoThrow(const String &
/*part_name*/)
{
throw std::runtime_error("not implement");
diff --git a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h
b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h
index 0aeee4ef9..cd507a3ac 100644
--- a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h
+++ b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h
@@ -53,8 +53,8 @@ public:
std::vector<MergeTreeMutationStatus> getMutationsStatus() const override;
bool scheduleDataProcessingJob(BackgroundJobsAssignee & executor) override;
std::map<std::string, MutationCommands> getUnfinishedMutationCommands()
const override;
- MutableDataPartsVector
loadDataPartsWithNames(std::unordered_set<std::string> parts);
-
+ std::vector<MergeTreeDataPartPtr>
loadDataPartsWithNames(std::unordered_set<std::string> parts);
+ void removePartFromMemory(const MergeTreeData::DataPartPtr &
part_to_detach);
MergeTreeDataWriter writer;
MergeTreeDataSelectExecutor reader;
@@ -84,6 +84,7 @@ protected:
bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const
override;
MutationCommands getAlterMutationCommandsForPart(const DataPartPtr &
/*part*/) const override { return {}; }
void attachRestoredParts(MutableDataPartsVector && /*parts*/) override {
throw std::runtime_error("not implement"); }
+
};
}
diff --git a/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp
b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp
index 21c0fc968..57bb804fa 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp
+++ b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp
@@ -15,9 +15,13 @@
* limitations under the License.
*/
#include "MetaDataHelper.h"
+
#include <filesystem>
#include <Disks/ObjectStorages/MetadataStorageFromDisk.h>
+#include <Parser/MergeTreeRelParser.h>
+#include <Storages/Mergetree/MergeSparkMergeTreeTask.h>
+#include <Poco/StringTokenizer.h>
namespace CurrentMetrics
{
@@ -123,6 +127,7 @@ void restoreMetaData(CustomStorageMergeTreePtr & storage,
const MergeTreeTable &
void saveFileStatus(
const DB::MergeTreeData & storage,
const DB::ContextPtr& context,
+ const String & part_name,
IDataPartStorage & data_part_storage)
{
const DiskPtr disk = storage.getStoragePolicy()->getAnyDisk();
@@ -142,5 +147,61 @@ void saveFileStatus(
}
out->finalize();
}
+
+ LOG_DEBUG(&Poco::Logger::get("MetaDataHelper"), "Save part {} metadata
success.", part_name);
+}
+
+
+std::vector<MergeTreeDataPartPtr> mergeParts(
+ std::vector<DB::DataPartPtr> selected_parts,
+ std::unordered_map<String, String> & partition_values,
+ const String & new_part_uuid,
+ CustomStorageMergeTreePtr storage,
+ const String & partition_dir,
+ const String & bucket_dir)
+{
+ auto future_part = std::make_shared<DB::FutureMergedMutatedPart>();
+ future_part->uuid = UUIDHelpers::generateV4();
+
+ future_part->assign(std::move(selected_parts));
+
+ future_part->name = "";
+ if(!partition_dir.empty())
+ {
+ future_part->name = partition_dir + "/";
+ extractPartitionValues(partition_dir, partition_values);
+ }
+ if(!bucket_dir.empty())
+ {
+ future_part->name = future_part->name + bucket_dir + "/";
+ }
+ future_part->name = future_part->name + new_part_uuid + "-merged";
+
+ auto entry = std::make_shared<DB::MergeMutateSelectedEntry>(future_part,
DB::CurrentlyMergingPartsTaggerPtr{}, std::make_shared<DB::MutationCommands>());
+
+ // Copying a vector of columns `deduplicate by columns.
+ DB::IExecutableTask::TaskResultCallback f = [](bool) {};
+ auto task = std::make_shared<local_engine::MergeSparkMergeTreeTask>(
+ *storage, storage->getInMemoryMetadataPtr(), false,
std::vector<std::string>{}, false, entry,
+ DB::TableLockHolder{}, f);
+
+ task->setCurrentTransaction(DB::MergeTreeTransactionHolder{},
DB::MergeTreeTransactionPtr{});
+
+ executeHere(task);
+
+ std::unordered_set<std::string> to_load{future_part->name};
+ std::vector<MergeTreeDataPartPtr> merged =
storage->loadDataPartsWithNames(to_load);
+ return merged;
+}
+
+void extractPartitionValues(const String & partition_dir,
std::unordered_map<String, String> & partition_values)
+{
+ Poco::StringTokenizer partitions(partition_dir, "/");
+ for (const auto & partition : partitions)
+ {
+ Poco::StringTokenizer key_value(partition, "=");
+ chassert(key_value.count() == 2);
+ partition_values.emplace(key_value[0], key_value[1]);
+ }
}
}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h
b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h
index b15a15322..7163ee02c 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h
+++ b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h
@@ -28,6 +28,16 @@ void restoreMetaData(CustomStorageMergeTreePtr & storage,
const MergeTreeTable &
void saveFileStatus(
const DB::MergeTreeData & storage,
const DB::ContextPtr& context,
+ const String & part_name,
IDataPartStorage & data_part_storage);
+std::vector<MergeTreeDataPartPtr> mergeParts(
+ std::vector<DB::DataPartPtr> selected_parts,
+ std::unordered_map<String, String> & partition_values,
+ const String & new_part_uuid,
+ CustomStorageMergeTreePtr storage,
+ const String & partition_dir,
+ const String & bucket_dir);
+
+void extractPartitionValues(const String & partition_dir,
std::unordered_map<String, String> & partition_values);
}
diff --git a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
index 40e716a56..c709a5f24 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
+++ b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
@@ -16,12 +16,24 @@
*/
#include "SparkMergeTreeWriter.h"
-#include <Disks/createVolume.h>
#include <Disks/ObjectStorages/MetadataStorageFromDisk.h>
+#include <Disks/createVolume.h>
#include <Interpreters/ActionsDAG.h>
#include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
-#include <rapidjson/prettywriter.h>
#include <Storages/Mergetree/MetaDataHelper.h>
+#include <rapidjson/prettywriter.h>
+#include <Common/CHUtil.h>
+
+
+namespace CurrentMetrics
+{
+extern const Metric LocalThread;
+extern const Metric LocalThreadActive;
+extern const Metric LocalThreadScheduled;
+extern const Metric GlobalThread;
+extern const Metric GlobalThreadActive;
+extern const Metric GlobalThreadScheduled;
+}
using namespace DB;
@@ -42,6 +54,43 @@ Block removeColumnSuffix(const DB::Block & block)
return Block(columns);
}
+SparkMergeTreeWriter::SparkMergeTreeWriter(
+ CustomStorageMergeTreePtr storage_,
+ const DB::StorageMetadataPtr & metadata_snapshot_,
+ const DB::ContextPtr & context_,
+ const String & uuid_,
+ const String & partition_dir_,
+ const String & bucket_dir_)
+ : storage(storage_)
+ , metadata_snapshot(metadata_snapshot_)
+ , context(context_)
+ , uuid(uuid_)
+ , partition_dir(partition_dir_)
+ , bucket_dir(bucket_dir_)
+ , thread_pool(CurrentMetrics::LocalThread,
CurrentMetrics::LocalThreadActive, CurrentMetrics::LocalThreadScheduled, 1, 1,
100000)
+{
+ const DB::Settings & settings = context->getSettingsRef();
+ squashing_transform
+ =
std::make_unique<DB::SquashingTransform>(settings.min_insert_block_size_rows,
settings.min_insert_block_size_bytes);
+ if (!partition_dir.empty())
+ {
+ extractPartitionValues(partition_dir, partition_values);
+ }
+ header = metadata_snapshot->getSampleBlock();
+
+ Field is_merge;
+ if (context->getSettings().tryGet("mergetree.merge_after_insert",
is_merge))
+ merge_after_insert = is_merge.get<bool>();
+
+ Field limit_size_field;
+ if (context->getSettings().tryGet("optimize.minFileSize",
limit_size_field))
+ merge_min_size = limit_size_field.get<Int64>() <= 0 ? merge_min_size :
limit_size_field.get<Int64>();
+
+ Field limit_cnt_field;
+ if (context->getSettings().tryGet("mergetree.max_num_part_per_merge_task",
limit_cnt_field))
+ merge_limit_parts = limit_cnt_field.get<Int64>() <= 0 ?
merge_limit_parts : limit_cnt_field.get<Int64>();
+}
+
void SparkMergeTreeWriter::write(DB::Block & block)
{
auto new_block = removeColumnSuffix(block);
@@ -52,11 +101,57 @@ void SparkMergeTreeWriter::write(DB::Block & block)
do_convert.execute(new_block);
}
- auto blocks_with_partition =
MergeTreeDataWriter::splitBlockIntoParts(squashing_transform->add(new_block),
10, metadata_snapshot, context);
- for (auto & item : blocks_with_partition)
+ auto blocks_with_partition
+ =
MergeTreeDataWriter::splitBlockIntoParts(squashing_transform->add(new_block),
10, metadata_snapshot, context);
+ for (auto & item : blocks_with_partition)
+ {
+ size_t before_write_memory = 0;
+ if (auto * memory_tracker = CurrentThread::getMemoryTracker())
{
- new_parts.emplace_back(writeTempPartAndFinalize(item,
metadata_snapshot).part);
+ CurrentThread::flushUntrackedMemory();
+ before_write_memory = memory_tracker->get();
+ }
+
+ new_parts.emplace_back(writeTempPartAndFinalize(item,
metadata_snapshot).part);
part_num++;
+ manualFreeMemory(before_write_memory);
+ /// Reset earlier to free memory
+ item.block.clear();
+ item.partition.clear();
+ }
+
+ if (!blocks_with_partition.empty() && merge_after_insert)
+ checkAndMerge();
+}
+
+void SparkMergeTreeWriter::manualFreeMemory(size_t before_write_memory)
+{
+ // If the mergetree disk is not a local fs but a remote fs such as s3 or hdfs,
+ // memory may be allocated in the current thread but freed on a global thread.
+ // We currently have no way to clear that global memory through the Spark thread
+ // tracker in use, so we manually correct the memory usage here.
+ auto disk = storage->getStoragePolicy()->getAnyDisk();
+ if (!disk->isRemote())
+ return;
+
+ std::lock_guard lock(memory_mutex);
+ auto * memory_tracker = CurrentThread::getMemoryTracker();
+ if (memory_tracker && CurrentMemoryTracker::before_free)
+ {
+ CurrentThread::flushUntrackedMemory();
+ const size_t ch_alloc = memory_tracker->get();
+ if (disk->getName().contains("s3") &&
context->getSettings().s3_allow_parallel_part_upload && ch_alloc >
before_write_memory)
+ {
+ const size_t diff_ch_alloc = before_write_memory - ch_alloc;
+ memory_tracker->adjustWithUntrackedMemory(diff_ch_alloc);
+ }
+
+ const size_t a = memory_tracker->get();
+ const size_t spark_alloc = CurrentMemoryTracker::current_memory();
+ const size_t diff_alloc = spark_alloc - memory_tracker->get();
+
+ if (diff_alloc > 0)
+ CurrentMemoryTracker::before_free(diff_alloc);
}
}
@@ -67,7 +162,61 @@ void SparkMergeTreeWriter::finalize()
{
auto blocks_with_partition =
MergeTreeDataWriter::splitBlockIntoParts(std::move(block), 10,
metadata_snapshot, context);
for (auto & item : blocks_with_partition)
+ {
+ size_t before_write_memory = 0;
+ if (auto * memory_tracker = CurrentThread::getMemoryTracker())
+ {
+ CurrentThread::flushUntrackedMemory();
+ before_write_memory = memory_tracker->get();
+ }
+
new_parts.emplace_back(writeTempPartAndFinalize(item,
metadata_snapshot).part);
+ part_num++;
+ manualFreeMemory(before_write_memory);
+ /// Reset earlier to free memory
+ item.block.clear();
+ item.partition.clear();
+ }
+ }
+
+ SCOPE_EXIT({
+ for (auto merge_tree_data_part : new_parts.unsafeGet())
+ saveFileStatus(
+ *storage, context, merge_tree_data_part->name,
const_cast<IDataPartStorage &>(merge_tree_data_part->getDataPartStorage()));
+ });
+
+ if (!merge_after_insert)
+ return;
+
+ // wait all merge task end and do final merge
+ thread_pool.wait();
+
+ size_t before_merge_size;
+ do
+ {
+ before_merge_size = new_parts.size();
+ checkAndMerge(true);
+ thread_pool.wait();
+ } while (before_merge_size != new_parts.size());
+
+ std::unordered_set<String> final_parts;
+ for (auto merge_tree_data_part : new_parts.unsafeGet())
+ final_parts.emplace(merge_tree_data_part->name);
+
+ for (const auto & tmp_part : tmp_parts)
+ {
+ if (final_parts.contains(tmp_part))
+ continue;
+
+ GlobalThreadPool::instance().scheduleOrThrow(
+ [&]() -> void
+ {
+ for (auto disk : storage->getDisks())
+ {
+ auto full_path = storage->getFullPathOnDisk(disk);
+ disk->removeRecursive(full_path + "/" + tmp_part);
+ }
+ });
}
}
@@ -76,16 +225,15 @@ SparkMergeTreeWriter::writeTempPartAndFinalize(
DB::BlockWithPartition & block_with_partition,
const DB::StorageMetadataPtr & metadata_snapshot)
{
- auto temp_part = writeTempPart(block_with_partition, metadata_snapshot);
+ MergeTreeDataWriter::TemporaryPart temp_part;
+ writeTempPart(temp_part, block_with_partition, metadata_snapshot);
temp_part.finalize();
- saveFileStatus(storage, context, temp_part.part->getDataPartStorage());
return temp_part;
}
-MergeTreeDataWriter::TemporaryPart SparkMergeTreeWriter::writeTempPart(
+void SparkMergeTreeWriter::writeTempPart(MergeTreeDataWriter::TemporaryPart &
temp_part,
BlockWithPartition & block_with_partition, const StorageMetadataPtr &
metadata_snapshot)
{
- MergeTreeDataWriter::TemporaryPart temp_part;
Block & block = block_with_partition.block;
auto columns =
metadata_snapshot->getColumns().getAllPhysical().filter(block.getNames());
@@ -95,7 +243,7 @@ MergeTreeDataWriter::TemporaryPart
SparkMergeTreeWriter::writeTempPart(
column.type = block.getByName(column.name).type;
auto minmax_idx = std::make_shared<IMergeTreeDataPart::MinMaxIndex>();
- minmax_idx->update(block,
storage.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));
+ minmax_idx->update(block,
storage->getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));
MergeTreePartition partition(block_with_partition.partition);
@@ -121,13 +269,13 @@ MergeTreeDataWriter::TemporaryPart
SparkMergeTreeWriter::writeTempPart(
String part_name = part_dir;
- temp_part.temporary_directory_lock =
storage.getTemporaryPartDirectoryHolder(part_dir);
+ temp_part.temporary_directory_lock =
storage->getTemporaryPartDirectoryHolder(part_dir);
auto indices =
MergeTreeIndexFactory::instance().getMany(metadata_snapshot->getSecondaryIndices());
/// If we need to calculate some columns to sort.
if (metadata_snapshot->hasSortingKey() ||
metadata_snapshot->hasSecondaryIndices())
- storage.getSortingKeyAndSkipIndicesExpression(metadata_snapshot,
indices)->execute(block);
+ storage->getSortingKeyAndSkipIndicesExpression(metadata_snapshot,
indices)->execute(block);
Names sort_columns = metadata_snapshot->getSortingKeyColumns();
SortDescription sort_description;
@@ -157,19 +305,19 @@ MergeTreeDataWriter::TemporaryPart
SparkMergeTreeWriter::writeTempPart(
/// If optimize_on_insert is true, block may become empty after merge.
/// There is no need to create empty part.
if (expected_size == 0)
- return temp_part;
+ return;
- VolumePtr volume = storage.getStoragePolicy()->getVolume(0);
+ VolumePtr volume = storage->getStoragePolicy()->getVolume(0);
VolumePtr data_part_volume =
std::make_shared<SingleDiskVolume>(volume->getName(), volume->getDisk(),
volume->max_data_part_size);
- auto new_data_part = storage.getDataPartBuilder(part_name,
data_part_volume, part_dir)
-
.withPartFormat(storage.choosePartFormat(expected_size, block.rows()))
+ auto new_data_part = storage->getDataPartBuilder(part_name,
data_part_volume, part_dir)
+
.withPartFormat(storage->choosePartFormat(expected_size, block.rows()))
.withPartInfo(new_part_info)
.build();
auto data_part_storage = new_data_part->getDataPartStoragePtr();
- const auto & data_settings = storage.getSettings();
+ const auto & data_settings = storage->getSettings();
SerializationInfo::Settings
settings{data_settings->ratio_of_defaults_for_sparse_serialization, true};
SerializationInfoByName infos(columns, settings);
@@ -194,7 +342,7 @@ MergeTreeDataWriter::TemporaryPart
SparkMergeTreeWriter::writeTempPart(
data_part_storage->createDirectories();
- if (storage.getSettings()->fsync_part_directory)
+ if (storage->getSettings()->fsync_part_directory)
{
const auto disk = data_part_volume->getDisk();
sync_guard = disk->getDirectorySyncGuard(full_path);
@@ -203,7 +351,7 @@ MergeTreeDataWriter::TemporaryPart
SparkMergeTreeWriter::writeTempPart(
/// This effectively chooses minimal compression method:
/// either default lz4 or compression method with zero thresholds on
absolute and relative part size.
- auto compression_codec = storage.getContext()->chooseCompressionCodec(0,
0);
+ auto compression_codec = storage->getContext()->chooseCompressionCodec(0,
0);
auto out = std::make_unique<MergedBlockOutputStream>(
new_data_part,
@@ -218,21 +366,24 @@ MergeTreeDataWriter::TemporaryPart
SparkMergeTreeWriter::writeTempPart(
context->getWriteSettings());
out->writeWithPermutation(block, perm_ptr);
-
-
auto finalizer = out->finalizePartAsync(new_data_part,
data_settings->fsync_after_insert, nullptr, nullptr);
temp_part.part = new_data_part;
-
temp_part.streams.emplace_back(MergeTreeDataWriter::TemporaryPart::Stream{.stream
= std::move(out), .finalizer = std::move(finalizer)});
-
- return temp_part;
+ temp_part.streams.emplace_back(
+ MergeTreeDataWriter::TemporaryPart::Stream{.stream = std::move(out),
.finalizer = std::move(finalizer)});
}
std::vector<PartInfo> SparkMergeTreeWriter::getAllPartInfo()
{
std::vector<PartInfo> res;
- for (const MergeTreeDataPartPtr & part : new_parts)
- res.emplace_back(PartInfo{part->name, part->getMarksCount(),
part->getBytesOnDisk(), part->rows_count, partition_values, bucket_dir});
+ res.reserve(new_parts.size());
+
+ for (auto part : new_parts.unsafeGet())
+ {
+ res.emplace_back(
+ PartInfo{part->name, part->getMarksCount(),
part->getBytesOnDisk(), part->rows_count, partition_values, bucket_dir});
+ }
+
return res;
}
@@ -268,4 +419,90 @@ String SparkMergeTreeWriter::partInfosToJson(const
std::vector<PartInfo> & part_
return result.GetString();
}
+void SparkMergeTreeWriter::checkAndMerge(bool force)
+{
+ // Only finalize should force merge.
+ if (!force && new_parts.size() < merge_limit_parts)
+ return;
+
+ auto doTask = [this](
+ const ThreadGroupPtr & thread_group,
+ const std::vector<MergeTreeDataPartPtr>
prepare_merge_parts,
+ CustomStorageMergeTreePtr & storage,
+ String & partition_dir,
+ String & bucket_dir) -> std::vector<MergeTreeDataPartPtr>
+ {
+ setThreadName("InsertWithMerge");
+ ThreadStatus thread_status;
+ thread_status.attachToGroup(thread_group);
+
+ size_t before_size = 0;
+ size_t after_size = 0;
+ for (const auto & prepare_merge_part : prepare_merge_parts)
+ before_size += prepare_merge_part->getBytesOnDisk();
+
+ std::unordered_map<String, String> partition_values;
+ auto merged_parts
+ = mergeParts(prepare_merge_parts, partition_values,
toString(UUIDHelpers::generateV4()), storage, partition_dir, bucket_dir);
+ for (const auto & merge_tree_data_part : merged_parts)
+ after_size += merge_tree_data_part->getBytesOnDisk();
+
+ LOG_DEBUG(
+ &Poco::Logger::get("SparkMergeTreeWriter"),
+ "Mergetree merge on insert finished, before merge part size {},
part count {}, after part size {}, part count {}.",
+ before_size,
+ prepare_merge_parts.size(),
+ after_size,
+ merged_parts.size());
+
+ return merged_parts;
+ };
+
+ std::vector<MergeTreeDataPartPtr> selected_parts;
+ selected_parts.reserve(merge_limit_parts);
+ size_t totol_size = 0;
+ std::vector<MergeTreeDataPartPtr> skip_parts;
+
+ while (const auto merge_tree_data_part_option = new_parts.pop_front())
+ {
+ auto merge_tree_data_part = merge_tree_data_part_option.value();
+ if (merge_tree_data_part->getBytesOnDisk() >= merge_min_size)
+ {
+ skip_parts.emplace_back(merge_tree_data_part);
+ continue;
+ }
+
+ selected_parts.emplace_back(merge_tree_data_part);
+ totol_size += merge_tree_data_part->getBytesOnDisk();
+ if (merge_min_size > totol_size && merge_limit_parts >
selected_parts.size())
+ continue;
+
+ for (auto selected_part : selected_parts)
+ {
+ tmp_parts.emplace(selected_part->name);
+ }
+
+ thread_pool.scheduleOrThrow([this, doTask, selected_parts,
thread_group = CurrentThread::getGroup()]() -> void
+ { new_parts.emplace_back(doTask(thread_group,
selected_parts, storage, partition_dir, bucket_dir)); });
+ selected_parts.clear();
+ totol_size = 0;
+ }
+
+ if (!selected_parts.empty())
+ {
+ if (force && selected_parts.size() > 1)
+ {
+ for (auto selected_part : selected_parts)
+ tmp_parts.emplace(selected_part->name);
+ thread_pool.scheduleOrThrow(
+ [this, doTask, selected_parts, thread_group =
CurrentThread::getGroup()]() -> void
+ { new_parts.emplace_back(doTask(thread_group, selected_parts,
storage, partition_dir, bucket_dir)); });
+ }
+ else
+ new_parts.emplace_back(selected_parts);
+ }
+
+ new_parts.emplace_back(skip_parts);
+}
+
}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h
b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h
index 000d009fe..5251d4cc4 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h
+++ b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h
@@ -19,7 +19,9 @@
#include <Interpreters/SquashingTransform.h>
#include <Storages/MergeTree/MergeTreeDataWriter.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
+#include <Storages/StorageMergeTreeFactory.h>
#include <Poco/StringTokenizer.h>
+#include <Common/CHUtil.h>
namespace DB
{
@@ -40,6 +42,8 @@ struct PartInfo
size_t row_count;
std::unordered_map<String, String> partition_values;
String bucket_id;
+
+ bool operator<(const PartInfo & rhs) const { return disk_size <
rhs.disk_size; }
};
class SparkMergeTreeWriter
@@ -47,56 +51,44 @@ class SparkMergeTreeWriter
public:
static String partInfosToJson(const std::vector<PartInfo> & part_infos);
SparkMergeTreeWriter(
- DB::MergeTreeData & storage_,
+ CustomStorageMergeTreePtr storage_,
const DB::StorageMetadataPtr & metadata_snapshot_,
const DB::ContextPtr & context_,
const String & uuid_,
const String & partition_dir_ = "",
- const String & bucket_dir_ = "")
- : storage(storage_)
- , metadata_snapshot(metadata_snapshot_)
- , context(context_)
- , uuid(uuid_)
- , partition_dir(partition_dir_)
- , bucket_dir(bucket_dir_)
- {
- const DB::Settings & settings = context->getSettingsRef();
- squashing_transform
- =
std::make_unique<DB::SquashingTransform>(settings.min_insert_block_size_rows,
settings.min_insert_block_size_bytes);
- if (!partition_dir.empty())
- {
- Poco::StringTokenizer partitions(partition_dir, "/");
- for (const auto & partition : partitions)
- {
- Poco::StringTokenizer key_value(partition, "=");
- chassert(key_value.count() == 2);
- partition_values.emplace(key_value[0], key_value[1]);
- }
- }
- header = metadata_snapshot->getSampleBlock();
- }
+ const String & bucket_dir_ = "");
void write(DB::Block & block);
void finalize();
std::vector<PartInfo> getAllPartInfo();
private:
- DB::MergeTreeDataWriter::TemporaryPart
- writeTempPart(DB::BlockWithPartition & block_with_partition, const
DB::StorageMetadataPtr & metadata_snapshot);
+ void
+ writeTempPart(MergeTreeDataWriter::TemporaryPart & temp_part,
DB::BlockWithPartition & block_with_partition, const DB::StorageMetadataPtr &
metadata_snapshot);
DB::MergeTreeDataWriter::TemporaryPart
writeTempPartAndFinalize(DB::BlockWithPartition & block_with_partition,
const DB::StorageMetadataPtr & metadata_snapshot);
+ void checkAndMerge(bool force = false);
+ void safeEmplaceBackPart(DB::MergeTreeDataPartPtr);
+ void safeAddPart(DB::MergeTreeDataPartPtr);
+ void manualFreeMemory(size_t before_write_memory);
String uuid;
String partition_dir;
String bucket_dir;
- DB::MergeTreeData & storage;
+ CustomStorageMergeTreePtr storage;
DB::StorageMetadataPtr metadata_snapshot;
DB::ContextPtr context;
std::unique_ptr<DB::SquashingTransform> squashing_transform;
int part_num = 1;
- std::vector<DB::MergeTreeDataPartPtr> new_parts;
+ ConcurrentDeque<DB::MergeTreeDataPartPtr> new_parts;
std::unordered_map<String, String> partition_values;
+ std::unordered_set<String> tmp_parts;
DB::Block header;
+ bool merge_after_insert;
+ FreeThreadPool thread_pool;
+ size_t merge_min_size = 1024 * 1024 * 1024;
+ size_t merge_limit_parts = 10;
+ std::mutex memory_mutex;
};
}
diff --git a/cpp-ch/local-engine/jni/ReservationListenerWrapper.cpp
b/cpp-ch/local-engine/jni/ReservationListenerWrapper.cpp
index b5c5f8cd0..65b29c2a2 100644
--- a/cpp-ch/local-engine/jni/ReservationListenerWrapper.cpp
+++ b/cpp-ch/local-engine/jni/ReservationListenerWrapper.cpp
@@ -24,6 +24,7 @@ jclass ReservationListenerWrapper::reservation_listener_class
= nullptr;
jmethodID ReservationListenerWrapper::reservation_listener_reserve = nullptr;
jmethodID ReservationListenerWrapper::reservation_listener_reserve_or_throw =
nullptr;
jmethodID ReservationListenerWrapper::reservation_listener_unreserve = nullptr;
+jmethodID ReservationListenerWrapper::reservation_listener_currentMemory =
nullptr;
ReservationListenerWrapper::ReservationListenerWrapper(jobject listener_) :
listener(listener_)
{
@@ -56,4 +57,12 @@ void ReservationListenerWrapper::free(int64_t size)
safeCallVoidMethod(env, listener, reservation_listener_unreserve, size);
CLEAN_JNIENV
}
+
+size_t ReservationListenerWrapper::currentMemory()
+{
+ GET_JNIENV(env)
+ int64_t res = safeCallLongMethod(env, listener,
reservation_listener_currentMemory);
+ return res;
+ CLEAN_JNIENV
+}
}
diff --git a/cpp-ch/local-engine/jni/ReservationListenerWrapper.h
b/cpp-ch/local-engine/jni/ReservationListenerWrapper.h
index 93efd497d..1dfb3671f 100644
--- a/cpp-ch/local-engine/jni/ReservationListenerWrapper.h
+++ b/cpp-ch/local-engine/jni/ReservationListenerWrapper.h
@@ -28,12 +28,16 @@ public:
static jmethodID reservation_listener_reserve;
static jmethodID reservation_listener_reserve_or_throw;
static jmethodID reservation_listener_unreserve;
+ static jmethodID reservation_listener_currentMemory;
explicit ReservationListenerWrapper(jobject listener);
~ReservationListenerWrapper();
void reserve(int64_t size);
void reserveOrThrow(int64_t size);
void free(int64_t size);
+ size_t currentMemory();
+
+
private:
jobject listener;
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp
b/cpp-ch/local-engine/local_engine_jni.cpp
index 34b81d7a1..7baad210e 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -186,6 +186,9 @@ JNIEXPORT jint JNI_OnLoad(JavaVM * vm, void * /*reserved*/)
= local_engine::GetMethodID(env,
local_engine::ReservationListenerWrapper::reservation_listener_class,
"reserveOrThrow", "(J)V");
local_engine::ReservationListenerWrapper::reservation_listener_unreserve
= local_engine::GetMethodID(env,
local_engine::ReservationListenerWrapper::reservation_listener_class,
"unreserve", "(J)J");
+
local_engine::ReservationListenerWrapper::reservation_listener_currentMemory
+ = local_engine::GetMethodID(env,
local_engine::ReservationListenerWrapper::reservation_listener_class,
"currentMemory", "()J");
+
native_metrics_class = local_engine::CreateGlobalClassReference(env,
"Lorg/apache/gluten/metrics/NativeMetrics;");
native_metrics_constructor = local_engine::GetMethodID(env,
native_metrics_class, "<init>", "(Ljava/lang/String;)V");
@@ -997,9 +1000,26 @@ JNIEXPORT jlong
Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniW
}
JNIEXPORT jlong
Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_nativeInitMergeTreeWriterWrapper(
- JNIEnv * env, jobject, jbyteArray plan_, jbyteArray split_info_, jstring
uuid_, jstring task_id_, jstring partition_dir_, jstring bucket_dir_)
+ JNIEnv * env,
+ jobject,
+ jbyteArray plan_,
+ jbyteArray split_info_,
+ jstring uuid_,
+ jstring task_id_,
+ jstring partition_dir_,
+ jstring bucket_dir_,
+ jbyteArray conf_plan,
+ jlong allocator_id)
{
LOCAL_ENGINE_JNI_METHOD_START
+ auto query_context =
local_engine::getAllocator(allocator_id)->query_context;
+ // by task update new configs ( in case of dynamic config update )
+ jsize conf_plan_buf_size = env->GetArrayLength(conf_plan);
+ jbyte * conf_plan_buf_addr = env->GetByteArrayElements(conf_plan, nullptr);
+ std::string conf_plan_str;
+ conf_plan_str.assign(reinterpret_cast<const char *>(conf_plan_buf_addr),
conf_plan_buf_size);
+ local_engine::BackendInitializerUtil::updateConfig(query_context,
&conf_plan_str);
+
const auto uuid_str = jstring2string(env, uuid_);
const auto task_id = jstring2string(env, task_id_);
const auto partition_dir = jstring2string(env, partition_dir_);
@@ -1035,10 +1055,11 @@ JNIEXPORT jlong
Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniW
extension_table, local_engine::SerializedPlanParser::global_context);
auto uuid = uuid_str + "_" + task_id;
auto * writer = new local_engine::SparkMergeTreeWriter(
- *storage, storage->getInMemoryMetadataPtr(),
local_engine::SerializedPlanParser::global_context, uuid, partition_dir,
bucket_dir);
+ storage, storage->getInMemoryMetadataPtr(), query_context, uuid,
partition_dir, bucket_dir);
env->ReleaseByteArrayElements(plan_, plan_buf_addr, JNI_ABORT);
env->ReleaseByteArrayElements(split_info_, split_info_addr, JNI_ABORT);
+ env->ReleaseByteArrayElements(conf_plan, conf_plan_buf_addr, JNI_ABORT);
return reinterpret_cast<jlong>(writer);
LOCAL_ENGINE_JNI_METHOD_END(env, 0)
}
@@ -1180,53 +1201,20 @@ JNIEXPORT jstring
Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn
auto storage_factory = local_engine::StorageMergeTreeFactory::instance();
std::vector<DB::DataPartPtr> selected_parts =
storage_factory.getDataParts(table_id, merge_tree_table.snapshot_id,
merge_tree_table.getPartNames());
- auto future_part = std::make_shared<DB::FutureMergedMutatedPart>();
- future_part->uuid = DB::UUIDHelpers::generateV4();
-
- future_part->assign(std::move(selected_parts));
-
- future_part->name = "";
std::unordered_map<String, String> partition_values;
- if(!partition_dir.empty())
- {
- future_part->name = partition_dir + "/";
- Poco::StringTokenizer partitions(partition_dir, "/");
- for (const auto & partition : partitions)
- {
- Poco::StringTokenizer key_value(partition, "=");
- chassert(key_value.count() == 2);
- partition_values.emplace(key_value[0], key_value[1]);
- }
- }
- if(!bucket_dir.empty())
- {
- future_part->name = future_part->name + bucket_dir + "/";
- }
- future_part->name = future_part->name + uuid_str + "-merged";
-
- auto entry = std::make_shared<DB::MergeMutateSelectedEntry>(future_part,
DB::CurrentlyMergingPartsTaggerPtr{}, std::make_shared<DB::MutationCommands>());
-
-
- // Copying a vector of columns `deduplicate by columns.
- DB::IExecutableTask::TaskResultCallback f = [](bool) {};
- auto task = std::make_shared<local_engine::MergeSparkMergeTreeTask>(
- *storage, storage->getInMemoryMetadataPtr(), false,
std::vector<std::string>{}, false, entry,
- DB::TableLockHolder{}, f);
-
- task->setCurrentTransaction(DB::MergeTreeTransactionHolder{},
DB::MergeTreeTransactionPtr{});
-
- executeHere(task);
+ std::vector<MergeTreeDataPartPtr> loaded =
+ local_engine::mergeParts(selected_parts, partition_values, uuid_str,
storage, partition_dir, bucket_dir);
- std::unordered_set<std::string> to_load{future_part->name};
- std::vector<std::shared_ptr<DB::IMergeTreeDataPart>> loaded =
storage->loadDataPartsWithNames(to_load);
std::vector<local_engine::PartInfo> res;
for (auto & partPtr : loaded)
{
- local_engine::saveFileStatus(*storage,
local_engine::SerializedPlanParser::global_context,
partPtr->getDataPartStorage());
- res.emplace_back(
- local_engine::PartInfo{partPtr->name, partPtr->getMarksCount(),
partPtr->getBytesOnDisk(), partPtr->rows_count,
- /*partition_value*/ partition_values,
- bucket_dir});
+ saveFileStatus(
+ *storage,
+ local_engine::SerializedPlanParser::global_context,
+ partPtr->name,
+ const_cast<IDataPartStorage &>(partPtr->getDataPartStorage()));
+ res.emplace_back(local_engine::PartInfo{
+ partPtr->name, partPtr->getMarksCount(),
partPtr->getBytesOnDisk(), partPtr->rows_count, partition_values, bucket_dir});
}
auto json_info = local_engine::SparkMergeTreeWriter::partInfosToJson(res);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]