This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 1e93c408e19f [SPARK-45815][SQL][STREAMING] Provide an interface for
other Streaming sources to add `_metadata` columns
1e93c408e19f is described below
commit 1e93c408e19f4ce8cec8220fd5eb6c1cb76468ff
Author: Yaohua Zhao <[email protected]>
AuthorDate: Thu Nov 9 19:35:51 2023 +0800
[SPARK-45815][SQL][STREAMING] Provide an interface for other Streaming
sources to add `_metadata` columns
### What changes were proposed in this pull request?
Currently, only the native V1 file-based streaming source can read the
`_metadata` column:
https://github.com/apache/spark/blob/370870b7a0303e4a2c4b3dea1b479b4fcbc93f8d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala#L63
Our goal is to create an interface that allows other streaming sources to
add `_metadata` columns. For instance, we would like the Delta Streaming
source, which you can find here:
https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala#L49,
to extend this interface and provide the `_metadata` column for its underlying
storage format, such as Parquet.
### Why are the changes needed?
A generic interface to enable other streaming sources to expose and add
`_metadata` columns.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
N/A
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #43692 from Yaohua628/spark-45815.
Authored-by: Yaohua Zhao <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../execution/streaming/StreamingRelation.scala | 11 ++++----
.../org/apache/spark/sql/sources/interfaces.scala | 31 ++++++++++++++++++++++
2 files changed, 37 insertions(+), 5 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
index 135d46c5291e..c5d5a79d3454 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.connector.read.streaming.SparkDataStream
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.execution.datasources.{DataSource, FileFormat}
+import org.apache.spark.sql.sources.SupportsStreamSourceMetadataColumns
object StreamingRelation {
def apply(dataSource: DataSource): StreamingRelation = {
@@ -60,11 +61,11 @@ case class StreamingRelation(dataSource: DataSource, sourceName: String, output:
   override def newInstance(): LogicalPlan = this.copy(output = output.map(_.newInstance()))
override lazy val metadataOutput: Seq[AttributeReference] = {
-    dataSource.providingClass match {
-      // If the dataSource provided class is a same or subclass of FileFormat class
-      case f if classOf[FileFormat].isAssignableFrom(f) =>
-        metadataOutputWithOutConflicts(
-          Seq(dataSource.providingInstance().asInstanceOf[FileFormat].createFileMetadataCol()))
+    dataSource.providingInstance() match {
+      case f: FileFormat => metadataOutputWithOutConflicts(Seq(f.createFileMetadataCol()))
+      case s: SupportsStreamSourceMetadataColumns =>
+        metadataOutputWithOutConflicts(s.getMetadataOutput(
+          dataSource.sparkSession, dataSource.options, dataSource.userSpecifiedSchema))
case _ => Nil
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 63e57c6804e1..d194ae77e968 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -309,3 +309,34 @@ trait InsertableRelation {
trait CatalystScan {
   def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row]
}
+
+/**
+ * Implemented by StreamSourceProvider objects that can generate file metadata columns.
+ * This trait extends the basic StreamSourceProvider by allowing the addition of metadata
+ * columns to the schema of the Stream Data Source.
+ */
+trait SupportsStreamSourceMetadataColumns extends StreamSourceProvider {
+
+  /**
+   * Returns the metadata columns that should be added to the schema of the Stream Source.
+   * These metadata columns supplement the columns
+   * defined in the sourceSchema() of the StreamSourceProvider.
+   *
+   * The final schema for the Stream Source, therefore, consists of the source schema as
+   * defined by StreamSourceProvider.sourceSchema(), with the metadata columns added at the end.
+   * The caller is responsible for resolving any naming conflicts with the source schema.
+   *
+   * An example of using this streaming source metadata output interface is
+   * when a customized file-based streaming source needs to expose file metadata columns,
+   * leveraging the hidden file metadata columns from its underlying storage format.
+   *
+   * @param spark The SparkSession used for the operation.
+   * @param options A map of options of the Stream Data Source.
+   * @param userSpecifiedSchema An optional user-provided schema of the Stream Data Source.
+   * @return A Seq of AttributeReference representing the metadata output attributes.
+   */
+  def getMetadataOutput(
+      spark: SparkSession,
+      options: Map[String, String],
+      userSpecifiedSchema: Option[StructType]): Seq[AttributeReference]
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]