This is an automated email from the ASF dual-hosted git repository.
liuneng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 7c066a221 [GLUTEN-5225][CH] Add mergetree index filter on driver
(#5308)
7c066a221 is described below
commit 7c066a221c776a44fc6e80ed2623413f5ed7eb1e
Author: Shuai li <[email protected]>
AuthorDate: Mon Apr 22 14:11:45 2024 +0800
[GLUTEN-5225][CH] Add mergetree index filter on driver (#5308)
<h2 dir="auto" style="box-sizing: border-box; margin-top: 0px !important;
margin-bottom: 16px; font-size: 1.5em; font-weight:
var(--base-text-weight-semibold, 600); line-height: 1.25; padding-bottom:
0.3em; border-bottom: 1px solid var(--borderColor-muted,
var(--color-border-muted)); color: rgb(31, 35, 40); font-family: -apple-system,
BlinkMacSystemFont, "Segoe UI", "Noto Sans", Helvetica,
Arial, sans-serif, "Apple Color Emoji", "Segoe UI Emoji"
[...]
name | before | after | diff | ratio
-- | -- | -- | -- | --
q01 | 2062 | 2050 | -12 | 1.01
q02 | 449 | 437 | -12 | 1.03
q03 | 2010 | 1993 | -17 | 1.01
q04 | 1867 | 1816 | -51 | 1.03
q05 | 3092 | 3085 | -7 | 1.00
q06 | 179 | 172 | -7 | 1.04
q07 | 2168 | 2115 | -53 | 1.03
q08 | 2698 | 2635 | -63 | 1.02
q09 | 6183 | 6162 | -21 | 1.00
q10 | 2002 | 2026 | 24 | 0.99
q11 | 600 | 604 | 4 | 0.99
q12 | 1049 | 1021 | -28 | 1.03
q13 | 3141 | 3185 | 44 | 0.99
q14 | 357 | 341 | -16 | 1.05
q15 | 756 | 744 | -12 | 1.02
q16 | 1652 | 1622 | -30 | 1.02
q17 | 1214 | 1217 | 3 | 1.00
q18 | 2699 | 2610 | -89 | 1.03
q19 | 2362 | 2360 | -2 | 1.00
q20 | 1233 | 1194 | -39 | 1.03
q21 | | | |
q22 | 935 | 939 | 4 | 1.00
total | 38708 | 38328 | -380 | 1.01
What changes were proposed in this pull request?
The setting
spark.gluten.sql.columnar.backend.ch.runtime_settings.enabled_driver_filter_mergetree_index
controls whether the MergeTree primary index is filtered on the driver side.
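A minimal usage sketch (assuming a SparkSession named `spark`; it only sets the runtime setting named above):

    // enable driver-side filtering of the MergeTree primary index
    spark.conf.set(
      "spark.gluten.sql.columnar.backend.ch.runtime_settings" +
        ".enabled_driver_filter_mergetree_index",
      "true")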
Why do this?
When a MergeTree part is very large, splitting it into tasks by contiguous mark ranges can leave some tasks with nothing to read. Filtering with the primary index on the driver first, and only then splitting, guarantees that every task has actual work to do.
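A rough sketch of the splitting step that follows the driver-side filter (names mirror MergeTreePartsPartitionsUtil in this diff; markCntPerPartition is the per-task mark budget):

    // Cut a surviving mark range [start, start + marks) into splits of at most
    // markCntPerPartition marks, so each task covers only marks that passed the filter.
    def splitRange(start: Long, marks: Long, markCntPerPartition: Long): Seq[(Long, Long)] = {
      val end = start + marks
      (start until end by markCntPerPartition).map { offset =>
        val size = math.min(markCntPerPartition, end - offset)
        (offset, size) // (first mark, number of marks)
      }
    }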
(Fixes: https://github.com/apache/incubator-gluten/issues/5225)
How was this patch tested?
Tested via UI (see the performance results above) and by the new test cases added to GlutenClickHouseMergeTreeWriteSuite.
---
.../datasources/CHDatasourceJniWrapper.java | 2 +
.../MergeTreePartFilterReturnedRange.java | 42 ++-
.../backendsapi/clickhouse/CHTransformerApi.scala | 8 +-
.../execution/GlutenMergeTreePartition.scala | 13 +
.../sql/delta/catalog/ClickHouseTableV2.scala | 36 +-
.../utils/MergeTreePartsPartitionsUtil.scala | 366 +++++++++++++++------
.../GlutenClickHouseMergeTreeWriteSuite.scala | 268 +++++++++++++++
.../backendsapi/velox/TransformerApiImpl.scala | 5 +-
cpp-ch/local-engine/Common/MergeTreeTool.cpp | 1 +
cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp | 137 +++++---
cpp-ch/local-engine/Parser/MergeTreeRelParser.h | 37 +--
.../local-engine/Parser/SerializedPlanParser.cpp | 40 +--
cpp-ch/local-engine/Parser/SerializedPlanParser.h | 23 +-
.../Storages/CustomStorageMergeTree.cpp | 49 +++
.../local-engine/Storages/CustomStorageMergeTree.h | 1 +
.../Storages/Mergetree/MetaDataHelper.h | 3 +-
cpp-ch/local-engine/local_engine_jni.cpp | 47 ++-
.../gluten/substrait/rel/ExtensionTableNode.java | 4 +
.../apache/gluten/backendsapi/TransformerApi.scala | 5 +-
.../execution/FileSourceScanExecTransformer.scala | 3 +-
20 files changed, 855 insertions(+), 235 deletions(-)
diff --git
a/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/CHDatasourceJniWrapper.java
b/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/CHDatasourceJniWrapper.java
index 0241624eb..9ca301efb 100644
---
a/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/CHDatasourceJniWrapper.java
+++
b/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/CHDatasourceJniWrapper.java
@@ -37,6 +37,8 @@ public class CHDatasourceJniWrapper {
String partition_dir,
String bucket_dir);
+ public static native String filterRangesOnDriver(byte[] plan, byte[] read);
+
public native void write(long instanceId, long blockAddress);
public native void writeToMergeTree(long instanceId, long blockAddress);
diff --git a/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h
b/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/MergeTreePartFilterReturnedRange.java
similarity index 54%
copy from cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h
copy to
backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/MergeTreePartFilterReturnedRange.java
index 59d7af4e3..357e34e06 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h
+++
b/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/MergeTreePartFilterReturnedRange.java
@@ -14,21 +14,41 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.spark.sql.execution.datasources.clickhouse;
-#pragma once
+import com.fasterxml.jackson.annotation.JsonProperty;
-#include <Common/MergeTreeTool.h>
-#include <Storages/StorageMergeTreeFactory.h>
+public class MergeTreePartFilterReturnedRange {
+ @JsonProperty("part_name")
+ protected String partName;
-namespace local_engine
-{
+ @JsonProperty("begin")
+ protected long begin;
-void restoreMetaData(CustomStorageMergeTreePtr & storage, const MergeTreeTable
& mergeTreeTable, const Context & context);
+ @JsonProperty("end")
+ protected long end;
-void saveFileStatus(
- const DB::MergeTreeData & storage,
- const DB::ContextPtr& context,
- IDataPartStorage & data_part_storage);
+ public String getPartName() {
+ return partName;
+ }
-}
+ public void setPartName(String partName) {
+ this.partName = partName;
+ }
+
+ public long getBegin() {
+ return begin;
+ }
+
+ public void setBegin(long begin) {
+ this.begin = begin;
+ }
+ public long getEnd() {
+ return end;
+ }
+
+ public void setEnd(long end) {
+ this.end = end;
+ }
+}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
index 65e6165e5..df1ca9c68 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
@@ -23,7 +23,7 @@ import
org.apache.gluten.substrait.expression.{BooleanLiteralNode, ExpressionBui
import org.apache.gluten.utils.{CHInputPartitionsUtil, ExpressionDocUtil}
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
import org.apache.spark.sql.delta.files.TahoeFileIndex
@@ -49,7 +49,8 @@ class CHTransformerApi extends TransformerApi with Logging {
bucketedScan: Boolean,
optionalBucketSet: Option[BitSet],
optionalNumCoalescedBuckets: Option[Int],
- disableBucketedScan: Boolean): Seq[InputPartition] = {
+ disableBucketedScan: Boolean,
+ filterExprs: Seq[Expression]): Seq[InputPartition] = {
relation.location match {
case index: TahoeFileIndex
if relation.fileFormat
@@ -64,7 +65,8 @@ class CHTransformerApi extends TransformerApi with Logging {
bucketedScan,
optionalBucketSet,
optionalNumCoalescedBuckets,
- disableBucketedScan
+ disableBucketedScan,
+ filterExprs
)
case _: TahoeFileIndex =>
throw new UnsupportedOperationException("Does not support
delta-parquet")
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/GlutenMergeTreePartition.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/GlutenMergeTreePartition.scala
index 7532a29f8..d4c915dc1 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/GlutenMergeTreePartition.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/GlutenMergeTreePartition.scala
@@ -18,6 +18,19 @@ package org.apache.gluten.execution
import org.apache.gluten.substrait.plan.PlanBuilder
+case class MergeTreePartRange(
+ name: String,
+ dirName: String,
+ targetNode: String,
+ bucketNum: String,
+ start: Long,
+ marks: Long,
+ size: Long) {
+ override def toString: String = {
+ s"part name: $name, range: $start-${start + marks}"
+ }
+}
+
case class MergeTreePartSplit(
name: String,
dirName: String,
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
index 3b825513f..fcef796cb 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
@@ -236,6 +236,36 @@ class ClickHouseTableV2(
}
cacheThis()
+
+ def primaryKey(): String = primaryKeyOption match {
+ case Some(keys) => keys.mkString(",")
+ case None => ""
+ }
+
+ def orderByKey(): String = orderByKeyOption match {
+ case Some(keys) => keys.mkString(",")
+ case None => "tuple()"
+ }
+
+ def lowCardKey(): String = lowCardKeyOption match {
+ case Some(keys) => keys.mkString(",")
+ case None => ""
+ }
+
+ def minmaxIndexKey(): String = minmaxIndexKeyOption match {
+ case Some(keys) => keys.mkString(",")
+ case None => ""
+ }
+
+ def bfIndexKey(): String = bfIndexKeyOption match {
+ case Some(keys) => keys.mkString(",")
+ case None => ""
+ }
+
+ def setIndexKey(): String = setIndexKeyOption match {
+ case Some(keys) => keys.mkString(",")
+ case None => ""
+ }
}
class TempClickHouseTableV2(
@@ -278,7 +308,8 @@ object ClickHouseTableV2 extends Logging {
bucketedScan: Boolean,
optionalBucketSet: Option[BitSet],
optionalNumCoalescedBuckets: Option[Int],
- disableBucketedScan: Boolean): Seq[InputPartition] = {
+ disableBucketedScan: Boolean,
+ filterExprs: Seq[Expression]): Seq[InputPartition] = {
val tableV2 = ClickHouseTableV2.getTable(deltaLog)
MergeTreePartsPartitionsUtil.getMergeTreePartsPartitions(
@@ -290,7 +321,8 @@ object ClickHouseTableV2 extends Logging {
tableV2,
optionalBucketSet,
optionalNumCoalescedBuckets,
- disableBucketedScan)
+ disableBucketedScan,
+ filterExprs)
}
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala
index 2b37ae787..80257c3b5 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreePartsPartitionsUtil.scala
@@ -16,22 +16,38 @@
*/
package org.apache.spark.sql.execution.datasources.utils
-import org.apache.gluten.execution.{GlutenMergeTreePartition,
MergeTreePartSplit}
-import org.apache.gluten.expression.ConverterUtils
+import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings
+import org.apache.gluten.execution.{GlutenMergeTreePartition,
MergeTreePartRange, MergeTreePartSplit}
+import org.apache.gluten.expression.{ConverterUtils, ExpressionConverter}
+import org.apache.gluten.substrait.`type`.ColumnTypeNode
+import org.apache.gluten.substrait.SubstraitContext
+import org.apache.gluten.substrait.extensions.ExtensionBuilder
+import org.apache.gluten.substrait.rel.{ExtensionTableBuilder, RelBuilder}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.BucketSpec
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.delta.ClickhouseSnapshot
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
import org.apache.spark.sql.delta.files.TahoeFileIndex
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation,
PartitionDirectory}
+import org.apache.spark.sql.execution.datasources.{CHDatasourceJniWrapper,
HadoopFsRelation, PartitionDirectory}
+import
org.apache.spark.sql.execution.datasources.clickhouse.MergeTreePartFilterReturnedRange
import
org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
import
org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
+import org.apache.spark.sql.types.BooleanType
import org.apache.spark.util.collection.BitSet
+import com.fasterxml.jackson.core.`type`.TypeReference
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.google.protobuf.{Any, StringValue}
+import io.substrait.proto.Plan
+
+import java.lang.{Long => JLong}
+import java.util.{ArrayList => JArrayList}
+
+import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
// scalastyle:off argcount
@@ -46,7 +62,8 @@ object MergeTreePartsPartitionsUtil extends Logging {
table: ClickHouseTableV2,
optionalBucketSet: Option[BitSet],
optionalNumCoalescedBuckets: Option[Int],
- disableBucketedScan: Boolean): Seq[InputPartition] = {
+ disableBucketedScan: Boolean,
+ filterExprs: Seq[Expression]): Seq[InputPartition] = {
if (
!relation.location.isInstanceOf[TahoeFileIndex] || !relation.fileFormat
.isInstanceOf[DeltaMergeTreeFileFormat]
@@ -68,30 +85,6 @@ object MergeTreePartsPartitionsUtil extends Logging {
val engine = "MergeTree"
val relativeTablePath =
fileIndex.deltaLog.dataPath.toUri.getPath.substring(1)
val absoluteTablePath = fileIndex.deltaLog.dataPath.toUri.toString
-
- val (orderByKey, primaryKey) =
- MergeTreeDeltaUtil.genOrderByAndPrimaryKeyStr(table.orderByKeyOption,
table.primaryKeyOption)
-
- val lowCardKey = table.lowCardKeyOption match {
- case Some(keys) => keys.mkString(",")
- case None => ""
- }
-
- val minmaxIndexKey = table.minmaxIndexKeyOption match {
- case Some(keys) => keys.mkString(",")
- case None => ""
- }
-
- val bfIndexKey = table.bfIndexKeyOption match {
- case Some(keys) => keys.mkString(",")
- case None => ""
- }
-
- val setIndexKey = table.setIndexKeyOption match {
- case Some(keys) => keys.mkString(",")
- case None => ""
- }
-
val tableSchemaJson = ConverterUtils.convertNamedStructJson(table.schema())
// bucket table
@@ -109,13 +102,10 @@ object MergeTreePartsPartitionsUtil extends Logging {
selectedPartitions,
tableSchemaJson,
partitions,
- orderByKey,
- lowCardKey,
- minmaxIndexKey,
- bfIndexKey,
- setIndexKey,
- primaryKey,
+ table,
table.clickhouseTableConfigs,
+ output,
+ filterExprs,
sparkSession
)
} else {
@@ -130,13 +120,10 @@ object MergeTreePartsPartitionsUtil extends Logging {
selectedPartitions,
tableSchemaJson,
partitions,
- orderByKey,
- lowCardKey,
- minmaxIndexKey,
- bfIndexKey,
- setIndexKey,
- primaryKey,
+ table,
table.clickhouseTableConfigs,
+ output,
+ filterExprs,
sparkSession
)
}
@@ -154,15 +141,23 @@ object MergeTreePartsPartitionsUtil extends Logging {
selectedPartitions: Array[PartitionDirectory],
tableSchemaJson: String,
partitions: ArrayBuffer[InputPartition],
- orderByKey: String,
- lowCardKey: String,
- minmaxIndexKey: String,
- bfIndexKey: String,
- setIndexKey: String,
- primaryKey: String,
+ table: ClickHouseTableV2,
clickhouseTableConfigs: Map[String, String],
+ output: Seq[Attribute],
+ filterExprs: Seq[Expression],
sparkSession: SparkSession): Unit = {
+ val bucketingEnabled = sparkSession.sessionState.conf.bucketingEnabled
+ val shouldProcess: String => Boolean = optionalBucketSet match {
+ case Some(bucketSet) if bucketingEnabled =>
+ name =>
+ // find bucket it in name pattern of:
+ // "partition_col=1/00001/373c9386-92a4-44ef-baaf-a67e1530b602_0_006"
+
name.split("/").dropRight(1).filterNot(_.contains("=")).map(_.toInt).forall(bucketSet.get)
+ case _ =>
+ _ => true
+ }
+
val selectPartsFiles = selectedPartitions
.flatMap(
partition =>
@@ -173,7 +168,7 @@ object MergeTreePartsPartitionsUtil extends Logging {
val ret = ClickhouseSnapshot.pathToAddMTPCache.getIfPresent(path)
if (ret == null) {
val keys =
ClickhouseSnapshot.pathToAddMTPCache.asMap().keySet()
- val keySample = keys.isEmpty() match {
+ val keySample = keys.isEmpty match {
case true => "<empty>"
case false => keys.iterator().next()
}
@@ -186,46 +181,54 @@ object MergeTreePartsPartitionsUtil extends Logging {
}
ret
}))
+ .filter(part => shouldProcess(part.name))
.toSeq
if (selectPartsFiles.isEmpty) {
return
}
- val maxSplitBytes = getMaxSplitBytes(sparkSession, selectPartsFiles)
- val total_marks = selectPartsFiles.map(p => p.marks).sum
- val total_Bytes = selectPartsFiles.map(p => p.size).sum
- val markCntPerPartition = maxSplitBytes * total_marks / total_Bytes + 1
+ val selectRanges: Seq[MergeTreePartRange] =
+ getMergeTreePartRange(
+ selectPartsFiles,
+ snapshotId,
+ database,
+ tableName,
+ relativeTablePath,
+ absoluteTablePath,
+ tableSchemaJson,
+ table,
+ clickhouseTableConfigs,
+ filterExprs,
+ output,
+ sparkSession
+ )
- val bucketingEnabled = sparkSession.sessionState.conf.bucketingEnabled
- val shouldProcess: String => Boolean = optionalBucketSet match {
- case Some(bucketSet) if bucketingEnabled =>
- name =>
- // find bucket it in name pattern of:
- // "partition_col=1/00001/373c9386-92a4-44ef-baaf-a67e1530b602_0_006"
-
name.split("/").dropRight(1).filterNot(_.contains("=")).map(_.toInt).forall(bucketSet.get)
- case _ =>
- _ => true
+ if (selectRanges.isEmpty) {
+ return
}
+ val maxSplitBytes = getMaxSplitBytes(sparkSession, selectRanges)
+ val total_marks = selectRanges.map(p => p.marks).sum
+ val total_Bytes = selectRanges.map(p => p.size).sum
+ // maxSplitBytes / (total_Bytes / total_marks) + 1
+ val markCntPerPartition = maxSplitBytes * total_marks / total_Bytes + 1
+
logInfo(s"Planning scan with bin packing, max mark: $markCntPerPartition")
- val splitFiles = selectPartsFiles
+ val splitFiles = selectRanges
.flatMap {
part =>
- if (shouldProcess(part.name)) {
- (0L until part.marks by markCntPerPartition).map {
- offset =>
- val remaining = part.marks - offset
- val size = if (remaining > markCntPerPartition)
markCntPerPartition else remaining
- MergeTreePartSplit(
- part.name,
- part.dirName,
- part.targetNode,
- offset,
- size,
- size * part.size / part.marks)
- }
- } else {
- None
+ val end = part.marks + part.start
+ (part.start until end by markCntPerPartition).map {
+ offset =>
+ val remaining = end - offset
+ val size = if (remaining > markCntPerPartition)
markCntPerPartition else remaining
+ MergeTreePartSplit(
+ part.name,
+ part.dirName,
+ part.targetNode,
+ offset,
+ size,
+ size * part.size / part.marks)
}
}
@@ -243,12 +246,12 @@ object MergeTreePartsPartitionsUtil extends Logging {
snapshotId,
relativeTablePath,
absoluteTablePath,
- orderByKey,
- lowCardKey,
- minmaxIndexKey,
- bfIndexKey,
- setIndexKey,
- primaryKey,
+ table.orderByKey(),
+ table.lowCardKey(),
+ table.minmaxIndexKey(),
+ table.bfIndexKey(),
+ table.setIndexKey(),
+ table.primaryKey(),
currentFiles.toArray,
tableSchemaJson,
clickhouseTableConfigs
@@ -289,13 +292,10 @@ object MergeTreePartsPartitionsUtil extends Logging {
selectedPartitions: Array[PartitionDirectory],
tableSchemaJson: String,
partitions: ArrayBuffer[InputPartition],
- orderByKey: String,
- lowCardKey: String,
- minmaxIndexKey: String,
- bfIndexKey: String,
- setIndexKey: String,
- primaryKey: String,
+ table: ClickHouseTableV2,
clickhouseTableConfigs: Map[String, String],
+ output: Seq[Attribute],
+ filterExprs: Seq[Expression],
sparkSession: SparkSession): Unit = {
val selectPartsFiles = selectedPartitions
@@ -325,7 +325,27 @@ object MergeTreePartsPartitionsUtil extends Logging {
return
}
- val bucketGroupParts = selectPartsFiles.groupBy(p =>
Integer.parseInt(p.bucketNum))
+ val selectRanges: Seq[MergeTreePartRange] =
+ getMergeTreePartRange(
+ selectPartsFiles,
+ snapshotId,
+ database,
+ tableName,
+ relativeTablePath,
+ absoluteTablePath,
+ tableSchemaJson,
+ table,
+ clickhouseTableConfigs,
+ filterExprs,
+ output,
+ sparkSession
+ )
+
+ if (selectRanges.isEmpty) {
+ return
+ }
+
+ val bucketGroupParts = selectRanges.groupBy(p =>
Integer.parseInt(p.bucketNum))
val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) {
val bucketSet = optionalBucketSet.get
@@ -340,12 +360,18 @@ object MergeTreePartsPartitionsUtil extends Logging {
}
Seq.tabulate(bucketSpec.numBuckets) {
bucketId =>
- val currBucketParts: Seq[AddMergeTreeParts] =
+ val currBucketParts: Seq[MergeTreePartRange] =
prunedFilesGroupedToBuckets.getOrElse(bucketId, Seq.empty)
if (!currBucketParts.isEmpty) {
val currentFiles = currBucketParts.map {
part =>
- MergeTreePartSplit(part.name, part.dirName, part.targetNode, 0,
part.marks, part.size)
+ MergeTreePartSplit(
+ part.name,
+ part.dirName,
+ part.targetNode,
+ part.start,
+ part.marks,
+ part.size)
}
val newPartition = GlutenMergeTreePartition(
partitions.size,
@@ -355,12 +381,12 @@ object MergeTreePartsPartitionsUtil extends Logging {
snapshotId,
relativeTablePath,
absoluteTablePath,
- orderByKey,
- lowCardKey,
- minmaxIndexKey,
- bfIndexKey,
- setIndexKey,
- primaryKey,
+ table.orderByKey(),
+ table.lowCardKey(),
+ table.minmaxIndexKey(),
+ table.bfIndexKey(),
+ table.setIndexKey(),
+ table.primaryKey(),
currentFiles.toArray,
tableSchemaJson,
clickhouseTableConfigs
@@ -370,12 +396,152 @@ object MergeTreePartsPartitionsUtil extends Logging {
}
}
- def getMaxSplitBytes(sparkSession: SparkSession, selectedParts:
Seq[AddMergeTreeParts]): Long = {
+ def getMergeTreePartRange(
+ selectPartsFiles: Seq[AddMergeTreeParts],
+ snapshotId: String,
+ database: String,
+ tableName: String,
+ relativeTablePath: String,
+ absoluteTablePath: String,
+ tableSchemaJson: String,
+ table: ClickHouseTableV2,
+ clickhouseTableConfigs: Map[String, String],
+ filterExprs: Seq[Expression],
+ output: Seq[Attribute],
+ sparkSession: SparkSession): Seq[MergeTreePartRange] = {
+ val enableDriverFilter =
s"${CHBackendSettings.getBackendConfigPrefix}.runtime_settings" +
+ s".enabled_driver_filter_mergetree_index"
+
+ if (
+ filterExprs.nonEmpty && sparkSession.sessionState.conf.getConfString(
+ enableDriverFilter,
+ "false") == "true"
+ ) {
+ val size_per_mark = selectPartsFiles.map(part => (part.size,
part.marks)).unzip match {
+ case (l1, l2) => l1.sum / l2.sum
+ }
+
+ val partLists = new JArrayList[String]()
+ val starts = new JArrayList[JLong]()
+ val lengths = new JArrayList[JLong]()
+ selectPartsFiles.foreach(
+ part => {
+ partLists.add(part.name)
+ starts.add(0)
+ lengths.add(part.marks)
+ })
+
+ val extensionTableNode = ExtensionTableBuilder
+ .makeExtensionTable(
+ -1L,
+ -1L,
+ database,
+ tableName,
+ snapshotId,
+ relativeTablePath,
+ absoluteTablePath,
+ table.orderByKey(),
+ table.lowCardKey(),
+ table.minmaxIndexKey(),
+ table.bfIndexKey(),
+ table.setIndexKey(),
+ table.primaryKey(),
+ partLists,
+ starts,
+ lengths,
+ tableSchemaJson,
+ clickhouseTableConfigs.asJava,
+ new JArrayList[String]()
+ )
+
+ val transformer = filterExprs
+ .map {
+ case ar: AttributeReference if ar.dataType == BooleanType =>
+ EqualNullSafe(ar, Literal.TrueLiteral)
+ case e => e
+ }
+ .reduceLeftOption(And)
+ .map(ExpressionConverter.replaceWithExpressionTransformer(_, output))
+
+ val typeNodes = ConverterUtils.collectAttributeTypeNodes(output)
+ val nameList = ConverterUtils.collectAttributeNamesWithoutExprId(output)
+ val columnTypeNodes = output.map {
+ attr =>
+ if (table.partitionColumns.exists(_.equals(attr.name))) {
+ new ColumnTypeNode(1)
+ } else {
+ new ColumnTypeNode(0)
+ }
+ }.asJava
+ val substraitContext = new SubstraitContext
+ val enhancement =
+
Any.pack(StringValue.newBuilder.setValue(extensionTableNode.getExtensionTableStr).build)
+ val extensionNode = ExtensionBuilder.makeAdvancedExtension(enhancement)
+ val readNode = RelBuilder.makeReadRel(
+ typeNodes,
+ nameList,
+ columnTypeNodes,
+
transformer.map(_.doTransform(substraitContext.registeredFunction)).orNull,
+ extensionNode,
+ substraitContext,
+ substraitContext.nextOperatorId("readRel")
+ )
+
+ val planBuilder = Plan.newBuilder
+ substraitContext.registeredFunction.forEach(
+ (k, v) =>
planBuilder.addExtensions(ExtensionBuilder.makeFunctionMapping(k,
v).toProtobuf))
+
+ val filter_ranges = CHDatasourceJniWrapper.filterRangesOnDriver(
+ planBuilder.build().toByteArray,
+ readNode.toProtobuf.toByteArray
+ )
+
+ val mapper: ObjectMapper = new ObjectMapper()
+ val values: JArrayList[MergeTreePartFilterReturnedRange] =
+ mapper.readValue(
+ filter_ranges,
+ new TypeReference[JArrayList[MergeTreePartFilterReturnedRange]]() {})
+
+ val partMap = selectPartsFiles.map(part => (part.name, part)).toMap
+ values.asScala
+ .map(
+ range => {
+ val part = partMap.get(range.getPartName).orNull
+ val marks = range.getEnd - range.getBegin
+ MergeTreePartRange(
+ part.name,
+ part.dirName,
+ part.targetNode,
+ part.bucketNum,
+ range.getBegin,
+ marks,
+ marks * size_per_mark)
+ })
+ .toSeq
+ } else {
+ selectPartsFiles
+ .map(
+ part =>
+ MergeTreePartRange(
+ part.name,
+ part.dirName,
+ part.targetNode,
+ part.bucketNum,
+ 0,
+ part.marks,
+ part.size))
+ .toSeq
+ }
+ }
+
+ def getMaxSplitBytes(
+ sparkSession: SparkSession,
+ selectedRanges: Seq[MergeTreePartRange]): Long = {
val defaultMaxSplitBytes =
sparkSession.sessionState.conf.filesMaxPartitionBytes
val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
val minPartitionNum = sparkSession.sessionState.conf.filesMinPartitionNum
.getOrElse(sparkSession.leafNodeDefaultParallelism)
- val totalBytes = selectedParts.map(_.size + openCostInBytes).sum
+ val totalBytes = selectedRanges.map(_.size + openCostInBytes).sum
val bytesPerCore = totalBytes / minPartitionNum
Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala
index 72f38b99d..6f10035fa 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala
@@ -1393,6 +1393,92 @@ class GlutenClickHouseMergeTreeWriteSuite
}
}
+ test("test mergetree with order keys filter") {
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS lineitem_mergetree_orderbykey3;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS lineitem_mergetree_orderbykey3
+ |(
+ | l_orderkey bigint,
+ | l_partkey bigint,
+ | l_suppkey bigint,
+ | l_linenumber bigint,
+ | l_quantity double,
+ | l_extendedprice double,
+ | l_discount double,
+ | l_tax double,
+ | l_returnflag string,
+ | l_linestatus string,
+ | l_shipdate date,
+ | l_commitdate date,
+ | l_receiptdate date,
+ | l_shipinstruct string,
+ | l_shipmode string,
+ | l_comment string
+ |)
+ |USING clickhouse
+ |TBLPROPERTIES (orderByKey='l_shipdate')
+ |LOCATION '$basePath/lineitem_mergetree_orderbykey3'
+ |""".stripMargin)
+
+ spark.sql(s"""
+ | insert into table lineitem_mergetree_orderbykey3
+ | select * from lineitem
+ |""".stripMargin)
+
+ val sqlStr = s"""
+ |SELECT
+ | sum(l_extendedprice * l_discount) AS revenue
+ |FROM
+ | lineitem_mergetree_orderbykey3
+ |WHERE
+ | l_shipdate >= date'1994-01-01'
+ | AND l_shipdate < date'1994-01-01' + interval 1 year
+ | AND l_discount BETWEEN 0.06 - 0.01 AND 0.06 + 0.01
+ | AND l_quantity < 24
+ |""".stripMargin
+ runTPCHQueryBySQL(6, sqlStr) {
+ df =>
+ val scanExec = collect(df.queryExecution.executedPlan) {
+ case f: FileSourceScanExecTransformer => f
+ }
+ assert(scanExec.size == 1)
+
+ val mergetreeScan = scanExec(0)
+ assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
+
+ val fileIndex =
mergetreeScan.relation.location.asInstanceOf[TahoeFileIndex]
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).clickhouseTableConfigs.nonEmpty)
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).bucketOption.isEmpty)
+ assert(
+ ClickHouseTableV2
+ .getTable(fileIndex.deltaLog)
+ .orderByKeyOption
+ .get
+ .mkString(",")
+ .equals("l_shipdate"))
+ assert(
+ ClickHouseTableV2
+ .getTable(fileIndex.deltaLog)
+ .primaryKeyOption
+ .isEmpty)
+
assert(ClickHouseTableV2.getTable(fileIndex.deltaLog).partitionColumns.isEmpty)
+ val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f =>
f.asInstanceOf[AddMergeTreeParts])
+
+ assert(addFiles.size == 6)
+ assert(addFiles.map(_.rows).sum == 600572)
+
+ val plans = collect(df.queryExecution.executedPlan) {
+ case scanExec: BasicScanExecTransformer => scanExec
+ }
+ assert(plans.size == 1)
+ assert(plans(0).metrics("selectedMarksPk").value === 17)
+ assert(plans(0).metrics("totalMarksPk").value === 74)
+ }
+ }
+
test(
"GLUTEN-5061: Fix assert error when writing mergetree data with select *
from table limit n") {
spark.sql(s"""
@@ -1731,5 +1817,187 @@ class GlutenClickHouseMergeTreeWriteSuite
dataFileList = dataPath.list(fileFilter)
assert(dataFileList.size == 6)
}
+
+ test("test mergetree with primary keys filter pruning by driver") {
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS lineitem_mergetree_pk_pruning_by_driver;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS
lineitem_mergetree_pk_pruning_by_driver
+ |(
+ | l_orderkey bigint,
+ | l_partkey bigint,
+ | l_suppkey bigint,
+ | l_linenumber bigint,
+ | l_quantity double,
+ | l_extendedprice double,
+ | l_discount double,
+ | l_tax double,
+ | l_returnflag string,
+ | l_linestatus string,
+ | l_shipdate date,
+ | l_commitdate date,
+ | l_receiptdate date,
+ | l_shipinstruct string,
+ | l_shipmode string,
+ | l_comment string
+ |)
+ |USING clickhouse
+ |TBLPROPERTIES (orderByKey='l_shipdate')
+ |LOCATION '$basePath/lineitem_mergetree_pk_pruning_by_driver'
+ |""".stripMargin)
+
+ spark.sql(s"""
+ | insert into table lineitem_mergetree_pk_pruning_by_driver
+ | select * from lineitem
+ |""".stripMargin)
+
+ val sqlStr =
+ s"""
+ |SELECT
+ | sum(l_extendedprice * l_discount) AS revenue
+ |FROM
+ | lineitem_mergetree_pk_pruning_by_driver
+ |WHERE
+ | l_shipdate >= date'1994-01-01'
+ | AND l_shipdate < date'1994-01-01' + interval 1 year
+ | AND l_discount BETWEEN 0.06 - 0.01 AND 0.06 + 0.01
+ | AND l_quantity < 24
+ |""".stripMargin
+
+ Seq(("true", 2), ("false", 3)).foreach(
+ conf => {
+ withSQLConf(
+
("spark.gluten.sql.columnar.backend.ch.runtime_settings.enabled_driver_filter_mergetree_index"
-> conf._1)) {
+ runTPCHQueryBySQL(6, sqlStr) {
+ df =>
+ val scanExec = collect(df.queryExecution.executedPlan) {
+ case f: FileSourceScanExecTransformer => f
+ }
+ assert(scanExec.size == 1)
+
+ val mergetreeScan = scanExec(0)
+ assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
+
+ val plans = collect(df.queryExecution.executedPlan) {
+ case scanExec: BasicScanExecTransformer => scanExec
+ }
+ assert(plans.size == 1)
+ assert(plans(0).getSplitInfos.size == conf._2)
+ }
+ }
+ })
+ }
+
+ test("test mergetree with primary keys filter pruning by driver with
bucket") {
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS
lineitem_mergetree_pk_pruning_by_driver_bucket;
+ |""".stripMargin)
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS
orders_mergetree_pk_pruning_by_driver_bucket;
+ |""".stripMargin)
+
+ spark.sql(
+ s"""
+ |CREATE TABLE IF NOT EXISTS
lineitem_mergetree_pk_pruning_by_driver_bucket
+ |(
+ | l_orderkey bigint,
+ | l_partkey bigint,
+ | l_suppkey bigint,
+ | l_linenumber bigint,
+ | l_quantity double,
+ | l_extendedprice double,
+ | l_discount double,
+ | l_tax double,
+ | l_returnflag string,
+ | l_linestatus string,
+ | l_shipdate date,
+ | l_commitdate date,
+ | l_receiptdate date,
+ | l_shipinstruct string,
+ | l_shipmode string,
+ | l_comment string
+ |)
+ |USING clickhouse
+ |CLUSTERED by (l_orderkey)
+ |${if (sparkVersion.equals("3.2")) "" else "SORTED BY
(l_receiptdate)"} INTO 2 BUCKETS
+ |LOCATION '$basePath/lineitem_mergetree_pk_pruning_by_driver_bucket'
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS
orders_mergetree_pk_pruning_by_driver_bucket (
+ | o_orderkey bigint,
+ | o_custkey bigint,
+ | o_orderstatus string,
+ | o_totalprice double,
+ | o_orderdate date,
+ | o_orderpriority string,
+ | o_clerk string,
+ | o_shippriority bigint,
+ | o_comment string)
+ |USING clickhouse
+ |CLUSTERED by (o_orderkey)
+ |${if (sparkVersion.equals("3.2")) "" else "SORTED BY
(o_orderdate)"} INTO 2 BUCKETS
+ |LOCATION
'$basePath/orders_mergetree_pk_pruning_by_driver_bucket'
+ |""".stripMargin)
+
+ spark.sql(s"""
+ | insert into table
lineitem_mergetree_pk_pruning_by_driver_bucket
+ | select * from lineitem
+ |""".stripMargin)
+ spark.sql(s"""
+ | insert into table
orders_mergetree_pk_pruning_by_driver_bucket
+ | select * from orders
+ |""".stripMargin)
+
+ val sqlStr =
+ s"""
+ |SELECT
+ | l_shipmode,
+ | sum(
+ | CASE WHEN o_orderpriority = '1-URGENT'
+ | OR o_orderpriority = '2-HIGH' THEN
+ | 1
+ | ELSE
+ | 0
+ | END) AS high_line_count,
+ | sum(
+ | CASE WHEN o_orderpriority <> '1-URGENT'
+ | AND o_orderpriority <> '2-HIGH' THEN
+ | 1
+ | ELSE
+ | 0
+ | END) AS low_line_count
+ |FROM
+ | orders_mergetree_pk_pruning_by_driver_bucket,
+ | lineitem_mergetree_pk_pruning_by_driver_bucket
+ |WHERE
+ | o_orderkey = l_orderkey
+ | AND l_shipmode IN ('MAIL', 'SHIP')
+ | AND l_commitdate < l_receiptdate
+ | AND l_shipdate < l_commitdate
+ | AND l_receiptdate >= date'1994-01-01' AND l_receiptdate <
date'1994-01-01' + interval 1 year
+ |GROUP BY
+ | l_shipmode
+ |ORDER BY
+ | l_shipmode;
+ |""".stripMargin
+
+ Seq(("true", 2), ("false", 2)).foreach(
+ conf => {
+ withSQLConf(
+
("spark.gluten.sql.columnar.backend.ch.runtime_settings.enabled_driver_filter_mergetree_index"
-> conf._1)) {
+ runTPCHQueryBySQL(12, sqlStr) {
+ df =>
+ val scanExec = collect(df.queryExecution.executedPlan) {
+ case f: BasicScanExecTransformer => f
+ }
+ assert(scanExec.size == 2)
+ assert(scanExec(1).getSplitInfos.size == conf._2)
+ }
+ }
+ })
+ }
}
// scalastyle:off line.size.limit
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/TransformerApiImpl.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/TransformerApiImpl.scala
index fa9103f6f..954b07eb0 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/TransformerApiImpl.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/TransformerApiImpl.scala
@@ -23,7 +23,7 @@ import org.apache.gluten.utils.InputPartitionsUtil
import org.apache.gluten.vectorized.PlanEvaluatorJniWrapper
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation,
PartitionDirectory}
import org.apache.spark.sql.types._
@@ -44,7 +44,8 @@ class TransformerApiImpl extends TransformerApi with Logging {
bucketedScan: Boolean,
optionalBucketSet: Option[BitSet],
optionalNumCoalescedBuckets: Option[Int],
- disableBucketedScan: Boolean): Seq[InputPartition] = {
+ disableBucketedScan: Boolean,
+ filterExprs: Seq[Expression] = Seq.empty): Seq[InputPartition] = {
InputPartitionsUtil(
relation,
selectedPartitions,
diff --git a/cpp-ch/local-engine/Common/MergeTreeTool.cpp
b/cpp-ch/local-engine/Common/MergeTreeTool.cpp
index 3b517f30b..f430c9306 100644
--- a/cpp-ch/local-engine/Common/MergeTreeTool.cpp
+++ b/cpp-ch/local-engine/Common/MergeTreeTool.cpp
@@ -226,6 +226,7 @@ RangesInDataParts
MergeTreeTable::extractRange(DataPartsVector parts_vector) con
ranges_in_data_part.data_part = name_index.at(part.name);
ranges_in_data_part.part_index_in_query = 0;
ranges_in_data_part.ranges.emplace_back(MarkRange(part.begin,
part.end));
+ ranges_in_data_part.alter_conversions =
std::make_shared<AlterConversions>();
return ranges_in_data_part;
});
return ranges_in_data_parts;
diff --git a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
index 57ad97fae..c89632919 100644
--- a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
@@ -20,19 +20,14 @@
#include <Parser/FunctionParser.h>
#include <Parser/TypeParser.h>
-#include <Storages/StorageMergeTreeFactory.h>
-#include <Common/CHUtil.h>
-#include <Common/MergeTreeTool.h>
+#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
+#include <Storages/Mergetree/MergeSparkMergeTreeTask.h>
#include <Storages/Mergetree/MetaDataHelper.h>
+#include <Poco/StringTokenizer.h>
+#include <Common/CHUtil.h>
#include "MergeTreeRelParser.h"
-#include <Poco/StringTokenizer.h>
-#include <Storages/MergeTree/FutureMergedMutatedPart.h>
-#include <Storages/MergeTree/MergeMutateSelectedEntry.h>
-#include <Storages/MergeTree/MergeTreeDataMergerMutator.h>
-#include "Storages/Mergetree/MergeSparkMergeTreeTask.h"
-
namespace DB
{
@@ -64,18 +59,16 @@ static Int64 findMinPosition(const NameSet &
condition_table_columns, const Name
return min_position;
}
-CustomStorageMergeTreePtr MergeTreeRelParser::parseStorage(
- const substrait::ReadRel::ExtensionTable & extension_table,
- ContextMutablePtr context, UUID uuid)
+MergeTreeTable MergeTreeRelParser::parseMergeTreeTable(const
substrait::ReadRel::ExtensionTable & extension_table)
{
google::protobuf::StringValue table;
table.ParseFromString(extension_table.detail().value());
- auto merge_tree_table =
local_engine::parseMergeTreeTableString(table.value());
- DB::Block header;
+ return parseMergeTreeTableString(table.value());
+}
- header = TypeParser::buildBlockFromNamedStruct(
- merge_tree_table.schema,
- merge_tree_table.low_card_key);
+CustomStorageMergeTreePtr MergeTreeRelParser::parseStorage(const
MergeTreeTable & merge_tree_table, ContextMutablePtr context, UUID uuid)
+{
+ DB::Block header =
TypeParser::buildBlockFromNamedStruct(merge_tree_table.schema,
merge_tree_table.low_card_key);
auto names_and_types_list = header.getNamesAndTypesList();
auto storage_factory = StorageMergeTreeFactory::instance();
auto metadata = buildMetaData(names_and_types_list, context,
merge_tree_table);
@@ -126,18 +119,18 @@ CustomStorageMergeTreePtr
MergeTreeRelParser::parseStorage(
return storage;
}
-DB::QueryPlanPtr
-MergeTreeRelParser::parseReadRel(
- DB::QueryPlanPtr query_plan,
- const substrait::ReadRel & rel,
- const substrait::ReadRel::ExtensionTable & extension_table,
- std::list<const substrait::Rel *> & /*rel_stack_*/)
+CustomStorageMergeTreePtr
+MergeTreeRelParser::parseStorage(const substrait::ReadRel::ExtensionTable &
extension_table, ContextMutablePtr context, UUID uuid)
{
- google::protobuf::StringValue table;
- table.ParseFromString(extension_table.detail().value());
- auto merge_tree_table =
local_engine::parseMergeTreeTableString(table.value());
- DB::Block header;
- header = TypeParser::buildBlockFromNamedStruct(merge_tree_table.schema,
merge_tree_table.low_card_key);
+ auto merge_tree_table = parseMergeTreeTable(extension_table);
+ return parseStorage(merge_tree_table, context, uuid);
+}
+
+DB::QueryPlanPtr MergeTreeRelParser::parseReadRel(
+ DB::QueryPlanPtr query_plan, const substrait::ReadRel & rel, const
substrait::ReadRel::ExtensionTable & extension_table)
+{
+ auto merge_tree_table = parseMergeTreeTable(extension_table);
+ DB::Block header =
TypeParser::buildBlockFromNamedStruct(merge_tree_table.schema,
merge_tree_table.low_card_key);
DB::Block input;
if (rel.has_base_schema() && rel.base_schema().names_size())
{
@@ -152,7 +145,6 @@ MergeTreeRelParser::parseReadRel(
}
auto storage_factory = StorageMergeTreeFactory::instance();
auto metadata = buildMetaData(header.getNamesAndTypesList(), context,
merge_tree_table);
- query_context.metadata = metadata;
StorageID table_id(merge_tree_table.database, merge_tree_table.table);
auto storage = storage_factory.getStorage(
table_id,
@@ -175,8 +167,7 @@ MergeTreeRelParser::parseReadRel(
restoreMetaData(storage, merge_tree_table, *context);
for (const auto & [name, sizes] : storage->getColumnSizes())
column_sizes[name] = sizes.data_compressed;
- query_context.storage_snapshot =
std::make_shared<StorageSnapshot>(*storage, metadata);
- query_context.custom_storage_merge_tree = storage;
+ auto storage_snapshot = std::make_shared<StorageSnapshot>(*storage,
metadata);
auto names_and_types_list = input.getNamesAndTypesList();
auto query_info = buildQueryInfo(names_and_types_list);
@@ -190,15 +181,14 @@ MergeTreeRelParser::parseReadRel(
}
std::vector<DataPartPtr> selected_parts =
storage_factory.getDataParts(table_id, merge_tree_table.snapshot_id,
merge_tree_table.getPartNames());
- auto ranges = merge_tree_table.extractRange(selected_parts);
if (selected_parts.empty())
throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "no data part found.");
- auto read_step =
query_context.custom_storage_merge_tree->reader.readFromParts(
+ auto read_step = storage->reader.readFromParts(
selected_parts,
/* alter_conversions = */
{},
names_and_types_list.getNames(),
- query_context.storage_snapshot,
+ storage_snapshot,
*query_info,
context,
context->getSettingsRef().max_block_size,
@@ -212,7 +202,13 @@ MergeTreeRelParser::parseReadRel(
source_step_with_filter->applyFilters();
}
-
query_context.custom_storage_merge_tree->wrapRangesInDataParts(*reinterpret_cast<ReadFromMergeTree
*>(read_step.get()), ranges);
+ auto ranges = merge_tree_table.extractRange(selected_parts);
+ std::string ret;
+ if
(context->getSettings().tryGetString("enabled_driver_filter_mergetree_index",
ret) && ret == "'true'")
+ storage->analysisPartsByRanges(*reinterpret_cast<ReadFromMergeTree
*>(read_step.get()), ranges);
+ else
+ storage->wrapRangesInDataParts(*reinterpret_cast<ReadFromMergeTree
*>(read_step.get()), ranges);
+
steps.emplace_back(read_step.get());
query_plan->addStep(std::move(read_step));
if (!non_nullable_columns.empty())
@@ -302,10 +298,7 @@ void MergeTreeRelParser::parseToAction(ActionsDAGPtr &
filter_action, const subs
}
void MergeTreeRelParser::analyzeExpressions(
- Conditions & res,
- const substrait::Expression & rel,
- std::set<Int64> & pk_positions,
- Block & block)
+ Conditions & res, const substrait::Expression & rel, std::set<Int64> &
pk_positions, Block & block)
{
if (rel.has_scalar_function() && getCHFunctionName(rel.scalar_function())
== "and")
{
@@ -417,4 +410,70 @@ String MergeTreeRelParser::getCHFunctionName(const
substrait::Expression_ScalarF
throw Exception(ErrorCodes::UNKNOWN_FUNCTION, "Unsupported substrait
function on mergetree prewhere parser: {}", func_name);
return it->second;
}
+
+
+String MergeTreeRelParser::filterRangesOnDriver(const substrait::ReadRel &
read_rel)
+{
+ google::protobuf::StringValue table;
+ table.ParseFromString(read_rel.advanced_extension().enhancement().value());
+ auto merge_tree_table = parseMergeTreeTableString(table.value());
+ auto custom_storage_mergetree = parseStorage(merge_tree_table,
global_context);
+
+ auto input = TypeParser::buildBlockFromNamedStruct(read_rel.base_schema());
+ auto names_and_types_list = input.getNamesAndTypesList();
+ auto query_info = buildQueryInfo(names_and_types_list);
+
+ query_info->prewhere_info = parsePreWhereInfo(read_rel.filter(), input);
+
+ auto storage_factory = StorageMergeTreeFactory::instance();
+ std::vector<DataPartPtr> selected_parts
+ = storage_factory.getDataParts(StorageID(merge_tree_table.database,
merge_tree_table.table), merge_tree_table.snapshot_id,
merge_tree_table.getPartNames());
+
+ auto storage_snapshot =
std::make_shared<StorageSnapshot>(*custom_storage_mergetree,
custom_storage_mergetree->getInMemoryMetadataPtr());
+ if (selected_parts.empty())
+ throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "no data part found.");
+ auto read_step = custom_storage_mergetree->reader.readFromParts(
+ selected_parts,
+ /* alter_conversions = */
+ {},
+ names_and_types_list.getNames(),
+ storage_snapshot,
+ *query_info,
+ context,
+ context->getSettingsRef().max_block_size,
+ 10); // TODO: Expect use driver cores.
+
+ auto * read_from_mergetree = static_cast<ReadFromMergeTree
*>(read_step.get());
+ if (const auto & storage_prewhere_info = query_info->prewhere_info)
+ {
+ ActionDAGNodes filter_nodes;
+ filter_nodes.nodes.emplace_back(
+
&storage_prewhere_info->prewhere_actions->findInOutputs(storage_prewhere_info->prewhere_column_name));
+ read_from_mergetree->applyFilters(std::move(filter_nodes));
+ }
+
+ auto analysis = read_from_mergetree->getAnalysisResult();
+ rapidjson::StringBuffer result;
+ rapidjson::Writer<rapidjson::StringBuffer> writer(result);
+ writer.StartArray();
+ for (auto & parts_with_range : analysis.parts_with_ranges)
+ {
+ MarkRanges final_ranges;
+ for (auto & range : parts_with_range.ranges)
+ {
+ writer.StartObject();
+ writer.Key("part_name");
+ writer.String(parts_with_range.data_part->name.c_str());
+ writer.Key("begin");
+ writer.Uint(range.begin);
+ writer.Key("end");
+ writer.Uint(range.end);
+ writer.EndObject();
+ }
+ }
+
+ writer.EndArray();
+ return result.GetString();
+}
+
}
diff --git a/cpp-ch/local-engine/Parser/MergeTreeRelParser.h
b/cpp-ch/local-engine/Parser/MergeTreeRelParser.h
index 5d878d3cc..7619851d9 100644
--- a/cpp-ch/local-engine/Parser/MergeTreeRelParser.h
+++ b/cpp-ch/local-engine/Parser/MergeTreeRelParser.h
@@ -21,6 +21,9 @@
#include <Parser/RelParser.h>
#include <Parser/SerializedPlanParser.h>
+#include <Storages/StorageMergeTreeFactory.h>
+#include <Common/MergeTreeTool.h>
+
namespace DB
{
@@ -37,38 +40,35 @@ using namespace DB;
class MergeTreeRelParser : public RelParser
{
public:
- static std::shared_ptr<CustomStorageMergeTree> parseStorage(
- const substrait::ReadRel::ExtensionTable & extension_table,
- ContextMutablePtr context,
- UUID uuid = UUIDHelpers::Nil
- );
-
- explicit MergeTreeRelParser(
- SerializedPlanParser * plan_paser_, ContextPtr & context_,
QueryContext & query_context_, ContextMutablePtr & global_context_)
- : RelParser(plan_paser_), context(context_),
query_context(query_context_), global_context(global_context_)
+ static CustomStorageMergeTreePtr
+ parseStorage(const substrait::ReadRel::ExtensionTable & extension_table,
ContextMutablePtr context, UUID uuid = UUIDHelpers::Nil);
+ static CustomStorageMergeTreePtr
+ parseStorage(const MergeTreeTable & merge_tree_table, ContextMutablePtr
context, UUID uuid = UUIDHelpers::Nil);
+
+ static MergeTreeTable parseMergeTreeTable(const
substrait::ReadRel::ExtensionTable & extension_table);
+
+ explicit MergeTreeRelParser(SerializedPlanParser * plan_paser_, const
ContextPtr & context_)
+ : RelParser(plan_paser_), context(context_),
global_context(plan_paser_->global_context)
{
}
~MergeTreeRelParser() override = default;
- DB::QueryPlanPtr
- parse(DB::QueryPlanPtr query_plan, const substrait::Rel & rel,
std::list<const substrait::Rel *> & rel_stack_) override
+ DB::QueryPlanPtr parse(DB::QueryPlanPtr query_plan, const substrait::Rel &
rel, std::list<const substrait::Rel *> & rel_stack_) override
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "MergeTreeRelParser can't
call parse(), call parseReadRel instead.");
}
- DB::QueryPlanPtr
- parseReadRel(
- DB::QueryPlanPtr query_plan,
- const substrait::ReadRel & read_rel,
- const substrait::ReadRel::ExtensionTable & extension_table,
- std::list<const substrait::Rel *> & rel_stack_);
+ DB::QueryPlanPtr parseReadRel(
+ DB::QueryPlanPtr query_plan, const substrait::ReadRel & read_rel,
const substrait::ReadRel::ExtensionTable & extension_table);
const substrait::Rel & getSingleInput(const substrait::Rel &) override
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "MergeTreeRelParser can't
call getSingleInput().");
}
+ String filterRangesOnDriver(const substrait::ReadRel & read_rel);
+
struct Condition
{
explicit Condition(const substrait::Expression & node_) : node(node_)
{ }
@@ -98,8 +98,7 @@ private:
void collectColumns(const substrait::Expression & rel, NameSet & columns,
Block & block);
UInt64 getColumnsSize(const NameSet & columns);
- ContextPtr & context;
- QueryContext & query_context;
+ const ContextPtr & context;
ContextMutablePtr & global_context;
};
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
index 8e0284c90..4c7f4c775 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
@@ -335,36 +335,6 @@ IQueryPlanStep *
SerializedPlanParser::addRemoveNullableStep(QueryPlan & plan, c
return step_ptr;
}
-PrewhereInfoPtr SerializedPlanParser::parsePreWhereInfo(const
substrait::Expression & rel, Block & input)
-{
- auto prewhere_info = std::make_shared<PrewhereInfo>();
- prewhere_info->prewhere_actions =
std::make_shared<ActionsDAG>(input.getNamesAndTypesList());
- std::string filter_name;
- // for in function
- if (rel.has_singular_or_list())
- {
- const auto * in_node =
parseExpression(prewhere_info->prewhere_actions, rel);
- prewhere_info->prewhere_actions->addOrReplaceInOutputs(*in_node);
- filter_name = in_node->result_name;
- }
- else
- {
- parseFunctionWithDAG(rel, filter_name,
prewhere_info->prewhere_actions, true);
- }
- prewhere_info->prewhere_column_name = filter_name;
- prewhere_info->need_filter = true;
- prewhere_info->remove_prewhere_column = true;
- auto cols = prewhere_info->prewhere_actions->getRequiredColumnsNames();
- // Keep it the same as the input.
- prewhere_info->prewhere_actions->removeUnusedActions(Names{filter_name},
false, true);
- prewhere_info->prewhere_actions->projectInput(false);
- for (const auto & name : input.getNames())
- {
- prewhere_info->prewhere_actions->tryRestoreColumn(name);
- }
- return prewhere_info;
-}
-
DataTypePtr wrapNullableType(substrait::Type_Nullability nullable, DataTypePtr
nested_type)
{
return wrapNullableType(nullable ==
substrait::Type_Nullability_NULLABILITY_NULLABLE, nested_type);
@@ -534,9 +504,8 @@ QueryPlanPtr SerializedPlanParser::parseOp(const
substrait::Rel & rel, std::list
else
extension_table =
parseExtensionTable(split_infos.at(nextSplitInfoIndex()));
- MergeTreeRelParser mergeTreeParser(this, context,
query_context, global_context);
- std::list<const substrait::Rel *> stack;
- query_plan =
mergeTreeParser.parseReadRel(std::make_unique<QueryPlan>(), read,
extension_table, stack);
+ MergeTreeRelParser mergeTreeParser(this, context);
+ query_plan =
mergeTreeParser.parseReadRel(std::make_unique<QueryPlan>(), read,
extension_table);
steps = mergeTreeParser.getSteps();
}
break;
@@ -2256,9 +2225,8 @@ Block & LocalExecutor::getHeader()
return header;
}
-LocalExecutor::LocalExecutor(QueryContext & _query_context, ContextPtr
context_)
- : query_context(_query_context)
- , context(context_)
+LocalExecutor::LocalExecutor(ContextPtr context_)
+ : context(context_)
{
}
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.h
b/cpp-ch/local-engine/Parser/SerializedPlanParser.h
index 019d49b02..5bf7da25d 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.h
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.h
@@ -215,13 +215,6 @@ static const std::map<std::string, std::string>
SCALAR_FUNCTIONS
static const std::set<std::string> FUNCTION_NEED_KEEP_ARGUMENTS = {"alias"};
-struct QueryContext
-{
- StorageSnapshotPtr storage_snapshot;
- std::shared_ptr<const DB::StorageInMemoryMetadata> metadata;
- std::shared_ptr<CustomStorageMergeTree> custom_storage_merge_tree;
-};
-
DataTypePtr wrapNullableType(substrait::Type_Nullability nullable, DataTypePtr
nested_type);
DataTypePtr wrapNullableType(bool nullable, DataTypePtr nested_type);
@@ -275,7 +268,6 @@ public:
DB::QueryPlanStepPtr parseReadRealWithLocalFile(const substrait::ReadRel &
rel);
DB::QueryPlanStepPtr parseReadRealWithJavaIter(const substrait::ReadRel &
rel);
- PrewhereInfoPtr parsePreWhereInfo(const substrait::Expression & rel, Block
& input);
static bool isReadRelFromJava(const substrait::ReadRel & rel);
static bool isReadFromMergeTree(const substrait::ReadRel & rel);
@@ -289,15 +281,16 @@ public:
materialize_inputs.emplace_back(materialize_input);
}
- void addSplitInfo(std::string & split_info)
- {
- split_infos.emplace_back(std::move(split_info));
- }
+ void addSplitInfo(std::string & split_info) {
split_infos.emplace_back(std::move(split_info)); }
int nextSplitInfoIndex()
{
if (split_info_index >= split_infos.size())
- throw Exception(ErrorCodes::LOGICAL_ERROR, "split info index out
of range, split_info_index: {}, split_infos.size(): {}", split_info_index,
split_infos.size());
+ throw Exception(
+ ErrorCodes::LOGICAL_ERROR,
+ "split info index out of range, split_info_index: {},
split_infos.size(): {}",
+ split_info_index,
+ split_infos.size());
return split_info_index++;
}
@@ -316,7 +309,6 @@ public:
static ContextMutablePtr global_context;
static Context::ConfigurationPtr config;
static SharedContextHolder shared_context;
- QueryContext query_context;
std::vector<QueryPlanPtr> extra_plan_holder;
private:
@@ -415,7 +407,7 @@ class LocalExecutor : public BlockIterator
{
public:
LocalExecutor() = default;
- explicit LocalExecutor(QueryContext & _query_context, ContextPtr context);
+ explicit LocalExecutor(ContextPtr context);
void execute(QueryPlanPtr query_plan);
SparkRowInfoPtr next();
Block * nextColumnar();
@@ -430,7 +422,6 @@ public:
void setExtraPlanHolder(std::vector<QueryPlanPtr> & extra_plan_holder_) {
extra_plan_holder = std::move(extra_plan_holder_); }
private:
- QueryContext query_context;
std::unique_ptr<SparkRowInfo> writeBlockToSparkRow(DB::Block & block);
QueryPipeline query_pipeline;
std::unique_ptr<PullingPipelineExecutor> executor;
diff --git a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp
b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp
index ee481c93f..cff32a83f 100644
--- a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp
+++ b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp
@@ -32,6 +32,55 @@ extern const int DUPLICATE_DATA_PART;
namespace local_engine
{
+
+void CustomStorageMergeTree::analysisPartsByRanges(DB::ReadFromMergeTree &
source, DB::RangesInDataParts ranges_in_data_parts)
+{
+ ReadFromMergeTree::AnalysisResult result;
+ result.column_names_to_read = source.getAllColumnNames();
+ /// If there are only virtual columns in the query, you must request at
least one non-virtual one.
+ if (result.column_names_to_read.empty())
+ {
+ NamesAndTypesList available_real_columns =
source.getStorageMetadata()->getColumns().getAllPhysical();
+
result.column_names_to_read.push_back(ExpressionActions::getSmallestColumn(available_real_columns).name);
+ }
+
+ result.sampling = MergeTreeDataSelectSamplingData();
+ result.parts_with_ranges = ranges_in_data_parts;
+
+ size_t sum_marks = 0;
+ size_t sum_ranges = 0;
+ size_t sum_rows = 0;
+ size_t total_marks_pk = 0;
+ size_t sum_marks_pk = 0;
+
+ for (const auto & part : result.parts_with_ranges)
+ {
+ sum_ranges += part.ranges.size();
+ sum_marks += part.getMarksCount();
+ sum_rows += part.getRowsCount();
+ total_marks_pk +=
part.data_part->index_granularity.getMarksCountWithoutFinal();
+
+ for (auto range : part.ranges)
+ sum_marks_pk += range.getNumberOfMarks();
+ }
+
+ result.total_parts = ranges_in_data_parts.size();
+ result.parts_before_pk = ranges_in_data_parts.size();
+ result.selected_parts = ranges_in_data_parts.size();
+ result.selected_ranges = sum_ranges;
+ result.selected_marks = sum_marks;
+ result.selected_marks_pk = sum_marks_pk;
+ result.total_marks_pk = total_marks_pk;
+ result.selected_rows = sum_rows;
+
+ if (source.getQueryInfo().input_order_info)
+ result.read_type = (source.getQueryInfo().input_order_info->direction
> 0)
+ ? MergeTreeReadType::InOrder
+ : MergeTreeReadType::InReverseOrder;
+
+
source.setAnalyzedResult(std::make_shared<ReadFromMergeTree::AnalysisResult>(std::move(result)));
+}
+
void CustomStorageMergeTree::wrapRangesInDataParts(DB::ReadFromMergeTree &
source, DB::RangesInDataParts ranges)
{
auto result = source.getAnalysisResult();
diff --git a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h
b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h
index dfaba5b71..0aeee4ef9 100644
--- a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h
+++ b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h
@@ -38,6 +38,7 @@ class CustomStorageMergeTree final : public MergeTreeData
public:
static void wrapRangesInDataParts(DB::ReadFromMergeTree & source, DB::RangesInDataParts ranges);
+ void analysisPartsByRanges(DB::ReadFromMergeTree & source, DB::RangesInDataParts ranges_in_data_parts);
CustomStorageMergeTree(
const StorageID & table_id_,
const String & relative_data_path_,
diff --git a/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h
index 59d7af4e3..b15a15322 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h
+++ b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h
@@ -17,8 +17,8 @@
#pragma once
-#include <Common/MergeTreeTool.h>
#include <Storages/StorageMergeTreeFactory.h>
+#include <Common/MergeTreeTool.h>
namespace local_engine
{
@@ -31,4 +31,3 @@ void saveFileStatus(
IDataPartStorage & data_part_storage);
}
-
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp
index 1518f8518..eb4d16d4c 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -280,7 +280,7 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_
std::string plan_string;
plan_string.assign(reinterpret_cast<const char *>(plan_address), plan_size);
auto query_plan = parser.parse(plan_string);
- local_engine::LocalExecutor * executor = new local_engine::LocalExecutor(parser.query_context, query_context);
+ local_engine::LocalExecutor * executor = new local_engine::LocalExecutor(query_context);
executor->setMetric(parser.getMetric());
executor->setExtraPlanHolder(parser.extra_plan_holder);
executor->execute(std::move(query_plan));
@@ -1036,6 +1036,49 @@ JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniW
LOCAL_ENGINE_JNI_METHOD_END(env, 0)
}
+
+JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_filterRangesOnDriver(
+ JNIEnv * env, jclass, jbyteArray plan_, jbyteArray read_)
+{
+ LOCAL_ENGINE_JNI_METHOD_START
+ jsize plan_buf_size = env->GetArrayLength(plan_);
+ jbyte * plan_buf_addr = env->GetByteArrayElements(plan_, nullptr);
+ std::string plan_str;
+ plan_str.assign(reinterpret_cast<const char *>(plan_buf_addr), plan_buf_size);
+
+ auto plan_ptr = std::make_unique<substrait::Plan>();
+ if (!plan_ptr->ParseFromString(plan_str))
+ throw Exception(DB::ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA, "Parse substrait::Plan from string failed");
+
+ jsize read_buf_size = env->GetArrayLength(read_);
+ jbyte * read_buf_addr = env->GetByteArrayElements(read_, nullptr);
+ std::string filter_str;
+ filter_str.assign(reinterpret_cast<const char *>(read_buf_addr), read_buf_size);
+
+ auto read_ptr = std::make_unique<substrait::Rel>();
+ /// https://stackoverflow.com/questions/52028583/getting-error-parsing-protobuf-data
+ /// Parsing may fail when the message is nested too deeply.
+ /// Set a recursion limit here that is large enough to avoid the problem.
+ /// Once it occurs it is hard to troubleshoot, because the C++ protobuf library does not report any useful error information.
+ google::protobuf::io::CodedInputStream coded_in(
+ reinterpret_cast<const uint8_t *>(filter_str.data()), static_cast<int>(filter_str.size()));
+ coded_in.SetRecursionLimit(100000);
+
+ if (!read_ptr->ParseFromCodedStream(&coded_in))
+ throw Exception(DB::ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA, "Parse substrait::Rel from string failed");
+
+
+ local_engine::SerializedPlanParser parser(local_engine::SerializedPlanParser::global_context);
+ parser.parseExtensions(plan_ptr->extensions());
+ local_engine::MergeTreeRelParser mergeTreeParser(&parser, local_engine::SerializedPlanParser::global_context);
+ auto res = mergeTreeParser.filterRangesOnDriver(read_ptr->read());
+
+ env->ReleaseByteArrayElements(plan_, plan_buf_addr, JNI_ABORT);
+ env->ReleaseByteArrayElements(read_, read_buf_addr, JNI_ABORT);
+ return stringTojstring(env, res.c_str());
+ LOCAL_ENGINE_JNI_METHOD_END(env, nullptr)
+}
+
JNIEXPORT void Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_write(JNIEnv * env, jobject, jlong instanceId, jlong block_address)
{
@@ -1368,7 +1411,7 @@ Java_org_apache_gluten_vectorized_SimpleExpressionEval_createNativeInstance(JNIE
std::string plan_string;
plan_string.assign(reinterpret_cast<const char *>(plan_address), plan_size);
auto query_plan = parser.parse(plan_string);
- local_engine::LocalExecutor * executor = new local_engine::LocalExecutor(parser.query_context, context);
+ local_engine::LocalExecutor * executor = new local_engine::LocalExecutor(context);
executor->execute(std::move(query_plan));
env->ReleaseByteArrayElements(plan, plan_address, JNI_ABORT);
return reinterpret_cast<jlong>(executor);
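
Note on the new JNI entry point above: it implies a matching static native method on the Java side. The following is a minimal, hedged sketch of that declaration, derived only from the JNI symbol name and signature (jclass means a static method, jbyteArray maps to byte[], jstring to String); the argument names, javadoc, and class body are illustrative assumptions, not the committed code.

    package org.apache.spark.sql.execution.datasources;

    /**
     * Sketch of the driver-side JNI bridge implied by
     * Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_filterRangesOnDriver.
     */
    public class CHDatasourceJniWrapper {
      /**
       * plan: serialized substrait::Plan carrying the extensions needed by the parser.
       * read: serialized substrait::Rel whose ReadRel holds the MergeTree table and filter.
       * Returns the mark ranges selected by the primary-index filter, serialized as a string,
       * so the driver can split tasks from ranges that actually contain data.
       */
      public static native String filterRangesOnDriver(byte[] plan, byte[] read);
    }
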
diff --git a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/ExtensionTableNode.java b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/ExtensionTableNode.java
index f5cfd1da5..d233ff85a 100644
--- a/gluten-core/src/main/java/org/apache/gluten/substrait/rel/ExtensionTableNode.java
+++ b/gluten-core/src/main/java/org/apache/gluten/substrait/rel/ExtensionTableNode.java
@@ -198,4 +198,8 @@ public class ExtensionTableNode implements SplitInfo {
public List<String> getPartList() {
return partList;
}
+
+ public String getExtensionTableStr() {
+ return extensionTableStr.toString();
+ }
}
diff --git a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/TransformerApi.scala b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/TransformerApi.scala
index f5a1b2699..49a97a8a4 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/TransformerApi.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/TransformerApi.scala
@@ -18,7 +18,7 @@ package org.apache.gluten.backendsapi
import org.apache.gluten.substrait.expression.ExpressionNode
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, PartitionDirectory}
@@ -39,7 +39,8 @@ trait TransformerApi {
bucketedScan: Boolean,
optionalBucketSet: Option[BitSet],
optionalNumCoalescedBuckets: Option[Int],
- disableBucketedScan: Boolean): Seq[InputPartition]
+ disableBucketedScan: Boolean,
+ filterExprs: Seq[Expression] = Seq.empty): Seq[InputPartition]
/**
* Post process native config For example, for ClickHouse backend, sync 'spark.executor.cores' to
diff --git a/gluten-core/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala b/gluten-core/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
index af7d1ed72..c3d2da7f0 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
@@ -115,7 +115,8 @@ abstract class FileSourceScanExecTransformerBase(
bucketedScan,
optionalBucketSet,
optionalNumCoalescedBuckets,
- disableBucketedScan)
+ disableBucketedScan,
+ filterExprs())
}
override def getPartitionSchema: StructType = relation.partitionSchema
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]