This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6bae835ccbc8 [SPARK-53720][SQL] Simplify extracting Table from 
DataSourceV2Relation (#52460)
6bae835ccbc8 is described below

commit 6bae835ccbc8850ac5e2ab0225c6cd75921f06b4
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Mon Oct 13 16:19:45 2025 -0700

    [SPARK-53720][SQL] Simplify extracting Table from DataSourceV2Relation 
(#52460)
    
    ### What changes were proposed in this pull request?
    
    This PR adds a new extractor for `Table` from `DataSourceV2Relation`.
    
    ### Why are the changes needed?
    
    As we see over time, `DataSourceV2Relation` continues to evolve and has 
many args. Frequently, we only need to get the table instance and are not 
interested in other arguments. Therefore, it makes sense to add a new extractor 
for this common use case. In particular, I plan to add `TimeTravelSpec` and I 
don't want to update a ton of places for no good reason.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    This PR relies on existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
---
 .../spark/sql/catalyst/analysis/RewriteDeleteFromTable.scala   |  8 ++++----
 .../spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala    |  4 ++--
 .../spark/sql/catalyst/analysis/RewriteUpdateTable.scala       |  4 ++--
 .../org/apache/spark/sql/catalyst/planning/patterns.scala      |  5 ++---
 .../apache/spark/sql/catalyst/plans/logical/v2Commands.scala   | 10 +++++-----
 .../sql/execution/datasources/v2/DataSourceV2Relation.scala    |  4 ++++
 .../src/main/scala/org/apache/spark/sql/classic/Dataset.scala  |  5 ++---
 .../scala/org/apache/spark/sql/execution/CacheManager.scala    |  4 ++--
 .../spark/sql/execution/datasources/DataSourceStrategy.scala   |  5 ++---
 .../spark/sql/execution/datasources/FallBackFileSourceV2.scala |  4 ++--
 .../sql/execution/datasources/v2/DataSourceV2Strategy.scala    |  4 ++--
 .../datasources/parquet/ParquetPartitionDiscoverySuite.scala   |  4 ++--
 .../org/apache/spark/sql/streaming/FileStreamSinkSuite.scala   |  4 ++--
 13 files changed, 33 insertions(+), 32 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteDeleteFromTable.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteDeleteFromTable.scala
index 9c63e091eaf5..0dc217788fd0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteDeleteFromTable.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteDeleteFromTable.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.util.RowDeltaUtils._
 import org.apache.spark.sql.connector.catalog.{SupportsDeleteV2, 
SupportsRowLevelOperations, TruncatableTable}
 import org.apache.spark.sql.connector.write.{RowLevelOperationTable, 
SupportsDelta}
 import org.apache.spark.sql.connector.write.RowLevelOperation.Command.DELETE
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
ExtractV2Table}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 /**
@@ -40,11 +40,11 @@ object RewriteDeleteFromTable extends 
RewriteRowLevelCommand {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case d @ DeleteFromTable(aliasedTable, cond) if d.resolved =>
       EliminateSubqueryAliases(aliasedTable) match {
-        case DataSourceV2Relation(_: TruncatableTable, _, _, _, _) if cond == 
TrueLiteral =>
+        case ExtractV2Table(_: TruncatableTable) if cond == TrueLiteral =>
           // don't rewrite as the table supports truncation
           d
 
-        case r @ DataSourceV2Relation(t: SupportsRowLevelOperations, _, _, _, 
_) =>
+        case r @ ExtractV2Table(t: SupportsRowLevelOperations) =>
           val table = buildOperationTable(t, DELETE, 
CaseInsensitiveStringMap.empty())
           table.operation match {
             case _: SupportsDelta =>
@@ -53,7 +53,7 @@ object RewriteDeleteFromTable extends RewriteRowLevelCommand {
               buildReplaceDataPlan(r, table, cond)
           }
 
-        case DataSourceV2Relation(_: SupportsDeleteV2, _, _, _, _) =>
+        case ExtractV2Table(_: SupportsDeleteV2) =>
           // don't rewrite as the table supports deletes only with filters
           d
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
index 9e67aa156fa2..8b5b690aa740 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
@@ -29,7 +29,7 @@ import 
org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations
 import org.apache.spark.sql.connector.write.{RowLevelOperationTable, 
SupportsDelta}
 import org.apache.spark.sql.connector.write.RowLevelOperation.Command.MERGE
 import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
ExtractV2Table}
 import org.apache.spark.sql.types.IntegerType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -125,7 +125,7 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand 
with PredicateHelper
       if m.resolved && m.rewritable && m.aligned && !m.needSchemaEvolution =>
 
       EliminateSubqueryAliases(aliasedTable) match {
-        case r @ DataSourceV2Relation(tbl: SupportsRowLevelOperations, _, _, 
_, _) =>
+        case r @ ExtractV2Table(tbl: SupportsRowLevelOperations) =>
           validateMergeIntoConditions(m)
           val table = buildOperationTable(tbl, MERGE, 
CaseInsensitiveStringMap.empty())
           table.operation match {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala
index b2955ca00687..a4453ae51734 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteUpdateTable.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.util.RowDeltaUtils._
 import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations
 import org.apache.spark.sql.connector.write.{RowLevelOperationTable, 
SupportsDelta}
 import org.apache.spark.sql.connector.write.RowLevelOperation.Command.UPDATE
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
ExtractV2Table}
 import org.apache.spark.sql.types.IntegerType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -40,7 +40,7 @@ object RewriteUpdateTable extends RewriteRowLevelCommand {
         if u.resolved && u.rewritable && u.aligned =>
 
       EliminateSubqueryAliases(aliasedTable) match {
-        case r @ DataSourceV2Relation(tbl: SupportsRowLevelOperations, _, _, 
_, _) =>
+        case r @ ExtractV2Table(tbl: SupportsRowLevelOperations) =>
           val table = buildOperationTable(tbl, UPDATE, 
CaseInsensitiveStringMap.empty())
           val updateCond = cond.getOrElse(TrueLiteral)
           table.operation match {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 54a4e75c90c9..b95c4624b8c5 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.write.RowLevelOperation.Command.UPDATE
 import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
DataSourceV2ScanRelation}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
DataSourceV2ScanRelation, ExtractV2Table}
 import org.apache.spark.sql.internal.SQLConf
 
 trait OperationHelper extends AliasHelper with PredicateHelper {
@@ -436,8 +436,7 @@ object GroupBasedRowLevelOperation {
   type ReturnType = (ReplaceData, Expression, Option[Expression], LogicalPlan)
 
   def unapply(plan: LogicalPlan): Option[ReturnType] = plan match {
-    case rd @ ReplaceData(DataSourceV2Relation(table, _, _, _, _),
-        cond, query, _, _, groupFilterCond, _) =>
+    case rd @ ReplaceData(ExtractV2Table(table), cond, query, _, _, 
groupFilterCond, _) =>
       // group-based UPDATEs that are rewritten as UNION read the table twice
       val allowMultipleReads = rd.operation.command == UPDATE
       val readRelation = findReadRelation(table, query, allowMultipleReads)
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index a5cba44aac6a..cd0c2742df3d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.connector.write.{DeltaWrite, 
RowLevelOperation, RowL
 import org.apache.spark.sql.connector.write.RowLevelOperation.Command.{DELETE, 
MERGE, UPDATE}
 import org.apache.spark.sql.errors.DataTypeErrors.toSQLType
 import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
ExtractV2Table}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{ArrayType, AtomicType, BooleanType, 
DataType, IntegerType, MapType, MetadataBuilder, StringType, StructField, 
StructType}
 import org.apache.spark.util.ArrayImplicits._
@@ -263,7 +263,7 @@ case class ReplaceData(
 
   lazy val operation: RowLevelOperation = {
     EliminateSubqueryAliases(table) match {
-      case DataSourceV2Relation(RowLevelOperationTable(_, operation), _, _, _, 
_) =>
+      case ExtractV2Table(RowLevelOperationTable(_, operation)) =>
         operation
       case _ =>
         throw new AnalysisException(
@@ -345,7 +345,7 @@ case class WriteDelta(
 
   lazy val operation: SupportsDelta = {
     EliminateSubqueryAliases(table) match {
-      case DataSourceV2Relation(RowLevelOperationTable(_, operation), _, _, _, 
_) =>
+      case ExtractV2Table(RowLevelOperationTable(_, operation)) =>
         operation.asInstanceOf[SupportsDelta]
       case _ =>
         throw new AnalysisException(
@@ -834,7 +834,7 @@ case class UpdateTable(
 
   lazy val rewritable: Boolean = {
     EliminateSubqueryAliases(table) match {
-      case DataSourceV2Relation(_: SupportsRowLevelOperations, _, _, _, _) => 
true
+      case ExtractV2Table(_: SupportsRowLevelOperations) => true
       case _ => false
     }
   }
@@ -878,7 +878,7 @@ case class MergeIntoTable(
 
   lazy val rewritable: Boolean = {
     EliminateSubqueryAliases(targetTable) match {
-      case DataSourceV2Relation(_: SupportsRowLevelOperations, _, _, _, _) => 
true
+      case ExtractV2Table(_: SupportsRowLevelOperations) => true
       case _ => false
     }
   }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 26f406999494..180a14df865b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -231,6 +231,10 @@ case class StreamingDataSourceV2ScanRelation(
   override protected def stringArgs: Iterator[Any] = stringArgsVal.iterator
 }
 
+object ExtractV2Table {
+  def unapply(relation: DataSourceV2Relation): Option[Table] = 
Some(relation.table)
+}
+
 object DataSourceV2Relation {
   def create(
       table: Table,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala
index b8ffa09dfa05..3f377531f91f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala
@@ -60,7 +60,7 @@ import 
org.apache.spark.sql.execution.aggregate.TypedAggregateExpression
 import org.apache.spark.sql.execution.arrow.{ArrowBatchStreamWriter, 
ArrowConverters}
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.LogicalRelationWithTable
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
DataSourceV2ScanRelation, FileTable}
+import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, 
ExtractV2Table, FileTable}
 import org.apache.spark.sql.execution.python.EvaluatePython
 import org.apache.spark.sql.execution.stat.StatFunctions
 import org.apache.spark.sql.internal.SQLConf
@@ -1733,8 +1733,7 @@ class Dataset[T] private[sql](
         fr.inputFiles
       case r: HiveTableRelation =>
         r.tableMeta.storage.locationUri.map(_.toString).toArray
-      case DataSourceV2ScanRelation(DataSourceV2Relation(table: FileTable, _, 
_, _, _),
-          _, _, _, _) =>
+      case DataSourceV2ScanRelation(ExtractV2Table(table: FileTable), _, _, _, 
_) =>
         table.fileIndex.inputFiles
     }.flatten
     files.toSet.toArray
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index a8292a8dbaa3..a04486d36a64 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -32,7 +32,7 @@ import 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.command.CommandUtils
 import org.apache.spark.sql.execution.datasources.{FileIndex, 
HadoopFsRelation, LogicalRelation, LogicalRelationWithTable}
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
FileTable}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
ExtractV2Table, FileTable}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
@@ -431,7 +431,7 @@ class CacheManager extends Logging with 
AdaptiveSparkPlanHelper {
         case _ => false
       }
 
-      case DataSourceV2Relation(fileTable: FileTable, _, _, _, _) =>
+      case ExtractV2Table(fileTable: FileTable) =>
         refreshFileIndexIfNecessary(fileTable.fileIndex, fs, qualifiedPath)
 
       case _ => false
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 2e47f08ac115..b1fcf6f4b3a7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -53,7 +53,7 @@ import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution
 import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
PushedDownOperators}
+import org.apache.spark.sql.execution.datasources.v2.{ExtractV2Table, 
PushedDownOperators}
 import org.apache.spark.sql.execution.streaming.runtime.StreamingRelation
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources
@@ -319,8 +319,7 @@ class FindDataSourceTable(sparkSession: SparkSession) 
extends Rule[LogicalPlan]
       i.copy(table = DDLUtils.readHiveTable(tableMeta))
 
     case append @ AppendData(
-        DataSourceV2Relation(
-          V1Table(table: CatalogTable), _, _, _, _), _, _, _, _, _) if 
!append.isByName =>
+        ExtractV2Table(V1Table(table: CatalogTable)), _, _, _, _, _) if 
!append.isByName =>
       InsertIntoStatement(UnresolvedCatalogRelation(table),
         table.partitionColumnNames.map(name => name -> None).toMap,
         Seq.empty, append.query, false, append.isByName)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala
index 8c7203bca625..60c459ecf540 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala
@@ -22,7 +22,7 @@ import scala.jdk.CollectionConverters._
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, 
LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.classic.SparkSession
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
FileTable}
+import org.apache.spark.sql.execution.datasources.v2.{ExtractV2Table, 
FileTable}
 
 /**
  * Replace the File source V2 table in [[InsertIntoStatement]] to V1 
[[FileFormat]].
@@ -35,7 +35,7 @@ import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, File
 class FallBackFileSourceV2(sparkSession: SparkSession) extends 
Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case i @ InsertIntoStatement(
-        d @ DataSourceV2Relation(table: FileTable, _, _, _, _), _, _, _, _, _, 
_) =>
+        d @ ExtractV2Table(table: FileTable), _, _, _, _, _, _) =>
       val v1FileFormat = 
table.fallbackFileFormat.getDeclaredConstructor().newInstance()
       val relation = HadoopFsRelation(
         table.fileIndex,
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index b07e0442d4f0..98ea63862ac2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -264,7 +264,7 @@ class DataSourceV2Strategy(session: SparkSession) extends 
Strategy with Predicat
             invalidateCache) :: Nil
       }
 
-    case AppendData(r @ DataSourceV2Relation(v1: SupportsWrite, _, _, _, _), 
_, _,
+    case AppendData(r @ ExtractV2Table(v1: SupportsWrite), _, _,
         _, Some(write), analyzedQuery) if 
v1.supports(TableCapability.V1_BATCH_WRITE) =>
       write match {
         case v1Write: V1Write =>
@@ -278,7 +278,7 @@ class DataSourceV2Strategy(session: SparkSession) extends 
Strategy with Predicat
     case AppendData(r: DataSourceV2Relation, query, _, _, Some(write), _) =>
       AppendDataExec(planLater(query), refreshCache(r), write) :: Nil
 
-    case OverwriteByExpression(r @ DataSourceV2Relation(v1: SupportsWrite, _, 
_, _, _), _, _,
+    case OverwriteByExpression(r @ ExtractV2Table(v1: SupportsWrite), _, _,
         _, _, Some(write), analyzedQuery) if 
v1.supports(TableCapability.V1_BATCH_WRITE) =>
       write match {
         case v1Write: V1Write =>
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 324fe148592a..796b14a08ec0 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.util.{DateFormatter, 
DateTimeUtils, TimeFor
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.localDateTimeToMicros
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.{PartitionPath => Partition}
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
FileTable}
+import org.apache.spark.sql.execution.datasources.v2.{ExtractV2Table, 
FileTable}
 import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -1498,7 +1498,7 @@ class ParquetV2PartitionDiscoverySuite extends 
ParquetPartitionDiscoverySuite {
       (1 to 10).map(i => (i, i.toString)).toDF("a", 
"b").write.parquet(dir.getCanonicalPath)
       val queryExecution = 
spark.read.parquet(dir.getCanonicalPath).queryExecution
       queryExecution.analyzed.collectFirst {
-        case DataSourceV2Relation(fileTable: FileTable, _, _, _, _) =>
+        case ExtractV2Table(fileTable: FileTable) =>
           assert(fileTable.fileIndex.partitionSpec() === 
PartitionSpec.emptySpec)
       }.getOrElse {
         fail(s"Expecting a matching DataSourceV2Relation, but 
got:\n$queryExecution")
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 067b0ca285d5..b0aa71a7e1b3 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.util.stringToFile
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
-import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, 
DataSourceV2Relation, FileScan, FileTable}
+import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, 
ExtractV2Table, FileScan, FileTable}
 import org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol
 import org.apache.spark.sql.execution.streaming.runtime._
 import org.apache.spark.sql.execution.streaming.sinks.{FileStreamSink, 
FileStreamSinkLog, SinkFileStatus}
@@ -776,7 +776,7 @@ class FileStreamSinkV2Suite extends FileStreamSinkSuite {
     // Verify that MetadataLogFileIndex is being used and the correct 
partitioning schema has
     // been inferred
     val table = df.queryExecution.analyzed.collect {
-      case DataSourceV2Relation(table: FileTable, _, _, _, _) => table
+      case ExtractV2Table(table: FileTable) => table
     }
     assert(table.size === 1)
     assert(table.head.fileIndex.isInstanceOf[MetadataLogFileIndex])


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to