This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5adb6c9d4c [spark] Support ON_ERROR = CONTINUE / SKIP_FILE in COPY
INTO (#8062)
5adb6c9d4c is described below
commit 5adb6c9d4cb7dfa102c4dd3da2d0e5de0674d75e
Author: Junrui Lee <[email protected]>
AuthorDate: Tue Jun 2 23:27:27 2026 +0800
[spark] Support ON_ERROR = CONTINUE / SKIP_FILE in COPY INTO (#8062)
This is part of #8005.
COPY INTO previously only supported `ON_ERROR = ABORT_STATEMENT`: any
parse or
cast error aborted the entire command. In production data-loading
pipelines a
single malformed row or file would then fail the whole batch, which is
often
too strict. This adds two error-tolerant modes:
- `CONTINUE` — skip bad rows and load the rest (row-level tolerance).
- `SKIP_FILE` — skip any file that contains an error, all-or-nothing per
file.
`ABORT_STATEMENT` remains the default, so existing behavior is
unchanged.
---
docs/docs/spark/sql-write.md | 18 +-
.../org/apache/paimon/spark/sql/CopyIntoTest.scala | 2 +-
.../org/apache/paimon/spark/sql/CopyIntoTest.scala | 2 +-
.../org/apache/paimon/spark/sql/CopyIntoTest.scala | 2 +-
.../org/apache/paimon/spark/sql/CopyIntoTest.scala | 2 +-
.../org/apache/paimon/spark/sql/CopyIntoTest.scala | 2 +-
.../org/apache/paimon/spark/sql/CopyIntoTest.scala | 2 +-
.../PaimonSqlExtensions.g4 | 6 +-
.../plans/logical/CopyIntoTableCommand.scala | 7 +-
.../spark/catalyst/plans/logical/CopyOptions.scala | 8 +
.../spark/copyinto/CopyIntoResultBuilder.scala | 145 +++++
.../spark/execution/CopyIntoCastValidator.scala | 196 +++++++
.../spark/execution/CopyIntoDataFrameBuilder.scala | 187 +++++++
.../spark/execution/CopyIntoErrorHandler.scala | 276 ++++++++++
.../paimon/spark/execution/CopyIntoHelper.scala | 138 +++++
.../paimon/spark/execution/CopyIntoTableExec.scala | 601 ++++++++-------------
.../paimon/spark/execution/CopyIntoUtils.scala | 15 +
.../paimon/spark/execution/PaimonStrategy.scala | 3 +-
.../AbstractPaimonSparkSqlExtensionsParser.scala | 2 +
.../extensions/PaimonSqlExtensionsAstBuilder.scala | 10 +-
.../execution/CopyIntoCastValidatorTest.scala | 204 +++++++
.../execution/CopyIntoDataFrameBuilderTest.scala | 231 ++++++++
.../spark/execution/CopyIntoHelperTest.scala | 133 +++++
.../sql/CatalogQualifiedCreateTableLikeTest.scala | 10 +
.../paimon/spark/sql/CopyIntoOnErrorTest.scala | 527 ++++++++++++++++++
.../apache/paimon/spark/sql/CopyIntoTestBase.scala | 14 +-
26 files changed, 2340 insertions(+), 403 deletions(-)
diff --git a/docs/docs/spark/sql-write.md b/docs/docs/spark/sql-write.md
index 83d26da622..703976857a 100644
--- a/docs/docs/spark/sql-write.md
+++ b/docs/docs/spark/sql-write.md
@@ -325,6 +325,10 @@ Notes:
`COPY INTO` provides a SQL command for bulk loading data files into Paimon
tables and exporting table data to files. Supported formats: **CSV**, **JSON**,
and **Parquet**.
+:::info
+**SQL dialect:** Paimon's `COPY INTO` is a Snowflake-style extension
(`FILE_FORMAT = (TYPE = ...)`, `PATTERN`, `FORCE`, `ON_ERROR`), not the
Databricks `COPY INTO` form (`FILEFORMAT` + `FORMAT_OPTIONS (...)` /
`COPY_OPTIONS (...)`). It implements only a subset of the Snowflake syntax. In
particular, `ON_ERROR` supports `ABORT_STATEMENT` (default), `CONTINUE`, and
`SKIP_FILE`; the Snowflake variants `SKIP_FILE_<num>` and `SKIP_FILE_<num>%`
are **not** supported.
+:::
+
#### CSV Import
```sql
@@ -333,7 +337,7 @@ FROM 'source_path'
FILE_FORMAT = (TYPE = CSV [, option = value, ...])
[PATTERN = 'regex']
[FORCE = TRUE|FALSE]
-[ON_ERROR = ABORT_STATEMENT]
+[ON_ERROR = { ABORT_STATEMENT | CONTINUE | SKIP_FILE }]
```
**Basic import:**
@@ -371,7 +375,7 @@ FROM 'source_path'
FILE_FORMAT = (TYPE = JSON [, option = value, ...])
[PATTERN = 'regex']
[FORCE = TRUE|FALSE]
-[ON_ERROR = ABORT_STATEMENT]
+[ON_ERROR = { ABORT_STATEMENT | CONTINUE | SKIP_FILE }]
```
**Basic import:**
@@ -400,7 +404,7 @@ FROM 'source_path'
FILE_FORMAT = (TYPE = PARQUET [, option = value, ...])
[PATTERN = 'regex']
[FORCE = TRUE|FALSE]
-[ON_ERROR = ABORT_STATEMENT]
+[ON_ERROR = { ABORT_STATEMENT | CONTINUE | SKIP_FILE }]
```
**Basic import:**
@@ -553,7 +557,7 @@ OVERWRITE = TRUE;
|--------|-------------|---------|
| PATTERN | Regex to filter source files by base file name. Only matching
files are loaded. | (all files) |
| FORCE | `FALSE`: skip files already loaded (idempotent). `TRUE`: reload all
files. | `FALSE` |
-| ON_ERROR | Error handling strategy. Only `ABORT_STATEMENT` is supported. |
`ABORT_STATEMENT` |
+| ON_ERROR | Error handling strategy. `ABORT_STATEMENT`: abort on any error.
`CONTINUE`: skip bad rows and continue loading. `SKIP_FILE`: skip files that
contain errors. | `ABORT_STATEMENT` |
#### File Write Options
@@ -592,9 +596,11 @@ By default (`FORCE = FALSE`), COPY INTO tracks which files
have been successfull
| Column | Type | Description |
|--------|------|-------------|
| file_name | STRING | Source file name |
-| status | STRING | `LOADED` or `SKIPPED` |
+| status | STRING | `LOADED`, `PARTIALLY_LOADED`, `LOAD_FAILED`, or `SKIPPED` |
| rows_loaded | BIGINT | Number of rows written |
| rows_parsed | BIGINT | Number of rows parsed from the file |
+| errors_seen | BIGINT | Number of error rows (parse or cast failures) |
+| first_error | STRING | First error message encountered (NULL if no errors) |
**File write** returns a single row:
@@ -606,9 +612,9 @@ By default (`FORCE = FALSE`), COPY INTO tracks which files
have been successfull
#### Limitations
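+- **CSV column-count mismatch**: Rows with fewer or more columns than the target schema are treated as malformed records. With `ON_ERROR = CONTINUE`, these rows are skipped and counted as errors; with `ON_ERROR = SKIP_FILE`, the whole file containing them is skipped; with `ON_ERROR = ABORT_STATEMENT` (the default), the command fails.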
- Only **CSV**, **JSON**, and **Parquet** formats are supported.
- Writing files only supports `FROM table_name`; `FROM (SELECT ...)` is not
supported.
-- `ON_ERROR = CONTINUE` is not supported; any parse or cast error aborts the
entire command.
- `SINGLE = TRUE` (single-file output) is not supported.
- File format options must be specified inline in `FILE_FORMAT = (...)`.
- File listing is **non-recursive**: only direct files under the source path
are processed. Subdirectories are ignored.
diff --git
a/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
index d1515031e5..e7eb9a0d95 100644
---
a/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
+++
b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
@@ -18,4 +18,4 @@
package org.apache.paimon.spark.sql
-class CopyIntoTest extends CopyIntoTestBase {}
+class CopyIntoTest extends CopyIntoTestBase with CopyIntoOnErrorTest {}
diff --git
a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
index d1515031e5..e7eb9a0d95 100644
---
a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
+++
b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
@@ -18,4 +18,4 @@
package org.apache.paimon.spark.sql
-class CopyIntoTest extends CopyIntoTestBase {}
+class CopyIntoTest extends CopyIntoTestBase with CopyIntoOnErrorTest {}
diff --git
a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
index d1515031e5..e7eb9a0d95 100644
---
a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
+++
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
@@ -18,4 +18,4 @@
package org.apache.paimon.spark.sql
-class CopyIntoTest extends CopyIntoTestBase {}
+class CopyIntoTest extends CopyIntoTestBase with CopyIntoOnErrorTest {}
diff --git
a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
index d1515031e5..e7eb9a0d95 100644
---
a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
+++
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
@@ -18,4 +18,4 @@
package org.apache.paimon.spark.sql
-class CopyIntoTest extends CopyIntoTestBase {}
+class CopyIntoTest extends CopyIntoTestBase with CopyIntoOnErrorTest {}
diff --git
a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
index d1515031e5..e7eb9a0d95 100644
---
a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
+++
b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
@@ -18,4 +18,4 @@
package org.apache.paimon.spark.sql
-class CopyIntoTest extends CopyIntoTestBase {}
+class CopyIntoTest extends CopyIntoTestBase with CopyIntoOnErrorTest {}
diff --git
a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
index d1515031e5..e7eb9a0d95 100644
---
a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
+++
b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
@@ -18,4 +18,4 @@
package org.apache.paimon.spark.sql
-class CopyIntoTest extends CopyIntoTestBase {}
+class CopyIntoTest extends CopyIntoTestBase with CopyIntoOnErrorTest {}
diff --git
a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
index 1bbbca478b..620a2bb95a 100644
---
a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
+++
b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
@@ -145,7 +145,7 @@ forceClause
;
onErrorClause
- : ON_ERROR '=' ABORT_STATEMENT
+ : ON_ERROR '=' (ABORT_STATEMENT | CONTINUE | SKIP_FILE)
;
overwriteClause
@@ -203,7 +203,7 @@ nonReserved
| NOT | OF | OR | TABLE | REPLACE | RETAIN | VERSION | TAG
| TRUE | FALSE
| MAP
- | COPY | INTO | FROM | FILE_FORMAT | PATTERN | FORCE | ON_ERROR |
ABORT_STATEMENT | OVERWRITE
+ | COPY | INTO | FROM | FILE_FORMAT | PATTERN | FORCE | ON_ERROR |
ABORT_STATEMENT | CONTINUE | SKIP_FILE | OVERWRITE
| CSV
| JSON
| PARQUET
@@ -246,6 +246,8 @@ PATTERN: 'PATTERN';
FORCE: 'FORCE';
ON_ERROR: 'ON_ERROR';
ABORT_STATEMENT: 'ABORT_STATEMENT';
+CONTINUE: 'CONTINUE';
+SKIP_FILE: 'SKIP_FILE';
OVERWRITE: 'OVERWRITE';
CSV: 'CSV';
JSON: 'JSON';
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyIntoTableCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyIntoTableCommand.scala
index eedad9763e..b7cc456f3b 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyIntoTableCommand.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyIntoTableCommand.scala
@@ -29,14 +29,17 @@ case class CopyIntoTableCommand(
sourcePath: String,
fileFormat: CopyFileFormat,
pattern: Option[String],
- force: Boolean)
+ force: Boolean,
+ onError: OnErrorMode = OnErrorMode.AbortStatement)
extends PaimonLeafCommand {
override def output: Seq[Attribute] = Seq(
AttributeReference("file_name", StringType, nullable = false)(),
AttributeReference("status", StringType, nullable = false)(),
AttributeReference("rows_loaded", LongType, nullable = false)(),
- AttributeReference("rows_parsed", LongType, nullable = false)()
+ AttributeReference("rows_parsed", LongType, nullable = false)(),
+ AttributeReference("errors_seen", LongType, nullable = false)(),
+ AttributeReference("first_error", StringType, nullable = true)()
)
override def simpleString(maxFields: Int): String = {
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyOptions.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyOptions.scala
index 4730307193..3e5e35b2b7 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyOptions.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyOptions.scala
@@ -18,6 +18,14 @@
package org.apache.paimon.spark.catalyst.plans.logical
+/**
+ * Error-handling strategy of a `COPY INTO ... ON_ERROR = ...` statement. The three cases mirror the
+ * `ABORT_STATEMENT`, `CONTINUE` and `SKIP_FILE` keywords of the grammar's `onErrorClause`.
+ */
+sealed trait OnErrorMode
+
+object OnErrorMode {
+
+  /** `ON_ERROR = ABORT_STATEMENT`: any parse or cast error fails the whole command. */
+  case object AbortStatement extends OnErrorMode
+
+  /** `ON_ERROR = CONTINUE`: bad rows are skipped and the remaining rows are loaded. */
+  case object Continue extends OnErrorMode
+
+  /** `ON_ERROR = SKIP_FILE`: any file that contains an error is skipped as a whole. */
+  case object SkipFile extends OnErrorMode
+}
+
sealed trait FileFormatType
object FileFormatType {
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/copyinto/CopyIntoResultBuilder.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/copyinto/CopyIntoResultBuilder.scala
new file mode 100644
index 0000000000..a165a45233
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/copyinto/CopyIntoResultBuilder.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.copyinto
+
+import org.apache.paimon.table.FileStoreTable
+
+import org.apache.hadoop.fs.FileStatus
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.unsafe.types.UTF8String
+
+/** Builds per-file result rows for COPY INTO commands. */
+object CopyIntoResultBuilder {
+
+  /**
+   * Build a single result row with the standard 6-column schema (file_name, status, rows_loaded,
+   * rows_parsed, errors_seen, first_error).
+   *
+   * @param firstError
+   *   first error message of the file, or `null` when there was none; `null` is stored as SQL NULL
+   */
+  def buildResultRow(
+      fileName: String,
+      status: String,
+      rowsLoaded: Long,
+      rowsParsed: Long,
+      errorsSeen: Long,
+      firstError: String): InternalRow = {
+    InternalRow(
+      UTF8String.fromString(fileName),
+      UTF8String.fromString(status),
+      rowsLoaded,
+      rowsParsed,
+      errorsSeen,
+      Option(firstError).map(msg => UTF8String.fromString(msg)).orNull
+    )
+  }
+
+  /**
+   * Build per-file result rows for ON_ERROR=CONTINUE mode. Merges parse errors and cast errors per
+   * file, determines status (LOADED / PARTIALLY_LOADED / LOAD_FAILED), and records load history for
+   * files that loaded at least one row or had no errors (including empty files).
+   *
+   * All maps are keyed by the file's base name.
+   */
+  def buildContinueResults(
+      paimonTable: FileStoreTable,
+      filesToLoad: Array[FileStatus],
+      skippedFiles: Array[FileStatus],
+      totalRowsPerFile: Map[String, Long],
+      parseErrors: Map[String, Long],
+      castErrors: Map[String, Long],
+      firstParseErrorPerFile: Map[String, String],
+      firstCastErrorPerFile: Map[String, String]): Seq[InternalRow] = {
+    val recordLoaded = loadHistoryRecorder(paimonTable)
+
+    val loadedResults = filesToLoad.map {
+      fileStatus =>
+        val baseName = fileStatus.getPath.getName
+        val parsedCount = totalRowsPerFile.getOrElse(baseName, 0L)
+        val totalFileErrors =
+          parseErrors.getOrElse(baseName, 0L) + castErrors.getOrElse(baseName, 0L)
+        // Clamp at zero in case the error count exceeds the number of parsed rows.
+        val rowsLoaded = math.max(0L, parsedCount - totalFileErrors)
+
+        // A file in which every row failed is not recorded, so (presumably) a later
+        // FORCE = FALSE run will pick it up again instead of treating it as loaded.
+        if (rowsLoaded > 0 || totalFileErrors == 0) {
+          recordLoaded(fileStatus, rowsLoaded)
+        }
+
+        // Parse errors take precedence over cast errors when reporting the first error.
+        val fileFirstError =
+          firstParseErrorPerFile.get(baseName).orElse(firstCastErrorPerFile.get(baseName))
+        buildResultRow(
+          baseName,
+          continueStatus(rowsLoaded, totalFileErrors),
+          rowsLoaded,
+          parsedCount,
+          totalFileErrors,
+          fileFirstError.orNull)
+    }.toSeq
+
+    loadedResults ++ buildSkippedResults(skippedFiles)
+  }
+
+  /**
+   * Record load history and build results using pre-computed per-file row counts (ABORT mode).
+   * Avoids re-reading source files just to count rows. `rowCountsPerFile` is keyed by base name;
+   * files missing from it are reported with 0 rows.
+   */
+  def recordHistoryAndBuildResultsDirect(
+      paimonTable: FileStoreTable,
+      filesToLoad: Array[FileStatus],
+      skippedFiles: Array[FileStatus],
+      rowCountsPerFile: Map[String, Long]): Seq[InternalRow] = {
+    val recordLoaded = loadHistoryRecorder(paimonTable)
+
+    val loadedResults = filesToLoad.map {
+      fileStatus =>
+        val baseName = fileStatus.getPath.getName
+        val rowCount = rowCountsPerFile.getOrElse(baseName, 0L)
+        recordLoaded(fileStatus, rowCount)
+        buildResultRow(baseName, "LOADED", rowCount, rowCount, 0L, null)
+    }.toSeq
+
+    loadedResults ++ buildSkippedResults(skippedFiles)
+  }
+
+  /** Build one `SKIPPED` result row (all counts zero, no error message) per file. */
+  def buildSkippedResults(files: Array[FileStatus]): Seq[InternalRow] =
+    files.map(f => buildResultRow(f.getPath.getName, "SKIPPED", 0L, 0L, 0L, null)).toSeq
+
+  /**
+   * Create a callback that appends a file to the table's COPY INTO load history. The latest
+   * snapshot id and the load timestamp are captured once, so every file recorded through the same
+   * callback shares them.
+   */
+  private def loadHistoryRecorder(paimonTable: FileStoreTable): (FileStatus, Long) => Unit = {
+    val paimonPath = new org.apache.paimon.fs.Path(paimonTable.location().toString)
+    val historyManager = new CopyLoadHistoryManager(paimonTable.fileIO(), paimonPath)
+    val snapshotId = paimonTable.snapshotManager().latestSnapshotId()
+    val loadedAt = System.currentTimeMillis()
+
+    (fileStatus, rowsLoaded) => {
+      historyManager.recordLoaded(
+        CopyLoadRecord(
+          filePath = fileStatus.getPath.toString,
+          fileSize = fileStatus.getLen,
+          lastModified = fileStatus.getModificationTime,
+          loadedAt = loadedAt,
+          snapshotId = snapshotId,
+          rowsLoaded = rowsLoaded
+        ))
+      ()
+    }
+  }
+
+  /** Per-file status in CONTINUE mode, derived from the rows loaded and the errors seen. */
+  private def continueStatus(rowsLoaded: Long, errors: Long): String =
+    if (errors > 0 && rowsLoaded == 0) "LOAD_FAILED"
+    else if (errors > 0) "PARTIALLY_LOADED"
+    else "LOADED"
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoCastValidator.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoCastValidator.scala
new file mode 100644
index 0000000000..971b9803d4
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoCastValidator.scala
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.execution
+
+import org.apache.paimon.spark.catalyst.Compatibility
+import org.apache.paimon.types.DataField
+
+import org.apache.spark.sql.{Column, DataFrame}
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.paimon.shims.SparkShimLoader
+import org.apache.spark.sql.types.{DataType, StringType}
+
+/**
+ * Handles cast validation for COPY INTO operations. Validates that source data can be safely cast
+ * to target table types without data loss.
+ *
+ * Detection rule used throughout: a value that is non-null in the source but becomes NULL after a
+ * non-ANSI cast to the target type could not be converted.
+ */
+private[execution] class CopyIntoCastValidator(spark: org.apache.spark.sql.SparkSession) {
+
+  /**
+   * Build cast validation columns for Parquet import. For each writable column that exists in both
+   * targetColumns and source, adds a casted temp column for comparison. Returns the augmented
+   * DataFrame, the (source, cast) column pairs, and the bad-cast filter expression.
+   *
+   * @param excludeCols
+   *   source columns that must not be matched against table columns (presumably helper columns
+   *   added by the caller, such as a file-name column)
+   */
+  def buildParquetCastValidation(
+      rawDf: DataFrame,
+      targetColumns: Seq[String],
+      writableColumns: Seq[String],
+      fields: Seq[DataField],
+      excludeCols: Set[String] = Set.empty): CastValidationSetup = {
+    val resolver = spark.sessionState.conf.resolver
+    val sourceColumns = rawDf.columns.toSeq.filterNot(excludeCols.contains)
+
+    val castColMapping = scala.collection.mutable.LinkedHashMap[String, String]()
+    var validationDf = rawDf
+    var usedCols = rawDf.columns.toSet ++ writableColumns.toSet
+
+    writableColumns.zip(fields).foreach {
+      case (colName, field) if targetColumns.exists(tc => resolver(tc, colName)) =>
+        sourceColumns.find(s => resolver(s, colName)).foreach {
+          srcColName =>
+            val castColName = safeTempCol("__pq_cv_" + colName, usedCols)
+            usedCols += castColName
+            validationDf = validationDf.withColumn(
+              castColName,
+              nonAnsiCast(col(srcColName), sparkTypeOf(field)))
+            castColMapping(srcColName) = castColName
+        }
+      case _ => // not a target column: filled with defaults later, nothing to validate
+    }
+
+    CastValidationSetup(validationDf, castColMapping.toMap, badCastFilterFor(castColMapping))
+  }
+
+  /**
+   * Build cast validation columns for text-based import. For each non-string writable column, adds
+   * a casted temp column for comparison. Returns the augmented DataFrame, a mapping from original to
+   * temp column names, and the bad-cast filter (`None` when every target column is a string).
+   */
+  def buildTextCastValidation(
+      df: DataFrame,
+      writableColumns: Seq[String],
+      fields: Seq[DataField]): CastValidationSetup = {
+    val castColMapping = scala.collection.mutable.LinkedHashMap[String, String]()
+    var usedCols = df.columns.toSet ++ writableColumns.toSet
+    var validationDf = df
+
+    writableColumns.zip(fields).foreach {
+      case (colName, field) =>
+        val sparkType = sparkTypeOf(field)
+        // String targets need no conversion from text input, so they cannot fail to cast.
+        if (sparkType != StringType) {
+          val tempName = safeTempCol("__cv_" + colName, usedCols)
+          usedCols += tempName
+          castColMapping(colName) = tempName
+          validationDf = validationDf.withColumn(tempName, nonAnsiCast(col(colName), sparkType))
+        }
+    }
+
+    // With no cast columns added, validationDf is still `df` and the filter is None.
+    CastValidationSetup(validationDf, castColMapping.toMap, badCastFilterFor(castColMapping))
+  }
+
+  /**
+   * Validate Parquet cast and abort on first failure. Detection strategy: if a source value is
+   * non-null but becomes null after casting, the cast failed (e.g., a string "abc" cast to
+   * IntegerType → null). Aborts immediately on first failure.
+   */
+  def validateParquetCast(
+      rawDf: DataFrame,
+      targetColumns: Seq[String],
+      writableColumns: Seq[String],
+      fields: Seq[DataField]): Unit = {
+    val setup = buildParquetCastValidation(rawDf, targetColumns, writableColumns, fields)
+    abortOnCastFailure(setup)
+  }
+
+  /**
+   * Abort on first cast failure found in the validation setup. Runs a Spark job that fetches at most
+   * one offending row.
+   *
+   * @throws IllegalArgumentException
+   *   if any row matches the setup's bad-cast filter
+   */
+  def abortOnCastFailure(setup: CastValidationSetup): Unit = {
+    setup.badCastFilter.foreach {
+      filter =>
+        setup.validationDf.filter(filter).limit(1).collect().headOption.foreach {
+          badRow =>
+            // Resolve the schema once rather than per mapping entry.
+            val schema = setup.validationDf.schema
+            val failedColumn = setup.castColMapping.collectFirst {
+              case (src, dst)
+                  if !badRow.isNullAt(schema.fieldIndex(src)) &&
+                    badRow.isNullAt(schema.fieldIndex(dst)) =>
+                src
+            }
+            throw new IllegalArgumentException(
+              s"ON_ERROR = ABORT_STATEMENT: Cast failure in column '${failedColumn.getOrElse("unknown")}'. Source data contains values that cannot be converted to the target type.")
+        }
+    }
+  }
+
+  /**
+   * Cast all writable columns to their target Paimon types and validate. Used for text-based ABORT
+   * mode. The returned DataFrame is lazy; validation itself runs eagerly and throws on the first
+   * failure.
+   */
+  def castAndValidate(
+      finalDf: DataFrame,
+      writableColumns: Seq[String],
+      fields: Seq[DataField]): DataFrame = {
+    val castedDf = castColumns(finalDf, writableColumns, fields)
+
+    val setup = buildTextCastValidation(finalDf, writableColumns, fields)
+    abortOnCastFailure(setup)
+
+    castedDf
+  }
+
+  /** Cast all writable columns to their target Paimon types (session ANSI setting applies). */
+  def castColumns(
+      df: DataFrame,
+      writableColumns: Seq[String],
+      fields: Seq[DataField]): DataFrame = {
+    writableColumns.zip(fields).foldLeft(df) {
+      case (d, (colName, field)) => d.withColumn(colName, col(colName).cast(sparkTypeOf(field)))
+    }
+  }
+
+  /**
+   * OR together one "source non-null but cast null" predicate per (source, cast) column pair;
+   * `None` when there is nothing to validate.
+   */
+  private def badCastFilterFor(castColMapping: Iterable[(String, String)]): Option[Column] =
+    castColMapping
+      .map { case (src, dst) => col(src).isNotNull && col(dst).isNull }
+      .reduceOption(_ || _)
+
+  /** Spark type corresponding to the Paimon type of `field`. */
+  private def sparkTypeOf(field: DataField): DataType =
+    org.apache.paimon.spark.SparkTypeUtils.fromPaimonType(field.`type`())
+
+  private def safeTempCol(baseName: String, existingColumns: Set[String]): String =
+    CopyIntoHelper.safeTempCol(spark, baseName, existingColumns)
+
+  /**
+   * Cast a column with ANSI disabled so a failed cast yields NULL instead of throwing. This is the
+   * basis for bad-row detection: a source value that is non-null but becomes null after casting
+   * could not be converted to the target type. Under Spark's default ANSI mode a plain `.cast`
+   * would raise `CAST_INVALID_INPUT` before any filtering could run.
+   */
+  private def nonAnsiCast(column: Column, dataType: DataType): Column = {
+    val expr = SparkShimLoader.shim.classicApi.expression(spark, column)
+    SparkShimLoader.shim.classicApi.column(Compatibility.cast(expr, dataType, ansiEnabled = false))
+  }
+}
+
+/**
+ * Unified cast validation setup for both Parquet and text (CSV/JSON) paths.
+ *
+ * @param validationDf
+ *   the input DataFrame plus one temporary cast column per validated column
+ * @param castColMapping
+ *   validated column name -> name of its temporary cast column
+ * @param badCastFilter
+ *   predicate matching rows whose source value is non-null but whose cast is null; `None` when no
+ *   column needs validation
+ */
+private[execution] case class CastValidationSetup(
+    validationDf: DataFrame,
+    castColMapping: Map[String, String],
+    badCastFilter: Option[Column])
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoDataFrameBuilder.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoDataFrameBuilder.scala
new file mode 100644
index 0000000000..e44ba8c63a
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoDataFrameBuilder.scala
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.execution
+
+import org.apache.paimon.spark.catalyst.plans.logical.{CopyFileFormat,
FileFormatType}
+import org.apache.paimon.types.DataField
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Column, DataFrame, SparkSession}
+import org.apache.spark.sql.functions.{col, lit, when}
+import org.apache.spark.sql.paimon.shims.SparkShimLoader
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+
+/**
+ * Handles DataFrame construction and transformation for COPY INTO operations. Responsible for
+ * building DataFrames from source files, applying transformations, and preparing data for writing
+ * to Paimon tables.
+ *
+ * @param spark
+ *   session used to read files, resolve column names and parse default-value expressions
+ * @param fileFormat
+ *   the statement's `FILE_FORMAT = (...)` settings
+ * @param columns
+ *   optional explicit column list; when defined, [[buildFinalDataFrame]] projects every writable
+ *   column and fills the ones not in `targetColumns` with defaults
+ */
+private[execution] class CopyIntoDataFrameBuilder(
+    spark: SparkSession,
+    fileFormat: CopyFileFormat,
+    columns: Option[Seq[String]])
+  extends Logging {
+
+  /**
+   * Build the projection DataFrame for Parquet import. Maps source columns to target table columns
+   * by name using the session resolver. For each writable column:
+   *   - If it's in targetColumns AND exists in source: cast source column to target type
+   *   - If it's in targetColumns but missing from source: fill with a NULL of the target type
+   *   - If it's NOT in targetColumns (unmapped): fill with default value or NULL
+   */
+  def buildParquetDataFrame(
+      rawDf: DataFrame,
+      targetColumns: Seq[String],
+      writableColumns: Seq[String],
+      fields: Seq[DataField]): DataFrame = {
+    val resolver = spark.sessionState.conf.resolver
+    val sourceColumns = rawDf.columns.toSeq
+
+    val selectExprs: Seq[Column] = writableColumns.map {
+      colName =>
+        val field = fieldOf(fields, colName)
+        if (targetColumns.exists(tc => resolver(tc, colName))) {
+          val sparkType = sparkTypeOf(field)
+          sourceColumns.find(s => resolver(s, colName)) match {
+            case Some(src) => col(src).cast(sparkType).as(colName)
+            case None => lit(null).cast(sparkType).as(colName)
+          }
+        } else {
+          resolveDefaultColumn(field, colName)
+        }
+    }
+    rawDf.select(selectExprs: _*)
+  }
+
+  /**
+   * Build the final DataFrame for text-based import. Handles column renaming (CSV positional →
+   * named), default value filling for unmapped columns, and preserves extra columns like file name.
+   */
+  def buildFinalDataFrame(
+      sourceDf: DataFrame,
+      targetColumns: Seq[String],
+      writableColumns: Seq[String],
+      fields: Seq[DataField],
+      extraCols: Seq[String] = Seq.empty): DataFrame = {
+    val renamedDf = fileFormat.formatType match {
+      case FileFormatType.JSON =>
+        // JSON is read with fields already named after the target columns (see buildStringSchema).
+        sourceDf
+      case _ =>
+        // CSV is read positionally as _c0, _c1, ...; give each position its target column name.
+        targetColumns.zipWithIndex.foldLeft(sourceDf) {
+          case (df, (targetCol, idx)) => df.withColumnRenamed(s"_c$idx", targetCol)
+        }
+    }
+
+    if (columns.isDefined) {
+      // Match with the session resolver, as buildParquetDataFrame does, so a column list spelled in
+      // a different case than the table schema is not silently replaced by defaults.
+      val resolver = spark.sessionState.conf.resolver
+      val selectExprs: Seq[Column] = writableColumns.map {
+        colName =>
+          if (targetColumns.exists(tc => resolver(tc, colName))) {
+            col(colName).as(colName)
+          } else {
+            resolveDefaultColumn(fieldOf(fields, colName), colName)
+          }
+      }
+      renamedDf.select((selectExprs ++ extraCols.map(col)): _*)
+    } else {
+      renamedDf
+    }
+  }
+
+  /**
+   * Build string schema for text-based formats (CSV/JSON): JSON fields use the target column names,
+   * CSV fields use positional names `_c0`, `_c1`, ...
+   */
+  def buildStringSchema(targetColumns: Seq[String]): StructType = {
+    fileFormat.formatType match {
+      case FileFormatType.JSON =>
+        StructType(targetColumns.map(name => StructField(name, StringType, nullable = true)))
+      case _ =>
+        StructType(
+          (0 until targetColumns.size).map(i => StructField(s"_c$i", StringType, nullable = true)))
+    }
+  }
+
+  /** Read source data for text-based formats with NULL transforms applied to every column. */
+  def readSourceData(
+      filePaths: Array[String],
+      stringSchema: StructType,
+      readerOptions: Map[String, String]): DataFrame = {
+    val df = fileFormat.formatType match {
+      case FileFormatType.JSON =>
+        spark.read.options(readerOptions).schema(stringSchema).json(filePaths: _*)
+      case _ =>
+        spark.read.options(readerOptions).schema(stringSchema).csv(filePaths: _*)
+    }
+
+    applyNullTransforms(df, df.columns)
+  }
+
+  /**
+   * Apply NULL_IF and EMPTY_FIELD_AS_NULL transforms to the specified columns. NULL_IF is applied
+   * first, then EMPTY_FIELD_AS_NULL; both are fused into one expression per column.
+   */
+  def applyNullTransforms(df: DataFrame, columns: Seq[String]): DataFrame = {
+    val nullIfVals = fileFormat.nullIfValues
+    val nullIfEnabled = nullIfVals.nonEmpty
+    val emptyAsNull = fileFormat.emptyFieldAsNull
+
+    if (!nullIfEnabled && !emptyAsNull) {
+      df
+    } else {
+      columns.foldLeft(df) {
+        (acc, colName) =>
+          val afterNullIf =
+            if (nullIfEnabled) {
+              when(col(colName).isin(nullIfVals: _*), lit(null).cast(StringType))
+                .otherwise(col(colName))
+            } else {
+              col(colName)
+            }
+          val transformed =
+            if (emptyAsNull) {
+              when(afterNullIf === lit(""), lit(null).cast(StringType)).otherwise(afterNullIf)
+            } else {
+              afterNullIf
+            }
+          acc.withColumn(colName, transformed)
+      }
+    }
+  }
+
+  /**
+   * Resolve the default value expression for a column not populated from source data. The result is
+   * always typed as the column's target type: a parsed default cast to it, or a typed NULL when
+   * there is no default or the default cannot be parsed.
+   */
+  private def resolveDefaultColumn(field: DataField, colName: String): Column = {
+    val sparkType = sparkTypeOf(field)
+    val typedNull = lit(null).cast(sparkType).as(colName)
+    Option(field.defaultValue()) match {
+      case Some(defaultVal) =>
+        try {
+          val parsed = spark.sessionState.sqlParser.parseExpression(defaultVal)
+          SparkShimLoader.shim.classicApi.column(parsed).cast(sparkType).as(colName)
+        } catch {
+          case e: Exception =>
+            logWarning(
+              s"Failed to parse default value '$defaultVal' for column '$colName'; using NULL instead.",
+              e)
+            typedNull
+        }
+      case None =>
+        typedNull
+    }
+  }
+
+  /** Spark type corresponding to the Paimon type of `field`. */
+  private def sparkTypeOf(field: DataField): org.apache.spark.sql.types.DataType =
+    org.apache.paimon.spark.SparkTypeUtils.fromPaimonType(field.`type`())
+
+  /** Look up the table field named exactly `colName`; writable columns must be table fields. */
+  private def fieldOf(fields: Seq[DataField], colName: String): DataField =
+    fields
+      .find(_.name() == colName)
+      .getOrElse(
+        throw new IllegalStateException(s"Column '$colName' not found in table fields"))
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoErrorHandler.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoErrorHandler.scala
new file mode 100644
index 0000000000..466134900f
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoErrorHandler.scala
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.execution
+
+import org.apache.paimon.spark.catalyst.plans.logical.FileFormatType
+import org.apache.paimon.spark.copyinto.{CopyIntoResultBuilder,
CopyLoadHistoryManager, CopyLoadRecord}
+import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.types.DataField
+
+import org.apache.hadoop.fs.FileStatus
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+
+/**
+ * Handles error detection and result building for COPY INTO operations. Supports both row-level
+ * (CONTINUE) and file-level (SKIP_FILE) error handling.
+ *
+ * The `detect*Errors` methods are curried. The first parameter list fixes the table and column
+ * context. The remaining `(DataFrame, String) => ErrorDetectionResult` function takes the source
+ * DataFrame and the name of its input-file column.
+ */
+private[execution] class CopyIntoErrorHandler(
+    spark: org.apache.spark.sql.SparkSession,
+    castValidator: CopyIntoCastValidator,
+    dataFrameBuilder: CopyIntoDataFrameBuilder) {
+
+  /**
+   * Detect cast errors in Parquet files. Parquet columns are already typed, so no parse errors
+   * are reported; only values that fail the cast to the target type are counted.
+   *
+   * @return
+   *   per-file row and error statistics, plus `goodRowsDf` holding the rows without cast failures
+   *   (validation temp columns removed, `fileCol` retained)
+   */
+  def detectParquetErrors(
+      targetColumns: Seq[String],
+      writableColumns: Seq[String],
+      fields: Seq[DataField])(rawDfWithFile: DataFrame, fileCol: String): ErrorDetectionResult = {
+    val totalRowsPerFile = CopyIntoUtils.countPerFile(rawDfWithFile, fileCol)
+    val castResult =
+      filterParquetCastErrors(rawDfWithFile, targetColumns, writableColumns, fields, fileCol)
+
+    ErrorDetectionResult(
+      totalRowsPerFile = totalRowsPerFile,
+      parseErrors = Map.empty, // Parquet has no parse errors
+      castErrors = castResult.errorsPerFile,
+      firstParseError = Map.empty,
+      firstCastError = castResult.firstErrorPerFile,
+      goodRowsDf = castResult.df
+    )
+  }
+
+  /**
+   * Detect parse and cast errors in text files (CSV/JSON).
+   *
+   * Rows whose `corruptCol` is non-null failed to parse and are counted as parse errors. The
+   * remaining rows receive the NULL_IF / EMPTY_FIELD_AS_NULL transforms, are projected onto the
+   * target schema, and are then checked for cast failures.
+   *
+   * @return
+   *   per-file row and error statistics, plus `goodRowsDf` holding the parsed rows without cast
+   *   failures, already cast to the target types and still carrying `fileCol`
+   */
+  def detectTextErrors(
+      targetColumns: Seq[String],
+      writableColumns: Seq[String],
+      fields: Seq[DataField],
+      stringSchema: StructType,
+      corruptCol: String)(rawDfWithFile: DataFrame, fileCol: String): ErrorDetectionResult = {
+    val totalRowsPerFile = CopyIntoUtils.countPerFile(rawDfWithFile, fileCol)
+
+    // Separate corrupt rows (parse errors) from valid rows.
+    val corruptDf = rawDfWithFile.filter(col(corruptCol).isNotNull)
+    val validDf = rawDfWithFile.filter(col(corruptCol).isNull).drop(corruptCol)
+
+    val parseErrors = CopyIntoUtils.countPerFile(corruptDf, fileCol)
+    val firstParseError =
+      if (parseErrors.isEmpty) Map.empty[String, String]
+      else sampleParseErrors(corruptDf, corruptCol, fileCol)
+
+    // Valid rows: apply null transforms, project to the target schema, then detect cast errors.
+    val processedDf = dataFrameBuilder.applyNullTransforms(validDf, stringSchema.fieldNames)
+    val finalDfWithFile = dataFrameBuilder.buildFinalDataFrame(
+      processedDf,
+      targetColumns,
+      writableColumns,
+      fields,
+      extraCols = Seq(fileCol))
+    val castResult = castAndFilterErrors(finalDfWithFile, writableColumns, fields, fileCol)
+
+    ErrorDetectionResult(
+      totalRowsPerFile = totalRowsPerFile,
+      parseErrors = parseErrors,
+      castErrors = castResult.errorsPerFile,
+      firstParseError = firstParseError,
+      firstCastError = castResult.firstErrorPerFile,
+      goodRowsDf = castResult.df
+    )
+  }
+
+  /**
+   * Build per-file result rows for the error-tolerant modes.
+   *
+   *   - RowLevel (CONTINUE): delegates to `CopyIntoResultBuilder.buildContinueResults`.
+   *   - FileLevel (SKIP_FILE): each file whose base name is in `filesWithErrors` is reported as
+   *     LOAD_FAILED with zero rows loaded. Every other file is reported as LOADED and recorded in
+   *     the table's load history. Result rows for `skippedFiles` are appended.
+   */
+  def buildErrorTolerantResults(
+      paimonTable: FileStoreTable,
+      filesToLoad: Array[FileStatus],
+      skippedFiles: Array[FileStatus],
+      errorResult: ErrorDetectionResult,
+      filesWithErrors: Set[String],
+      errorGranularity: ErrorGranularity): Seq[InternalRow] = {
+    errorGranularity match {
+      case ErrorGranularity.RowLevel =>
+        CopyIntoResultBuilder.buildContinueResults(
+          paimonTable,
+          filesToLoad,
+          skippedFiles,
+          errorResult.totalRowsPerFile,
+          errorResult.parseErrors,
+          errorResult.castErrors,
+          errorResult.firstParseError,
+          errorResult.firstCastError
+        )
+
+      case ErrorGranularity.FileLevel =>
+        buildSkipFileResults(paimonTable, filesToLoad, errorResult, filesWithErrors) ++
+          CopyIntoResultBuilder.buildSkippedResults(skippedFiles)
+    }
+  }
+
+  /** SKIP_FILE results: a file is either fully loaded (and recorded) or reported as failed. */
+  private def buildSkipFileResults(
+      paimonTable: FileStoreTable,
+      filesToLoad: Array[FileStatus],
+      errorResult: ErrorDetectionResult,
+      filesWithErrors: Set[String]): Seq[InternalRow] = {
+    val paimonPath = new org.apache.paimon.fs.Path(paimonTable.location().toString)
+    val historyManager = new CopyLoadHistoryManager(paimonTable.fileIO(), paimonPath)
+    val snapshotId = paimonTable.snapshotManager().latestSnapshotId()
+    val loadedAt = System.currentTimeMillis()
+
+    filesToLoad.map {
+      fileStatus =>
+        val baseName = fileStatus.getPath.getName
+        val rowsParsed = errorResult.totalRowsPerFile.getOrElse(baseName, 0L)
+
+        if (filesWithErrors.contains(baseName)) {
+          val totalErrors = errorResult.parseErrors.getOrElse(baseName, 0L) +
+            errorResult.castErrors.getOrElse(baseName, 0L)
+          // Prefer the parse-error sample; fall back to the cast-error sample.
+          val firstError = errorResult.firstParseError
+            .get(baseName)
+            .orElse(errorResult.firstCastError.get(baseName))
+          CopyIntoResultBuilder.buildResultRow(
+            baseName,
+            "LOAD_FAILED",
+            0L,
+            rowsParsed,
+            totalErrors,
+            firstError.orNull)
+        } else {
+          historyManager.recordLoaded(
+            CopyLoadRecord(
+              filePath = fileStatus.getPath.toString,
+              fileSize = fileStatus.getLen,
+              lastModified = fileStatus.getModificationTime,
+              loadedAt = loadedAt,
+              snapshotId = snapshotId,
+              rowsLoaded = rowsParsed
+            ))
+          CopyIntoResultBuilder.buildResultRow(baseName, "LOADED", rowsParsed, rowsParsed, 0L, null)
+        }
+    }.toSeq
+  }
+
+  /** Collect one malformed-record sample per file, keyed by file base name. */
+  private def sampleParseErrors(
+      corruptDf: DataFrame,
+      corruptCol: String,
+      fileCol: String): Map[String, String] = {
+    corruptDf
+      .select(col(fileCol), col(corruptCol))
+      .dropDuplicates(fileCol)
+      .collect()
+      .map {
+        row =>
+          CopyIntoUtils.extractBaseName(row.getString(0)) -> s"Malformed record: ${row.getString(1)}"
+      }
+      .toMap
+  }
+
+  /**
+   * Filter out rows with cast errors and collect per-file error stats. Shared by both the Parquet
+   * and text paths.
+   */
+  private def filterCastErrors(setup: CastValidationSetup, fileCol: String): CastFilterResult = {
+    setup.badCastFilter match {
+      case Some(badCast) =>
+        val badRowsDf = setup.validationDf.filter(badCast)
+        val firstErrorPerFile = sampleCastErrors(setup, badRowsDf, fileCol)
+        val errorsPerFile = CopyIntoUtils.countPerFile(badRowsDf, fileCol)
+
+        // Keep good rows and drop all validation temp columns in a single projection.
+        val goodDf = setup.validationDf
+          .filter(!badCast)
+          .drop(setup.castColMapping.values.toSeq: _*)
+
+        CastFilterResult(goodDf, errorsPerFile, firstErrorPerFile)
+
+      case None =>
+        CastFilterResult(setup.validationDf, Map.empty, Map.empty)
+    }
+  }
+
+  /**
+   * Build one cast-failure message per file. The reported column is the first (source, cast) pair
+   * in `castColMapping` whose source value is non-null but whose cast value is null.
+   */
+  private def sampleCastErrors(
+      setup: CastValidationSetup,
+      badRowsDf: DataFrame,
+      fileCol: String): Map[String, String] = {
+    val sampleRows = badRowsDf.dropDuplicates(fileCol).collect()
+    if (sampleRows.isEmpty) {
+      Map.empty[String, String]
+    } else {
+      val schema = setup.validationDf.schema
+      // Resolve the field indexes once instead of once per sampled row.
+      val indexedPairs = setup.castColMapping.toSeq.map {
+        case (src, dst) => (src, schema.fieldIndex(src), schema.fieldIndex(dst))
+      }
+      sampleRows.map {
+        sampleRow =>
+          val fileName =
+            CopyIntoUtils.extractBaseName(sampleRow.getString(sampleRow.fieldIndex(fileCol)))
+          val failedColumn = indexedPairs.collectFirst {
+            case (src, srcIdx, dstIdx)
+                if !sampleRow.isNullAt(srcIdx) && sampleRow.isNullAt(dstIdx) =>
+              src
+          }
+          fileName -> s"Cast failure in column '${failedColumn.getOrElse("unknown")}'. Source data contains values that cannot be converted to the target type."
+      }.toMap
+    }
+  }
+
+  private def filterParquetCastErrors(
+      rawDfWithFile: DataFrame,
+      targetColumns: Seq[String],
+      writableColumns: Seq[String],
+      fields: Seq[DataField],
+      fileCol: String): CastFilterResult = {
+    val setup =
+      castValidator.buildParquetCastValidation(
+        rawDfWithFile,
+        targetColumns,
+        writableColumns,
+        fields,
+        excludeCols = Set(fileCol))
+    filterCastErrors(setup, fileCol)
+  }
+
+  /** Text path: detect cast errors, then cast the surviving rows to the target types. */
+  private def castAndFilterErrors(
+      dfWithFile: DataFrame,
+      writableColumns: Seq[String],
+      fields: Seq[DataField],
+      fileCol: String): CastFilterResult = {
+    val setup = castValidator.buildTextCastValidation(dfWithFile, writableColumns, fields)
+    val result = filterCastErrors(setup, fileCol)
+    CastFilterResult(
+      castValidator.castColumns(result.df, writableColumns, fields),
+      result.errorsPerFile,
+      result.firstErrorPerFile)
+  }
+}
+
+/**
+ * Outcome of cast validation.
+ *
+ * @param df
+ *   rows without cast failures, with validation temp columns removed
+ * @param errorsPerFile
+ *   number of rows with cast failures per file
+ * @param firstErrorPerFile
+ *   one sample cast-failure message per file
+ */
+final private[execution] case class CastFilterResult(
+    df: DataFrame,
+    errorsPerFile: Map[String, Long],
+    firstErrorPerFile: Map[String, String])
+
+/** Error granularity for error-tolerant modes (CONTINUE/SKIP_FILE). */
+sealed private[execution] trait ErrorGranularity
+private[execution] object ErrorGranularity {
+
+  /** CONTINUE: only failing rows are dropped; the rest of the file is loaded. */
+  case object RowLevel extends ErrorGranularity
+
+  /** SKIP_FILE: any failing row causes every row of its file to be dropped. */
+  case object FileLevel extends ErrorGranularity
+}
+
+/**
+ * Unified error detection result for both Parquet and text formats. The per-file maps are looked
+ * up by file base name (presumably the key produced by `CopyIntoUtils.countPerFile`; confirm).
+ *
+ * @param totalRowsPerFile
+ *   rows read per file, including rows that later failed
+ * @param parseErrors
+ *   rows per file that could not be parsed (always empty for Parquet)
+ * @param castErrors
+ *   rows per file with a value that could not be cast to the target type
+ * @param firstParseError
+ *   one sample parse-error message per file
+ * @param firstCastError
+ *   one sample cast-error message per file
+ * @param goodRowsDf
+ *   rows free of parse and cast errors, still carrying the input-file column
+ */
+final private[execution] case class ErrorDetectionResult(
+    totalRowsPerFile: Map[String, Long],
+    parseErrors: Map[String, Long],
+    castErrors: Map[String, Long],
+    firstParseError: Map[String, String],
+    firstCastError: Map[String, String],
+    goodRowsDf: DataFrame)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoHelper.scala
new file mode 100644
index 0000000000..c00ac34f7b
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoHelper.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.execution
+
+import org.apache.paimon.spark.copyinto.CopyLoadHistoryManager
+import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.types.DataField
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.spark.sql.SparkSession
+
+/**
+ * Utility methods for COPY INTO operations. Provides helper functions for column resolution, file
+ * filtering, and validation.
+ */
+private[execution] object CopyIntoHelper {
+
+  /**
+   * Resolve target columns from the user-specified column list, or use all writable columns when
+   * no list is given. Names are matched with the session resolver and returned with the table's
+   * spelling.
+   *
+   * @throws IllegalArgumentException
+   *   if the list names a column twice, or names a column that is not in the table
+   */
+  def resolveTargetColumns(
+      spark: SparkSession,
+      columns: Option[Seq[String]],
+      writableColumns: Seq[String]): Seq[String] = {
+    columns match {
+      case None => writableColumns
+      case Some(cols) =>
+        val resolver = spark.sessionState.conf.resolver
+        // Report the earliest column that is repeated later in the list.
+        cols.tails
+          .collectFirst { case head +: rest if rest.exists(resolver(head, _)) => head }
+          .foreach {
+            dup => throw new IllegalArgumentException(s"Duplicate columns in column list: $dup")
+          }
+        cols.map {
+          c =>
+            writableColumns.find(w => resolver(w, c)).getOrElse {
+              throw new IllegalArgumentException(
+                s"Column '$c' does not exist in target table. Available columns: ${writableColumns.mkString(", ")}")
+            }
+        }
+    }
+  }
+
+  /**
+   * Validate that every non-nullable column without a default value is included in the target
+   * column list. This only applies when an explicit column list was given; otherwise
+   * `resolveTargetColumns` targets every writable column.
+   *
+   * @throws IllegalArgumentException
+   *   if a non-nullable column without a default value is left unmapped
+   */
+  def validateNonNullableDefaults(
+      columns: Option[Seq[String]],
+      writableColumns: Seq[String],
+      targetColumns: Seq[String],
+      fields: Seq[DataField]): Unit = {
+    if (columns.isDefined) {
+      val fieldsByName = fields.map(f => f.name() -> f).toMap
+      writableColumns.filterNot(targetColumns.contains).foreach {
+        colName =>
+          val field = fieldsByName(colName)
+          if (!field.`type`().isNullable && field.defaultValue() == null) {
+            throw new IllegalArgumentException(
+              s"Non-nullable column '$colName' is not in the column list and has no default value")
+          }
+      }
+    }
+  }
+
+  /**
+   * List the regular files directly under `sourcePath` (no recursion) and split them into files
+   * to load and files to skip.
+   *
+   * When `pattern` is set, a file is kept if the regex matches anywhere in its base name. Unless
+   * `force` is set, a file is skipped when the table's load history reports it as loaded for the
+   * file's path, size and modification time.
+   *
+   * @return
+   *   `(filesToLoad, skippedFiles)`
+   */
+  def listAndFilterFiles(
+      spark: SparkSession,
+      paimonTable: FileStoreTable,
+      sourcePath: String,
+      pattern: Option[String],
+      force: Boolean): (Array[FileStatus], Array[FileStatus]) = {
+    val fsPath = new Path(sourcePath)
+    val fs = fsPath.getFileSystem(spark.sessionState.newHadoopConf())
+    val allFiles = fs.listStatus(fsPath).filter(_.isFile)
+
+    val candidates = pattern.fold(allFiles) {
+      p =>
+        val regex = p.r
+        allFiles.filter(f => regex.findFirstIn(f.getPath.getName).isDefined)
+    }
+
+    if (force || candidates.isEmpty) {
+      // FORCE reloads everything; with no candidates there is no history to consult.
+      (candidates, Array.empty[FileStatus])
+    } else {
+      val paimonPath = new org.apache.paimon.fs.Path(paimonTable.location().toString)
+      val historyManager = new CopyLoadHistoryManager(paimonTable.fileIO(), paimonPath)
+      val (alreadyLoaded, toLoad) = candidates.partition {
+        f => historyManager.isLoaded(f.getPath.toString, f.getLen, f.getModificationTime)
+      }
+      (toLoad, alreadyLoaded)
+    }
+  }
+
+  /**
+   * Generate a temporary column name that does not conflict with `existingColumns`. Conflicts are
+   * detected with the session resolver, which is case-insensitive unless
+   * `spark.sql.caseSensitive` is enabled. Underscores are prepended to `baseName` until the name
+   * is free.
+   */
+  def safeTempCol(spark: SparkSession, baseName: String, existingColumns: Set[String]): String = {
+    val resolver = spark.sessionState.conf.resolver
+    def conflicts(name: String): Boolean = existingColumns.exists(resolver(_, name))
+    Iterator.iterate(baseName)("_" + _).dropWhile(conflicts).next()
+  }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoTableExec.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoTableExec.scala
index 5ef05b4188..4701262f13 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoTableExec.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoTableExec.scala
@@ -19,25 +19,22 @@
package org.apache.paimon.spark.execution
import org.apache.paimon.spark.SparkTable
-import org.apache.paimon.spark.catalyst.plans.logical.{CopyFileFormat,
FileFormatType}
-import org.apache.paimon.spark.copyinto.{CopyLoadHistoryManager,
CopyLoadRecord}
+import org.apache.paimon.spark.catalyst.plans.logical.{CopyFileFormat,
FileFormatType, OnErrorMode}
+import org.apache.paimon.spark.copyinto.CopyIntoResultBuilder
import org.apache.paimon.spark.leafnode.PaimonLeafV2CommandExec
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.types.DataField
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.FileStatus
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{Column, DataFrame, SparkSession}
+import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
-import org.apache.spark.sql.functions.{col, input_file_name, lit, when}
-import org.apache.spark.sql.paimon.shims.SparkShimLoader
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
-import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.sql.functions.{col, input_file_name, substring_index}
+import org.apache.spark.sql.types.{StringType, StructField}
import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
case class CopyIntoTableExec(
spark: SparkSession,
@@ -48,10 +45,16 @@ case class CopyIntoTableExec(
fileFormat: CopyFileFormat,
pattern: Option[String],
force: Boolean,
+ onError: OnErrorMode,
out: Seq[Attribute])
extends PaimonLeafV2CommandExec
with Logging {
+ // Initialize helper classes
+ private val castValidator = new CopyIntoCastValidator(spark)
+ private val dataFrameBuilder = new CopyIntoDataFrameBuilder(spark,
fileFormat, columns)
+ private val errorHandler = new CopyIntoErrorHandler(spark, castValidator,
dataFrameBuilder)
+
override def output: Seq[Attribute] = out
override protected def run(): Seq[InternalRow] = {
@@ -63,14 +66,15 @@ case class CopyIntoTableExec(
val tableSchema = paimonTable.schema()
val writableColumns = tableSchema.fieldNames().asScala.toSeq
val fields = tableSchema.fields().asScala.toSeq
- val targetColumns = resolveTargetColumns(writableColumns)
+ val targetColumns = CopyIntoHelper.resolveTargetColumns(spark, columns,
writableColumns)
- validateNonNullableDefaults(writableColumns, targetColumns, fields)
+ CopyIntoHelper.validateNonNullableDefaults(columns, writableColumns,
targetColumns, fields)
- val (filesToLoad, skippedFiles) = listAndFilterFiles(paimonTable)
+ val (filesToLoad, skippedFiles) =
+ CopyIntoHelper.listAndFilterFiles(spark, paimonTable, sourcePath,
pattern, force)
if (filesToLoad.isEmpty) {
- return buildSkippedResults(skippedFiles)
+ return CopyIntoResultBuilder.buildSkippedResults(skippedFiles)
}
val filePaths = filesToLoad.map(_.getPath.toString)
@@ -101,119 +105,173 @@ case class CopyIntoTableExec(
}
/**
- * Parquet import pipeline. Unlike CSV/JSON which read as strings then cast,
Parquet files already
- * have typed columns, so the flow is:
- * 1. Read source Parquet with native types
- * 2. Project and cast columns to match target table schema (by column
name, not position)
- * 3. Validate that no non-null values become null after casting (detect
type incompatibility)
- * 4. Write to Paimon table
- * 5. Record load history for idempotent re-runs (FORCE=FALSE dedup)
+ * Unified error-tolerant import for both CONTINUE and SKIP_FILE modes. Both
start from
+ * `goodRowsDf`, the row-clean DataFrame produced by error detection (bad
rows already removed
+ * and, for text formats, already cast to target types).
+ * - CONTINUE: writes every good row, so a file with errors is partially
loaded.
+ * - SKIP_FILE: additionally drops every row belonging to a file that had
any error, so such a
+ * file is loaded all-or-nothing.
+ *
+ * Both modes use a single batch write (one commit) regardless of file count.
*/
- private def runParquetImport(
+ private def runErrorTolerantMode(
paimonTable: FileStoreTable,
- filePaths: Array[String],
+ rawDf: DataFrame,
targetColumns: Seq[String],
writableColumns: Seq[String],
fields: Seq[DataField],
filesToLoad: Array[FileStatus],
skippedFiles: Array[FileStatus],
- readerOptions: Map[String, String]): Seq[InternalRow] = {
- val rawDf = spark.read.options(readerOptions).parquet(filePaths: _*)
+ errorGranularity: ErrorGranularity,
+ detectErrors: (DataFrame, String) => ErrorDetectionResult):
Seq[InternalRow] = {
+
+ val allTargetCols = writableColumns.toSet ++ targetColumns.toSet
+ val inputFileCol = CopyIntoHelper.safeTempCol(spark, "__input_file",
allTargetCols)
+ val rawDfWithFile = rawDf.withColumn(inputFileCol,
input_file_name()).cache()
+
+ try {
+ val errorResult = detectErrors(rawDfWithFile, inputFileCol)
+
+ // `goodRowsDf` carries `inputFileCol` (full path); error keys are base
names, so compare on
+ // the base name extracted from the path.
+ val filesWithErrors = errorResult.parseErrors.keySet ++
errorResult.castErrors.keySet
+ val dfToWrite = errorGranularity match {
+ case ErrorGranularity.RowLevel =>
+ // CONTINUE: keep all good rows, including good rows from files that
also had errors.
+ errorResult.goodRowsDf
+
+ case ErrorGranularity.FileLevel =>
+ // SKIP_FILE: drop every row whose file had any error.
+ if (filesWithErrors.nonEmpty) {
+ val baseName = substring_index(col(inputFileCol), "/", -1)
+
errorResult.goodRowsDf.filter(!baseName.isin(filesWithErrors.toSeq: _*))
+ } else {
+ errorResult.goodRowsDf
+ }
+ }
+
+ val tableName = CopyIntoUtils.quoteIdentifier(catalog.name(), ident)
+ val finalDf = fileFormat.formatType match {
+ case FileFormatType.PARQUET =>
+ dataFrameBuilder.buildParquetDataFrame(dfToWrite, targetColumns,
writableColumns, fields)
+ case _ =>
+ // For text formats, goodRowsDf is already processed and cast.
+ dfToWrite
+ }
- val selectedDf = buildParquetDataFrame(rawDf, targetColumns,
writableColumns, fields)
- validateParquetCast(rawDf, targetColumns, writableColumns, fields)
+
finalDf.drop(inputFileCol).write.format("paimon").mode("append").insertInto(tableName)
- val tableName = CopyIntoUtils.quoteIdentifier(catalog.name(), ident)
- selectedDf.write.format("paimon").mode("append").insertInto(tableName)
+ errorHandler.buildErrorTolerantResults(
+ paimonTable,
+ filesToLoad,
+ skippedFiles,
+ errorResult,
+ filesWithErrors,
+ errorGranularity)
- val countDf = spark.read.options(readerOptions).parquet(filePaths: _*)
- recordHistoryAndBuildResults(paimonTable, filesToLoad, skippedFiles,
countDf)
+ } finally {
+ rawDfWithFile.unpersist()
+ }
}
/**
- * Build the projection DataFrame for Parquet import. Maps source columns to
target table columns
- * by name (case-insensitive). For each writable column:
- * - If it's in targetColumns AND exists in source: cast source column to
target type
- * - If it's in targetColumns but missing from source: fill with NULL
- * - If it's NOT in targetColumns (unmapped): fill with default value or
NULL
+ * Import files in ABORT mode: read, validate, write, and return per-file
row counts. Validation
+ * aborts the whole statement on the first parse or cast error, so either
all files are written or
+ * none are.
*/
- private def buildParquetDataFrame(
- rawDf: DataFrame,
+ private def importAbort(
+ filePaths: Array[String],
targetColumns: Seq[String],
writableColumns: Seq[String],
- fields: Seq[DataField]): DataFrame = {
- val resolver = spark.sessionState.conf.resolver
- val sourceColumns = rawDf.columns.toSeq
-
- val selectExprs: Seq[Column] = writableColumns.map {
- colName =>
- if (targetColumns.exists(tc => resolver(tc, colName))) {
- val srcCol = sourceColumns.find(s => resolver(s, colName))
- srcCol match {
- case Some(s) =>
- val field = fields.find(_.name() == colName).get
- val sparkType =
-
org.apache.paimon.spark.SparkTypeUtils.fromPaimonType(field.`type`())
- col(s).cast(sparkType).as(colName)
- case None =>
- val field = fields.find(_.name() == colName).get
- val sparkType =
-
org.apache.paimon.spark.SparkTypeUtils.fromPaimonType(field.`type`())
- lit(null).cast(sparkType).as(colName)
- }
- } else {
- resolveDefaultColumn(fields.find(_.name() == colName).get, colName)
+ fields: Seq[DataField],
+ readerOptions: Map[String, String],
+ tableName: String): Map[String, Long] = {
+ val allTargetCols = writableColumns.toSet ++ targetColumns.toSet
+ val fileCol = CopyIntoHelper.safeTempCol(spark, "__file__", allTargetCols)
+ fileFormat.formatType match {
+ case FileFormatType.PARQUET =>
+ val rawDf = spark.read.options(readerOptions).parquet(filePaths: _*)
+ castValidator.validateParquetCast(rawDf, targetColumns,
writableColumns, fields)
+ val selectedDf =
+ dataFrameBuilder
+ .buildParquetDataFrame(rawDf, targetColumns, writableColumns,
fields)
+ .withColumn(fileCol, input_file_name())
+ .cache()
+ try {
+ val counts = CopyIntoUtils.countPerFile(selectedDf, fileCol)
+
selectedDf.drop(fileCol).write.format("paimon").mode("append").insertInto(tableName)
+ counts
+ } finally {
+ selectedDf.unpersist()
+ }
+ case _ =>
+ val stringSchema = dataFrameBuilder.buildStringSchema(targetColumns)
+ val sourceDf = dataFrameBuilder.readSourceData(filePaths,
stringSchema, readerOptions)
+ val finalDf =
+ dataFrameBuilder.buildFinalDataFrame(sourceDf, targetColumns,
writableColumns, fields)
+ val castedDf = castValidator
+ .castAndValidate(finalDf, writableColumns, fields)
+ .withColumn(fileCol, input_file_name())
+ .cache()
+ try {
+ val counts = CopyIntoUtils.countPerFile(castedDf, fileCol)
+
castedDf.drop(fileCol).write.format("paimon").mode("append").insertInto(tableName)
+ counts
+ } finally {
+ castedDf.unpersist()
}
}
- rawDf.select(selectExprs: _*)
}
/**
- * Validate that casting Parquet source columns to target types does not
silently lose data.
- * Detection strategy: if a source value is non-null but becomes null after
casting, the cast
- * failed (e.g., a string "abc" cast to IntegerType → null). Aborts
immediately on first failure.
+ * Parquet import pipeline. Unlike CSV/JSON which read as strings then cast,
Parquet files already
+ * have typed columns, so the flow is:
+ * 1. Read source Parquet with native types
+ * 2. Project and cast columns to match target table schema (by column
name, not position)
+ * 3. Validate that no non-null values become null after casting (detect
type incompatibility)
+ * 4. Write to Paimon table
+ * 5. Record load history for idempotent re-runs (FORCE=FALSE dedup)
*/
- private def validateParquetCast(
- rawDf: DataFrame,
+ private def runParquetImport(
+ paimonTable: FileStoreTable,
+ filePaths: Array[String],
targetColumns: Seq[String],
writableColumns: Seq[String],
- fields: Seq[DataField]): Unit = {
- val resolver = spark.sessionState.conf.resolver
- val sourceColumns = rawDf.columns.toSeq
-
- val castCheckCols = ArrayBuffer[(String, String)]()
- var validationDf = rawDf
-
- writableColumns.zip(fields).foreach {
- case (colName, field) =>
- if (targetColumns.exists(tc => resolver(tc, colName))) {
- sourceColumns.find(s => resolver(s, colName)).foreach {
- srcColName =>
- val sparkType =
-
org.apache.paimon.spark.SparkTypeUtils.fromPaimonType(field.`type`())
- val castColName = s"__pq_cv_$colName"
- validationDf = validationDf.withColumn(castColName,
col(srcColName).cast(sparkType))
- castCheckCols += ((srcColName, castColName))
- }
- }
- }
+ fields: Seq[DataField],
+ filesToLoad: Array[FileStatus],
+ skippedFiles: Array[FileStatus],
+ readerOptions: Map[String, String]): Seq[InternalRow] = {
+ val rawDf = spark.read.options(readerOptions).parquet(filePaths: _*)
- if (castCheckCols.nonEmpty) {
- val badCastFilter = castCheckCols
- .map { case (src, dst) => col(src).isNotNull && col(dst).isNull }
- .reduce(_ || _)
- val badRows = validationDf.filter(badCastFilter).limit(1).collect()
- if (badRows.nonEmpty) {
- val example = castCheckCols.find {
- case (src, dst) =>
- val row = badRows(0)
- val srcIdx = validationDf.schema.fieldIndex(src)
- val dstIdx = validationDf.schema.fieldIndex(dst)
- !row.isNullAt(srcIdx) && row.isNullAt(dstIdx)
+ onError match {
+ case OnErrorMode.Continue | OnErrorMode.SkipFile =>
+ val errorGranularity = if (onError == OnErrorMode.Continue) {
+ ErrorGranularity.RowLevel
+ } else {
+ ErrorGranularity.FileLevel
}
- throw new IllegalArgumentException(
- s"ON_ERROR = ABORT_STATEMENT: Cast failure in column
'${example.map(_._1).getOrElse("unknown")}'. Source data contains values that
cannot be converted to the target type.")
- }
+
+ runErrorTolerantMode(
+ paimonTable,
+ rawDf,
+ targetColumns,
+ writableColumns,
+ fields,
+ filesToLoad,
+ skippedFiles,
+ errorGranularity,
+ errorHandler.detectParquetErrors(targetColumns, writableColumns,
fields)
+ )
+
+ case _ =>
+ val tableName = CopyIntoUtils.quoteIdentifier(catalog.name(), ident)
+ val countsPerFile =
+ importAbort(filePaths, targetColumns, writableColumns, fields,
readerOptions, tableName)
+ CopyIntoResultBuilder.recordHistoryAndBuildResultsDirect(
+ paimonTable,
+ filesToLoad,
+ skippedFiles,
+ countsPerFile)
}
}
@@ -234,297 +292,84 @@ case class CopyIntoTableExec(
filesToLoad: Array[FileStatus],
skippedFiles: Array[FileStatus],
readerOptions: Map[String, String]): Seq[InternalRow] = {
- val stringSchema = buildStringSchema(targetColumns)
-
- val sourceDf = readSourceData(filePaths, stringSchema, readerOptions)
- val finalDf =
- buildFinalDataFrame(sourceDf, targetColumns, writableColumns, fields)
- val castedDf = castAndValidate(finalDf, writableColumns, fields)
-
- val tableName = CopyIntoUtils.quoteIdentifier(catalog.name(), ident)
- castedDf.write.format("paimon").mode("append").insertInto(tableName)
-
- val countDf = fileFormat.formatType match {
- case FileFormatType.JSON =>
- spark.read.options(readerOptions).schema(stringSchema).json(filePaths:
_*)
- case _ =>
- spark.read.options(readerOptions).schema(stringSchema).csv(filePaths:
_*)
- }
- recordHistoryAndBuildResults(paimonTable, filesToLoad, skippedFiles,
countDf)
- }
-
- private def buildStringSchema(targetColumns: Seq[String]): StructType = {
- fileFormat.formatType match {
- case FileFormatType.JSON =>
- StructType(targetColumns.map(name => StructField(name, StringType,
nullable = true)))
- case _ =>
- StructType(
- (0 until targetColumns.size).map(i => StructField(s"_c$i",
StringType, nullable = true)))
- }
- }
+ val stringSchema = dataFrameBuilder.buildStringSchema(targetColumns)
- private def resolveTargetColumns(writableColumns: Seq[String]): Seq[String]
= {
- columns match {
- case Some(cols) =>
- val resolver = spark.sessionState.conf.resolver
- cols.indices.foreach {
- i =>
- cols.indices.filter(_ > i).foreach {
- j =>
- if (resolver(cols(i), cols(j))) {
- throw new IllegalArgumentException(
- s"Duplicate columns in column list: ${cols(i)}")
- }
- }
- }
- cols.map {
- c =>
- writableColumns.find(w => resolver(w, c)).getOrElse {
- throw new IllegalArgumentException(
- s"Column '$c' does not exist in target table. Available
columns: ${writableColumns.mkString(", ")}")
- }
+ onError match {
+ case OnErrorMode.Continue | OnErrorMode.SkipFile =>
+ val errorGranularity = if (onError == OnErrorMode.Continue) {
+ ErrorGranularity.RowLevel
+ } else {
+ ErrorGranularity.FileLevel
}
- case None => writableColumns
- }
- }
- private def validateNonNullableDefaults(
- writableColumns: Seq[String],
- targetColumns: Seq[String],
- fields: Seq[DataField]): Unit = {
- if (columns.isEmpty) return
- val unmapped = writableColumns.filterNot(targetColumns.contains)
- unmapped.foreach {
- colName =>
- val field = fields.find(_.name() == colName).get
- if (!field.`type`().isNullable && field.defaultValue() == null) {
- throw new IllegalArgumentException(
- s"Non-nullable column '$colName' is not in the column list and has
no default value")
+ // For error-tolerant modes, we need to read with corrupt record
tracking
+ val allTargetCols = writableColumns.toSet ++ targetColumns.toSet
+ val corruptCol = CopyIntoHelper.safeTempCol(spark, "_corrupt_record",
allTargetCols)
+ val schemaWithCorrupt =
+ stringSchema.add(StructField(corruptCol, StringType, nullable =
true))
+ val corruptRecordOption =
+ Map("columnNameOfCorruptRecord" -> corruptCol, "mode" ->
"PERMISSIVE")
+
+ val rawDf = fileFormat.formatType match {
+ case FileFormatType.JSON =>
+ spark.read
+ .options(readerOptions ++ corruptRecordOption)
+ .schema(schemaWithCorrupt)
+ .json(filePaths: _*)
+ case _ =>
+ spark.read
+ .options(readerOptions ++ corruptRecordOption)
+ .schema(schemaWithCorrupt)
+ .csv(filePaths: _*)
}
- }
- }
-
- private def listAndFilterFiles(
- paimonTable: FileStoreTable): (Array[FileStatus], Array[FileStatus]) = {
- val hadoopConf = spark.sessionState.newHadoopConf()
- val fsPath = new Path(sourcePath)
- val fs = fsPath.getFileSystem(hadoopConf)
- val allFiles = fs.listStatus(fsPath).filter(_.isFile)
-
- val patternFiltered = pattern match {
- case Some(p) =>
- val regex = p.r
- allFiles.filter(f => regex.findFirstIn(f.getPath.getName).isDefined)
- case None => allFiles
- }
- if (patternFiltered.isEmpty) {
- return (Array.empty, Array.empty)
- }
-
- val paimonPath = new
org.apache.paimon.fs.Path(paimonTable.location().toString)
- val historyManager = new CopyLoadHistoryManager(paimonTable.fileIO(),
paimonPath)
-
- if (!force) {
- val (skip, load) = patternFiltered.partition {
- f => historyManager.isLoaded(f.getPath.toString, f.getLen,
f.getModificationTime)
- }
- (load, skip)
- } else {
- (patternFiltered, Array.empty[FileStatus])
- }
- }
+ runErrorTolerantMode(
+ paimonTable,
+ rawDf,
+ targetColumns,
+ writableColumns,
+ fields,
+ filesToLoad,
+ skippedFiles,
+ errorGranularity,
+ errorHandler.detectTextErrors(
+ targetColumns,
+ writableColumns,
+ fields,
+ stringSchema,
+ corruptCol)
+ )
- private def readSourceData(
- filePaths: Array[String],
- stringSchema: StructType,
- readerOptions: Map[String, String]): DataFrame = {
- var df = fileFormat.formatType match {
- case FileFormatType.JSON =>
- spark.read.options(readerOptions).schema(stringSchema).json(filePaths:
_*)
case _ =>
- spark.read.options(readerOptions).schema(stringSchema).csv(filePaths:
_*)
- }
-
- val nullIfVals = fileFormat.nullIfValues
- if (nullIfVals.nonEmpty) {
- df.columns.foreach {
- colName =>
- df = df.withColumn(
- colName,
- when(col(colName).isin(nullIfVals: _*), lit(null).cast(StringType))
- .otherwise(col(colName)))
- }
- }
-
- if (fileFormat.emptyFieldAsNull) {
- df.columns.foreach {
- colName =>
- df = df.withColumn(
- colName,
- when(col(colName) === lit(""), lit(null).cast(StringType))
- .otherwise(col(colName)))
- }
+ runTextImportAbort(
+ paimonTable,
+ filePaths,
+ targetColumns,
+ writableColumns,
+ fields,
+ filesToLoad,
+ skippedFiles,
+ readerOptions)
}
-
- df
}
- private def buildFinalDataFrame(
- sourceDf: DataFrame,
+ private def runTextImportAbort(
+ paimonTable: FileStoreTable,
+ filePaths: Array[String],
targetColumns: Seq[String],
writableColumns: Seq[String],
- fields: Seq[DataField]): DataFrame = {
- val renamedDf = fileFormat.formatType match {
- case FileFormatType.JSON =>
- sourceDf
- case _ =>
- targetColumns.zipWithIndex.foldLeft(sourceDf) {
- case (df, (targetCol, idx)) => df.withColumnRenamed(s"_c$idx",
targetCol)
- }
- }
-
- if (columns.isDefined) {
- val selectExprs: Seq[Column] = writableColumns.map {
- colName =>
- if (targetColumns.contains(colName)) {
- col(colName)
- } else {
- resolveDefaultColumn(fields.find(_.name() == colName).get, colName)
- }
- }
- renamedDf.select(selectExprs: _*)
- } else {
- renamedDf
- }
- }
-
- private def castAndValidate(
- finalDf: DataFrame,
- writableColumns: Seq[String],
- fields: Seq[DataField]): DataFrame = {
- val nonStringCastCols = ArrayBuffer[String]()
- var castedDf = finalDf
- writableColumns.zip(fields).foreach {
- case (colName, field) =>
- val sparkType =
org.apache.paimon.spark.SparkTypeUtils.fromPaimonType(field.`type`())
- castedDf = castedDf.withColumn(colName, col(colName).cast(sparkType))
- if (sparkType != StringType) {
- nonStringCastCols += colName
- }
- }
-
- if (nonStringCastCols.nonEmpty) {
- val castSuffix = "__cv"
- val validationDf = nonStringCastCols.zipWithIndex.foldLeft(finalDf) {
- case (df, (colName, idx)) =>
- val field = fields.find(_.name() == colName).get
- val sparkType =
org.apache.paimon.spark.SparkTypeUtils.fromPaimonType(field.`type`())
- df.withColumn(castSuffix + idx, col(colName).cast(sparkType))
- }
- val badCastFilter = nonStringCastCols.zipWithIndex
- .map { case (cn, idx) => col(cn).isNotNull && col(castSuffix +
idx).isNull }
- .reduce(_ || _)
- val badRows = validationDf.filter(badCastFilter).limit(1).collect()
- if (badRows.nonEmpty) {
- val example = nonStringCastCols.zipWithIndex.find {
- case (cn, idx) =>
- val row = badRows(0)
- val srcIdx = validationDf.schema.fieldIndex(cn)
- val dstIdx = validationDf.schema.fieldIndex(castSuffix + idx)
- !row.isNullAt(srcIdx) && row.isNullAt(dstIdx)
- }
- throw new IllegalArgumentException(
- s"ON_ERROR = ABORT_STATEMENT: Cast failure in column
'${example.map(_._1).getOrElse("unknown")}'. Source data contains values that
cannot be converted to the target type.")
- }
- }
-
- castedDf
- }
-
- /**
- * Record successfully loaded files to load history (for FORCE=FALSE
idempotent dedup), and build
- * the result rows showing per-file load status. Accepts a pre-built countDf
that will be grouped
- * by input_file_name() to get per-file row counts — this allows both
Parquet and text paths to
- * share the same logic.
- */
- private def recordHistoryAndBuildResults(
- paimonTable: FileStoreTable,
+ fields: Seq[DataField],
filesToLoad: Array[FileStatus],
skippedFiles: Array[FileStatus],
- countDf: DataFrame): Seq[InternalRow] = {
- val paimonPath = new
org.apache.paimon.fs.Path(paimonTable.location().toString)
- val historyManager = new CopyLoadHistoryManager(paimonTable.fileIO(),
paimonPath)
- val snapshotId = paimonTable.snapshotManager().latestSnapshotId()
- val loadedAt = System.currentTimeMillis()
-
- val rowCounts = countDf
- .groupBy(input_file_name().as("file"))
- .count()
- .collect()
-
- val fileCountMap = rowCounts.map {
- row =>
- val fullPath = row.getString(0)
- val baseName = fullPath.substring(fullPath.lastIndexOf('/') + 1)
- baseName -> row.getLong(1)
- }.toMap
-
- val loadedResults = filesToLoad.map {
- fileStatus =>
- val baseName = fileStatus.getPath.getName
- val rowCount = fileCountMap.getOrElse(baseName, 0L)
-
- historyManager.recordLoaded(
- CopyLoadRecord(
- filePath = fileStatus.getPath.toString,
- fileSize = fileStatus.getLen,
- lastModified = fileStatus.getModificationTime,
- loadedAt = loadedAt,
- snapshotId = snapshotId,
- rowsLoaded = rowCount
- ))
-
- InternalRow(
- UTF8String.fromString(baseName),
- UTF8String.fromString("LOADED"),
- rowCount,
- rowCount)
- }.toSeq
-
- val skippedResults = buildSkippedResults(skippedFiles)
- loadedResults ++ skippedResults
- }
-
- private def buildSkippedResults(files: Array[FileStatus]): Seq[InternalRow]
= {
- files.map {
- f =>
- InternalRow(
- UTF8String.fromString(f.getPath.getName),
- UTF8String.fromString("SKIPPED"),
- 0L,
- 0L)
- }.toSeq
- }
-
- /** Resolve the default value expression for a column not populated from
source data. */
- private def resolveDefaultColumn(field: DataField, colName: String): Column
= {
- val defaultVal = field.defaultValue()
- if (defaultVal != null) {
- val sparkType =
- org.apache.paimon.spark.SparkTypeUtils.fromPaimonType(field.`type`())
- try {
- val parsed = spark.sessionState.sqlParser.parseExpression(defaultVal)
-
SparkShimLoader.shim.classicApi.column(parsed).cast(sparkType).as(colName)
- } catch {
- case e: Exception =>
- logWarning(
- s"Failed to parse default value '$defaultVal' for column
'$colName': " +
- s"${e.getMessage}. Using null instead.")
- lit(null).cast(sparkType).as(colName)
- }
- } else {
- lit(null).as(colName)
- }
+ readerOptions: Map[String, String]): Seq[InternalRow] = {
+ val tableName = CopyIntoUtils.quoteIdentifier(catalog.name(), ident)
+ val countsPerFile =
+ importAbort(filePaths, targetColumns, writableColumns, fields,
readerOptions, tableName)
+
+ CopyIntoResultBuilder.recordHistoryAndBuildResultsDirect(
+ paimonTable,
+ filesToLoad,
+ skippedFiles,
+ countsPerFile)
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoUtils.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoUtils.scala
index dd85af058d..fdf76b8dc6 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoUtils.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoUtils.scala
@@ -18,7 +18,9 @@
package org.apache.paimon.spark.execution
+import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.functions.col
object CopyIntoUtils {
@@ -28,4 +30,17 @@ object CopyIntoUtils {
Seq(ident.name())
parts.filter(_.nonEmpty).map(p => s"`${p.replace("`",
"``")}`").mkString(".")
}
+
+ def extractBaseName(fullPath: String): String = {
+ fullPath.substring(fullPath.lastIndexOf('/') + 1)
+ }
+
+ /** Count rows per file, keyed by base file name. */
+ def countPerFile(df: DataFrame, fileCol: String): Map[String, Long] = {
+ df.groupBy(col(fileCol))
+ .count()
+ .collect()
+ .map(row => extractBaseName(row.getString(0)) -> row.getLong(1))
+ .toMap
+ }
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
index 76bd6beb15..6870ce8832 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
@@ -149,7 +149,7 @@ case class PaimonStrategy(spark: SparkSession)
partitionPredicate: Option[PartitionPredicate]) =>
TruncatePaimonTableWithFilterExec(table, partitionPredicate) :: Nil
- case c @ CopyIntoTableCommand(PaimonCatalogAndIdentifier(catalog, ident),
_, _, _, _, _) =>
+ case c @ CopyIntoTableCommand(PaimonCatalogAndIdentifier(catalog, ident),
_, _, _, _, _, _) =>
CopyIntoTableExec(
spark,
catalog,
@@ -159,6 +159,7 @@ case class PaimonStrategy(spark: SparkSession)
c.fileFormat,
c.pattern,
c.force,
+ c.onError,
c.output) :: Nil
case c @ CopyIntoLocationCommand(_, PaimonCatalogAndIdentifier(catalog,
ident), _, _) =>
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
index 3171190915..4fd533f401 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
@@ -78,6 +78,8 @@ abstract class AbstractPaimonSparkSqlExtensionsParser(val
delegate: ParserInterf
PaimonSqlExtensionsParser.FORCE,
PaimonSqlExtensionsParser.ON_ERROR,
PaimonSqlExtensionsParser.ABORT_STATEMENT,
+ PaimonSqlExtensionsParser.CONTINUE,
+ PaimonSqlExtensionsParser.SKIP_FILE,
PaimonSqlExtensionsParser.OVERWRITE,
PaimonSqlExtensionsParser.CSV
)
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
index da716ced11..0d54c698e6 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
@@ -172,7 +172,15 @@ class PaimonSqlExtensionsAstBuilder(delegate:
ParserInterface)
val fileFormat = buildFileFormat(ctx.fileFormatClause())
val pattern = Option(ctx.patternClause()).map(p =>
unquoteString(p.STRING().getText))
val force = Option(ctx.forceClause()).exists(_.booleanValue().TRUE() !=
null)
- logical.CopyIntoTableCommand(table, columns, sourcePath, fileFormat,
pattern, force)
+ val onError = Option(ctx.onErrorClause())
+ .map {
+ clause =>
+ if (clause.CONTINUE() != null) OnErrorMode.Continue
+ else if (clause.SKIP_FILE() != null) OnErrorMode.SkipFile
+ else OnErrorMode.AbortStatement
+ }
+ .getOrElse(OnErrorMode.AbortStatement)
+ logical.CopyIntoTableCommand(table, columns, sourcePath, fileFormat,
pattern, force, onError)
}
/** Create a COPY INTO LOCATION (export) logical command. */
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/execution/CopyIntoCastValidatorTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/execution/CopyIntoCastValidatorTest.scala
new file mode 100644
index 0000000000..ab006e2570
--- /dev/null
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/execution/CopyIntoCastValidatorTest.scala
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.execution
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.types.{DataField, DataTypes}
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField,
StructType}
+
+class CopyIntoCastValidatorTest extends PaimonSparkTestBase {
+
+ private lazy val validator = new CopyIntoCastValidator(spark)
+
+ test("buildParquetCastValidation: no validation needed when all columns are
compatible") {
+ val schema = StructType(Seq(StructField("id", IntegerType),
StructField("name", StringType)))
+ val df = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, "Alice"), Row(2, "Bob"))),
+ schema)
+
+ val targetColumns = Seq("id", "name")
+ val writableColumns = Seq("id", "name")
+ val fields = Seq(
+ new DataField(0, "id", DataTypes.INT()),
+ new DataField(1, "name", DataTypes.STRING())
+ )
+
+ val setup = validator.buildParquetCastValidation(df, targetColumns,
writableColumns, fields)
+ assert(setup.castColMapping.nonEmpty)
+ assert(setup.badCastFilter.isDefined)
+ }
+
+ test("buildParquetCastValidation: exclude specified columns") {
+ val schema = StructType(
+ Seq(
+ StructField("id", IntegerType),
+ StructField("name", StringType),
+ StructField("__file__", StringType)))
+ val df = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, "Alice", "file1.csv"))),
+ schema)
+
+ val targetColumns = Seq("id", "name")
+ val writableColumns = Seq("id", "name")
+ val fields = Seq(
+ new DataField(0, "id", DataTypes.INT()),
+ new DataField(1, "name", DataTypes.STRING())
+ )
+
+ val setup = validator.buildParquetCastValidation(
+ df,
+ targetColumns,
+ writableColumns,
+ fields,
+ excludeCols = Set("__file__"))
+ assert(!setup.castColMapping.contains("__file__"))
+ }
+
+ test("buildTextCastValidation: no validation for string columns") {
+ val schema = StructType(Seq(StructField("id", StringType),
StructField("name", StringType)))
+ val df = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row("1", "Alice"), Row("2", "Bob"))),
+ schema)
+
+ val writableColumns = Seq("id", "name")
+ val fields = Seq(
+ new DataField(0, "id", DataTypes.STRING()),
+ new DataField(1, "name", DataTypes.STRING())
+ )
+
+ val setup = validator.buildTextCastValidation(df, writableColumns, fields)
+ assert(setup.castColMapping.isEmpty)
+ assert(setup.badCastFilter.isEmpty)
+ }
+
+ test("buildTextCastValidation: add validation columns for non-string types")
{
+ val schema = StructType(Seq(StructField("id", StringType),
StructField("age", StringType)))
+ val df = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row("1", "30"), Row("2", "25"))),
+ schema)
+
+ val writableColumns = Seq("id", "age")
+ val fields = Seq(
+ new DataField(0, "id", DataTypes.INT()),
+ new DataField(1, "age", DataTypes.INT())
+ )
+
+ val setup = validator.buildTextCastValidation(df, writableColumns, fields)
+ assert(setup.castColMapping.size == 2)
+ assert(setup.badCastFilter.isDefined)
+ assert(setup.castColMapping.contains("id"))
+ assert(setup.castColMapping.contains("age"))
+ }
+
+ test("validateParquetCast: pass when all casts are valid") {
+ val schema = StructType(Seq(StructField("id", IntegerType),
StructField("name", StringType)))
+ val df = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, "Alice"), Row(2, "Bob"))),
+ schema)
+
+ val targetColumns = Seq("id", "name")
+ val writableColumns = Seq("id", "name")
+ val fields = Seq(
+ new DataField(0, "id", DataTypes.INT()),
+ new DataField(1, "name", DataTypes.STRING())
+ )
+
+ // Should not throw
+ validator.validateParquetCast(df, targetColumns, writableColumns, fields)
+ }
+
+ test("validateParquetCast: throw exception when cast fails") {
+ val schema = StructType(Seq(StructField("id", StringType),
StructField("name", StringType)))
+ val df = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row("abc", "Alice"), Row("2",
"Bob"))),
+ schema)
+
+ val targetColumns = Seq("id", "name")
+ val writableColumns = Seq("id", "name")
+ val fields = Seq(
+ new DataField(0, "id", DataTypes.INT()),
+ new DataField(1, "name", DataTypes.STRING())
+ )
+
+ val exception = intercept[IllegalArgumentException] {
+ validator.validateParquetCast(df, targetColumns, writableColumns, fields)
+ }
+ assert(exception.getMessage.contains("ON_ERROR = ABORT_STATEMENT"))
+ assert(exception.getMessage.contains("Cast failure"))
+ }
+
+ test("castColumns: cast all columns to target types") {
+ val schema = StructType(Seq(StructField("id", StringType),
StructField("age", StringType)))
+ val df = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row("1", "30"), Row("2", "25"))),
+ schema)
+
+ val writableColumns = Seq("id", "age")
+ val fields = Seq(
+ new DataField(0, "id", DataTypes.INT()),
+ new DataField(1, "age", DataTypes.INT())
+ )
+
+ val castedDf = validator.castColumns(df, writableColumns, fields)
+ assert(castedDf.schema("id").dataType == IntegerType)
+ assert(castedDf.schema("age").dataType == IntegerType)
+
+ val rows = castedDf.collect()
+ assert(rows(0).getInt(0) == 1)
+ assert(rows(0).getInt(1) == 30)
+ }
+
+ test("castAndValidate: pass when all casts are valid") {
+ val schema = StructType(Seq(StructField("id", StringType),
StructField("age", StringType)))
+ val df = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row("1", "30"), Row("2", "25"))),
+ schema)
+
+ val writableColumns = Seq("id", "age")
+ val fields = Seq(
+ new DataField(0, "id", DataTypes.INT()),
+ new DataField(1, "age", DataTypes.INT())
+ )
+
+ val castedDf = validator.castAndValidate(df, writableColumns, fields)
+ assert(castedDf.schema("id").dataType == IntegerType)
+ assert(castedDf.schema("age").dataType == IntegerType)
+ }
+
+ test("castAndValidate: throw exception when cast fails") {
+ val schema = StructType(Seq(StructField("id", StringType),
StructField("age", StringType)))
+ val df = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row("1", "invalid"), Row("2", "25"))),
+ schema)
+
+ val writableColumns = Seq("id", "age")
+ val fields = Seq(
+ new DataField(0, "id", DataTypes.INT()),
+ new DataField(1, "age", DataTypes.INT())
+ )
+
+ val exception = intercept[IllegalArgumentException] {
+ validator.castAndValidate(df, writableColumns, fields)
+ }
+ assert(exception.getMessage.contains("ON_ERROR = ABORT_STATEMENT"))
+ assert(exception.getMessage.contains("Cast failure"))
+ }
+}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/execution/CopyIntoDataFrameBuilderTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/execution/CopyIntoDataFrameBuilderTest.scala
new file mode 100644
index 0000000000..542932b53c
--- /dev/null
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/execution/CopyIntoDataFrameBuilderTest.scala
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.execution
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.spark.catalyst.plans.logical.{CopyFileFormat,
FileFormatType}
+import org.apache.paimon.types.{DataField, DataTypes}
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField,
StructType}
+
+class CopyIntoDataFrameBuilderTest extends PaimonSparkTestBase {
+
+ private def createBuilder(
+ formatType: FileFormatType,
+ columns: Option[Seq[String]] = None): CopyIntoDataFrameBuilder = {
+ val fileFormat = new CopyFileFormat(formatType = formatType, options =
Map.empty)
+ new CopyIntoDataFrameBuilder(spark, fileFormat, columns)
+ }
+
+ test("buildStringSchema: CSV format with positional columns") {
+ val builder = createBuilder(FileFormatType.CSV)
+ val targetColumns = Seq("id", "name", "age")
+ val schema = builder.buildStringSchema(targetColumns)
+
+ assert(schema.fields.length == 3)
+ assert(schema.fields(0).name == "_c0")
+ assert(schema.fields(1).name == "_c1")
+ assert(schema.fields(2).name == "_c2")
+ assert(schema.fields.forall(_.dataType == StringType))
+ }
+
+ test("buildStringSchema: JSON format with named columns") {
+ val builder = createBuilder(FileFormatType.JSON)
+ val targetColumns = Seq("id", "name", "age")
+ val schema = builder.buildStringSchema(targetColumns)
+
+ assert(schema.fields.length == 3)
+ assert(schema.fields(0).name == "id")
+ assert(schema.fields(1).name == "name")
+ assert(schema.fields(2).name == "age")
+ assert(schema.fields.forall(_.dataType == StringType))
+ }
+
+ test("buildParquetDataFrame: map source columns to target by name") {
+ val builder = createBuilder(FileFormatType.PARQUET)
+ val schema = StructType(Seq(StructField("id", IntegerType),
StructField("name", StringType)))
+ val df = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row(1, "Alice"), Row(2, "Bob"))),
+ schema)
+
+ val targetColumns = Seq("id", "name")
+ val writableColumns = Seq("id", "name")
+ val fields = Seq(
+ new DataField(0, "id", DataTypes.INT()),
+ new DataField(1, "name", DataTypes.STRING())
+ )
+
+ val result = builder.buildParquetDataFrame(df, targetColumns,
writableColumns, fields)
+ assert(result.columns.toSeq == Seq("id", "name"))
+ assert(result.count() == 2)
+ }
+
+ test("buildParquetDataFrame: fill NULL for missing source columns") {
+ val builder = createBuilder(FileFormatType.PARQUET)
+ val schema = StructType(Seq(StructField("id", IntegerType)))
+ val df = spark.createDataFrame(spark.sparkContext.parallelize(Seq(Row(1),
Row(2))), schema)
+
+ val targetColumns = Seq("id", "name")
+ val writableColumns = Seq("id", "name")
+ val fields = Seq(
+ new DataField(0, "id", DataTypes.INT()),
+ new DataField(1, "name", DataTypes.STRING())
+ )
+
+ val result = builder.buildParquetDataFrame(df, targetColumns,
writableColumns, fields)
+ assert(result.columns.toSeq == Seq("id", "name"))
+ val rows = result.collect()
+ assert(rows(0).isNullAt(1)) // name should be NULL
+ }
+
+ test("buildParquetDataFrame: fill default value for unmapped columns") {
+ val builder = createBuilder(FileFormatType.PARQUET, Some(Seq("id")))
+ val schema = StructType(Seq(StructField("id", IntegerType)))
+ val df = spark.createDataFrame(spark.sparkContext.parallelize(Seq(Row(1),
Row(2))), schema)
+
+ val targetColumns = Seq("id")
+ val writableColumns = Seq("id", "status")
+ val fields = Seq(
+ new DataField(0, "id", DataTypes.INT()),
+ new DataField(1, "status", DataTypes.STRING(), "'active'")
+ )
+
+ val result = builder.buildParquetDataFrame(df, targetColumns,
writableColumns, fields)
+ assert(result.columns.toSeq == Seq("id", "status"))
+ }
+
+ test("buildFinalDataFrame: rename CSV positional columns to named columns") {
+ val builder = createBuilder(FileFormatType.CSV, Some(Seq("id", "name")))
+ val schema = StructType(Seq(StructField("_c0", StringType),
StructField("_c1", StringType)))
+ val df = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row("1", "Alice"), Row("2", "Bob"))),
+ schema)
+
+ val targetColumns = Seq("id", "name")
+ val writableColumns = Seq("id", "name")
+ val fields = Seq(
+ new DataField(0, "id", DataTypes.INT()),
+ new DataField(1, "name", DataTypes.STRING())
+ )
+
+ val result = builder.buildFinalDataFrame(df, targetColumns,
writableColumns, fields)
+ assert(result.columns.toSeq == Seq("id", "name"))
+ }
+
+ test("buildFinalDataFrame: keep JSON named columns as-is") {
+ val builder = createBuilder(FileFormatType.JSON)
+ val schema = StructType(Seq(StructField("id", StringType),
StructField("name", StringType)))
+ val df = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row("1", "Alice"), Row("2", "Bob"))),
+ schema)
+
+ val targetColumns = Seq("id", "name")
+ val writableColumns = Seq("id", "name")
+ val fields = Seq(
+ new DataField(0, "id", DataTypes.INT()),
+ new DataField(1, "name", DataTypes.STRING())
+ )
+
+ val result = builder.buildFinalDataFrame(df, targetColumns,
writableColumns, fields)
+ assert(result.columns.toSeq == Seq("id", "name"))
+ }
+
+ test("buildFinalDataFrame: preserve extra columns") {
+ val builder = createBuilder(FileFormatType.CSV, Some(Seq("id", "name")))
+ val schema = StructType(
+ Seq(
+ StructField("_c0", StringType),
+ StructField("_c1", StringType),
+ StructField("__file__", StringType)))
+ val df = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row("1", "Alice", "file1.csv"))),
+ schema)
+
+ val targetColumns = Seq("id", "name")
+ val writableColumns = Seq("id", "name")
+ val fields = Seq(
+ new DataField(0, "id", DataTypes.INT()),
+ new DataField(1, "name", DataTypes.STRING())
+ )
+
+ val result =
+ builder.buildFinalDataFrame(df, targetColumns, writableColumns, fields,
Seq("__file__"))
+ assert(result.columns.toSeq == Seq("id", "name", "__file__"))
+ }
+
+ test("applyNullTransforms: replace NULL_IF values with null") {
+ val fileFormat = new CopyFileFormat(
+ formatType = FileFormatType.CSV,
+ options = Map("NULL_IF" -> Seq("N/A",
"NULL").mkString(CopyFileFormat.LIST_SEPARATOR)))
+ val builder = new CopyIntoDataFrameBuilder(spark, fileFormat, None)
+
+ val schema = StructType(Seq(StructField("col1", StringType),
StructField("col2", StringType)))
+ val df = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row("value", "N/A"), Row("NULL",
"data"))),
+ schema)
+
+ val result = builder.applyNullTransforms(df, Seq("col1", "col2"))
+ val rows = result.collect()
+ assert(rows(0).getString(0) == "value")
+ assert(rows(0).isNullAt(1)) // "N/A" -> null
+ assert(rows(1).isNullAt(0)) // "NULL" -> null
+ assert(rows(1).getString(1) == "data")
+ }
+
+ test("applyNullTransforms: replace empty strings with null when
EMPTY_FIELD_AS_NULL is true") {
+ val fileFormat = new CopyFileFormat(
+ formatType = FileFormatType.CSV,
+ options = Map("EMPTY_FIELD_AS_NULL" -> "TRUE"))
+ val builder = new CopyIntoDataFrameBuilder(spark, fileFormat, None)
+
+ val schema = StructType(Seq(StructField("col1", StringType),
StructField("col2", StringType)))
+ val df = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row("value", ""), Row("", "data"))),
+ schema)
+
+ val result = builder.applyNullTransforms(df, Seq("col1", "col2"))
+ val rows = result.collect()
+ assert(rows(0).getString(0) == "value")
+ assert(rows(0).isNullAt(1)) // "" -> null
+ assert(rows(1).isNullAt(0)) // "" -> null
+ assert(rows(1).getString(1) == "data")
+ }
+
+ test("applyNullTransforms: apply both NULL_IF and EMPTY_FIELD_AS_NULL") {
+ val fileFormat = new CopyFileFormat(
+ formatType = FileFormatType.CSV,
+ options = Map(
+ "NULL_IF" -> Seq("N/A").mkString(CopyFileFormat.LIST_SEPARATOR),
+ "EMPTY_FIELD_AS_NULL" -> "TRUE"))
+ val builder = new CopyIntoDataFrameBuilder(spark, fileFormat, None)
+
+ val schema = StructType(Seq(StructField("col1", StringType),
StructField("col2", StringType)))
+ val df = spark.createDataFrame(
+ spark.sparkContext.parallelize(Seq(Row("N/A", ""), Row("value",
"data"))),
+ schema)
+
+ val result = builder.applyNullTransforms(df, Seq("col1", "col2"))
+ val rows = result.collect()
+ assert(rows(0).isNullAt(0)) // "N/A" -> null
+ assert(rows(0).isNullAt(1)) // "" -> null
+ assert(rows(1).getString(0) == "value")
+ assert(rows(1).getString(1) == "data")
+ }
+}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/execution/CopyIntoHelperTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/execution/CopyIntoHelperTest.scala
new file mode 100644
index 0000000000..d09b374efb
--- /dev/null
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/execution/CopyIntoHelperTest.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.execution
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.types.{DataField, DataTypes}
+
+class CopyIntoHelperTest extends PaimonSparkTestBase {
+
+  test("resolveTargetColumns: use all writable columns when columns is None") {
+    val allWritable = Seq("id", "name", "age")
+    // Without an explicit column list the writable columns are returned as-is.
+    assert(CopyIntoHelper.resolveTargetColumns(spark, None, allWritable) == allWritable)
+  }
+
+  test("resolveTargetColumns: resolve specified columns case-insensitively") {
+    val allWritable = Seq("id", "name", "age")
+    val requested = Some(Seq("ID", "Name"))
+    // User-supplied spellings are normalised to the writable column names.
+    val resolved = CopyIntoHelper.resolveTargetColumns(spark, requested, allWritable)
+    assert(resolved == Seq("id", "name"))
+  }
+
+  test("resolveTargetColumns: throw exception for non-existent column") {
+    val allWritable = Seq("id", "name", "age")
+    val requested = Some(Seq("id", "invalid_col"))
+    val thrown = intercept[IllegalArgumentException] {
+      CopyIntoHelper.resolveTargetColumns(spark, requested, allWritable)
+    }
+    // The message should name the offending column and explain why it was rejected.
+    val message = thrown.getMessage
+    assert(message.contains("invalid_col"))
+    assert(message.contains("does not exist"))
+  }
+
+  test("resolveTargetColumns: throw exception for duplicate columns") {
+    val allWritable = Seq("id", "name", "age")
+    // "id" and "ID" collide once matched case-insensitively.
+    val requested = Some(Seq("id", "ID"))
+    val thrown = intercept[IllegalArgumentException] {
+      CopyIntoHelper.resolveTargetColumns(spark, requested, allWritable)
+    }
+    assert(thrown.getMessage.contains("Duplicate columns"))
+  }
+
+  test("validateNonNullableDefaults: pass when all non-nullable columns are mapped") {
+    val allWritable = Seq("id", "name", "age")
+    val mapped = Seq("id", "name", "age")
+    val schemaFields = Seq(
+      new DataField(0, "id", DataTypes.INT().notNull()),
+      new DataField(1, "name", DataTypes.STRING()),
+      new DataField(2, "age", DataTypes.INT()))
+
+    // The only non-nullable column ("id") is mapped, so validation must not throw.
+    CopyIntoHelper.validateNonNullableDefaults(Some(mapped), allWritable, mapped, schemaFields)
+  }
+
+  test("validateNonNullableDefaults: pass when unmapped non-nullable column has default") {
+    val allWritable = Seq("id", "name", "status")
+    val mapped = Seq("id", "name")
+    val schemaFields = Seq(
+      new DataField(0, "id", DataTypes.INT().notNull()),
+      new DataField(1, "name", DataTypes.STRING()),
+      // "status" is not mapped but is non-nullable with a default value, which is acceptable.
+      new DataField(2, "status", DataTypes.STRING().notNull(), null, "default value"))
+
+    // Must complete without throwing.
+    CopyIntoHelper.validateNonNullableDefaults(Some(mapped), allWritable, mapped, schemaFields)
+  }
+
+  test("validateNonNullableDefaults: throw when unmapped non-nullable column has no default") {
+    val allWritable = Seq("id", "name", "status")
+    val mapped = Seq("id", "name")
+    val schemaFields = Seq(
+      new DataField(0, "id", DataTypes.INT().notNull()),
+      new DataField(1, "name", DataTypes.STRING()),
+      // Non-nullable, unmapped, and without a default: this is what must be rejected.
+      new DataField(2, "status", DataTypes.STRING().notNull()))
+
+    val thrown = intercept[IllegalArgumentException] {
+      CopyIntoHelper.validateNonNullableDefaults(Some(mapped), allWritable, mapped, schemaFields)
+    }
+    val message = thrown.getMessage
+    assert(message.contains("status"))
+    assert(message.contains("not in the column list"))
+    assert(message.contains("no default value"))
+  }
+
+  test("safeTempCol: generate unique column name") {
+    // "__new" does not collide with any existing column, so it is returned unchanged.
+    val taken = Set("col1", "col2", "__temp")
+    assert(CopyIntoHelper.safeTempCol(spark, "__new", taken) == "__new")
+  }
+
+  test("safeTempCol: add prefix when name conflicts") {
+    // "__temp" is taken, so a single underscore is prepended.
+    val taken = Set("col1", "col2", "__temp")
+    assert(CopyIntoHelper.safeTempCol(spark, "__temp", taken) == "___temp")
+  }
+
+  test("safeTempCol: handle case-insensitive conflicts") {
+    // "col1" collides with "Col1" when names are compared case-insensitively.
+    val taken = Set("Col1", "COL2")
+    assert(CopyIntoHelper.safeTempCol(spark, "col1", taken) == "_col1")
+  }
+
+  test("safeTempCol: add multiple prefixes for multiple conflicts") {
+    // Both "__temp" and "___temp" are taken, so prefixing continues until a free name is found.
+    val taken = Set("__temp", "___temp")
+    assert(CopyIntoHelper.safeTempCol(spark, "__temp", taken) == "____temp")
+  }
+}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CatalogQualifiedCreateTableLikeTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CatalogQualifiedCreateTableLikeTest.scala
index 03e109919b..c4eb2cd644 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CatalogQualifiedCreateTableLikeTest.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CatalogQualifiedCreateTableLikeTest.scala
@@ -129,6 +129,16 @@ class CatalogQualifiedCreateTableLikeTest extends
PaimonSparkTestBase {
Assertions.assertEquals("tag",
nonReservedIdentifierCommand.targetIdent.name())
Assertions.assertEquals(Seq("test"),
nonReservedIdentifierCommand.targetIdent.namespace().toSeq)
+ val continueCommand =
+ parseCreateTableLikeCommand("CREATE TABLE paimon.test.continue LIKE
paimon.test.source_tbl")
+ Assertions.assertEquals("continue", continueCommand.targetIdent.name())
+ Assertions.assertEquals(Seq("test"),
continueCommand.targetIdent.namespace().toSeq)
+
+ val skipFileCommand =
+ parseCreateTableLikeCommand("CREATE TABLE paimon.test.skip_file LIKE
paimon.test.source_tbl")
+ Assertions.assertEquals("skip_file", skipFileCommand.targetIdent.name())
+ Assertions.assertEquals(Seq("test"),
skipFileCommand.targetIdent.namespace().toSeq)
+
val nestedIdentifierCommand =
parseCreateTableLikeCommand(
"CREATE TABLE paimon.test.extra.target_tbl LIKE
paimon.test.extra.source_tbl")
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoOnErrorTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoOnErrorTest.scala
new file mode 100644
index 0000000000..11942c7dfd
--- /dev/null
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoOnErrorTest.scala
@@ -0,0 +1,527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.spark.sql.Row
+
+trait CopyIntoOnErrorTest { self: CopyIntoTestBase =>
+
+  test("COPY INTO: ON_ERROR = CONTINUE skips bad CSV rows") {
+    val table = s"$dbName0.copy_continue_csv"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING)")
+
+    withCsvDir {
+      dir =>
+        // The middle row has a non-numeric id; the other two rows are well-formed.
+        createCsvFile(dir, "data.csv", "1,Alice\nabc,Bob\n3,Charlie\n")
+
+        val summary = spark
+          .sql(s"""COPY INTO $table
+                  |FROM '${dir.getAbsolutePath}'
+                  |FILE_FORMAT = (TYPE = CSV)
+                  |ON_ERROR = CONTINUE
+                  |""".stripMargin)
+          .collect()
+        // Result columns checked here: 1 = status, 4 = errors_seen.
+        assert(summary.length == 1)
+        assert(summary(0).getLong(4) > 0)
+        assert(summary(0).getString(1) == "PARTIALLY_LOADED")
+
+        val loaded = spark.sql(s"SELECT * FROM $table ORDER BY id").collect()
+        assert(
+          loaded.length == 2,
+          s"Expected 2 rows but got ${loaded.length} — bad row should NOT be in table")
+        assert(loaded(0).getInt(0) == 1 && loaded(0).getString(1) == "Alice")
+        assert(loaded(1).getInt(0) == 3 && loaded(1).getString(1) == "Charlie")
+        assert(
+          loaded.forall(_.getString(1) != "Bob"),
+          "Bad row with 'abc' as id should not be in the table")
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
+
+  test("COPY INTO: ON_ERROR = CONTINUE with no errors behaves like ABORT") {
+    val table = s"$dbName0.copy_continue_ok"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "data.csv", "1,Alice\n2,Bob\n")
+
+        val copySql =
+          s"""COPY INTO $table
+             |FROM '${dir.getAbsolutePath}'
+             |FILE_FORMAT = (TYPE = CSV)
+             |ON_ERROR = CONTINUE
+             |""".stripMargin
+        val summary = spark.sql(copySql).collect()
+        // A fully clean file is reported as LOADED with zero errors (column 4).
+        assert(summary.length == 1)
+        assert(summary(0).getString(1) == "LOADED")
+        assert(summary(0).getLong(4) == 0L)
+
+        val expected = Seq(Row(1, "Alice"), Row(2, "Bob"))
+        checkAnswer(spark.sql(s"SELECT * FROM $table ORDER BY id"), expected)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
+
+  test("COPY INTO: ON_ERROR = SKIP_FILE skips file with bad data") {
+    val table = s"$dbName0.copy_skipfile"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "good.csv", "1,Alice\n2,Bob\n")
+        createCsvFile(dir, "bad.csv", "abc,Charlie\n")
+
+        val copySql =
+          s"""COPY INTO $table
+             |FROM '${dir.getAbsolutePath}'
+             |FILE_FORMAT = (TYPE = CSV)
+             |ON_ERROR = SKIP_FILE
+             |""".stripMargin
+        val summary = spark.sql(copySql).collect()
+
+        // One result row per file: good.csv loads, bad.csv is rejected as a whole.
+        val loadedCount = summary.count(_.getString(1) == "LOADED")
+        val rejected = summary.filter(_.getString(1) == "LOAD_FAILED")
+        assert(loadedCount == 1)
+        assert(rejected.length == 1)
+        // The rejected file reports at least one error (column 4) and a first_error (column 5).
+        assert(rejected(0).getLong(4) >= 1L)
+        assert(rejected(0).getString(5) != null)
+
+        val expected = Seq(Row(1, "Alice"), Row(2, "Bob"))
+        checkAnswer(spark.sql(s"SELECT * FROM $table ORDER BY id"), expected)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
+
+  test("COPY INTO: ON_ERROR = SKIP_FILE discards good rows in a file that also has bad rows") {
+    val table = s"$dbName0.copy_skipfile_mixed"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "good.csv", "1,Alice\n2,Bob\n")
+        // mixed.csv: two valid rows (Carol, Eve) around one row with a non-numeric id.
+        createCsvFile(dir, "mixed.csv", "3,Carol\nabc,Dave\n4,Eve\n")
+
+        val copySql =
+          s"""COPY INTO $table
+             |FROM '${dir.getAbsolutePath}'
+             |FILE_FORMAT = (TYPE = CSV)
+             |ON_ERROR = SKIP_FILE
+             |""".stripMargin
+        val statuses = spark.sql(copySql).collect().map(_.getString(1))
+        assert(statuses.count(_ == "LOADED") == 1)
+        assert(statuses.count(_ == "LOAD_FAILED") == 1)
+
+        // SKIP_FILE is all-or-nothing per file, so Carol and Eve must be absent too.
+        val expected = Seq(Row(1, "Alice"), Row(2, "Bob"))
+        checkAnswer(spark.sql(s"SELECT * FROM $table ORDER BY id"), expected)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
+
+  test("COPY INTO: ON_ERROR = SKIP_FILE with all good files") {
+    val table = s"$dbName0.copy_skipfile_ok"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "a.csv", "1,Alice\n")
+        createCsvFile(dir, "b.csv", "2,Bob\n")
+
+        val copySql =
+          s"""COPY INTO $table
+             |FROM '${dir.getAbsolutePath}'
+             |FILE_FORMAT = (TYPE = CSV)
+             |ON_ERROR = SKIP_FILE
+             |""".stripMargin
+        val summary = spark.sql(copySql).collect()
+        // Every file is clean, so each of the two result rows reports LOADED.
+        assert(summary.forall(_.getString(1) == "LOADED"))
+        assert(summary.length == 2)
+
+        val tableRowCount = spark.sql(s"SELECT * FROM $table").count()
+        assert(tableRowCount == 2)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
+
+  test("COPY INTO: ON_ERROR = CONTINUE with JSON malformed records") {
+    val table = s"$dbName0.copy_continue_json"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING)")
+
+    withJsonDir {
+      dir =>
+        // The middle line is not valid JSON; the surrounding records are well-formed.
+        createJsonFile(
+          dir,
+          "data.json",
+          """{"id": "1", "name": "Alice"}
+            |{bad json here}
+            |{"id": "3", "name": "Charlie"}
+            |""".stripMargin
+        )
+
+        val copySql =
+          s"""COPY INTO $table
+             |FROM '${dir.getAbsolutePath}'
+             |FILE_FORMAT = (TYPE = JSON)
+             |ON_ERROR = CONTINUE
+             |""".stripMargin
+        val summary = spark.sql(copySql).collect()
+        // Pin the cardinality before indexing: a single source file yields one result row.
+        assert(summary.length == 1)
+        // Column 4 is errors_seen; the malformed line must be counted.
+        assert(summary(0).getLong(4) > 0)
+
+        val loaded = spark.sql(s"SELECT * FROM $table ORDER BY id").collect()
+        assert(
+          loaded.length == 2,
+          s"Expected 2 rows but got ${loaded.length} — malformed JSON should NOT be in table")
+        assert(loaded(0).getInt(0) == 1)
+        assert(loaded(1).getInt(0) == 3)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
+
+  test("COPY INTO: ON_ERROR = SKIP_FILE with JSON") {
+    val table = s"$dbName0.copy_skipfile_json"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING)")
+
+    withJsonDir {
+      dir =>
+        createJsonFile(dir, "good.json", "{\"id\": \"1\", \"name\": \"Alice\"}\n")
+        createJsonFile(dir, "bad.json", "{bad json}\n")
+
+        val copySql =
+          s"""COPY INTO $table
+             |FROM '${dir.getAbsolutePath}'
+             |FILE_FORMAT = (TYPE = JSON)
+             |ON_ERROR = SKIP_FILE
+             |""".stripMargin
+        val statuses = spark.sql(copySql).collect().map(_.getString(1))
+        // good.json loads; bad.json is rejected as a whole.
+        assert(statuses.count(_ == "LOADED") == 1)
+        assert(statuses.count(_ == "LOAD_FAILED") == 1)
+
+        checkAnswer(spark.sql(s"SELECT * FROM $table"), Seq(Row(1, "Alice")))
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
+
+  test("COPY INTO: ABORT_STATEMENT still fails on bad data") {
+    val table = s"$dbName0.copy_abort_explicit"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "data.csv", "abc,Alice\n")
+
+        val e = intercept[Exception] {
+          spark.sql(s"""COPY INTO $table
+                       |FROM '${dir.getAbsolutePath}'
+                       |FILE_FORMAT = (TYPE = CSV)
+                       |ON_ERROR = ABORT_STATEMENT
+                       |""".stripMargin)
+        }
+        // The failure may be wrapped, and Throwable.getMessage can be null. Collect every
+        // non-null message along the (bounded) cause chain rather than dereferencing
+        // e.getMessage directly, which would turn a null message into an NPE.
+        val messages = Iterator
+          .iterate[Throwable](e)(_.getCause)
+          .takeWhile(_ != null)
+          .take(32)
+          .flatMap(t => Option(t.getMessage))
+          .toList
+        val expectedMarkers = Seq("Cast failure", "ABORT_STATEMENT", "CAST_INVALID_INPUT")
+        assert(
+          messages.exists(m => expectedMarkers.exists(marker => m.contains(marker))) ||
+            e.getCause != null,
+          s"Unexpected failure: ${messages.mkString(" <- ")}")
+
+        // Aborting the statement must leave the target table untouched.
+        assert(spark.sql(s"SELECT * FROM $table").count() == 0)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
+
+  test("COPY INTO: ON_ERROR = CONTINUE with Parquet cast errors") {
+    val table = s"$dbName0.copy_continue_pq"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING)")
+
+    withParquetDir {
+      dir =>
+        import org.apache.spark.sql.types._
+        // The Parquet file stores id as STRING; "abc" cannot be cast to the INT target column.
+        val stringSchema =
+          StructType(Seq(StructField("id", StringType), StructField("name", StringType)))
+        val sourceRows = Seq(Row("1", "Alice"), Row("abc", "Bad"), Row("3", "Charlie"))
+        createParquetSingleFile(dir, "data.parquet", sourceRows, stringSchema)
+
+        val copySql =
+          s"""COPY INTO $table
+             |FROM '${dir.getAbsolutePath}'
+             |FILE_FORMAT = (TYPE = PARQUET)
+             |ON_ERROR = CONTINUE
+             |""".stripMargin
+        val summary = spark.sql(copySql).collect()
+        assert(summary.length == 1)
+        assert(summary(0).getLong(4) > 0)
+
+        val loaded = spark.sql(s"SELECT * FROM $table ORDER BY id").collect()
+        assert(
+          loaded.length == 2,
+          s"Expected 2 rows but got ${loaded.length} — bad row should NOT be in table")
+        assert(loaded(0).getInt(0) == 1)
+        assert(loaded(1).getInt(0) == 3)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
+
+  test("COPY INTO: ON_ERROR = SKIP_FILE with Parquet") {
+    val table = s"$dbName0.copy_skipfile_pq"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING)")
+
+    withParquetDir {
+      dir =>
+        import org.apache.spark.sql.types._
+        // Both files are written with one physical schema (id STRING), so reading the
+        // directory works. The rows in good.parquet cast cleanly to the INT target, while
+        // "abc" in bad.parquet does not — that cast failure is what SKIP_FILE has to catch.
+        // Mixing INT and STRING footers here would instead break Spark's vectorized Parquet
+        // reader before any cast validation runs.
+        val stringSchema =
+          StructType(Seq(StructField("id", StringType), StructField("name", StringType)))
+        val goodRows = Seq(Row("1", "Alice"), Row("2", "Bob"))
+        val badRows = Seq(Row("abc", "Bad"))
+        createParquetSingleFile(dir, "good.parquet", goodRows, stringSchema)
+        createParquetSingleFile(dir, "bad.parquet", badRows, stringSchema)
+
+        val copySql =
+          s"""COPY INTO $table
+             |FROM '${dir.getAbsolutePath}'
+             |FILE_FORMAT = (TYPE = PARQUET)
+             |ON_ERROR = SKIP_FILE
+             |""".stripMargin
+        val statuses = spark.sql(copySql).collect().map(_.getString(1))
+        assert(statuses.count(_ == "LOADED") == 1)
+        assert(statuses.count(_ == "LOAD_FAILED") == 1)
+
+        val expected = Seq(Row(1, "Alice"), Row(2, "Bob"))
+        checkAnswer(spark.sql(s"SELECT * FROM $table ORDER BY id"), expected)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
+
+  test("COPY INTO: ON_ERROR = CONTINUE clean file has no first_error") {
+    val table = s"$dbName0.copy_continue_multi"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "good.csv", "1,Alice\n2,Bob\n")
+        createCsvFile(dir, "bad.csv", "abc,Charlie\n3,Dave\n")
+
+        val copySql =
+          s"""COPY INTO $table
+             |FROM '${dir.getAbsolutePath}'
+             |FILE_FORMAT = (TYPE = CSV)
+             |ON_ERROR = CONTINUE
+             |""".stripMargin
+        val summary = spark.sql(copySql).collect()
+        // One result row per source file, keyed by file name in column 0.
+        assert(summary.length == 2, s"Expected one result row per file, got ${summary.length}")
+        // Fail with a readable message instead of Option.get's NoSuchElementException when a
+        // file is missing from the result.
+        val goodFile = summary
+          .find(_.getString(0) == "good.csv")
+          .getOrElse(fail("No COPY INTO result row for good.csv"))
+        val badFile = summary
+          .find(_.getString(0) == "bad.csv")
+          .getOrElse(fail("No COPY INTO result row for bad.csv"))
+
+        // Columns: 1 = status, 4 = errors_seen, 5 = first_error.
+        assert(goodFile.getString(1) == "LOADED")
+        assert(goodFile.getLong(4) == 0L)
+        assert(goodFile.isNullAt(5), "Clean file should have null first_error")
+
+        assert(badFile.getString(1) == "PARTIALLY_LOADED")
+        assert(badFile.getLong(4) > 0)
+        assert(!badFile.isNullAt(5), "Bad file should have non-null first_error")
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
+
+  test("COPY INTO: ON_ERROR = CONTINUE with JSON cast errors reports correctly") {
+    val table = s"$dbName0.copy_continue_json_cast"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING)")
+
+    withJsonDir {
+      dir =>
+        // All three lines are valid JSON; only the middle id ("abc") fails the INT cast.
+        val jsonLines = Seq(
+          """{"id": "1", "name": "Alice"}""",
+          """{"id": "abc", "name": "Bad"}""",
+          """{"id": "3", "name": "Charlie"}""")
+        createJsonFile(dir, "data.json", jsonLines.map(_ + "\n").mkString)
+
+        val copySql =
+          s"""COPY INTO $table
+             |FROM '${dir.getAbsolutePath}'
+             |FILE_FORMAT = (TYPE = JSON)
+             |ON_ERROR = CONTINUE
+             |""".stripMargin
+        val summary = spark.sql(copySql).collect()
+        assert(summary.length == 1)
+        val fileResult = summary(0)
+        assert(fileResult.getLong(4) > 0, "Should report cast errors")
+        assert(
+          fileResult.getString(1) == "PARTIALLY_LOADED",
+          s"Expected PARTIALLY_LOADED but got ${fileResult.getString(1)}")
+
+        val loaded = spark.sql(s"SELECT * FROM $table ORDER BY id").collect()
+        assert(loaded.length == 2, s"Expected 2 rows but got ${loaded.length}")
+        assert(loaded(0).getInt(0) == 1)
+        assert(loaded(1).getInt(0) == 3)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
+
+  test("COPY INTO: ON_ERROR = CONTINUE all rows fail reports LOAD_FAILED") {
+    val table = s"$dbName0.copy_continue_allfail"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING)")
+
+    withCsvDir {
+      dir =>
+        // Neither row has a numeric id, so nothing can be loaded.
+        createCsvFile(dir, "allfail.csv", "abc,Alice\nxyz,Bob\n")
+
+        val copySql =
+          s"""COPY INTO $table
+             |FROM '${dir.getAbsolutePath}'
+             |FILE_FORMAT = (TYPE = CSV)
+             |ON_ERROR = CONTINUE
+             |""".stripMargin
+        val summary = spark.sql(copySql).collect()
+        assert(summary.length == 1)
+        val fileResult = summary(0)
+        val status = fileResult.getString(1)
+        assert(status == "LOAD_FAILED", s"Expected LOAD_FAILED when all rows fail, got $status")
+        assert(fileResult.getLong(2) == 0L, "rows_loaded should be 0")
+        assert(fileResult.getLong(3) == 2L, "rows_parsed should be 2")
+        assert(fileResult.getLong(4) == 2L, "errors_seen should be 2")
+
+        val tableRowCount = spark.sql(s"SELECT * FROM $table").count()
+        assert(tableRowCount == 0, "No rows should be in table")
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
+
+  test("COPY INTO: ON_ERROR = CONTINUE all rows fail does not block re-run") {
+    val table = s"$dbName0.copy_continue_retry"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "data.csv", "abc,Alice\n")
+
+        val copySql =
+          s"""COPY INTO $table
+             |FROM '${dir.getAbsolutePath}'
+             |FILE_FORMAT = (TYPE = CSV)
+             |ON_ERROR = CONTINUE
+             |""".stripMargin
+
+        // First run: the only row fails, so nothing is loaded.
+        val firstRun = spark.sql(copySql).collect()
+        assert(firstRun(0).getString(1) == "LOAD_FAILED")
+        assert(spark.sql(s"SELECT * FROM $table").count() == 0)
+
+        // Re-run without FORCE: file should NOT be skipped since 0 rows were loaded.
+        val secondRun = spark.sql(copySql).collect()
+        assert(secondRun.length == 1)
+        assert(
+          secondRun(0).getString(1) != "SKIPPED",
+          "File with 0 rows loaded should not be skipped on re-run")
+        // The file is actually reprocessed: its still-bad row fails again and nothing lands.
+        assert(secondRun(0).getString(1) == "LOAD_FAILED")
+        assert(spark.sql(s"SELECT * FROM $table").count() == 0)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
+
+  test("COPY INTO: ON_ERROR = CONTINUE skips CSV rows with fewer columns") {
+    val table = s"$dbName0.copy_continue_fewer_cols"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING, age INT)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "data.csv", "1,Alice\n2,Bob,20\n")
+
+        val copySql =
+          s"""COPY INTO $table
+             |FROM '${dir.getAbsolutePath}'
+             |FILE_FORMAT = (TYPE = CSV)
+             |ON_ERROR = CONTINUE
+             |""".stripMargin
+        // Under Spark CSV PERMISSIVE mode, a row with fewer columns than the target schema
+        // is a malformed record. CONTINUE drops it and keeps the well-formed row.
+        val summary = spark.sql(copySql).collect()
+        assert(summary.length == 1)
+        val fileResult = summary(0)
+        assert(fileResult.getString(1) == "PARTIALLY_LOADED")
+        assert(fileResult.getLong(2) == 1L, "rows_loaded should be 1")
+        assert(fileResult.getLong(4) == 1L, "errors_seen should be 1")
+
+        val loaded = spark.sql(s"SELECT * FROM $table ORDER BY id").collect()
+        assert(loaded.length == 1)
+        val bob = loaded(0)
+        assert(bob.getInt(0) == 2 && bob.getString(1) == "Bob" && bob.getInt(2) == 20)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
+
+  test("COPY INTO: ON_ERROR = CONTINUE skips CSV rows with extra columns") {
+    val table = s"$dbName0.copy_continue_extra_cols"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "data.csv", "1,Alice,extra\n2,Bob\n")
+
+        val copySql =
+          s"""COPY INTO $table
+             |FROM '${dir.getAbsolutePath}'
+             |FILE_FORMAT = (TYPE = CSV)
+             |ON_ERROR = CONTINUE
+             |""".stripMargin
+        // Under Spark CSV PERMISSIVE mode, a row with more columns than the target schema
+        // is a malformed record. CONTINUE drops it and keeps the well-formed row.
+        val summary = spark.sql(copySql).collect()
+        assert(summary.length == 1)
+        val fileResult = summary(0)
+        assert(fileResult.getString(1) == "PARTIALLY_LOADED")
+        assert(fileResult.getLong(2) == 1L, "rows_loaded should be 1")
+        assert(fileResult.getLong(4) == 1L, "errors_seen should be 1")
+
+        val loaded = spark.sql(s"SELECT * FROM $table ORDER BY id").collect()
+        assert(loaded.length == 1)
+        val bob = loaded(0)
+        assert(bob.getInt(0) == 2 && bob.getString(1) == "Bob")
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
+}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala
index e0970362a0..23365e35e6 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala
@@ -27,7 +27,7 @@ import java.nio.file.Files
class CopyIntoTestBase extends PaimonSparkTestBase {
- private def createCsvFile(dir: File, name: String, content: String): File = {
+ protected def createCsvFile(dir: File, name: String, content: String): File
= {
val file = new File(dir, name)
val writer = new PrintWriter(file)
try writer.write(content)
@@ -35,13 +35,13 @@ class CopyIntoTestBase extends PaimonSparkTestBase {
file
}
- private def withCsvDir(testBody: File => Unit): Unit = {
+ protected def withCsvDir(testBody: File => Unit): Unit = {
val dir = Files.createTempDirectory("copy_into_test").toFile
try testBody(dir)
finally deleteRecursively(dir)
}
- private def deleteRecursively(file: File): Unit = {
+ protected def deleteRecursively(file: File): Unit = {
if (file.isDirectory) {
file.listFiles().foreach(deleteRecursively)
}
@@ -648,7 +648,7 @@ class CopyIntoTestBase extends PaimonSparkTestBase {
spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_dup_col")
}
- private def createJsonFile(dir: File, name: String, content: String): File =
{
+ protected def createJsonFile(dir: File, name: String, content: String): File
= {
val file = new File(dir, name)
val writer = new PrintWriter(file)
try writer.write(content)
@@ -656,7 +656,7 @@ class CopyIntoTestBase extends PaimonSparkTestBase {
file
}
- private def withJsonDir(testBody: File => Unit): Unit = {
+ protected def withJsonDir(testBody: File => Unit): Unit = {
val dir = Files.createTempDirectory("copy_into_json_test").toFile
try testBody(dir)
finally deleteRecursively(dir)
@@ -1086,7 +1086,7 @@ class CopyIntoTestBase extends PaimonSparkTestBase {
// ========== Parquet Tests ==========
- private def withParquetDir(testBody: File => Unit): Unit = {
+ protected def withParquetDir(testBody: File => Unit): Unit = {
val dir = Files.createTempDirectory("copy_into_parquet_test").toFile
try testBody(dir)
finally deleteRecursively(dir)
@@ -1101,7 +1101,7 @@ class CopyIntoTestBase extends PaimonSparkTestBase {
df.coalesce(1).write.parquet(new File(dir, name).getAbsolutePath)
}
- private def createParquetSingleFile(
+ protected def createParquetSingleFile(
dir: File,
fileName: String,
data: Seq[Row],