This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.9
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.9 by this push:
     new 238c893f3 [KYUUBI #6315] Spark 3.5: MaxScanStrategy supports DSv2
238c893f3 is described below

commit 238c893f39076ecfc58b50347f69bc9dab620999
Author: hezhao2 <[email protected]>
AuthorDate: Wed Apr 17 16:29:50 2024 +0800

    [KYUUBI #6315] Spark 3.5: MaxScanStrategy supports DSv2
    
    # :mag: Description
    ## Issue References 🔗
    
    Currently, MaxScanStrategy can limit the maximum scanned file size for some data sources, such as Hive. This change enhances MaxScanStrategy to also support DataSource V2.
    ## Describe Your Solution 🔧
    
    Get the statistics of the files scanned through the DataSource V2 API and check them against the existing maxFileSize limit, as sketched below.
    
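    A minimal sketch of the check (illustration only: `checkDsv2ScanSize` and the plain `IllegalStateException` are hypothetical stand-ins, while `DataSourceV2ScanRelation` and `computeStats()` match the actual change in the diff below):

    ```scala
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation

    // Walk the optimized plan and compare each DSv2 scan's estimated size
    // against a configured byte limit.
    def checkDsv2ScanSize(plan: LogicalPlan, maxFileSizeOpt: Option[Long]): Unit = {
      plan.foreach {
        case relation: DataSourceV2ScanRelation =>
          // computeStats() reflects the connector's SupportsReportStatistics
          val scanFileSize = relation.computeStats().sizeInBytes
          if (maxFileSizeOpt.exists(_ < scanFileSize)) {
            // the real rule throws Kyuubi's MaxFileSizeExceedException
            throw new IllegalStateException(
              s"Scan of ${relation.relation.table.name()} ($scanFileSize bytes) exceeds the limit")
          }
        case _ => // other plan nodes are covered by the existing v1 branches
      }
    }
    ```

    The byte limit itself is the existing watchdog knob (`KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE`), the same configuration already used for the v1 file-source checks.
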
    ## Types of changes :bookmark:
    
    - [ ] Bugfix (non-breaking change which fixes an issue)
    - [x] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing functionality to change)
    
    ## Test Plan 🧪
    
    #### Behavior Without This Pull Request :coffin:
    
    #### Behavior With This Pull Request :tada:
    
    #### Related Unit Tests
    
    ---
    
    # Checklists
    ## ๐Ÿ“ Author Self Checklist
    
    - [x] My code follows the [style guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html) of this project
    - [x] I have performed a self-review
    - [x] I have commented my code, particularly in hard-to-understand areas
    - [ ] I have made corresponding changes to the documentation
    - [x] My changes generate no new warnings
    - [x] I have added tests that prove my fix is effective or that my feature works
    - [x] New and existing unit tests pass locally with my changes
    - [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    ## ๐Ÿ“ Committer Pre-Merge Checklist
    
    - [x] Pull request title is okay.
    - [x] No license issues.
    - [x] Milestone is correctly set.
    - [x] Test coverage is okay.
    - [x] Assignees are selected.
    - [ ] Minimum number of approvals met.
    - [ ] No changes are requested
    
    **Be nice. Be informative.**
    
    Closes #5852 from zhaohehuhu/dev-1213.
    
    Closes #6315
    
    3c5b0c276 [hezhao2] reformat
    fb113d625 [hezhao2] disable the rule that checks the maxPartitions for dsv2
    acc358732 [hezhao2] disable the rule that checks the maxPartitions for dsv2
    c8399a021 [hezhao2] fix header
    70c845bee [hezhao2] add UTs
    3a0739686 [hezhao2] add ut
    4d26ce131 [hezhao2] reformat
    f87cb072c [hezhao2] reformat
    b307022b8 [hezhao2] move code to Spark 3.5
    73258c2ae [hezhao2] fix unused import
    cf893a0e1 [hezhao2] drop reflection for loading iceberg class
    dc128bc8e [hezhao2] refactor code
    661834cce [hezhao2] revert code
    6061f42ab [hezhao2] delete IcebergSparkPlanHelper
    5f1c3c082 [hezhao2] fix
    b15652f05 [hezhao2] remove iceberg dependency
    fe620ca92 [hezhao2] enable MaxScanStrategy when accessing iceberg datasource
    
    Authored-by: hezhao2 <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
    (cherry picked from commit 8edcb005ee8c863d5fd070e276a05d9c0c30d018)
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../kyuubi/sql/watchdog/MaxScanStrategy.scala      | 35 ++++++++++++
 ...portStatisticsAndPartitionAwareDataSource.scala | 64 ++++++++++++++++++++++
 .../spark/sql/ReportStatisticsDataSource.scala     | 53 ++++++++++++++++++
 .../org/apache/spark/sql/WatchDogSuiteBase.scala   | 33 +++++++++++
 4 files changed, 185 insertions(+)

diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala
index 1ed55ebc2..e647ad325 100644
--- a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.planning.ScanOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
 import org.apache.spark.sql.types.StructType
 
 import org.apache.kyuubi.sql.KyuubiSQLConf
@@ -232,6 +233,40 @@ case class MaxScanStrategy(session: SparkSession)
               logicalRelation.catalogTable)
           }
         }
+      case ScanOperation(
+            _,
+            _,
+            _,
+            relation @ DataSourceV2ScanRelation(_, _, _, _, _)) =>
+        val table = relation.relation.table
+        if (table.partitioning().nonEmpty) {
+          val partitionColumnNames = table.partitioning().map(_.describe())
+          val stats = relation.computeStats()
+          lazy val scanFileSize = stats.sizeInBytes
+          if (maxFileSizeOpt.exists(_ < scanFileSize)) {
+            throw new MaxFileSizeExceedException(
+              s"""
+                 |SQL job scan file size in bytes: $scanFileSize
+                 |exceeds the table scan maxFileSize limit: ${maxFileSizeOpt.get}
+                 |You should optimize your SQL logic according to the partition structure
+                 |or shorten the query scope (e.g. by p_date). Details below:
+                 |Table: ${table.name()}
+                 |Partition Structure: ${partitionColumnNames.mkString(",")}
+                 |""".stripMargin)
+          }
+        } else {
+          val stats = relation.computeStats()
+          lazy val scanFileSize = stats.sizeInBytes
+          if (maxFileSizeOpt.exists(_ < scanFileSize)) {
+            throw new MaxFileSizeExceedException(
+              s"""
+                 |SQL job scan file size in bytes: $scanFileSize
+                 |exceeds the table scan maxFileSize limit: ${maxFileSizeOpt.get}
+                 |Details below:
+                 |Table: ${table.name()}
+                 |""".stripMargin)
+          }
+        }
       case _ =>
     }
   }
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsAndPartitionAwareDataSource.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsAndPartitionAwareDataSource.scala
new file mode 100644
index 000000000..136ced538
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsAndPartitionAwareDataSource.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.OptionalLong
+
+import org.apache.spark.sql.connector.{RangeInputPartition, SimpleBatchTable, SimpleScanBuilder, SimpleWritableDataSource}
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.connector.expressions.{Expressions, FieldReference, Transform}
+import org.apache.spark.sql.connector.read.{InputPartition, ScanBuilder, Statistics, SupportsReportPartitioning, SupportsReportStatistics}
+import org.apache.spark.sql.connector.read.partitioning.{KeyGroupedPartitioning, Partitioning}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class ReportStatisticsAndPartitionAwareDataSource extends SimpleWritableDataSource {
+
+  class MyScanBuilder(
+      val partitionKeys: Seq[String]) extends SimpleScanBuilder
+    with SupportsReportStatistics with SupportsReportPartitioning {
+
+    override def estimateStatistics(): Statistics = {
+      new Statistics {
+        override def sizeInBytes(): OptionalLong = OptionalLong.of(80)
+
+        override def numRows(): OptionalLong = OptionalLong.of(10)
+
+      }
+    }
+
+    override def planInputPartitions(): Array[InputPartition] = {
+      Array(RangeInputPartition(0, 5), RangeInputPartition(5, 10))
+    }
+
+    override def outputPartitioning(): Partitioning = {
+      new KeyGroupedPartitioning(partitionKeys.map(FieldReference(_)).toArray, 10)
+    }
+  }
+
+  override def getTable(options: CaseInsensitiveStringMap): Table = {
+    new SimpleBatchTable {
+      override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+        new MyScanBuilder(Seq("i"))
+      }
+
+      override def partitioning(): Array[Transform] = {
+        Array(Expressions.identity("i"))
+      }
+    }
+  }
+}
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsDataSource.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsDataSource.scala
new file mode 100644
index 000000000..2035d3525
--- /dev/null
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/ReportStatisticsDataSource.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.OptionalLong
+
+import org.apache.spark.sql.connector._
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.connector.read._
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class ReportStatisticsDataSource extends SimpleWritableDataSource {
+
+  class MyScanBuilder extends SimpleScanBuilder
+    with SupportsReportStatistics {
+
+    override def estimateStatistics(): Statistics = {
+      new Statistics {
+        override def sizeInBytes(): OptionalLong = OptionalLong.of(80)
+
+        override def numRows(): OptionalLong = OptionalLong.of(10)
+      }
+    }
+
+    override def planInputPartitions(): Array[InputPartition] = {
+      Array(RangeInputPartition(0, 5), RangeInputPartition(5, 10))
+    }
+
+  }
+
+  override def getTable(options: CaseInsensitiveStringMap): Table = {
+    new SimpleBatchTable {
+      override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+        new MyScanBuilder
+      }
+    }
+  }
+}
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
index 139efd9ca..22c998d3b 100644
--- a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.commons.io.FileUtils
 import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
 
 import org.apache.kyuubi.sql.{KyuubiSQLConf, KyuubiSQLExtensionException}
 import org.apache.kyuubi.sql.watchdog.{MaxFileSizeExceedException, MaxPartitionExceedException}
@@ -607,4 +608,36 @@ trait WatchDogSuiteBase extends KyuubiSparkSQLExtensionTest {
       assert(e.getMessage == "Script transformation is not allowed")
     }
   }
+
+  test("watchdog with scan maxFileSize -- data source v2") {
+    val df = spark.read.format(classOf[ReportStatisticsAndPartitionAwareDataSource].getName).load()
+    df.createOrReplaceTempView("test")
+    val logical = df.queryExecution.optimizedPlan.collect {
+      case d: DataSourceV2ScanRelation => d
+    }.head
+    val tableSize = logical.computeStats().sizeInBytes.toLong
+    withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> tableSize.toString) {
+      sql("SELECT * FROM test").queryExecution.sparkPlan
+    }
+    withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> (tableSize / 2).toString) {
+      intercept[MaxFileSizeExceedException](
+        sql("SELECT * FROM test").queryExecution.sparkPlan)
+    }
+
+    val nonPartDf = spark.read.format(classOf[ReportStatisticsDataSource].getName).load()
+    nonPartDf.createOrReplaceTempView("test_non_part")
+    val nonPartLogical = nonPartDf.queryExecution.optimizedPlan.collect {
+      case d: DataSourceV2ScanRelation => d
+    }.head
+    val nonPartTableSize = nonPartLogical.computeStats().sizeInBytes.toLong
+
+    withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> nonPartTableSize.toString) {
+      sql("SELECT * FROM test_non_part").queryExecution.sparkPlan
+    }
+
+    withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> (nonPartTableSize / 2).toString) {
+      intercept[MaxFileSizeExceedException](
+        sql("SELECT * FROM test_non_part").queryExecution.sparkPlan)
+    }
+  }
 }
