This is an automated email from the ASF dual-hosted git repository.
mahongbin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 972597184 [Gluten-5018][CH] support minmax/bloomfilter/set skip index
(#5019)
972597184 is described below
commit 972597184e147fcf488fd6cda4b447356d61136d
Author: Hongbin Ma <[email protected]>
AuthorDate: Wed Mar 27 09:33:40 2024 +0800
[Gluten-5018][CH] support minmax/bloomfilter/set skip index (#5019)
* temp, by default all cols minmax index
basically works, dealing with nullable
nullable/not-null ok
remove unnecessary change
fix compile
* add ut
* remove dataschema
* fix spark32 bug
---
.../source/DeltaMergeTreeFileFormat.scala | 17 +-
.../source/DeltaMergeTreeFileFormat.scala | 17 +-
.../java/io/glutenproject/metrics/MetricsStep.java | 11 +
.../backendsapi/clickhouse/CHIteratorApi.scala | 3 +
.../backendsapi/clickhouse/CHMetricsApi.scala | 1 +
.../execution/GlutenMergeTreePartition.scala | 3 +
.../metrics/FileSourceScanMetricsUpdater.scala | 2 +
.../delta/ClickhouseOptimisticTransaction.scala | 7 +-
.../sql/delta/catalog/ClickHouseTableV2.scala | 35 ++-
.../utils/MergeTreePartsPartitionsUtil.scala | 33 +++
.../datasources/v1/CHMergeTreeWriterInjects.scala | 29 ++-
.../v1/clickhouse/MergeTreeFileFormatWriter.scala | 9 +
...GlutenClickHouseTPCHNotNullSkipIndexSuite.scala | 271 ++++++++++++++++++++
...lutenClickHouseTPCHNullableSkipIndexSuite.scala | 277 +++++++++++++++++++++
.../apache/spark/affinity/MixedAffinitySuite.scala | 3 +
cpp-ch/local-engine/Common/MergeTreeTool.cpp | 84 ++++++-
cpp-ch/local-engine/Common/MergeTreeTool.h | 3 +
cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp | 18 +-
cpp-ch/local-engine/Parser/RelMetric.cpp | 3 +
cpp-ch/local-engine/Parser/TypeParser.cpp | 4 +-
cpp-ch/local-engine/Parser/TypeParser.h | 50 ++--
.../substrait/rel/ExtensionTableBuilder.java | 6 +
.../substrait/rel/ExtensionTableNode.java | 12 +
.../datasource/GlutenFormatWriterInjects.scala | 4 +-
24 files changed, 843 insertions(+), 59 deletions(-)
diff --git
a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
index fef109d35..d4ca321a9 100644
---
a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
+++
b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.execution.datasources.v2.clickhouse.source
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.delta.DeltaParquetFileFormat
import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.execution.datasources.{OutputWriter,
OutputWriterFactory}
@@ -31,9 +30,11 @@ class DeltaMergeTreeFileFormat(metadata: Metadata)
protected var database = ""
protected var tableName = ""
- protected var dataSchemas = Seq.empty[Attribute]
protected var orderByKeyOption: Option[Seq[String]] = None
protected var lowCardKeyOption: Option[Seq[String]] = None
+ protected var minmaxIndexKeyOption: Option[Seq[String]] = None
+ protected var bfIndexKeyOption: Option[Seq[String]] = None
+ protected var setIndexKeyOption: Option[Seq[String]] = None
protected var primaryKeyOption: Option[Seq[String]] = None
protected var partitionColumns: Seq[String] = Seq.empty[String]
protected var clickhouseTableConfigs: Map[String, String] = Map.empty
@@ -42,18 +43,22 @@ class DeltaMergeTreeFileFormat(metadata: Metadata)
metadata: Metadata,
database: String,
tableName: String,
- schemas: Seq[Attribute],
orderByKeyOption: Option[Seq[String]],
lowCardKeyOption: Option[Seq[String]],
+ minmaxIndexKeyOption: Option[Seq[String]],
+ bfIndexKeyOption: Option[Seq[String]],
+ setIndexKeyOption: Option[Seq[String]],
primaryKeyOption: Option[Seq[String]],
clickhouseTableConfigs: Map[String, String],
partitionColumns: Seq[String]) {
this(metadata)
this.database = database
this.tableName = tableName
- this.dataSchemas = schemas
this.orderByKeyOption = orderByKeyOption
this.lowCardKeyOption = lowCardKeyOption
+ this.minmaxIndexKeyOption = minmaxIndexKeyOption
+ this.bfIndexKeyOption = bfIndexKeyOption
+ this.setIndexKeyOption = setIndexKeyOption
this.primaryKeyOption = primaryKeyOption
this.clickhouseTableConfigs = clickhouseTableConfigs
this.partitionColumns = partitionColumns
@@ -102,10 +107,12 @@ class DeltaMergeTreeFileFormat(metadata: Metadata)
tableName,
orderByKeyOption,
lowCardKeyOption,
+ minmaxIndexKeyOption,
+ bfIndexKeyOption,
+ setIndexKeyOption,
primaryKeyOption,
partitionColumns,
metadata.schema,
- dataSchemas,
clickhouseTableConfigs,
context,
nativeConf
diff --git
a/backends-clickhouse/src/main/delta-22/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
b/backends-clickhouse/src/main/delta-22/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
index b87420787..002e636af 100644
---
a/backends-clickhouse/src/main/delta-22/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
+++
b/backends-clickhouse/src/main/delta-22/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.execution.datasources.v2.clickhouse.source
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.delta.DeltaParquetFileFormat
import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.execution.datasources.{OutputWriter,
OutputWriterFactory}
@@ -30,9 +29,11 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) extends
DeltaParquetFileForma
protected var database = ""
protected var tableName = ""
- protected var dataSchemas = Seq.empty[Attribute]
protected var orderByKeyOption: Option[Seq[String]] = None
protected var lowCardKeyOption: Option[Seq[String]] = None
+ protected var minmaxIndexKeyOption: Option[Seq[String]] = None
+ protected var bfIndexKeyOption: Option[Seq[String]] = None
+ protected var setIndexKeyOption: Option[Seq[String]] = None
protected var primaryKeyOption: Option[Seq[String]] = None
protected var partitionColumns: Seq[String] = Seq.empty[String]
protected var clickhouseTableConfigs: Map[String, String] = Map.empty
@@ -41,18 +42,22 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) extends
DeltaParquetFileForma
metadata: Metadata,
database: String,
tableName: String,
- schemas: Seq[Attribute],
orderByKeyOption: Option[Seq[String]],
lowCardKeyOption: Option[Seq[String]],
+ minmaxIndexKeyOption: Option[Seq[String]],
+ bfIndexKeyOption: Option[Seq[String]],
+ setIndexKeyOption: Option[Seq[String]],
primaryKeyOption: Option[Seq[String]],
clickhouseTableConfigs: Map[String, String],
partitionColumns: Seq[String]) {
this(metadata)
this.database = database
this.tableName = tableName
- this.dataSchemas = schemas
this.orderByKeyOption = orderByKeyOption
this.lowCardKeyOption = lowCardKeyOption
+ this.minmaxIndexKeyOption = minmaxIndexKeyOption
+ this.bfIndexKeyOption = bfIndexKeyOption
+ this.setIndexKeyOption = setIndexKeyOption
this.primaryKeyOption = primaryKeyOption
this.clickhouseTableConfigs = clickhouseTableConfigs
this.partitionColumns = partitionColumns
@@ -101,10 +106,12 @@ class DeltaMergeTreeFileFormat(metadata: Metadata)
extends DeltaParquetFileForma
tableName,
orderByKeyOption,
lowCardKeyOption,
+ minmaxIndexKeyOption,
+ bfIndexKeyOption,
+ setIndexKeyOption,
primaryKeyOption,
partitionColumns,
metadata.schema,
- dataSchemas,
clickhouseTableConfigs,
context,
nativeConf
diff --git
a/backends-clickhouse/src/main/java/io/glutenproject/metrics/MetricsStep.java
b/backends-clickhouse/src/main/java/io/glutenproject/metrics/MetricsStep.java
index c569cd2ee..d1714d825 100644
---
a/backends-clickhouse/src/main/java/io/glutenproject/metrics/MetricsStep.java
+++
b/backends-clickhouse/src/main/java/io/glutenproject/metrics/MetricsStep.java
@@ -32,6 +32,9 @@ public class MetricsStep {
@JsonProperty("selected_marks_pk")
protected long selectedMarksPk;
+ @JsonProperty("selected_marks")
+ protected long selectedMarks;
+
public String getName() {
return name;
}
@@ -64,6 +67,14 @@ public class MetricsStep {
this.selectedMarksPk = selectedMarksPk;
}
+ public long getSelectedMarks() {
+ return selectedMarks;
+ }
+
+ public void setSelectedMarks(long selectedMarks) {
+ this.selectedMarks = selectedMarks;
+ }
+
public long getTotalMarksPk() {
return totalMarksPk;
}
diff --git
a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala
b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala
index 6fecb2c5f..08786a00b 100644
---
a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala
+++
b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala
@@ -99,6 +99,9 @@ class CHIteratorApi extends IteratorApi with Logging with
LogLevelUtil {
p.absoluteTablePath,
p.orderByKey,
p.lowCardKey,
+ p.minmaxIndexKey,
+ p.bfIndexKey,
+ p.setIndexKey,
p.primaryKey,
partLists,
starts,
diff --git
a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala
b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala
index 3012d5371..0157b370f 100644
---
a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala
+++
b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala
@@ -127,6 +127,7 @@ class CHMetricsApi extends MetricsApi with Logging with
LogLevelUtil {
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of
output rows"),
"extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra
operators time"),
"selectedMarksPk" -> SQLMetrics.createMetric(sparkContext, "selected
marks primary"),
+ "selectedMarks" -> SQLMetrics.createMetric(sparkContext, "selected
marks"),
"totalMarksPk" -> SQLMetrics.createMetric(sparkContext, "total marks
primary")
)
diff --git
a/backends-clickhouse/src/main/scala/io/glutenproject/execution/GlutenMergeTreePartition.scala
b/backends-clickhouse/src/main/scala/io/glutenproject/execution/GlutenMergeTreePartition.scala
index df41191f2..be17c713e 100644
---
a/backends-clickhouse/src/main/scala/io/glutenproject/execution/GlutenMergeTreePartition.scala
+++
b/backends-clickhouse/src/main/scala/io/glutenproject/execution/GlutenMergeTreePartition.scala
@@ -39,6 +39,9 @@ case class GlutenMergeTreePartition(
absoluteTablePath: String,
orderByKey: String,
lowCardKey: String,
+ minmaxIndexKey: String,
+ bfIndexKey: String,
+ setIndexKey: String,
primaryKey: String,
partList: Array[MergeTreePartSplit],
tableSchemaJson: String,
diff --git
a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala
b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala
index 8c79536bd..497e8b780 100644
---
a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala
+++
b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala
@@ -36,6 +36,7 @@ class FileSourceScanMetricsUpdater(@transient val metrics:
Map[String, SQLMetric
val inputWaitTime: SQLMetric = metrics("inputWaitTime")
val outputWaitTime: SQLMetric = metrics("outputWaitTime")
val selected_marks_pk: SQLMetric = metrics("selectedMarksPk")
+ val selected_marks: SQLMetric = metrics("selectedMarks")
val total_marks_pk: SQLMetric = metrics("totalMarksPk")
override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {
@@ -56,6 +57,7 @@ class FileSourceScanMetricsUpdater(@transient val metrics:
Map[String, SQLMetric
metricsData.getSteps.forEach(
step => {
selected_marks_pk += step.selectedMarksPk
+ selected_marks += step.selectedMarks
total_marks_pk += step.totalMarksPk
})
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
index 9111bea7f..95119f842 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/ClickhouseOptimisticTransaction.scala
@@ -128,9 +128,11 @@ class ClickhouseOptimisticTransaction(
metadata,
tableV2.dataBaseName,
tableV2.tableName,
- output,
tableV2.orderByKeyOption,
tableV2.lowCardKeyOption,
+ tableV2.minmaxIndexKeyOption,
+ tableV2.bfIndexKeyOption,
+ tableV2.setIndexKeyOption,
tableV2.primaryKeyOption,
tableV2.clickhouseTableConfigs,
tableV2.partitionColumns
@@ -144,6 +146,9 @@ class ClickhouseOptimisticTransaction(
// scalastyle:on deltahadoopconfiguration
orderByKeyOption = tableV2.orderByKeyOption,
lowCardKeyOption = tableV2.lowCardKeyOption,
+ minmaxIndexKeyOption = tableV2.minmaxIndexKeyOption,
+ bfIndexKeyOption = tableV2.bfIndexKeyOption,
+ setIndexKeyOption = tableV2.setIndexKeyOption,
primaryKeyOption = tableV2.primaryKeyOption,
partitionColumns = partitioningColumns,
bucketSpec = tableV2.bucketOption,
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
index e06a01edf..92d12c05f 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
@@ -114,18 +114,34 @@ class ClickHouseTableV2(
}
lazy val lowCardKeyOption: Option[Seq[String]] = {
+ getCommaSeparatedColumns("lowCardKey")
+ }
+
+ lazy val minmaxIndexKeyOption: Option[Seq[String]] = {
+ getCommaSeparatedColumns("minmaxIndexKey")
+ }
+
+ lazy val bfIndexKeyOption: Option[Seq[String]] = {
+ getCommaSeparatedColumns("bloomfilterIndexKey")
+ }
+
+ lazy val setIndexKeyOption: Option[Seq[String]] = {
+ getCommaSeparatedColumns("setIndexKey")
+ }
+
+ private def getCommaSeparatedColumns(keyName: String) = {
val tableProperties = properties()
- if (tableProperties.containsKey("lowCardKey")) {
- if (tableProperties.get("lowCardKey").nonEmpty) {
- val lowCardKeys =
tableProperties.get("lowCardKey").split(",").map(_.trim).toSeq
- lowCardKeys.foreach(
+ if (tableProperties.containsKey(keyName)) {
+ if (tableProperties.get(keyName).nonEmpty) {
+ val keys = tableProperties.get(keyName).split(",").map(_.trim).toSeq
+ keys.foreach(
s => {
if (s.contains(".")) {
throw new IllegalStateException(
- s"lowCardKey $s can not contain '.' (not support nested column
yet)")
+ s"$keyName $s can not contain '.' (not support nested column
yet)")
}
})
- Some(lowCardKeys.map(s => s.toLowerCase()))
+ Some(keys.map(s => s.toLowerCase()))
} else {
None
}
@@ -259,12 +275,15 @@ class ClickHouseTableV2(
meta,
dataBaseName,
tableName,
- Seq.empty[Attribute],
orderByKeyOption,
lowCardKeyOption,
+ minmaxIndexKeyOption,
+ bfIndexKeyOption,
+ setIndexKeyOption,
primaryKeyOption,
clickhouseTableConfigs,
- partitionColumns)
+ partitionColumns
+ )
}
def cacheThis(): Unit = {
deltaLog2Table.put(deltaLog, this)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala
index a7ac2ce16..7d202b4d1 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala
@@ -74,6 +74,21 @@ object MergeTreePartsPartitionsUtil extends Logging {
case None => ""
}
+ val minmaxIndexKey = table.minmaxIndexKeyOption match {
+ case Some(keys) => keys.mkString(",")
+ case None => ""
+ }
+
+ val bfIndexKey = table.bfIndexKeyOption match {
+ case Some(keys) => keys.mkString(",")
+ case None => ""
+ }
+
+ val setIndexKey = table.setIndexKeyOption match {
+ case Some(keys) => keys.mkString(",")
+ case None => ""
+ }
+
val tableSchemaJson = ConverterUtils.convertNamedStructJson(table.schema())
// bucket table
@@ -92,6 +107,9 @@ object MergeTreePartsPartitionsUtil extends Logging {
partitions,
orderByKey,
lowCardKey,
+ minmaxIndexKey,
+ bfIndexKey,
+ setIndexKey,
primaryKey,
table.clickhouseTableConfigs,
sparkSession
@@ -109,6 +127,9 @@ object MergeTreePartsPartitionsUtil extends Logging {
partitions,
orderByKey,
lowCardKey,
+ minmaxIndexKey,
+ bfIndexKey,
+ setIndexKey,
primaryKey,
table.clickhouseTableConfigs,
sparkSession
@@ -129,6 +150,9 @@ object MergeTreePartsPartitionsUtil extends Logging {
partitions: ArrayBuffer[InputPartition],
orderByKey: String,
lowCardKey: String,
+ minmaxIndexKey: String,
+ bfIndexKey: String,
+ setIndexKey: String,
primaryKey: String,
clickhouseTableConfigs: Map[String, String],
sparkSession: SparkSession): Unit = {
@@ -214,6 +238,9 @@ object MergeTreePartsPartitionsUtil extends Logging {
absoluteTablePath,
orderByKey,
lowCardKey,
+ minmaxIndexKey,
+ bfIndexKey,
+ setIndexKey,
primaryKey,
currentFiles.toArray,
tableSchemaJson,
@@ -256,6 +283,9 @@ object MergeTreePartsPartitionsUtil extends Logging {
partitions: ArrayBuffer[InputPartition],
orderByKey: String,
lowCardKey: String,
+ minmaxIndexKey: String,
+ bfIndexKey: String,
+ setIndexKey: String,
primaryKey: String,
clickhouseTableConfigs: Map[String, String],
sparkSession: SparkSession): Unit = {
@@ -318,6 +348,9 @@ object MergeTreePartsPartitionsUtil extends Logging {
absoluteTablePath,
orderByKey,
lowCardKey,
+ minmaxIndexKey,
+ bfIndexKey,
+ setIndexKey,
primaryKey,
currentFiles.toArray,
tableSchemaJson,
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
index d05945a81..6175f76cb 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
@@ -68,10 +68,12 @@ class CHMergeTreeWriterInjects extends
GlutenFormatWriterInjectsBase {
tableName: String,
orderByKeyOption: Option[Seq[String]],
lowCardKeyOption: Option[Seq[String]],
+ minmaxIndexKeyOption: Option[Seq[String]],
+ bfIndexKeyOption: Option[Seq[String]],
+ setIndexKeyOption: Option[Seq[String]],
primaryKeyOption: Option[Seq[String]],
partitionColumns: Seq[String],
tableSchema: StructType,
- dataSchema: Seq[Attribute],
clickhouseTableConfigs: Map[String, String],
context: TaskAttemptContext,
nativeConf: JMap[String, String]): OutputWriter = {
@@ -83,11 +85,14 @@ class CHMergeTreeWriterInjects extends
GlutenFormatWriterInjectsBase {
tableName,
orderByKeyOption,
lowCardKeyOption,
+ minmaxIndexKeyOption,
+ bfIndexKeyOption,
+ setIndexKeyOption,
primaryKeyOption,
partitionColumns,
ConverterUtils.convertNamedStructJson(tableSchema),
clickhouseTableConfigs,
- dataSchema
+ tableSchema.toAttributes // use table schema instead of data schema
)
val datasourceJniWrapper = new CHDatasourceJniWrapper()
@@ -119,17 +124,22 @@ class CHMergeTreeWriterInjects extends
GlutenFormatWriterInjectsBase {
object CHMergeTreeWriterInjects {
+ // scalastyle:off argcount
def genMergeTreeWriteRel(
path: String,
database: String,
tableName: String,
orderByKeyOption: Option[Seq[String]],
lowCardKeyOption: Option[Seq[String]],
+ minmaxIndexKeyOption: Option[Seq[String]],
+ bfIndexKeyOption: Option[Seq[String]],
+ setIndexKeyOption: Option[Seq[String]],
primaryKeyOption: Option[Seq[String]],
partitionColumns: Seq[String],
tableSchemaJson: String,
clickhouseTableConfigs: Map[String, String],
output: Seq[Attribute]): PlanWithSplitInfo = {
+ // scalastyle:on argcount
val typeNodes = ConverterUtils.collectAttributeTypeNodes(output)
val nameList = ConverterUtils.collectAttributeNamesWithoutExprId(output)
val columnTypeNodes = output.map {
@@ -150,6 +160,18 @@ object CHMergeTreeWriterInjects {
case Some(keys) => keys.mkString(",")
case None => ""
}
+ val minmaxIndexKey = minmaxIndexKeyOption match {
+ case Some(keys) => keys.mkString(",")
+ case None => ""
+ }
+ val bfIndexKey = bfIndexKeyOption match {
+ case Some(keys) => keys.mkString(",")
+ case None => ""
+ }
+ val setIndexKey = setIndexKeyOption match {
+ case Some(keys) => keys.mkString(",")
+ case None => ""
+ }
val substraitContext = new SubstraitContext
val extensionTableNode = ExtensionTableBuilder.makeExtensionTable(
@@ -161,6 +183,9 @@ object CHMergeTreeWriterInjects {
"",
orderByKey,
lowCardKey,
+ minmaxIndexKey,
+ bfIndexKey,
+ setIndexKey,
primaryKey,
new JList[String](),
new JList[JLong](),
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatWriter.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatWriter.scala
index 874a4ede3..90f01e744 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatWriter.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatWriter.scala
@@ -57,6 +57,9 @@ object MergeTreeFileFormatWriter extends Logging {
hadoopConf: Configuration,
orderByKeyOption: Option[Seq[String]],
lowCardKeyOption: Option[Seq[String]],
+ minmaxIndexKeyOption: Option[Seq[String]],
+ bfIndexKeyOption: Option[Seq[String]],
+ setIndexKeyOption: Option[Seq[String]],
primaryKeyOption: Option[Seq[String]],
partitionColumns: Seq[Attribute],
bucketSpec: Option[BucketSpec],
@@ -71,6 +74,9 @@ object MergeTreeFileFormatWriter extends Logging {
hadoopConf = hadoopConf,
orderByKeyOption = orderByKeyOption,
lowCardKeyOption = lowCardKeyOption,
+ minmaxIndexKeyOption = minmaxIndexKeyOption,
+ bfIndexKeyOption = bfIndexKeyOption,
+ setIndexKeyOption = setIndexKeyOption,
primaryKeyOption = primaryKeyOption,
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
@@ -89,6 +95,9 @@ object MergeTreeFileFormatWriter extends Logging {
hadoopConf: Configuration,
orderByKeyOption: Option[Seq[String]],
lowCardKeyOption: Option[Seq[String]],
+ minmaxIndexKeyOption: Option[Seq[String]],
+ bfIndexKeyOption: Option[Seq[String]],
+ setIndexKeyOption: Option[Seq[String]],
primaryKeyOption: Option[Seq[String]],
partitionColumns: Seq[Attribute],
bucketSpec: Option[BucketSpec],
diff --git
a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHNotNullSkipIndexSuite.scala
b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHNotNullSkipIndexSuite.scala
new file mode 100644
index 000000000..73462780c
--- /dev/null
+++
b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHNotNullSkipIndexSuite.scala
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.glutenproject.execution
+
+import org.apache.spark.SparkConf
+
+import java.io.File
+
+class GlutenClickHouseTPCHNotNullSkipIndexSuite extends
GlutenClickHouseTPCHAbstractSuite {
+
+ override protected val tablesPath: String = basePath + "/tpch-data-ch"
+ override protected val tpchQueries: String = rootPath +
"queries/tpch-queries-ch"
+ override protected val queriesResults: String = rootPath +
"mergetree-queries-output"
+
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf
+ .set("spark.shuffle.manager", "sort")
+ .set("spark.io.compression.codec", "SNAPPY")
+ .set("spark.sql.shuffle.partitions", "5")
+ .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
+// .set("spark.ui.enabled", "true")
+//
.set("spark.gluten.sql.columnar.backend.ch.runtime_config.dump_pipeline",
"true")
+//
.set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level",
"debug")
+ }
+
+ test("test simple minmax index") {
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS lineitem_mergetree_minmax;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS lineitem_mergetree_minmax
+ |(
+ | l_orderkey bigint not null,
+ | l_partkey bigint not null,
+ | l_suppkey bigint not null,
+ | l_linenumber bigint not null,
+ | l_quantity double not null,
+ | l_extendedprice double not null,
+ | l_discount double not null,
+ | l_tax double not null,
+ | l_returnflag string not null,
+ | l_linestatus string not null,
+ | l_shipdate date not null,
+ | l_commitdate date not null,
+ | l_receiptdate date not null,
+ | l_shipinstruct string not null,
+ | l_shipmode string not null,
+ | l_comment string not null
+ |)
+ |USING clickhouse
+ |LOCATION '$basePath/lineitem_mergetree_minmax'
+ |TBLPROPERTIES('minmaxIndexKey'='l_receiptdate')
+ |""".stripMargin)
+
+ spark.sql(s"""
+ | insert into table lineitem_mergetree_minmax
+ | select * from lineitem
+ |""".stripMargin)
+
+ val df = spark
+ .sql(s"""
+ |select count(*) from lineitem_mergetree_minmax where
l_receiptdate = '1998-12-27'
+ |""".stripMargin)
+
+ val scanExec = collect(df.queryExecution.executedPlan) {
+ case f: FileSourceScanExecTransformer => f
+ }
+ assert(scanExec.size == 1)
+ val mergetreeScan = scanExec(0)
+ val ret = df.collect()
+ assert(ret.apply(0).get(0) == 1)
+ val marks = mergetreeScan.metrics("selectedMarks").value
+ assert(marks == 1)
+
+ val directory = new File(s"$basePath/lineitem_mergetree_minmax")
+ // find a folder whose name is like
48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006
+ val partDir = directory.listFiles().filter(f => f.getName.length > 20).head
+ assert(!partDir.listFiles().exists(p => p.getName.contains("null")))
+ assert(
+ partDir.listFiles().exists(p =>
p.getName.contains("skp_idx__minmax_l_receiptdate.idx2")))
+ }
+
+ test("test simple bloom filter index") {
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS lineitem_mergetree_bf;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ CREATE TABLE IF NOT EXISTS lineitem_mergetree_bf
+ |(
+ | l_orderkey bigint not null,
+ | l_partkey bigint not null,
+ | l_suppkey bigint not null,
+ | l_linenumber bigint not null,
+ | l_quantity double not null,
+ | l_extendedprice double not null,
+ | l_discount double not null,
+ | l_tax double not null,
+ | l_returnflag string not null,
+ | l_linestatus string not null,
+ | l_shipdate date not null,
+ | l_commitdate date not null,
+ | l_receiptdate date not null,
+ | l_shipinstruct string not null,
+ | l_shipmode string not null,
+ | l_comment string not null
+ |)
+ |USING clickhouse
+ |LOCATION '$basePath/lineitem_mergetree_bf'
+ |TBLPROPERTIES('bloomfilterIndexKey'='l_orderkey')
+ |""".stripMargin)
+
+ spark.sql(s"""
+ | insert into table lineitem_mergetree_bf
+ | select * from lineitem
+ |""".stripMargin)
+
+ val df = spark
+ .sql(s"""
+ select count(*) from lineitem_mergetree_bf where l_orderkey =
'600000'
+ |""".stripMargin)
+
+ val scanExec = collect(df.queryExecution.executedPlan) {
+ case f: FileSourceScanExecTransformer => f
+ }
+ assert(scanExec.size == 1)
+ val mergetreeScan = scanExec(0)
+ val ret = df.collect()
+ assert(ret.apply(0).get(0) == 2)
+ val marks = mergetreeScan.metrics("selectedMarks").value
+ assert(marks == 1)
+
+ val directory = new File(s"$basePath/lineitem_mergetree_bf")
+ // find a folder whose name is like
48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006
+ val partDir = directory.listFiles().filter(f => f.getName.length > 20).head
+ assert(!partDir.listFiles().exists(p => p.getName.contains("null")))
+ assert(
+ partDir.listFiles().exists(p =>
p.getName.contains("skp_idx__bloomfilter_l_orderkey.idx")))
+ }
+
+ test("test simple set index") {
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS lineitem_mergetree_set;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ CREATE TABLE IF NOT EXISTS lineitem_mergetree_set
+ |(
+ | l_orderkey bigint not null,
+ | l_partkey bigint not null,
+ | l_suppkey bigint not null,
+ | l_linenumber bigint not null,
+ | l_quantity double not null,
+ | l_extendedprice double not null,
+ | l_discount double not null,
+ | l_tax double not null,
+ | l_returnflag string not null,
+ | l_linestatus string not null,
+ | l_shipdate date not null,
+ | l_commitdate date not null,
+ | l_receiptdate date not null,
+ | l_shipinstruct string not null,
+ | l_shipmode string not null,
+ | l_comment string not null
+ |)
+ |USING clickhouse
+ |LOCATION '$basePath/lineitem_mergetree_set'
+ |TBLPROPERTIES('setIndexKey'='l_orderkey')
+ |""".stripMargin)
+
+ spark.sql(s"""
+ | insert into table lineitem_mergetree_set
+ | select * from lineitem
+ |""".stripMargin)
+
+ val df = spark
+ .sql(s"""
+ select count(*) from lineitem_mergetree_set where l_orderkey =
'600000'
+ |""".stripMargin)
+
+ val scanExec = collect(df.queryExecution.executedPlan) {
+ case f: FileSourceScanExecTransformer => f
+ }
+ assert(scanExec.size == 1)
+ val mergetreeScan = scanExec(0)
+ val ret = df.collect()
+ assert(ret.apply(0).get(0) == 2)
+ val marks = mergetreeScan.metrics("selectedMarks").value
+ assert(marks == 1)
+
+ val directory = new File(s"$basePath/lineitem_mergetree_set")
+ // find a folder whose name is like
48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006
+ val partDir = directory.listFiles().filter(f => f.getName.length > 20).head
+ assert(!partDir.listFiles().exists(p => p.getName.contains("null")))
+ assert(partDir.listFiles().exists(p =>
p.getName.contains("skp_idx__set_l_orderkey.idx")))
+ }
+
+ test("test not null dataset inserted into nullable schema") {
+
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS lineitem_mergetree_minmax2;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS lineitem_mergetree_minmax2
+ |(
+ l_orderkey bigint ,
+ | l_partkey bigint ,
+ | l_suppkey bigint ,
+ | l_linenumber bigint ,
+ | l_quantity double ,
+ | l_extendedprice double ,
+ | l_discount double ,
+ | l_tax double ,
+ | l_returnflag string ,
+ | l_linestatus string ,
+ | l_shipdate date ,
+ | l_commitdate date ,
+ | l_receiptdate date ,
+ | l_shipinstruct string ,
+ | l_shipmode string ,
+ | l_comment string
+ |)
+ |USING clickhouse
+ |LOCATION '$basePath/lineitem_mergetree_minmax2'
+ |TBLPROPERTIES('minmaxIndexKey'='l_receiptdate')
+ |""".stripMargin)
+
+ spark.sql(s"""
+ | insert into table lineitem_mergetree_minmax2
+ | select * from lineitem
+ |""".stripMargin)
+
+ val df = spark
+ .sql(s"""
+ |select count(*) from lineitem_mergetree_minmax2 where
l_receiptdate = '1998-12-27'
+ |""".stripMargin)
+
+ val scanExec = collect(df.queryExecution.executedPlan) {
+ case f: FileSourceScanExecTransformer => f
+ }
+ assert(scanExec.size == 1)
+ val mergetreeScan = scanExec(0)
+ val ret = df.collect()
+ assert(ret.apply(0).get(0) == 1)
+ val marks = mergetreeScan.metrics("selectedMarks").value
+ assert(marks == 1)
+
+ val directory = new File(s"$basePath/lineitem_mergetree_minmax2")
+ // find a folder whose name is like
48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006
+ val partDir = directory.listFiles().filter(f => f.getName.length > 20).head
+ assert(partDir.listFiles().exists(p => p.getName.contains("null")))
+ assert(
+ partDir.listFiles().exists(p =>
p.getName.contains("skp_idx__minmax_l_receiptdate.idx2")))
+ }
+}
diff --git
a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHNullableSkipIndexSuite.scala
b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHNullableSkipIndexSuite.scala
new file mode 100644
index 000000000..27fa79018
--- /dev/null
+++
b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHNullableSkipIndexSuite.scala
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.glutenproject.execution
+
+import org.apache.spark.SparkConf
+
+import java.io.File
+
+class GlutenClickHouseTPCHNullableSkipIndexSuite extends
GlutenClickHouseTPCHAbstractSuite {
+
+ override protected val createNullableTables = true
+
+ override protected val tablesPath: String = basePath + "/tpch-data-ch"
+ override protected val tpchQueries: String = rootPath +
"queries/tpch-queries-ch"
+ override protected val queriesResults: String = rootPath +
"mergetree-queries-output"
+ override protected def sparkConf: SparkConf = { // starts from the parent suite's conf and overrides the four settings below
+ super.sparkConf
+ .set("spark.shuffle.manager", "sort")
+ .set("spark.io.compression.codec", "SNAPPY")
+ .set("spark.sql.shuffle.partitions", "5")
+ .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
+// .set("spark.ui.enabled", "true")
+//
.set("spark.gluten.sql.columnar.backend.ch.runtime_config.dump_pipeline",
"true")
+//
.set("spark.gluten.sql.columnar.backend.ch.runtime_config.logger.level",
"debug")
+ }
+
+ test("test simple minmax index") { // a table with a minmax skip index on l_receiptdate: checks query result, selected marks and the index file on disk
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS lineitem_mergetree_minmax;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS lineitem_mergetree_minmax
+ |(
+ | l_orderkey bigint,
+ | l_partkey bigint,
+ | l_suppkey bigint,
+ | l_linenumber bigint,
+ | l_quantity double,
+ | l_extendedprice double,
+ | l_discount double,
+ | l_tax double,
+ | l_returnflag string,
+ | l_linestatus string,
+ | l_shipdate date,
+ | l_commitdate date,
+ | l_receiptdate date,
+ | l_shipinstruct string,
+ | l_shipmode string,
+ | l_comment string
+ |)
+ |USING clickhouse
+ |LOCATION '$basePath/lineitem_mergetree_minmax'
+ |TBLPROPERTIES('minmaxIndexKey'='l_receiptdate')
+ |""".stripMargin) // the minmaxIndexKey table property requests the skip index
+
+ spark.sql(s"""
+ | insert into table lineitem_mergetree_minmax
+ | select * from lineitem
+ |""".stripMargin)
+
+ val df = spark
+ .sql(s"""
+ |select count(*) from lineitem_mergetree_minmax where
l_receiptdate = '1998-12-27'
+ |""".stripMargin)
+
+ val scanExec = collect(df.queryExecution.executedPlan) {
+ case f: FileSourceScanExecTransformer => f
+ }
+ assert(scanExec.size == 1) // the plan must contain exactly one FileSourceScanExecTransformer
+ val mergetreeScan = scanExec(0)
+ val ret = df.collect() // executes the query; the scan metric below is read only after this
+ assert(ret.apply(0).get(0) == 1) // expected row count for this filter
+ val marks = mergetreeScan.metrics("selectedMarks").value
+ assert(marks == 1) // the equality filter on the indexed column should select a single mark
+
+ val directory = new File(s"$basePath/lineitem_mergetree_minmax")
+ // find a folder whose name is like
48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006
+ val partDir = directory.listFiles().filter(f => f.getName.length > 20).head // first entry whose name is longer than 20 characters
+ assert( // the part directory must contain the minmax index file for l_receiptdate
+ partDir.listFiles().exists(p =>
p.getName.contains("skp_idx__minmax_l_receiptdate.idx2")))
+ }
+
+ test("test simple bloom filter index") { // a table with a bloom filter skip index on l_orderkey: checks query result, selected marks and the index file on disk
+
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS lineitem_mergetree_bf;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS lineitem_mergetree_bf
+ |(
+ | l_orderkey bigint,
+ | l_partkey bigint,
+ | l_suppkey bigint,
+ | l_linenumber bigint,
+ | l_quantity double,
+ | l_extendedprice double,
+ | l_discount double,
+ | l_tax double,
+ | l_returnflag string,
+ | l_linestatus string,
+ | l_shipdate date,
+ | l_commitdate date,
+ | l_receiptdate date,
+ | l_shipinstruct string,
+ | l_shipmode string,
+ | l_comment string
+ |)
+ |USING clickhouse
+ |LOCATION '$basePath/lineitem_mergetree_bf'
+ |TBLPROPERTIES('bloomfilterIndexKey'='l_orderkey')
+ |""".stripMargin) // the bloomfilterIndexKey table property requests the skip index
+
+ spark.sql(s"""
+ | insert into table lineitem_mergetree_bf
+ | select * from lineitem
+ |""".stripMargin)
+
+ val df = spark
+ .sql(s"""
+ |select count(*) from lineitem_mergetree_bf where l_orderkey =
'600000'
+ |""".stripMargin)
+
+ val scanExec = collect(df.queryExecution.executedPlan) {
+ case f: FileSourceScanExecTransformer => f
+ }
+ assert(scanExec.size == 1) // the plan must contain exactly one FileSourceScanExecTransformer
+ val mergetreeScan = scanExec(0)
+ val ret = df.collect() // executes the query; the scan metric below is read only after this
+ assert(ret.apply(0).get(0) == 2) // expected row count for this filter
+ val marks = mergetreeScan.metrics("selectedMarks").value
+ assert(marks == 1) // the equality filter on the indexed column should select a single mark
+
+ val directory = new File(s"$basePath/lineitem_mergetree_bf")
+ // find a folder whose name is like
48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006
+ val partDir = directory.listFiles().filter(f => f.getName.length > 20).head // first entry whose name is longer than 20 characters
+ assert( // the part directory must contain the bloom filter index file for l_orderkey
+ partDir.listFiles().exists(p =>
p.getName.contains("skp_idx__bloomfilter_l_orderkey.idx")))
+
+ }
+
+ test("test simple set index") { // a table with a set skip index on l_orderkey: checks query result, selected marks and the index file on disk
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS lineitem_mergetree_set;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ CREATE TABLE IF NOT EXISTS lineitem_mergetree_set
+ |(
+ | l_orderkey bigint ,
+ | l_partkey bigint ,
+ | l_suppkey bigint ,
+ | l_linenumber bigint ,
+ | l_quantity double ,
+ | l_extendedprice double ,
+ | l_discount double ,
+ | l_tax double ,
+ | l_returnflag string ,
+ | l_linestatus string ,
+ | l_shipdate date ,
+ | l_commitdate date ,
+ | l_receiptdate date ,
+ | l_shipinstruct string ,
+ | l_shipmode string ,
+ | l_comment string
+ |)
+ |USING clickhouse
+ |LOCATION '$basePath/lineitem_mergetree_set'
+ |TBLPROPERTIES('setIndexKey'='l_orderkey')
+ |""".stripMargin) // the setIndexKey table property requests the skip index
+
+ spark.sql(s"""
+ | insert into table lineitem_mergetree_set
+ | select * from lineitem
+ |""".stripMargin)
+
+ val df = spark
+ .sql(s"""
+ |select count(*) from lineitem_mergetree_set where l_orderkey =
'600000'
+ |""".stripMargin)
+
+ val scanExec = collect(df.queryExecution.executedPlan) {
+ case f: FileSourceScanExecTransformer => f
+ }
+ assert(scanExec.size == 1) // the plan must contain exactly one FileSourceScanExecTransformer
+ val mergetreeScan = scanExec(0)
+ val ret = df.collect() // executes the query; the scan metric below is read only after this
+ assert(ret.apply(0).get(0) == 2) // expected row count for this filter
+ val marks = mergetreeScan.metrics("selectedMarks").value
+ assert(marks == 1) // the equality filter on the indexed column should select a single mark
+
+ val directory = new File(s"$basePath/lineitem_mergetree_set")
+ // find a folder whose name is like
48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006
+ val partDir = directory.listFiles().filter(f => f.getName.length > 20).head // first entry whose name is longer than 20 characters
+ assert(partDir.listFiles().exists(p => // the part directory must contain the set index file for l_orderkey
p.getName.contains("skp_idx__set_l_orderkey.idx")))
+ }
+
+ test("test nullable dataset inserted into not null schema") { // target table declares every column NOT NULL and has a minmax index on l_receiptdate; rows come from lineitem
+
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS lineitem_mergetree_minmax2;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS lineitem_mergetree_minmax2
+ |(
+ | l_orderkey bigint not null,
+ | l_partkey bigint not null,
+ | l_suppkey bigint not null,
+ | l_linenumber bigint not null,
+ | l_quantity double not null,
+ | l_extendedprice double not null,
+ | l_discount double not null,
+ | l_tax double not null,
+ | l_returnflag string not null,
+ | l_linestatus string not null,
+ | l_shipdate date not null,
+ | l_commitdate date not null,
+ | l_receiptdate date not null,
+ | l_shipinstruct string not null,
+ | l_shipmode string not null,
+ | l_comment string not null
+ |)
+ |USING clickhouse
+ |LOCATION '$basePath/lineitem_mergetree_minmax2'
+ |TBLPROPERTIES('minmaxIndexKey'='l_receiptdate')
+ |""".stripMargin) // the minmaxIndexKey table property requests the skip index
+
+ spark.sql(s"""
+ | insert into table lineitem_mergetree_minmax2
+ | select * from lineitem
+ |""".stripMargin)
+
+ val df = spark
+ .sql(s"""
+ |select count(*) from lineitem_mergetree_minmax2 where
l_receiptdate = '1998-12-27'
+ |""".stripMargin)
+
+ val scanExec = collect(df.queryExecution.executedPlan) {
+ case f: FileSourceScanExecTransformer => f
+ }
+ assert(scanExec.size == 1) // the plan must contain exactly one FileSourceScanExecTransformer
+ val mergetreeScan = scanExec(0)
+ val ret = df.collect() // executes the query; the scan metric below is read only after this
+ assert(ret.apply(0).get(0) == 1) // expected row count for this filter
+ val marks = mergetreeScan.metrics("selectedMarks").value
+ assert(marks == 1) // the equality filter on the indexed column should select a single mark
+
+ val directory = new File(s"$basePath/lineitem_mergetree_minmax2")
+ // find a folder whose name is like
48b70783-b3b8-4bf8-9c52-5261aead8e3e_0_006
+ val partDir = directory.listFiles().filter(f => f.getName.length > 20).head // first entry whose name is longer than 20 characters
+ assert(!partDir.listFiles().exists(p => p.getName.contains("null"))) // no file name may contain "null" (presumably the null-map files written for Nullable columns; TODO confirm)
+ assert( // the part directory must contain the minmax index file for l_receiptdate
+ partDir.listFiles().exists(p =>
p.getName.contains("skp_idx__minmax_l_receiptdate.idx2")))
+ }
+
+ // TODO:
+ // 1. automatically check the number of visited granules (effectiveness of the index)
+ // 2. the set index is implemented but not encouraged, because by default we
+ //    do not cap the set size
+ // 3. need to test minmax/bf/set indexes on every type (e.g. a bloom filter on
+ //    date32 is not working)
+ // 4. complex cases where a column has several types of index, or one type of
+ //    index covers many columns
+}
diff --git
a/backends-clickhouse/src/test/scala/org/apache/spark/affinity/MixedAffinitySuite.scala
b/backends-clickhouse/src/test/scala/org/apache/spark/affinity/MixedAffinitySuite.scala
index e6910a430..53fcf3cbc 100644
---
a/backends-clickhouse/src/test/scala/org/apache/spark/affinity/MixedAffinitySuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/spark/affinity/MixedAffinitySuite.scala
@@ -59,6 +59,9 @@ class MixedAffinitySuite extends QueryTest with
SharedSparkSession {
"",
"",
"",
+ "",
+ "",
+ "",
Array(file),
"",
Map.empty)
diff --git a/cpp-ch/local-engine/Common/MergeTreeTool.cpp
b/cpp-ch/local-engine/Common/MergeTreeTool.cpp
index c5122905e..39198b87e 100644
--- a/cpp-ch/local-engine/Common/MergeTreeTool.cpp
+++ b/cpp-ch/local-engine/Common/MergeTreeTool.cpp
@@ -24,12 +24,77 @@
#include <google/protobuf/util/json_util.h>
#include <rapidjson/rapidjson.h>
#include <rapidjson/document.h>
+#include <Poco/StringTokenizer.h>
using namespace DB;
namespace local_engine
{
-std::shared_ptr<DB::StorageInMemoryMetadata> buildMetaData(const
DB::NamesAndTypesList &columns, ContextPtr context, const MergeTreeTable &table)
+
+/// Builds the skip (secondary) index declarations requested by `table` and installs them on `metadata`.
+///
+/// `table.minmax_index_key`, `table.bf_index_key` and `table.set_index_key` are comma-separated
+/// column lists ("cola,colb"). For every column of `columns` named in one of them, a declaration
+/// `_minmax_<col>`, `_bloomfilter_<col>` or `_set_<col>` with GRANULARITY 1 is generated, in the
+/// order of `columns`. The resulting list is parsed against `metadata->getColumns()`, so the
+/// caller must have set the columns on `metadata` before calling this function.
+void setSecondaryIndex(
+    const DB::NamesAndTypesList & columns,
+    ContextPtr context,
+    const MergeTreeTable & table,
+    std::shared_ptr<DB::StorageInMemoryMetadata> metadata)
+{
+    /// Splits a "cola,colb" list into a set of column names. Tokens are trimmed and empty tokens
+    /// are dropped, so "cola, colb" or a trailing comma still selects the intended columns.
+    auto parse_index_cols = [](const std::string & keys)
+    {
+        Poco::StringTokenizer tokenizer(
+            keys, ",", Poco::StringTokenizer::TOK_TRIM | Poco::StringTokenizer::TOK_IGNORE_EMPTY);
+        return std::unordered_set<std::string>(tokenizer.begin(), tokenizer.end());
+    };
+
+    const auto minmax_index_cols = parse_index_cols(table.minmax_index_key);
+    const auto bf_index_cols = parse_index_cols(table.bf_index_key);
+    const auto set_index_cols = parse_index_cols(table.set_index_key);
+
+    std::stringstream ss;
+    bool first = true;
+
+    /// Appends "<prefix><col> <col> TYPE <type> GRANULARITY 1", comma-separated from earlier entries.
+    auto append_index = [&](const std::string & prefix, const std::string & column_name, const std::string & type)
+    {
+        if (!first)
+            ss << ", ";
+        first = false;
+        ss << prefix << column_name << " " << column_name << " TYPE " << type << " GRANULARITY 1";
+    };
+
+    for (const auto & column : columns)
+    {
+        /// A column may appear in several key lists; each match yields its own index.
+        if (minmax_index_cols.contains(column.name))
+            append_index("_minmax_", column.name, "minmax");
+
+        if (bf_index_cols.contains(column.name))
+            append_index("_bloomfilter_", column.name, "bloom_filter");
+
+        /// set(0): max_rows = 0, i.e. the set size is not capped.
+        if (set_index_cols.contains(column.name))
+            append_index("_set_", column.name, "set(0)");
+    }
+
+    metadata->setSecondaryIndices(IndicesDescription::parse(ss.str(), metadata->getColumns(), context));
+}
+
+std::shared_ptr<DB::StorageInMemoryMetadata> buildMetaData(
+ const DB::NamesAndTypesList & columns,
+ ContextPtr context,
+ const MergeTreeTable & table)
{
std::shared_ptr<DB::StorageInMemoryMetadata> metadata =
std::make_shared<DB::StorageInMemoryMetadata>();
ColumnsDescription columns_description;
@@ -38,6 +103,9 @@ std::shared_ptr<DB::StorageInMemoryMetadata>
buildMetaData(const DB::NamesAndTyp
columns_description.add(ColumnDescription(item.name, item.type));
}
metadata->setColumns(std::move(columns_description));
+
+ setSecondaryIndex(columns, context, table, metadata);
+
metadata->partition_key.expression_list_ast =
std::make_shared<ASTExpressionList>();
metadata->sorting_key = KeyDescription::parse(table.order_by_key,
metadata->getColumns(), context);
if (table.primary_key.empty())
@@ -85,7 +153,6 @@ void parseTableConfig(MergeTreeTableSettings & settings,
String config_json)
MergeTreeTable parseMergeTreeTableString(const std::string & info)
{
-
ReadBufferFromString in(info);
assertString("MergeTree;", in);
MergeTreeTable table;
@@ -106,6 +173,12 @@ MergeTreeTable parseMergeTreeTableString(const std::string
& info)
}
readString(table.low_card_key, in);
assertChar('\n', in);
+ readString(table.minmax_index_key, in);
+ assertChar('\n', in);
+ readString(table.bf_index_key, in);
+ assertChar('\n', in);
+ readString(table.set_index_key, in);
+ assertChar('\n', in);
readString(table.relative_path, in);
assertChar('\n', in);
readString(table.absolute_path, in);
@@ -139,13 +212,13 @@ std::unordered_set<String> MergeTreeTable::getPartNames()
const
RangesInDataParts MergeTreeTable::extractRange(DataPartsVector parts_vector)
const
{
std::unordered_map<String, DataPartPtr> name_index;
- std::ranges::for_each(parts_vector, [&](const DataPartPtr & part)
{name_index.emplace(part->name, part);});
+ std::ranges::for_each(parts_vector, [&](const DataPartPtr & part) {
name_index.emplace(part->name, part); });
RangesInDataParts ranges_in_data_parts;
std::ranges::transform(
parts,
std::inserter(ranges_in_data_parts, ranges_in_data_parts.end()),
- [&](const MergeTreePart& part)
+ [&](const MergeTreePart & part)
{
RangesInDataPart ranges_in_data_part;
ranges_in_data_part.data_part = name_index.at(part.name);
@@ -155,5 +228,4 @@ RangesInDataParts
MergeTreeTable::extractRange(DataPartsVector parts_vector) con
});
return ranges_in_data_parts;
}
-
-}
+}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Common/MergeTreeTool.h
b/cpp-ch/local-engine/Common/MergeTreeTool.h
index bde632f0d..d8ca9d34e 100644
--- a/cpp-ch/local-engine/Common/MergeTreeTool.h
+++ b/cpp-ch/local-engine/Common/MergeTreeTool.h
@@ -56,6 +56,9 @@ struct MergeTreeTable
substrait::NamedStruct schema;
std::string order_by_key;
std::string low_card_key;
+ std::string minmax_index_key;
+ std::string bf_index_key;
+ std::string set_index_key;
std::string primary_key = "";
std::string relative_path;
std::string absolute_path;
diff --git a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
index 82a64b999..f1381a1f7 100644
--- a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
@@ -38,7 +38,6 @@ extern const int NO_SUCH_DATA_PART;
extern const int LOGICAL_ERROR;
extern const int UNKNOWN_FUNCTION;
extern const int UNKNOWN_TYPE;
-
}
}
@@ -69,7 +68,10 @@ CustomStorageMergeTreePtr MergeTreeRelParser::parseStorage(
table.ParseFromString(extension_table.detail().value());
auto merge_tree_table =
local_engine::parseMergeTreeTableString(table.value());
DB::Block header;
- header = TypeParser::buildBlockFromNamedStruct(merge_tree_table.schema,
merge_tree_table.low_card_key);
+
+ header = TypeParser::buildBlockFromNamedStruct(
+ merge_tree_table.schema,
+ merge_tree_table.low_card_key);
auto names_and_types_list = header.getNamesAndTypesList();
auto storage_factory = StorageMergeTreeFactory::instance();
auto metadata = buildMetaData(names_and_types_list, context,
merge_tree_table);
@@ -161,7 +163,8 @@ MergeTreeRelParser::parseReadRel(
throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "no data part found.");
auto read_step =
query_context.custom_storage_merge_tree->reader.readFromParts(
selected_parts,
- /* alter_conversions = */ {},
+ /* alter_conversions = */
+ {},
names_and_types_list.getNames(),
query_context.storage_snapshot,
*query_info,
@@ -176,6 +179,7 @@ MergeTreeRelParser::parseReadRel(
source_step_with_filter->addFilter(storage_prewhere_info->prewhere_actions,
storage_prewhere_info->prewhere_column_name);
source_step_with_filter->applyFilters();
}
+
query_context.custom_storage_merge_tree->wrapRangesInDataParts(*reinterpret_cast<ReadFromMergeTree
*>(read_step.get()), ranges);
steps.emplace_back(read_step.get());
query_plan->addStep(std::move(read_step));
@@ -266,7 +270,10 @@ void MergeTreeRelParser::parseToAction(ActionsDAGPtr &
filter_action, const subs
}
void MergeTreeRelParser::analyzeExpressions(
- Conditions & res, const substrait::Expression & rel, std::set<Int64> &
pk_positions, Block & block)
+ Conditions & res,
+ const substrait::Expression & rel,
+ std::set<Int64> & pk_positions,
+ Block & block)
{
if (rel.has_scalar_function() && getCHFunctionName(rel.scalar_function())
== "and")
{
@@ -378,5 +385,4 @@ String MergeTreeRelParser::getCHFunctionName(const
substrait::Expression_ScalarF
throw Exception(ErrorCodes::UNKNOWN_FUNCTION, "Unsupported substrait
function on mergetree prewhere parser: {}", func_name);
return it->second;
}
-
-}
+}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Parser/RelMetric.cpp
b/cpp-ch/local-engine/Parser/RelMetric.cpp
index 2449fa4e5..eec31213a 100644
--- a/cpp-ch/local-engine/Parser/RelMetric.cpp
+++ b/cpp-ch/local-engine/Parser/RelMetric.cpp
@@ -120,9 +120,12 @@ void RelMetric::serialize(Writer<StringBuffer> & writer,
bool) const
if (auto read_mergetree =
dynamic_cast<DB::ReadFromMergeTree*>(step))
{
auto selected_marks_pk =
read_mergetree->getAnalysisResult().selected_marks_pk;
+ auto selected_marks =
read_mergetree->getAnalysisResult().selected_marks;
auto total_marks_pk =
read_mergetree->getAnalysisResult().total_marks_pk;
writer.Key("selected_marks_pk");
writer.Uint64(selected_marks_pk);
+ writer.Key("selected_marks");
+ writer.Uint64(selected_marks);
writer.Key("total_marks_pk");
writer.Uint64(total_marks_pk);
}
diff --git a/cpp-ch/local-engine/Parser/TypeParser.cpp
b/cpp-ch/local-engine/Parser/TypeParser.cpp
index 3ffa0b0d9..12c23e606 100644
--- a/cpp-ch/local-engine/Parser/TypeParser.cpp
+++ b/cpp-ch/local-engine/Parser/TypeParser.cpp
@@ -238,7 +238,9 @@ DB::DataTypePtr TypeParser::parseType(const substrait::Type
& substrait_type, st
}
-DB::Block TypeParser::buildBlockFromNamedStruct(const substrait::NamedStruct &
struct_, const std::string & low_card_cols)
+DB::Block TypeParser::buildBlockFromNamedStruct(
+ const substrait::NamedStruct & struct_,
+ const std::string & low_card_cols)
{
std::unordered_set<std::string> low_card_columns;
Poco::StringTokenizer tokenizer(low_card_cols, ",");
diff --git a/cpp-ch/local-engine/Parser/TypeParser.h
b/cpp-ch/local-engine/Parser/TypeParser.h
index 55420ee1a..c687c3024 100644
--- a/cpp-ch/local-engine/Parser/TypeParser.h
+++ b/cpp-ch/local-engine/Parser/TypeParser.h
@@ -24,36 +24,38 @@
namespace local_engine
{
-class TypeParser
-{
-public:
- TypeParser() = default;
- ~TypeParser() = default;
+ class TypeParser
+ {
+ public:
+ TypeParser() = default;
+ ~TypeParser() = default;
- static String getCHTypeName(const String & spark_type_name);
+ static String getCHTypeName(const String& spark_type_name);
- static DB::DataTypePtr getCHTypeByName(const String & spark_type_name);
+ static DB::DataTypePtr getCHTypeByName(const String& spark_type_name);
- /// When parsing named structure, we need the field names.
- static DB::DataTypePtr parseType(const substrait::Type & substrait_type,
std::list<String> * field_names);
+ /// When parsing named structure, we need the field names.
+ static DB::DataTypePtr parseType(const substrait::Type&
substrait_type, std::list<String>* field_names);
- inline static DB::DataTypePtr parseType(const substrait::Type &
substrait_type)
- {
- return parseType(substrait_type, nullptr);
- }
+ inline static DB::DataTypePtr parseType(const substrait::Type&
substrait_type)
+ {
+ return parseType(substrait_type, nullptr);
+ }
+
+ // low_card_cols is in the format "cola,colb". Nested columns are currently not supported as LowCardinality.
+ static DB::Block buildBlockFromNamedStruct(const
substrait::NamedStruct& struct_,
+ const std::string&
low_card_cols = "");
- // low_card_cols is in format of "cola,colb". Currently does not nested
column to be LowCardinality.
- static DB::Block buildBlockFromNamedStruct(const substrait::NamedStruct &
struct_, const std::string& low_card_cols = "");
+ /// Build block from substrait NamedStruct without DFS rules,
different from buildBlockFromNamedStruct
+ static DB::Block buildBlockFromNamedStructWithoutDFS(const
substrait::NamedStruct& struct_);
- /// Build block from substrait NamedStruct without DFS rules, different
from buildBlockFromNamedStruct
- static DB::Block buildBlockFromNamedStructWithoutDFS(const
substrait::NamedStruct & struct_);
+ static bool isTypeMatched(const substrait::Type& substrait_type, const
DB::DataTypePtr& ch_type);
+ static bool isTypeMatchedWithNullability(const substrait::Type&
substrait_type, const DB::DataTypePtr& ch_type);
- static bool isTypeMatched(const substrait::Type & substrait_type, const
DB::DataTypePtr & ch_type);
- static bool isTypeMatchedWithNullability(const substrait::Type &
substrait_type, const DB::DataTypePtr & ch_type);
-private:
- /// Mapping spark type names to CH type names.
- static std::unordered_map<String, String> type_names_mapping;
+ private:
+ /// Mapping spark type names to CH type names.
+ static std::unordered_map<String, String> type_names_mapping;
- static DB::DataTypePtr tryWrapNullable(substrait::Type_Nullability
nullable, DB::DataTypePtr nested_type);
-};
+ static DB::DataTypePtr tryWrapNullable(substrait::Type_Nullability
nullable, DB::DataTypePtr nested_type);
+ };
}
diff --git
a/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableBuilder.java
b/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableBuilder.java
index 9dc26215b..086d475b0 100644
---
a/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableBuilder.java
+++
b/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableBuilder.java
@@ -31,6 +31,9 @@ public class ExtensionTableBuilder {
String absoluteTablePath,
String orderByKey,
String lowCardKey,
+ String minmaxIndexKey,
+ String bfIndexKey,
+ String setIndexKey,
String primaryKey,
List<String> partList,
List<Long> starts,
@@ -47,6 +50,9 @@ public class ExtensionTableBuilder {
absoluteTablePath,
orderByKey,
lowCardKey,
+ minmaxIndexKey,
+ bfIndexKey,
+ setIndexKey,
primaryKey,
partList,
starts,
diff --git
a/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableNode.java
b/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableNode.java
index f07e6fccb..583ca9549 100644
---
a/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableNode.java
+++
b/gluten-core/src/main/java/io/glutenproject/substrait/rel/ExtensionTableNode.java
@@ -45,6 +45,9 @@ public class ExtensionTableNode implements SplitInfo {
private String primaryKey;
private String lowCardKey;
+ private String minmaxIndexKey;
+ private String bfIndexKey;
+ private String setIndexKey;
private List<String> partList;
private List<Long> starts;
@@ -61,6 +64,9 @@ public class ExtensionTableNode implements SplitInfo {
String absolutePath,
String orderByKey,
String lowCardKey,
+ String minmaxIndexKey,
+ String bfIndexKey,
+ String setIndexKey,
String primaryKey,
List<String> partList,
List<Long> starts,
@@ -82,6 +88,9 @@ public class ExtensionTableNode implements SplitInfo {
this.tableSchemaJson = tableSchemaJson;
this.orderByKey = orderByKey;
this.lowCardKey = lowCardKey;
+ this.minmaxIndexKey = minmaxIndexKey;
+ this.bfIndexKey = bfIndexKey;
+ this.setIndexKey = setIndexKey;
this.primaryKey = primaryKey;
this.partList = partList;
this.starts = starts;
@@ -117,6 +126,9 @@ public class ExtensionTableNode implements SplitInfo {
extensionTableStr.append(this.primaryKey).append("\n");
}
extensionTableStr.append(this.lowCardKey).append("\n");
+ extensionTableStr.append(this.minmaxIndexKey).append("\n");
+ extensionTableStr.append(this.bfIndexKey).append("\n");
+ extensionTableStr.append(this.setIndexKey).append("\n");
extensionTableStr.append(this.relativePath).append("\n");
extensionTableStr.append(this.absolutePath).append("\n");
diff --git
a/shims/common/src/main/scala/io/glutenproject/execution/datasource/GlutenFormatWriterInjects.scala
b/shims/common/src/main/scala/io/glutenproject/execution/datasource/GlutenFormatWriterInjects.scala
index 856974a0e..3dd2bfaba 100644
---
a/shims/common/src/main/scala/io/glutenproject/execution/datasource/GlutenFormatWriterInjects.scala
+++
b/shims/common/src/main/scala/io/glutenproject/execution/datasource/GlutenFormatWriterInjects.scala
@@ -43,10 +43,12 @@ trait GlutenFormatWriterInjects {
tableName: String,
orderByKeyOption: Option[Seq[String]],
lowCardKeyOption: Option[Seq[String]],
+ minmaxIndexKeyOption: Option[Seq[String]],
+ bfIndexKeyOption: Option[Seq[String]],
+ setIndexKeyOption: Option[Seq[String]],
primaryKeyOption: Option[Seq[String]],
partitionColumns: Seq[String],
tableSchema: StructType,
- dataSchema: Seq[Attribute],
clickhouseTableConfigs: Map[String, String],
context: TaskAttemptContext,
nativeConf: java.util.Map[String, String]): OutputWriter = null
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]