This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 428a7a1f981 [SPARK-44865][SS] Make StreamingRelationV2 support 
metadata column
428a7a1f981 is described below

commit 428a7a1f981b5dc2ce08832a0d839b29106dead6
Author: zeruibao <[email protected]>
AuthorDate: Fri Aug 25 22:08:57 2023 +0800

    [SPARK-44865][SS] Make StreamingRelationV2 support metadata column
    
    ### What changes were proposed in this pull request?
    Make StreamingRelationV2 support metadata column
    
    ### Why are the changes needed?
    It is useful for CDC connector since CDC streaming needs some metadata 
columns.
    
    ### How was this patch tested?
    Unit test
    
    Closes #42554 from zeruibao/SPARK-44865-support-metadata-column.
    
    Authored-by: zeruibao <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../catalyst/streaming/StreamingRelationV2.scala   | 29 +++++++++++++++++---
 .../streaming/test/DataStreamTableAPISuite.scala   | 31 +++++++++++++++++++---
 2 files changed, 53 insertions(+), 7 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingRelationV2.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingRelationV2.scala
index 06beb61f9ec..ab0352b606e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingRelationV2.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingRelationV2.scala
@@ -18,9 +18,10 @@
 package org.apache.spark.sql.catalyst.streaming
 
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
Statistics}
-import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, 
Table, TableProvider}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
+import org.apache.spark.sql.catalyst.plans.logical.{ExposesMetadataColumns, 
LeafNode, LogicalPlan, Statistics}
+import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, 
SupportsMetadataColumns, Table, TableProvider}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 // We have to pack in the V1 data source as a shim, for the case when a source 
implements
@@ -39,11 +40,31 @@ case class StreamingRelationV2(
     catalog: Option[CatalogPlugin],
     identifier: Option[Identifier],
     v1Relation: Option[LogicalPlan])
-  extends LeafNode with MultiInstanceRelation {
+  extends LeafNode with MultiInstanceRelation with ExposesMetadataColumns {
   override lazy val resolved = v1Relation.forall(_.resolved)
   override def isStreaming: Boolean = true
   override def toString: String = sourceName
 
+  import DataSourceV2Implicits._
+
+  override lazy val metadataOutput: Seq[AttributeReference] = table match {
+    case hasMeta: SupportsMetadataColumns =>
+      metadataOutputWithOutConflicts(
+        hasMeta.metadataColumns.toAttributes, 
hasMeta.canRenameConflictingMetadataColumns)
+    case _ =>
+      Nil
+  }
+
+  def withMetadataColumns(): StreamingRelationV2 = {
+    val newMetadata = metadataOutput.filterNot(outputSet.contains)
+    if (newMetadata.nonEmpty) {
+      StreamingRelationV2(source, sourceName, table, extraOptions,
+        output ++ newMetadata, catalog, identifier, v1Relation)
+    } else {
+      this
+    }
+  }
+
   override def computeStats(): Statistics = Statistics(
     sizeInBytes = BigInt(conf.defaultSizeInBytes)
   )
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
index abe606ad9c1..d049f27c21e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
@@ -28,7 +28,7 @@ import 
org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, 
CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
 import org.apache.spark.sql.connector.{FakeV2Provider, 
InMemoryTableSessionCatalog}
-import org.apache.spark.sql.connector.catalog.{Identifier, 
InMemoryTableCatalog, SupportsRead, Table, TableCapability, 
V2TableWithV1Fallback}
+import org.apache.spark.sql.connector.catalog.{Identifier, 
InMemoryTableCatalog, MetadataColumn, SupportsMetadataColumns, SupportsRead, 
Table, TableCapability, V2TableWithV1Fallback}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.execution.streaming.{MemoryStream, 
MemoryStreamScanBuilder, StreamingQueryWrapper}
@@ -36,7 +36,7 @@ import org.apache.spark.sql.functions.lit
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.StreamTest
 import org.apache.spark.sql.streaming.sources.FakeScanBuilder
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{DataType, IntegerType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.tags.SlowSQLTest
 import org.apache.spark.util.Utils
@@ -499,6 +499,20 @@ class DataStreamTableAPISuite extends StreamTest with 
BeforeAndAfter {
     }
   }
 
+  test("SPARK-44865: Test StreamingRelationV2 with metadata column") {
+    val tblName = "teststream.table_name"
+    withTable(tblName) {
+      spark.sql(s"CREATE TABLE $tblName (data int) USING foo")
+      val stream = MemoryStream[Int]
+      val testCatalog = 
spark.sessionState.catalogManager.catalog("teststream").asTableCatalog
+      val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
+      table.asInstanceOf[InMemoryStreamTable].setStream(stream)
+      // It will not throw UNRESOLVED_COLUMN exception because
+      // we add metadata column to StreamingRelationV2
+      spark.readStream.table(tblName).select("value", "_seq")
+    }
+  }
+
   private def checkForStreamTable(dir: Option[File], tableName: String): Unit 
= {
     val memory = MemoryStream[Int]
     val dsw = memory.toDS().writeStream.format("parquet")
@@ -576,7 +590,10 @@ object DataStreamTableAPISuite {
   val V1FallbackTestTableName = "fallbackV1Test"
 }
 
-class InMemoryStreamTable(override val name: String) extends Table with 
SupportsRead {
+class InMemoryStreamTable(override val name: String)
+  extends Table
+  with SupportsRead
+  with SupportsMetadataColumns {
   var stream: MemoryStream[Int] = _
 
   def setStream(inputData: MemoryStream[Int]): Unit = stream = inputData
@@ -590,6 +607,14 @@ class InMemoryStreamTable(override val name: String) 
extends Table with Supports
   override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder 
= {
     new MemoryStreamScanBuilder(stream)
   }
+
+  private object SeqColumn extends MetadataColumn {
+    override def name: String = "_seq"
+    override def dataType: DataType = IntegerType
+    override def comment: String = "Seq"
+  }
+
+  override val metadataColumns: Array[MetadataColumn] = Array(SeqColumn)
 }
 
 class NonStreamV2Table(override val name: String)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to