This is an automated email from the ASF dual-hosted git repository.
liuneng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 4855a4d7d [GLUTEN-4675][CH] Support write mergetree to s3 (#4676)
4855a4d7d is described below
commit 4855a4d7dcc3c35160b531cc5913d8c41ae41972
Author: Shuai li <[email protected]>
AuthorDate: Fri Mar 22 18:33:16 2024 +0800
[GLUTEN-4675][CH] Support write mergetree to s3 (#4676)
What changes were proposed in this pull request?
Support writing MergeTree tables to S3 and HDFS.
How was this patch tested?
Unit tests (UT).
---------
Co-authored-by: liuneng1994 <[email protected]>
Co-authored-by: liuneng <[email protected]>
---
backends-clickhouse/pom.xml | 28 +-
...GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala | 526 ++++++++++++++++++++
...ergeTreeWriteOnObjectStorageAbstractSuite.scala | 188 +++++++
.../GlutenClickHouseMergeTreeWriteOnS3Suite.scala | 543 +++++++++++++++++++++
cpp-ch/local-engine/CMakeLists.txt | 6 +-
cpp-ch/local-engine/Common/CHUtil.cpp | 25 +
cpp-ch/local-engine/Common/CHUtil.h | 1 +
cpp-ch/local-engine/Common/MergeTreeTool.cpp | 27 +-
cpp-ch/local-engine/Common/MergeTreeTool.h | 9 +-
.../Disks/ObjectStorages/GlutenDiskHDFS.cpp | 75 +++
.../Disks/ObjectStorages/GlutenDiskHDFS.h | 63 +++
.../ObjectStorages/GlutenHDFSObjectStorage.cpp | 42 ++
.../Disks/ObjectStorages/GlutenHDFSObjectStorage.h | 53 ++
.../registerGlutenDiskObjectStorage.cpp | 120 +++++
cpp-ch/local-engine/Disks/registerGlutenDisks.cpp | 99 ++++
cpp-ch/local-engine/Disks/registerGlutenDisks.h | 27 +
cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp | 13 +-
cpp-ch/local-engine/Parser/MergeTreeRelParser.h | 1 -
.../Storages/CustomStorageMergeTree.cpp | 3 +-
.../Storages/Mergetree/MetaDataHelper.cpp | 95 ++++
.../Storages/Mergetree/MetaDataHelper.h | 29 ++
.../Storages/Mergetree/SparkMergeTreeWriter.cpp | 55 ++-
.../Storages/Mergetree/SparkMergeTreeWriter.h | 6 +-
cpp-ch/local-engine/local_engine_jni.cpp | 2 +-
.../substrait/rel/ExtensionTableNode.java | 8 +-
25 files changed, 2008 insertions(+), 36 deletions(-)
diff --git a/backends-clickhouse/pom.xml b/backends-clickhouse/pom.xml
index e0f96eda1..f173a61c9 100644
--- a/backends-clickhouse/pom.xml
+++ b/backends-clickhouse/pom.xml
@@ -19,6 +19,12 @@
<artifactId>gluten-core</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>guava</artifactId>
+ <groupId>com.google.guava</groupId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>io.glutenproject</groupId>
@@ -50,6 +56,16 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-yarn_${scala.binary.version}</artifactId>
<scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>hadoop-client-api</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-client-runtime</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
@@ -138,7 +154,11 @@
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
</exclusion>
- </exclusions>
+ <exclusion>
+ <artifactId>guava</artifactId>
+ <groupId>com.google.guava</groupId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
@@ -181,6 +201,12 @@
<version>1.11.901</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>io.minio</groupId>
+ <artifactId>minio</artifactId>
+ <version>8.5.9</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
new file mode 100644
index 000000000..88fc977b7
--- /dev/null
+++
b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
@@ -0,0 +1,526 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.glutenproject.execution
+
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
+import org.apache.spark.sql.delta.files.TahoeFileIndex
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import
org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
+
+import java.io.File
+
+// Some sqls' line length exceeds 100
+// scalastyle:off line.size.limit
+
+class GlutenClickHouseMergeTreeWriteOnHDFSSuite
+ extends GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite
+ with AdaptiveSparkPlanHelper {
+
+ override protected val needCopyParquetToTablePath = true // NOTE(review): presumably makes the base suite copy the parquet fixtures under tablesPath - confirm
+
+ override protected val tablesPath: String = basePath + "/tpch-data" // location of the TPC-H source tables, under basePath
+ override protected val tpchQueries: String = rootPath +
"queries/tpch-queries-ch"
+ override protected val queriesResults: String = rootPath +
"mergetree-queries-output"
+
+ override protected def beforeEach(): Unit = { // per-test setup: wipe the HDFS test path and reset the local metadata dir
+ super.beforeEach()
+ val conf = new Configuration
+ conf.set("fs.defaultFS", HDFS_URL)
+ val fs = FileSystem.get(conf)
+ fs.delete(new org.apache.hadoop.fs.Path(HDFS_URL), true) // true = recursive: removes everything under HDFS_URL
+ FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
+// FileUtils.deleteDirectory(new File(HDFS_CACHE_PATH))
+ FileUtils.forceMkdir(new File(HDFS_METADATA_PATH)) // recreate the metadata dir that was just removed
+// FileUtils.forceMkdir(new File(HDFS_CACHE_PATH))
+ }
+
+ override protected def afterEach(): Unit = { // per-test teardown: remove the local metadata dir
+ super.afterEach()
+ FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH)) // the HDFS data itself is left in place; beforeEach() deletes it
+// FileUtils.deleteDirectory(new File(HDFS_CACHE_PATH))
+ }
+
+ test("test mergetree table write") { // plain table (no keys, partitions or buckets) on the __hdfs_main policy: insert, then read back with a TPC-H Q1-shaped query
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS lineitem_mergetree_hdfs;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS lineitem_mergetree_hdfs
+ |(
+ | l_orderkey bigint,
+ | l_partkey bigint,
+ | l_suppkey bigint,
+ | l_linenumber bigint,
+ | l_quantity double,
+ | l_extendedprice double,
+ | l_discount double,
+ | l_tax double,
+ | l_returnflag string,
+ | l_linestatus string,
+ | l_shipdate date,
+ | l_commitdate date,
+ | l_receiptdate date,
+ | l_shipinstruct string,
+ | l_shipmode string,
+ | l_comment string
+ |)
+ |USING clickhouse
+ |LOCATION '$HDFS_URL/test/lineitem_mergetree_hdfs'
+ |TBLPROPERTIES (storage_policy='__hdfs_main')
+ |""".stripMargin)
+
+ spark.sql(s"""
+ | insert into table lineitem_mergetree_hdfs
+ | select * from lineitem
+ |""".stripMargin)
+ FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH)) // drops the local metadata dir after the insert; NOTE(review): presumably forces the read below to restore it from HDFS - confirm
+ val sqlStr =
+ s"""
+ |SELECT
+ | l_returnflag,
+ | l_linestatus,
+ | sum(l_quantity) AS sum_qty,
+ | sum(l_extendedprice) AS sum_base_price,
+ | sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
+ | sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS
sum_charge,
+ | avg(l_quantity) AS avg_qty,
+ | avg(l_extendedprice) AS avg_price,
+ | avg(l_discount) AS avg_disc,
+ | count(*) AS count_order
+ |FROM
+ | lineitem_mergetree_hdfs
+ |WHERE
+ | l_shipdate <= date'1998-09-02' - interval 1 day
+ |GROUP BY
+ | l_returnflag,
+ | l_linestatus
+ |ORDER BY
+ | l_returnflag,
+ | l_linestatus;
+ |
+ |""".stripMargin
+ runTPCHQueryBySQL(1, sqlStr) { // NOTE(review): presumably also checks the rows against the stored q1 result - confirm in the base suite
+ df =>
+ val scanExec = collect(df.queryExecution.executedPlan) {
+ case f: FileSourceScanExecTransformer => f
+ }
+ assert(scanExec.size == 1) // the plan must contain exactly one Gluten file scan
+
+ val mergetreeScan = scanExec.head
+ assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
+
+ val fileIndex =
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).orderByKeyOption.isEmpty)
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty)
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
+ val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f =>
f.asInstanceOf[AddMergeTreeParts])
+ assert(addFiles.size == 1) // the single insert is expected to produce one MergeTree part
+ assert(addFiles.head.rows == 600572)
+ }
+ spark.sql("drop table lineitem_mergetree_hdfs")
+ }
+
+ test("test mergetree write with orderby keys / primary keys") { // same flow as the plain-table test, but the table declares orderByKey and primaryKey and the test checks they are kept in the table metadata
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS lineitem_mergetree_orderbykey_hdfs;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS lineitem_mergetree_orderbykey_hdfs
+ |(
+ | l_orderkey bigint,
+ | l_partkey bigint,
+ | l_suppkey bigint,
+ | l_linenumber bigint,
+ | l_quantity double,
+ | l_extendedprice double,
+ | l_discount double,
+ | l_tax double,
+ | l_returnflag string,
+ | l_linestatus string,
+ | l_shipdate date,
+ | l_commitdate date,
+ | l_receiptdate date,
+ | l_shipinstruct string,
+ | l_shipmode string,
+ | l_comment string
+ |)
+ |USING clickhouse
+ |TBLPROPERTIES (storage_policy='__hdfs_main',
+ | orderByKey='l_shipdate,l_orderkey',
+ | primaryKey='l_shipdate')
+ |LOCATION '$HDFS_URL/test/lineitem_mergetree_orderbykey_hdfs'
+ |""".stripMargin)
+
+ spark.sql(s"""
+ | insert into table lineitem_mergetree_orderbykey_hdfs
+ | select * from lineitem
+ |""".stripMargin)
+
+ val sqlStr =
+ s"""
+ |SELECT
+ | l_returnflag,
+ | l_linestatus,
+ | sum(l_quantity) AS sum_qty,
+ | sum(l_extendedprice) AS sum_base_price,
+ | sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
+ | sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS
sum_charge,
+ | avg(l_quantity) AS avg_qty,
+ | avg(l_extendedprice) AS avg_price,
+ | avg(l_discount) AS avg_disc,
+ | count(*) AS count_order
+ |FROM
+ | lineitem_mergetree_orderbykey_hdfs
+ |WHERE
+ | l_shipdate <= date'1998-09-02' - interval 1 day
+ |GROUP BY
+ | l_returnflag,
+ | l_linestatus
+ |ORDER BY
+ | l_returnflag,
+ | l_linestatus;
+ |
+ |""".stripMargin
+ runTPCHQueryBySQL(1, sqlStr) { // NOTE(review): presumably also checks the rows against the stored q1 result - confirm in the base suite
+ df =>
+ val scanExec = collect(df.queryExecution.executedPlan) {
+ case f: FileSourceScanExecTransformer => f
+ }
+ assert(scanExec.size == 1) // the plan must contain exactly one Gluten file scan
+
+ val mergetreeScan = scanExec.head
+ assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
+
+ val fileIndex =
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
+ assert( // order-by key must match the orderByKey table property, in declaration order
+ ClickHouseTableV2
+ .getTable(fileIndex.deltaLog)
+ .orderByKeyOption
+ .get
+ .mkString(",")
+ .equals("l_shipdate,l_orderkey"))
+ assert( // primary key must match the primaryKey table property
+ ClickHouseTableV2
+ .getTable(fileIndex.deltaLog)
+ .primaryKeyOption
+ .get
+ .mkString(",")
+ .equals("l_shipdate"))
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
+ val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f =>
f.asInstanceOf[AddMergeTreeParts])
+ assert(addFiles.size == 1) // the single insert is expected to produce one MergeTree part
+ assert(addFiles.head.rows == 600572)
+ }
+ spark.sql("drop table lineitem_mergetree_orderbykey_hdfs")
+ }
+
+ test("test mergetree write with partition") { // table partitioned by l_returnflag, written three ways: dynamic-partition SQL insert, DataFrame insertInto, static-partition SQL insert
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS lineitem_mergetree_partition_hdfs;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS lineitem_mergetree_partition_hdfs
+ |(
+ | l_orderkey bigint,
+ | l_partkey bigint,
+ | l_suppkey bigint,
+ | l_linenumber bigint,
+ | l_quantity double,
+ | l_extendedprice double,
+ | l_discount double,
+ | l_tax double,
+ | l_returnflag string,
+ | l_linestatus string,
+ | l_shipdate date,
+ | l_commitdate date,
+ | l_receiptdate date,
+ | l_shipinstruct string,
+ | l_shipmode string,
+ | l_comment string
+ |)
+ |USING clickhouse
+ |PARTITIONED BY (l_returnflag)
+ |TBLPROPERTIES (storage_policy='__hdfs_main',
+ | orderByKey='l_orderkey',
+ | primaryKey='l_orderkey')
+ |LOCATION '$HDFS_URL/test/lineitem_mergetree_partition_hdfs'
+ |""".stripMargin)
+
+ // dynamic partitions
+ spark.sql(s"""
+ | insert into table lineitem_mergetree_partition_hdfs
+ | select * from lineitem
+ |""".stripMargin)
+
+ // write with dataframe api
+ val source = spark.sql(s"""
+ |select
+ | l_orderkey ,
+ | l_partkey ,
+ | l_suppkey ,
+ | l_linenumber ,
+ | l_quantity ,
+ | l_extendedprice ,
+ | l_discount ,
+ | l_tax ,
+ | l_returnflag ,
+ | l_linestatus ,
+ | l_shipdate ,
+ | l_commitdate ,
+ | l_receiptdate ,
+ | l_shipinstruct ,
+ | l_shipmode ,
+ | l_comment
+ | from lineitem
+ | where l_shipdate BETWEEN date'1993-01-01' AND
date'1993-01-10'
+ |""".stripMargin)
+
+ source.write
+ .format("clickhouse")
+ .mode(SaveMode.Append)
+ .insertInto("lineitem_mergetree_partition_hdfs")
+
+ // static partition
+ spark.sql(s"""
+ | insert into lineitem_mergetree_partition_hdfs PARTITION
(l_returnflag = 'A')
+ | (l_shipdate,
+ | l_orderkey,
+ | l_partkey,
+ | l_suppkey,
+ | l_linenumber,
+ | l_quantity,
+ | l_extendedprice,
+ | l_discount,
+ | l_tax,
+ | l_linestatus,
+ | l_commitdate,
+ | l_receiptdate,
+ | l_shipinstruct,
+ | l_shipmode,
+ | l_comment)
+ | select
+ | l_shipdate,
+ | l_orderkey,
+ | l_partkey,
+ | l_suppkey,
+ | l_linenumber,
+ | l_quantity,
+ | l_extendedprice,
+ | l_discount,
+ | l_tax,
+ | l_linestatus,
+ | l_commitdate,
+ | l_receiptdate,
+ | l_shipinstruct,
+ | l_shipmode,
+ | l_comment from lineitem
+ | where l_returnflag = 'A'
+ |""".stripMargin)
+
+ val sqlStr =
+ s"""
+ |SELECT
+ | l_returnflag,
+ | l_linestatus,
+ | sum(l_quantity) AS sum_qty,
+ | sum(l_extendedprice) AS sum_base_price,
+ | sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
+ | sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS
sum_charge,
+ | avg(l_quantity) AS avg_qty,
+ | avg(l_extendedprice) AS avg_price,
+ | avg(l_discount) AS avg_disc,
+ | count(*) AS count_order
+ |FROM
+ | lineitem_mergetree_partition_hdfs
+ |WHERE
+ | l_shipdate <= date'1998-09-02' - interval 1 day
+ |GROUP BY
+ | l_returnflag,
+ | l_linestatus
+ |ORDER BY
+ | l_returnflag,
+ | l_linestatus;
+ |
+ |""".stripMargin
+ runTPCHQueryBySQL(1, sqlStr, compareResult = false) { // result comparison is switched off; selected rows are asserted inline below (the table holds rows from three inserts)
+ df =>
+ val result = df.collect()
+ assert(result.length == 4)
+ assert(result(0).getString(0).equals("A"))
+ assert(result(0).getString(1).equals("F"))
+ assert(result(0).getDouble(2) == 7578058.0)
+
+ assert(result(2).getString(0).equals("N"))
+ assert(result(2).getString(1).equals("O"))
+ assert(result(2).getDouble(2) == 7454519.0)
+
+ val scanExec = collect(df.queryExecution.executedPlan) {
+ case f: FileSourceScanExecTransformer => f
+ }
+ assert(scanExec.size == 1) // the plan must contain exactly one Gluten file scan
+
+ val mergetreeScan = scanExec.head
+ assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
+ assert(mergetreeScan.metrics("numFiles").value == 6) // same count as addFiles.size asserted at the end
+
+ val fileIndex =
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
+ assert( // order-by key must match the orderByKey table property
+ ClickHouseTableV2
+ .getTable(fileIndex.deltaLog)
+ .orderByKeyOption
+ .get
+ .mkString(",")
+ .equals("l_orderkey"))
+ assert( // primary key must match the primaryKey table property
+ ClickHouseTableV2
+ .getTable(fileIndex.deltaLog)
+ .primaryKeyOption
+ .get
+ .mkString(",")
+ .equals("l_orderkey"))
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size ==
1)
+ assert( // the only partition column is the one named in PARTITIONED BY
+ ClickHouseTableV2
+ .getTable(fileIndex.deltaLog)
+ .partitionColumns(0)
+ .equals("l_returnflag"))
+ val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f =>
f.asInstanceOf[AddMergeTreeParts])
+
+ assert(addFiles.size == 6)
+ assert(addFiles.map(_.rows).sum == 750735) // row total across all parts written by the three inserts above
+ }
+ spark.sql("drop table lineitem_mergetree_partition_hdfs")
+ }
+
+ test("test mergetree write with bucket table") { // table partitioned by l_returnflag and clustered by l_orderkey into 4 buckets; SORTED BY is added only when sparkVersion is not "3.2"
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS lineitem_mergetree_bucket_hdfs;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS lineitem_mergetree_bucket_hdfs
+ |(
+ | l_orderkey bigint,
+ | l_partkey bigint,
+ | l_suppkey bigint,
+ | l_linenumber bigint,
+ | l_quantity double,
+ | l_extendedprice double,
+ | l_discount double,
+ | l_tax double,
+ | l_returnflag string,
+ | l_linestatus string,
+ | l_shipdate date,
+ | l_commitdate date,
+ | l_receiptdate date,
+ | l_shipinstruct string,
+ | l_shipmode string,
+ | l_comment string
+ |)
+ |USING clickhouse
+ |PARTITIONED BY (l_returnflag)
+ |CLUSTERED BY (l_orderkey)
+ |${if (sparkVersion.equals("3.2")) "" else "SORTED BY
(l_orderkey)"} INTO 4 BUCKETS
+ |LOCATION '$HDFS_URL/test/lineitem_mergetree_bucket_hdfs'
+ |TBLPROPERTIES (storage_policy='__hdfs_main')
+ |""".stripMargin)
+
+ spark.sql(s"""
+ | insert into table lineitem_mergetree_bucket_hdfs
+ | select * from lineitem
+ |""".stripMargin)
+
+ val sqlStr =
+ s"""
+ |SELECT
+ | l_returnflag,
+ | l_linestatus,
+ | sum(l_quantity) AS sum_qty,
+ | sum(l_extendedprice) AS sum_base_price,
+ | sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
+ | sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS
sum_charge,
+ | avg(l_quantity) AS avg_qty,
+ | avg(l_extendedprice) AS avg_price,
+ | avg(l_discount) AS avg_disc,
+ | count(*) AS count_order
+ |FROM
+ | lineitem_mergetree_bucket_hdfs
+ |WHERE
+ | l_shipdate <= date'1998-09-02' - interval 1 day
+ |GROUP BY
+ | l_returnflag,
+ | l_linestatus
+ |ORDER BY
+ | l_returnflag,
+ | l_linestatus;
+ |
+ |""".stripMargin
+ runTPCHQueryBySQL(1, sqlStr) { // NOTE(review): presumably also checks the rows against the stored q1 result - confirm in the base suite
+ df =>
+ val scanExec = collect(df.queryExecution.executedPlan) {
+ case f: FileSourceScanExecTransformer => f
+ }
+ assert(scanExec.size == 1) // the plan must contain exactly one Gluten file scan
+
+ val mergetreeScan = scanExec(0)
+ assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
+
+ val fileIndex =
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
+
assert(!ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
+ if (sparkVersion.equals("3.2")) { // the DDL above omits SORTED BY for "3.2", so no order-by key is expected there
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).orderByKeyOption.isEmpty)
+ } else {
+ assert(
+ ClickHouseTableV2
+ .getTable(fileIndex.deltaLog)
+ .orderByKeyOption
+ .get
+ .mkString(",")
+ .equals("l_orderkey"))
+ }
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty)
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size ==
1)
+ assert( // the only partition column is the one named in PARTITIONED BY
+ ClickHouseTableV2
+ .getTable(fileIndex.deltaLog)
+ .partitionColumns(0)
+ .equals("l_returnflag"))
+ val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f =>
f.asInstanceOf[AddMergeTreeParts])
+
+ assert(addFiles.size == 12)
+ assert(addFiles.map(_.rows).sum == 600572) // same row total as the unbucketed tests, spread over the 12 parts
+ }
+ spark.sql("drop table lineitem_mergetree_bucket_hdfs")
+ }
+
+}
+// scalastyle:on line.size.limit
diff --git
a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite.scala
b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite.scala
new file mode 100644
index 000000000..e9b938dbf
--- /dev/null
+++
b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite.scala
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.glutenproject.execution
+
+import io.glutenproject.GlutenConfig
+
+import org.apache.spark.sql.SparkSession
+
+import _root_.org.apache.spark.{SPARK_VERSION_SHORT, SparkConf}
+import _root_.org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.commons.io.FileUtils
+
+import java.io.File
+
+// Some sqls' line length exceeds 100
+// scalastyle:off line.size.limit
+
+class GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite
+ extends GlutenClickHouseTPCHAbstractSuite
+ with AdaptiveSparkPlanHelper {
+ private var _spark: SparkSession = _ // suite-private session: built in initializeSession(), stopped and nulled in afterAll()
+
+ override protected def spark: SparkSession = _spark // returns null until initializeSession() has run
+
+ override protected val needCopyParquetToTablePath = true // NOTE(review): presumably makes the base suite copy the parquet fixtures under tablesPath - confirm
+
+ override protected val tablesPath: String = basePath + "/tpch-data" // passed to createNotNullTPCHTablesInParquet in createTPCHNotNullTables()
+ override protected val tpchQueries: String = rootPath +
"queries/tpch-queries-ch"
+ override protected val queriesResults: String = rootPath +
"mergetree-queries-output"
+
+ protected val sparkVersion: String = { // first two dot-separated components of SPARK_VERSION_SHORT, re-joined as "major.minor"
+ val version = SPARK_VERSION_SHORT.split("\\.")
+ version(0) + "." + version(1)
+ }
+
+ val S3_METADATA_PATH = s"/tmp/metadata/s3/$sparkVersion/" // local dir, wired to disks.s3.metadata_path in sparkConf
+ val S3_CACHE_PATH = s"/tmp/s3_cache/$sparkVersion/" // local dir, wired to disks.s3_cache.path in sparkConf
+ val S3_ENDPOINT = "s3://127.0.0.1:9000/"
+ val MINIO_ENDPOINT: String = S3_ENDPOINT.replace("s3", "http") // yields "http://127.0.0.1:9000/"
+ val BUCKET_NAME: String = sparkVersion.replace(".", "-") // one bucket name per Spark version: dots become dashes
+ val WHOLE_PATH: String = MINIO_ENDPOINT + BUCKET_NAME + "/" // endpoint URL including the bucket; used as disks.s3.endpoint
+
+ val HDFS_METADATA_PATH = s"/tmp/metadata/hdfs/$sparkVersion/" // local dir, wired to disks.hdfs.metadata_path in sparkConf
+ val HDFS_CACHE_PATH = s"/tmp/hdfs_cache/$sparkVersion/" // local dir, wired to disks.hdfs_cache.path in sparkConf
+ val HDFS_URL_ENDPOINT = s"hdfs://127.0.0.1:8020"
+ val HDFS_URL = s"$HDFS_URL_ENDPOINT/$sparkVersion" // per-Spark-version root directory on HDFS
+
+ val S3_ACCESS_KEY = "BypTYzcXOlfr03FFIvt4" // NOTE(review): hard-coded credential; presumably valid only for a local MinIO test instance - confirm it is not a real key
+ val S3_SECRET_KEY = "K9MDaGItPSaphorZM8t4hXf30gHF9dBWi6L2dK5E" // NOTE(review): same caveat as S3_ACCESS_KEY
+
+ override protected def initializeSession(): Unit = { // builds the suite-private SparkSession from sparkConf on first call; later calls are no-ops
+ if (_spark == null) {
+ _spark = SparkSession
+ .builder()
+ .appName("Gluten-UT-RemoteHS")
+ .config(sparkConf)
+ .getOrCreate() // getOrCreate may hand back an already-running session instead of creating a new one
+ }
+ }
+
+ override protected def sparkConf: SparkConf = { // parent conf plus S3A client settings and the CH backend disk/policy definitions for the "s3" and "hdfs" disks
+ super.sparkConf
+ .setMaster("local[2]")
+ .set("spark.shuffle.manager",
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+ .set("spark.io.compression.codec", "LZ4")
+ .set("spark.sql.shuffle.partitions", "5")
+ .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
+ .set("spark.sql.adaptive.enabled", "true")
+
.set("spark.gluten.sql.columnar.backend.ch.runtime_config.use_local_format",
"false")
+ .set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level",
"error")
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.user_defined_path",
+ "/tmp/user_defined")
+ .set("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY) // Hadoop S3A client settings, pointed at MINIO_ENDPOINT below
+ .set("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY)
+ .set("spark.hadoop.fs.s3a.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem")
+ .set("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
+ .set("spark.hadoop.fs.s3a.path.style.access", "true")
+ .set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3.type",
+ "s3_gluten") // disk "s3": Gluten's S3 disk type
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3.endpoint",
+ WHOLE_PATH)
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3.access_key_id",
+ S3_ACCESS_KEY)
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3.secret_access_key",
+ S3_SECRET_KEY)
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3.metadata_path",
+ S3_METADATA_PATH)
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3_cache.type",
+ "cache")
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3_cache.disk",
+ "s3") // disk "s3_cache" is a cache layered over disk "s3"
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3_cache.path",
+ S3_CACHE_PATH)
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3_cache.max_size",
+ "10Gi")
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.__s3_main.volumes",
+ "main")
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.__s3_main.volumes.main.disk",
+ "s3_cache") // policy __s3_main: single volume "main" on the cached S3 disk
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs.type",
+ "hdfs_gluten") // disk "hdfs": Gluten's HDFS disk type
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs.endpoint",
+ HDFS_URL_ENDPOINT + "/")
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs.metadata_path",
+ HDFS_METADATA_PATH)
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache.type",
+ "cache")
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache.disk",
+ "hdfs") // disk "hdfs_cache" is a cache layered over disk "hdfs"
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache.path",
+ HDFS_CACHE_PATH)
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache.max_size",
+ "10Gi")
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.__hdfs_main.volumes",
+ "main")
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.__hdfs_main.volumes.main.disk",
+ "hdfs_cache") // policy __hdfs_main: single volume "main" on the cached HDFS disk
+ .set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm",
"sparkMurmurHash3_32")
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.hdfs.dfs_client_read_shortcircuit",
+ "false")
+
.set("spark.gluten.sql.columnar.backend.ch.runtime_config.hdfs.dfs_default_replica",
"1")
+ }
+ override protected def createTPCHNotNullTables(): Unit = { // delegates to the parquet-based table creation helper, pointed at tablesPath
+ createNotNullTPCHTablesInParquet(tablesPath)
+ }
+
+ override protected def afterAll(): Unit = { // suite teardown: stop the suite-private session, clear global session/config state, delete basePath
+ try {
+ super.afterAll()
+ } finally {
+ try {
+ if (_spark != null) {
+ try {
+ _spark.sessionState.catalog.reset()
+ } finally {
+ _spark.stop()
+ _spark = null
+ }
+ }
+ } finally {
+ SparkSession.clearActiveSession()
+ SparkSession.clearDefaultSession()
+ // init GlutenConfig in the next beforeAll; done inside finally so a failing teardown step cannot leave the old instance behind
+ GlutenConfig.ins = null
+ }
+ }
+
+ FileUtils.forceDelete(new File(basePath)) // reached only when every teardown step above succeeded; forceDelete throws if the path cannot be deleted
+ }
+}
+// scalastyle:on line.size.limit
diff --git
a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
new file mode 100644
index 000000000..45eb8625a
--- /dev/null
+++
b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
@@ -0,0 +1,543 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.glutenproject.execution
+
+import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
+import org.apache.spark.sql.delta.files.TahoeFileIndex
+import
org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
+
+import _root_.org.apache.commons.io.FileUtils
+import _root_.org.apache.spark.sql.SaveMode
+import _root_.org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import io.minio.{BucketExistsArgs, ListObjectsArgs, MakeBucketArgs,
MinioClient, RemoveBucketArgs, RemoveObjectsArgs}
+import io.minio.messages.DeleteObject
+
+import java.io.File
+import java.util
+
+// Some sqls' line length exceeds 100
+// scalastyle:off line.size.limit
+
+class GlutenClickHouseMergeTreeWriteOnS3Suite
+ extends GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite
+ with AdaptiveSparkPlanHelper {
+
+ override protected val needCopyParquetToTablePath = true
+
+ override protected val tablesPath: String = basePath + "/tpch-data"
+ override protected val tpchQueries: String = rootPath +
"queries/tpch-queries-ch"
+ override protected val queriesResults: String = rootPath +
"mergetree-queries-output"
+
+ /**
+  * Per-test setup: after the parent setup, empties and drops BUCKET_NAME if it already
+  * exists, creates it again, and recreates the local S3_METADATA_PATH and S3_CACHE_PATH
+  * directories so every test starts from an empty bucket and empty local directories.
+  */
+ override protected def beforeEach(): Unit = {
+ super.beforeEach()
+ val client = MinioClient
+ .builder()
+ .endpoint(MINIO_ENDPOINT)
+ .credentials(S3_ACCESS_KEY, S3_SECRET_KEY)
+ .build()
+ if
(client.bucketExists(BucketExistsArgs.builder().bucket(BUCKET_NAME).build())) {
+ // Collect every object name (recursive listing) so they can be removed in one request.
+ val results =
+
client.listObjects(ListObjectsArgs.builder().bucket(BUCKET_NAME).recursive(true).build())
+ val objects = new util.LinkedList[DeleteObject]()
+ results.forEach(
+ obj => {
+ objects.add(new DeleteObject(obj.get().objectName()))
+ })
+ val removeResults = client.removeObjects(
+
RemoveObjectsArgs.builder().bucket(BUCKET_NAME).objects(objects).build())
+ // NOTE(review): presumably the result iterable is lazy and has to be walked for the
+ // deletions to be issued; the message() value itself is discarded - confirm against
+ // the MinIO client documentation.
+ removeResults.forEach(result => result.get().message())
+
client.removeBucket(RemoveBucketArgs.builder().bucket(BUCKET_NAME).build())
+ }
+ client.makeBucket(MakeBucketArgs.builder().bucket(BUCKET_NAME).build())
+ // Delete and recreate both local directories so they start out empty.
+ FileUtils.deleteDirectory(new File(S3_METADATA_PATH))
+ FileUtils.deleteDirectory(new File(S3_CACHE_PATH))
+ FileUtils.forceMkdir(new File(S3_METADATA_PATH))
+ FileUtils.forceMkdir(new File(S3_CACHE_PATH))
+ }
+
+ /** Per-test teardown: after the parent teardown, deletes the local S3_METADATA_PATH and S3_CACHE_PATH directories. */
+ override protected def afterEach(): Unit = {
+ super.afterEach()
+ FileUtils.deleteDirectory(new File(S3_METADATA_PATH))
+ FileUtils.deleteDirectory(new File(S3_CACHE_PATH))
+ }
+
+ test("test mergetree table write") {
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS lineitem_mergetree_s3;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS lineitem_mergetree_s3
+ |(
+ | l_orderkey bigint,
+ | l_partkey bigint,
+ | l_suppkey bigint,
+ | l_linenumber bigint,
+ | l_quantity double,
+ | l_extendedprice double,
+ | l_discount double,
+ | l_tax double,
+ | l_returnflag string,
+ | l_linestatus string,
+ | l_shipdate date,
+ | l_commitdate date,
+ | l_receiptdate date,
+ | l_shipinstruct string,
+ | l_shipmode string,
+ | l_comment string
+ |)
+ |USING clickhouse
+ |LOCATION 's3a://$BUCKET_NAME/lineitem_mergetree_s3'
+ |TBLPROPERTIES (storage_policy='__s3_main')
+ |""".stripMargin)
+
+ spark.sql(s"""
+ | insert into table lineitem_mergetree_s3
+ | select * from lineitem
+ |""".stripMargin)
+ FileUtils.deleteDirectory(new File(S3_METADATA_PATH))
+ val sqlStr =
+ s"""
+ |SELECT
+ | l_returnflag,
+ | l_linestatus,
+ | sum(l_quantity) AS sum_qty,
+ | sum(l_extendedprice) AS sum_base_price,
+ | sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
+ | sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS
sum_charge,
+ | avg(l_quantity) AS avg_qty,
+ | avg(l_extendedprice) AS avg_price,
+ | avg(l_discount) AS avg_disc,
+ | count(*) AS count_order
+ |FROM
+ | lineitem_mergetree_s3
+ |WHERE
+ | l_shipdate <= date'1998-09-02' - interval 1 day
+ |GROUP BY
+ | l_returnflag,
+ | l_linestatus
+ |ORDER BY
+ | l_returnflag,
+ | l_linestatus;
+ |
+ |""".stripMargin
+ runTPCHQueryBySQL(1, sqlStr) {
+ df =>
+ val scanExec = collect(df.queryExecution.executedPlan) {
+ case f: FileSourceScanExecTransformer => f
+ }
+ assert(scanExec.size == 1)
+
+ val mergetreeScan = scanExec.head
+ assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
+
+ val fileIndex =
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).orderByKeyOption.isEmpty)
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty)
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
+ val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f =>
f.asInstanceOf[AddMergeTreeParts])
+ assert(addFiles.size == 1)
+ assert(addFiles.head.rows == 600572)
+ }
+ spark.sql("drop table lineitem_mergetree_s3") // clean up
+ }
+
+ test("test mergetree write with orderby keys / primary keys") {
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS lineitem_mergetree_orderbykey_s3;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS lineitem_mergetree_orderbykey_s3
+ |(
+ | l_orderkey bigint,
+ | l_partkey bigint,
+ | l_suppkey bigint,
+ | l_linenumber bigint,
+ | l_quantity double,
+ | l_extendedprice double,
+ | l_discount double,
+ | l_tax double,
+ | l_returnflag string,
+ | l_linestatus string,
+ | l_shipdate date,
+ | l_commitdate date,
+ | l_receiptdate date,
+ | l_shipinstruct string,
+ | l_shipmode string,
+ | l_comment string
+ |)
+ |USING clickhouse
+ |TBLPROPERTIES (storage_policy='__s3_main',
+ | orderByKey='l_shipdate,l_orderkey',
+ | primaryKey='l_shipdate')
+ |LOCATION
's3a://$BUCKET_NAME/lineitem_mergetree_orderbykey_s3'
+ |""".stripMargin)
+
+ spark.sql(s"""
+ | insert into table lineitem_mergetree_orderbykey_s3
+ | select * from lineitem
+ |""".stripMargin)
+
+ val sqlStr =
+ s"""
+ |SELECT
+ | l_returnflag,
+ | l_linestatus,
+ | sum(l_quantity) AS sum_qty,
+ | sum(l_extendedprice) AS sum_base_price,
+ | sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
+ | sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS
sum_charge,
+ | avg(l_quantity) AS avg_qty,
+ | avg(l_extendedprice) AS avg_price,
+ | avg(l_discount) AS avg_disc,
+ | count(*) AS count_order
+ |FROM
+ | lineitem_mergetree_orderbykey_s3
+ |WHERE
+ | l_shipdate <= date'1998-09-02' - interval 1 day
+ |GROUP BY
+ | l_returnflag,
+ | l_linestatus
+ |ORDER BY
+ | l_returnflag,
+ | l_linestatus;
+ |
+ |""".stripMargin
+ runTPCHQueryBySQL(1, sqlStr) {
+ df =>
+ val scanExec = collect(df.queryExecution.executedPlan) {
+ case f: FileSourceScanExecTransformer => f
+ }
+ assert(scanExec.size == 1)
+
+ val mergetreeScan = scanExec.head
+ assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
+
+ val fileIndex =
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
+ assert(
+ ClickHouseTableV2
+ .getTable(fileIndex.deltaLog)
+ .orderByKeyOption
+ .get
+ .mkString(",")
+ .equals("l_shipdate,l_orderkey"))
+ assert(
+ ClickHouseTableV2
+ .getTable(fileIndex.deltaLog)
+ .primaryKeyOption
+ .get
+ .mkString(",")
+ .equals("l_shipdate"))
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
+ val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f =>
f.asInstanceOf[AddMergeTreeParts])
+ assert(addFiles.size == 1)
+ assert(addFiles.head.rows == 600572)
+ }
+ spark.sql("drop table lineitem_mergetree_orderbykey_s3")
+ }
+
+ test("test mergetree write with partition") {
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS lineitem_mergetree_partition_s3;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS lineitem_mergetree_partition_s3
+ |(
+ | l_orderkey bigint,
+ | l_partkey bigint,
+ | l_suppkey bigint,
+ | l_linenumber bigint,
+ | l_quantity double,
+ | l_extendedprice double,
+ | l_discount double,
+ | l_tax double,
+ | l_returnflag string,
+ | l_linestatus string,
+ | l_shipdate date,
+ | l_commitdate date,
+ | l_receiptdate date,
+ | l_shipinstruct string,
+ | l_shipmode string,
+ | l_comment string
+ |)
+ |USING clickhouse
+ |PARTITIONED BY (l_returnflag)
+ |TBLPROPERTIES (storage_policy='__s3_main',
+ | orderByKey='l_orderkey',
+ | primaryKey='l_orderkey')
+ |LOCATION 's3a://$BUCKET_NAME/lineitem_mergetree_partition_s3'
+ |""".stripMargin)
+
+ // dynamic partitions
+ spark.sql(s"""
+ | insert into table lineitem_mergetree_partition_s3
+ | select * from lineitem
+ |""".stripMargin)
+
+ // write with dataframe api
+ val source = spark.sql(s"""
+ |select
+ | l_orderkey ,
+ | l_partkey ,
+ | l_suppkey ,
+ | l_linenumber ,
+ | l_quantity ,
+ | l_extendedprice ,
+ | l_discount ,
+ | l_tax ,
+ | l_returnflag ,
+ | l_linestatus ,
+ | l_shipdate ,
+ | l_commitdate ,
+ | l_receiptdate ,
+ | l_shipinstruct ,
+ | l_shipmode ,
+ | l_comment
+ | from lineitem
+ | where l_shipdate BETWEEN date'1993-01-01' AND
date'1993-01-10'
+ |""".stripMargin)
+
+ source.write
+ .format("clickhouse")
+ .mode(SaveMode.Append)
+ .insertInto("lineitem_mergetree_partition_s3")
+
+ // static partition
+ spark.sql(s"""
+ | insert into lineitem_mergetree_partition_s3 PARTITION
(l_returnflag = 'A')
+ | (l_shipdate,
+ | l_orderkey,
+ | l_partkey,
+ | l_suppkey,
+ | l_linenumber,
+ | l_quantity,
+ | l_extendedprice,
+ | l_discount,
+ | l_tax,
+ | l_linestatus,
+ | l_commitdate,
+ | l_receiptdate,
+ | l_shipinstruct,
+ | l_shipmode,
+ | l_comment)
+ | select
+ | l_shipdate,
+ | l_orderkey,
+ | l_partkey,
+ | l_suppkey,
+ | l_linenumber,
+ | l_quantity,
+ | l_extendedprice,
+ | l_discount,
+ | l_tax,
+ | l_linestatus,
+ | l_commitdate,
+ | l_receiptdate,
+ | l_shipinstruct,
+ | l_shipmode,
+ | l_comment from lineitem
+ | where l_returnflag = 'A'
+ |""".stripMargin)
+
+ val sqlStr =
+ s"""
+ |SELECT
+ | l_returnflag,
+ | l_linestatus,
+ | sum(l_quantity) AS sum_qty,
+ | sum(l_extendedprice) AS sum_base_price,
+ | sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
+ | sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS
sum_charge,
+ | avg(l_quantity) AS avg_qty,
+ | avg(l_extendedprice) AS avg_price,
+ | avg(l_discount) AS avg_disc,
+ | count(*) AS count_order
+ |FROM
+ | lineitem_mergetree_partition_s3
+ |WHERE
+ | l_shipdate <= date'1998-09-02' - interval 1 day
+ |GROUP BY
+ | l_returnflag,
+ | l_linestatus
+ |ORDER BY
+ | l_returnflag,
+ | l_linestatus;
+ |
+ |""".stripMargin
+ runTPCHQueryBySQL(1, sqlStr, compareResult = false) {
+ df =>
+ val result = df.collect()
+ assert(result.length == 4)
+ assert(result(0).getString(0).equals("A"))
+ assert(result(0).getString(1).equals("F"))
+ assert(result(0).getDouble(2) == 7578058.0)
+
+ assert(result(2).getString(0).equals("N"))
+ assert(result(2).getString(1).equals("O"))
+ assert(result(2).getDouble(2) == 7454519.0)
+
+ val scanExec = collect(df.queryExecution.executedPlan) {
+ case f: FileSourceScanExecTransformer => f
+ }
+ assert(scanExec.size == 1)
+
+ val mergetreeScan = scanExec.head
+ assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
+ assert(mergetreeScan.metrics("numFiles").value == 6)
+
+ val fileIndex =
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
+ assert(
+ ClickHouseTableV2
+ .getTable(fileIndex.deltaLog)
+ .orderByKeyOption
+ .get
+ .mkString(",")
+ .equals("l_orderkey"))
+ assert(
+ ClickHouseTableV2
+ .getTable(fileIndex.deltaLog)
+ .primaryKeyOption
+ .get
+ .mkString(",")
+ .equals("l_orderkey"))
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size ==
1)
+ assert(
+ ClickHouseTableV2
+ .getTable(fileIndex.deltaLog)
+ .partitionColumns(0)
+ .equals("l_returnflag"))
+ val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f =>
f.asInstanceOf[AddMergeTreeParts])
+
+ assert(addFiles.size == 6)
+ assert(addFiles.map(_.rows).sum == 750735)
+ }
+ spark.sql("drop table lineitem_mergetree_partition_s3")
+
+ }
+
+ test("test mergetree write with bucket table") {
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS lineitem_mergetree_bucket_s3;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS lineitem_mergetree_bucket_s3
+ |(
+ | l_orderkey bigint,
+ | l_partkey bigint,
+ | l_suppkey bigint,
+ | l_linenumber bigint,
+ | l_quantity double,
+ | l_extendedprice double,
+ | l_discount double,
+ | l_tax double,
+ | l_returnflag string,
+ | l_linestatus string,
+ | l_shipdate date,
+ | l_commitdate date,
+ | l_receiptdate date,
+ | l_shipinstruct string,
+ | l_shipmode string,
+ | l_comment string
+ |)
+ |USING clickhouse
+ |PARTITIONED BY (l_returnflag)
+ |CLUSTERED BY (l_orderkey)
+ |${if (sparkVersion.equals("3.2")) "" else "SORTED BY
(l_orderkey)"} INTO 4 BUCKETS
+ |LOCATION 's3a://$BUCKET_NAME/lineitem_mergetree_bucket_s3'
+ |TBLPROPERTIES (storage_policy='__s3_main')
+ |""".stripMargin)
+
+ spark.sql(s"""
+ | insert into table lineitem_mergetree_bucket_s3
+ | select * from lineitem
+ |""".stripMargin)
+
+ val sqlStr =
+ s"""
+ |SELECT
+ | l_returnflag,
+ | l_linestatus,
+ | sum(l_quantity) AS sum_qty,
+ | sum(l_extendedprice) AS sum_base_price,
+ | sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
+ | sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS
sum_charge,
+ | avg(l_quantity) AS avg_qty,
+ | avg(l_extendedprice) AS avg_price,
+ | avg(l_discount) AS avg_disc,
+ | count(*) AS count_order
+ |FROM
+ | lineitem_mergetree_bucket_s3
+ |WHERE
+ | l_shipdate <= date'1998-09-02' - interval 1 day
+ |GROUP BY
+ | l_returnflag,
+ | l_linestatus
+ |ORDER BY
+ | l_returnflag,
+ | l_linestatus;
+ |
+ |""".stripMargin
+ runTPCHQueryBySQL(1, sqlStr) {
+ df =>
+ val scanExec = collect(df.queryExecution.executedPlan) {
+ case f: FileSourceScanExecTransformer => f
+ }
+ assert(scanExec.size == 1)
+
+ val mergetreeScan = scanExec(0)
+ assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
+
+ val fileIndex =
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
+
assert(!ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
+ if (sparkVersion.equals("3.2")) {
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).orderByKeyOption.isEmpty)
+ } else {
+ assert(
+ ClickHouseTableV2
+ .getTable(fileIndex.deltaLog)
+ .orderByKeyOption
+ .get
+ .mkString(",")
+ .equals("l_orderkey"))
+ }
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).primaryKeyOption.isEmpty)
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.size ==
1)
+ assert(
+ ClickHouseTableV2
+ .getTable(fileIndex.deltaLog)
+ .partitionColumns(0)
+ .equals("l_returnflag"))
+ val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f =>
f.asInstanceOf[AddMergeTreeParts])
+
+ assert(addFiles.size == 12)
+ assert(addFiles.map(_.rows).sum == 600572)
+ }
+ spark.sql("drop table lineitem_mergetree_bucket_s3")
+ }
+
+}
+// scalastyle:on line.size.limit
diff --git a/cpp-ch/local-engine/CMakeLists.txt
b/cpp-ch/local-engine/CMakeLists.txt
index ae58aa70c..40fe4402f 100644
--- a/cpp-ch/local-engine/CMakeLists.txt
+++ b/cpp-ch/local-engine/CMakeLists.txt
@@ -57,6 +57,8 @@ add_headers_and_sources(shuffle Shuffle)
add_headers_and_sources(operator Operator)
add_headers_and_sources(jni jni)
add_headers_and_sources(aggregate_functions AggregateFunctions)
+add_headers_and_sources(disks Disks)
+add_headers_and_sources(disks Disks/ObjectStorages)
include_directories(
${JNI_INCLUDE_DIRS}
@@ -89,7 +91,9 @@ add_library(gluten_clickhouse_backend_libs
${shuffle_sources}
${operator_sources}
${aggregate_functions_sources}
- ${jni_sources})
+ ${jni_sources}
+ ${disks_sources}
+)
target_link_libraries(gluten_clickhouse_backend_libs PUBLIC
substrait_source
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp
b/cpp-ch/local-engine/Common/CHUtil.cpp
index b728d7df0..880f6668d 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -67,6 +67,7 @@
#include <boost/algorithm/string/predicate.hpp>
#include "CHUtil.h"
+#include "Disks/registerGlutenDisks.h"
#include <unistd.h>
#include <sys/resource.h>
@@ -677,6 +678,19 @@ void
BackendInitializerUtil::initContexts(DB::Context::ConfigurationPtr config)
global_context->setTemporaryStoragePath(config->getString("tmp_path",
getDefaultPath()), 0);
global_context->setPath(config->getString("path", "/"));
+
+ String mark_cache_policy = config->getString("mark_cache_policy",
DEFAULT_MARK_CACHE_POLICY);
+ size_t mark_cache_size = config->getUInt64("mark_cache_size",
DEFAULT_MARK_CACHE_MAX_SIZE);
+ double mark_cache_size_ratio =
config->getDouble("mark_cache_size_ratio", DEFAULT_MARK_CACHE_SIZE_RATIO);
+ if (!mark_cache_size)
+ LOG_ERROR(&Poco::Logger::get("CHUtil"), "Too low mark cache size
will lead to severe performance degradation.");
+
+ global_context->setMarkCache(mark_cache_policy, mark_cache_size,
mark_cache_size_ratio);
+
+ String index_mark_cache_policy =
config->getString("index_mark_cache_policy", DEFAULT_INDEX_MARK_CACHE_POLICY);
+ size_t index_mark_cache_size =
config->getUInt64("index_mark_cache_size", DEFAULT_INDEX_MARK_CACHE_MAX_SIZE);
+ double index_mark_cache_size_ratio =
config->getDouble("index_mark_cache_size_ratio",
DEFAULT_INDEX_MARK_CACHE_SIZE_RATIO);
+ global_context->setIndexMarkCache(index_mark_cache_policy,
index_mark_cache_size, index_mark_cache_size_ratio);
}
}
@@ -709,11 +723,22 @@ void registerAllFunctions()
auto & factory = AggregateFunctionCombinatorFactory::instance();
registerAggregateFunctionCombinatorPartialMerge(factory);
}
+
+}
+
+/// Registers disk types: always calls registerDisks(true) and, in builds with
+/// USE_AWS_S3, additionally calls the registerGlutenDisks(bool) overload.
+void registerGlutenDisks()
+{
registerDisks(true);
+
+/// NOTE(review): the Gluten-specific registration is compiled only when USE_AWS_S3 is set.
+/// If registerGlutenDisks(bool) also registers HDFS-backed disks (not visible here), those
+/// would be missing from builds without S3 support - confirm the guard is intended.
+#if USE_AWS_S3
+ registerGlutenDisks(true);
+#endif
}
void BackendInitializerUtil::registerAllFactories()
{
+ registerGlutenDisks();
+
registerReadBufferBuilders();
registerWriteBufferBuilders();
diff --git a/cpp-ch/local-engine/Common/CHUtil.h
b/cpp-ch/local-engine/Common/CHUtil.h
index f6030485b..308e22422 100644
--- a/cpp-ch/local-engine/Common/CHUtil.h
+++ b/cpp-ch/local-engine/Common/CHUtil.h
@@ -124,6 +124,7 @@ public:
};
void registerAllFunctions();
+void registerGlutenDisks();
class BackendFinalizerUtil;
class JNIUtils;
diff --git a/cpp-ch/local-engine/Common/MergeTreeTool.cpp
b/cpp-ch/local-engine/Common/MergeTreeTool.cpp
index 2f6b4602d..c5122905e 100644
--- a/cpp-ch/local-engine/Common/MergeTreeTool.cpp
+++ b/cpp-ch/local-engine/Common/MergeTreeTool.cpp
@@ -22,6 +22,8 @@
#include <IO/WriteHelpers.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <google/protobuf/util/json_util.h>
+#include <rapidjson/rapidjson.h>
+#include <rapidjson/document.h>
using namespace DB;
@@ -40,7 +42,10 @@ std::shared_ptr<DB::StorageInMemoryMetadata>
buildMetaData(const DB::NamesAndTyp
metadata->sorting_key = KeyDescription::parse(table.order_by_key,
metadata->getColumns(), context);
if (table.primary_key.empty())
{
- metadata->primary_key.expression =
std::make_shared<ExpressionActions>(std::make_shared<ActionsDAG>());
+ if (table.order_by_key != MergeTreeTable::TUPLE)
+ metadata->primary_key = KeyDescription::parse(table.order_by_key,
metadata->getColumns(), context);
+ else
+ metadata->primary_key.expression =
std::make_shared<ExpressionActions>(std::make_shared<ActionsDAG>());
}
else
{
@@ -49,13 +54,12 @@ std::shared_ptr<DB::StorageInMemoryMetadata>
buildMetaData(const DB::NamesAndTyp
return metadata;
}
-std::unique_ptr<MergeTreeSettings> buildMergeTreeSettings()
+std::unique_ptr<MergeTreeSettings> buildMergeTreeSettings(const
MergeTreeTableSettings & config)
{
auto settings = std::make_unique<DB::MergeTreeSettings>();
-// settings->set("min_bytes_for_wide_part", Field(0));
-// settings->set("min_rows_for_wide_part", Field(0));
settings->set("allow_nullable_key", Field(1));
- // settings->set("storage_policy", Field("s3_main"));
+ if (!config.storage_policy.empty())
+ settings->set("storage_policy", Field(config.storage_policy));
return settings;
}
@@ -70,6 +74,15 @@ std::unique_ptr<SelectQueryInfo>
buildQueryInfo(NamesAndTypesList & names_and_ty
}
+/// Fills `settings` from the table-level configuration JSON carried with the table.
+///
+/// Only the "storage_policy" key is read. `settings` is left untouched when
+/// `config_json` is empty, is not valid JSON, is not a JSON object, or when
+/// "storage_policy" is absent or not a string.
+void parseTableConfig(MergeTreeTableSettings & settings, String config_json)
+{
+    rapidjson::Document doc;
+    doc.Parse(config_json.c_str());
+
+    /// Member lookup is only valid on an object; an empty or malformed document
+    /// leaves `doc` in a non-object state, and querying it would trip a rapidjson
+    /// assertion (undefined behaviour when assertions are compiled out).
+    if (doc.HasParseError() || !doc.IsObject())
+        return;
+
+    const auto policy = doc.FindMember("storage_policy");
+    /// GetString() likewise requires a string value.
+    if (policy != doc.MemberEnd() && policy->value.IsString())
+        settings.storage_policy = policy->value.GetString();
+}
+
MergeTreeTable parseMergeTreeTableString(const std::string & info)
{
@@ -97,7 +110,9 @@ MergeTreeTable parseMergeTreeTableString(const std::string &
info)
assertChar('\n', in);
readString(table.absolute_path, in);
assertChar('\n', in);
- readString(table.table_configs_json, in);
+ String json;
+ readString(json, in);
+ parseTableConfig(table.table_configs, json);
assertChar('\n', in);
while (!in.eof())
{
diff --git a/cpp-ch/local-engine/Common/MergeTreeTool.h
b/cpp-ch/local-engine/Common/MergeTreeTool.h
index a6af7ebca..bde632f0d 100644
--- a/cpp-ch/local-engine/Common/MergeTreeTool.h
+++ b/cpp-ch/local-engine/Common/MergeTreeTool.h
@@ -43,6 +43,11 @@ struct MergeTreePart
size_t end;
};
+/// Table-level settings extracted from the table's configuration JSON
+/// (filled by parseTableConfig in MergeTreeTool.cpp).
+struct MergeTreeTableSettings
+{
+ /// Storage policy name; when left empty, buildMergeTreeSettings does not set
+ /// "storage_policy" at all.
+ String storage_policy = "";
+};
+
struct MergeTreeTable
{
inline static const String TUPLE = "tuple()";
@@ -54,7 +59,7 @@ struct MergeTreeTable
std::string primary_key = "";
std::string relative_path;
std::string absolute_path;
- std::string table_configs_json;
+ MergeTreeTableSettings table_configs;
std::vector<MergeTreePart> parts;
std::unordered_set<String> getPartNames() const;
RangesInDataParts extractRange(DataPartsVector parts_vector) const;
@@ -62,7 +67,7 @@ struct MergeTreeTable
std::shared_ptr<DB::StorageInMemoryMetadata> buildMetaData(const
DB::NamesAndTypesList &columns, ContextPtr context, const MergeTreeTable &);
-std::unique_ptr<MergeTreeSettings> buildMergeTreeSettings();
+std::unique_ptr<MergeTreeSettings> buildMergeTreeSettings(const
MergeTreeTableSettings & config);
std::unique_ptr<SelectQueryInfo> buildQueryInfo(NamesAndTypesList &
names_and_types_list);
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
new file mode 100644
index 000000000..bff4108f2
--- /dev/null
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "GlutenDiskHDFS.h"
+#include <ranges>
+#include <Parser/SerializedPlanParser.h>
+#if USE_HDFS
+
+namespace local_engine
+{
+using namespace DB;
+
+/// Creates `path` through the base DiskObjectStorage and then creates the same
+/// directory on HDFS. The return value of hdfsCreateDirectory is not checked.
+void GlutenDiskHDFS::createDirectory(const String & path)
+{
+    DiskObjectStorage::createDirectory(path);
+
+    auto * hdfs = hdfs_object_storage->getHDFSFS();
+    hdfsCreateDirectory(hdfs, path.c_str());
+}
+
+/// Returns the serialized object key that the underlying object storage
+/// generates for `path`.
+String GlutenDiskHDFS::path2AbsPath(const String & path)
+{
+    const auto object_key = getObjectStorage()->generateObjectKeyForPath(path);
+    return object_key.serialize();
+}
+
+/// Creates `path` and its missing ancestors, first through the base
+/// DiskObjectStorage and then on HDFS.
+///
+/// The HDFS part walks upwards from `path` while hdfsExists reports a negative
+/// value, remembering every such path, and then creates them outermost-first.
+/// The return value of hdfsCreateDirectory is not checked.
+void GlutenDiskHDFS::createDirectories(const String & path)
+{
+    DiskObjectStorage::createDirectories(path);
+
+    auto * hdfs = hdfs_object_storage->getHDFSFS();
+    fs::path current = path;
+    std::vector<std::string> missing;
+    while (hdfsExists(hdfs, current.c_str()) < 0)
+    {
+        missing.push_back(current.string());
+
+        /// parent_path() of a root path ("/") is the root itself, so checking
+        /// has_parent_path() alone never terminates once the root is reached
+        /// and hdfsExists keeps failing (e.g. the file system is unreachable).
+        const fs::path parent = current.parent_path();
+        if (parent.empty() || parent == current)
+            break;
+        current = parent;
+    }
+
+    /// Paths were collected innermost-first; create them in the opposite order.
+    for (const auto & dir : missing | std::views::reverse)
+        hdfsCreateDirectory(hdfs, dir.c_str());
+}
+
+/// Removes `path` through the base DiskObjectStorage and then deletes it on HDFS.
+void GlutenDiskHDFS::removeDirectory(const String & path)
+{
+    DiskObjectStorage::removeDirectory(path);
+
+    /// NOTE(review): the last hdfsDelete argument is presumably the libhdfs
+    /// "recursive" flag, i.e. the HDFS directory is removed with its contents - confirm.
+    constexpr int recursive = 1;
+    hdfsDelete(hdfs_object_storage->getHDFSFS(), path.c_str(), recursive);
+}
+
+/// Builds a new GlutenDiskHDFS with this disk's name, key prefix, metadata storage
+/// and object storage, configured from the global context's configuration under
+/// "storage_configuration.disks.<name>".
+DiskObjectStoragePtr GlutenDiskHDFS::createDiskObjectStorage()
+{
+    const auto & global_config = SerializedPlanParser::global_context->getConfigRef();
+    const String disk_config_prefix = "storage_configuration.disks." + name;
+
+    return std::make_shared<GlutenDiskHDFS>(
+        getName(), object_key_prefix, getMetadataStorage(), getObjectStorage(), global_config, disk_config_prefix);
+}
+
+
+}
+#endif
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h
b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h
new file mode 100644
index 000000000..9caedaae8
--- /dev/null
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <config.h>
+
+#include <Disks/ObjectStorages/DiskObjectStorage.h>
+#if USE_HDFS
+#include <Disks/ObjectStorages/GlutenHDFSObjectStorage.h>
+#endif
+
+namespace local_engine
+{
+#if USE_HDFS
+/// DiskObjectStorage specialisation backed by a GlutenHDFSObjectStorage. Its directory
+/// operations are applied to HDFS as well as to the base disk (see GlutenDiskHDFS.cpp).
+class GlutenDiskHDFS : public DB::DiskObjectStorage
+{
+public:
+    /// All arguments are forwarded to DiskObjectStorage. `object_storage_` is expected to
+    /// hold a GlutenHDFSObjectStorage; this is verified with chassert only.
+    /// The HDFS working directory is set to "/".
+    GlutenDiskHDFS(
+        const String & name_,
+        const String & object_key_prefix_,
+        DB::MetadataStoragePtr metadata_storage_,
+        DB::ObjectStoragePtr object_storage_,
+        const Poco::Util::AbstractConfiguration & config,
+        const String & config_prefix)
+        : DiskObjectStorage(name_, object_key_prefix_, metadata_storage_, object_storage_, config, config_prefix)
+    {
+        auto * gluten_storage = dynamic_cast<local_engine::GlutenHDFSObjectStorage *>(object_storage_.get());
+        chassert(gluten_storage != nullptr);
+
+        hdfs_object_storage = gluten_storage;
+        object_key_prefix = object_key_prefix_;
+        hdfsSetWorkingDirectory(hdfs_object_storage->getHDFSFS(), "/");
+    }
+
+    /// Creates a single directory on the base disk and on HDFS.
+    void createDirectory(const String & path) override;
+
+    /// Creates a directory and its missing ancestors on the base disk and on HDFS.
+    void createDirectories(const String & path) override;
+
+    /// Removes a directory from the base disk and from HDFS.
+    void removeDirectory(const String & path) override;
+
+    /// Returns a new GlutenDiskHDFS sharing this disk's metadata and object storage.
+    DB::DiskObjectStoragePtr createDiskObjectStorage() override;
+
+private:
+    /// Serialized object key generated by the object storage for `path`.
+    String path2AbsPath(const String & path);
+
+    /// Non-owning view of the object storage passed to the constructor.
+    GlutenHDFSObjectStorage * hdfs_object_storage;
+    /// Copy of the key prefix passed to the constructor.
+    String object_key_prefix;
+};
+#endif
+}
+
diff --git
a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.cpp
b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.cpp
new file mode 100644
index 000000000..3a844a91f
--- /dev/null
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.cpp
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "GlutenHDFSObjectStorage.h"
+#if USE_HDFS
+#include <Storages/HDFS/ReadBufferFromHDFS.h>
+using namespace DB;
+namespace local_engine
+{
+/// Opens an HDFS read buffer for `object`. The object's remote_path is split at the
+/// first '/' that follows the "//" separator: the part before it is passed as the
+/// HDFS URI, the rest as the path inside it. The two size hints are ignored.
+std::unique_ptr<ReadBufferFromFileBase> GlutenHDFSObjectStorage::readObject( /// NOLINT
+    const StoredObject & object,
+    const ReadSettings & read_settings,
+    std::optional<size_t>,
+    std::optional<size_t>) const
+{
+    const auto & remote_path = object.remote_path;
+
+    /// NOTE(review): if remote_path has no "//", find() yields npos and the "+ 2"
+    /// wraps around, so the search below silently starts at index 1.
+    const size_t authority_begin = remote_path.find("//") + 2;
+    const size_t path_begin = remote_path.find('/', authority_begin);
+
+    auto hdfs_path = remote_path.substr(path_begin);
+    auto hdfs_uri = remote_path.substr(0, path_begin);
+    return std::make_unique<ReadBufferFromHDFS>(hdfs_uri, hdfs_path, config, HDFSObjectStorage::patchSettings(read_settings));
+}
+
+/// Builds an absolute object key by concatenating hdfs_root_path and `path`
+/// (no separator is inserted between them).
+DB::ObjectStorageKey GlutenHDFSObjectStorage::generateObjectKeyForPath(const std::string & path) const
+{
+    const std::string absolute_key = hdfs_root_path + path;
+    return DB::ObjectStorageKey::createAsAbsolute(absolute_key);
+}
+}
+#endif
+
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.h
b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.h
new file mode 100644
index 000000000..1efa441c2
--- /dev/null
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.h
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include "config.h"
+
+#if USE_HDFS
+#include <Disks/ObjectStorages/HDFS/HDFSObjectStorage.h>
+#endif
+
+namespace local_engine
+{
+
+#if USE_HDFS
+/// HDFSObjectStorage variant that overrides object reading and object-key generation
+/// and exposes the underlying hdfsFS handle.
+class GlutenHDFSObjectStorage final : public DB::HDFSObjectStorage
+{
+public:
+ /// Forwards all arguments to HDFSObjectStorage and keeps a reference to `config_`.
+ /// NOTE(review): only a reference is stored, so `config_` presumably has to outlive
+ /// this object - confirm at the construction site.
+ GlutenHDFSObjectStorage(
+ const String & hdfs_root_path_,
+ SettingsPtr settings_,
+ const Poco::Util::AbstractConfiguration & config_)
+ : HDFSObjectStorage(hdfs_root_path_, std::move(settings_), config_),
config(config_)
+ {
+ }
+ /// Opens a read buffer for `object`; defined in GlutenHDFSObjectStorage.cpp.
+ std::unique_ptr<DB::ReadBufferFromFileBase> readObject( /// NOLINT
+ const DB::StoredObject & object,
+ const DB::ReadSettings & read_settings = DB::ReadSettings{},
+ std::optional<size_t> read_hint = {},
+ std::optional<size_t> file_size = {}) const override;
+ /// Returns the object key for `path`; defined in GlutenHDFSObjectStorage.cpp.
+ DB::ObjectStorageKey generateObjectKeyForPath(const std::string & path)
const override;
+ /// Raw handle held by hdfs_fs; ownership stays with this object.
+ hdfsFS getHDFSFS() const { return hdfs_fs.get(); }
+private:
+ /// Configuration passed to the constructor, used when opening read buffers.
+ const Poco::Util::AbstractConfiguration & config;
+};
+#endif
+
+}
+
+
diff --git
a/cpp-ch/local-engine/Disks/ObjectStorages/registerGlutenDiskObjectStorage.cpp
b/cpp-ch/local-engine/Disks/ObjectStorages/registerGlutenDiskObjectStorage.cpp
new file mode 100644
index 000000000..8f2008029
--- /dev/null
+++
b/cpp-ch/local-engine/Disks/ObjectStorages/registerGlutenDiskObjectStorage.cpp
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "config.h"
+#include <Disks/ObjectStorages/ObjectStorageFactory.h>
+#if USE_AWS_S3
+#include <Disks/ObjectStorages/S3/S3ObjectStorage.h>
+#include <Disks/ObjectStorages/S3/diskSettings.h>
+#include <Disks/ObjectStorages/S3/DiskS3Utils.h>
+#endif
+
+#if USE_HDFS
+#include <Disks/ObjectStorages/GlutenHDFSObjectStorage.h>
+#endif
+
+#include <Interpreters/Context.h>
+#include <Common/Macros.h>
+
+
+namespace DB
+{
+namespace ErrorCodes
+{
+extern const int BAD_ARGUMENTS;
+extern const int LOGICAL_ERROR;
+}
+}
+
+namespace local_engine
+{
+using namespace DB;
+
+#if USE_AWS_S3
+/// Reads "<config_prefix>.endpoint" from the configuration, expands macros in it
+/// and parses the result into an S3::URI. A non-empty key that does not already
+/// end with '/' gets one appended; an empty key is left untouched.
+static S3::URI getS3URI(
+    const Poco::Util::AbstractConfiguration & config,
+    const std::string & config_prefix,
+    const ContextPtr & context)
+{
+    const String raw_endpoint = config.getString(config_prefix + ".endpoint");
+    const String expanded_endpoint = context->getMacros()->expand(raw_endpoint);
+
+    S3::URI parsed(expanded_endpoint);
+
+    /// Only a non-empty key is normalised to end with a slash.
+    const bool needs_trailing_slash = !parsed.key.empty() && !parsed.key.ends_with('/');
+    if (needs_trailing_slash)
+        parsed.key.push_back('/');
+
+    return parsed;
+}
+
+/// Registers the "s3_gluten" object storage type with `factory`.
+/// The registered callback builds an S3ObjectStorage from the endpoint, capabilities,
+/// settings and client derived from the configuration under config_prefix; object keys
+/// are generated as-is with the URI key as prefix. The skip_access_check flag is ignored.
+void registerGlutenS3ObjectStorage(ObjectStorageFactory & factory)
+{
+    static constexpr auto disk_type = "s3_gluten";
+
+    auto build_storage = [](
+        const std::string & name,
+        const Poco::Util::AbstractConfiguration & config,
+        const std::string & config_prefix,
+        const ContextPtr & context,
+        bool /*skip_access_check*/) -> ObjectStoragePtr
+    {
+        auto endpoint_uri = getS3URI(config, config_prefix, context);
+        auto capabilities = getCapabilitiesFromConfig(config, config_prefix);
+        auto storage_settings = getSettings(config, config_prefix, context);
+        /// The client is created from the settings before they are moved into the storage.
+        auto s3_client = getClient(config, config_prefix, context, *storage_settings);
+        auto keys_generator = createObjectStorageKeysGeneratorAsIsWithPrefix(endpoint_uri.key);
+
+        return std::make_shared<S3ObjectStorage>(
+            std::move(s3_client),
+            std::move(storage_settings),
+            endpoint_uri,
+            capabilities,
+            keys_generator,
+            name);
+    };
+
+    factory.registerObjectStorageType(disk_type, build_storage);
+}
+
+#endif
+
+#if USE_HDFS
+/// Registers the "hdfs_gluten" object storage type with `factory`.
+/// The registered callback reads "<config_prefix>.endpoint", expands macros in it,
+/// validates it with checkHDFSURL() and requires it to end with '/'. It then builds a
+/// GlutenHDFSObjectStorage with settings taken from the configuration
+/// (min_bytes_for_seek, default 1 MiB; objects_chunk_size_to_delete, default 1000)
+/// and the context's hdfs_replication setting. The name and skip_access_check
+/// arguments are ignored.
+/// Throws Exception(BAD_ARGUMENTS) when the endpoint is empty or lacks the trailing '/'.
+void registerGlutenHDFSObjectStorage(ObjectStorageFactory & factory)
+{
+    factory.registerObjectStorageType(
+        "hdfs_gluten",
+        [](
+            const std::string & /* name */,
+            const Poco::Util::AbstractConfiguration & config,
+            const std::string & config_prefix,
+            const ContextPtr & context,
+            bool /* skip_access_check */) -> ObjectStoragePtr
+        {
+            auto uri = context->getMacros()->expand(config.getString(config_prefix + ".endpoint"));
+            checkHDFSURL(uri);
+            /// back() on an empty string is undefined behaviour, so reject the empty
+            /// endpoint explicitly instead of relying on the check above to do it.
+            if (uri.empty() || uri.back() != '/')
+                throw Exception(ErrorCodes::BAD_ARGUMENTS, "HDFS path must end with '/', but '{}' doesn't.", uri);
+
+            auto settings = std::make_unique<HDFSObjectStorageSettings>(
+                config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
+                config.getInt(config_prefix + ".objects_chunk_size_to_delete", 1000),
+                context->getSettingsRef().hdfs_replication);
+
+            /// Built directly as a shared pointer, like the S3 registration in this file.
+            return std::make_shared<GlutenHDFSObjectStorage>(uri, std::move(settings), config);
+        });
+}
+#endif
+}
diff --git a/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp
b/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp
new file mode 100644
index 000000000..c7e9c5fd3
--- /dev/null
+++ b/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "config.h"
+#include <Disks/DiskFactory.h>
+#include <Interpreters/Context.h>
+#include <Disks/ObjectStorages/DiskObjectStorage.h>
+#include <Disks/ObjectStorages/MetadataStorageFactory.h>
+#include <Disks/ObjectStorages/ObjectStorageFactory.h>
+
+#if USE_HDFS
+#include <Disks/ObjectStorages/GlutenDiskHDFS.h>
+#endif
+
+#include "registerGlutenDisks.h"
+
+namespace local_engine
+{
+#if USE_AWS_S3
+void registerGlutenS3ObjectStorage(DB::ObjectStorageFactory & factory);
+#endif
+
+#if USE_HDFS
+void registerGlutenHDFSObjectStorage(DB::ObjectStorageFactory & factory);
+#endif
+
+/// Builds the creator callback registered with DB::DiskFactory for a Gluten disk type.
+/// The callback creates the object storage and a "local" metadata storage for the disk,
+/// wraps both in a DiskType instance and starts the disk up.
+/// DiskType must be DB::DiskObjectStorage or a class derived from it.
+/// The access check is skipped when either global_skip_access_check is set or
+/// "<config_prefix>.skip_access_check" is true in the configuration.
+template <typename DiskType>
+[[maybe_unused]] static auto makeGlutenDiskCreator(bool global_skip_access_check)
+{
+    return [global_skip_access_check](
+               const String & name,
+               const Poco::Util::AbstractConfiguration & config,
+               const String & config_prefix,
+               DB::ContextPtr context,
+               const DB::DisksMap & /* map */,
+               bool,
+               bool) -> DB::DiskPtr
+    {
+        bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
+        auto object_storage = DB::ObjectStorageFactory::instance().create(name, config, config_prefix, context, skip_access_check);
+        auto metadata_storage
+            = DB::MetadataStorageFactory::instance().create(name, config, config_prefix, object_storage, "local");
+
+        /// getCommonKeyPrefix() is read before object_storage is moved into the disk:
+        /// std::move only casts, the actual move happens inside the constructor.
+        DB::DiskObjectStoragePtr disk = std::make_shared<DiskType>(
+            name,
+            object_storage->getCommonKeyPrefix(),
+            std::move(metadata_storage),
+            std::move(object_storage),
+            config,
+            config_prefix);
+
+        disk->startup(context, skip_access_check);
+        return disk;
+    };
+}
+
+/// Registers the Gluten object storage types and their disk types:
+/// "s3_gluten" (when built with USE_AWS_S3) and "hdfs_gluten" (when built with USE_HDFS).
+/// @param global_skip_access_check - skip the access check regardless of the
+///                                   .skip_access_check config directive
+void registerGlutenDisks([[maybe_unused]] bool global_skip_access_check)
+{
+    [[maybe_unused]] auto & factory = DB::DiskFactory::instance();
+    [[maybe_unused]] auto & object_factory = DB::ObjectStorageFactory::instance();
+
+#if USE_AWS_S3
+    registerGlutenS3ObjectStorage(object_factory);
+    /// S3 uses the stock DiskObjectStorage on top of the Gluten object storage.
+    factory.registerDiskType("s3_gluten", makeGlutenDiskCreator<DB::DiskObjectStorage>(global_skip_access_check));
+#endif
+
+#if USE_HDFS
+    registerGlutenHDFSObjectStorage(object_factory);
+    /// HDFS uses the Gluten-specific disk class.
+    factory.registerDiskType("hdfs_gluten", makeGlutenDiskCreator<local_engine::GlutenDiskHDFS>(global_skip_access_check));
+#endif
+}
+}
diff --git a/cpp-ch/local-engine/Disks/registerGlutenDisks.h
b/cpp-ch/local-engine/Disks/registerGlutenDisks.h
new file mode 100644
index 000000000..a0c5d96d2
--- /dev/null
+++ b/cpp-ch/local-engine/Disks/registerGlutenDisks.h
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+namespace local_engine
+{
+
+/// @param global_skip_access_check - skip access check regardless of the
+/// .skip_access_check config directive (used
+/// for clickhouse-disks)
+void registerGlutenDisks(bool global_skip_access_check);
+
+}
diff --git a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
index 34746217b..82a64b999 100644
--- a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
@@ -23,6 +23,7 @@
#include <Storages/StorageMergeTreeFactory.h>
#include <Common/CHUtil.h>
#include <Common/MergeTreeTool.h>
+#include <Storages/Mergetree/MetaDataHelper.h>
#include "MergeTreeRelParser.h"
@@ -61,17 +62,14 @@ static Int64 findMinPosition(const NameSet &
condition_table_columns, const Name
}
CustomStorageMergeTreePtr MergeTreeRelParser::parseStorage(
- const substrait::Rel & rel_,
const substrait::ReadRel::ExtensionTable & extension_table,
ContextMutablePtr context)
{
- const auto & rel = rel_.read();
google::protobuf::StringValue table;
table.ParseFromString(extension_table.detail().value());
auto merge_tree_table =
local_engine::parseMergeTreeTableString(table.value());
DB::Block header;
- chassert(rel.has_base_schema());
- header = TypeParser::buildBlockFromNamedStruct(rel.base_schema(),
merge_tree_table.low_card_key);
+ header = TypeParser::buildBlockFromNamedStruct(merge_tree_table.schema,
merge_tree_table.low_card_key);
auto names_and_types_list = header.getNamesAndTypesList();
auto storage_factory = StorageMergeTreeFactory::instance();
auto metadata = buildMetaData(names_and_types_list, context,
merge_tree_table);
@@ -89,8 +87,7 @@ CustomStorageMergeTreePtr MergeTreeRelParser::parseStorage(
context,
"",
MergeTreeData::MergingParams(),
- buildMergeTreeSettings());
- custom_storage_merge_tree->loadDataParts(false, std::nullopt);
+ buildMergeTreeSettings(merge_tree_table.table_configs));
return custom_storage_merge_tree;
});
return storage;
@@ -137,13 +134,13 @@ MergeTreeRelParser::parseReadRel(
global_context,
"",
MergeTreeData::MergingParams(),
- buildMergeTreeSettings());
+ buildMergeTreeSettings(merge_tree_table.table_configs));
return custom_storage_merge_tree;
});
+ restoreMetaData(storage, merge_tree_table, context);
for (const auto & [name, sizes] : storage->getColumnSizes())
column_sizes[name] = sizes.data_compressed;
-
query_context.storage_snapshot =
std::make_shared<StorageSnapshot>(*storage, metadata);
query_context.custom_storage_merge_tree = storage;
auto names_and_types_list = input.getNamesAndTypesList();
diff --git a/cpp-ch/local-engine/Parser/MergeTreeRelParser.h
b/cpp-ch/local-engine/Parser/MergeTreeRelParser.h
index 5f86a0cb4..921f3ac00 100644
--- a/cpp-ch/local-engine/Parser/MergeTreeRelParser.h
+++ b/cpp-ch/local-engine/Parser/MergeTreeRelParser.h
@@ -38,7 +38,6 @@ class MergeTreeRelParser : public RelParser
{
public:
static std::shared_ptr<CustomStorageMergeTree> parseStorage(
- const substrait::Rel & rel_,
const substrait::ReadRel::ExtensionTable & extension_table,
ContextMutablePtr context);
diff --git a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp
b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp
index 9d7a35cb2..780d19cc8 100644
--- a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp
+++ b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp
@@ -90,7 +90,8 @@ CustomStorageMergeTree::CustomStorageMergeTree(
, writer(*this)
, reader(*this)
{
- initializeDirectoriesAndFormatVersion(relative_data_path_, attach,
date_column_name);
+ relative_data_path = relative_data_path_;
+ format_version = 1;
}
std::atomic<int> CustomStorageMergeTree::part_num;
diff --git a/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp
b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp
new file mode 100644
index 000000000..a7d167385
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MetaDataHelper.h"
+#include <filesystem>
+
+#include <Disks/ObjectStorages/MetadataStorageFromDisk.h>
+
+using namespace DB;
+
+namespace local_engine
+{
+
+/// Parses a stream of packed per-file metadata records into a map from file name to
+/// file content. Each record is laid out as
+///     <file name> '\t' <content size in bytes> '\n' <content bytes>
+/// and records follow each other with no separator until end of stream.
+/// Throws if a separator is missing or the stream ends before a record is complete.
+std::unordered_map<String, String> extractPartMetaData(ReadBuffer & in)
+{
+    std::unordered_map<String, String> result;
+    while (!in.eof())
+    {
+        String name;
+        readString(name, in);
+        assertChar('\t', in);
+        UInt64 size;
+        readIntText(size, in);
+        assertChar('\n', in);
+        String data;
+        data.resize(size);
+        /// readStrict() throws on a short read. Plain read() would return fewer bytes
+        /// and silently leave the tail of `data` zero-filled for a truncated stream.
+        in.readStrict(data.data(), size);
+        result.emplace(std::move(name), std::move(data));
+    }
+    return result;
+}
+
+/// Recreates, on the metadata disk, the metadata files of every part of `mergeTreeTable`
+/// that is missing there, using the "metadata.gluten" object stored with each part in the
+/// object storage.
+///
+/// Does nothing when the storage's disk is not remote, when its metadata storage is not a
+/// MetadataStorageFromDisk, or when all parts are already present on the metadata disk.
+///
+/// @param storage        table whose storage policy provides the data disk
+/// @param mergeTreeTable supplies the table's relative path and its part names
+/// @param context        supplies the lock_acquire_timeout setting for the alter lock
+void restoreMetaData(CustomStorageMergeTreePtr & storage, const MergeTreeTable & mergeTreeTable, ContextPtr & context)
+{
+    auto data_disk = storage->getStoragePolicy()->getAnyDisk();
+    if (!data_disk->isRemote())
+        return;
+
+    /// Checked cast: with any other metadata storage implementation there is no
+    /// metadata disk to restore into, so there is nothing to do.
+    auto * metadata_storage = dynamic_cast<MetadataStorageFromDisk *>(data_disk->getMetadataStorage().get());
+    if (!metadata_storage)
+        return;
+
+    auto metadata_disk = metadata_storage->getDisk();
+    auto table_path = std::filesystem::path(mergeTreeTable.relative_path);
+
+    /// First pass without the lock: collect the parts that have no directory yet.
+    std::unordered_set<String> not_exists_part;
+    for (const auto & part : mergeTreeTable.getPartNames())
+    {
+        auto part_path = table_path / part;
+        if (!metadata_disk->exists(part_path))
+            not_exists_part.emplace(part);
+    }
+
+    if (not_exists_part.empty())
+        return;
+
+    if (auto lock = storage->lockForAlter(context->getSettingsRef().lock_acquire_timeout))
+    {
+        auto object_storage = data_disk->getObjectStorage();
+
+        if (!metadata_disk->exists(table_path))
+            metadata_disk->createDirectories(table_path.generic_string());
+
+        for (const auto & part : not_exists_part)
+        {
+            auto part_path = table_path / part;
+
+            /// Re-check under the lock: the directory may have been created since the scan above.
+            if (metadata_disk->exists(part_path))
+                continue;
+
+            /// Read and parse the packed metadata BEFORE creating the part directory.
+            /// If the read throws, no empty directory is left behind that a later call
+            /// would mistake for an already restored part.
+            auto metadata_file_path = part_path / "metadata.gluten";
+            auto key = object_storage->generateObjectKeyForPath(metadata_file_path.generic_string());
+            StoredObject metadata_object(key.serialize());
+            auto part_metadata = extractPartMetaData(*object_storage->readObject(metadata_object));
+
+            metadata_disk->createDirectories(part_path);
+            for (const auto & item : part_metadata)
+            {
+                auto item_path = part_path / item.first;
+                auto out = metadata_disk->writeFile(item_path);
+                out->write(item.second.data(), item.second.size());
+                /// Finalize explicitly so that a failed flush surfaces here instead of in the destructor.
+                out->finalize();
+            }
+        }
+    }
+}
+
+}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h
b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h
new file mode 100644
index 000000000..47c5d615d
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <Common/MergeTreeTool.h>
+#include <Storages/StorageMergeTreeFactory.h>
+
+namespace local_engine
+{
+
+/// Restores missing part metadata for a table whose disk is remote.
+/// For every part name of mergeTreeTable that has no directory on the metadata disk,
+/// reads the part's "metadata.gluten" object from the object storage and writes the
+/// files it contains into the part directory. Returns without doing anything when the
+/// disk is not remote or no part is missing.
+void restoreMetaData(CustomStorageMergeTreePtr & storage, const MergeTreeTable
& mergeTreeTable, ContextPtr & context);
+
+}
+
diff --git a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
index f5c9a1338..8df171f99 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
+++ b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
@@ -15,8 +15,11 @@
* limitations under the License.
*/
#include "SparkMergeTreeWriter.h"
+
#include <Disks/createVolume.h>
+#include <Disks/ObjectStorages/MetadataStorageFromDisk.h>
#include <Interpreters/ActionsDAG.h>
+#include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
#include <rapidjson/prettywriter.h>
using namespace DB;
@@ -49,29 +52,59 @@ void SparkMergeTreeWriter::write(DB::Block & block)
}
auto blocks_with_partition =
MergeTreeDataWriter::splitBlockIntoParts(squashing_transform->add(new_block),
10, metadata_snapshot, context);
- for (auto & item : blocks_with_partition)
- {
- auto temp_part = writeTempPart(item, metadata_snapshot, context);
- temp_part.finalize();
- new_parts.emplace_back(temp_part.part);
+ for (auto & item : blocks_with_partition)
+ {
+ new_parts.emplace_back(writeTempPartAndFinalize(item,
metadata_snapshot).part);
part_num++;
}
}
void SparkMergeTreeWriter::finalize()
{
+ auto block = squashing_transform->add({});
+ if (block.rows())
+ {
+ auto blocks_with_partition =
MergeTreeDataWriter::splitBlockIntoParts(std::move(block), 10,
metadata_snapshot, context);
+ for (auto & item : blocks_with_partition)
+ new_parts.emplace_back(writeTempPartAndFinalize(item,
metadata_snapshot).part);
+ }
+}
+
+DB::MergeTreeDataWriter::TemporaryPart
+SparkMergeTreeWriter::writeTempPartAndFinalize(
+ DB::BlockWithPartition & block_with_partition,
+ const DB::StorageMetadataPtr & metadata_snapshot)
+{
+ auto temp_part = writeTempPart(block_with_partition, metadata_snapshot);
+ temp_part.finalize();
+ saveFileStatus(temp_part);
+ return temp_part;
+}
- auto blocks_with_partition =
MergeTreeDataWriter::splitBlockIntoParts(squashing_transform->add({}), 10,
metadata_snapshot, context);
- for (auto & item : blocks_with_partition)
+void SparkMergeTreeWriter::saveFileStatus(const
DB::MergeTreeDataWriter::TemporaryPart & temp_part) const
+{
+ auto & data_part_storage = temp_part.part->getDataPartStorage();
+
+ const DiskPtr disk = storage.getStoragePolicy()->getAnyDisk();
+ if (!disk->isRemote()) return;
+ if (auto *const disk_metadata = dynamic_cast<MetadataStorageFromDisk
*>(disk->getMetadataStorage().get()))
{
- auto temp_part = writeTempPart(item, metadata_snapshot, context);
- temp_part.finalize();
- new_parts.emplace_back(temp_part.part);
+ const auto out = data_part_storage.writeFile("metadata.gluten",
DBMS_DEFAULT_BUFFER_SIZE, context->getWriteSettings());
+ for (const auto it = data_part_storage.iterate(); it->isValid();
it->next())
+ {
+ auto content = disk_metadata->readFileToString(it->path());
+ writeString(it->name(), *out);
+ writeChar('\t', *out);
+ writeIntText(content.length(), *out);
+ writeChar('\n', *out);
+ writeString(content, *out);
+ }
+ out->finalize();
}
}
MergeTreeDataWriter::TemporaryPart SparkMergeTreeWriter::writeTempPart(
- BlockWithPartition & block_with_partition, const StorageMetadataPtr &
metadata_snapshot, ContextPtr context)
+ BlockWithPartition & block_with_partition, const StorageMetadataPtr &
metadata_snapshot)
{
MergeTreeDataWriter::TemporaryPart temp_part;
Block & block = block_with_partition.block;
diff --git a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h
b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h
index 5c63d1fef..d316f208e 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h
+++ b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h
@@ -82,7 +82,11 @@ public:
private:
DB::MergeTreeDataWriter::TemporaryPart
- writeTempPart(DB::BlockWithPartition & block_with_partition, const
DB::StorageMetadataPtr & metadata_snapshot, DB::ContextPtr context);
+ writeTempPart(DB::BlockWithPartition & block_with_partition, const
DB::StorageMetadataPtr & metadata_snapshot);
+ DB::MergeTreeDataWriter::TemporaryPart
+ writeTempPartAndFinalize(DB::BlockWithPartition & block_with_partition,
const DB::StorageMetadataPtr & metadata_snapshot);
+ void saveFileStatus(const DB::MergeTreeDataWriter::TemporaryPart &
temp_part) const;
+
String uuid;
String partition_dir;
String bucket_dir;
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp
b/cpp-ch/local-engine/local_engine_jni.cpp
index d1a675f49..7008c66f7 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -1020,7 +1020,7 @@ JNIEXPORT jlong
Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniW
local_engine::SerializedPlanParser::parseExtensionTable(split_info_str);
auto storage = local_engine::MergeTreeRelParser::parseStorage(
- plan_ptr->relations()[0].root().input(), extension_table,
local_engine::SerializedPlanParser::global_context);
+ extension_table, local_engine::SerializedPlanParser::global_context);
auto uuid = uuid_str + "_" + task_id;
auto * writer = new local_engine::SparkMergeTreeWriter(
*storage, storage->getInMemoryMetadataPtr(),
local_engine::SerializedPlanParser::global_context, uuid, partition_dir,
bucket_dir);
diff --git
a/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableNode.java
b/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableNode.java
index bf942ef26..f07e6fccb 100644
---
a/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableNode.java
+++
b/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableNode.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.StringValue;
import io.substrait.proto.ReadRel;
+import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -71,10 +72,11 @@ public class ExtensionTableNode implements SplitInfo {
this.maxPartsNum = maxPartsNum;
this.database = database;
this.tableName = tableName;
- if (relativePath.contains(":/")) { // file:/tmp/xxx => tmp/xxx
- this.relativePath = relativePath.substring(relativePath.indexOf(":/") +
2);
+ URI table_uri = URI.create(relativePath);
+ if (table_uri.getPath().startsWith("/")) { // file:///tmp/xxx => tmp/xxx
+ this.relativePath = table_uri.getPath().substring(1);
} else {
- this.relativePath = relativePath;
+ this.relativePath = table_uri.getPath();
}
this.absolutePath = absolutePath;
this.tableSchemaJson = tableSchemaJson;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]