This is an automated email from the ASF dual-hosted git repository.
liuneng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new a7c9567bc [CH] Support rocksdb disk metadata (#7239)
a7c9567bc is described below
commit a7c9567bc7acdd14c8474c3d2dc99d1cfd062b56
Author: LiuNeng <[email protected]>
AuthorDate: Sat Sep 14 16:17:09 2024 +0800
[CH] Support rocksdb disk metadata (#7239)
What changes were proposed in this pull request?
Added RocksDB-based disk metadata management to solve the problem of too
many small MergeTree disk metadata files. It also supports automatic cleanup of
expired data.
How was this patch tested?
unit tests
---
...eMergeTreeWriteOnHDFSWithRocksDBMetaSuite.scala | 660 +++++++++++++++++++++
...lutenClickHouseWholeStageTransformerSuite.scala | 30 +
cpp-ch/CMakeLists.txt | 19 +-
.../CompactObjectStorageDiskTransaction.cpp | 25 -
.../ObjectStorages/MetadataStorageFromRocksDB.cpp | 316 ++++++++++
.../ObjectStorages/MetadataStorageFromRocksDB.h | 97 +++
...dataStorageFromRocksDBTransactionOperations.cpp | 177 ++++++
...tadataStorageFromRocksDBTransactionOperations.h | 135 +++++
cpp-ch/local-engine/Disks/registerGlutenDisks.cpp | 30 +-
.../Storages/MergeTree/MetaDataHelper.cpp | 135 ++++-
.../Storages/MergeTree/MetaDataHelper.h | 4 +
11 files changed, 1571 insertions(+), 57 deletions(-)
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite.scala
new file mode 100644
index 000000000..6454f155b
--- /dev/null
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite.scala
@@ -0,0 +1,660 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
+import org.apache.spark.sql.delta.files.TahoeFileIndex
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import
org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+import java.io.File
+
+import scala.concurrent.duration.DurationInt
+
+// Some sqls' line length exceeds 100
+// scalastyle:off line.size.limit
+
+class GlutenClickHouseMergeTreeWriteOnHDFSWithRocksDBMetaSuite
+ extends GlutenClickHouseTPCHAbstractSuite
+ with AdaptiveSparkPlanHelper {
+
+ override protected val needCopyParquetToTablePath = true
+
+ override protected val tablesPath: String = basePath + "/tpch-data"
+ override protected val tpchQueries: String = rootPath +
"queries/tpch-queries-ch"
+ override protected val queriesResults: String = rootPath +
"mergetree-queries-output"
+
+ override protected def createTPCHNotNullTables(): Unit = {
+ createNotNullTPCHTablesInParquet(tablesPath)
+ }
+
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf
+ .set("spark.shuffle.manager",
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+ .set("spark.io.compression.codec", "LZ4")
+ .set("spark.sql.shuffle.partitions", "5")
+ .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
+ .set("spark.sql.adaptive.enabled", "true")
+ .set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level",
"error")
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert",
+ "false")
+
+ }
+
+ override protected def beforeEach(): Unit = {
+ super.beforeEach()
+ val conf = new Configuration
+ conf.set("fs.defaultFS", HDFS_URL)
+ val fs = FileSystem.get(conf)
+ fs.delete(new org.apache.hadoop.fs.Path(HDFS_URL), true)
+ FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
+ FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
+ }
+
+ override protected def afterEach(): Unit = {
+ super.afterEach()
+ FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
+ }
+
+ test("test mergetree table write") {
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS lineitem_mergetree_hdfs;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS lineitem_mergetree_hdfs
+ |(
+ | l_orderkey bigint,
+ | l_partkey bigint,
+ | l_suppkey bigint,
+ | l_linenumber bigint,
+ | l_quantity double,
+ | l_extendedprice double,
+ | l_discount double,
+ | l_tax double,
+ | l_returnflag string,
+ | l_linestatus string,
+ | l_shipdate date,
+ | l_commitdate date,
+ | l_receiptdate date,
+ | l_shipinstruct string,
+ | l_shipmode string,
+ | l_comment string
+ |)
+ |USING clickhouse
+ |LOCATION '$HDFS_URL/test/lineitem_mergetree_hdfs'
+ |TBLPROPERTIES (storage_policy='__hdfs_main_rocksdb')
+ |""".stripMargin)
+
+ spark.sql(s"""
+ | insert into table lineitem_mergetree_hdfs
+ | select * from lineitem
+ |""".stripMargin)
+ FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
+ val sqlStr =
+ s"""
+ |SELECT
+ | l_returnflag,
+ | l_linestatus,
+ | sum(l_quantity) AS sum_qty,
+ | sum(l_extendedprice) AS sum_base_price,
+ | sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
+ | sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS
sum_charge,
+ | avg(l_quantity) AS avg_qty,
+ | avg(l_extendedprice) AS avg_price,
+ | avg(l_discount) AS avg_disc,
+ | count(*) AS count_order
+ |FROM
+ | lineitem_mergetree_hdfs
+ |WHERE
+ | l_shipdate <= date'1998-09-02' - interval 1 day
+ |GROUP BY
+ | l_returnflag,
+ | l_linestatus
+ |ORDER BY
+ | l_returnflag,
+ | l_linestatus;
+ |
+ |""".stripMargin
+ runTPCHQueryBySQL(1, sqlStr) {
+ df =>
+ val scanExec = collect(df.queryExecution.executedPlan) {
+ case f: FileSourceScanExecTransformer => f
+ }
+ assertResult(1)(scanExec.size)
+
+ val mergetreeScan = scanExec.head
+ assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
+
+ val fileIndex =
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).orderByKeyOption.isEmpty)
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty)
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
+ val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f =>
f.asInstanceOf[AddMergeTreeParts])
+ assertResult(1)(addFiles.size)
+ assertResult(600572)(addFiles.head.rows)
+ }
+ spark.sql("drop table lineitem_mergetree_hdfs")
+ }
+
+ test("test mergetree write with orderby keys / primary keys") {
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS lineitem_mergetree_orderbykey_hdfs;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS lineitem_mergetree_orderbykey_hdfs
+ |(
+ | l_orderkey bigint,
+ | l_partkey bigint,
+ | l_suppkey bigint,
+ | l_linenumber bigint,
+ | l_quantity double,
+ | l_extendedprice double,
+ | l_discount double,
+ | l_tax double,
+ | l_returnflag string,
+ | l_linestatus string,
+ | l_shipdate date,
+ | l_commitdate date,
+ | l_receiptdate date,
+ | l_shipinstruct string,
+ | l_shipmode string,
+ | l_comment string
+ |)
+ |USING clickhouse
+ |TBLPROPERTIES (storage_policy='__hdfs_main_rocksdb',
+ | orderByKey='l_shipdate,l_orderkey',
+ | primaryKey='l_shipdate')
+ |LOCATION '$HDFS_URL/test/lineitem_mergetree_orderbykey_hdfs'
+ |""".stripMargin)
+
+ spark.sql(s"""
+ | insert into table lineitem_mergetree_orderbykey_hdfs
+ | select * from lineitem
+ |""".stripMargin)
+
+ val sqlStr =
+ s"""
+ |SELECT
+ | l_returnflag,
+ | l_linestatus,
+ | sum(l_quantity) AS sum_qty,
+ | sum(l_extendedprice) AS sum_base_price,
+ | sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
+ | sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS
sum_charge,
+ | avg(l_quantity) AS avg_qty,
+ | avg(l_extendedprice) AS avg_price,
+ | avg(l_discount) AS avg_disc,
+ | count(*) AS count_order
+ |FROM
+ | lineitem_mergetree_orderbykey_hdfs
+ |WHERE
+ | l_shipdate <= date'1998-09-02' - interval 1 day
+ |GROUP BY
+ | l_returnflag,
+ | l_linestatus
+ |ORDER BY
+ | l_returnflag,
+ | l_linestatus;
+ |
+ |""".stripMargin
+ runTPCHQueryBySQL(1, sqlStr) {
+ df =>
+ val scanExec = collect(df.queryExecution.executedPlan) {
+ case f: FileSourceScanExecTransformer => f
+ }
+ assertResult(1)(scanExec.size)
+
+ val mergetreeScan = scanExec.head
+ assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
+
+ val fileIndex =
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
+ assertResult("l_shipdate,l_orderkey")(
+ ClickHouseTableV2
+ .getTable(fileIndex.deltaLog)
+ .orderByKeyOption
+ .get
+ .mkString(","))
+ assertResult("l_shipdate")(
+ ClickHouseTableV2
+ .getTable(fileIndex.deltaLog)
+ .primaryKeyOption
+ .get
+ .mkString(","))
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
+ val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f =>
f.asInstanceOf[AddMergeTreeParts])
+ assertResult(1)(addFiles.size)
+ assertResult(600572)(addFiles.head.rows)
+ }
+ spark.sql("drop table lineitem_mergetree_orderbykey_hdfs")
+ }
+
+ test("test mergetree write with partition") {
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS lineitem_mergetree_partition_hdfs;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS lineitem_mergetree_partition_hdfs
+ |(
+ | l_orderkey bigint,
+ | l_partkey bigint,
+ | l_suppkey bigint,
+ | l_linenumber bigint,
+ | l_quantity double,
+ | l_extendedprice double,
+ | l_discount double,
+ | l_tax double,
+ | l_returnflag string,
+ | l_linestatus string,
+ | l_shipdate date,
+ | l_commitdate date,
+ | l_receiptdate date,
+ | l_shipinstruct string,
+ | l_shipmode string,
+ | l_comment string
+ |)
+ |USING clickhouse
+ |PARTITIONED BY (l_returnflag)
+ |TBLPROPERTIES (storage_policy='__hdfs_main_rocksdb',
+ | orderByKey='l_orderkey',
+ | primaryKey='l_orderkey')
+ |LOCATION '$HDFS_URL/test/lineitem_mergetree_partition_hdfs'
+ |""".stripMargin)
+
+ // dynamic partitions
+ spark.sql(s"""
+ | insert into table lineitem_mergetree_partition_hdfs
+ | select * from lineitem
+ |""".stripMargin)
+
+ // write with dataframe api
+ val source = spark.sql(s"""
+ |select
+ | l_orderkey ,
+ | l_partkey ,
+ | l_suppkey ,
+ | l_linenumber ,
+ | l_quantity ,
+ | l_extendedprice ,
+ | l_discount ,
+ | l_tax ,
+ | l_returnflag ,
+ | l_linestatus ,
+ | l_shipdate ,
+ | l_commitdate ,
+ | l_receiptdate ,
+ | l_shipinstruct ,
+ | l_shipmode ,
+ | l_comment
+ | from lineitem
+ | where l_shipdate BETWEEN date'1993-01-01' AND
date'1993-01-10'
+ |""".stripMargin)
+
+ source.write
+ .format("clickhouse")
+ .mode(SaveMode.Append)
+ .insertInto("lineitem_mergetree_partition_hdfs")
+
+ // static partition
+ spark.sql(s"""
+ | insert into lineitem_mergetree_partition_hdfs PARTITION
(l_returnflag = 'A')
+ | (l_shipdate,
+ | l_orderkey,
+ | l_partkey,
+ | l_suppkey,
+ | l_linenumber,
+ | l_quantity,
+ | l_extendedprice,
+ | l_discount,
+ | l_tax,
+ | l_linestatus,
+ | l_commitdate,
+ | l_receiptdate,
+ | l_shipinstruct,
+ | l_shipmode,
+ | l_comment)
+ | select
+ | l_shipdate,
+ | l_orderkey,
+ | l_partkey,
+ | l_suppkey,
+ | l_linenumber,
+ | l_quantity,
+ | l_extendedprice,
+ | l_discount,
+ | l_tax,
+ | l_linestatus,
+ | l_commitdate,
+ | l_receiptdate,
+ | l_shipinstruct,
+ | l_shipmode,
+ | l_comment from lineitem
+ | where l_returnflag = 'A'
+ |""".stripMargin)
+
+ val sqlStr =
+ s"""
+ |SELECT
+ | l_returnflag,
+ | l_linestatus,
+ | sum(l_quantity) AS sum_qty,
+ | sum(l_extendedprice) AS sum_base_price,
+ | sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
+ | sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS
sum_charge,
+ | avg(l_quantity) AS avg_qty,
+ | avg(l_extendedprice) AS avg_price,
+ | avg(l_discount) AS avg_disc,
+ | count(*) AS count_order
+ |FROM
+ | lineitem_mergetree_partition_hdfs
+ |WHERE
+ | l_shipdate <= date'1998-09-02' - interval 1 day
+ |GROUP BY
+ | l_returnflag,
+ | l_linestatus
+ |ORDER BY
+ | l_returnflag,
+ | l_linestatus;
+ |
+ |""".stripMargin
+ runTPCHQueryBySQL(1, sqlStr, compareResult = false) {
+ df =>
+ val result = df.collect()
+ assertResult(4)(result.length)
+ assertResult("A")(result(0).getString(0))
+ assertResult("F")(result(0).getString(1))
+ assertResult(7578058.0)(result(0).getDouble(2))
+
+ assertResult("N")(result(2).getString(0))
+ assertResult("O")(result(2).getString(1))
+ assertResult(7454519.0)(result(2).getDouble(2))
+
+ val scanExec = collect(df.queryExecution.executedPlan) {
+ case f: FileSourceScanExecTransformer => f
+ }
+ assertResult(1)(scanExec.size)
+
+ val mergetreeScan = scanExec.head
+ assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
+ assertResult(6)(mergetreeScan.metrics("numFiles").value)
+
+ val fileIndex =
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
+ assertResult("l_orderkey")(
+ ClickHouseTableV2
+ .getTable(fileIndex.deltaLog)
+ .orderByKeyOption
+ .get
+ .mkString(","))
+ assertResult("l_orderkey")(
+ ClickHouseTableV2
+ .getTable(fileIndex.deltaLog)
+ .primaryKeyOption
+ .get
+ .mkString(","))
+
assertResult(1)(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size)
+ assertResult("l_returnflag")(
+ ClickHouseTableV2
+ .getTable(fileIndex.deltaLog)
+ .partitionColumns
+ .head)
+ val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f =>
f.asInstanceOf[AddMergeTreeParts])
+
+ assertResult(6)(addFiles.size)
+ assertResult(750735)(addFiles.map(_.rows).sum)
+ }
+ spark.sql("drop table lineitem_mergetree_partition_hdfs")
+ }
+
+ test("test mergetree write with bucket table") {
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS lineitem_mergetree_bucket_hdfs;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS lineitem_mergetree_bucket_hdfs
+ |(
+ | l_orderkey bigint,
+ | l_partkey bigint,
+ | l_suppkey bigint,
+ | l_linenumber bigint,
+ | l_quantity double,
+ | l_extendedprice double,
+ | l_discount double,
+ | l_tax double,
+ | l_returnflag string,
+ | l_linestatus string,
+ | l_shipdate date,
+ | l_commitdate date,
+ | l_receiptdate date,
+ | l_shipinstruct string,
+ | l_shipmode string,
+ | l_comment string
+ |)
+ |USING clickhouse
+ |PARTITIONED BY (l_returnflag)
+ |CLUSTERED BY (l_orderkey)
+ |${if (sparkVersion.equals("3.2")) "" else "SORTED BY
(l_partkey)"} INTO 4 BUCKETS
+ |LOCATION '$HDFS_URL/test/lineitem_mergetree_bucket_hdfs'
+ |TBLPROPERTIES (storage_policy='__hdfs_main_rocksdb')
+ |""".stripMargin)
+
+ spark.sql(s"""
+ | insert into table lineitem_mergetree_bucket_hdfs
+ | select * from lineitem
+ |""".stripMargin)
+
+ val sqlStr =
+ s"""
+ |SELECT
+ | l_returnflag,
+ | l_linestatus,
+ | sum(l_quantity) AS sum_qty,
+ | sum(l_extendedprice) AS sum_base_price,
+ | sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
+ | sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS
sum_charge,
+ | avg(l_quantity) AS avg_qty,
+ | avg(l_extendedprice) AS avg_price,
+ | avg(l_discount) AS avg_disc,
+ | count(*) AS count_order
+ |FROM
+ | lineitem_mergetree_bucket_hdfs
+ |WHERE
+ | l_shipdate <= date'1998-09-02' - interval 1 day
+ |GROUP BY
+ | l_returnflag,
+ | l_linestatus
+ |ORDER BY
+ | l_returnflag,
+ | l_linestatus;
+ |
+ |""".stripMargin
+ runTPCHQueryBySQL(1, sqlStr) {
+ df =>
+ val scanExec = collect(df.queryExecution.executedPlan) {
+ case f: FileSourceScanExecTransformer => f
+ }
+ assertResult(1)(scanExec.size)
+
+ val mergetreeScan = scanExec.head
+ assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
+
+ val fileIndex =
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isDefined)
+ if (sparkVersion.equals("3.2")) {
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).orderByKeyOption.isEmpty)
+ } else {
+ assertResult("l_partkey")(
+ ClickHouseTableV2
+ .getTable(fileIndex.deltaLog)
+ .orderByKeyOption
+ .get
+ .mkString(","))
+ }
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty)
+
assertResult(1)(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size)
+ assertResult("l_returnflag")(
+ ClickHouseTableV2
+ .getTable(fileIndex.deltaLog)
+ .partitionColumns
+ .head)
+ val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f =>
f.asInstanceOf[AddMergeTreeParts])
+
+ assertResult(12)(addFiles.size)
+ assertResult(600572)(addFiles.map(_.rows).sum)
+ }
+ spark.sql("drop table lineitem_mergetree_bucket_hdfs purge")
+ }
+
+ test("test mergetree write with the path based") {
+ val dataPath = s"$HDFS_URL/test/lineitem_mergetree_bucket_hdfs"
+
+ val sourceDF = spark.sql(s"""
+ |select * from lineitem
+ |""".stripMargin)
+
+ sourceDF.write
+ .format("clickhouse")
+ .mode(SaveMode.Append)
+ .partitionBy("l_returnflag")
+ .option("clickhouse.orderByKey", "l_orderkey")
+ .option("clickhouse.primaryKey", "l_orderkey")
+ .option("clickhouse.numBuckets", "4")
+ .option("clickhouse.bucketColumnNames", "l_orderkey")
+ .option("clickhouse.storage_policy", "__hdfs_main_rocksdb")
+ .save(dataPath)
+
+ val sqlStr =
+ s"""
+ |SELECT
+ | l_returnflag,
+ | l_linestatus,
+ | sum(l_quantity) AS sum_qty,
+ | sum(l_extendedprice) AS sum_base_price,
+ | sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
+ | sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS
sum_charge,
+ | avg(l_quantity) AS avg_qty,
+ | avg(l_extendedprice) AS avg_price,
+ | avg(l_discount) AS avg_disc,
+ | count(*) AS count_order
+ |FROM
+ | clickhouse.`$dataPath`
+ |WHERE
+ | l_shipdate <= date'1998-09-02' - interval 1 day
+ |GROUP BY
+ | l_returnflag,
+ | l_linestatus
+ |ORDER BY
+ | l_returnflag,
+ | l_linestatus;
+ |
+ |""".stripMargin
+ runTPCHQueryBySQL(1, sqlStr) {
+ df =>
+ val scanExec = collect(df.queryExecution.executedPlan) {
+ case f: FileSourceScanExecTransformer => f
+ }
+ assertResult(1)(scanExec.size)
+
+ val mergetreeScan = scanExec.head
+ assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
+
+ val fileIndex =
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isDefined)
+ assertResult("l_orderkey")(
+ ClickHouseTableV2
+ .getTable(fileIndex.deltaLog)
+ .orderByKeyOption
+ .get
+ .mkString(","))
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.nonEmpty)
+
assertResult(1)(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size)
+ assertResult("l_returnflag")(
+ ClickHouseTableV2
+ .getTable(fileIndex.deltaLog)
+ .partitionColumns
+ .head)
+ val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f =>
f.asInstanceOf[AddMergeTreeParts])
+
+ assertResult(12)(addFiles.size)
+ assertResult(600572)(addFiles.map(_.rows).sum)
+ }
+
+ val result = spark.read
+ .format("clickhouse")
+ .load(dataPath)
+ .count()
+ assertResult(600572)(result)
+ }
+
+ test("test mergetree insert with optimize basic") {
+ val tableName = "lineitem_mergetree_insert_optimize_basic_hdfs"
+ val dataPath = s"$HDFS_URL/test/$tableName"
+
+ withSQLConf(
+ "spark.databricks.delta.optimize.minFileSize" -> "200000000",
+
"spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert"
-> "true",
+
"spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.insert_without_local_storage"
-> "true",
+
"spark.gluten.sql.columnar.backend.ch.runtime_settings.min_insert_block_size_rows"
-> "10000"
+ ) {
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS $tableName;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS $tableName
+ |USING clickhouse
+ |LOCATION '$dataPath'
+ |TBLPROPERTIES (storage_policy='__hdfs_main_rocksdb')
+ | as select * from lineitem
+ |""".stripMargin)
+
+ val ret = spark.sql(s"select count(*) from $tableName").collect()
+ assertResult(600572)(ret.apply(0).get(0))
+ val conf = new Configuration
+ conf.set("fs.defaultFS", HDFS_URL)
+ val fs = FileSystem.get(conf)
+
+ eventually(timeout(60.seconds), interval(2.seconds)) {
+ val it = fs.listFiles(new Path(dataPath), true)
+ var files = 0
+ while (it.hasNext) {
+ it.next()
+ files += 1
+ }
+ assertResult(4)(files)
+ }
+ }
+ }
+}
+// scalastyle:on line.size.limit
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
index dfc5fbd3b..6e304db19 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
@@ -152,6 +152,36 @@ class GlutenClickHouseWholeStageTransformerSuite extends
WholeStageTransformerSu
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.__hdfs_main.volumes.main.disk",
"hdfs_cache")
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs2.type",
+ "hdfs_gluten")
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs2.endpoint",
+ HDFS_URL_ENDPOINT + "/")
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs2.metadata_path",
+ HDFS_METADATA_PATH)
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs2.metadata_type",
+ "rocksdb")
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache2.type",
+ "cache")
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache2.disk",
+ "hdfs2")
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache2.path",
+ HDFS_CACHE_PATH)
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache2.max_size",
+ "10Gi")
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.__hdfs_main_rocksdb.volumes",
+ "main")
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.__hdfs_main_rocksdb.volumes.main.disk",
+ "hdfs_cache2")
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_config.hdfs.dfs_client_read_shortcircuit",
"false")
diff --git a/cpp-ch/CMakeLists.txt b/cpp-ch/CMakeLists.txt
index 82049bdd3..15be68a2a 100644
--- a/cpp-ch/CMakeLists.txt
+++ b/cpp-ch/CMakeLists.txt
@@ -105,16 +105,15 @@ else()
-DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} -DENABLE_PROTOBUF=ON
-DENABLE_TESTS=OFF -DENABLE_JEMALLOC=ON -DENABLE_MULTITARGET_CODE=ON
-DENABLE_EXTERN_LOCAL_ENGINE=ON -DENABLE_ODBC=OFF -DENABLE_CAPNP=OFF
- -DENABLE_ROCKSDB=OFF -DENABLE_GRPC=OFF -DENABLE_RUST=OFF -DENABLE_H3=OFF
- -DENABLE_AMQPCPP=OFF -DENABLE_CASSANDRA=OFF -DENABLE_KAFKA=OFF
- -DENABLE_NATS=OFF -DENABLE_LIBPQXX=OFF -DENABLE_NURAFT=OFF
- -DENABLE_DATASKETCHES=OFF -DENABLE_SQLITE=OFF -DENABLE_S2_GEOMETRY=OFF
- -DENABLE_ANNOY=OFF -DENABLE_ULID=OFF -DENABLE_MYSQL=OFF
- -DENABLE_BCRYPT=OFF -DENABLE_LDAP=OFF -DENABLE_MSGPACK=OFF
- -DUSE_REPLXX=OFF -DENABLE_CLICKHOUSE_ALL=OFF -DENABLE_GWP_ASAN=OFF
- -DCOMPILER_FLAGS='-fvisibility=hidden -fvisibility-inlines-hidden' -S
- ${CH_SOURCE_DIR} -G Ninja -B ${CH_BINARY_DIR} && cmake --build
- ${CH_BINARY_DIR} --target libch\"
+ -DENABLE_GRPC=OFF -DENABLE_RUST=OFF -DENABLE_H3=OFF -DENABLE_AMQPCPP=OFF
+ -DENABLE_CASSANDRA=OFF -DENABLE_KAFKA=OFF -DENABLE_NATS=OFF
+ -DENABLE_LIBPQXX=OFF -DENABLE_NURAFT=OFF -DENABLE_DATASKETCHES=OFF
+ -DENABLE_SQLITE=OFF -DENABLE_S2_GEOMETRY=OFF -DENABLE_ANNOY=OFF
+ -DENABLE_ULID=OFF -DENABLE_MYSQL=OFF -DENABLE_BCRYPT=OFF
-DENABLE_LDAP=OFF
+ -DENABLE_MSGPACK=OFF -DUSE_REPLXX=OFF -DENABLE_CLICKHOUSE_ALL=OFF
+ -DENABLE_GWP_ASAN=OFF -DCOMPILER_FLAGS='-fvisibility=hidden
+ -fvisibility-inlines-hidden' -S ${CH_SOURCE_DIR} -G Ninja -B
+ ${CH_BINARY_DIR} && cmake --build ${CH_BINARY_DIR} --target libch\"
OUTPUT _build_ch)
endif()
diff --git
a/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp
b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp
index 3a4d15687..82afeb85e 100644
---
a/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp
+++
b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp
@@ -22,37 +22,12 @@
namespace local_engine
{
-int getFileOrder(const std::string & path)
-{
- if (path.ends_with("columns.txt"))
- return 1;
- if (path.ends_with("metadata_version.txt"))
- return 2;
- if (path.ends_with("count.txt"))
- return 3;
- if (path.ends_with("default_compression_codec.txt"))
- return 4;
- if (path.ends_with("checksums.txt"))
- return 5;
- if (path.ends_with("uuid.txt"))
- return 6;
- if (path.ends_with(".cmrk3") || path.ends_with(".cmrk2") ||
path.ends_with(".cmrk1") ||
- path.ends_with(".mrk3") || path.ends_with(".mrk2") ||
path.ends_with(".mrk1"))
- return 10;
- if (path.ends_with("idx"))
- return 20;
- if (path.ends_with("bin"))
- return 1000;
- return 100;
-}
bool isMetaDataFile(const std::string & path)
{
return !path.ends_with("bin");
}
-using FileMappings = std::vector<std::pair<String,
std::shared_ptr<DB::TemporaryFileOnDisk>>>;
-
void CompactObjectStorageDiskTransaction::commit()
{
auto metadata_tx = disk.getMetadataStorage()->createTransaction();
diff --git
a/cpp-ch/local-engine/Disks/ObjectStorages/MetadataStorageFromRocksDB.cpp
b/cpp-ch/local-engine/Disks/ObjectStorages/MetadataStorageFromRocksDB.cpp
new file mode 100644
index 000000000..aba426564
--- /dev/null
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/MetadataStorageFromRocksDB.cpp
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MetadataStorageFromRocksDB.h"
+#if USE_ROCKSDB
+#include <Disks/ObjectStorages/DiskObjectStorageMetadata.h>
+#include
<Disks/ObjectStorages/MetadataStorageFromRocksDBTransactionOperations.h>
+#include <Disks/ObjectStorages/StaticDirectoryIterator.h>
+#include <Interpreters/Context.h>
+#include <Storages/MergeTree/MetaDataHelper.h>
+#include <rocksdb/db.h>
+#include <Common/QueryContext.h>
+
+namespace local_engine
+{
+static std::string getObjectKeyCompatiblePrefix(
+ const DB::IObjectStorage & object_storage, const
Poco::Util::AbstractConfiguration & config, const String & config_prefix)
+{
+ return config.getString(config_prefix + ".key_compatibility_prefix",
object_storage.getCommonKeyPrefix());
+}
+
+DB::MetadataStoragePtr MetadataStorageFromRocksDB::create(
+ const std::string &,
+ const Poco::Util::AbstractConfiguration & config,
+ const std::string & config_prefix,
+ DB::ObjectStoragePtr object_storage)
+{
+ auto metadata_path = config.getString(config_prefix + ".metadata_path");
+ size_t clean_meta_task_interval_seconds = config.getUInt(config_prefix +
".clean_meta_task_interval_seconds", 60 * 60 * 12);
+ fs::create_directories(metadata_path);
+ auto key_compatibility_prefix =
getObjectKeyCompatiblePrefix(*object_storage, config, config_prefix);
+ return std::make_shared<MetadataStorageFromRocksDB>(
+ key_compatibility_prefix, metadata_path, object_storage,
clean_meta_task_interval_seconds);
+}
+
+MetadataStorageFromRocksDB::MetadataStorageFromRocksDB(
+ const String & compatible_key_prefix,
+ const String & rocksdb_dir,
+ DB::ObjectStoragePtr & object_storage_,
+ size_t metadata_clean_task_interval_seconds_)
+ : compatible_key_prefix(compatible_key_prefix)
+ , rocksdb_dir(rocksdb_dir)
+ , object_storage(object_storage_)
+ ,
metadata_clean_task_interval_seconds(metadata_clean_task_interval_seconds_)
+{
+ rocksdb::Options options;
+ options.create_if_missing = true;
+ throwRockDBErrorNotOk(rocksdb::DB::Open(options, rocksdb_dir, &rocksdb));
+ metadata_clean_task =
QueryContext::globalContext()->getSchedulePool().createTask(
+ "MetadataStorageFromRocksDB", [this] {
cleanOutdatedMetadataThreadFunc(); });
+ metadata_clean_task->scheduleAfter(metadata_clean_task_interval_seconds *
1000);
+ logger = getLogger("MetadataStorageFromRocksDB");
+}
+
+DB::MetadataTransactionPtr MetadataStorageFromRocksDB::createTransaction()
+{
+ return std::make_shared<MetadataStorageFromRocksDBTransaction>(*this);
+}
+
+const std::string & MetadataStorageFromRocksDB::getPath() const
+{
+ return rocksdb_dir;
+}
+
+DB::MetadataStorageType MetadataStorageFromRocksDB::getType() const
+{
+ return DB::MetadataStorageType::None;
+}
+
+bool MetadataStorageFromRocksDB::exists(const std::string & path) const
+{
+ return exist(getRocksDB(), path);
+}
+
+bool MetadataStorageFromRocksDB::isFile(const std::string & path) const
+{
+ auto data = getData(getRocksDB(), path);
+ return data != RocksDBCreateDirectoryOperation::DIR_DATA;
+}
+
+bool MetadataStorageFromRocksDB::isDirectory(const std::string & path) const
+{
+ auto data = getData(getRocksDB(), path);
+ return data == RocksDBCreateDirectoryOperation::DIR_DATA;
+}
+
+uint64_t MetadataStorageFromRocksDB::getFileSize(const std::string & path)
const
+{
+ return readMetadata(path)->getTotalSizeBytes();
+}
+
+Poco::Timestamp MetadataStorageFromRocksDB::getLastModified(const std::string
& /*path*/) const
+{
+ return {};
+}
+
+bool MetadataStorageFromRocksDB::supportsChmod() const
+{
+ return false;
+}
+
+bool MetadataStorageFromRocksDB::supportsStat() const
+{
+ return false;
+}
+
+std::vector<std::string> MetadataStorageFromRocksDB::listDirectory(const
std::string & path) const
+{
+ return listKeys(getRocksDB(), path);
+}
+
+DB::DirectoryIteratorPtr MetadataStorageFromRocksDB::iterateDirectory(const
std::string & path) const
+{
+ auto files = listKeys(getRocksDB(), path);
+ std::vector<std::filesystem::path> paths;
+ paths.reserve(files.size());
+ for (const auto & file : files)
+ paths.emplace_back(file);
+ return std::make_unique<DB::StaticDirectoryIterator>(std::move(paths));
+}
+
+uint32_t MetadataStorageFromRocksDB::getHardlinkCount(const std::string &
/*path*/) const
+{
+ return 0;
+}
+
+DB::StoredObjects MetadataStorageFromRocksDB::getStorageObjects(const
std::string & path) const
+{
+ auto metadata = readMetadata(path);
+ const auto & keys_with_meta = metadata->getKeysWithMeta();
+
+ DB::StoredObjects objects;
+ objects.reserve(keys_with_meta.size());
+ for (const auto & [object_key, object_meta] : keys_with_meta)
+ objects.emplace_back(object_key.serialize(), path,
object_meta.size_bytes, object_meta.offset);
+
+ return objects;
+}
+
+DB::DiskObjectStorageMetadataPtr
MetadataStorageFromRocksDB::readMetadata(const std::string & path) const
+{
+ std::shared_lock lock(metadata_mutex);
+ return readMetadataUnlocked(path, lock);
+}
+
+DB::DiskObjectStorageMetadataPtr
+MetadataStorageFromRocksDB::readMetadataUnlocked(const std::string & path,
std::unique_lock<DB::SharedMutex> &) const
+{
+ auto metadata =
std::make_unique<DB::DiskObjectStorageMetadata>(compatible_key_prefix, path);
+ auto str = getData(getRocksDB(), path);
+ metadata->deserializeFromString(str);
+ return metadata;
+}
+
+DB::DiskObjectStorageMetadataPtr
+MetadataStorageFromRocksDB::readMetadataUnlocked(const std::string & path,
std::shared_lock<DB::SharedMutex> &) const
+{
+ auto metadata =
std::make_unique<DB::DiskObjectStorageMetadata>(compatible_key_prefix, path);
+ auto str = getData(getRocksDB(), path);
+ metadata->deserializeFromString(str);
+ return metadata;
+}
+
+std::string MetadataStorageFromRocksDB::readFileToString(const std::string &
path) const
+{
+ return getData(getRocksDB(), path);
+}
+
+void MetadataStorageFromRocksDB::shutdown()
+{
+ metadata_clean_task->deactivate();
+ if (rocksdb)
+ {
+ rocksdb->Close();
+ rocksdb = nullptr;
+ }
+}
+
+void MetadataStorageFromRocksDB::cleanOutdatedMetadataThreadFunc()
+{
+ LOG_INFO(logger, "start to clean disk metadata in rocksdb.");
+ std::queue<String> part_queue;
+ size_t total_count_remove = 0;
+ auto removeParts = [&]
+ {
+ while (!part_queue.empty())
+ {
+ auto meta_name = part_queue.front();
+ part_queue.pop();
+ std::filesystem::path meta_path(meta_name);
+ auto part_path = meta_path.parent_path();
+ auto files = listDirectory(part_path);
+ total_count_remove += (files.size() + 1);
+ getRocksDB().DeleteRange({}, part_path.generic_string(),
files.back());
+ getRocksDB().Delete({}, files.back());
+ }
+ };
+ auto * it = getRocksDB().NewIterator({});
+ String prev_key;
+ String prev_data;
+ for (it->SeekToFirst(); it->Valid(); it->Next())
+ {
+ auto file_name = it->key().ToString();
+ // mark outdated part
+ if (isMergeTreePartMetaDataFile(file_name))
+ {
+ auto objects = getStorageObjects(it->key().ToString());
+ if (!object_storage->exists(objects.front()))
+ part_queue.push(file_name);
+ }
+ // clean empty directory
+ if (!prev_key.empty() && !file_name.starts_with(prev_key) && prev_data
== RocksDBCreateDirectoryOperation::DIR_DATA)
+ {
+ getRocksDB().Delete({}, prev_key);
+ total_count_remove ++;
+ }
+ if (part_queue.size() > 10000)
+ {
+ removeParts();
+ }
+ }
+ removeParts();
+ rocksdb::Slice begin(nullptr, 0);
+ rocksdb::Slice end(nullptr, 0);
+ rocksdb::Status s = getRocksDB().CompactRange({}, &begin, &end);
+ LOG_INFO(logger, "Clean meta finish, totally clean {} meta",
total_count_remove);
+ metadata_clean_task->scheduleAfter(metadata_clean_task_interval_seconds *
1000);
+}
+
+DB::SharedMutex & MetadataStorageFromRocksDB::getMetadataMutex() const
+{
+ return metadata_mutex;
+}
+
+rocksdb::DB & MetadataStorageFromRocksDB::getRocksDB() const
+{
+ if (!rocksdb)
+ throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "RocksDB is not
initialized");
+ return *rocksdb;
+}
+
+void MetadataStorageFromRocksDBTransaction::commit()
+{
+ commitImpl(metadata_storage.getMetadataMutex());
+}
+
+const DB::IMetadataStorage &
MetadataStorageFromRocksDBTransaction::getStorageForNonTransactionalReads()
const
+{
+ return metadata_storage;
+}
+
+bool MetadataStorageFromRocksDBTransaction::supportsChmod() const
+{
+ return false;
+}
+
+void MetadataStorageFromRocksDBTransaction::createEmptyMetadataFile(const
std::string & path)
+{
+ auto metadata =
std::make_unique<DB::DiskObjectStorageMetadata>(metadata_storage.compatible_key_prefix,
path);
+ writeStringToFile(path, metadata->serializeToString());
+}
+
+void MetadataStorageFromRocksDBTransaction::createMetadataFile(const
std::string & path, DB::ObjectStorageKey key, uint64_t size_in_bytes)
+{
+ auto metadata =
std::make_unique<DB::DiskObjectStorageMetadata>(metadata_storage.compatible_key_prefix,
path);
+ metadata->addObject(std::move(key), size_in_bytes);
+
+ auto data = metadata->serializeToString();
+ if (!data.empty())
+ addOperation(std::make_unique<RocksDBWriteFileOperation>(path,
metadata_storage.getRocksDB(), data));
+}
+
+void MetadataStorageFromRocksDBTransaction::writeStringToFile(const
std::string & path, const std::string & data)
+{
+ addOperation(std::make_unique<RocksDBWriteFileOperation>(path,
metadata_storage.getRocksDB(), data));
+}
+
+void MetadataStorageFromRocksDBTransaction::createDirectory(const std::string
& path)
+{
+ addOperation(std::make_unique<RocksDBCreateDirectoryOperation>(path,
metadata_storage.getRocksDB()));
+}
+
+void MetadataStorageFromRocksDBTransaction::createDirectoryRecursive(const
std::string & path)
+{
+
addOperation(std::make_unique<RocksDBCreateDirectoryRecursiveOperation>(path,
metadata_storage.getRocksDB()));
+}
+
+void MetadataStorageFromRocksDBTransaction::removeDirectory(const std::string
& path)
+{
+ addOperation(std::make_unique<RocksDBRemoveDirectoryOperation>(path,
metadata_storage.getRocksDB()));
+}
+
+void MetadataStorageFromRocksDBTransaction::removeRecursive(const std::string
& path)
+{
+ addOperation(std::make_unique<RocksDBRemoveRecursiveOperation>(path,
metadata_storage.getRocksDB()));
+}
+
+void MetadataStorageFromRocksDBTransaction::unlinkFile(const std::string &
path)
+{
+ addOperation(std::make_unique<RocksDBUnlinkFileOperation>(path,
metadata_storage.getRocksDB()));
+}
+}
+#endif
\ No newline at end of file
diff --git
a/cpp-ch/local-engine/Disks/ObjectStorages/MetadataStorageFromRocksDB.h
b/cpp-ch/local-engine/Disks/ObjectStorages/MetadataStorageFromRocksDB.h
new file mode 100644
index 000000000..1f53a1efe
--- /dev/null
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/MetadataStorageFromRocksDB.h
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+#include <config.h>
+#if USE_ROCKSDB
+#include <Disks/DiskLocal.h>
+#include <Disks/ObjectStorages/DiskObjectStorageMetadata.h>
+#include <Disks/ObjectStorages/IMetadataStorage.h>
+#include <Disks/ObjectStorages/MetadataOperationsHolder.h>
+#include <rocksdb/db.h>
+
+namespace local_engine
+{
+class MetadataStorageFromRocksDB final : public DB::IMetadataStorage
+{
+ friend class MetadataStorageFromRocksDBTransaction;
+
+public:
+ static DB::MetadataStoragePtr create(
+ const std::string & name,
+ const Poco::Util::AbstractConfiguration & config,
+ const std::string & config_prefix,
+ DB::ObjectStoragePtr object_storage);
+ MetadataStorageFromRocksDB(const String & compatible_key_prefix, const
String & rocksdb_dir, DB::ObjectStoragePtr & object_storage, size_t
metadata_clean_task_interval_seconds);
+ DB::MetadataTransactionPtr createTransaction() override;
+ const std::string & getPath() const override;
+ DB::MetadataStorageType getType() const override;
+ bool exists(const std::string & path) const override;
+ bool isFile(const std::string & path) const override;
+ bool isDirectory(const std::string & path) const override;
+ uint64_t getFileSize(const std::string & path) const override;
+ Poco::Timestamp getLastModified(const std::string & path) const override;
+ bool supportsChmod() const override;
+ bool supportsStat() const override;
+ std::vector<std::string> listDirectory(const std::string & path) const
override;
+ DB::DirectoryIteratorPtr iterateDirectory(const std::string & path) const
override;
+ uint32_t getHardlinkCount(const std::string & path) const override;
+ DB::StoredObjects getStorageObjects(const std::string & path) const
override;
+ DB::DiskObjectStorageMetadataPtr readMetadata(const std::string & path)
const;
+ DB::DiskObjectStorageMetadataPtr readMetadataUnlocked(const std::string &
path, std::unique_lock<DB::SharedMutex> & lock) const;
+ DB::DiskObjectStorageMetadataPtr readMetadataUnlocked(const std::string &
path, std::shared_lock<DB::SharedMutex> & lock) const;
+ std::string readFileToString(const std::string & path) const override;
+ void shutdown() override;
+ void cleanOutdatedMetadataThreadFunc();
+
+private:
+ DB::SharedMutex & getMetadataMutex() const;
+ rocksdb::DB & getRocksDB() const;
+
+ using RocksDBPtr = rocksdb::DB *;
+ RocksDBPtr rocksdb = nullptr;
+ mutable DB::SharedMutex metadata_mutex;
+ String compatible_key_prefix;
+ String rocksdb_dir;
+ DB::ObjectStoragePtr object_storage;
+ size_t metadata_clean_task_interval_seconds;
+ DB::BackgroundSchedulePool::TaskHolder metadata_clean_task;
+ LoggerPtr logger;
+};
+
+class MetadataStorageFromRocksDBTransaction final : public
DB::IMetadataTransaction, private DB::MetadataOperationsHolder
+{
+public:
+ MetadataStorageFromRocksDBTransaction(const MetadataStorageFromRocksDB &
metadata_storage_) : metadata_storage(metadata_storage_) { }
+
+ void commit() override;
+ const DB::IMetadataStorage & getStorageForNonTransactionalReads() const
override;
+ bool supportsChmod() const override;
+ void createEmptyMetadataFile(const std::string & path) override;
+ void createMetadataFile(const std::string & path, DB::ObjectStorageKey
key, uint64_t size_in_bytes) override;
+
+ void writeStringToFile(const std::string &, const std::string &) override;
+ void createDirectory(const std::string &) override;
+ void createDirectoryRecursive(const std::string &) override;
+ void removeDirectory(const std::string &) override;
+ void removeRecursive(const std::string &) override;
+ void unlinkFile(const std::string &) override;
+
+private:
+ const MetadataStorageFromRocksDB & metadata_storage;
+};
+}
+#endif
diff --git
a/cpp-ch/local-engine/Disks/ObjectStorages/MetadataStorageFromRocksDBTransactionOperations.cpp
b/cpp-ch/local-engine/Disks/ObjectStorages/MetadataStorageFromRocksDBTransactionOperations.cpp
new file mode 100644
index 000000000..34214594b
--- /dev/null
+++
b/cpp-ch/local-engine/Disks/ObjectStorages/MetadataStorageFromRocksDBTransactionOperations.cpp
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <config.h>
+#if USE_ROCKSDB
+#include "MetadataStorageFromRocksDBTransactionOperations.h"
+#include <ranges>
+
+namespace DB
+{
+namespace ErrorCodes
+{
+extern const int INVALID_STATE;
+}
+}
+
+namespace local_engine
+{
+
+void throwRockDBErrorNotOk(const rocksdb::Status & status)
+{
+ if (!status.ok())
+ throw DB::Exception(DB::ErrorCodes::INVALID_STATE, "Access rocksdb
failed: {}", status.ToString());
+}
+
+bool exist(rocksdb::DB & db, const std::string & path)
+{
+ std::string data;
+ auto status = db.Get({}, path, &data);
+ if (status.IsNotFound())
+ return false;
+ throwRockDBErrorNotOk(status);
+ return status.ok();
+}
+
+String getData(rocksdb::DB & db, const std::string & path)
+{
+ std::string data;
+ throwRockDBErrorNotOk(db.Get({}, path, &data));
+ return data;
+}
+
+std::vector<String> listKeys(rocksdb::DB & db, const std::string & path)
+{
+ std::vector<String> result;
+ auto *it = db.NewIterator({});
+ for (it->Seek(path); it->Valid() && it->key().starts_with(path);
it->Next())
+ {
+ if (it->key() == path)
+ continue;
+ result.push_back(it->key().ToString());
+ }
+ return result;
+}
+
+void RocksDBWriteFileOperation::execute(std::unique_lock<DB::SharedMutex> &)
+{
+ auto status = db.Get({}, path, &prev_data);
+ if (status.IsNotFound())
+ existed = false;
+ else
+ throwRockDBErrorNotOk(status);
+ db.Put({}, path, data);
+}
+
+void RocksDBWriteFileOperation::undo(std::unique_lock<DB::SharedMutex> &)
+{
+ if (existed)
+ throwRockDBErrorNotOk(db.Put({}, path, prev_data));
+ else
+ throwRockDBErrorNotOk(db.Delete({}, path));
+}
+
+void
RocksDBCreateDirectoryOperation::execute(std::unique_lock<DB::SharedMutex> &)
+{
+ existed = exist(db, path);
+ if (existed)
+ return;
+ throwRockDBErrorNotOk(db.Put({}, path, DIR_DATA));
+}
+
+void RocksDBCreateDirectoryOperation::undo(std::unique_lock<DB::SharedMutex> &)
+{
+ if (existed) return;
+ throwRockDBErrorNotOk(db.Delete({}, path));
+}
+
+void
RocksDBCreateDirectoryRecursiveOperation::execute(std::unique_lock<DB::SharedMutex>
& )
+{
+ namespace fs = std::filesystem;
+ fs::path p(path);
+ while (!exist(db, p.string()))
+ {
+ paths_created.push_back(p);
+ if (!p.has_parent_path())
+ break;
+ p = p.parent_path();
+ }
+ for (const auto & path_to_create : paths_created | std::views::reverse)
+ throwRockDBErrorNotOk(db.Put({}, path_to_create,
RocksDBCreateDirectoryOperation::DIR_DATA));
+}
+
+void
RocksDBCreateDirectoryRecursiveOperation::undo(std::unique_lock<DB::SharedMutex>
& )
+{
+ for (const auto & path_created : paths_created)
+ throwRockDBErrorNotOk(db.Delete({}, path_created));
+}
+
+void
RocksDBRemoveDirectoryOperation::execute(std::unique_lock<DB::SharedMutex> &)
+{
+ auto *it = db.NewIterator({});
+ bool empty_dir = true;
+ for (it->Seek(path); it->Valid() && it->key().starts_with(path);
it->Next())
+ {
+ if (it->key() != path && it->key().starts_with(path))
+ {
+ empty_dir = false;
+ break;
+ }
+ if (it->key() == path)
+ existed = true;
+ }
+ if (!empty_dir)
+ {
+ throw DB::Exception(DB::ErrorCodes::INVALID_STATE, "Directory {} is
not empty", path);
+ }
+ if (existed)
+ throwRockDBErrorNotOk(db.Delete({}, path));
+}
+
+void RocksDBRemoveDirectoryOperation::undo(std::unique_lock<DB::SharedMutex> &)
+{
+ if (existed)
+ throwRockDBErrorNotOk(db.Put({}, path,
RocksDBCreateDirectoryOperation::DIR_DATA));
+}
+
+void
RocksDBRemoveRecursiveOperation::execute(std::unique_lock<DB::SharedMutex> &)
+{
+ auto *it = db.NewIterator({});
+ for (it->Seek(path); it->Valid() && it->key().starts_with(path);
it->Next())
+ {
+ files.emplace(it->key().ToString(), it->value().ToString());
+ throwRockDBErrorNotOk(db.Delete({}, it->key()));
+ }
+}
+
+void RocksDBRemoveRecursiveOperation::undo(std::unique_lock<DB::SharedMutex> &)
+{
+ for (const auto & [key, value] : files)
+ throwRockDBErrorNotOk(db.Put({}, key, value));
+}
+
+void RocksDBUnlinkFileOperation::execute(std::unique_lock<DB::SharedMutex> &)
+{
+ prev_data = getData(db, path);
+ throwRockDBErrorNotOk(db.Delete({}, path));
+}
+
+void RocksDBUnlinkFileOperation::undo(std::unique_lock<DB::SharedMutex> &)
+{
+ throwRockDBErrorNotOk(db.Put({}, path, prev_data));
+}
+}
+#endif
\ No newline at end of file
diff --git
a/cpp-ch/local-engine/Disks/ObjectStorages/MetadataStorageFromRocksDBTransactionOperations.h
b/cpp-ch/local-engine/Disks/ObjectStorages/MetadataStorageFromRocksDBTransactionOperations.h
new file mode 100644
index 000000000..4a246cf7b
--- /dev/null
+++
b/cpp-ch/local-engine/Disks/ObjectStorages/MetadataStorageFromRocksDBTransactionOperations.h
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+#include <config.h>
+#if USE_ROCKSDB
+#include <Disks/ObjectStorages/IMetadataOperation.h>
+#include <Disks/ObjectStorages/IMetadataStorage.h>
+#include <rocksdb/db.h>
+
+namespace local_engine
+{
+void throwRockDBErrorNotOk(const rocksdb::Status & status);
+bool exist(rocksdb::DB & db, const std::string & path);
+String getData(rocksdb::DB & db, const std::string & path);
+std::vector<String> listKeys(rocksdb::DB & db, const std::string & path);
+
+struct RocksDBWriteFileOperation final : public DB::IMetadataOperation
+{
+ RocksDBWriteFileOperation(const std::string& path_, rocksdb::DB& db_,
const std::string& data_) : path(path_),
+ db(db_), data(data_)
+ {
+ }
+
+ void execute(std::unique_lock<DB::SharedMutex> & metadata_lock) override;
+
+ void undo(std::unique_lock<DB::SharedMutex> & metadata_lock) override;
+
+private:
+ std::string path;
+ rocksdb::DB & db;
+ std::string data;
+ bool existed = false;
+ std::string prev_data;
+};
+
+struct RocksDBCreateDirectoryOperation final : public DB::IMetadataOperation
+{
+ RocksDBCreateDirectoryOperation(const std::string & path_, rocksdb::DB &
db_) : path(path_), db(db_)
+ {
+ }
+
+ void execute(std::unique_lock<DB::SharedMutex> & metadata_lock) override;
+
+ void undo(std::unique_lock<DB::SharedMutex> & metadata_lock) override;
+
+ const static inline String DIR_DATA = "__DIR__";
+private:
+ std::string path;
+ bool existed = false;
+ rocksdb::DB & db;
+
+};
+
+struct RocksDBCreateDirectoryRecursiveOperation final : public
DB::IMetadataOperation
+{
+ RocksDBCreateDirectoryRecursiveOperation(const std::string & path_,
rocksdb::DB & db_) : path(path_), db(db_)
+ {
+ };
+
+ void execute(std::unique_lock<DB::SharedMutex> & metadata_lock) override;
+
+ void undo(std::unique_lock<DB::SharedMutex> & metadata_lock) override;
+
+private:
+ std::string path;
+ std::vector<std::string> paths_created;
+ rocksdb::DB & db;
+};
+
+struct RocksDBRemoveDirectoryOperation final : public DB::IMetadataOperation
+{
+ RocksDBRemoveDirectoryOperation(const std::string & path_, rocksdb::DB &
db_): path(path_), db(db_)
+ {
+ }
+
+ void execute(std::unique_lock<DB::SharedMutex> & metadata_lock) override;
+
+ void undo(std::unique_lock<DB::SharedMutex> & metadata_lock) override;
+
+private:
+ std::string path;
+ bool existed = false;
+ rocksdb::DB & db;
+};
+
+struct RocksDBRemoveRecursiveOperation final : public DB::IMetadataOperation
+{
+ RocksDBRemoveRecursiveOperation(const std::string & path_, rocksdb::DB &
db_) : path(path_), db(db_)
+ {
+ }
+
+ void execute(std::unique_lock<DB::SharedMutex> & metadata_lock) override;
+
+ void undo(std::unique_lock<DB::SharedMutex> & metadata_lock) override;
+
+private:
+ std::string path;
+ rocksdb::DB & db;
+ std::unordered_map<String, String> files;
+};
+
+struct RocksDBUnlinkFileOperation final : public DB::IMetadataOperation
+{
+ RocksDBUnlinkFileOperation(const std::string & path_, rocksdb::DB & db_) :
path(path_), db(db_)
+ {
+ }
+
+ void execute(std::unique_lock<DB::SharedMutex> & metadata_lock) override;
+
+ void undo(std::unique_lock<DB::SharedMutex> & metadata_lock) override;
+
+private:
+ std::string path;
+ rocksdb::DB & db;
+ std::string prev_data;
+};
+
+
+}
+#endif
+
diff --git a/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp
b/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp
index 52398b5f2..99627181c 100644
--- a/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp
+++ b/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp
@@ -29,6 +29,10 @@
#include <Disks/ObjectStorages/GlutenDiskS3.h>
#endif
+#if USE_ROCKSDB
+#include <Disks/ObjectStorages/MetadataStorageFromRocksDB.h>
+#endif
+
#include "registerGlutenDisks.h"
namespace local_engine
@@ -60,7 +64,18 @@ void registerGlutenDisks(bool global_skip_access_check)
const
Poco::Util::AbstractConfiguration & conf, DB::ContextPtr ctx) ->
DB::ObjectStoragePtr
{ return DB::ObjectStorageFactory::instance().create(name, conf,
config_prefix, ctx, skip_access_check); };
auto object_storage =
DB::ObjectStorageFactory::instance().create(name, config, config_prefix,
context, skip_access_check);
- auto metadata_storage =
DB::MetadataStorageFactory::instance().create(name, config, config_prefix,
object_storage, "local");
+ DB::MetadataStoragePtr metadata_storage;
+ auto metadata_type =
DB::MetadataStorageFactory::getMetadataType(config, config_prefix, "local");
+ if (metadata_type == "rocksdb")
+ {
+#if USE_ROCKSDB
+ metadata_storage = MetadataStorageFromRocksDB::create(name,
config, config_prefix, object_storage);
+#else
+ throw Exception(ErrorCodes::NOT_IMPLEMENTED, "RocksDB metadata
storage is not enabled in the build");
+#endif
+ }
+ else
+ metadata_storage =
DB::MetadataStorageFactory::instance().create(name, config, config_prefix,
object_storage, "local");
DB::DiskObjectStoragePtr disk =
std::make_shared<local_engine::GlutenDiskS3>(
name,
@@ -96,7 +111,18 @@ void registerGlutenDisks(bool global_skip_access_check)
const
Poco::Util::AbstractConfiguration & conf, DB::ContextPtr ctx) ->
DB::ObjectStoragePtr
{ return DB::ObjectStorageFactory::instance().create(name, conf,
config_prefix, ctx, skip_access_check); };
auto object_storage = object_storage_creator(config, context);
- auto metadata_storage =
DB::MetadataStorageFactory::instance().create(name, config, config_prefix,
object_storage, "local");
+ DB::MetadataStoragePtr metadata_storage;
+ auto metadata_type =
DB::MetadataStorageFactory::getMetadataType(config, config_prefix, "local");
+ if (metadata_type == "rocksdb")
+ {
+#if USE_ROCKSDB
+ metadata_storage = MetadataStorageFromRocksDB::create(name,
config, config_prefix, object_storage);
+#else
+ throw Exception(ErrorCodes::NOT_IMPLEMENTED, "RocksDB metadata
storage is not enabled in the build");
+#endif
+ }
+ else
+ metadata_storage =
DB::MetadataStorageFactory::instance().create(name, config, config_prefix,
object_storage, "local");
DB::DiskObjectStoragePtr disk =
std::make_shared<local_engine::GlutenDiskHDFS>(
name,
diff --git a/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.cpp
b/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.cpp
index c9b56734a..621374377 100644
--- a/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.cpp
+++ b/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.cpp
@@ -29,10 +29,19 @@ extern const Metric LocalThreadActive;
extern const Metric LocalThreadScheduled;
}
+namespace DB
+{
+namespace ErrorCodes
+{
+extern const int NOT_IMPLEMENTED;
+}
+}
+
using namespace DB;
namespace local_engine
{
+static const String METADATA_FILE_NAME = "metadata.gluten";
std::unordered_map<String, String> extractPartMetaData(ReadBuffer & in)
{
@@ -53,12 +62,73 @@ std::unordered_map<String, String>
extractPartMetaData(ReadBuffer & in)
return result;
}
-void restoreMetaData(const SparkStorageMergeTreePtr & storage, const
MergeTreeTableInstance & mergeTreeTable, const Context & context)
+enum SupportedMetaDataStorageType
{
- const auto data_disk = storage->getStoragePolicy()->getAnyDisk();
- if (!data_disk->isRemote())
- return;
+ UNKNOWN =0,
+ ROCKSDB,
+ LOCAL
+};
+
+template <SupportedMetaDataStorageType type>
+static void restoreMetaData(const SparkStorageMergeTreePtr & storage, const
MergeTreeTableInstance & mergeTreeTable, const Context & context)
+{
+ UNREACHABLE();
+}
+
+template <>
+void restoreMetaData<ROCKSDB>(const SparkStorageMergeTreePtr & storage, const
MergeTreeTableInstance & mergeTreeTable, const Context & context)
+{
+ auto data_disk = storage->getStoragePolicy()->getAnyDisk();
+ std::unordered_set<String> not_exists_part;
+ auto metadata_storage = data_disk->getMetadataStorage();
+ auto table_path = std::filesystem::path(mergeTreeTable.relative_path);
+ for (const auto & part : mergeTreeTable.getPartNames())
+ {
+ auto part_path = table_path / part;
+ if (!metadata_storage->exists(part_path))
+ not_exists_part.emplace(part);
+ }
+
+ if (auto lock =
storage->lockForAlter(context.getSettingsRef().lock_acquire_timeout))
+ {
+ // put this return clause in lockForAlter
+ // so that it will not return until other thread finishes restoring
+ if (not_exists_part.empty())
+ return;
+
+ auto s3 = data_disk->getObjectStorage();
+ auto transaction = metadata_storage->createTransaction();
+
+ if (!metadata_storage->exists(table_path))
+ transaction->createDirectoryRecursive(table_path.generic_string());
+
+ for (const auto & part : not_exists_part)
+ {
+ auto part_path = table_path / part;
+ auto metadata_file_path = part_path / METADATA_FILE_NAME;
+ if (metadata_storage->exists(part_path))
+ return;
+ else
+ transaction->createDirectoryRecursive(part_path);
+ auto key =
s3->generateObjectKeyForPath(metadata_file_path.generic_string(), std::nullopt);
+ StoredObject metadata_object(key.serialize());
+ auto part_metadata =
extractPartMetaData(*s3->readObject(metadata_object));
+ for (const auto & item : part_metadata)
+ {
+ auto item_path = part_path / item.first;
+ transaction->writeStringToFile(item_path, item.second);
+ }
+ }
+ transaction->commit();
+ }
+}
+
+template <>
+void restoreMetaData<LOCAL>(
+ const SparkStorageMergeTreePtr & storage, const MergeTreeTableInstance &
mergeTreeTable, const Context & context)
+{
+ const auto data_disk = storage->getStoragePolicy()->getAnyDisk();
std::unordered_set<String> not_exists_part;
const DB::MetadataStorageFromDisk * metadata_storage =
static_cast<MetadataStorageFromDisk *>(data_disk->getMetadataStorage().get());
const auto metadata_disk = metadata_storage->getDisk();
@@ -86,8 +156,7 @@ void restoreMetaData(const SparkStorageMergeTreePtr &
storage, const MergeTreeTa
CurrentMetrics::LocalThreadScheduled,
max_threads,
max_threads,
- not_exists_part.size()
- );
+ not_exists_part.size());
auto s3 = data_disk->getObjectStorage();
if (!metadata_disk->exists(table_path))
@@ -95,9 +164,10 @@ void restoreMetaData(const SparkStorageMergeTreePtr &
storage, const MergeTreeTa
for (const auto & part : not_exists_part)
{
- auto job = [&]() {
+ auto job = [&]()
+ {
auto part_path = table_path / part;
- auto metadata_file_path = part_path / "metadata.gluten";
+ auto metadata_file_path = part_path / METADATA_FILE_NAME;
if (metadata_disk->exists(part_path))
return;
@@ -119,6 +189,33 @@ void restoreMetaData(const SparkStorageMergeTreePtr &
storage, const MergeTreeTa
}
}
+
+/// Returns true when `file_name` ends with the Gluten part-metadata file name
+/// (METADATA_FILE_NAME). NOTE(review): this is a pure suffix match, so any
+/// name that merely ends in the metadata file name also matches — confirm
+/// callers only pass basenames or paths where that is the intended behavior.
+bool isMergeTreePartMetaDataFile(const String & file_name)
+{
+ return file_name.ends_with(METADATA_FILE_NAME);
+}
+
+/// Restores the MergeTree part metadata for `mergeTreeTable` by dispatching to
+/// the backend-specific implementation that matches the disk's metadata
+/// storage type. No-op for non-remote disks (their metadata needs no restore).
+/// Throws NOT_IMPLEMENTED for any metadata storage type other than
+/// Local or None (RocksDB).
+void restoreMetaData(const SparkStorageMergeTreePtr & storage, const
MergeTreeTableInstance & mergeTreeTable, const Context & context)
+{
+ const auto data_disk = storage->getStoragePolicy()->getAnyDisk();
+ if (!data_disk->isRemote())
+ return;
+ auto metadata_storage = data_disk->getMetadataStorage();
+ if (metadata_storage->getType() == MetadataStorageType::Local)
+ {
+ restoreMetaData<LOCAL>(storage, mergeTreeTable, context);
+ }
+ // The RocksDB-backed metadata storage reports MetadataStorageType::None
+ // (presumably because upstream has no dedicated enum value for it), so
+ // None is treated as RocksDB here. NOTE(review): confirm this invariant
+ // against the disk registration in registerGlutenDisks.cpp.
+ else if (metadata_storage->getType() == MetadataStorageType::None)
+ {
+ restoreMetaData<ROCKSDB>(storage, mergeTreeTable, context);
+ }
+ else
+ {
+ throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported metadata
storage type {}.", metadata_storage->getType());
+ }
+}
+
void saveFileStatus(
const DB::MergeTreeData & storage,
const DB::ContextPtr& context,
@@ -128,20 +225,18 @@ void saveFileStatus(
const DiskPtr disk = storage.getStoragePolicy()->getAnyDisk();
if (!disk->isRemote())
return;
- if (auto * const disk_metadata = dynamic_cast<MetadataStorageFromDisk
*>(disk->getMetadataStorage().get()))
+ auto meta_storage = disk->getMetadataStorage();
+ const auto out = data_part_storage.writeFile(METADATA_FILE_NAME,
DBMS_DEFAULT_BUFFER_SIZE, context->getWriteSettings());
+ for (const auto it = data_part_storage.iterate(); it->isValid();
it->next())
{
- const auto out = data_part_storage.writeFile("metadata.gluten",
DBMS_DEFAULT_BUFFER_SIZE, context->getWriteSettings());
- for (const auto it = data_part_storage.iterate(); it->isValid();
it->next())
- {
- auto content = disk_metadata->readFileToString(it->path());
- writeString(it->name(), *out);
- writeChar('\t', *out);
- writeIntText(content.length(), *out);
- writeChar('\n', *out);
- writeString(content, *out);
- }
- out->finalize();
+ auto content = meta_storage->readFileToString(it->path());
+ writeString(it->name(), *out);
+ writeChar('\t', *out);
+ writeIntText(content.length(), *out);
+ writeChar('\n', *out);
+ writeString(content, *out);
}
+ out->finalize();
LOG_DEBUG(&Poco::Logger::get("MetaDataHelper"), "Save part {} metadata
success.", part_name);
}
diff --git a/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.h
b/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.h
index 329720817..503517cc8 100644
--- a/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.h
+++ b/cpp-ch/local-engine/Storages/MergeTree/MetaDataHelper.h
@@ -23,6 +23,10 @@
namespace local_engine
{
+
+
+bool isMergeTreePartMetaDataFile(const String & file_name);
+
void restoreMetaData(const SparkStorageMergeTreePtr & storage, const
MergeTreeTableInstance & mergeTreeTable, const Context & context);
void saveFileStatus(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]