This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 428a7a1f981 [SPARK-44865][SS] Make StreamingRelationV2 support metadata column
428a7a1f981 is described below
commit 428a7a1f981b5dc2ce08832a0d839b29106dead6
Author: zeruibao <[email protected]>
AuthorDate: Fri Aug 25 22:08:57 2023 +0800
[SPARK-44865][SS] Make StreamingRelationV2 support metadata column
### What changes were proposed in this pull request?
Make StreamingRelationV2 extend ExposesMetadataColumns, so that it exposes the metadata columns of a table that implements SupportsMetadataColumns.
### Why are the changes needed?
This is useful for CDC connectors, since CDC streaming needs some metadata columns.
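For illustration, a minimal sketch of the kind of query this enables (the table and metadata column names here are hypothetical, not part of this change):

```scala
// A streaming table whose source implements SupportsMetadataColumns.
// With this change, "_commit_version" can resolve as a metadata column on
// the streaming relation instead of failing with UNRESOLVED_COLUMN.
spark.readStream
  .table("cdc_source_table")
  .select("data", "_commit_version")
```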
### How was this patch tested?
A unit test in DataStreamTableAPISuite.
Closes #42554 from zeruibao/SPARK-44865-support-metadata-column.
Authored-by: zeruibao <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../catalyst/streaming/StreamingRelationV2.scala | 29 +++++++++++++++++---
.../streaming/test/DataStreamTableAPISuite.scala | 31 +++++++++++++++++++---
2 files changed, 53 insertions(+), 7 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingRelationV2.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingRelationV2.scala
index 06beb61f9ec..ab0352b606e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingRelationV2.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/StreamingRelationV2.scala
@@ -18,9 +18,10 @@
package org.apache.spark.sql.catalyst.streaming
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
-import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, Table, TableProvider}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.plans.logical.{ExposesMetadataColumns, LeafNode, LogicalPlan, Statistics}
+import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, SupportsMetadataColumns, Table, TableProvider}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits
import org.apache.spark.sql.util.CaseInsensitiveStringMap
// We have to pack in the V1 data source as a shim, for the case when a source implements
@@ -39,11 +40,31 @@ case class StreamingRelationV2(
catalog: Option[CatalogPlugin],
identifier: Option[Identifier],
v1Relation: Option[LogicalPlan])
- extends LeafNode with MultiInstanceRelation {
+ extends LeafNode with MultiInstanceRelation with ExposesMetadataColumns {
override lazy val resolved = v1Relation.forall(_.resolved)
override def isStreaming: Boolean = true
override def toString: String = sourceName
+  import DataSourceV2Implicits._
+
+  override lazy val metadataOutput: Seq[AttributeReference] = table match {
+    case hasMeta: SupportsMetadataColumns =>
+      metadataOutputWithOutConflicts(
+        hasMeta.metadataColumns.toAttributes, hasMeta.canRenameConflictingMetadataColumns)
+    case _ =>
+      Nil
+  }
+
+  def withMetadataColumns(): StreamingRelationV2 = {
+    val newMetadata = metadataOutput.filterNot(outputSet.contains)
+    if (newMetadata.nonEmpty) {
+      StreamingRelationV2(source, sourceName, table, extraOptions,
+        output ++ newMetadata, catalog, identifier, v1Relation)
+    } else {
+      this
+    }
+  }
+
override def computeStats(): Statistics = Statistics(
sizeInBytes = BigInt(conf.defaultSizeInBytes)
)
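The `withMetadataColumns()` added above appends only the metadata attributes that are not already in the output, so repeated calls are idempotent. A minimal standalone sketch of that pattern, using plain Scala strings as stand-ins for Spark's AttributeReference type:

```scala
// Standalone sketch of the withMetadataColumns() pattern: append only the
// metadata attributes that are not already part of the output.
case class Relation(output: Seq[String], metadataOutput: Seq[String]) {
  def withMetadataColumns(): Relation = {
    val newMetadata = metadataOutput.filterNot(output.contains)
    if (newMetadata.nonEmpty) copy(output = output ++ newMetadata) else this
  }
}

object WithMetadataDemo extends App {
  val rel = Relation(output = Seq("value"), metadataOutput = Seq("_seq"))
  println(rel.withMetadataColumns().output) // List(value, _seq)
  // A second call is a no-op, since "_seq" is already present.
  println(rel.withMetadataColumns().withMetadataColumns().output) // List(value, _seq)
}
```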
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
index abe606ad9c1..d049f27c21e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
import org.apache.spark.sql.connector.{FakeV2Provider, InMemoryTableSessionCatalog}
-import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTableCatalog, SupportsRead, Table, TableCapability, V2TableWithV1Fallback}
+import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTableCatalog, MetadataColumn, SupportsMetadataColumns, SupportsRead, Table, TableCapability, V2TableWithV1Fallback}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.execution.streaming.{MemoryStream, MemoryStreamScanBuilder, StreamingQueryWrapper}
@@ -36,7 +36,7 @@ import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.StreamTest
import org.apache.spark.sql.streaming.sources.FakeScanBuilder
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{DataType, IntegerType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.tags.SlowSQLTest
import org.apache.spark.util.Utils
@@ -499,6 +499,20 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
}
}
+ test("SPARK-44865: Test StreamingRelationV2 with metadata column") {
+ val tblName = "teststream.table_name"
+ withTable(tblName) {
+ spark.sql(s"CREATE TABLE $tblName (data int) USING foo")
+ val stream = MemoryStream[Int]
+ val testCatalog =
spark.sessionState.catalogManager.catalog("teststream").asTableCatalog
+ val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
+ table.asInstanceOf[InMemoryStreamTable].setStream(stream)
+ // It will not throw UNRESOLVED_COLUMN exception because
+ // we add metadata column to StreamingRelationV2
+ spark.readStream.table(tblName).select("value", "_seq")
+ }
+ }
+
private def checkForStreamTable(dir: Option[File], tableName: String): Unit = {
val memory = MemoryStream[Int]
val dsw = memory.toDS().writeStream.format("parquet")
@@ -576,7 +590,10 @@ object DataStreamTableAPISuite {
val V1FallbackTestTableName = "fallbackV1Test"
}
-class InMemoryStreamTable(override val name: String) extends Table with SupportsRead {
+class InMemoryStreamTable(override val name: String)
+    extends Table
+    with SupportsRead
+    with SupportsMetadataColumns {
var stream: MemoryStream[Int] = _
def setStream(inputData: MemoryStream[Int]): Unit = stream = inputData
@@ -590,6 +607,14 @@ class InMemoryStreamTable(override val name: String) extends Table with Supports
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
new MemoryStreamScanBuilder(stream)
}
+
+  private object SeqColumn extends MetadataColumn {
+    override def name: String = "_seq"
+    override def dataType: DataType = IntegerType
+    override def comment: String = "Seq"
+  }
+
+  override val metadataColumns: Array[MetadataColumn] = Array(SeqColumn)
}
class NonStreamV2Table(override val name: String)
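End to end, the new test boils down to the following query shape: once the table's source implements SupportsMetadataColumns, the `_seq` metadata column resolves even though it is not part of the table's data schema. A sketch, assuming the InMemoryStreamTable registration from the test above:

```scala
// What the new test exercises (see InMemoryStreamTable and SeqColumn above):
// "_seq" is not in the table's data schema, but it resolves as a metadata
// column because StreamingRelationV2 now exposes metadataOutput.
val df = spark.readStream
  .table("teststream.table_name")
  .select("value", "_seq")
// Before this change, analysis failed with UNRESOLVED_COLUMN for "_seq".
```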
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]