This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.10
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.10 by this push:
     new 576fc6b281 [KYUUBI #6857] Spark 3.4: MaxScanStrategy supports DSv2
576fc6b281 is described below

commit 576fc6b281ff6753573653ba5ccc69812101337a
Author: zhaohehuhu <[email protected]>
AuthorDate: Tue Dec 24 20:24:03 2024 +0800

    [KYUUBI #6857] Spark 3.4: MaxScanStrategy supports DSv2
    
    ### Why are the changes needed?
    
    Backport https://github.com/apache/kyuubi/pull/5852 to Spark 3.4, to 
enhance MaxScanStrategy to include support for the datasourcev2 in Spark 3.4
    
    ### How was this patch tested?
    
    Add some UTs
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #6857 from zhaohehuhu/dev-1224.
    
    Closes #6857
    
    c72c62984 [zhaohehuhu] remove the import
    dfbf2bc2d [zhaohehuhu] MaxScanStrategy supports DSv2 in Spark 3.4
    
    Authored-by: zhaohehuhu <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
    (cherry picked from commit 3c72fef4766a3f9a4e3ca3df85e38e4b656853f9)
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../kyuubi/sql/watchdog/MaxScanStrategy.scala      | 35 ++++++++++++
 ...portStatisticsAndPartitionAwareDataSource.scala | 64 ++++++++++++++++++++++
 .../spark/sql/ReportStatisticsDataSource.scala     | 53 ++++++++++++++++++
 .../org/apache/spark/sql/WatchDogSuiteBase.scala   | 30 ++++++++++
 4 files changed, 182 insertions(+)

diff --git 
a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala
 
b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala
index 1ed55ebc2f..e647ad3250 100644
--- 
a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala
+++ 
b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/watchdog/MaxScanStrategy.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.planning.ScanOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, 
HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
 import org.apache.spark.sql.types.StructType
 
 import org.apache.kyuubi.sql.KyuubiSQLConf
@@ -232,6 +233,40 @@ case class MaxScanStrategy(session: SparkSession)
               logicalRelation.catalogTable)
           }
         }
+      case ScanOperation(
+            _,
+            _,
+            _,
+            relation @ DataSourceV2ScanRelation(_, _, _, _, _)) =>
+        val table = relation.relation.table
+        if (table.partitioning().nonEmpty) {
+          val partitionColumnNames = table.partitioning().map(_.describe())
+          val stats = relation.computeStats()
+          lazy val scanFileSize = stats.sizeInBytes
+          if (maxFileSizeOpt.exists(_ < scanFileSize)) {
+            throw new MaxFileSizeExceedException(
+              s"""
+                 |SQL job scan file size in bytes: $scanFileSize
+                 |exceed restrict of table scan maxFileSize 
${maxFileSizeOpt.get}
+                 |You should optimize your SQL logical according partition 
structure
+                 |or shorten query scope such as p_date, detail as below:
+                 |Table: ${table.name()}
+                 |Partition Structure: ${partitionColumnNames.mkString(",")}
+                 |""".stripMargin)
+          }
+        } else {
+          val stats = relation.computeStats()
+          lazy val scanFileSize = stats.sizeInBytes
+          if (maxFileSizeOpt.exists(_ < scanFileSize)) {
+            throw new MaxFileSizeExceedException(
+              s"""
+                 |SQL job scan file size in bytes: $scanFileSize
+                 |exceed restrict of table scan maxFileSize 
${maxFileSizeOpt.get}
+                 |detail as below:
+                 |Table: ${table.name()}
+                 |""".stripMargin)
+          }
+        }
       case _ =>
     }
   }
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/ReportStatisticsAndPartitionAwareDataSource.scala
 
b/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/ReportStatisticsAndPartitionAwareDataSource.scala
new file mode 100644
index 0000000000..670d9ce7e4
--- /dev/null
+++ 
b/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/ReportStatisticsAndPartitionAwareDataSource.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.OptionalLong
+
+import org.apache.spark.sql.connector.{RangeInputPartition, SimpleBatchTable, 
SimpleScanBuilder, SimpleWritableDataSource}
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.connector.expressions.{Expressions, 
FieldReference, Transform}
+import org.apache.spark.sql.connector.read._
+import 
org.apache.spark.sql.connector.read.partitioning.{KeyGroupedPartitioning, 
Partitioning}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class ReportStatisticsAndPartitionAwareDataSource extends 
SimpleWritableDataSource {
+
+  class MyScanBuilder(
+      val partitionKeys: Seq[String]) extends SimpleScanBuilder
+    with SupportsReportStatistics with SupportsReportPartitioning {
+
+    override def estimateStatistics(): Statistics = {
+      new Statistics {
+        override def sizeInBytes(): OptionalLong = OptionalLong.of(80)
+
+        override def numRows(): OptionalLong = OptionalLong.of(10)
+
+      }
+    }
+
+    override def planInputPartitions(): Array[InputPartition] = {
+      Array(RangeInputPartition(0, 5), RangeInputPartition(5, 10))
+    }
+
+    override def outputPartitioning(): Partitioning = {
+      new KeyGroupedPartitioning(partitionKeys.map(FieldReference(_)).toArray, 
10)
+    }
+  }
+
+  override def getTable(options: CaseInsensitiveStringMap): Table = {
+    new SimpleBatchTable {
+      override def newScanBuilder(options: CaseInsensitiveStringMap): 
ScanBuilder = {
+        new MyScanBuilder(Seq("i"))
+      }
+
+      override def partitioning(): Array[Transform] = {
+        Array(Expressions.identity("i"))
+      }
+    }
+  }
+}
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/ReportStatisticsDataSource.scala
 
b/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/ReportStatisticsDataSource.scala
new file mode 100644
index 0000000000..2035d35256
--- /dev/null
+++ 
b/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/ReportStatisticsDataSource.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.OptionalLong
+
+import org.apache.spark.sql.connector._
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.connector.read._
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class ReportStatisticsDataSource extends SimpleWritableDataSource {
+
+  class MyScanBuilder extends SimpleScanBuilder
+    with SupportsReportStatistics {
+
+    override def estimateStatistics(): Statistics = {
+      new Statistics {
+        override def sizeInBytes(): OptionalLong = OptionalLong.of(80)
+
+        override def numRows(): OptionalLong = OptionalLong.of(10)
+      }
+    }
+
+    override def planInputPartitions(): Array[InputPartition] = {
+      Array(RangeInputPartition(0, 5), RangeInputPartition(5, 10))
+    }
+
+  }
+
+  override def getTable(options: CaseInsensitiveStringMap): Table = {
+    new SimpleBatchTable {
+      override def newScanBuilder(options: CaseInsensitiveStringMap): 
ScanBuilder = {
+        new MyScanBuilder
+      }
+    }
+  }
+}
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
 
b/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
index 139efd9ca0..aec51cbd37 100644
--- 
a/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
+++ 
b/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/WatchDogSuiteBase.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.commons.io.FileUtils
 import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, LogicalPlan}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
 
 import org.apache.kyuubi.sql.{KyuubiSQLConf, KyuubiSQLExtensionException}
 import org.apache.kyuubi.sql.watchdog.{MaxFileSizeExceedException, 
MaxPartitionExceedException}
@@ -607,4 +608,33 @@ trait WatchDogSuiteBase extends 
KyuubiSparkSQLExtensionTest {
       assert(e.getMessage == "Script transformation is not allowed")
     }
   }
+
+  test("watchdog with scan maxFileSize -- data source v2") {
+    val df = 
spark.read.format(classOf[ReportStatisticsAndPartitionAwareDataSource].getName).load()
+    df.createOrReplaceTempView("test")
+    val logical = df.queryExecution.optimizedPlan.collect {
+      case d: DataSourceV2ScanRelation => d
+    }.head
+    val tableSize = logical.computeStats().sizeInBytes.toLong
+    withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> 
tableSize.toString) {
+      sql("SELECT * FROM test").queryExecution.sparkPlan
+    }
+    withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> (tableSize / 
2).toString) {
+      intercept[MaxFileSizeExceedException](
+        sql("SELECT * FROM test").queryExecution.sparkPlan)
+    }
+    val nonPartDf = 
spark.read.format(classOf[ReportStatisticsDataSource].getName).load()
+    nonPartDf.createOrReplaceTempView("test_non_part")
+    val nonPartLogical = nonPartDf.queryExecution.optimizedPlan.collect {
+      case d: DataSourceV2ScanRelation => d
+    }.head
+    val nonPartTableSize = nonPartLogical.computeStats().sizeInBytes.toLong
+    withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> 
nonPartTableSize.toString) {
+      sql("SELECT * FROM test_non_part").queryExecution.sparkPlan
+    }
+    withSQLConf(KyuubiSQLConf.WATCHDOG_MAX_FILE_SIZE.key -> (nonPartTableSize 
/ 2).toString) {
+      intercept[MaxFileSizeExceedException](
+        sql("SELECT * FROM test_non_part").queryExecution.sparkPlan)
+    }
+  }
 }

Reply via email to