This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-1.9
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.9 by this push:
new 238c893f3 [KYUUBI #6315] Spark 3.5: MaxScanStrategy supports DSv2
238c893f3 is described below
commit 238c893f39076ecfc58b50347f69bc9dab620999
Author: hezhao2 <[email protected]>
AuthorDate: Wed Apr 17 16:29:50 2024 +0800
[KYUUBI #6315] Spark 3.5: MaxScanStrategy supports DSv2
# :mag: Description
## Issue References 🔗
Currently, MaxScanStrategy can limit the maximum scan file size only for some
data sources, such as Hive. This change enhances MaxScanStrategy to also
support DataSource V2 (DSv2).
## Describe Your Solution 🔧
Get the statistics about the scanned data through the DataSource V2 API.
## Types of changes :bookmark:
- [ ] Bugfix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
## Test Plan 🧪
#### Behavior Without This Pull Request :coffin:
#### Behavior With This Pull Request :tada:
#### Related Unit Tests
---
# Checklists
## 📝 Author Self Checklist
- [x] My code follows the [style
guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html)
of this project
- [x] I have performed a self-review
- [x] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [x] I have added tests that prove my fix is effective or that my feature
works
- [x] New and existing unit tests pass locally with my changes
- [x] This patch was not authored or co-authored using [Generative
Tooling](https://www.apache.org/legal/generative-tooling.html)
## 📝 Committer Pre-Merge Checklist
- [x] Pull request title is okay.
- [x] No license issues.
- [x] Milestone correctly set?
- [x] Test coverage is ok
- [x] Assignees are selected.
- [ ] Minimum number of approvals
- [ ] No changes are requested
**Be nice. Be informative.**
Closes #5852 from zhaohehuhu/dev-1213.
Closes #6315
3c5b0c276 [hezhao2] reformat
fb113d625 [hezhao2] disable the rule that checks the maxPartitions for dsv2
acc358732 [hezhao2] disable the rule that checks the maxPartitions for dsv2
c8399a021 [hezhao2] fix header
70c845bee [hezhao2] add UTs
3a0739686 [hezhao2] add ut
4d26ce131 [hezhao2] reformat
f87cb072c [hezhao2] reformat
b307022b8 [hezhao2] move code to Spark 3.5
73258c2ae [hezhao2] fix unused import
cf893a0e1 [hezhao2] drop reflection for loading iceberg class
dc128bc8e [hezhao2] refactor code
661834cce [hezhao2] revert code
6061f42ab [hezhao2] delete IcebergSparkPlanHelper
5f1c3c082 [hezhao2] fix
b15652f05 [hezhao2] remove iceberg dependency
fe620ca92 [hezhao2] enable MaxScanStrategy when accessing iceberg datasource
Authored-by: hezhao2 <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
(cherry picked from commit 8edcb005ee8c863d5fd070e276a05d9c0c30d018)
Signed-off-by: Cheng Pan <[email protected]>
---
.../kyuubi/sql/watchdog/MaxScanStrategy.scala | 35 ++++++++++++
...portStatisticsAndPartitionAwareDataSource.scala | 64 ++++++++++++++++++++++
.../spark/sql/ReportStatisticsDataSource.scala | 53 ++++++++++++++++++
.../org/apache/spark/sql/WatchDogSuiteBase.scala | 33 +++++++++++
4 files changed, 185 insertions(+)
diff --git
a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala
b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala
index 1ed55ebc2..e647ad325 100644
---
a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala
+++
b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.planning.ScanOperation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.{CatalogFileIndex,
HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.sql.KyuubiSQLConf
@@ -232,6 +233,40 @@ case class MaxScanStrategy(session: SparkSession)
logicalRelation.catalogTable)
}
}
+ // DataSource V2 scan. Only the max scan file size limit is checked for this case, based on
+ // the sizeInBytes reported by the scan relation's statistics; no partition-count limit is
+ // applied here.
+ case ScanOperation(_, _, _, relation @ DataSourceV2ScanRelation(_, _, _, _, _)) =>
+   // Compute statistics only when a limit is configured, so queries without a maxFileSize
+   // restriction do no extra work here.
+   maxFileSizeOpt.foreach { maxFileSize =>
+     val scanFileSize = relation.computeStats().sizeInBytes
+     if (maxFileSize < scanFileSize) {
+       val table = relation.relation.table
+       val partitionTransforms = table.partitioning()
+       // Tables that declare partition transforms get an extra hint listing them, so the
+       // user can narrow the query along the partition structure.
+       val message =
+         if (partitionTransforms.nonEmpty) {
+           s"""
+              |SQL job scan file size in bytes: $scanFileSize
+              |exceed restrict of table scan maxFileSize $maxFileSize
+              |You should optimize your SQL logical according partition structure
+              |or shorten query scope such as p_date, detail as below:
+              |Table: ${table.name()}
+              |Partition Structure: ${partitionTransforms.map(_.describe()).mkString(",")}
+              |""".stripMargin
+         } else {
+           s"""
+              |SQL job scan file size in bytes: $scanFileSize
+              |exceed restrict of table scan maxFileSize $maxFileSize
+              |detail as below:
+              |Table: ${table.name()}
+              |""".stripMargin
+         }
+       throw new MaxFileSizeExceedException(message)
+     }
+   }
case _ =>
}
}
diff --git
a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsAndPartitionAwareDataSource.scala
b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsAndPartitionAwareDataSource.scala
new file mode 100644
index 000000000..136ced538
--- /dev/null
+++
b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsAndPartitionAwareDataSource.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.OptionalLong
+
+import org.apache.spark.sql.connector.{RangeInputPartition, SimpleBatchTable,
SimpleScanBuilder, SimpleWritableDataSource}
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.connector.expressions.{Expressions,
FieldReference, Transform}
+import org.apache.spark.sql.connector.read.{InputPartition, ScanBuilder,
Statistics, SupportsReportPartitioning, SupportsReportStatistics}
+import
org.apache.spark.sql.connector.read.partitioning.{KeyGroupedPartitioning,
Partitioning}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+/**
+ * Test-only DSv2 source whose scan builder reports fixed statistics and a key-grouped
+ * partitioning, and whose table declares an identity partition transform on column `i`.
+ */
+class ReportStatisticsAndPartitionAwareDataSource extends SimpleWritableDataSource {
+
+  /** Scan builder reporting constant statistics and a partitioning keyed on `partitionKeys`. */
+  class MyScanBuilder(val partitionKeys: Seq[String])
+    extends SimpleScanBuilder
+    with SupportsReportStatistics
+    with SupportsReportPartitioning {
+
+    /** Always reports a size of 80 bytes and a row count of 10. */
+    override def estimateStatistics(): Statistics = new Statistics {
+      override def sizeInBytes(): OptionalLong = OptionalLong.of(80)
+      override def numRows(): OptionalLong = OptionalLong.of(10)
+    }
+
+    /** Plans two input partitions: RangeInputPartition(0, 5) and RangeInputPartition(5, 10). */
+    override def planInputPartitions(): Array[InputPartition] =
+      Array(RangeInputPartition(0, 5), RangeInputPartition(5, 10))
+
+    // NOTE(review): declares 10 partitions while planInputPartitions returns 2 -- confirm.
+    override def outputPartitioning(): Partitioning =
+      new KeyGroupedPartitioning(partitionKeys.map(key => FieldReference(key)).toArray, 10)
+  }
+
+  /** Returns a batch table whose scans are built by [[MyScanBuilder]] keyed on column `i`. */
+  override def getTable(options: CaseInsensitiveStringMap): Table = new SimpleBatchTable {
+    override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
+      new MyScanBuilder(Seq("i"))
+
+    /** Declares a single identity transform on column `i`. */
+    override def partitioning(): Array[Transform] = Array(Expressions.identity("i"))
+  }
+}
diff --git
a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsDataSource.scala
b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsDataSource.scala
new file mode 100644
index 000000000..2035d3525
--- /dev/null
+++
b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsDataSource.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.OptionalLong
+
+import org.apache.spark.sql.connector._
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.connector.read._
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+/**
+ * Test-only DSv2 source whose scan builder reports fixed statistics (80 bytes, 10 rows).
+ * Unlike [[ReportStatisticsAndPartitionAwareDataSource]], its table does not override
+ * `partitioning()`.
+ */
+class ReportStatisticsDataSource extends SimpleWritableDataSource {
+
+  /** Scan builder that reports constant statistics. */
+  class MyScanBuilder extends SimpleScanBuilder with SupportsReportStatistics {
+
+    /** Always reports a size of 80 bytes and a row count of 10. */
+    override def estimateStatistics(): Statistics = new Statistics {
+      override def sizeInBytes(): OptionalLong = OptionalLong.of(80)
+      override def numRows(): OptionalLong = OptionalLong.of(10)
+    }
+
+    /** Plans two input partitions: RangeInputPartition(0, 5) and RangeInputPartition(5, 10). */
+    override def planInputPartitions(): Array[InputPartition] =
+      Array(RangeInputPartition(0, 5), RangeInputPartition(5, 10))
+  }
+
+  /** Returns a batch table whose scans are built by [[MyScanBuilder]]. */
+  override def getTable(options: CaseInsensitiveStringMap): Table = new SimpleBatchTable {
+    override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
+      new MyScanBuilder
+  }
+}
diff --git
a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
index 139efd9ca..22c998d3b 100644
---
a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
+++
b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
import org.apache.kyuubi.sql.{KyuubiSQLConf, KyuubiSQLExtensionException}
import org.apache.kyuubi.sql.watchdog.{MaxFileSizeExceedException,
MaxPartitionExceedException}
@@ -607,4 +608,36 @@ trait WatchDogSuiteBase extends
KyuubiSparkSQLExtensionTest {
assert(e.getMessage == "Script transformation is not allowed")
}
}
+
+ test("watchdog with scan maxFileSize -- data source v2") {
+   // Exercise the same scenario against a partitioned and a non-partitioned DSv2 source,
+   // each registered under its own temp view.
+   val sourcesAndViews = Seq(
+     classOf[ReportStatisticsAndPartitionAwareDataSource].getName -> "test",
+     classOf[ReportStatisticsDataSource].getName -> "test_non_part")
+
+   sourcesAndViews.foreach { case (sourceName, viewName) =>
+     val sourceDf = spark.read.format(sourceName).load()
+     sourceDf.createOrReplaceTempView(viewName)
+
+     // Read the size reported by the DataSourceV2ScanRelation found in the optimized plan.
+     val scanRelation = sourceDf.queryExecution.optimizedPlan.collect {
+       case v2Scan: DataSourceV2ScanRelation => v2Scan
+     }.head
+     val reportedSize = scanRelation.computeStats().sizeInBytes.toLong
+     val query = s"SELECT * FROM $viewName"
+
+     // A limit equal to the reported size: planning is expected to succeed.
+     withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> reportedSize.toString) {
+       sql(query).queryExecution.sparkPlan
+     }
+
+     // A limit of half the reported size: planning is expected to throw.
+     withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> (reportedSize / 2).toString) {
+       intercept[MaxFileSizeExceedException] {
+         sql(query).queryExecution.sparkPlan
+       }
+     }
+   }
+ }
}