This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ca6e718d5a [spark] Support Parquet format in COPY INTO (#8037)
ca6e718d5a is described below

commit ca6e718d5a77bbbea3ffa3643537cb0306a9dfa3
Author: Junrui Lee <[email protected]>
AuthorDate: Sat May 30 12:03:35 2026 +0800

    [spark] Support Parquet format in COPY INTO (#8037)
---
 docs/docs/spark/sql-write.md                       |  89 +++++-
 .../PaimonSqlExtensions.g4                         |   2 +
 .../spark/catalyst/plans/logical/CopyOptions.scala |  32 +-
 .../spark/execution/CopyIntoLocationExec.scala     |   2 +
 .../paimon/spark/execution/CopyIntoTableExec.scala | 232 ++++++++++++--
 .../apache/paimon/spark/sql/CopyIntoTestBase.scala | 341 ++++++++++++++++++++-
 6 files changed, 655 insertions(+), 43 deletions(-)

diff --git a/docs/docs/spark/sql-write.md b/docs/docs/spark/sql-write.md
index 449d8871ed..56899796ea 100644
--- a/docs/docs/spark/sql-write.md
+++ b/docs/docs/spark/sql-write.md
@@ -314,7 +314,7 @@ INSERT INTO t BY NAME SELECT 3 AS a, '3' AS b, 3 AS c;
 
 ## COPY INTO
 
-`COPY INTO` provides a SQL command for bulk loading data files into Paimon 
tables and exporting table data to files. Supported formats: **CSV** and 
**JSON**.
+`COPY INTO` provides a SQL command for bulk loading data files into Paimon 
tables and exporting table data to files. Supported formats: **CSV**, **JSON**, 
and **Parquet**.
 
 ### CSV Import
 
@@ -383,6 +383,37 @@ FILE_FORMAT = (TYPE = JSON, MULTI_LINE = TRUE);
 
 JSON columns are matched **by column name** (not by position), so source field 
order does not matter.
 
+### Parquet Import
+
+```sql
+COPY INTO table_name [(col1, col2, ...)]
+FROM 'source_path'
+FILE_FORMAT = (TYPE = PARQUET [, option = value, ...])
+[PATTERN = 'regex']
+[FORCE = TRUE|FALSE]
+[ON_ERROR = ABORT_STATEMENT]
+```
+
+**Basic import:**
+
+```sql
+COPY INTO my_db.my_table
+FROM '/data/parquet_files/'
+FILE_FORMAT = (TYPE = PARQUET);
+```
+
+**Import with PATTERN:**
+
+```sql
+COPY INTO my_db.events
+FROM '/data/lake/'
+FILE_FORMAT = (TYPE = PARQUET)
+PATTERN = '.*\.parquet'
+FORCE = FALSE;
+```
+
+Parquet columns are matched **by column name** (not by position). Extra 
columns in the source files are ignored; missing columns become NULL.
+
 ### Write CSV Files
 
 ```sql
@@ -419,15 +450,42 @@ FILE_FORMAT = (TYPE = JSON)
 OVERWRITE = TRUE;
 ```
 
+### Write Parquet Files
+
+```sql
+COPY INTO 'target_path'
+FROM table_name
+FILE_FORMAT = (TYPE = PARQUET [, option = value, ...])
+[OVERWRITE = TRUE|FALSE]
+```
+
+**Basic Parquet export:**
+
+```sql
+COPY INTO '/export/data_backup/'
+FROM my_db.events
+FILE_FORMAT = (TYPE = PARQUET)
+OVERWRITE = TRUE;
+```
+
+**Export with compression:**
+
+```sql
+COPY INTO '/export/data_compressed/'
+FROM my_db.events
+FILE_FORMAT = (TYPE = PARQUET, COMPRESSION = GZIP)
+OVERWRITE = TRUE;
+```
+
 ### FILE_FORMAT Options
 
-`FILE_FORMAT` is required and must include `TYPE = CSV` or `TYPE = JSON`.
+`FILE_FORMAT` is required and must include `TYPE = CSV`, `TYPE = JSON`, or 
`TYPE = PARQUET`.
 
 **CSV import options:**
 
 | Option | Description | Default |
 |--------|-------------|---------|
-| TYPE | File format type. `CSV` or `JSON`. | (required) |
+| TYPE | File format type. `CSV`, `JSON`, or `PARQUET`. | (required) |
 | FIELD_DELIMITER | Column delimiter character. | `,` |
 | SKIP_HEADER | Skip the first line as header. Only `0` or `1`. | `0` |
 | QUOTE | Quote character for enclosing fields. | `"` |
@@ -440,17 +498,24 @@ OVERWRITE = TRUE;
 
 | Option | Description | Default |
 |--------|-------------|---------|
-| TYPE | File format type. `CSV` or `JSON`. | (required) |
+| TYPE | File format type. `CSV`, `JSON`, or `PARQUET`. | (required) |
 | MULTI_LINE | Parse multi-line JSON (e.g. JSON arrays or pretty-printed 
objects). | `FALSE` |
 | NULL_IF | List of string values to interpret as NULL. | (none) |
 | EMPTY_FIELD_AS_NULL | Treat empty string values as NULL. | `FALSE` |
 | COMPRESSION | Compression codec (e.g. `GZIP`). | `NONE` |
 
+**Parquet import options:**
+
+| Option | Description | Default |
+|--------|-------------|---------|
+| TYPE | File format type. `CSV`, `JSON`, or `PARQUET`. | (required) |
+| COMPRESSION | Compression codec. Usually auto-detected; rarely needed for 
import. | (auto) |
+
 **CSV write options:**
 
 | Option | Description | Default |
 |--------|-------------|---------|
-| TYPE | File format type. `CSV` or `JSON`. | (required) |
+| TYPE | File format type. `CSV`, `JSON`, or `PARQUET`. | (required) |
 | FIELD_DELIMITER | Column delimiter character. | `,` |
 | HEADER | Write column names as the first line. `TRUE` or `FALSE`. | `FALSE` |
 | QUOTE | Quote character for enclosing fields. | `"` |
@@ -461,11 +526,18 @@ OVERWRITE = TRUE;
 
 | Option | Description | Default |
 |--------|-------------|---------|
-| TYPE | File format type. `CSV` or `JSON`. | (required) |
+| TYPE | File format type. `CSV`, `JSON`, or `PARQUET`. | (required) |
 | DATE_FORMAT | Custom date format pattern. | Spark default |
 | TIMESTAMP_FORMAT | Custom timestamp format pattern. | Spark default |
 | COMPRESSION | Compression codec (e.g. `GZIP`). | `NONE` |
 
+**Parquet write options:**
+
+| Option | Description | Default |
+|--------|-------------|---------|
+| TYPE | File format type. `CSV`, `JSON`, or `PARQUET`. | (required) |
+| COMPRESSION | Compression codec (`SNAPPY`, `GZIP`, `NONE`, etc.). | `SNAPPY` 
|
+
 ### Import Options
 
 | Option | Description | Default |
@@ -486,7 +558,8 @@ When an explicit column list is provided (e.g., `COPY INTO 
t (col1, col2) FROM .
 
 - **CSV**: Columns are mapped **positionally** to the specified column list.
 - **JSON**: Columns are matched **by name** to the specified column list.
-- The number of source columns must match the column list length (CSV). For 
JSON, missing fields in the source become NULL.
+- **Parquet**: Columns are matched **by name** to the specified column list.
+- The number of source columns must match the column list length (CSV). For 
JSON and Parquet, missing fields in the source become NULL.
 - Columns not in the list are filled with their **DEFAULT value** (if defined 
in the table schema) or **NULL**.
 - Non-nullable columns without a default value that are not in the list will 
cause an error.
 
@@ -524,7 +597,7 @@ By default (`FORCE = FALSE`), COPY INTO tracks which files 
have been successfull
 
 ### Limitations
 
-- Only **CSV** and **JSON** formats are supported.
+- Only **CSV**, **JSON**, and **Parquet** formats are supported.
 - Writing files only supports `FROM table_name`; `FROM (SELECT ...)` is not 
supported.
 - `ON_ERROR = CONTINUE` is not supported; any parse or cast error aborts the 
entire command.
 - `SINGLE = TRUE` (single-file output) is not supported.
diff --git 
a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
 
b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
index f87884634a..1bbbca478b 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
+++ 
b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
@@ -206,6 +206,7 @@ nonReserved
     | COPY | INTO | FROM | FILE_FORMAT | PATTERN | FORCE | ON_ERROR | 
ABORT_STATEMENT | OVERWRITE
     | CSV
     | JSON
+    | PARQUET
     ;
 
 ALTER: 'ALTER';
@@ -248,6 +249,7 @@ ABORT_STATEMENT: 'ABORT_STATEMENT';
 OVERWRITE: 'OVERWRITE';
 CSV: 'CSV';
 JSON: 'JSON';
+PARQUET: 'PARQUET';
 
 PLUS: '+';
 MINUS: '-';
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyOptions.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyOptions.scala
index ae5d56eaea..4730307193 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyOptions.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyOptions.scala
@@ -23,6 +23,7 @@ sealed trait FileFormatType
 object FileFormatType {
   case object CSV extends FileFormatType
   case object JSON extends FileFormatType
+  case object PARQUET extends FileFormatType
   case class Unsupported(name: String) extends FileFormatType
 }
 
@@ -53,6 +54,15 @@ case class CopyFileFormat(formatType: FileFormatType, 
options: Map[String, Strin
               case _ =>
             }
         }
+      case FileFormatType.PARQUET =>
+        mapped.remove("mode")
+        options.foreach {
+          case (k, v) =>
+            k match {
+              case "COMPRESSION" => mapped("compression") = v
+              case _ =>
+            }
+        }
       case _ =>
     }
     mapped.toMap
@@ -83,6 +93,14 @@ case class CopyFileFormat(formatType: FileFormatType, 
options: Map[String, Strin
               case _ =>
             }
         }
+      case FileFormatType.PARQUET =>
+        options.foreach {
+          case (k, v) =>
+            k match {
+              case "COMPRESSION" => mapped("compression") = v
+              case _ =>
+            }
+        }
       case _ =>
     }
     mapped.toMap
@@ -108,6 +126,7 @@ case class CopyFileFormat(formatType: FileFormatType, 
options: Map[String, Strin
     }
     val validKeys = formatType match {
       case FileFormatType.JSON => CopyFileFormat.VALID_JSON_IMPORT_KEYS
+      case FileFormatType.PARQUET => CopyFileFormat.VALID_PARQUET_IMPORT_KEYS
       case _ => CopyFileFormat.VALID_CSV_IMPORT_KEYS
     }
     val invalid = options.keys.filterNot(validKeys.contains)
@@ -132,6 +151,7 @@ case class CopyFileFormat(formatType: FileFormatType, 
options: Map[String, Strin
     validateFormatType()
     val validKeys = formatType match {
       case FileFormatType.JSON => CopyFileFormat.VALID_JSON_EXPORT_KEYS
+      case FileFormatType.PARQUET => CopyFileFormat.VALID_PARQUET_EXPORT_KEYS
       case _ => CopyFileFormat.VALID_CSV_EXPORT_KEYS
     }
     val invalid = options.keys.filterNot(validKeys.contains)
@@ -145,9 +165,10 @@ case class CopyFileFormat(formatType: FileFormatType, 
options: Map[String, Strin
     formatType match {
       case FileFormatType.CSV =>
       case FileFormatType.JSON =>
+      case FileFormatType.PARQUET =>
       case FileFormatType.Unsupported(name) =>
         throw new IllegalArgumentException(
-          s"Unsupported file format type: $name. Supported types: CSV, JSON")
+          s"Unsupported file format type: $name. Supported types: CSV, JSON, 
PARQUET")
     }
   }
 }
@@ -185,6 +206,14 @@ object CopyFileFormat {
     "TIMESTAMP_FORMAT"
   )
 
+  val VALID_PARQUET_IMPORT_KEYS: Set[String] = Set(
+    "COMPRESSION"
+  )
+
+  val VALID_PARQUET_EXPORT_KEYS: Set[String] = Set(
+    "COMPRESSION"
+  )
+
   // Unit Separator (U+001F) used to encode multi-value lists in a single 
string
   val LIST_SEPARATOR: String = "\u001f"
 
@@ -192,6 +221,7 @@ object CopyFileFormat {
     typeStr.toUpperCase match {
       case "CSV" => FileFormatType.CSV
       case "JSON" => FileFormatType.JSON
+      case "PARQUET" => FileFormatType.PARQUET
       case other => FileFormatType.Unsupported(other)
     }
   }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoLocationExec.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoLocationExec.scala
index 69387a5071..45471bab84 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoLocationExec.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoLocationExec.scala
@@ -54,6 +54,8 @@ case class CopyIntoLocationExec(
     fileFormat.formatType match {
       case FileFormatType.JSON =>
         df.write.options(writerOptions).mode(saveMode).json(targetPath)
+      case FileFormatType.PARQUET =>
+        df.write.options(writerOptions).mode(saveMode).parquet(targetPath)
       case _ =>
         df.write.options(writerOptions).mode(saveMode).csv(targetPath)
     }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoTableExec.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoTableExec.scala
index 4763d8c784..5ef05b4188 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoTableExec.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoTableExec.scala
@@ -26,6 +26,7 @@ import org.apache.paimon.table.FileStoreTable
 import org.apache.paimon.types.DataField
 
 import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Column, DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -48,7 +49,8 @@ case class CopyIntoTableExec(
     pattern: Option[String],
     force: Boolean,
     out: Seq[Attribute])
-  extends PaimonLeafV2CommandExec {
+  extends PaimonLeafV2CommandExec
+  with Logging {
 
   override def output: Seq[Attribute] = out
 
@@ -72,9 +74,168 @@ case class CopyIntoTableExec(
     }
 
     val filePaths = filesToLoad.map(_.getPath.toString)
-    val stringSchema = buildStringSchema(targetColumns)
     val readerOptions = fileFormat.toSparkReaderOptions
 
+    fileFormat.formatType match {
+      case FileFormatType.PARQUET =>
+        runParquetImport(
+          paimonTable,
+          filePaths,
+          targetColumns,
+          writableColumns,
+          fields,
+          filesToLoad,
+          skippedFiles,
+          readerOptions)
+      case _ =>
+        runTextImport(
+          paimonTable,
+          filePaths,
+          targetColumns,
+          writableColumns,
+          fields,
+          filesToLoad,
+          skippedFiles,
+          readerOptions)
+    }
+  }
+
+  /**
+   * Imports Parquet files into the target table. Parquet sources carry their own column types, so
+   * no read-as-string step is needed (unlike the CSV/JSON path):
+   *   1. Read the source files with their native schema.
+   *   2. Project onto the writable columns, matching by name and casting to the table types.
+   *   3. Abort if a non-null source value turns into null under that cast.
+   *   4. Append the projected rows to the Paimon table.
+   *   5. Read the files again and hand that DataFrame over for history recording and results.
+   *
+   * @return
+   *   the result rows produced by `recordHistoryAndBuildResults`
+   */
+  private def runParquetImport(
+      paimonTable: FileStoreTable,
+      filePaths: Array[String],
+      targetColumns: Seq[String],
+      writableColumns: Seq[String],
+      fields: Seq[DataField],
+      filesToLoad: Array[FileStatus],
+      skippedFiles: Array[FileStatus],
+      readerOptions: Map[String, String]): Seq[InternalRow] = {
+    def readSource(): DataFrame = spark.read.options(readerOptions).parquet(filePaths: _*)
+
+    val sourceDf = readSource()
+    val projectedDf = buildParquetDataFrame(sourceDf, targetColumns, writableColumns, fields)
+    // The cast check has to pass before anything is written to the table.
+    validateParquetCast(sourceDf, targetColumns, writableColumns, fields)
+
+    val qualifiedTable = CopyIntoUtils.quoteIdentifier(catalog.name(), ident)
+    projectedDf.write.format("paimon").mode("append").insertInto(qualifiedTable)
+
+    // A fresh read of the same files is passed on for the per-file bookkeeping.
+    recordHistoryAndBuildResults(paimonTable, filesToLoad, skippedFiles, readSource())
+  }
+
+  /**
+   * Builds the projection written to the table for a Parquet import. One output column is
+   * produced per writable column, in `writableColumns` order; names are compared with the
+   * session resolver:
+   *   - listed in `targetColumns` and present in the source: the source column cast to the
+   *     table type
+   *   - listed in `targetColumns` but absent from the source: a NULL cast to the table type
+   *   - not listed in `targetColumns`: whatever `resolveDefaultColumn` yields
+   */
+  private def buildParquetDataFrame(
+      rawDf: DataFrame,
+      targetColumns: Seq[String],
+      writableColumns: Seq[String],
+      fields: Seq[DataField]): DataFrame = {
+    val sameName = spark.sessionState.conf.resolver
+    val available = rawDf.columns.toSeq
+
+    def project(name: String): Column = {
+      // Every writable column is expected to have a table field of the same name.
+      val field = fields.find(_.name() == name).get
+      if (!targetColumns.exists(sameName(_, name))) {
+        resolveDefaultColumn(field, name)
+      } else {
+        val sparkType = org.apache.paimon.spark.SparkTypeUtils.fromPaimonType(field.`type`())
+        val value = available.find(sameName(_, name)).map(col).getOrElse(lit(null))
+        value.cast(sparkType).as(name)
+      }
+    }
+
+    rawDf.select(writableColumns.map(project): _*)
+  }
+
+  /**
+   * Guards against lossy casts before a Parquet import writes anything. For every writable column
+   * that is listed in `targetColumns` and present in the source, the source column is cast to
+   * the table type; a value that is non-null before the cast but null afterwards is treated as
+   * a failed conversion (e.g. the string "abc" cast to IntegerType).
+   *
+   * The table field is looked up by column name rather than by position, so the check uses the
+   * right type even if `writableColumns` and `fields` differ in order or length. The check is
+   * expressed directly on the source columns, so no temporary columns are added that could
+   * replace a source column of the same name.
+   *
+   * @throws IllegalArgumentException
+   *   naming one offending source column as soon as a single bad row is found
+   */
+  private def validateParquetCast(
+      rawDf: DataFrame,
+      targetColumns: Seq[String],
+      writableColumns: Seq[String],
+      fields: Seq[DataField]): Unit = {
+    val resolver = spark.sessionState.conf.resolver
+    val sourceColumns = rawDf.columns.toSeq
+
+    // (source column name, predicate that is true when the cast loses a non-null value)
+    val castChecks: Seq[(String, Column)] = for {
+      colName <- writableColumns
+      if targetColumns.exists(tc => resolver(tc, colName))
+      field <- fields.find(_.name() == colName).toSeq
+      srcColName <- sourceColumns.find(s => resolver(s, colName)).toSeq
+    } yield {
+      val sparkType = org.apache.paimon.spark.SparkTypeUtils.fromPaimonType(field.`type`())
+      val source = col(srcColName)
+      (srcColName, source.isNotNull && source.cast(sparkType).isNull)
+    }
+
+    if (castChecks.nonEmpty) {
+      val anyBadCast = castChecks.map(_._2).reduce(_ || _)
+      // Project the per-column flags so the offending column can be read off by position.
+      val flagColumns = castChecks.zipWithIndex.map { case ((_, check), i) => check.as(s"c$i") }
+      val firstBadRow = rawDf.filter(anyBadCast).select(flagColumns: _*).limit(1).collect()
+      firstBadRow.headOption.foreach {
+        row =>
+          val offending = castChecks.indices
+            .find(i => row.getBoolean(i))
+            .map(i => castChecks(i)._1)
+            .getOrElse("unknown")
+          throw new IllegalArgumentException(
+            s"ON_ERROR = ABORT_STATEMENT: Cast failure in column '$offending'. " +
+              "Source data contains values that cannot be converted to the target type.")
+      }
+    }
+  }
+
+  /**
+   * Text-based (CSV/JSON) import pipeline. Reads all columns as strings 
first, then:
+   *   1. Rename positional columns (CSV) or keep named columns (JSON)
+   *   2. Fill unmapped columns with default values
+   *   3. Cast all string columns to target types with validation
+   *   4. Write to Paimon table
+   *   5. Record load history
+   */
+  private def runTextImport(
+      paimonTable: FileStoreTable,
+      filePaths: Array[String],
+      targetColumns: Seq[String],
+      writableColumns: Seq[String],
+      fields: Seq[DataField],
+      filesToLoad: Array[FileStatus],
+      skippedFiles: Array[FileStatus],
+      readerOptions: Map[String, String]): Seq[InternalRow] = {
+    val stringSchema = buildStringSchema(targetColumns)
+
     val sourceDf = readSourceData(filePaths, stringSchema, readerOptions)
     val finalDf =
       buildFinalDataFrame(sourceDf, targetColumns, writableColumns, fields)
@@ -83,13 +244,13 @@ case class CopyIntoTableExec(
     val tableName = CopyIntoUtils.quoteIdentifier(catalog.name(), ident)
     castedDf.write.format("paimon").mode("append").insertInto(tableName)
 
-    recordHistoryAndBuildResults(
-      paimonTable,
-      filesToLoad,
-      skippedFiles,
-      filePaths,
-      stringSchema,
-      readerOptions)
+    val countDf = fileFormat.formatType match {
+      case FileFormatType.JSON =>
+        spark.read.options(readerOptions).schema(stringSchema).json(filePaths: 
_*)
+      case _ =>
+        spark.read.options(readerOptions).schema(stringSchema).csv(filePaths: 
_*)
+    }
+    recordHistoryAndBuildResults(paimonTable, filesToLoad, skippedFiles, 
countDf)
   }
 
   private def buildStringSchema(targetColumns: Seq[String]): StructType = {
@@ -229,20 +390,7 @@ case class CopyIntoTableExec(
           if (targetColumns.contains(colName)) {
             col(colName)
           } else {
-            val field = fields.find(_.name() == colName).get
-            val defaultVal = field.defaultValue()
-            if (defaultVal != null) {
-              val sparkType =
-                
org.apache.paimon.spark.SparkTypeUtils.fromPaimonType(field.`type`())
-              try {
-                val parsed = 
spark.sessionState.sqlParser.parseExpression(defaultVal)
-                
SparkShimLoader.shim.classicApi.column(parsed).cast(sparkType).as(colName)
-              } catch {
-                case _: Exception => lit(null).cast(sparkType).as(colName)
-              }
-            } else {
-              lit(null).as(colName)
-            }
+            resolveDefaultColumn(fields.find(_.name() == colName).get, colName)
           }
       }
       renamedDf.select(selectExprs: _*)
@@ -294,25 +442,22 @@ case class CopyIntoTableExec(
     castedDf
   }
 
+  /**
+   * Record successfully loaded files to load history (for FORCE=FALSE 
idempotent dedup), and build
+   * the result rows showing per-file load status. Accepts a pre-built countDf 
that will be grouped
+   * by input_file_name() to get per-file row counts — this allows both 
Parquet and text paths to
+   * share the same logic.
+   */
   private def recordHistoryAndBuildResults(
       paimonTable: FileStoreTable,
       filesToLoad: Array[FileStatus],
       skippedFiles: Array[FileStatus],
-      filePaths: Array[String],
-      stringSchema: StructType,
-      readerOptions: Map[String, String]): Seq[InternalRow] = {
+      countDf: DataFrame): Seq[InternalRow] = {
     val paimonPath = new 
org.apache.paimon.fs.Path(paimonTable.location().toString)
     val historyManager = new CopyLoadHistoryManager(paimonTable.fileIO(), 
paimonPath)
     val snapshotId = paimonTable.snapshotManager().latestSnapshotId()
     val loadedAt = System.currentTimeMillis()
 
-    val countDf = fileFormat.formatType match {
-      case FileFormatType.JSON =>
-        spark.read.options(readerOptions).schema(stringSchema).json(filePaths: 
_*)
-      case _ =>
-        spark.read.options(readerOptions).schema(stringSchema).csv(filePaths: 
_*)
-    }
-
     val rowCounts = countDf
       .groupBy(input_file_name().as("file"))
       .count()
@@ -361,4 +506,25 @@ case class CopyIntoTableExec(
           0L)
     }.toSeq
   }
+
+  /**
+   * Produces the expression used for a column that receives no value from the source data: the
+   * field's default value parsed as a SQL expression and cast to the column type, or an untyped
+   * NULL when the field has no default. If building the default expression throws, a warning is
+   * logged and a NULL cast to the column type is used instead.
+   */
+  private def resolveDefaultColumn(field: DataField, colName: String): Column =
+    Option(field.defaultValue()) match {
+      case None =>
+        lit(null).as(colName)
+      case Some(defaultExpr) =>
+        val sparkType = org.apache.paimon.spark.SparkTypeUtils.fromPaimonType(field.`type`())
+        try {
+          val parsed = spark.sessionState.sqlParser.parseExpression(defaultExpr)
+          SparkShimLoader.shim.classicApi.column(parsed).cast(sparkType).as(colName)
+        } catch {
+          case error: Exception =>
+            logWarning(
+              s"Failed to parse default value '$defaultExpr' for column '$colName': " +
+                s"${error.getMessage}. Using null instead.")
+            lit(null).cast(sparkType).as(colName)
+        }
+    }
 }
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala
index aa764bd30e..e0970362a0 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala
@@ -234,7 +234,7 @@ class CopyIntoTestBase extends PaimonSparkTestBase {
         val e = intercept[IllegalArgumentException] {
           spark.sql(s"""COPY INTO $dbName0.copy_unsup
                        |FROM '${dir.getAbsolutePath}'
-                       |FILE_FORMAT = (TYPE = PARQUET)
+                       |FILE_FORMAT = (TYPE = ORC)
                        |""".stripMargin)
         }
         assert(
@@ -1083,4 +1083,343 @@ class CopyIntoTestBase extends PaimonSparkTestBase {
 
     spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_case")
   }
+
+  // ========== Parquet Tests ==========
+
+  /**
+   * Creates a temporary directory, hands it to `testBody`, and passes it to `deleteRecursively`
+   * afterwards, whether or not `testBody` throws.
+   */
+  private def withParquetDir(testBody: File => Unit): Unit = {
+    val tempDir = Files.createTempDirectory("copy_into_parquet_test").toFile
+    try {
+      testBody(tempDir)
+    } finally {
+      deleteRecursively(tempDir)
+    }
+  }
+
+  /**
+   * Writes `data` with the given `schema` as Parquet to the path `dir/name`, after coalescing the
+   * DataFrame to a single partition.
+   */
+  private def createParquetFile(
+      dir: File,
+      name: String,
+      data: Seq[Row],
+      schema: org.apache.spark.sql.types.StructType): Unit = {
+    val targetPath = new File(dir, name).getAbsolutePath
+    val rows = spark.sparkContext.parallelize(data)
+    spark.createDataFrame(rows, schema).coalesce(1).write.parquet(targetPath)
+  }
+
+  /**
+   * Writes `data` as a single Parquet file located at `dir/fileName`.
+   *
+   * The rows are first written, coalesced to one partition, into the scratch directory
+   * `dir/_tmp_<fileName>`. The first file there whose name ends in ".parquet" is then moved to
+   * `dir/fileName`, and the scratch directory is passed to `deleteRecursively`.
+   *
+   * @throws IllegalStateException
+   *   if no ".parquet" file is found in the scratch directory
+   * @throws java.io.IOException
+   *   if the part file cannot be moved to `dir/fileName`
+   */
+  private def createParquetSingleFile(
+      dir: File,
+      fileName: String,
+      data: Seq[Row],
+      schema: org.apache.spark.sql.types.StructType): Unit = {
+    val tmpDir = new File(dir, s"_tmp_$fileName")
+    val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
+    df.coalesce(1).write.parquet(tmpDir.getAbsolutePath)
+    // listFiles() returns null when the directory cannot be read, so wrap it before searching.
+    val partFile = Option(tmpDir.listFiles())
+      .flatMap(_.find(_.getName.endsWith(".parquet")))
+      .getOrElse(
+        throw new IllegalStateException(s"No .parquet part file found in $tmpDir"))
+    // Files.move throws on failure, whereas File.renameTo only reports it via an ignored boolean.
+    Files.move(
+      partFile.toPath,
+      new File(dir, fileName).toPath,
+      java.nio.file.StandardCopyOption.REPLACE_EXISTING)
+    deleteRecursively(tmpDir)
+  }
+
+  test("COPY INTO: basic Parquet import") {
+    val table = s"$dbName0.copy_parquet_basic"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING, age INT)")
+
+    withParquetDir {
+      dir =>
+        import org.apache.spark.sql.types._
+        // The source file declares its columns in the same order as the table.
+        val sourceSchema = new StructType()
+          .add("id", IntegerType)
+          .add("name", StringType)
+          .add("age", IntegerType)
+        val sourceRows = Seq(Row(1, "Alice", 30), Row(2, "Bob", 25))
+        createParquetFile(dir, "data", sourceRows, sourceSchema)
+
+        val sourcePath = s"${dir.getAbsolutePath}/data"
+        spark.sql(s"""COPY INTO $table
+                     |FROM '$sourcePath'
+                     |FILE_FORMAT = (TYPE = PARQUET)
+                     |""".stripMargin)
+
+        // Every source row must be present in the table unchanged.
+        val loaded = spark.sql(s"SELECT * FROM $table ORDER BY id")
+        checkAnswer(loaded, sourceRows)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
+
+  test("COPY INTO: Parquet column name matching ignores order") {
+    val table = s"$dbName0.copy_parquet_order"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING, age INT)")
+
+    withParquetDir {
+      dir =>
+        import org.apache.spark.sql.types._
+        // The file lists its columns in the reverse of the table's column order.
+        val reversedSchema = new StructType()
+          .add("age", IntegerType)
+          .add("name", StringType)
+          .add("id", IntegerType)
+        val reversedRows = Seq(Row(30, "Alice", 1), Row(25, "Bob", 2))
+        createParquetFile(dir, "data", reversedRows, reversedSchema)
+
+        val sourcePath = s"${dir.getAbsolutePath}/data"
+        spark.sql(s"""COPY INTO $table
+                     |FROM '$sourcePath'
+                     |FILE_FORMAT = (TYPE = PARQUET)
+                     |""".stripMargin)
+
+        // Values must land in the table columns that share the source column names.
+        val expected = Seq(Row(1, "Alice", 30), Row(2, "Bob", 25))
+        checkAnswer(spark.sql(s"SELECT * FROM $table ORDER BY id"), expected)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
+
+  test("COPY INTO: Parquet with explicit column list") {
+    val table = s"$dbName0.copy_parquet_cols"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING, age INT)")
+
+    withParquetDir {
+      dir =>
+        import org.apache.spark.sql.types._
+        // The file carries only the two columns named in the COPY INTO column list.
+        val twoColumnSchema = new StructType()
+          .add("id", IntegerType)
+          .add("name", StringType)
+        createParquetFile(dir, "data", Seq(Row(1, "Alice"), Row(2, "Bob")), twoColumnSchema)
+
+        val sourcePath = s"${dir.getAbsolutePath}/data"
+        spark.sql(s"""COPY INTO $table (id, name)
+                     |FROM '$sourcePath'
+                     |FILE_FORMAT = (TYPE = PARQUET)
+                     |""".stripMargin)
+
+        // `age` is outside the column list, so it is expected to be null.
+        val expected = Seq(Row(1, "Alice", null), Row(2, "Bob", null))
+        checkAnswer(spark.sql(s"SELECT * FROM $table ORDER BY id"), expected)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
+
+  test("COPY INTO: Parquet export") {
+    val table = s"$dbName0.copy_parquet_export"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING)")
+    spark.sql(s"INSERT INTO $table VALUES (1, 'Alice'), (2, 'Bob')")
+
+    withParquetDir {
+      dir =>
+        val outputPath = new File(dir, "output").getAbsolutePath
+        spark.sql(s"""COPY INTO '$outputPath'
+                     |FROM $table
+                     |FILE_FORMAT = (TYPE = PARQUET)
+                     |""".stripMargin)
+
+        // Read the exported files back with Spark's Parquet reader and compare with the table rows.
+        val exported = spark.read.parquet(outputPath).orderBy("id")
+        val expected = Seq(Row(1, "Alice"), Row(2, "Bob"))
+        checkAnswer(exported, expected)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
+
+  test("COPY INTO: Parquet export with COMPRESSION") {
+    val table = s"$dbName0.copy_parquet_compress"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING)")
+    spark.sql(s"INSERT INTO $table VALUES (1, 'Alice'), (2, 'Bob')")
+
+    withParquetDir {
+      dir =>
+        val outputPath = new File(dir, "output").getAbsolutePath
+        spark.sql(s"""COPY INTO '$outputPath'
+                     |FROM $table
+                     |FILE_FORMAT = (TYPE = PARQUET, COMPRESSION = GZIP)
+                     |""".stripMargin)
+
+        // Only the row content is checked here; the codec of the written files is not inspected.
+        val exported = spark.read.parquet(outputPath).orderBy("id")
+        val expected = Seq(Row(1, "Alice"), Row(2, "Bob"))
+        checkAnswer(exported, expected)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
+
+  test("COPY INTO: Parquet export then import round-trip") {
+    val srcTable = s"$dbName0.copy_parquet_rt_src"
+    val dstTable = s"$dbName0.copy_parquet_rt_dst"
+    spark.sql(s"DROP TABLE IF EXISTS $srcTable")
+    spark.sql(s"DROP TABLE IF EXISTS $dstTable")
+    spark.sql(s"CREATE TABLE $srcTable (id INT, name STRING, score DOUBLE)")
+    spark.sql(s"INSERT INTO $srcTable VALUES (1, 'Alice', 95.5), (2, 'Bob', 87.3)")
+    spark.sql(s"CREATE TABLE $dstTable (id INT, name STRING, score DOUBLE)")
+
+    withParquetDir {
+      dir =>
+        val outputPath = new File(dir, "export").getAbsolutePath
+
+        // Step 1: export the source table to Parquet files.
+        spark.sql(s"""COPY INTO '$outputPath'
+                     |FROM $srcTable
+                     |FILE_FORMAT = (TYPE = PARQUET)
+                     |""".stripMargin)
+
+        // Step 2: load those files into the destination table.
+        spark.sql(s"""COPY INTO $dstTable
+                     |FROM '$outputPath'
+                     |FILE_FORMAT = (TYPE = PARQUET)
+                     |""".stripMargin)
+
+        val expected = Seq(Row(1, "Alice", 95.5), Row(2, "Bob", 87.3))
+        checkAnswer(spark.sql(s"SELECT * FROM $dstTable ORDER BY id"), expected)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $srcTable")
+    spark.sql(s"DROP TABLE IF EXISTS $dstTable")
+  }
+
+  test("COPY INTO: Parquet extra fields are ignored") {
+    val table = s"$dbName0.copy_parquet_extra"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING)")
+
+    withParquetDir {
+      dir =>
+        import org.apache.spark.sql.types._
+        // The file has an `extra` column that the table does not define.
+        val widerSchema = new StructType()
+          .add("id", IntegerType)
+          .add("name", StringType)
+          .add("extra", StringType)
+        val widerRows = Seq(Row(1, "Alice", "ignore"), Row(2, "Bob", "ignore"))
+        createParquetFile(dir, "data", widerRows, widerSchema)
+
+        val sourcePath = s"${dir.getAbsolutePath}/data"
+        spark.sql(s"""COPY INTO $table
+                     |FROM '$sourcePath'
+                     |FILE_FORMAT = (TYPE = PARQUET)
+                     |""".stripMargin)
+
+        // Only the columns the table defines are expected in the result.
+        val expected = Seq(Row(1, "Alice"), Row(2, "Bob"))
+        checkAnswer(spark.sql(s"SELECT * FROM $table ORDER BY id"), expected)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
+
+  test("COPY INTO: Parquet missing fields become null") {
+    val table = s"$dbName0.copy_parquet_missing"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING, age INT)")
+
+    withParquetDir {
+      dir =>
+        import org.apache.spark.sql.types._
+        // The file has no `age` column although the table defines one.
+        val narrowerSchema = new StructType()
+          .add("id", IntegerType)
+          .add("name", StringType)
+        createParquetFile(dir, "data", Seq(Row(1, "Alice"), Row(2, "Bob")), narrowerSchema)
+
+        val sourcePath = s"${dir.getAbsolutePath}/data"
+        spark.sql(s"""COPY INTO $table
+                     |FROM '$sourcePath'
+                     |FILE_FORMAT = (TYPE = PARQUET)
+                     |""".stripMargin)
+
+        // The column absent from the file is expected to be null in every row.
+        val expected = Seq(Row(1, "Alice", null), Row(2, "Bob", null))
+        checkAnswer(spark.sql(s"SELECT * FROM $table ORDER BY id"), expected)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
+
+  test("COPY INTO: Parquet FORCE=FALSE skips already-loaded files") {
+    val table = s"$dbName0.copy_parquet_force"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING)")
+
+    withParquetDir {
+      dir =>
+        import org.apache.spark.sql.types._
+        val sourceSchema = new StructType()
+          .add("id", IntegerType)
+          .add("name", StringType)
+        val sourceRows = Seq(Row(1, "Alice"))
+        createParquetFile(dir, "data", sourceRows, sourceSchema)
+
+        val sourcePath = s"${dir.getAbsolutePath}/data"
+
+        // First load: no FORCE clause.
+        spark.sql(s"""COPY INTO $table
+                     |FROM '$sourcePath'
+                     |FILE_FORMAT = (TYPE = PARQUET)
+                     |""".stripMargin)
+
+        // Second load of the same path with FORCE = FALSE.
+        spark.sql(s"""COPY INTO $table
+                     |FROM '$sourcePath'
+                     |FILE_FORMAT = (TYPE = PARQUET)
+                     |FORCE = FALSE
+                     |""".stripMargin)
+
+        // The row must appear once, i.e. the second load added nothing.
+        checkAnswer(spark.sql(s"SELECT * FROM $table ORDER BY id"), sourceRows)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
+
+  test("COPY INTO: Parquet PATTERN filters files") {
+    val table = s"$dbName0.copy_parquet_pattern"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING)")
+
+    withParquetDir {
+      dir =>
+        import org.apache.spark.sql.types._
+        val sourceSchema = new StructType()
+          .add("id", IntegerType)
+          .add("name", StringType)
+        // Two single-row files; only the first file name matches the PATTERN used below.
+        val filesToWrite = Seq(
+          "include_data.parquet" -> Row(1, "Alice"),
+          "exclude_data.parquet" -> Row(2, "Bob"))
+        filesToWrite.foreach {
+          case (fileName, row) => createParquetSingleFile(dir, fileName, Seq(row), sourceSchema)
+        }
+
+        val sourcePath = dir.getAbsolutePath
+        spark.sql(s"""COPY INTO $table
+                     |FROM '$sourcePath'
+                     |FILE_FORMAT = (TYPE = PARQUET)
+                     |PATTERN = 'include.*'
+                     |""".stripMargin)
+
+        // Only the row from the matching file is expected.
+        val expected = Seq(Row(1, "Alice"))
+        checkAnswer(spark.sql(s"SELECT * FROM $table ORDER BY id"), expected)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
+
+  test("COPY INTO: Parquet unsupported option errors") {
+    val table = s"$dbName0.copy_parquet_opt_err"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING)")
+
+    withParquetDir {
+      dir =>
+        import org.apache.spark.sql.types._
+        val sourceSchema = new StructType()
+          .add("id", IntegerType)
+          .add("name", StringType)
+        createParquetFile(dir, "data", Seq(Row(1, "Alice")), sourceSchema)
+
+        val sourcePath = s"${dir.getAbsolutePath}/data"
+        // FIELD_DELIMITER combined with TYPE = PARQUET must be rejected.
+        val copyWithDelimiter =
+          s"""COPY INTO $table
+             |FROM '$sourcePath'
+             |FILE_FORMAT = (TYPE = PARQUET, FIELD_DELIMITER = ',')
+             |""".stripMargin
+        intercept[IllegalArgumentException] {
+          spark.sql(copyWithDelimiter)
+        }
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
+
+  test("COPY INTO: Parquet rows_loaded count is accurate") {
+    val table = s"$dbName0.copy_parquet_count"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING)")
+
+    withParquetDir {
+      dir =>
+        import org.apache.spark.sql.types._
+        val sourceSchema = new StructType()
+          .add("id", IntegerType)
+          .add("name", StringType)
+        val sourceRows = Seq(Row(1, "Alice"), Row(2, "Bob"), Row(3, "Charlie"))
+        createParquetFile(dir, "data", sourceRows, sourceSchema)
+
+        val sourcePath = s"${dir.getAbsolutePath}/data"
+        val copyResult = spark.sql(s"""COPY INTO $table
+                                      |FROM '$sourcePath'
+                                      |FILE_FORMAT = (TYPE = PARQUET)
+                                      |""".stripMargin)
+
+        // Sum the long at index 2 of every result row; the total must equal the 3 source rows.
+        val totalLoaded = copyResult.collect().foldLeft(0L)((sum, row) => sum + row.getLong(2))
+        assert(totalLoaded == 3)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
 }


Reply via email to