This is an automated email from the ASF dual-hosted git repository.
zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new b0b3a6399 [GLUTEN-5219][CH]Fix the table metadata sync issue for the
CH backend (#5221)
b0b3a6399 is described below
commit b0b3a63996cc3bcebdfae17d8a30652dc61ba561
Author: Zhichao Zhang <[email protected]>
AuthorDate: Sun Apr 7 10:42:41 2024 +0800
[GLUTEN-5219][CH]Fix the table metadata sync issue for the CH backend
(#5221)
The CH backend receives table metadata through the Substrait plan when
executing CREATE TABLE and queries, but there is currently no version
information attached to that metadata. As a result, the following scenario fails:
```
1. Create table_1 at path1 and insert data.
2. Query table_1.
3. Drop table_1 and re-create a table with the same name, table_1, this time
   at path2.
4. Insert data: the inserted data ends up in path1 instead of path2.
```
This commit therefore adds a snapshot ID that represents the version of the
table metadata. The ID is built from a timestamp taken from the Delta snapshot
together with the snapshot version (`<timestamp>_<version>`). The CH backend
updates its table metadata according to this ID.
Closes #5219.
---
.../org/apache/spark/sql/delta/Snapshot.scala | 2 +-
.../source/DeltaMergeTreeFileFormat.scala | 6 +
.../org/apache/spark/sql/delta/Snapshot.scala | 6 +-
.../source/DeltaMergeTreeFileFormat.scala | 6 +
.../backendsapi/clickhouse/CHIteratorApi.scala | 1 +
.../backendsapi/clickhouse/CHListenerApi.scala | 2 +-
.../execution/GlutenMergeTreePartition.scala | 1 +
.../delta/ClickhouseOptimisticTransaction.scala | 1 +
.../spark/sql/delta/ClickhouseSnapshot.scala | 17 +-
.../sql/delta/catalog/ClickHouseTableV2.scala | 3 +-
.../commands/OptimizeTableCommandOverwrites.scala | 5 +-
.../utils/MergeTreePartsPartitionsUtil.scala | 9 +
.../datasources/v1/CHMergeTreeWriterInjects.scala | 4 +
.../execution/GlutenClickHouseHiveTableSuite.scala | 11 +-
.../GlutenClickHouseMergeTreeOptimizeSuite.scala | 15 +-
...GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala | 20 ++-
...ergeTreeWriteOnObjectStorageAbstractSuite.scala | 97 +---------
.../GlutenClickHouseMergeTreeWriteOnS3Suite.scala | 28 ++-
.../GlutenClickHouseMergeTreeWriteSuite.scala | 196 ++++++++++++++++++++-
.../GlutenClickHouseTPCHBucketSuite.scala | 7 +-
.../GlutenClickHouseTPCHParquetBucketSuite.scala | 6 +-
.../GlutenClickHouseTPCHParquetRFSuite.scala | 7 +-
.../GlutenClickHouseTableAfterRestart.scala | 7 +-
...lutenClickHouseWholeStageTransformerSuite.scala | 110 +++++++++++-
.../execution/GlutenFunctionValidateSuite.scala | 7 +-
.../apache/gluten/utils/UTSystemParameters.scala | 10 ++
.../apache/spark/affinity/MixedAffinitySuite.scala | 1 +
cpp-ch/local-engine/Common/CHUtil.cpp | 3 +
cpp-ch/local-engine/Common/MergeTreeTool.cpp | 2 +
cpp-ch/local-engine/Common/MergeTreeTool.h | 1 +
cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp | 7 +-
.../Storages/StorageMergeTreeFactory.cpp | 147 ++++++----------
.../Storages/StorageMergeTreeFactory.h | 46 +++--
cpp-ch/local-engine/local_engine_jni.cpp | 4 +-
.../substrait/rel/ExtensionTableBuilder.java | 2 +
.../gluten/substrait/rel/ExtensionTableNode.java | 12 +-
.../datasource/GlutenFormatWriterInjects.scala | 2 +-
37 files changed, 517 insertions(+), 294 deletions(-)
diff --git
a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/Snapshot.scala
b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/Snapshot.scala
index 712ff3ffe..2233aa0cd 100644
---
a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/Snapshot.scala
+++
b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/Snapshot.scala
@@ -409,7 +409,7 @@ class Snapshot(
filters: Seq[Expression],
keepNumRecords: Boolean): DeltaScan = {
val deltaScan = ClickhouseSnapshot.deltaScanCache.get(
- FilterExprsAsKey(path, version, filters, None),
+ FilterExprsAsKey(path, ClickhouseSnapshot.genSnapshotId(this), filters,
None),
() => {
super.filesForScan(projection, filters, keepNumRecords)
})
diff --git
a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
index d4ca321a9..63490cd10 100644
---
a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
+++
b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
@@ -30,6 +30,7 @@ class DeltaMergeTreeFileFormat(metadata: Metadata)
protected var database = ""
protected var tableName = ""
+ protected var snapshotId = ""
protected var orderByKeyOption: Option[Seq[String]] = None
protected var lowCardKeyOption: Option[Seq[String]] = None
protected var minmaxIndexKeyOption: Option[Seq[String]] = None
@@ -39,10 +40,12 @@ class DeltaMergeTreeFileFormat(metadata: Metadata)
protected var partitionColumns: Seq[String] = Seq.empty[String]
protected var clickhouseTableConfigs: Map[String, String] = Map.empty
+ // scalastyle:off argcount
def this(
metadata: Metadata,
database: String,
tableName: String,
+ snapshotId: String,
orderByKeyOption: Option[Seq[String]],
lowCardKeyOption: Option[Seq[String]],
minmaxIndexKeyOption: Option[Seq[String]],
@@ -54,6 +57,7 @@ class DeltaMergeTreeFileFormat(metadata: Metadata)
this(metadata)
this.database = database
this.tableName = tableName
+ this.snapshotId = snapshotId
this.orderByKeyOption = orderByKeyOption
this.lowCardKeyOption = lowCardKeyOption
this.minmaxIndexKeyOption = minmaxIndexKeyOption
@@ -63,6 +67,7 @@ class DeltaMergeTreeFileFormat(metadata: Metadata)
this.clickhouseTableConfigs = clickhouseTableConfigs
this.partitionColumns = partitionColumns
}
+ // scalastyle:on argcount
override def shortName(): String = "mergetree"
@@ -105,6 +110,7 @@ class DeltaMergeTreeFileFormat(metadata: Metadata)
path,
database,
tableName,
+ snapshotId,
orderByKeyOption,
lowCardKeyOption,
minmaxIndexKeyOption,
diff --git
a/backends-clickhouse/src/main/delta-22/org/apache/spark/sql/delta/Snapshot.scala
b/backends-clickhouse/src/main/delta-22/org/apache/spark/sql/delta/Snapshot.scala
index 900ae1c17..1c62d1331 100644
---
a/backends-clickhouse/src/main/delta-22/org/apache/spark/sql/delta/Snapshot.scala
+++
b/backends-clickhouse/src/main/delta-22/org/apache/spark/sql/delta/Snapshot.scala
@@ -382,7 +382,7 @@ class Snapshot(
override def filesForScan(filters: Seq[Expression], keepNumRecords:
Boolean): DeltaScan = {
val deltaScan = ClickhouseSnapshot.deltaScanCache.get(
- FilterExprsAsKey(path, version, filters, None),
+ FilterExprsAsKey(path, ClickhouseSnapshot.genSnapshotId(this), filters,
None),
() => {
super.filesForScan(filters, keepNumRecords)
})
@@ -392,7 +392,7 @@ class Snapshot(
override def filesForScan(limit: Long): DeltaScan = {
val deltaScan = ClickhouseSnapshot.deltaScanCache.get(
- FilterExprsAsKey(path, version, Seq.empty, Some(limit)),
+ FilterExprsAsKey(path, ClickhouseSnapshot.genSnapshotId(this),
Seq.empty, Some(limit)),
() => {
super.filesForScan(limit)
})
@@ -402,7 +402,7 @@ class Snapshot(
override def filesForScan(limit: Long, partitionFilters: Seq[Expression]):
DeltaScan = {
val deltaScan = ClickhouseSnapshot.deltaScanCache.get(
- FilterExprsAsKey(path, version, partitionFilters, Some(limit)),
+ FilterExprsAsKey(path, ClickhouseSnapshot.genSnapshotId(this),
partitionFilters, Some(limit)),
() => {
super.filesForScan(limit, partitionFilters)
})
diff --git
a/backends-clickhouse/src/main/delta-22/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
b/backends-clickhouse/src/main/delta-22/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
index 002e636af..130790308 100644
---
a/backends-clickhouse/src/main/delta-22/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
+++
b/backends-clickhouse/src/main/delta-22/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
@@ -29,6 +29,7 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) extends
DeltaParquetFileForma
protected var database = ""
protected var tableName = ""
+ protected var snapshotId = ""
protected var orderByKeyOption: Option[Seq[String]] = None
protected var lowCardKeyOption: Option[Seq[String]] = None
protected var minmaxIndexKeyOption: Option[Seq[String]] = None
@@ -38,10 +39,12 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) extends
DeltaParquetFileForma
protected var partitionColumns: Seq[String] = Seq.empty[String]
protected var clickhouseTableConfigs: Map[String, String] = Map.empty
+ // scalastyle:off argcount
def this(
metadata: Metadata,
database: String,
tableName: String,
+ snapshotId: String,
orderByKeyOption: Option[Seq[String]],
lowCardKeyOption: Option[Seq[String]],
minmaxIndexKeyOption: Option[Seq[String]],
@@ -53,6 +56,7 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) extends
DeltaParquetFileForma
this(metadata)
this.database = database
this.tableName = tableName
+ this.snapshotId = snapshotId
this.orderByKeyOption = orderByKeyOption
this.lowCardKeyOption = lowCardKeyOption
this.minmaxIndexKeyOption = minmaxIndexKeyOption
@@ -62,6 +66,7 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) extends
DeltaParquetFileForma
this.clickhouseTableConfigs = clickhouseTableConfigs
this.partitionColumns = partitionColumns
}
+ // scalastyle:on argcount
override def shortName(): String = "mergetree"
@@ -104,6 +109,7 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) extends
DeltaParquetFileForma
path,
database,
tableName,
+ snapshotId,
orderByKeyOption,
lowCardKeyOption,
minmaxIndexKeyOption,
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index ee4db2d64..4926a97eb 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -95,6 +95,7 @@ class CHIteratorApi extends IteratorApi with Logging with
LogLevelUtil {
-1L,
p.database,
p.table,
+ p.snapshotId,
p.relativeTablePath,
p.absoluteTablePath,
p.orderByKey,
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
index 17883cf24..fcc3c333e 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
@@ -66,7 +66,7 @@ class CHListenerApi extends ListenerApi with Logging {
// add memory limit for external sort
val externalSortKey =
s"${CHBackendSettings.getBackendConfigPrefix}.runtime_settings" +
s".max_bytes_before_external_sort"
- if (conf.getInt(externalSortKey, -1) < 0) {
+ if (conf.getLong(externalSortKey, -1) < 0) {
if (conf.getBoolean("spark.memory.offHeap.enabled", false)) {
val memSize =
JavaUtils.byteStringAsBytes(conf.get("spark.memory.offHeap.size")).toInt
if (memSize > 0) {
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/GlutenMergeTreePartition.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/GlutenMergeTreePartition.scala
index b28f9d01f..7532a29f8 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/GlutenMergeTreePartition.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/GlutenMergeTreePartition.scala
@@ -35,6 +35,7 @@ case class GlutenMergeTreePartition(
engine: String,
database: String,
table: String,
+ snapshotId: String,
relativeTablePath: String,
absoluteTablePath: String,
orderByKey: String,
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
index 7c27a5741..c73b7e7af 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
@@ -128,6 +128,7 @@ class ClickhouseOptimisticTransaction(
metadata,
tableV2.dataBaseName,
tableV2.tableName,
+ ClickhouseSnapshot.genSnapshotId(tableV2.snapshot),
tableV2.orderByKeyOption,
tableV2.lowCardKeyOption,
tableV2.minmaxIndexKeyOption,
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseSnapshot.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseSnapshot.scala
index 0f53ad478..c8d2d1c77 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseSnapshot.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseSnapshot.scala
@@ -41,7 +41,7 @@ case class AddFileAsKey(addFile: AddFile) {
case class FilterExprsAsKey(
path: Path,
- version: Long,
+ snapshotId: String,
filters: Seq[Expression],
limit: Option[Long]) {
@@ -64,14 +64,14 @@ case class FilterExprsAsKey(
)
})
override def hashCode(): Int = {
- Objects.hashCode(path, version.asInstanceOf[AnyRef], semanticFilters,
limit)
+ Objects.hashCode(path, snapshotId, semanticFilters, limit)
}
override def equals(o: Any): Boolean = {
o match {
case that: FilterExprsAsKey =>
that.path == this.path &&
- that.version == this.version &&
+ that.snapshotId.equals(this.snapshotId) &&
that.semanticFilters == this.semanticFilters &&
that.limit == this.limit
case _ => false
@@ -109,4 +109,15 @@ object ClickhouseSnapshot {
pathToAddMTPCache.invalidateAll()
deltaScanCache.invalidateAll()
}
+
+ // use timestamp + version as the snapshot id for ch backend
+ def genSnapshotId(snapshot: Snapshot): String = {
+ // When CTAS, there is no latest timestamp in the Snapshot
+ val ts = if (snapshot.metadata.createdTime.isDefined) {
+ snapshot.metadata.createdTime.get
+ } else {
+ System.currentTimeMillis()
+ }
+ ts.toString + "_" + snapshot.version.toString
+ }
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
index 11b2b18e9..c491ecfef 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute,
Expression}
import org.apache.spark.sql.connector.catalog.TableCatalog
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.delta.{ColumnWithDefaultExprUtils,
DeltaColumnMapping, DeltaErrors, DeltaLog, DeltaTableIdentifier,
DeltaTimeTravelSpec, Snapshot}
+import org.apache.spark.sql.delta.{ClickhouseSnapshot,
ColumnWithDefaultExprUtils, DeltaColumnMapping, DeltaErrors, DeltaLog,
DeltaTableIdentifier, DeltaTimeTravelSpec, Snapshot}
import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2.deltaLog2Table
import org.apache.spark.sql.delta.files.TahoeLogFileIndex
@@ -275,6 +275,7 @@ class ClickHouseTableV2(
meta,
dataBaseName,
tableName,
+ ClickhouseSnapshot.genSnapshotId(snapshot),
orderByKeyOption,
lowCardKeyOption,
minmaxIndexKeyOption,
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala
index 5aeafbf81..36ce1637b 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala
@@ -25,7 +25,7 @@ import org.apache.spark.internal.io.SparkHadoopWriterUtils
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
-import org.apache.spark.sql.delta.{DeltaErrors, DeltaLog,
DeltaTableIdentifier, OptimisticTransaction}
+import org.apache.spark.sql.delta.{ClickhouseSnapshot, DeltaErrors, DeltaLog,
DeltaTableIdentifier, OptimisticTransaction}
import org.apache.spark.sql.delta.actions.{AddFile, FileAction}
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -51,6 +51,7 @@ object OptimizeTableCommandOverwrites extends Logging {
path: String,
database: String,
tableName: String,
+ snapshotId: String,
orderByKeyOption: Option[Seq[String]],
lowCardKeyOption: Option[Seq[String]],
minmaxIndexKeyOption: Option[Seq[String]],
@@ -100,6 +101,7 @@ object OptimizeTableCommandOverwrites extends Logging {
description.path,
description.database,
description.tableName,
+ description.snapshotId,
description.orderByKeyOption,
description.lowCardKeyOption,
description.minmaxIndexKeyOption,
@@ -198,6 +200,7 @@ object OptimizeTableCommandOverwrites extends Logging {
txn.deltaLog.dataPath.toString,
tableV2.dataBaseName,
tableV2.tableName,
+ ClickhouseSnapshot.genSnapshotId(tableV2.snapshot),
tableV2.orderByKeyOption,
tableV2.lowCardKeyOption,
tableV2.minmaxIndexKeyOption,
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala
index e71405a31..2b37ae787 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala
@@ -55,6 +55,9 @@ object MergeTreePartsPartitionsUtil extends Logging {
}
val fileIndex = relation.location.asInstanceOf[TahoeFileIndex]
+ // when querying, use deltaLog.update(true) to get the staleness
acceptable snapshot
+ val snapshotId =
ClickhouseSnapshot.genSnapshotId(table.deltaLog.update(true))
+
val partitions = new ArrayBuffer[InputPartition]
val (database, tableName) = if (table.catalogTable.isDefined) {
(table.catalogTable.get.identifier.database.get,
table.catalogTable.get.identifier.table)
@@ -97,6 +100,7 @@ object MergeTreePartsPartitionsUtil extends Logging {
engine,
database,
tableName,
+ snapshotId,
relativeTablePath,
absoluteTablePath,
table.bucketOption.get,
@@ -119,6 +123,7 @@ object MergeTreePartsPartitionsUtil extends Logging {
engine,
database,
tableName,
+ snapshotId,
relativeTablePath,
absoluteTablePath,
optionalBucketSet,
@@ -142,6 +147,7 @@ object MergeTreePartsPartitionsUtil extends Logging {
engine: String,
database: String,
tableName: String,
+ snapshotId: String,
relativeTablePath: String,
absoluteTablePath: String,
optionalBucketSet: Option[BitSet],
@@ -234,6 +240,7 @@ object MergeTreePartsPartitionsUtil extends Logging {
engine,
database,
tableName,
+ snapshotId,
relativeTablePath,
absoluteTablePath,
orderByKey,
@@ -273,6 +280,7 @@ object MergeTreePartsPartitionsUtil extends Logging {
engine: String,
database: String,
tableName: String,
+ snapshotId: String,
relativeTablePath: String,
absoluteTablePath: String,
bucketSpec: BucketSpec,
@@ -344,6 +352,7 @@ object MergeTreePartsPartitionsUtil extends Logging {
engine,
database,
tableName,
+ snapshotId,
relativeTablePath,
absoluteTablePath,
orderByKey,
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
index 9d9827096..64aa8863b 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
@@ -64,6 +64,7 @@ class CHMergeTreeWriterInjects extends
GlutenFormatWriterInjectsBase {
path: String,
database: String,
tableName: String,
+ snapshotId: String,
orderByKeyOption: Option[Seq[String]],
lowCardKeyOption: Option[Seq[String]],
minmaxIndexKeyOption: Option[Seq[String]],
@@ -81,6 +82,7 @@ class CHMergeTreeWriterInjects extends
GlutenFormatWriterInjectsBase {
path,
database,
tableName,
+ snapshotId,
orderByKeyOption,
lowCardKeyOption,
minmaxIndexKeyOption,
@@ -128,6 +130,7 @@ object CHMergeTreeWriterInjects {
path: String,
database: String,
tableName: String,
+ snapshotId: String,
orderByKeyOption: Option[Seq[String]],
lowCardKeyOption: Option[Seq[String]],
minmaxIndexKeyOption: Option[Seq[String]],
@@ -179,6 +182,7 @@ object CHMergeTreeWriterInjects {
-1,
database,
tableName,
+ snapshotId,
path,
"",
orderByKey,
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseHiveTableSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseHiveTableSuite.scala
index 8cefb8b97..1939883c8 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseHiveTableSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseHiveTableSuite.scala
@@ -19,10 +19,8 @@ package org.apache.gluten.execution
import org.apache.gluten.GlutenConfig
import org.apache.gluten.utils.UTSystemParameters
-import org.apache.spark.SPARK_VERSION_SHORT
import org.apache.spark.SparkConf
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.hive.HiveTableScanExecTransformer
@@ -89,11 +87,6 @@ class GlutenClickHouseHiveTableSuite
private var _hiveSpark: SparkSession = _
- protected lazy val sparkVersion: String = {
- val version = SPARK_VERSION_SHORT.split("\\.")
- version(0) + "." + version(1)
- }
-
override protected def sparkConf: SparkConf = {
new SparkConf()
.set("spark.plugins", "org.apache.gluten.GlutenPlugin")
@@ -115,7 +108,7 @@ class GlutenClickHouseHiveTableSuite
.set("spark.gluten.sql.parquet.maxmin.index", "true")
.set(
"spark.sql.warehouse.dir",
- getClass.getResource("/").getPath +
"unit-tests-working-home/spark-warehouse")
+ getClass.getResource("/").getPath +
"tests-working-home/spark-warehouse")
.set("spark.hive.exec.dynamic.partition.mode", "nonstrict")
.set("spark.gluten.supported.hive.udfs", "my_add")
.setMaster("local[*]")
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala
index 915336082..e553a8c17 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.execution
-import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf}
+import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import java.io.File
@@ -34,11 +34,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
override protected val tpchQueries: String = rootPath +
"queries/tpch-queries-ch"
override protected val queriesResults: String = rootPath +
"mergetree-queries-output"
- protected lazy val sparkVersion: String = {
- val version = SPARK_VERSION_SHORT.split("\\.")
- version(0) + "." + version(1)
- }
-
/** Run Gluten + ClickHouse Backend with SortShuffleManager */
override protected def sparkConf: SparkConf = {
super.sparkConf
@@ -48,16 +43,10 @@ class GlutenClickHouseMergeTreeOptimizeSuite
.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
.set("spark.sql.adaptive.enabled", "true")
.set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level",
"error")
- .set(
-
"spark.gluten.sql.columnar.backend.ch.runtime_config.user_defined_path",
- "/tmp/user_defined")
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_settings.min_insert_block_size_rows",
"10000"
- ) // so that we have enough parts to test
-// .set("spark.ui.enabled", "true")
-//
.set("spark.gluten.sql.columnar.backend.ch.runtime_config.dump_pipeline",
"true")
-//
.set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level",
"debug")
+ )
.set(
"spark.databricks.delta.retentionDurationCheck.enabled",
"false"
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
index fb2520453..923813ff2 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
@@ -16,6 +16,7 @@
*/
package org.apache.gluten.execution
+import org.apache.spark.SparkConf
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
import org.apache.spark.sql.delta.files.TahoeFileIndex
@@ -32,7 +33,7 @@ import java.io.File
// scalastyle:off line.size.limit
class GlutenClickHouseMergeTreeWriteOnHDFSSuite
- extends GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite
+ extends GlutenClickHouseTPCHAbstractSuite
with AdaptiveSparkPlanHelper {
override protected val needCopyParquetToTablePath = true
@@ -41,6 +42,20 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
override protected val tpchQueries: String = rootPath +
"queries/tpch-queries-ch"
override protected val queriesResults: String = rootPath +
"mergetree-queries-output"
+ override protected def createTPCHNotNullTables(): Unit = {
+ createNotNullTPCHTablesInParquet(tablesPath)
+ }
+
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf
+ .set("spark.shuffle.manager",
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+ .set("spark.io.compression.codec", "LZ4")
+ .set("spark.sql.shuffle.partitions", "5")
+ .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
+ .set("spark.sql.adaptive.enabled", "true")
+ .set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level",
"error")
+ }
+
override protected def beforeEach(): Unit = {
super.beforeEach()
val conf = new Configuration
@@ -48,15 +63,12 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
val fs = FileSystem.get(conf)
fs.delete(new org.apache.hadoop.fs.Path(HDFS_URL), true)
FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
-// FileUtils.deleteDirectory(new File(HDFS_CACHE_PATH))
FileUtils.forceMkdir(new File(HDFS_METADATA_PATH))
-// FileUtils.forceMkdir(new File(HDFS_CACHE_PATH))
}
override protected def afterEach(): Unit = {
super.afterEach()
FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
-// FileUtils.deleteDirectory(new File(HDFS_CACHE_PATH))
}
ignore("test mergetree table write") {
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite.scala
index 24071a1cc..90db0f5d8 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite.scala
@@ -20,7 +20,7 @@ import org.apache.gluten.GlutenConfig
import org.apache.spark.sql.SparkSession
-import _root_.org.apache.spark.{SPARK_VERSION_SHORT, SparkConf}
+import _root_.org.apache.spark.SparkConf
import _root_.org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.commons.io.FileUtils
@@ -42,26 +42,6 @@ class
GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite
override protected val tpchQueries: String = rootPath +
"queries/tpch-queries-ch"
override protected val queriesResults: String = rootPath +
"mergetree-queries-output"
- protected val sparkVersion: String = {
- val version = SPARK_VERSION_SHORT.split("\\.")
- version(0) + "." + version(1)
- }
-
- val S3_METADATA_PATH = s"/tmp/metadata/s3/$sparkVersion/"
- val S3_CACHE_PATH = s"/tmp/s3_cache/$sparkVersion/"
- val S3_ENDPOINT = "s3://127.0.0.1:9000/"
- val MINIO_ENDPOINT: String = S3_ENDPOINT.replace("s3", "http")
- val BUCKET_NAME: String = sparkVersion.replace(".", "-")
- val WHOLE_PATH: String = MINIO_ENDPOINT + BUCKET_NAME + "/"
-
- val HDFS_METADATA_PATH = s"/tmp/metadata/hdfs/$sparkVersion/"
- val HDFS_CACHE_PATH = s"/tmp/hdfs_cache/$sparkVersion/"
- val HDFS_URL_ENDPOINT = s"hdfs://127.0.0.1:8020"
- val HDFS_URL = s"$HDFS_URL_ENDPOINT/$sparkVersion"
-
- val S3_ACCESS_KEY = "BypTYzcXOlfr03FFIvt4"
- val S3_SECRET_KEY = "K9MDaGItPSaphorZM8t4hXf30gHF9dBWi6L2dK5E"
-
override protected def initializeSession(): Unit = {
if (_spark == null) {
_spark = SparkSession
@@ -80,82 +60,7 @@ class
GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite
.set("spark.sql.shuffle.partitions", "5")
.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
.set("spark.sql.adaptive.enabled", "true")
-
.set("spark.gluten.sql.columnar.backend.ch.runtime_config.use_local_format",
"false")
.set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level",
"error")
- .set(
-
"spark.gluten.sql.columnar.backend.ch.runtime_config.user_defined_path",
- "/tmp/user_defined")
- .set("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY)
- .set("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY)
- .set("spark.hadoop.fs.s3a.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem")
- .set("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
- .set("spark.hadoop.fs.s3a.path.style.access", "true")
- .set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
- .set(
-
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3.type",
- "s3_gluten")
- .set(
-
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3.endpoint",
- WHOLE_PATH)
- .set(
-
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3.access_key_id",
- S3_ACCESS_KEY)
- .set(
-
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3.secret_access_key",
- S3_SECRET_KEY)
- .set(
-
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3.metadata_path",
- S3_METADATA_PATH)
- .set(
-
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3_cache.type",
- "cache")
- .set(
-
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3_cache.disk",
- "s3")
- .set(
-
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3_cache.path",
- S3_CACHE_PATH)
- .set(
-
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3_cache.max_size",
- "10Gi")
- .set(
-
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.__s3_main.volumes",
- "main")
- .set(
-
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.__s3_main.volumes.main.disk",
- "s3_cache")
- .set(
-
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs.type",
- "hdfs_gluten")
- .set(
-
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs.endpoint",
- HDFS_URL_ENDPOINT + "/")
- .set(
-
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs.metadata_path",
- HDFS_METADATA_PATH)
- .set(
-
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache.type",
- "cache")
- .set(
-
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache.disk",
- "hdfs")
- .set(
-
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache.path",
- HDFS_CACHE_PATH)
- .set(
-
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache.max_size",
- "10Gi")
- .set(
-
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.__hdfs_main.volumes",
- "main")
- .set(
-
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.__hdfs_main.volumes.main.disk",
- "hdfs_cache")
- .set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm",
"sparkMurmurHash3_32")
- .set(
-
"spark.gluten.sql.columnar.backend.ch.runtime_config.hdfs.dfs_client_read_shortcircuit",
- "false")
-
.set("spark.gluten.sql.columnar.backend.ch.runtime_config.hdfs.dfs_default_replica",
"1")
}
override protected def createTPCHNotNullTables(): Unit = {
createNotNullTPCHTablesInParquet(tablesPath)
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
index ea8bacdf7..b4d5c3da1 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
@@ -16,6 +16,7 @@
*/
package org.apache.gluten.execution
+import org.apache.spark.SparkConf
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
import org.apache.spark.sql.delta.files.TahoeFileIndex
import
org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
@@ -33,7 +34,7 @@ import java.util
// scalastyle:off line.size.limit
class GlutenClickHouseMergeTreeWriteOnS3Suite
- extends GlutenClickHouseMergeTreeWriteOnObjectStorageAbstractSuite
+ extends GlutenClickHouseTPCHAbstractSuite
with AdaptiveSparkPlanHelper {
override protected val needCopyParquetToTablePath = true
@@ -42,6 +43,20 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
override protected val tpchQueries: String = rootPath +
"queries/tpch-queries-ch"
override protected val queriesResults: String = rootPath +
"mergetree-queries-output"
+ override protected def createTPCHNotNullTables(): Unit = {
+ createNotNullTPCHTablesInParquet(tablesPath)
+ }
+
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf
+ .set("spark.shuffle.manager",
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+ .set("spark.io.compression.codec", "LZ4")
+ .set("spark.sql.shuffle.partitions", "5")
+ .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
+ .set("spark.sql.adaptive.enabled", "true")
+ .set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level",
"error")
+ }
+
override protected def beforeEach(): Unit = {
super.beforeEach()
val client = MinioClient
@@ -64,18 +79,15 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
}
client.makeBucket(MakeBucketArgs.builder().bucket(BUCKET_NAME).build())
FileUtils.deleteDirectory(new File(S3_METADATA_PATH))
- FileUtils.deleteDirectory(new File(S3_CACHE_PATH))
FileUtils.forceMkdir(new File(S3_METADATA_PATH))
- FileUtils.forceMkdir(new File(S3_CACHE_PATH))
}
override protected def afterEach(): Unit = {
super.afterEach()
FileUtils.deleteDirectory(new File(S3_METADATA_PATH))
- FileUtils.deleteDirectory(new File(S3_CACHE_PATH))
}
- ignore("test mergetree table write") {
+ test("test mergetree table write") {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_s3;
|""".stripMargin)
@@ -158,7 +170,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
spark.sql("drop table lineitem_mergetree_s3") // clean up
}
- ignore("test mergetree write with orderby keys / primary keys") {
+ test("test mergetree write with orderby keys / primary keys") {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_orderbykey_s3;
|""".stripMargin)
@@ -255,7 +267,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
spark.sql("drop table lineitem_mergetree_orderbykey_s3")
}
- ignore("test mergetree write with partition") {
+ test("test mergetree write with partition") {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_partition_s3;
|""".stripMargin)
@@ -437,7 +449,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
}
- ignore("test mergetree write with bucket table") {
+ test("test mergetree write with bucket table") {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_bucket_s3;
|""".stripMargin)
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala
index 6cd322396..d0945ba0f 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala
@@ -16,13 +16,15 @@
*/
package org.apache.gluten.execution
-import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf}
+import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
import org.apache.spark.sql.delta.files.TahoeFileIndex
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import
org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
+import org.apache.commons.io.filefilter.WildcardFileFilter
+
import java.io.File
import scala.io.Source
@@ -40,11 +42,6 @@ class GlutenClickHouseMergeTreeWriteSuite
override protected val tpchQueries: String = rootPath +
"queries/tpch-queries-ch"
override protected val queriesResults: String = rootPath +
"mergetree-queries-output"
- protected lazy val sparkVersion: String = {
- val version = SPARK_VERSION_SHORT.split("\\.")
- version(0) + "." + version(1)
- }
-
/** Run Gluten + ClickHouse Backend with SortShuffleManager */
override protected def sparkConf: SparkConf = {
super.sparkConf
@@ -1552,5 +1549,192 @@ class GlutenClickHouseMergeTreeWriteSuite
checkSelectedMarksCnt(df, 29)
})
}
+
+ test("GLUTEN-5219: Fix the table metadata sync issue for the CH backend") {
+ def checkQueryResult(tableName: String): Unit = {
+ val sqlStr =
+ s"""
+ |SELECT
+ | l_returnflag,
+ | l_linestatus,
+ | sum(l_quantity) AS sum_qty,
+ | sum(l_extendedprice) AS sum_base_price,
+ | sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
+ | sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS
sum_charge,
+ | avg(l_quantity) AS avg_qty,
+ | avg(l_extendedprice) AS avg_price,
+ | avg(l_discount) AS avg_disc,
+ | count(*) AS count_order
+ |FROM
+ | $tableName
+ |WHERE
+ | l_shipdate <= date'1998-09-02' - interval 1 day
+ |GROUP BY
+ | l_returnflag,
+ | l_linestatus
+ |ORDER BY
+ | l_returnflag,
+ | l_linestatus;
+ |
+ |""".stripMargin
+ runTPCHQueryBySQL(1, sqlStr) {
+ df =>
+ val scanExec = collect(df.queryExecution.executedPlan) {
+ case f: FileSourceScanExecTransformer => f
+ }
+ assert(scanExec.size == 1)
+
+ val mergetreeScan = scanExec(0)
+ assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
+
+ val fileIndex =
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+ val addFiles =
+ fileIndex.matchingFiles(Nil, Nil).map(f =>
f.asInstanceOf[AddMergeTreeParts])
+ assert(addFiles.size == 6)
+ assert(
+ addFiles.map(_.rows).sum
+ == 600572)
+ }
+ }
+
+ // test with ctas
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS lineitem_mergetree_ctas_5219;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE lineitem_mergetree_ctas_5219
+ |USING clickhouse
+ |LOCATION '$basePath/lineitem_mergetree_ctas_5219'
+ | as select * from lineitem
+ |""".stripMargin)
+
+ checkQueryResult("lineitem_mergetree_ctas_5219")
+
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS lineitem_mergetree_ctas_5219;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE lineitem_mergetree_ctas_5219
+ |USING clickhouse
+ |TBLPROPERTIES (orderByKey='l_returnflag,l_shipdate',
+ | primaryKey='l_returnflag,l_shipdate')
+ |LOCATION '$basePath/lineitem_mergetree_ctas_5219_1'
+ | as select * from lineitem
+ |""".stripMargin)
+
+ checkQueryResult("lineitem_mergetree_ctas_5219")
+
+ var dataPath = new File(s"$basePath/lineitem_mergetree_ctas_5219_1")
+ assert(dataPath.isDirectory && dataPath.isDirectory)
+
+ val fileFilter = new WildcardFileFilter("*_0_*")
+ var dataFileList = dataPath.list(fileFilter)
+ assert(dataFileList.size == 6)
+
+ // test with the normal table
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS lineitem_mergetree_5219;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS lineitem_mergetree_5219
+ |(
+ | l_orderkey bigint,
+ | l_partkey bigint,
+ | l_suppkey bigint,
+ | l_linenumber bigint,
+ | l_quantity double,
+ | l_extendedprice double,
+ | l_discount double,
+ | l_tax double,
+ | l_returnflag string,
+ | l_linestatus string,
+ | l_shipdate date,
+ | l_commitdate date,
+ | l_receiptdate date,
+ | l_shipinstruct string,
+ | l_shipmode string,
+ | l_comment string
+ |)
+ |USING clickhouse
+ |TBLPROPERTIES (orderByKey='l_returnflag,l_shipdate',
+ | primaryKey='l_returnflag,l_shipdate')
+ |LOCATION '$basePath/lineitem_mergetree_5219'
+ |""".stripMargin)
+
+ spark.sql(s"""
+ | insert into table lineitem_mergetree_5219
+ | select * from lineitem
+ |""".stripMargin)
+
+ checkQueryResult("lineitem_mergetree_5219")
+
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS lineitem_mergetree_5219;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS lineitem_mergetree_5219
+ |(
+ | l_orderkey bigint,
+ | l_partkey bigint,
+ | l_suppkey bigint,
+ | l_linenumber bigint,
+ | l_quantity double,
+ | l_extendedprice double,
+ | l_discount double,
+ | l_tax double,
+ | l_returnflag string,
+ | l_linestatus string,
+ | l_shipdate date,
+ | l_commitdate date,
+ | l_receiptdate date,
+ | l_shipinstruct string,
+ | l_shipmode string,
+ | l_comment string
+ |)
+ |USING clickhouse
+ |TBLPROPERTIES (orderByKey='l_shipdate',
+ | primaryKey='l_shipdate')
+ |LOCATION '$basePath/lineitem_mergetree_5219_1'
+ |""".stripMargin)
+
+ spark.sql(s"""
+ | insert into table lineitem_mergetree_5219
+ | select * from lineitem
+ |""".stripMargin)
+
+ checkQueryResult("lineitem_mergetree_5219")
+
+ dataPath = new File(s"$basePath/lineitem_mergetree_5219_1")
+ assert(dataPath.isDirectory && dataPath.isDirectory)
+
+ dataFileList = dataPath.list(fileFilter)
+ assert(dataFileList.size == 6)
+
+ // re-create the same table
+ for (i <- 0 until 10) {
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS lineitem_mergetree_5219_s purge;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE lineitem_mergetree_5219_s
+ |USING clickhouse
+ |LOCATION '$basePath/lineitem_mergetree_5219_s'
+ | as select * from lineitem
+ |""".stripMargin)
+
+ checkQueryResult("lineitem_mergetree_5219_s")
+ }
+
+ dataPath = new File(s"$basePath/lineitem_mergetree_5219_s")
+ assert(dataPath.isDirectory && dataPath.isDirectory)
+
+ dataFileList = dataPath.list(fileFilter)
+ assert(dataFileList.size == 6)
+ }
}
// scalastyle:off line.size.limit
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHBucketSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHBucketSuite.scala
index 2aafef67e..8695e9483 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHBucketSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHBucketSuite.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.execution
-import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf}
+import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.execution.InputIteratorTransformer
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
@@ -39,11 +39,6 @@ class GlutenClickHouseTPCHBucketSuite
override protected val tpchQueries: String = rootPath +
"queries/tpch-queries-ch"
override protected val queriesResults: String = rootPath +
"bucket-queries-output"
- protected lazy val sparkVersion: String = {
- val version = SPARK_VERSION_SHORT.split("\\.")
- version(0) + "." + version(1)
- }
-
override protected def sparkConf: SparkConf = {
super.sparkConf
.set("spark.shuffle.manager",
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHParquetBucketSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHParquetBucketSuite.scala
index 3dc7b1166..14d3e0130 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHParquetBucketSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHParquetBucketSuite.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.execution
-import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf}
+import org.apache.spark.SparkConf
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.InputIteratorTransformer
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
@@ -42,10 +42,6 @@ class GlutenClickHouseTPCHParquetBucketSuite
protected val bucketTableResourcePath: String = rootPath +
"tpch-data-bucket/parquet_bucket"
protected val bucketTableDataPath: String = basePath + "/tpch-parquet-bucket"
- protected lazy val sparkVersion: String = {
- val version = SPARK_VERSION_SHORT.split("\\.")
- version(0) + "." + version(1)
- }
override protected def sparkConf: SparkConf = {
super.sparkConf
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHParquetRFSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHParquetRFSuite.scala
index 0f2e6b44c..83e847a70 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHParquetRFSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHParquetRFSuite.scala
@@ -16,15 +16,10 @@
*/
package org.apache.gluten.execution
-import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf}
+import org.apache.spark.SparkConf
class GlutenClickHouseTPCHParquetRFSuite extends
GlutenClickHouseTPCHSaltNullParquetSuite {
- protected lazy val sparkVersion: String = {
- val version = SPARK_VERSION_SHORT.split("\\.")
- version(0) + "." + version(1)
- }
-
override protected def sparkConf: SparkConf = {
super.sparkConf
// radically small threshold to force runtime bloom filter
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala
index e751c2fda..36002b7e5 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTableAfterRestart.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.execution
-import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf}
+import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession.{getActiveSession, getDefaultSession}
import org.apache.spark.sql.delta.{ClickhouseSnapshot, DeltaLog}
@@ -41,11 +41,6 @@ class GlutenClickHouseTableAfterRestart
override protected val tpchQueries: String = rootPath +
"queries/tpch-queries-ch"
override protected val queriesResults: String = rootPath +
"mergetree-queries-output"
- protected lazy val sparkVersion: String = {
- val version = SPARK_VERSION_SHORT.split("\\.")
- version(0) + "." + version(1)
- }
-
/** Run Gluten + ClickHouse Backend with SortShuffleManager */
override protected def sparkConf: SparkConf = {
super.sparkConf
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
index 22fd58ab0..a98cf329e 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala
@@ -19,18 +19,42 @@ package org.apache.gluten.execution
import org.apache.gluten.GlutenConfig
import org.apache.gluten.utils.UTSystemParameters
-import org.apache.spark.SparkConf
+import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf}
import
org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig
import org.apache.commons.io.FileUtils
import java.io.File
+// Some sqls' line length exceeds 100
+// scalastyle:off line.size.limit
+
class GlutenClickHouseWholeStageTransformerSuite extends
WholeStageTransformerSuite {
val DBL_EPSILON = 2.2204460492503131e-16
val DBL_RELAX_EPSILON: Double = Math.pow(10, -11)
val FLT_EPSILON = 1.19209290e-07f
+
+ protected val sparkVersion: String = {
+ val version = SPARK_VERSION_SHORT.split("\\.")
+ version(0) + "." + version(1)
+ }
+
+ val S3_METADATA_PATH = s"/tmp/metadata/s3/$sparkVersion/"
+ val S3_CACHE_PATH = s"/tmp/s3_cache/$sparkVersion/"
+ val S3_ENDPOINT = "s3://127.0.0.1:9000/"
+ val MINIO_ENDPOINT: String = S3_ENDPOINT.replace("s3", "http")
+ val BUCKET_NAME: String = sparkVersion.replace(".", "-")
+ val WHOLE_PATH: String = MINIO_ENDPOINT + BUCKET_NAME + "/"
+
+ val HDFS_METADATA_PATH = s"/tmp/metadata/hdfs/$sparkVersion/"
+ val HDFS_CACHE_PATH = s"/tmp/hdfs_cache/$sparkVersion/"
+ val HDFS_URL_ENDPOINT = s"hdfs://127.0.0.1:8020"
+ val HDFS_URL = s"$HDFS_URL_ENDPOINT/$sparkVersion"
+
+ val S3_ACCESS_KEY = "BypTYzcXOlfr03FFIvt4"
+ val S3_SECRET_KEY = "K9MDaGItPSaphorZM8t4hXf30gHF9dBWi6L2dK5E"
+
def AlmostEqualsIsRel(expected: Double, actual: Double, EPSILON: Double =
DBL_EPSILON): Unit = {
val diff = Math.abs(expected - actual)
val epsilon = EPSILON * Math.max(Math.abs(expected), Math.abs(actual))
@@ -43,14 +67,93 @@ class GlutenClickHouseWholeStageTransformerSuite extends
WholeStageTransformerSu
}
}
- override protected def sparkConf: SparkConf =
- super.sparkConf
+ override protected def sparkConf: SparkConf = {
+ val conf = super.sparkConf
.set(GlutenConfig.GLUTEN_LIB_PATH, UTSystemParameters.clickHouseLibPath)
.set(
"spark.gluten.sql.columnar.backend.ch.use.v2",
ClickHouseConfig.DEFAULT_USE_DATASOURCE_V2)
.set("spark.gluten.sql.enable.native.validation", "false")
.set("spark.sql.warehouse.dir", warehouse)
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.user_defined_path",
+ "/tmp/user_defined")
+ if (UTSystemParameters.testMergeTreeOnObjectStorage) {
+ conf
+ .set("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY)
+ .set("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY)
+ .set("spark.hadoop.fs.s3a.impl",
"org.apache.hadoop.fs.s3a.S3AFileSystem")
+ .set("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
+ .set("spark.hadoop.fs.s3a.path.style.access", "true")
+ .set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3.type",
+ "s3_gluten")
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3.endpoint",
+ WHOLE_PATH)
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3.access_key_id",
+ S3_ACCESS_KEY)
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3.secret_access_key",
+ S3_SECRET_KEY)
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3.metadata_path",
+ S3_METADATA_PATH)
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3_cache.type",
+ "cache")
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3_cache.disk",
+ "s3")
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3_cache.path",
+ S3_CACHE_PATH)
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3_cache.max_size",
+ "10Gi")
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.__s3_main.volumes",
+ "main")
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.__s3_main.volumes.main.disk",
+ "s3_cache")
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs.type",
+ "hdfs_gluten")
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs.endpoint",
+ HDFS_URL_ENDPOINT + "/")
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs.metadata_path",
+ HDFS_METADATA_PATH)
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache.type",
+ "cache")
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache.disk",
+ "hdfs")
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache.path",
+ HDFS_CACHE_PATH)
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.hdfs_cache.max_size",
+ "10Gi")
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.__hdfs_main.volumes",
+ "main")
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.policies.__hdfs_main.volumes.main.disk",
+ "hdfs_cache")
+ .set(
+
"spark.gluten.sql.columnar.backend.ch.runtime_config.hdfs.dfs_client_read_shortcircuit",
+ "false")
+
.set("spark.gluten.sql.columnar.backend.ch.runtime_config.hdfs.dfs_default_replica",
"1")
+ } else {
+ conf
+ }
+ }
override def beforeAll(): Unit = {
// prepare working paths
@@ -73,3 +176,4 @@ class GlutenClickHouseWholeStageTransformerSuite extends
WholeStageTransformerSu
final override protected val resourcePath: String = "" // ch not need this
override protected val fileFormat: String = "parquet"
}
+// scalastyle:off line.size.limit
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenFunctionValidateSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenFunctionValidateSuite.scala
index 255d2705a..204065e39 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenFunctionValidateSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenFunctionValidateSuite.scala
@@ -19,7 +19,7 @@ package org.apache.gluten.execution
import org.apache.gluten.GlutenConfig
import org.apache.gluten.utils.UTSystemParameters
-import org.apache.spark.{SPARK_VERSION_SHORT, SparkConf}
+import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.optimizer.{ConstantFolding,
NullPropagation}
import org.apache.spark.sql.internal.SQLConf
@@ -32,11 +32,6 @@ import scala.reflect.ClassTag
class GlutenFunctionValidateSuite extends
GlutenClickHouseWholeStageTransformerSuite {
- protected lazy val sparkVersion: String = {
- val version = SPARK_VERSION_SHORT.split("\\.")
- version(0) + "." + version(1)
- }
-
protected val tablesPath: String = basePath + "/tpch-data"
protected val tpchQueries: String =
rootPath + "../../../../gluten-core/src/test/resources/tpch-queries"
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/utils/UTSystemParameters.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/utils/UTSystemParameters.scala
index 81f286b52..059d6cb04 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/utils/UTSystemParameters.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/utils/UTSystemParameters.scala
@@ -48,4 +48,14 @@ object UTSystemParameters {
}
}
+ private val TEST_MERGETREE_ON_OBJECT_STORAGE =
"gluten.ch.test.mergetree.object.storage"
+ private val TEST_MERGETREE_ON_OBJECT_STORAGE_DEFAULT_VALUE = "true"
+
+ def testMergeTreeOnObjectStorage: Boolean = {
+ System
+ .getProperty(
+ UTSystemParameters.TEST_MERGETREE_ON_OBJECT_STORAGE,
+ UTSystemParameters.TEST_MERGETREE_ON_OBJECT_STORAGE_DEFAULT_VALUE)
+ .toBoolean
+ }
}
diff --git
a/backends-clickhouse/src/test/scala/org/apache/spark/affinity/MixedAffinitySuite.scala
b/backends-clickhouse/src/test/scala/org/apache/spark/affinity/MixedAffinitySuite.scala
index 4dc19e62e..fb4df1460 100644
---
a/backends-clickhouse/src/test/scala/org/apache/spark/affinity/MixedAffinitySuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/spark/affinity/MixedAffinitySuite.scala
@@ -54,6 +54,7 @@ class MixedAffinitySuite extends QueryTest with
SharedSparkSession {
"",
"",
"",
+ "",
"fakePath",
"fakePath2",
"",
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp
b/cpp-ch/local-engine/Common/CHUtil.cpp
index 880f6668d..ba86fe306 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -797,6 +797,9 @@ void BackendInitializerUtil::init(std::string * plan)
// in case of running the multiple gluten ut in one process
ReadBufferBuilderFactory::instance().clean();
+ // Init the table metadata cache map
+ StorageMergeTreeFactory::init_cache_map();
+
std::call_once(
init_flag,
[&]
diff --git a/cpp-ch/local-engine/Common/MergeTreeTool.cpp
b/cpp-ch/local-engine/Common/MergeTreeTool.cpp
index 39198b87e..3b517f30b 100644
--- a/cpp-ch/local-engine/Common/MergeTreeTool.cpp
+++ b/cpp-ch/local-engine/Common/MergeTreeTool.cpp
@@ -160,6 +160,8 @@ MergeTreeTable parseMergeTreeTableString(const std::string
& info)
assertChar('\n', in);
readString(table.table, in);
assertChar('\n', in);
+ readString(table.snapshot_id, in);
+ assertChar('\n', in);
String schema;
readString(schema, in);
google::protobuf::util::JsonStringToMessage(schema, &table.schema);
diff --git a/cpp-ch/local-engine/Common/MergeTreeTool.h
b/cpp-ch/local-engine/Common/MergeTreeTool.h
index d8ca9d34e..0f0a1c1c7 100644
--- a/cpp-ch/local-engine/Common/MergeTreeTool.h
+++ b/cpp-ch/local-engine/Common/MergeTreeTool.h
@@ -53,6 +53,7 @@ struct MergeTreeTable
inline static const String TUPLE = "tuple()";
std::string database;
std::string table;
+ std::string snapshot_id;
substrait::NamedStruct schema;
std::string order_by_key;
std::string low_card_key;
diff --git a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
index 811eaa6d9..eec239c73 100644
--- a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
@@ -82,6 +82,7 @@ CustomStorageMergeTreePtr MergeTreeRelParser::parseStorage(
auto storage = storage_factory.getStorage(
StorageID(merge_tree_table.database, merge_tree_table.table, uuid),
+ merge_tree_table.snapshot_id,
metadata->getColumns(),
[&]() -> CustomStorageMergeTreePtr
{
@@ -129,6 +130,7 @@ MergeTreeRelParser::parseReadRel(
StorageID table_id(merge_tree_table.database, merge_tree_table.table);
auto storage = storage_factory.getStorage(
table_id,
+ merge_tree_table.snapshot_id,
metadata->getColumns(),
[&]() -> CustomStorageMergeTreePtr
{
@@ -161,8 +163,7 @@ MergeTreeRelParser::parseReadRel(
query_info->prewhere_info = parsePreWhereInfo(rel.filter(), input);
}
- std::vector<DataPartPtr> selected_parts =
storage_factory.getDataParts(table_id, merge_tree_table.getPartNames());
-
+ std::vector<DataPartPtr> selected_parts =
storage_factory.getDataParts(table_id, merge_tree_table.snapshot_id,
merge_tree_table.getPartNames());
auto ranges = merge_tree_table.extractRange(selected_parts);
if (selected_parts.empty())
throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "no data part found.");
@@ -390,4 +391,4 @@ String MergeTreeRelParser::getCHFunctionName(const
substrait::Expression_ScalarF
throw Exception(ErrorCodes::UNKNOWN_FUNCTION, "Unsupported substrait
function on mergetree prewhere parser: {}", func_name);
return it->second;
}
-}
\ No newline at end of file
+}
diff --git a/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.cpp
b/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.cpp
index d5b23ffbd..a38ef10ea 100644
--- a/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.cpp
+++ b/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.cpp
@@ -26,143 +26,106 @@ StorageMergeTreeFactory &
StorageMergeTreeFactory::instance()
void StorageMergeTreeFactory::freeStorage(StorageID id)
{
+ if (!id.hasUUID())
+ {
+ return;
+ }
auto table_name = id.database_name + "." + id.table_name + "@" +
toString(id.uuid);
-
{
std::lock_guard lock(storage_map_mutex);
- if (storage_map.contains(table_name))
+ if (storage_map->has(table_name))
{
- storage_map.erase(table_name);
- }
- if (storage_columns_map.contains(table_name))
- {
- storage_columns_map.erase(table_name);
+ storage_map->remove(table_name);
}
}
{
std::lock_guard lock(datapart_mutex);
- if (datapart_map.contains(table_name))
+ if (datapart_map->has(table_name))
{
- datapart_map.erase(table_name);
+ datapart_map->remove(table_name);
}
}
-
- {
- std::lock_guard lock(metadata_map_mutex);
- if (metadata_map.contains(table_name))
- {
- metadata_map.erase(table_name);
- }
- }
-
}
CustomStorageMergeTreePtr
-StorageMergeTreeFactory::getStorage(StorageID id, ColumnsDescription columns,
std::function<CustomStorageMergeTreePtr()> creator)
+StorageMergeTreeFactory::getStorage(StorageID id, const String & snapshot_id,
ColumnsDescription columns, std::function<CustomStorageMergeTreePtr()> creator)
{
- auto table_name = id.database_name + "." + id.table_name + "@" +
toString(id.uuid);
+ auto table_name = id.database_name + "." + id.table_name;
+ // for optimize table
+ if (id.hasUUID())
+ {
+ table_name += "@" + toString(id.uuid);
+ }
+ else
+ {
+ table_name += "_" + snapshot_id;
+ }
std::lock_guard lock(storage_map_mutex);
- if (!storage_map.contains(table_name))
+ if (!storage_map->has(table_name))
{
- if (storage_map.contains(table_name))
- {
- std::set<std::string> existed_columns =
storage_columns_map.at(table_name);
- for (const auto & column : columns)
- {
- if (!existed_columns.contains(column.name))
- {
- storage_map.erase(table_name);
- storage_columns_map.erase(table_name);
- }
- }
- }
- if (!storage_map.contains(table_name))
- {
- storage_map.emplace(table_name, creator());
- storage_columns_map.emplace(table_name, std::set<std::string>());
- for (const auto & column :
storage_map.at(table_name)->getInMemoryMetadataPtr()->columns)
- {
- storage_columns_map.at(table_name).emplace(column.name);
- }
- }
+ storage_map->add(table_name, creator());
}
- return storage_map.at(table_name);
+ return *(storage_map->get(table_name));
}
-StorageInMemoryMetadataPtr StorageMergeTreeFactory::getMetadata(StorageID id,
std::function<StorageInMemoryMetadataPtr()> creator)
+DataPartsVector StorageMergeTreeFactory::getDataParts(StorageID id, const
String & snapshot_id, std::unordered_set<String> part_name)
{
- auto table_name = id.database_name + "." + id.table_name + "@" +
toString(id.uuid);
-
- std::lock_guard lock(metadata_map_mutex);
- if (!metadata_map.contains(table_name))
+ DataPartsVector res;
+ auto table_name = id.database_name + "." + id.table_name;
+ // for optimize table
+ if (id.hasUUID())
{
- if (!metadata_map.contains(table_name))
- metadata_map.emplace(table_name, creator());
+ table_name += "@" + toString(id.uuid);
}
- return metadata_map.at(table_name);
-}
-DataPartsVector StorageMergeTreeFactory::getDataParts(StorageID id,
std::unordered_set<String> part_name)
-{
- DataPartsVector res;
- auto table_name = id.database_name + "." + id.table_name + "@" +
toString(id.uuid);
- std::lock_guard lock(datapart_mutex);
- CustomStorageMergeTreePtr storage_merge_tree;
+ else
{
- std::lock_guard storage_lock(storage_map_mutex);
- storage_merge_tree = storage_map.at(table_name);
+ table_name += "_" + snapshot_id;
}
+ std::lock_guard lock(datapart_mutex);
std::unordered_set<String> missing_names;
-
- if (!datapart_map.contains(table_name)) [[unlikely]]
+ if (!datapart_map->has(table_name)) [[unlikely]]
{
- datapart_map.emplace(table_name, std::unordered_map<String,
DataPartPtr>());
+ auto cache = std::make_shared<Poco::LRUCache<std::string,
DataPartPtr>>(
+
SerializedPlanParser::global_context->getConfigRef().getInt64("table_part_metadata_cache_max_count",
1000000)
+ );
+ datapart_map->add(table_name, cache);
}
+ // find the missing cache part name
for (const auto & name : part_name)
{
- if (!datapart_map[table_name].contains(name))
+ if (!(*(datapart_map->get(table_name)))->has(name))
{
missing_names.emplace(name);
}
else
{
- res.emplace_back(datapart_map[table_name].at(name));
+
res.emplace_back((*((*(datapart_map->get(table_name)))->get(name))));
}
}
- auto missing_parts =
storage_merge_tree->loadDataPartsWithNames(missing_names);
- for (const auto & part : missing_parts)
+
+ if (!missing_names.empty())
{
- res.emplace_back(part);
- datapart_map[table_name].emplace(part->name, part);
+ CustomStorageMergeTreePtr storage_merge_tree;
+ {
+ std::lock_guard storage_lock(storage_map_mutex);
+ storage_merge_tree = *(storage_map->get(table_name));
+ }
+ auto missing_parts =
storage_merge_tree->loadDataPartsWithNames(missing_names);
+ for (const auto & part : missing_parts)
+ {
+ res.emplace_back(part);
+ (*(datapart_map->get(table_name)))->add(part->name, part);
+ }
}
return res;
}
-void StorageMergeTreeFactory::addDataPartToCache(StorageID id, String
part_name, DataPartPtr part)
-{
- auto table_name = id.database_name + "." + id.table_name + "@" +
toString(id.uuid);
- std::lock_guard lock(datapart_mutex);
- if (!datapart_map.contains(table_name))
- {
- std::unordered_map<String, DataPartPtr> item;
- item.emplace(part_name, part);
- datapart_map.emplace(table_name, item);
- }
- else
- {
- datapart_map[table_name].emplace(part_name, part);
- }
-}
-
-
-std::unordered_map<std::string, CustomStorageMergeTreePtr>
StorageMergeTreeFactory::storage_map;
-std::unordered_map<std::string, std::set<std::string>>
StorageMergeTreeFactory::storage_columns_map;
-std::unordered_map<std::string, std::unordered_map<std::string, DataPartPtr>>
StorageMergeTreeFactory::datapart_map;
+// will be inited in native init phase
+std::unique_ptr<Poco::LRUCache<std::string, CustomStorageMergeTreePtr>>
StorageMergeTreeFactory::storage_map = nullptr;
+std::unique_ptr<Poco::LRUCache<std::string,
std::shared_ptr<Poco::LRUCache<std::string, DataPartPtr>>>>
StorageMergeTreeFactory::datapart_map = nullptr;
std::mutex StorageMergeTreeFactory::storage_map_mutex;
std::mutex StorageMergeTreeFactory::datapart_mutex;
-std::unordered_map<std::string, StorageInMemoryMetadataPtr>
StorageMergeTreeFactory::metadata_map;
-std::mutex StorageMergeTreeFactory::metadata_map_mutex;
-
}
diff --git a/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.h
b/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.h
index eb90ce0f9..82dae3745 100644
--- a/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.h
+++ b/cpp-ch/local-engine/Storages/StorageMergeTreeFactory.h
@@ -15,13 +15,14 @@
* limitations under the License.
*/
#pragma once
+#include <Poco/LRUCache.h>
+#include <Parser/SerializedPlanParser.h>
#include <Storages/CustomStorageMergeTree.h>
#include <Interpreters/MergeTreeTransaction.h>
namespace local_engine
{
using CustomStorageMergeTreePtr = std::shared_ptr<CustomStorageMergeTree>;
-using StorageInMemoryMetadataPtr =
std::shared_ptr<DB::StorageInMemoryMetadata>;
class StorageMergeTreeFactory
{
@@ -29,27 +30,42 @@ public:
static StorageMergeTreeFactory & instance();
static void freeStorage(StorageID id);
static CustomStorageMergeTreePtr
- getStorage(StorageID id, ColumnsDescription columns,
std::function<CustomStorageMergeTreePtr()> creator);
- static StorageInMemoryMetadataPtr getMetadata(StorageID id,
std::function<StorageInMemoryMetadataPtr()> creator);
- static DataPartsVector getDataParts(StorageID id,
std::unordered_set<String> part_name);
- static void addDataPartToCache(StorageID id, String part_name, DataPartPtr
part);
+ getStorage(StorageID id, const String & snapshot_id, ColumnsDescription
columns, std::function<CustomStorageMergeTreePtr()> creator);
+ static DataPartsVector getDataParts(StorageID id, const String &
snapshot_id, std::unordered_set<String> part_name);
+ static void init_cache_map()
+ {
+ auto & storage_map_v = storage_map;
+ if (!storage_map_v)
+ {
+ storage_map_v = std::make_unique<Poco::LRUCache<std::string,
CustomStorageMergeTreePtr>>(
+
SerializedPlanParser::global_context->getConfigRef().getInt64("table_metadata_cache_max_count",
100));
+ }
+ else
+ {
+ storage_map_v->clear();
+ }
+ auto & datapart_map_v = datapart_map;
+ if (!datapart_map_v)
+ {
+ datapart_map_v = std::make_unique<Poco::LRUCache<std::string,
std::shared_ptr<Poco::LRUCache<std::string, DataPartPtr>>>>(
+
SerializedPlanParser::global_context->getConfigRef().getInt64("table_metadata_cache_max_count",
100));
+ }
+ else
+ {
+ datapart_map_v->clear();
+ }
+ }
static void clear()
{
- storage_columns_map.clear();
- storage_map.clear();
- datapart_map.clear();
- metadata_map.clear();
+ if (storage_map) storage_map->clear();
+ if (datapart_map) datapart_map->clear();
}
private:
- static std::unordered_map<std::string, CustomStorageMergeTreePtr>
storage_map;
- static std::unordered_map<std::string, std::set<std::string>>
storage_columns_map;
- static std::unordered_map<std::string, std::unordered_map<std::string,
DataPartPtr>> datapart_map;
+ static std::unique_ptr<Poco::LRUCache<std::string,
CustomStorageMergeTreePtr>> storage_map;
+ static std::unique_ptr<Poco::LRUCache<std::string,
std::shared_ptr<Poco::LRUCache<std::string, DataPartPtr>>>> datapart_map;
static std::mutex storage_map_mutex;
static std::mutex datapart_mutex;
-
- static std::unordered_map<std::string, StorageInMemoryMetadataPtr>
metadata_map;
- static std::mutex metadata_map_mutex;
};
struct TempStorageFreer
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp
b/cpp-ch/local-engine/local_engine_jni.cpp
index 10a0283c1..368227163 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -1119,7 +1119,7 @@ JNIEXPORT jstring
Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn
UUID uuid = UUIDHelpers::generateV4(); // each task using its own
CustomStorageMergeTree, don't reuse
auto storage = local_engine::MergeTreeRelParser::parseStorage(
- extension_table,
local_engine::SerializedPlanParser::global_context,uuid);
+ extension_table, local_engine::SerializedPlanParser::global_context,
uuid);
google::protobuf::StringValue table;
table.ParseFromString(extension_table.detail().value());
@@ -1127,7 +1127,7 @@ JNIEXPORT jstring
Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn
DB::StorageID table_id(merge_tree_table.database, merge_tree_table.table,
uuid);
local_engine::TempStorageFreer freer {table_id}; // to release temp
CustomStorageMergeTree with RAII
auto storage_factory = local_engine::StorageMergeTreeFactory::instance();
- std::vector<DB::DataPartPtr> selected_parts =
storage_factory.getDataParts(table_id, merge_tree_table.getPartNames());
+ std::vector<DB::DataPartPtr> selected_parts =
storage_factory.getDataParts(table_id, merge_tree_table.snapshot_id,
merge_tree_table.getPartNames());
auto future_part = std::make_shared<DB::FutureMergedMutatedPart>();
future_part->uuid = DB::UUIDHelpers::generateV4();
diff --git
a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/ExtensionTableBuilder.java
b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/ExtensionTableBuilder.java
index 41765ec69..34c017d80 100644
---
a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/ExtensionTableBuilder.java
+++
b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/ExtensionTableBuilder.java
@@ -27,6 +27,7 @@ public class ExtensionTableBuilder {
Long maxPartsNum,
String database,
String tableName,
+ String snapshotId,
String relativeTablePath,
String absoluteTablePath,
String orderByKey,
@@ -46,6 +47,7 @@ public class ExtensionTableBuilder {
maxPartsNum,
database,
tableName,
+ snapshotId,
relativeTablePath,
absoluteTablePath,
orderByKey,
diff --git
a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/ExtensionTableNode.java
b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/ExtensionTableNode.java
index 99bff4017..8da97fa12 100644
---
a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/ExtensionTableNode.java
+++
b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/ExtensionTableNode.java
@@ -33,6 +33,8 @@ public class ExtensionTableNode implements SplitInfo {
private Long maxPartsNum;
private String database;
private String tableName;
+
+ private String snapshotId;
private String relativePath;
private String absolutePath;
private String tableSchemaJson;
@@ -60,6 +62,7 @@ public class ExtensionTableNode implements SplitInfo {
Long maxPartsNum,
String database,
String tableName,
+ String snapshotId,
String relativePath,
String absolutePath,
String orderByKey,
@@ -78,6 +81,7 @@ public class ExtensionTableNode implements SplitInfo {
this.maxPartsNum = maxPartsNum;
this.database = database;
this.tableName = tableName;
+ this.snapshotId = snapshotId;
URI table_uri = URI.create(relativePath);
if (table_uri.getPath().startsWith("/")) { // file:///tmp/xxx => tmp/xxx
this.relativePath = table_uri.getPath().substring(1);
@@ -113,11 +117,13 @@ public class ExtensionTableNode implements SplitInfo {
}
extensionTableStr
- .append(database)
+ .append(this.database)
+ .append("\n")
+ .append(this.tableName)
.append("\n")
- .append(tableName)
+ .append(this.snapshotId)
.append("\n")
- .append(tableSchemaJson)
+ .append(this.tableSchemaJson)
.append("\n")
.append(this.orderByKey)
.append("\n");
diff --git
a/shims/common/src/main/scala/org/apache/gluten/execution/datasource/GlutenFormatWriterInjects.scala
b/shims/common/src/main/scala/org/apache/gluten/execution/datasource/GlutenFormatWriterInjects.scala
index 6d1859f3d..9786ecaac 100644
---
a/shims/common/src/main/scala/org/apache/gluten/execution/datasource/GlutenFormatWriterInjects.scala
+++
b/shims/common/src/main/scala/org/apache/gluten/execution/datasource/GlutenFormatWriterInjects.scala
@@ -19,7 +19,6 @@ package org.apache.gluten.execution.datasource
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.{BlockStripes, FakeRow,
OutputWriter}
@@ -41,6 +40,7 @@ trait GlutenFormatWriterInjects {
path: String,
database: String,
tableName: String,
+ snapshotId: String,
orderByKeyOption: Option[Seq[String]],
lowCardKeyOption: Option[Seq[String]],
minmaxIndexKeyOption: Option[Seq[String]],
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]