This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e93c408e19f [SPARK-45815][SQL][STREAMING] Provide an interface for 
other Streaming sources to add `_metadata` columns
1e93c408e19f is described below

commit 1e93c408e19f4ce8cec8220fd5eb6c1cb76468ff
Author: Yaohua Zhao <[email protected]>
AuthorDate: Thu Nov 9 19:35:51 2023 +0800

    [SPARK-45815][SQL][STREAMING] Provide an interface for other Streaming 
sources to add `_metadata` columns
    
    ### What changes were proposed in this pull request?
    
    Currently, only the native V1 file-based streaming source can read the 
`_metadata` column: 
https://github.com/apache/spark/blob/370870b7a0303e4a2c4b3dea1b479b4fcbc93f8d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala#L63
    
    Our goal is to create an interface that allows other streaming sources to 
add `_metadata` columns. For instance, we would like the Delta Streaming 
source to extend this interface and provide the `_metadata` column for its 
underlying storage format, such as Parquet. The Delta Streaming source can be 
found here: 
https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala#L49
    
    ### Why are the changes needed?
    These changes provide a generic interface that enables other streaming 
sources to expose and add `_metadata` columns.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    N/A
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #43692 from Yaohua628/spark-45815.
    
    Authored-by: Yaohua Zhao <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../execution/streaming/StreamingRelation.scala    | 11 ++++----
 .../org/apache/spark/sql/sources/interfaces.scala  | 31 ++++++++++++++++++++++
 2 files changed, 37 insertions(+), 5 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
index 135d46c5291e..c5d5a79d3454 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
@@ -29,6 +29,7 @@ import 
org.apache.spark.sql.connector.read.streaming.SparkDataStream
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.LeafExecNode
 import org.apache.spark.sql.execution.datasources.{DataSource, FileFormat}
+import org.apache.spark.sql.sources.SupportsStreamSourceMetadataColumns
 
 object StreamingRelation {
   def apply(dataSource: DataSource): StreamingRelation = {
@@ -60,11 +61,11 @@ case class StreamingRelation(dataSource: DataSource, 
sourceName: String, output:
   override def newInstance(): LogicalPlan = this.copy(output = 
output.map(_.newInstance()))
 
   override lazy val metadataOutput: Seq[AttributeReference] = {
-    dataSource.providingClass match {
-      // If the dataSource provided class is a same or subclass of FileFormat 
class
-      case f if classOf[FileFormat].isAssignableFrom(f) =>
-        metadataOutputWithOutConflicts(
-          
Seq(dataSource.providingInstance().asInstanceOf[FileFormat].createFileMetadataCol()))
+    dataSource.providingInstance() match {
+      case f: FileFormat => 
metadataOutputWithOutConflicts(Seq(f.createFileMetadataCol()))
+      case s: SupportsStreamSourceMetadataColumns =>
+        metadataOutputWithOutConflicts(s.getMetadataOutput(
+          dataSource.sparkSession, dataSource.options, 
dataSource.userSpecifiedSchema))
       case _ => Nil
     }
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 63e57c6804e1..d194ae77e968 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -309,3 +309,34 @@ trait InsertableRelation {
 trait CatalystScan {
   def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): 
RDD[Row]
 }
+
+/**
+ * Implemented by StreamSourceProvider objects that can generate file metadata 
columns.
+ * This trait extends the basic StreamSourceProvider by allowing the addition 
of metadata
+ * columns to the schema of the Stream Data Source.
+ */
+trait SupportsStreamSourceMetadataColumns extends StreamSourceProvider {
+
+  /**
+   * Returns the metadata columns that should be added to the schema of the 
Stream Source.
+   * These metadata columns supplement the columns
+   * defined in the sourceSchema() of the StreamSourceProvider.
+   *
+   * The final schema for the Stream Source, therefore, consists of the source 
schema as
+   * defined by StreamSourceProvider.sourceSchema(), with the metadata columns 
added at the end.
+   * The caller is responsible for resolving any naming conflicts with the 
source schema.
+   *
+   * An example of using this streaming source metadata output interface is
+   * when a customized file-based streaming source needs to expose file 
metadata columns,
+   * leveraging the hidden file metadata columns from its underlying storage 
format.
+   *
+   * @param spark               The SparkSession used for the operation.
+   * @param options             A map of options of the Stream Data Source.
+   * @param userSpecifiedSchema An optional user-provided schema of the Stream 
Data Source.
+   * @return A Seq of AttributeReference representing the metadata output 
attributes.
+   */
+  def getMetadataOutput(
+      spark: SparkSession,
+      options: Map[String, String],
+      userSpecifiedSchema: Option[StructType]): Seq[AttributeReference]
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to