This is an automated email from the ASF dual-hosted git repository.

biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new fef6888ea2 [spark] Introduce a base class for SparkTable to support multi-version APIs (#6421)
fef6888ea2 is described below

commit fef6888ea20637a2f52cb6a87112c8c773e24364
Author: Kerwin Zhang <[email protected]>
AuthorDate: Tue Oct 21 10:19:36 2025 +0800

    [spark] Introduce a base class for SparkTable to support multi-version APIs (#6421)
---
 .../paimon/spark/PaimonPartitionManagement.scala   |   2 +-
 ...SparkTable.scala => PaimonSparkTableBase.scala} |   5 +-
 .../scala/org/apache/paimon/spark/SparkTable.scala | 151 +--------------------
 3 files changed, 5 insertions(+), 153 deletions(-)
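
In short, the shared connector logic moves into PaimonSparkTableBase, and SparkTable in paimon-spark-common becomes a thin concrete case class on top of it. Below is a minimal sketch of the pattern this enables for a Spark-version-specific module; the subclass name Spark4Table and the extra STREAMING_WRITE capability are illustrative assumptions, not part of this commit:

    // Hypothetical subclass in a version-specific module (illustration only, not in this patch).
    // It inherits the shared read/write/partition logic from PaimonSparkTableBase and only
    // overrides APIs that differ between Spark versions.
    package org.apache.paimon.spark

    import org.apache.paimon.table.Table

    import org.apache.spark.sql.connector.catalog.TableCapability

    import java.util.{Set => JSet}

    case class Spark4Table(override val table: Table) extends PaimonSparkTableBase(table) {

      // Illustrative override: advertise a capability only where this Spark version supports it.
      override def capabilities: JSet[TableCapability] = {
        val caps = super.capabilities
        caps.add(TableCapability.STREAMING_WRITE)
        caps
      }
    }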

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
index 4516c28359..b40812d4d7 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
@@ -36,7 +36,7 @@ import java.util.{Map => JMap, Objects, UUID}
 import scala.collection.JavaConverters._
 
 trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement {
-  self: SparkTable =>
+  self: PaimonSparkTableBase =>
 
   lazy val partitionRowType: RowType = TypeUtils.project(table.rowType, table.partitionKeys)
 
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
similarity index 96%
copy from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
copy to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
index 530cb40943..ff1cfc8705 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
@@ -25,7 +25,7 @@ import org.apache.paimon.spark.catalog.functions.BucketFunction
 import org.apache.paimon.spark.schema.PaimonMetadataColumn
 import org.apache.paimon.spark.util.OptionUtils
 import org.apache.paimon.spark.write.{PaimonV2WriteBuilder, PaimonWriteBuilder}
-import org.apache.paimon.table.{DataTable, FileStoreTable, InnerTable, KnownSplitsTable, Table}
+import org.apache.paimon.table.{Table, _}
 import org.apache.paimon.table.BucketMode.{BUCKET_UNAWARE, HASH_FIXED, POSTPONE_MODE}
 import org.apache.paimon.utils.StringUtils
 
@@ -41,8 +41,7 @@ import java.util.{Collections, EnumSet => JEnumSet, HashMap => JHashMap, Map =>
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
-/** A spark [[org.apache.spark.sql.connector.catalog.Table]] for paimon. */
-case class SparkTable(table: Table)
+abstract class PaimonSparkTableBase(val table: Table)
   extends org.apache.spark.sql.connector.catalog.Table
   with SupportsRead
   with SupportsWrite
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
index 530cb40943..284426b615 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
@@ -18,154 +18,7 @@
 
 package org.apache.paimon.spark
 
-import org.apache.paimon.CoreOptions
-import org.apache.paimon.CoreOptions.BucketFunctionType
-import org.apache.paimon.options.Options
-import org.apache.paimon.spark.catalog.functions.BucketFunction
-import org.apache.paimon.spark.schema.PaimonMetadataColumn
-import org.apache.paimon.spark.util.OptionUtils
-import org.apache.paimon.spark.write.{PaimonV2WriteBuilder, PaimonWriteBuilder}
-import org.apache.paimon.table.{DataTable, FileStoreTable, InnerTable, KnownSplitsTable, Table}
-import org.apache.paimon.table.BucketMode.{BUCKET_UNAWARE, HASH_FIXED, POSTPONE_MODE}
-import org.apache.paimon.utils.StringUtils
-
-import org.apache.spark.sql.connector.catalog._
-import org.apache.spark.sql.connector.expressions.{Expressions, Transform}
-import org.apache.spark.sql.connector.read.ScanBuilder
-import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
-
-import java.util.{Collections, EnumSet => JEnumSet, HashMap => JHashMap, Map => JMap, Set => JSet}
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
+import org.apache.paimon.table.Table
 
 /** A spark [[org.apache.spark.sql.connector.catalog.Table]] for paimon. */
-case class SparkTable(table: Table)
-  extends org.apache.spark.sql.connector.catalog.Table
-  with SupportsRead
-  with SupportsWrite
-  with SupportsMetadataColumns
-  with PaimonPartitionManagement {
-
-  lazy val coreOptions = new CoreOptions(table.options())
-
-  private lazy val useV2Write: Boolean = {
-    val v2WriteConfigured = OptionUtils.useV2Write()
-    v2WriteConfigured && supportsV2Write
-  }
-
-  private def supportsV2Write: Boolean = {
-    coreOptions.bucketFunctionType() == BucketFunctionType.DEFAULT && {
-      table match {
-        case storeTable: FileStoreTable =>
-          storeTable.bucketMode() match {
-            case HASH_FIXED => BucketFunction.supportsTable(storeTable)
-            case BUCKET_UNAWARE | POSTPONE_MODE => true
-            case _ => false
-          }
-
-        case _ => false
-      }
-    } && coreOptions.clusteringColumns().isEmpty
-  }
-
-  def getTable: Table = table
-
-  override def name: String = table.fullName
-
-  override lazy val schema: StructType = SparkTypeUtils.fromPaimonRowType(table.rowType)
-
-  override def partitioning: Array[Transform] = {
-    table.partitionKeys().asScala.map(p => Expressions.identity(StringUtils.quote(p))).toArray
-  }
-
-  override def properties: JMap[String, String] = {
-    table match {
-      case dataTable: DataTable =>
-        val properties = new JHashMap[String, String](dataTable.coreOptions.toMap)
-        if (!table.primaryKeys.isEmpty) {
-          properties.put(CoreOptions.PRIMARY_KEY.key, String.join(",", table.primaryKeys))
-        }
-        properties.put(TableCatalog.PROP_PROVIDER, SparkSource.NAME)
-        if (table.comment.isPresent) {
-          properties.put(TableCatalog.PROP_COMMENT, table.comment.get)
-        }
-        if (properties.containsKey(CoreOptions.PATH.key())) {
-          properties.put(TableCatalog.PROP_LOCATION, properties.get(CoreOptions.PATH.key()))
-        }
-        properties
-      case _ => Collections.emptyMap()
-    }
-  }
-
-  override def capabilities: JSet[TableCapability] = {
-    val capabilities = JEnumSet.of(
-      TableCapability.BATCH_READ,
-      TableCapability.OVERWRITE_BY_FILTER,
-      TableCapability.MICRO_BATCH_READ
-    )
-
-    if (useV2Write) {
-      capabilities.add(TableCapability.ACCEPT_ANY_SCHEMA)
-      capabilities.add(TableCapability.BATCH_WRITE)
-      capabilities.add(TableCapability.OVERWRITE_DYNAMIC)
-    } else {
-      capabilities.add(TableCapability.ACCEPT_ANY_SCHEMA)
-      capabilities.add(TableCapability.V1_BATCH_WRITE)
-    }
-
-    capabilities
-  }
-
-  override def metadataColumns: Array[MetadataColumn] = {
-    val partitionType = SparkTypeUtils.toSparkPartitionType(table)
-
-    val _metadataColumns = ArrayBuffer[MetadataColumn]()
-
-    if (coreOptions.rowTrackingEnabled()) {
-      _metadataColumns.append(PaimonMetadataColumn.ROW_ID)
-      _metadataColumns.append(PaimonMetadataColumn.SEQUENCE_NUMBER)
-    }
-
-    _metadataColumns.appendAll(
-      Seq(
-        PaimonMetadataColumn.FILE_PATH,
-        PaimonMetadataColumn.ROW_INDEX,
-        PaimonMetadataColumn.PARTITION(partitionType),
-        PaimonMetadataColumn.BUCKET
-      ))
-
-    _metadataColumns.toArray
-  }
-
-  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
-    table match {
-      case t: KnownSplitsTable =>
-        new PaimonSplitScanBuilder(t)
-      case _: InnerTable =>
-        new PaimonScanBuilder(table.copy(options.asCaseSensitiveMap).asInstanceOf[InnerTable])
-      case _ =>
-        throw new RuntimeException("Only InnerTable can be scanned.")
-    }
-  }
-
-  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
-    table match {
-      case fileStoreTable: FileStoreTable =>
-        val options = Options.fromMap(info.options)
-        if (useV2Write) {
-          new PaimonV2WriteBuilder(fileStoreTable, info.schema(), options)
-        } else {
-          new PaimonWriteBuilder(fileStoreTable, options)
-        }
-      case _ =>
-        throw new RuntimeException("Only FileStoreTable can be written.")
-    }
-  }
-
-  override def toString: String = {
-    s"${table.getClass.getSimpleName}[${table.fullName()}]"
-  }
-}
+case class SparkTable(override val table: Table) extends PaimonSparkTableBase(table) {}
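
A side note on the final hunk: since SparkTable remains a case class, call sites that pattern match on it or rely on its value equality keep working unchanged. A hedged illustration follows; the unwrapPaimonTable helper is hypothetical and not part of this change:

    // Hypothetical helper (not in this patch): the case-class extractor generated for
    // SparkTable is still available after the refactor.
    def unwrapPaimonTable(
        t: org.apache.spark.sql.connector.catalog.Table): Option[org.apache.paimon.table.Table] =
      t match {
        case SparkTable(paimonTable) => Some(paimonTable)
        case _ => None
      }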
