This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ca6e718d5a [spark] Support Parquet format in COPY INTO (#8037)
ca6e718d5a is described below
commit ca6e718d5a77bbbea3ffa3643537cb0306a9dfa3
Author: Junrui Lee <[email protected]>
AuthorDate: Sat May 30 12:03:35 2026 +0800
[spark] Support Parquet format in COPY INTO (#8037)
---
docs/docs/spark/sql-write.md | 89 +++++-
.../PaimonSqlExtensions.g4 | 2 +
.../spark/catalyst/plans/logical/CopyOptions.scala | 32 +-
.../spark/execution/CopyIntoLocationExec.scala | 2 +
.../paimon/spark/execution/CopyIntoTableExec.scala | 232 ++++++++++++--
.../apache/paimon/spark/sql/CopyIntoTestBase.scala | 341 ++++++++++++++++++++-
6 files changed, 655 insertions(+), 43 deletions(-)
diff --git a/docs/docs/spark/sql-write.md b/docs/docs/spark/sql-write.md
index 449d8871ed..56899796ea 100644
--- a/docs/docs/spark/sql-write.md
+++ b/docs/docs/spark/sql-write.md
@@ -314,7 +314,7 @@ INSERT INTO t BY NAME SELECT 3 AS a, '3' AS b, 3 AS c;
## COPY INTO
-`COPY INTO` provides a SQL command for bulk loading data files into Paimon
tables and exporting table data to files. Supported formats: **CSV** and
**JSON**.
+`COPY INTO` provides a SQL command for bulk loading data files into Paimon
tables and exporting table data to files. Supported formats: **CSV**, **JSON**,
and **Parquet**.
### CSV Import
@@ -383,6 +383,37 @@ FILE_FORMAT = (TYPE = JSON, MULTI_LINE = TRUE);
JSON columns are matched **by column name** (not by position), so source field
order does not matter.
+### Parquet Import
+
+```sql
+COPY INTO table_name [(col1, col2, ...)]
+FROM 'source_path'
+FILE_FORMAT = (TYPE = PARQUET [, option = value, ...])
+[PATTERN = 'regex']
+[FORCE = TRUE|FALSE]
+[ON_ERROR = ABORT_STATEMENT]
+```
+
+**Basic import:**
+
+```sql
+COPY INTO my_db.my_table
+FROM '/data/parquet_files/'
+FILE_FORMAT = (TYPE = PARQUET);
+```
+
+**Import with PATTERN:**
+
+```sql
+COPY INTO my_db.events
+FROM '/data/lake/'
+FILE_FORMAT = (TYPE = PARQUET)
+PATTERN = '.*\.parquet'
+FORCE = FALSE;
+```
+
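+Parquet columns are matched **by column name** (not by position); matching is
+case-insensitive unless `spark.sql.caseSensitive` is enabled. Extra columns in
+the source files are ignored; missing columns become NULL. Source values are
+cast to the target column types, and a value that cannot be converted aborts
+the command.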
+
### Write CSV Files
```sql
@@ -419,15 +450,42 @@ FILE_FORMAT = (TYPE = JSON)
OVERWRITE = TRUE;
```
+### Write Parquet Files
+
+```sql
+COPY INTO 'target_path'
+FROM table_name
+FILE_FORMAT = (TYPE = PARQUET [, option = value, ...])
+[OVERWRITE = TRUE|FALSE]
+```
+
+**Basic Parquet export:**
+
+```sql
+COPY INTO '/export/data_backup/'
+FROM my_db.events
+FILE_FORMAT = (TYPE = PARQUET)
+OVERWRITE = TRUE;
+```
+
+**Export with compression:**
+
+```sql
+COPY INTO '/export/data_compressed/'
+FROM my_db.events
+FILE_FORMAT = (TYPE = PARQUET, COMPRESSION = GZIP)
+OVERWRITE = TRUE;
+```
+
### FILE_FORMAT Options
-`FILE_FORMAT` is required and must include `TYPE = CSV` or `TYPE = JSON`.
+`FILE_FORMAT` is required and must include `TYPE = CSV`, `TYPE = JSON`, or
`TYPE = PARQUET`.
**CSV import options:**
| Option | Description | Default |
|--------|-------------|---------|
-| TYPE | File format type. `CSV` or `JSON`. | (required) |
+| TYPE | File format type. `CSV`, `JSON`, or `PARQUET`. | (required) |
| FIELD_DELIMITER | Column delimiter character. | `,` |
| SKIP_HEADER | Skip the first line as header. Only `0` or `1`. | `0` |
| QUOTE | Quote character for enclosing fields. | `"` |
@@ -440,17 +498,24 @@ OVERWRITE = TRUE;
| Option | Description | Default |
|--------|-------------|---------|
-| TYPE | File format type. `CSV` or `JSON`. | (required) |
+| TYPE | File format type. `CSV`, `JSON`, or `PARQUET`. | (required) |
| MULTI_LINE | Parse multi-line JSON (e.g. JSON arrays or pretty-printed objects). | `FALSE` |
| NULL_IF | List of string values to interpret as NULL. | (none) |
| EMPTY_FIELD_AS_NULL | Treat empty string values as NULL. | `FALSE` |
| COMPRESSION | Compression codec (e.g. `GZIP`). | `NONE` |
+**Parquet import options:**
+
+| Option | Description | Default |
+|--------|-------------|---------|
+| TYPE | File format type. `CSV`, `JSON`, or `PARQUET`. | (required) |
+| COMPRESSION | Compression codec. Normally not needed for import: the codec is recorded in the Parquet file metadata and detected automatically. | (auto) |
+
**CSV write options:**
| Option | Description | Default |
|--------|-------------|---------|
-| TYPE | File format type. `CSV` or `JSON`. | (required) |
+| TYPE | File format type. `CSV`, `JSON`, or `PARQUET`. | (required) |
| FIELD_DELIMITER | Column delimiter character. | `,` |
| HEADER | Write column names as the first line. `TRUE` or `FALSE`. | `FALSE` |
| QUOTE | Quote character for enclosing fields. | `"` |
@@ -461,11 +526,18 @@ OVERWRITE = TRUE;
| Option | Description | Default |
|--------|-------------|---------|
-| TYPE | File format type. `CSV` or `JSON`. | (required) |
+| TYPE | File format type. `CSV`, `JSON`, or `PARQUET`. | (required) |
| DATE_FORMAT | Custom date format pattern. | Spark default |
| TIMESTAMP_FORMAT | Custom timestamp format pattern. | Spark default |
| COMPRESSION | Compression codec (e.g. `GZIP`). | `NONE` |
+**Parquet write options:**
+
+| Option | Description | Default |
+|--------|-------------|---------|
+| TYPE | File format type. `CSV`, `JSON`, or `PARQUET`. | (required) |
+| COMPRESSION | Compression codec (`SNAPPY`, `GZIP`, `NONE`, etc.). | `SNAPPY` |
+
### Import Options
| Option | Description | Default |
@@ -486,7 +558,8 @@ When an explicit column list is provided (e.g., `COPY INTO
t (col1, col2) FROM .
- **CSV**: Columns are mapped **positionally** to the specified column list.
- **JSON**: Columns are matched **by name** to the specified column list.
-- The number of source columns must match the column list length (CSV). For
JSON, missing fields in the source become NULL.
+- **Parquet**: Columns are matched **by name** to the specified column list.
+- The number of source columns must match the column list length (CSV). For
JSON and Parquet, missing fields in the source become NULL.
- Columns not in the list are filled with their **DEFAULT value** (if defined
in the table schema) or **NULL**.
- Non-nullable columns without a default value that are not in the list will
cause an error.
@@ -524,7 +597,7 @@ By default (`FORCE = FALSE`), COPY INTO tracks which files
have been successfull
### Limitations
-- Only **CSV** and **JSON** formats are supported.
+- Only **CSV**, **JSON**, and **Parquet** formats are supported.
- Writing files only supports `FROM table_name`; `FROM (SELECT ...)` is not
supported.
- `ON_ERROR = CONTINUE` is not supported; any parse or cast error aborts the
entire command.
- `SINGLE = TRUE` (single-file output) is not supported.
diff --git
a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
index f87884634a..1bbbca478b 100644
---
a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
+++
b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
@@ -206,6 +206,7 @@ nonReserved
| COPY | INTO | FROM | FILE_FORMAT | PATTERN | FORCE | ON_ERROR |
ABORT_STATEMENT | OVERWRITE
| CSV
| JSON
+ | PARQUET
;
ALTER: 'ALTER';
@@ -248,6 +249,7 @@ ABORT_STATEMENT: 'ABORT_STATEMENT';
OVERWRITE: 'OVERWRITE';
CSV: 'CSV';
JSON: 'JSON';
+PARQUET: 'PARQUET';
PLUS: '+';
MINUS: '-';
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyOptions.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyOptions.scala
index ae5d56eaea..4730307193 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyOptions.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyOptions.scala
@@ -23,6 +23,7 @@ sealed trait FileFormatType
object FileFormatType {
case object CSV extends FileFormatType
case object JSON extends FileFormatType
+ case object PARQUET extends FileFormatType
case class Unsupported(name: String) extends FileFormatType
}
@@ -53,6 +54,15 @@ case class CopyFileFormat(formatType: FileFormatType,
options: Map[String, Strin
case _ =>
}
}
+ case FileFormatType.PARQUET =>
+ mapped.remove("mode")
+ options.foreach {
+ case (k, v) =>
+ k match {
+ case "COMPRESSION" => mapped("compression") = v
+ case _ =>
+ }
+ }
case _ =>
}
mapped.toMap
@@ -83,6 +93,14 @@ case class CopyFileFormat(formatType: FileFormatType,
options: Map[String, Strin
case _ =>
}
}
+ case FileFormatType.PARQUET =>
+ options.foreach {
+ case (k, v) =>
+ k match {
+ case "COMPRESSION" => mapped("compression") = v
+ case _ =>
+ }
+ }
case _ =>
}
mapped.toMap
@@ -108,6 +126,7 @@ case class CopyFileFormat(formatType: FileFormatType,
options: Map[String, Strin
}
val validKeys = formatType match {
case FileFormatType.JSON => CopyFileFormat.VALID_JSON_IMPORT_KEYS
+ case FileFormatType.PARQUET => CopyFileFormat.VALID_PARQUET_IMPORT_KEYS
case _ => CopyFileFormat.VALID_CSV_IMPORT_KEYS
}
val invalid = options.keys.filterNot(validKeys.contains)
@@ -132,6 +151,7 @@ case class CopyFileFormat(formatType: FileFormatType,
options: Map[String, Strin
validateFormatType()
val validKeys = formatType match {
case FileFormatType.JSON => CopyFileFormat.VALID_JSON_EXPORT_KEYS
+ case FileFormatType.PARQUET => CopyFileFormat.VALID_PARQUET_EXPORT_KEYS
case _ => CopyFileFormat.VALID_CSV_EXPORT_KEYS
}
val invalid = options.keys.filterNot(validKeys.contains)
@@ -145,9 +165,10 @@ case class CopyFileFormat(formatType: FileFormatType,
options: Map[String, Strin
formatType match {
case FileFormatType.CSV =>
case FileFormatType.JSON =>
+ case FileFormatType.PARQUET =>
case FileFormatType.Unsupported(name) =>
throw new IllegalArgumentException(
- s"Unsupported file format type: $name. Supported types: CSV, JSON")
+ s"Unsupported file format type: $name. Supported types: CSV, JSON,
PARQUET")
}
}
}
@@ -185,6 +206,14 @@ object CopyFileFormat {
"TIMESTAMP_FORMAT"
)
+  // FILE_FORMAT keys accepted for Parquet imports (beyond TYPE).
+  val VALID_PARQUET_IMPORT_KEYS: Set[String] = Set("COMPRESSION")
+
+  // FILE_FORMAT keys accepted for Parquet exports (beyond TYPE).
+  val VALID_PARQUET_EXPORT_KEYS: Set[String] = Set("COMPRESSION")
+
// Unit Separator (U+001F) used to encode multi-value lists in a single
string
val LIST_SEPARATOR: String = "\u001f"
@@ -192,6 +221,7 @@ object CopyFileFormat {
typeStr.toUpperCase match {
case "CSV" => FileFormatType.CSV
case "JSON" => FileFormatType.JSON
+ case "PARQUET" => FileFormatType.PARQUET
case other => FileFormatType.Unsupported(other)
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoLocationExec.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoLocationExec.scala
index 69387a5071..45471bab84 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoLocationExec.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoLocationExec.scala
@@ -54,6 +54,8 @@ case class CopyIntoLocationExec(
fileFormat.formatType match {
case FileFormatType.JSON =>
df.write.options(writerOptions).mode(saveMode).json(targetPath)
+ case FileFormatType.PARQUET =>
+ df.write.options(writerOptions).mode(saveMode).parquet(targetPath)
case _ =>
df.write.options(writerOptions).mode(saveMode).csv(targetPath)
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoTableExec.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoTableExec.scala
index 4763d8c784..5ef05b4188 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoTableExec.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoTableExec.scala
@@ -26,6 +26,7 @@ import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.types.DataField
import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -48,7 +49,8 @@ case class CopyIntoTableExec(
pattern: Option[String],
force: Boolean,
out: Seq[Attribute])
- extends PaimonLeafV2CommandExec {
+ extends PaimonLeafV2CommandExec
+ with Logging {
override def output: Seq[Attribute] = out
@@ -72,9 +74,168 @@ case class CopyIntoTableExec(
}
val filePaths = filesToLoad.map(_.getPath.toString)
- val stringSchema = buildStringSchema(targetColumns)
val readerOptions = fileFormat.toSparkReaderOptions
+ fileFormat.formatType match {
+ case FileFormatType.PARQUET =>
+ runParquetImport(
+ paimonTable,
+ filePaths,
+ targetColumns,
+ writableColumns,
+ fields,
+ filesToLoad,
+ skippedFiles,
+ readerOptions)
+ case _ =>
+ runTextImport(
+ paimonTable,
+ filePaths,
+ targetColumns,
+ writableColumns,
+ fields,
+ filesToLoad,
+ skippedFiles,
+ readerOptions)
+ }
+ }
+
+  /**
+   * Parquet import pipeline. Unlike CSV/JSON, which are read as strings and then cast, Parquet
+   * files already carry typed columns, so the flow is:
+   *   1. Read the source Parquet files with their native types
+   *   2. Validate that casting to the target types loses no non-null values (abort otherwise)
+   *   3. Project and cast columns to the target table schema (matched by name, not position)
+   *   4. Append the result to the Paimon table
+   *   5. Record load history for idempotent re-runs (FORCE=FALSE dedup) and build result rows
+   *
+   * @return one result row per loaded/skipped file, as built by recordHistoryAndBuildResults
+   */
+  private def runParquetImport(
+      paimonTable: FileStoreTable,
+      filePaths: Array[String],
+      targetColumns: Seq[String],
+      writableColumns: Seq[String],
+      fields: Seq[DataField],
+      filesToLoad: Array[FileStatus],
+      skippedFiles: Array[FileStatus],
+      readerOptions: Map[String, String]): Seq[InternalRow] = {
+    val rawDf = spark.read.options(readerOptions).parquet(filePaths: _*)
+
+    // Fail before anything is written if some source value cannot be cast to its target type.
+    validateParquetCast(rawDf, targetColumns, writableColumns, fields)
+    val selectedDf = buildParquetDataFrame(rawDf, targetColumns, writableColumns, fields)
+
+    val tableName = CopyIntoUtils.quoteIdentifier(catalog.name(), ident)
+    selectedDf.write.format("paimon").mode("append").insertInto(tableName)
+
+    // rawDf is a lazy plan over the same files; re-evaluating it for the per-file row counts
+    // avoids building a second, identical reader (file listing + footer/schema inference).
+    recordHistoryAndBuildResults(paimonTable, filesToLoad, skippedFiles, rawDf)
+  }
+
+  /**
+   * Build the projection DataFrame for a Parquet import.
+   *
+   * Source columns are matched to target table columns by name using the session resolver, which
+   * is case-insensitive unless spark.sql.caseSensitive is enabled. For each writable column, in
+   * the order of `writableColumns`:
+   *   - listed in targetColumns AND present in the source: the source column cast to the target type
+   *   - listed in targetColumns but missing from the source: a NULL of the target type
+   *   - not listed in targetColumns (unmapped): the column's default value, or NULL
+   * Source columns that match no writable column are dropped.
+   */
+  private def buildParquetDataFrame(
+      rawDf: DataFrame,
+      targetColumns: Seq[String],
+      writableColumns: Seq[String],
+      fields: Seq[DataField]): DataFrame = {
+    val resolver = spark.sessionState.conf.resolver
+    val sourceColumns = rawDf.columns.toSeq
+
+    // Backtick-quote source names so dots or other special characters in Parquet column names
+    // are taken literally instead of being parsed as nested-field access.
+    def sourceCol(name: String): Column = col("`" + name.replace("`", "``") + "`")
+
+    val selectExprs: Seq[Column] = writableColumns.map {
+      colName =>
+        val field = fields.find(_.name() == colName).get
+        if (targetColumns.exists(tc => resolver(tc, colName))) {
+          val sparkType = org.apache.paimon.spark.SparkTypeUtils.fromPaimonType(field.`type`())
+          sourceColumns.find(s => resolver(s, colName)) match {
+            case Some(srcColName) => sourceCol(srcColName).cast(sparkType).as(colName)
+            case None => lit(null).cast(sparkType).as(colName)
+          }
+        } else {
+          resolveDefaultColumn(field, colName)
+        }
+    }
+    rawDf.select(selectExprs: _*)
+  }
+
+  /**
+   * Validate that casting Parquet source columns to their target types does not silently lose data.
+   *
+   * Detection strategy: a value that is non-null in the source but null after casting indicates a
+   * failed cast (e.g. the string "abc" cast to IntegerType). Only columns that are listed in
+   * `targetColumns` and present in the source are checked, and the scan stops at the first
+   * offending row.
+   *
+   * Target types are looked up by column name (as in buildParquetDataFrame) rather than by zipping
+   * `writableColumns` with `fields`, so the check does not depend on the two being index-aligned.
+   *
+   * NOTE(review): with spark.sql.ansi.enabled=true an invalid cast raises Spark's own cast error
+   * while this check runs, instead of the IllegalArgumentException below. In non-ANSI mode,
+   * narrowing numeric casts that overflow do not produce null and are therefore not detected.
+   *
+   * @throws IllegalArgumentException if a non-null source value becomes null after casting
+   */
+  private def validateParquetCast(
+      rawDf: DataFrame,
+      targetColumns: Seq[String],
+      writableColumns: Seq[String],
+      fields: Seq[DataField]): Unit = {
+    val resolver = spark.sessionState.conf.resolver
+    val sourceColumns = rawDf.columns.toSeq
+
+    // Backtick-quote source names so dots or other special characters are taken literally.
+    def sourceCol(name: String): Column = col("`" + name.replace("`", "``") + "`")
+
+    // (source column name, target Spark type) for every mapped column present in the source.
+    val checks = for {
+      colName <- writableColumns if targetColumns.exists(tc => resolver(tc, colName))
+      srcColName <- sourceColumns.find(s => resolver(s, colName))
+    } yield {
+      val field = fields.find(_.name() == colName).get
+      (srcColName, org.apache.paimon.spark.SparkTypeUtils.fromPaimonType(field.`type`()))
+    }
+
+    if (checks.nonEmpty) {
+      // Project every checked column twice: the raw value at position 2*i and the casted value
+      // at 2*i + 1. Index-based aliases cannot collide with source names and need no quoting.
+      val projection = checks.zipWithIndex.flatMap {
+        case ((srcColName, sparkType), i) =>
+          Seq(
+            sourceCol(srcColName).as(s"__pq_src_$i"),
+            sourceCol(srcColName).cast(sparkType).as(s"__pq_cv_$i"))
+      }
+      val badCastFilter = checks.indices
+        .map(i => col(s"__pq_src_$i").isNotNull && col(s"__pq_cv_$i").isNull)
+        .reduce(_ || _)
+      val badRows = rawDf.select(projection: _*).filter(badCastFilter).limit(1).collect()
+      if (badRows.nonEmpty) {
+        val row = badRows(0)
+        val failedColumn = checks.zipWithIndex.collectFirst {
+          case ((srcColName, _), i) if !row.isNullAt(2 * i) && row.isNullAt(2 * i + 1) =>
+            srcColName
+        }
+        throw new IllegalArgumentException(
+          s"ON_ERROR = ABORT_STATEMENT: Cast failure in column '${failedColumn.getOrElse("unknown")}'. " +
+            "Source data contains values that cannot be converted to the target type.")
+      }
+    }
+  }
+
+ /**
+ * Text-based (CSV/JSON) import pipeline. Reads all columns as strings
first, then:
+ * 1. Rename positional columns (CSV) or keep named columns (JSON)
+ * 2. Fill unmapped columns with default values
+ * 3. Cast all string columns to target types with validation
+ * 4. Write to Paimon table
+ * 5. Record load history
+ */
+ private def runTextImport(
+ paimonTable: FileStoreTable,
+ filePaths: Array[String],
+ targetColumns: Seq[String],
+ writableColumns: Seq[String],
+ fields: Seq[DataField],
+ filesToLoad: Array[FileStatus],
+ skippedFiles: Array[FileStatus],
+ readerOptions: Map[String, String]): Seq[InternalRow] = {
+ val stringSchema = buildStringSchema(targetColumns)
+
val sourceDf = readSourceData(filePaths, stringSchema, readerOptions)
val finalDf =
buildFinalDataFrame(sourceDf, targetColumns, writableColumns, fields)
@@ -83,13 +244,13 @@ case class CopyIntoTableExec(
val tableName = CopyIntoUtils.quoteIdentifier(catalog.name(), ident)
castedDf.write.format("paimon").mode("append").insertInto(tableName)
- recordHistoryAndBuildResults(
- paimonTable,
- filesToLoad,
- skippedFiles,
- filePaths,
- stringSchema,
- readerOptions)
+ val countDf = fileFormat.formatType match {
+ case FileFormatType.JSON =>
+ spark.read.options(readerOptions).schema(stringSchema).json(filePaths:
_*)
+ case _ =>
+ spark.read.options(readerOptions).schema(stringSchema).csv(filePaths:
_*)
+ }
+ recordHistoryAndBuildResults(paimonTable, filesToLoad, skippedFiles,
countDf)
}
private def buildStringSchema(targetColumns: Seq[String]): StructType = {
@@ -229,20 +390,7 @@ case class CopyIntoTableExec(
if (targetColumns.contains(colName)) {
col(colName)
} else {
- val field = fields.find(_.name() == colName).get
- val defaultVal = field.defaultValue()
- if (defaultVal != null) {
- val sparkType =
-
org.apache.paimon.spark.SparkTypeUtils.fromPaimonType(field.`type`())
- try {
- val parsed =
spark.sessionState.sqlParser.parseExpression(defaultVal)
-
SparkShimLoader.shim.classicApi.column(parsed).cast(sparkType).as(colName)
- } catch {
- case _: Exception => lit(null).cast(sparkType).as(colName)
- }
- } else {
- lit(null).as(colName)
- }
+ resolveDefaultColumn(fields.find(_.name() == colName).get, colName)
}
}
renamedDf.select(selectExprs: _*)
@@ -294,25 +442,22 @@ case class CopyIntoTableExec(
castedDf
}
+ /**
+ * Record successfully loaded files to load history (for FORCE=FALSE
idempotent dedup), and build
+ * the result rows showing per-file load status. Accepts a pre-built countDf
that will be grouped
+ * by input_file_name() to get per-file row counts — this allows both
Parquet and text paths to
+ * share the same logic.
+ */
private def recordHistoryAndBuildResults(
paimonTable: FileStoreTable,
filesToLoad: Array[FileStatus],
skippedFiles: Array[FileStatus],
- filePaths: Array[String],
- stringSchema: StructType,
- readerOptions: Map[String, String]): Seq[InternalRow] = {
+ countDf: DataFrame): Seq[InternalRow] = {
val paimonPath = new
org.apache.paimon.fs.Path(paimonTable.location().toString)
val historyManager = new CopyLoadHistoryManager(paimonTable.fileIO(),
paimonPath)
val snapshotId = paimonTable.snapshotManager().latestSnapshotId()
val loadedAt = System.currentTimeMillis()
- val countDf = fileFormat.formatType match {
- case FileFormatType.JSON =>
- spark.read.options(readerOptions).schema(stringSchema).json(filePaths:
_*)
- case _ =>
- spark.read.options(readerOptions).schema(stringSchema).csv(filePaths:
_*)
- }
-
val rowCounts = countDf
.groupBy(input_file_name().as("file"))
.count()
@@ -361,4 +506,25 @@ case class CopyIntoTableExec(
0L)
}.toSeq
}
+
+  /**
+   * Column expression for a target column that is not populated from source data.
+   *
+   * If the field declares a default value, that SQL expression is parsed and cast to the column's
+   * Spark type. If parsing or conversion throws, a warning is logged and a NULL of the column's
+   * type is used instead. A field without a default yields an untyped NULL literal.
+   */
+  private def resolveDefaultColumn(field: DataField, colName: String): Column = {
+    Option(field.defaultValue()) match {
+      case None =>
+        lit(null).as(colName)
+      case Some(defaultVal) =>
+        val targetType = org.apache.paimon.spark.SparkTypeUtils.fromPaimonType(field.`type`())
+        try {
+          val defaultExpr = spark.sessionState.sqlParser.parseExpression(defaultVal)
+          SparkShimLoader.shim.classicApi.column(defaultExpr).cast(targetType).as(colName)
+        } catch {
+          case e: Exception =>
+            logWarning(
+              s"Failed to parse default value '$defaultVal' for column '$colName': " +
+                s"${e.getMessage}. Using null instead.")
+            lit(null).cast(targetType).as(colName)
+        }
+    }
+  }
}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala
index aa764bd30e..e0970362a0 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala
@@ -234,7 +234,7 @@ class CopyIntoTestBase extends PaimonSparkTestBase {
val e = intercept[IllegalArgumentException] {
spark.sql(s"""COPY INTO $dbName0.copy_unsup
|FROM '${dir.getAbsolutePath}'
- |FILE_FORMAT = (TYPE = PARQUET)
+ |FILE_FORMAT = (TYPE = ORC)
|""".stripMargin)
}
assert(
@@ -1083,4 +1083,343 @@ class CopyIntoTestBase extends PaimonSparkTestBase {
spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_case")
}
+
+ // ========== Parquet Tests ==========
+
+  /** Runs `testBody` with a fresh temporary directory that is always deleted afterwards. */
+  private def withParquetDir(testBody: File => Unit): Unit = {
+    val tmpDir = Files.createTempDirectory("copy_into_parquet_test").toFile
+    try {
+      testBody(tmpDir)
+    } finally {
+      deleteRecursively(tmpDir)
+    }
+  }
+
+  /**
+   * Writes `data` (with `schema`) as a Spark Parquet output directory at `dir/name`, coalesced to
+   * a single partition.
+   */
+  private def createParquetFile(
+      dir: File,
+      name: String,
+      data: Seq[Row],
+      schema: org.apache.spark.sql.types.StructType): Unit = {
+    val outputPath = new File(dir, name).getAbsolutePath
+    spark
+      .createDataFrame(spark.sparkContext.parallelize(data), schema)
+      .coalesce(1)
+      .write
+      .parquet(outputPath)
+  }
+
+  /**
+   * Writes `data` (with `schema`) as exactly one Parquet file at `dir/fileName`, rather than a
+   * Spark output directory, so tests can exercise per-file matching such as PATTERN.
+   */
+  private def createParquetSingleFile(
+      dir: File,
+      fileName: String,
+      data: Seq[Row],
+      schema: org.apache.spark.sql.types.StructType): Unit = {
+    val tmpDir = new File(dir, s"_tmp_$fileName")
+    val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
+    df.coalesce(1).write.parquet(tmpDir.getAbsolutePath)
+    val partFile = tmpDir
+      .listFiles()
+      .find(_.getName.endsWith(".parquet"))
+      .getOrElse(fail(s"No Parquet part file was written under $tmpDir"))
+    // File.renameTo reports failure only through its return value; fail loudly instead of letting
+    // the test run without its input file.
+    val target = new File(dir, fileName)
+    assert(partFile.renameTo(target), s"Failed to move $partFile to $target")
+    deleteRecursively(tmpDir)
+  }
+
+  test("COPY INTO: basic Parquet import") {
+    val table = s"$dbName0.copy_parquet_basic"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING, age INT)")
+
+    withParquetDir {
+      tmp =>
+        import org.apache.spark.sql.types._
+        val sourceSchema = StructType(
+          Seq(
+            StructField("id", IntegerType),
+            StructField("name", StringType),
+            StructField("age", IntegerType)))
+        // Source and table share the same layout, so the rows must come back unchanged.
+        val rows = Seq(Row(1, "Alice", 30), Row(2, "Bob", 25))
+        createParquetFile(tmp, "data", rows, sourceSchema)
+
+        spark.sql(s"""COPY INTO $table
+                     |FROM '${tmp.getAbsolutePath}/data'
+                     |FILE_FORMAT = (TYPE = PARQUET)
+                     |""".stripMargin)
+
+        checkAnswer(spark.sql(s"SELECT * FROM $table ORDER BY id"), rows)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
+
+  test("COPY INTO: Parquet column name matching ignores order") {
+    val table = s"$dbName0.copy_parquet_order"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING, age INT)")
+
+    withParquetDir {
+      tmp =>
+        import org.apache.spark.sql.types._
+        // The source columns are deliberately in the reverse order of the table columns.
+        val reversedSchema = StructType(
+          Seq(
+            StructField("age", IntegerType),
+            StructField("name", StringType),
+            StructField("id", IntegerType)))
+        createParquetFile(tmp, "data", Seq(Row(30, "Alice", 1), Row(25, "Bob", 2)), reversedSchema)
+
+        spark.sql(s"""COPY INTO $table
+                     |FROM '${tmp.getAbsolutePath}/data'
+                     |FILE_FORMAT = (TYPE = PARQUET)
+                     |""".stripMargin)
+
+        val expected = Seq(Row(1, "Alice", 30), Row(2, "Bob", 25))
+        checkAnswer(spark.sql(s"SELECT * FROM $table ORDER BY id"), expected)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
+
+  test("COPY INTO: Parquet with explicit column list") {
+    val table = s"$dbName0.copy_parquet_cols"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING, age INT)")
+
+    withParquetDir {
+      tmp =>
+        import org.apache.spark.sql.types._
+        val idAndName =
+          StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
+        createParquetFile(tmp, "data", Seq(Row(1, "Alice"), Row(2, "Bob")), idAndName)
+
+        spark.sql(s"""COPY INTO $table (id, name)
+                     |FROM '${tmp.getAbsolutePath}/data'
+                     |FILE_FORMAT = (TYPE = PARQUET)
+                     |""".stripMargin)
+
+        // `age` is outside the column list and declares no default, so it is loaded as NULL.
+        checkAnswer(
+          spark.sql(s"SELECT * FROM $table ORDER BY id"),
+          Seq(Row(1, "Alice", null), Row(2, "Bob", null)))
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
+
+  test("COPY INTO: Parquet export") {
+    val table = s"$dbName0.copy_parquet_export"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING)")
+    spark.sql(s"INSERT INTO $table VALUES (1, 'Alice'), (2, 'Bob')")
+
+    withParquetDir {
+      tmp =>
+        val outputPath = new File(tmp, "output").getAbsolutePath
+        spark.sql(s"""COPY INTO '$outputPath'
+                     |FROM $table
+                     |FILE_FORMAT = (TYPE = PARQUET)
+                     |""".stripMargin)
+
+        // Read the exported files back with Spark's own Parquet reader.
+        val exported = spark.read.parquet(outputPath).orderBy("id")
+        checkAnswer(exported, Seq(Row(1, "Alice"), Row(2, "Bob")))
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
+
+  test("COPY INTO: Parquet export with COMPRESSION") {
+    val table = s"$dbName0.copy_parquet_compress"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING)")
+    spark.sql(s"INSERT INTO $table VALUES (1, 'Alice'), (2, 'Bob')")
+
+    withParquetDir {
+      tmp =>
+        val outputPath = new File(tmp, "output").getAbsolutePath
+        spark.sql(s"""COPY INTO '$outputPath'
+                     |FROM $table
+                     |FILE_FORMAT = (TYPE = PARQUET, COMPRESSION = GZIP)
+                     |""".stripMargin)
+
+        // The GZIP-compressed output must still be readable with the default reader.
+        val exported = spark.read.parquet(outputPath).orderBy("id")
+        checkAnswer(exported, Seq(Row(1, "Alice"), Row(2, "Bob")))
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
+
+  test("COPY INTO: Parquet export then import round-trip") {
+    val srcTable = s"$dbName0.copy_parquet_rt_src"
+    val dstTable = s"$dbName0.copy_parquet_rt_dst"
+    spark.sql(s"DROP TABLE IF EXISTS $srcTable")
+    spark.sql(s"DROP TABLE IF EXISTS $dstTable")
+    spark.sql(s"CREATE TABLE $srcTable (id INT, name STRING, score DOUBLE)")
+    spark.sql(s"INSERT INTO $srcTable VALUES (1, 'Alice', 95.5), (2, 'Bob', 87.3)")
+    spark.sql(s"CREATE TABLE $dstTable (id INT, name STRING, score DOUBLE)")
+
+    withParquetDir {
+      tmp =>
+        val exportPath = new File(tmp, "export").getAbsolutePath
+        // Export the source table, then load the exported files into the destination table.
+        spark.sql(s"""COPY INTO '$exportPath'
+                     |FROM $srcTable
+                     |FILE_FORMAT = (TYPE = PARQUET)
+                     |""".stripMargin)
+        spark.sql(s"""COPY INTO $dstTable
+                     |FROM '$exportPath'
+                     |FILE_FORMAT = (TYPE = PARQUET)
+                     |""".stripMargin)
+
+        checkAnswer(
+          spark.sql(s"SELECT * FROM $dstTable ORDER BY id"),
+          Seq(Row(1, "Alice", 95.5), Row(2, "Bob", 87.3)))
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $srcTable")
+    spark.sql(s"DROP TABLE IF EXISTS $dstTable")
+  }
+
+  test("COPY INTO: Parquet extra fields are ignored") {
+    val table = s"$dbName0.copy_parquet_extra"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING)")
+
+    withParquetDir {
+      tmp =>
+        import org.apache.spark.sql.types._
+        // `extra` has no counterpart in the table and must be dropped during import.
+        val widerSchema = StructType(
+          Seq(
+            StructField("id", IntegerType),
+            StructField("name", StringType),
+            StructField("extra", StringType)))
+        val rows = Seq(Row(1, "Alice", "ignore"), Row(2, "Bob", "ignore"))
+        createParquetFile(tmp, "data", rows, widerSchema)
+
+        spark.sql(s"""COPY INTO $table
+                     |FROM '${tmp.getAbsolutePath}/data'
+                     |FILE_FORMAT = (TYPE = PARQUET)
+                     |""".stripMargin)
+
+        checkAnswer(
+          spark.sql(s"SELECT * FROM $table ORDER BY id"),
+          Seq(Row(1, "Alice"), Row(2, "Bob")))
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
+
+  test("COPY INTO: Parquet missing fields become null") {
+    val table = s"$dbName0.copy_parquet_missing"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING, age INT)")
+
+    withParquetDir {
+      tmp =>
+        import org.apache.spark.sql.types._
+        // The source lacks the table's `age` column entirely.
+        val narrowSchema =
+          StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
+        createParquetFile(tmp, "data", Seq(Row(1, "Alice"), Row(2, "Bob")), narrowSchema)
+
+        spark.sql(s"""COPY INTO $table
+                     |FROM '${tmp.getAbsolutePath}/data'
+                     |FILE_FORMAT = (TYPE = PARQUET)
+                     |""".stripMargin)
+
+        checkAnswer(
+          spark.sql(s"SELECT * FROM $table ORDER BY id"),
+          Seq(Row(1, "Alice", null), Row(2, "Bob", null)))
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
+
+  test("COPY INTO: Parquet FORCE=FALSE skips already-loaded files") {
+    val table = s"$dbName0.copy_parquet_force"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING)")
+
+    withParquetDir {
+      tmp =>
+        import org.apache.spark.sql.types._
+        val sourceSchema =
+          StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
+        createParquetFile(tmp, "data", Seq(Row(1, "Alice")), sourceSchema)
+        val sourcePath = s"${tmp.getAbsolutePath}/data"
+
+        // First load records the file in the load history.
+        spark.sql(s"""COPY INTO $table
+                     |FROM '$sourcePath'
+                     |FILE_FORMAT = (TYPE = PARQUET)
+                     |""".stripMargin)
+
+        // Second load of the same file must be skipped, leaving a single row.
+        spark.sql(s"""COPY INTO $table
+                     |FROM '$sourcePath'
+                     |FILE_FORMAT = (TYPE = PARQUET)
+                     |FORCE = FALSE
+                     |""".stripMargin)
+
+        checkAnswer(spark.sql(s"SELECT * FROM $table ORDER BY id"), Seq(Row(1, "Alice")))
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
+
+  test("COPY INTO: Parquet PATTERN filters files") {
+    val table = s"$dbName0.copy_parquet_pattern"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING)")
+
+    withParquetDir {
+      tmp =>
+        import org.apache.spark.sql.types._
+        val sourceSchema =
+          StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
+        // Two sibling files; only the one matching PATTERN may be loaded.
+        createParquetSingleFile(tmp, "include_data.parquet", Seq(Row(1, "Alice")), sourceSchema)
+        createParquetSingleFile(tmp, "exclude_data.parquet", Seq(Row(2, "Bob")), sourceSchema)
+
+        spark.sql(s"""COPY INTO $table
+                     |FROM '${tmp.getAbsolutePath}'
+                     |FILE_FORMAT = (TYPE = PARQUET)
+                     |PATTERN = 'include.*'
+                     |""".stripMargin)
+
+        checkAnswer(spark.sql(s"SELECT * FROM $table ORDER BY id"), Seq(Row(1, "Alice")))
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
+
+  test("COPY INTO: Parquet unsupported option errors") {
+    val table = s"$dbName0.copy_parquet_opt_err"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING)")
+
+    withParquetDir {
+      tmp =>
+        import org.apache.spark.sql.types._
+        val sourceSchema =
+          StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
+        createParquetFile(tmp, "data", Seq(Row(1, "Alice")), sourceSchema)
+
+        // FIELD_DELIMITER is a CSV-only option and must be rejected for Parquet.
+        intercept[IllegalArgumentException] {
+          spark.sql(s"""COPY INTO $table
+                       |FROM '${tmp.getAbsolutePath}/data'
+                       |FILE_FORMAT = (TYPE = PARQUET, FIELD_DELIMITER = ',')
+                       |""".stripMargin)
+        }
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
+
+  test("COPY INTO: Parquet rows_loaded count is accurate") {
+    val table = s"$dbName0.copy_parquet_count"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING)")
+
+    withParquetDir {
+      tmp =>
+        import org.apache.spark.sql.types._
+        val sourceSchema =
+          StructType(Seq(StructField("id", IntegerType), StructField("name", StringType)))
+        val rows = Seq(Row(1, "Alice"), Row(2, "Bob"), Row(3, "Charlie"))
+        createParquetFile(tmp, "data", rows, sourceSchema)
+
+        // Column 2 of each result row holds that file's loaded-row count.
+        val loadedPerFile = spark
+          .sql(s"""COPY INTO $table
+                  |FROM '${tmp.getAbsolutePath}/data'
+                  |FILE_FORMAT = (TYPE = PARQUET)
+                  |""".stripMargin)
+          .collect()
+          .map(_.getLong(2))
+        assert(loadedPerFile.sum == 3)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
}