This is an automated email from the ASF dual-hosted git repository.
sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new eef44f0e8bd7 [SPARK-42040][SQL] SPJ: Introduce a new API for V2 input
partition to report partition statistics
eef44f0e8bd7 is described below
commit eef44f0e8bd79c9df9ffbf48220637ade0b15267
Author: qzhu <[email protected]>
AuthorDate: Tue Mar 26 21:56:03 2024 -0700
[SPARK-42040][SQL] SPJ: Introduce a new API for V2 input partition to
report partition statistics
### What changes were proposed in this pull request?
This PR introduces a new V2 min-in HasPartitionStatistics which can be used
to return the partition statistics of a InputPartition.
### Why are the changes needed?
As part of the Storage Partitioned Join work
([SPIP](https://issues.apache.org/jira/browse/SPARK-37166)), we'll need to
introduce a way for a V2 InputPartition to return its partition size, It's
useful for a InputPartition to also report its size (in bytes), so that Spark
can use the info to decide whether partition grouping should be applied or not.
This will be used later in follow-up PRs.
### Does this PR introduce _any_ user-facing change?
Yes, a new V2 mix-in HasPartitionStatistics will be introduced.
### How was this patch tested?
Extended InMemoryTable to support this new interface, and added a new unit
test to verify the API, and the mock the partition statistics to test.
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #45314 from zhuqi-lucas/SPARK-42040.
Authored-by: qzhu <[email protected]>
Signed-off-by: Your Name <[email protected]>
---
.../sql/connector/read/HasPartitionStatistics.java | 47 +++++++++++++++++
.../sql/connector/catalog/InMemoryBaseTable.scala | 5 +-
.../spark/sql/connector/DataSourceV2SQLSuite.scala | 61 ++++++++++++++++++++++
3 files changed, 112 insertions(+), 1 deletion(-)
diff --git
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/HasPartitionStatistics.java
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/HasPartitionStatistics.java
new file mode 100644
index 000000000000..7416f90ea1f5
--- /dev/null
+++
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/HasPartitionStatistics.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.read;
+
+import java.util.OptionalLong;
+
+/**
+ * A mix-in for input partitions whose records are clustered on the same set
of partition keys
+ * (provided via {@link SupportsReportPartitioning}, see below). Data sources
can opt-in to
+ * implement this interface for the partitions they report to Spark, which
will use the info
+ * to decide whether partition grouping should be applied or not.
+ *
+ * @see org.apache.spark.sql.connector.read.SupportsReportPartitioning
+ * @since 4.0.0
+ */
+public interface HasPartitionStatistics extends InputPartition {
+
+ /**
+ * Returns the size in bytes of the partition statistics associated to this
partition.
+ */
+ OptionalLong sizeInBytes();
+
+ /**
+ * Returns the number of rows in the partition statistics associated to this
partition.
+ */
+ OptionalLong numRows();
+
+ /**
+ * Returns the count of files in the partition statistics associated to this
partition.
+ */
+ OptionalLong filesCount();
+}
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
index c1967f558c17..505a5a616920 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
@@ -605,7 +605,7 @@ object InMemoryBaseTable {
}
class BufferedRows(val key: Seq[Any] = Seq.empty) extends WriterCommitMessage
- with InputPartition with HasPartitionKey with Serializable {
+ with InputPartition with HasPartitionKey with HasPartitionStatistics with
Serializable {
val rows = new mutable.ArrayBuffer[InternalRow]()
val deletes = new mutable.ArrayBuffer[Int]()
@@ -617,6 +617,9 @@ class BufferedRows(val key: Seq[Any] = Seq.empty) extends
WriterCommitMessage
def keyString(): String = key.toArray.mkString("/")
override def partitionKey(): InternalRow = PartitionInternalRow(key.toArray)
+ override def sizeInBytes(): OptionalLong = OptionalLong.of(100L)
+ override def numRows(): OptionalLong = OptionalLong.of(rows.size)
+ override def filesCount(): OptionalLong = OptionalLong.of(100L)
def clear(): Unit = rows.clear()
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 93f199dfd585..d89c0a2525fd 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -2928,6 +2928,67 @@ class DataSourceV2SQLSuiteV1Filter
}
}
+ test("Check HasPartitionStatistics from InMemoryPartitionTable") {
+ val t = "testpart.tbl"
+ withTable(t) {
+ sql(s"CREATE TABLE $t (id string) USING foo PARTITIONED BY (key int)")
+ val table = catalog("testpart").asTableCatalog
+ .loadTable(Identifier.of(Array(), "tbl"))
+ .asInstanceOf[InMemoryPartitionTable]
+
+ var partSizes = table.data.map(_.sizeInBytes().getAsLong)
+ var partRowCounts = table.data.map(_.numRows().getAsLong)
+ var partFiles = table.data.map(_.filesCount().getAsLong)
+ assert(partSizes.length == 0)
+ assert(partRowCounts.length == 0)
+ assert(partFiles.length == 0)
+
+ sql(s"INSERT INTO $t VALUES ('a', 1), ('b', 2), ('c', 3)")
+ partSizes = table.data.map(_.sizeInBytes().getAsLong)
+ assert(partSizes.length == 3)
+ assert(partSizes.toSet == Set(100, 100, 100))
+ partRowCounts = table.data.map(_.numRows().getAsLong)
+ assert(partRowCounts.length == 3)
+ assert(partRowCounts.toSet == Set(1, 1, 1))
+ partFiles = table.data.map(_.filesCount().getAsLong)
+ assert(partFiles.length == 3)
+ assert(partFiles.toSet == Set(100, 100, 100))
+
+ sql(s"ALTER TABLE $t DROP PARTITION (key=3)")
+ partSizes = table.data.map(_.sizeInBytes().getAsLong)
+ assert(partSizes.length == 2)
+ assert(partSizes.toSet == Set(100, 100))
+ partRowCounts = table.data.map(_.numRows().getAsLong)
+ assert(partRowCounts.length == 2)
+ assert(partRowCounts.toSet == Set(1, 1))
+ partFiles = table.data.map(_.filesCount().getAsLong)
+ assert(partFiles.length == 2)
+ assert(partFiles.toSet == Set(100, 100))
+
+ sql(s"ALTER TABLE $t ADD PARTITION (key=4)")
+ partSizes = table.data.map(_.sizeInBytes().getAsLong)
+ assert(partSizes.length == 3)
+ assert(partSizes.toSet == Set(100, 100, 100))
+ partRowCounts = table.data.map(_.numRows().getAsLong)
+ assert(partRowCounts.length == 3)
+ assert(partRowCounts.toSet == Set(1, 1, 0))
+ partFiles = table.data.map(_.filesCount().getAsLong)
+ assert(partFiles.length == 3)
+ assert(partFiles.toSet == Set(100, 100, 100))
+
+ sql(s"INSERT INTO $t VALUES ('c', 3), ('e', 5)")
+ partSizes = table.data.map(_.sizeInBytes().getAsLong)
+ assert(partSizes.length == 5)
+ assert(partSizes.toSet == Set(100, 100, 100, 100, 100))
+ partRowCounts = table.data.map(_.numRows().getAsLong)
+ assert(partRowCounts.length == 5)
+ assert(partRowCounts.toSet == Set(1, 1, 0, 1, 1))
+ partFiles = table.data.map(_.filesCount().getAsLong)
+ assert(partFiles.length == 5)
+ assert(partFiles.toSet == Set(100, 100, 100, 100, 100))
+ }
+ }
+
test("time travel") {
sql("use testcat")
// The testing in-memory table simply append the version/timestamp to the
table name when
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]