This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5adb6c9d4c [spark] Support ON_ERROR = CONTINUE / SKIP_FILE in COPY 
INTO (#8062)
5adb6c9d4c is described below

commit 5adb6c9d4cb7dfa102c4dd3da2d0e5de0674d75e
Author: Junrui Lee <[email protected]>
AuthorDate: Tue Jun 2 23:27:27 2026 +0800

    [spark] Support ON_ERROR = CONTINUE / SKIP_FILE in COPY INTO (#8062)
    
    This is part of #8005.
    
    COPY INTO previously only supported `ON_ERROR = ABORT_STATEMENT`: any
    parse or
    cast error aborted the entire command. In production data-loading
    pipelines a
    single malformed row or file would then fail the whole batch, which is
    often
    too strict. This adds two error-tolerant modes:
    
    - `CONTINUE` — skip bad rows and load the rest (row-level tolerance).
    - `SKIP_FILE` — skip any file that contains an error, all-or-nothing per
    file.
    
    `ABORT_STATEMENT` remains the default, so existing behavior is
    unchanged.
---
 docs/docs/spark/sql-write.md                       |  18 +-
 .../org/apache/paimon/spark/sql/CopyIntoTest.scala |   2 +-
 .../org/apache/paimon/spark/sql/CopyIntoTest.scala |   2 +-
 .../org/apache/paimon/spark/sql/CopyIntoTest.scala |   2 +-
 .../org/apache/paimon/spark/sql/CopyIntoTest.scala |   2 +-
 .../org/apache/paimon/spark/sql/CopyIntoTest.scala |   2 +-
 .../org/apache/paimon/spark/sql/CopyIntoTest.scala |   2 +-
 .../PaimonSqlExtensions.g4                         |   6 +-
 .../plans/logical/CopyIntoTableCommand.scala       |   7 +-
 .../spark/catalyst/plans/logical/CopyOptions.scala |   8 +
 .../spark/copyinto/CopyIntoResultBuilder.scala     | 145 +++++
 .../spark/execution/CopyIntoCastValidator.scala    | 196 +++++++
 .../spark/execution/CopyIntoDataFrameBuilder.scala | 187 +++++++
 .../spark/execution/CopyIntoErrorHandler.scala     | 276 ++++++++++
 .../paimon/spark/execution/CopyIntoHelper.scala    | 138 +++++
 .../paimon/spark/execution/CopyIntoTableExec.scala | 601 ++++++++-------------
 .../paimon/spark/execution/CopyIntoUtils.scala     |  15 +
 .../paimon/spark/execution/PaimonStrategy.scala    |   3 +-
 .../AbstractPaimonSparkSqlExtensionsParser.scala   |   2 +
 .../extensions/PaimonSqlExtensionsAstBuilder.scala |  10 +-
 .../execution/CopyIntoCastValidatorTest.scala      | 204 +++++++
 .../execution/CopyIntoDataFrameBuilderTest.scala   | 231 ++++++++
 .../spark/execution/CopyIntoHelperTest.scala       | 133 +++++
 .../sql/CatalogQualifiedCreateTableLikeTest.scala  |  10 +
 .../paimon/spark/sql/CopyIntoOnErrorTest.scala     | 527 ++++++++++++++++++
 .../apache/paimon/spark/sql/CopyIntoTestBase.scala |  14 +-
 26 files changed, 2340 insertions(+), 403 deletions(-)

diff --git a/docs/docs/spark/sql-write.md b/docs/docs/spark/sql-write.md
index 83d26da622..703976857a 100644
--- a/docs/docs/spark/sql-write.md
+++ b/docs/docs/spark/sql-write.md
@@ -325,6 +325,10 @@ Notes:
 
 `COPY INTO` provides a SQL command for bulk loading data files into Paimon 
tables and exporting table data to files. Supported formats: **CSV**, **JSON**, 
and **Parquet**.
 
+:::info
+**SQL dialect:** Paimon's `COPY INTO` is a Snowflake-style extension 
(`FILE_FORMAT = (TYPE = ...)`, `PATTERN`, `FORCE`, `ON_ERROR`), not the 
Databricks `COPY INTO` form (`FILEFORMAT` + `FORMAT_OPTIONS (...)` / 
`COPY_OPTIONS (...)`). It implements only a subset of the Snowflake syntax. In 
particular, `ON_ERROR` supports `ABORT_STATEMENT` (default), `CONTINUE`, and 
`SKIP_FILE`; the Snowflake variants `SKIP_FILE_<num>` and `SKIP_FILE_<num>%` 
are **not** supported.
+:::
+
 #### CSV Import
 
 ```sql
@@ -333,7 +337,7 @@ FROM 'source_path'
 FILE_FORMAT = (TYPE = CSV [, option = value, ...])
 [PATTERN = 'regex']
 [FORCE = TRUE|FALSE]
-[ON_ERROR = ABORT_STATEMENT]
+[ON_ERROR = { ABORT_STATEMENT | CONTINUE | SKIP_FILE }]
 ```
 
 **Basic import:**
@@ -371,7 +375,7 @@ FROM 'source_path'
 FILE_FORMAT = (TYPE = JSON [, option = value, ...])
 [PATTERN = 'regex']
 [FORCE = TRUE|FALSE]
-[ON_ERROR = ABORT_STATEMENT]
+[ON_ERROR = { ABORT_STATEMENT | CONTINUE | SKIP_FILE }]
 ```
 
 **Basic import:**
@@ -400,7 +404,7 @@ FROM 'source_path'
 FILE_FORMAT = (TYPE = PARQUET [, option = value, ...])
 [PATTERN = 'regex']
 [FORCE = TRUE|FALSE]
-[ON_ERROR = ABORT_STATEMENT]
+[ON_ERROR = { ABORT_STATEMENT | CONTINUE | SKIP_FILE }]
 ```
 
 **Basic import:**
@@ -553,7 +557,7 @@ OVERWRITE = TRUE;
 |--------|-------------|---------|
 | PATTERN | Regex to filter source files by base file name. Only matching 
files are loaded. | (all files) |
 | FORCE | `FALSE`: skip files already loaded (idempotent). `TRUE`: reload all 
files. | `FALSE` |
-| ON_ERROR | Error handling strategy. Only `ABORT_STATEMENT` is supported. | 
`ABORT_STATEMENT` |
+| ON_ERROR | Error handling strategy. `ABORT_STATEMENT`: abort on any error. 
`CONTINUE`: skip bad rows and continue loading. `SKIP_FILE`: skip files that 
contain errors. | `ABORT_STATEMENT` |
 
 #### File Write Options
 
@@ -592,9 +596,11 @@ By default (`FORCE = FALSE`), COPY INTO tracks which files 
have been successfull
 | Column | Type | Description |
 |--------|------|-------------|
 | file_name | STRING | Source file name |
-| status | STRING | `LOADED` or `SKIPPED` |
+| status | STRING | `LOADED`, `PARTIALLY_LOADED`, `LOAD_FAILED`, or `SKIPPED` |
 | rows_loaded | BIGINT | Number of rows written |
 | rows_parsed | BIGINT | Number of rows parsed from the file |
+| errors_seen | BIGINT | Number of error rows (parse or cast failures) |
+| first_error | STRING | First error message encountered (NULL if no errors) |
 
 **File write** returns a single row:
 
@@ -606,9 +612,9 @@ By default (`FORCE = FALSE`), COPY INTO tracks which files 
have been successfull
 
 #### Limitations
 
+- **CSV column-count mismatch**: Rows with fewer or more columns than the 
target schema are treated as malformed records. With `ON_ERROR = CONTINUE`, 
these rows are skipped and counted as errors.
 - Only **CSV**, **JSON**, and **Parquet** formats are supported.
 - Writing files only supports `FROM table_name`; `FROM (SELECT ...)` is not 
supported.
-- `ON_ERROR = CONTINUE` is not supported; any parse or cast error aborts the 
entire command.
 - `SINGLE = TRUE` (single-file output) is not supported.
 - File format options must be specified inline in `FILE_FORMAT = (...)`.
 - File listing is **non-recursive**: only direct files under the source path 
are processed. Subdirectories are ignored.
diff --git 
a/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
 
b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
index d1515031e5..e7eb9a0d95 100644
--- 
a/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
+++ 
b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
@@ -18,4 +18,4 @@
 
 package org.apache.paimon.spark.sql
 
-class CopyIntoTest extends CopyIntoTestBase {}
+class CopyIntoTest extends CopyIntoTestBase with CopyIntoOnErrorTest {}
diff --git 
a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
 
b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
index d1515031e5..e7eb9a0d95 100644
--- 
a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
+++ 
b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
@@ -18,4 +18,4 @@
 
 package org.apache.paimon.spark.sql
 
-class CopyIntoTest extends CopyIntoTestBase {}
+class CopyIntoTest extends CopyIntoTestBase with CopyIntoOnErrorTest {}
diff --git 
a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
 
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
index d1515031e5..e7eb9a0d95 100644
--- 
a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
+++ 
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
@@ -18,4 +18,4 @@
 
 package org.apache.paimon.spark.sql
 
-class CopyIntoTest extends CopyIntoTestBase {}
+class CopyIntoTest extends CopyIntoTestBase with CopyIntoOnErrorTest {}
diff --git 
a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
 
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
index d1515031e5..e7eb9a0d95 100644
--- 
a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
+++ 
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
@@ -18,4 +18,4 @@
 
 package org.apache.paimon.spark.sql
 
-class CopyIntoTest extends CopyIntoTestBase {}
+class CopyIntoTest extends CopyIntoTestBase with CopyIntoOnErrorTest {}
diff --git 
a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
 
b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
index d1515031e5..e7eb9a0d95 100644
--- 
a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
+++ 
b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
@@ -18,4 +18,4 @@
 
 package org.apache.paimon.spark.sql
 
-class CopyIntoTest extends CopyIntoTestBase {}
+class CopyIntoTest extends CopyIntoTestBase with CopyIntoOnErrorTest {}
diff --git 
a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
 
b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
index d1515031e5..e7eb9a0d95 100644
--- 
a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
+++ 
b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
@@ -18,4 +18,4 @@
 
 package org.apache.paimon.spark.sql
 
-class CopyIntoTest extends CopyIntoTestBase {}
+class CopyIntoTest extends CopyIntoTestBase with CopyIntoOnErrorTest {}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
 
b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
index 1bbbca478b..620a2bb95a 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
+++ 
b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
@@ -145,7 +145,7 @@ forceClause
   ;
 
 onErrorClause
-  : ON_ERROR '=' ABORT_STATEMENT
+  : ON_ERROR '=' (ABORT_STATEMENT | CONTINUE | SKIP_FILE)
   ;
 
 overwriteClause
@@ -203,7 +203,7 @@ nonReserved
     | NOT | OF | OR | TABLE | REPLACE | RETAIN | VERSION | TAG
     | TRUE | FALSE
     | MAP
-    | COPY | INTO | FROM | FILE_FORMAT | PATTERN | FORCE | ON_ERROR | 
ABORT_STATEMENT | OVERWRITE
+    | COPY | INTO | FROM | FILE_FORMAT | PATTERN | FORCE | ON_ERROR | 
ABORT_STATEMENT | CONTINUE | SKIP_FILE | OVERWRITE
     | CSV
     | JSON
     | PARQUET
@@ -246,6 +246,8 @@ PATTERN: 'PATTERN';
 FORCE: 'FORCE';
 ON_ERROR: 'ON_ERROR';
 ABORT_STATEMENT: 'ABORT_STATEMENT';
+CONTINUE: 'CONTINUE';
+SKIP_FILE: 'SKIP_FILE';
 OVERWRITE: 'OVERWRITE';
 CSV: 'CSV';
 JSON: 'JSON';
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyIntoTableCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyIntoTableCommand.scala
index eedad9763e..b7cc456f3b 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyIntoTableCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyIntoTableCommand.scala
@@ -29,14 +29,17 @@ case class CopyIntoTableCommand(
     sourcePath: String,
     fileFormat: CopyFileFormat,
     pattern: Option[String],
-    force: Boolean)
+    force: Boolean,
+    onError: OnErrorMode = OnErrorMode.AbortStatement)
   extends PaimonLeafCommand {
 
   override def output: Seq[Attribute] = Seq(
     AttributeReference("file_name", StringType, nullable = false)(),
     AttributeReference("status", StringType, nullable = false)(),
     AttributeReference("rows_loaded", LongType, nullable = false)(),
-    AttributeReference("rows_parsed", LongType, nullable = false)()
+    AttributeReference("rows_parsed", LongType, nullable = false)(),
+    AttributeReference("errors_seen", LongType, nullable = false)(),
+    AttributeReference("first_error", StringType, nullable = true)()
   )
 
   override def simpleString(maxFields: Int): String = {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyOptions.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyOptions.scala
index 4730307193..3e5e35b2b7 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyOptions.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyOptions.scala
@@ -18,6 +18,14 @@
 
 package org.apache.paimon.spark.catalyst.plans.logical
 
+sealed trait OnErrorMode
+
+object OnErrorMode {
+  case object AbortStatement extends OnErrorMode
+  case object Continue extends OnErrorMode
+  case object SkipFile extends OnErrorMode
+}
+
 sealed trait FileFormatType
 
 object FileFormatType {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/copyinto/CopyIntoResultBuilder.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/copyinto/CopyIntoResultBuilder.scala
new file mode 100644
index 0000000000..a165a45233
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/copyinto/CopyIntoResultBuilder.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.copyinto
+
+import org.apache.paimon.table.FileStoreTable
+
+import org.apache.hadoop.fs.FileStatus
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.unsafe.types.UTF8String
+
+/** Builds per-file result rows for COPY INTO commands. */
+object CopyIntoResultBuilder {
+
+  /** Build a single result row with the standard 6-column schema. */
+  def buildResultRow(
+      fileName: String,
+      status: String,
+      rowsLoaded: Long,
+      rowsParsed: Long,
+      errorsSeen: Long,
+      firstError: String): InternalRow = {
+    InternalRow(
+      UTF8String.fromString(fileName),
+      UTF8String.fromString(status),
+      rowsLoaded,
+      rowsParsed,
+      errorsSeen,
+      if (firstError != null) UTF8String.fromString(firstError) else null
+    )
+  }
+
+  /**
+   * Build per-file result rows for ON_ERROR=CONTINUE mode. Merges parse 
errors and cast errors per
+   * file, determines status (LOADED / PARTIALLY_LOADED / LOAD_FAILED), and 
records load history for
+   * files that loaded at least one row or had no errors (including empty 
files).
+   */
+  def buildContinueResults(
+      paimonTable: FileStoreTable,
+      filesToLoad: Array[FileStatus],
+      skippedFiles: Array[FileStatus],
+      totalRowsPerFile: Map[String, Long],
+      parseErrors: Map[String, Long],
+      castErrors: Map[String, Long],
+      firstParseErrorPerFile: Map[String, String],
+      firstCastErrorPerFile: Map[String, String]): Seq[InternalRow] = {
+    val paimonPath = new 
org.apache.paimon.fs.Path(paimonTable.location().toString)
+    val historyManager = new CopyLoadHistoryManager(paimonTable.fileIO(), 
paimonPath)
+    val snapshotId = paimonTable.snapshotManager().latestSnapshotId()
+    val loadedAt = System.currentTimeMillis()
+
+    val loadedResults = filesToLoad.map {
+      fileStatus =>
+        val baseName = fileStatus.getPath.getName
+        val fullPath = fileStatus.getPath.toString
+        val parsedCount = totalRowsPerFile.getOrElse(baseName, 0L)
+        val fileParseErrors = parseErrors.getOrElse(baseName, 0L)
+        val fileCastErrors = castErrors.getOrElse(baseName, 0L)
+        val totalFileErrors = fileParseErrors + fileCastErrors
+        val rowsLoaded = Math.max(0, parsedCount - totalFileErrors)
+
+        if (rowsLoaded > 0 || totalFileErrors == 0) {
+          historyManager.recordLoaded(
+            CopyLoadRecord(
+              filePath = fullPath,
+              fileSize = fileStatus.getLen,
+              lastModified = fileStatus.getModificationTime,
+              loadedAt = loadedAt,
+              snapshotId = snapshotId,
+              rowsLoaded = rowsLoaded
+            ))
+        }
+
+        val status =
+          if (rowsLoaded == 0 && totalFileErrors > 0) "LOAD_FAILED"
+          else if (totalFileErrors > 0) "PARTIALLY_LOADED"
+          else "LOADED"
+        val fileFirstError =
+          
firstParseErrorPerFile.get(baseName).orElse(firstCastErrorPerFile.get(baseName))
+        buildResultRow(
+          baseName,
+          status,
+          rowsLoaded,
+          parsedCount,
+          totalFileErrors,
+          fileFirstError.orNull)
+    }.toSeq
+
+    loadedResults ++ buildSkippedResults(skippedFiles)
+  }
+
+  /**
+   * Record load history and build results using pre-computed per-file row 
counts (ABORT mode).
+   * Avoids re-reading source files just to count rows.
+   */
+  def recordHistoryAndBuildResultsDirect(
+      paimonTable: FileStoreTable,
+      filesToLoad: Array[FileStatus],
+      skippedFiles: Array[FileStatus],
+      rowCountsPerFile: Map[String, Long]): Seq[InternalRow] = {
+    val paimonPath = new 
org.apache.paimon.fs.Path(paimonTable.location().toString)
+    val historyManager = new CopyLoadHistoryManager(paimonTable.fileIO(), 
paimonPath)
+    val snapshotId = paimonTable.snapshotManager().latestSnapshotId()
+    val loadedAt = System.currentTimeMillis()
+
+    val loadedResults = filesToLoad.map {
+      fileStatus =>
+        val baseName = fileStatus.getPath.getName
+        val rowCount = rowCountsPerFile.getOrElse(baseName, 0L)
+
+        historyManager.recordLoaded(
+          CopyLoadRecord(
+            filePath = fileStatus.getPath.toString,
+            fileSize = fileStatus.getLen,
+            lastModified = fileStatus.getModificationTime,
+            loadedAt = loadedAt,
+            snapshotId = snapshotId,
+            rowsLoaded = rowCount
+          ))
+
+        buildResultRow(baseName, "LOADED", rowCount, rowCount, 0L, null)
+    }.toSeq
+
+    loadedResults ++ buildSkippedResults(skippedFiles)
+  }
+
+  def buildSkippedResults(files: Array[FileStatus]): Seq[InternalRow] = {
+    files.map(f => buildResultRow(f.getPath.getName, "SKIPPED", 0L, 0L, 0L, 
null)).toSeq
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoCastValidator.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoCastValidator.scala
new file mode 100644
index 0000000000..971b9803d4
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoCastValidator.scala
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.execution
+
+import org.apache.paimon.spark.catalyst.Compatibility
+import org.apache.paimon.types.DataField
+
+import org.apache.spark.sql.{Column, DataFrame}
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.paimon.shims.SparkShimLoader
+import org.apache.spark.sql.types.{DataType, StringType}
+
+/**
+ * Handles cast validation for COPY INTO operations. Validates that source 
data can be safely cast
+ * to target table types without data loss.
+ */
+private[execution] class CopyIntoCastValidator(spark: 
org.apache.spark.sql.SparkSession) {
+
+  /**
+   * Build cast validation columns for Parquet import. For each writable 
column that exists in both
+   * targetColumns and source, adds a casted temp column for comparison. 
Returns the augmented
+   * DataFrame, the (source, cast) column pairs, and the bad-cast filter 
expression.
+   */
+  def buildParquetCastValidation(
+      rawDf: DataFrame,
+      targetColumns: Seq[String],
+      writableColumns: Seq[String],
+      fields: Seq[DataField],
+      excludeCols: Set[String] = Set.empty): CastValidationSetup = {
+    val resolver = spark.sessionState.conf.resolver
+    val sourceColumns = rawDf.columns.toSeq.filterNot(excludeCols.contains)
+
+    val castColMapping = scala.collection.mutable.LinkedHashMap[String, 
String]()
+    var validationDf = rawDf
+    val existingCols = rawDf.columns.toSet ++ writableColumns.toSet
+    var usedCols = existingCols
+
+    writableColumns.zip(fields).foreach {
+      case (colName, field) =>
+        if (targetColumns.exists(tc => resolver(tc, colName))) {
+          sourceColumns.find(s => resolver(s, colName)).foreach {
+            srcColName =>
+              val sparkType =
+                
org.apache.paimon.spark.SparkTypeUtils.fromPaimonType(field.`type`())
+              val castColName = safeTempCol("__pq_cv_" + colName, usedCols)
+              usedCols += castColName
+              validationDf =
+                validationDf.withColumn(castColName, 
nonAnsiCast(col(srcColName), sparkType))
+              castColMapping(srcColName) = castColName
+          }
+        }
+    }
+
+    val badCastFilter = if (castColMapping.nonEmpty) {
+      Some(
+        castColMapping
+          .map { case (src, dst) => col(src).isNotNull && col(dst).isNull }
+          .reduce(_ || _))
+    } else None
+
+    CastValidationSetup(validationDf, castColMapping.toMap, badCastFilter)
+  }
+
+  /**
+   * Build cast validation columns for text-based import. For each non-string 
writable column, adds
+   * a casted temp column for comparison. Returns the augmented DataFrame, the 
column names
+   * requiring validation, a mapping from original to temp column names, and 
the bad-cast filter.
+   */
+  def buildTextCastValidation(
+      df: DataFrame,
+      writableColumns: Seq[String],
+      fields: Seq[DataField]): CastValidationSetup = {
+    val existingCols = df.columns.toSet ++ writableColumns.toSet
+    val castColMapping = scala.collection.mutable.LinkedHashMap[String, 
String]()
+    var usedCols = existingCols
+    var validationDf = df
+
+    writableColumns.zip(fields).foreach {
+      case (colName, field) =>
+        val sparkType = 
org.apache.paimon.spark.SparkTypeUtils.fromPaimonType(field.`type`())
+        if (sparkType != StringType) {
+          val tempName = safeTempCol("__cv_" + colName, usedCols)
+          usedCols += tempName
+          castColMapping(colName) = tempName
+          validationDf = validationDf.withColumn(tempName, 
nonAnsiCast(col(colName), sparkType))
+        }
+    }
+
+    if (castColMapping.isEmpty) {
+      return CastValidationSetup(df, Map.empty, None)
+    }
+
+    val badCastFilter = castColMapping
+      .map { case (src, dst) => col(src).isNotNull && col(dst).isNull }
+      .reduce(_ || _)
+
+    CastValidationSetup(validationDf, castColMapping.toMap, 
Some(badCastFilter))
+  }
+
+  /**
+   * Validate Parquet cast and abort on first failure. Detection strategy: if 
a source value is
+   * non-null but becomes null after casting, the cast failed (e.g., a string 
"abc" cast to
+   * IntegerType → null). Aborts immediately on first failure.
+   */
+  def validateParquetCast(
+      rawDf: DataFrame,
+      targetColumns: Seq[String],
+      writableColumns: Seq[String],
+      fields: Seq[DataField]): Unit = {
+    val setup = buildParquetCastValidation(rawDf, targetColumns, 
writableColumns, fields)
+    abortOnCastFailure(setup)
+  }
+
+  /** Abort on first cast failure found in the validation setup. */
+  def abortOnCastFailure(setup: CastValidationSetup): Unit = {
+    setup.badCastFilter.foreach {
+      filter =>
+        val badRows = setup.validationDf.filter(filter).limit(1).collect()
+        if (badRows.nonEmpty) {
+          val example = setup.castColMapping.find {
+            case (src, dst) =>
+              val row = badRows(0)
+              val srcIdx = setup.validationDf.schema.fieldIndex(src)
+              val dstIdx = setup.validationDf.schema.fieldIndex(dst)
+              !row.isNullAt(srcIdx) && row.isNullAt(dstIdx)
+          }
+          throw new IllegalArgumentException(
+            s"ON_ERROR = ABORT_STATEMENT: Cast failure in column 
'${example.map(_._1).getOrElse("unknown")}'. Source data contains values that 
cannot be converted to the target type.")
+        }
+    }
+  }
+
+  /**
+   * Cast all writable columns to their target Paimon types and validate. Used 
for text-based ABORT
+   * mode.
+   */
+  def castAndValidate(
+      finalDf: DataFrame,
+      writableColumns: Seq[String],
+      fields: Seq[DataField]): DataFrame = {
+    val castedDf = castColumns(finalDf, writableColumns, fields)
+
+    val setup = buildTextCastValidation(finalDf, writableColumns, fields)
+    abortOnCastFailure(setup)
+
+    castedDf
+  }
+
+  /** Cast all writable columns to their target Paimon types. */
+  def castColumns(
+      df: DataFrame,
+      writableColumns: Seq[String],
+      fields: Seq[DataField]): DataFrame = {
+    writableColumns.zip(fields).foldLeft(df) {
+      case (d, (colName, field)) =>
+        val sparkType = 
org.apache.paimon.spark.SparkTypeUtils.fromPaimonType(field.`type`())
+        d.withColumn(colName, col(colName).cast(sparkType))
+    }
+  }
+
+  private def safeTempCol(baseName: String, existingColumns: Set[String]): 
String =
+    CopyIntoHelper.safeTempCol(spark, baseName, existingColumns)
+
+  /**
+   * Cast a column with ANSI disabled so a failed cast yields NULL instead of 
throwing. This is the
+   * basis for bad-row detection: a source value that is non-null but becomes 
null after casting
+   * could not be converted to the target type. Under Spark's default ANSI 
mode a plain `.cast`
+   * would raise `CAST_INVALID_INPUT` before any filtering could run.
+   */
+  private def nonAnsiCast(column: Column, dataType: DataType): Column = {
+    val expr = SparkShimLoader.shim.classicApi.expression(spark, column)
+    SparkShimLoader.shim.classicApi.column(Compatibility.cast(expr, dataType, 
ansiEnabled = false))
+  }
+}
+
+/** Unified cast validation setup for both Parquet and text (CSV/JSON) paths. 
*/
+private[execution] case class CastValidationSetup(
+    validationDf: DataFrame,
+    castColMapping: Map[String, String],
+    badCastFilter: Option[org.apache.spark.sql.Column])
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoDataFrameBuilder.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoDataFrameBuilder.scala
new file mode 100644
index 0000000000..e44ba8c63a
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoDataFrameBuilder.scala
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.execution
+
+import org.apache.paimon.spark.catalyst.plans.logical.{CopyFileFormat, 
FileFormatType}
+import org.apache.paimon.types.DataField
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Column, DataFrame, SparkSession}
+import org.apache.spark.sql.functions.{col, lit, when}
+import org.apache.spark.sql.paimon.shims.SparkShimLoader
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+
+/**
+ * Handles DataFrame construction and transformation for COPY INTO operations. 
Responsible for
+ * building DataFrames from source files, applying transformations, and 
preparing data for writing
+ * to Paimon tables.
+ */
+private[execution] class CopyIntoDataFrameBuilder(
+    spark: SparkSession,
+    fileFormat: CopyFileFormat,
+    columns: Option[Seq[String]])
+  extends Logging {
+
+  /**
+   * Build the projection DataFrame for Parquet import. Maps source columns to 
target table columns
+   * by name (case-insensitive). For each writable column:
+   *   - If it's in targetColumns AND exists in source: cast source column to 
target type
+   *   - If it's in targetColumns but missing from source: fill with NULL
+   *   - If it's NOT in targetColumns (unmapped): fill with default value or 
NULL
+   */
+  def buildParquetDataFrame(
+      rawDf: DataFrame,
+      targetColumns: Seq[String],
+      writableColumns: Seq[String],
+      fields: Seq[DataField]): DataFrame = {
+    val resolver = spark.sessionState.conf.resolver
+    val sourceColumns = rawDf.columns.toSeq
+
+    val selectExprs: Seq[Column] = writableColumns.map {
+      colName =>
+        if (targetColumns.exists(tc => resolver(tc, colName))) {
+          val srcCol = sourceColumns.find(s => resolver(s, colName))
+          srcCol match {
+            case Some(s) =>
+              val field = fields.find(_.name() == colName).get
+              val sparkType =
+                
org.apache.paimon.spark.SparkTypeUtils.fromPaimonType(field.`type`())
+              col(s).cast(sparkType).as(colName)
+            case None =>
+              val field = fields.find(_.name() == colName).get
+              val sparkType =
+                
org.apache.paimon.spark.SparkTypeUtils.fromPaimonType(field.`type`())
+              lit(null).cast(sparkType).as(colName)
+          }
+        } else {
+          resolveDefaultColumn(fields.find(_.name() == colName).get, colName)
+        }
+    }
+    rawDf.select(selectExprs: _*)
+  }
+
+  /**
+   * Build the final DataFrame for text-based import. Handles column renaming 
(CSV positional →
+   * named), default value filling for unmapped columns, and preserves extra 
columns like file name.
+   */
+  def buildFinalDataFrame(
+      sourceDf: DataFrame,
+      targetColumns: Seq[String],
+      writableColumns: Seq[String],
+      fields: Seq[DataField],
+      extraCols: Seq[String] = Seq.empty): DataFrame = {
+    val renamedDf = fileFormat.formatType match {
+      case FileFormatType.JSON =>
+        sourceDf
+      case _ =>
+        targetColumns.zipWithIndex.foldLeft(sourceDf) {
+          case (df, (targetCol, idx)) => df.withColumnRenamed(s"_c$idx", 
targetCol)
+        }
+    }
+
+    if (columns.isDefined) {
+      val selectExprs: Seq[Column] = writableColumns.map {
+        colName =>
+          if (targetColumns.contains(colName)) {
+            col(colName)
+          } else {
+            resolveDefaultColumn(fields.find(_.name() == colName).get, colName)
+          }
+      }
+      renamedDf.select((selectExprs ++ extraCols.map(col)): _*)
+    } else {
+      renamedDf
+    }
+  }
+
+  /** Build string schema for text-based formats (CSV/JSON). */
+  def buildStringSchema(targetColumns: Seq[String]): StructType = {
+    fileFormat.formatType match {
+      case FileFormatType.JSON =>
+        StructType(targetColumns.map(name => StructField(name, StringType, 
nullable = true)))
+      case _ =>
+        StructType(
+          (0 until targetColumns.size).map(i => StructField(s"_c$i", 
StringType, nullable = true)))
+    }
+  }
+
+  /** Read source data for text-based formats with NULL transforms applied. */
+  def readSourceData(
+      filePaths: Array[String],
+      stringSchema: StructType,
+      readerOptions: Map[String, String]): DataFrame = {
+    val df = fileFormat.formatType match {
+      case FileFormatType.JSON =>
+        spark.read.options(readerOptions).schema(stringSchema).json(filePaths: 
_*)
+      case _ =>
+        spark.read.options(readerOptions).schema(stringSchema).csv(filePaths: 
_*)
+    }
+
+    applyNullTransforms(df, df.columns)
+  }
+
+  /** Apply NULL_IF and EMPTY_FIELD_AS_NULL transforms to the specified 
columns. */
+  def applyNullTransforms(df: DataFrame, columns: Seq[String]): DataFrame = {
+    var result = df
+
+    val nullIfVals = fileFormat.nullIfValues
+    if (nullIfVals.nonEmpty) {
+      columns.foreach {
+        colName =>
+          result = result.withColumn(
+            colName,
+            when(col(colName).isin(nullIfVals: _*), lit(null).cast(StringType))
+              .otherwise(col(colName)))
+      }
+    }
+
+    if (fileFormat.emptyFieldAsNull) {
+      columns.foreach {
+        colName =>
+          result = result.withColumn(
+            colName,
+            when(col(colName) === lit(""), lit(null).cast(StringType))
+              .otherwise(col(colName)))
+      }
+    }
+
+    result
+  }
+
+  /** Resolve the default value expression for a column not populated from 
source data. */
+  private def resolveDefaultColumn(field: DataField, colName: String): Column 
= {
+    val defaultVal = field.defaultValue()
+    if (defaultVal != null) {
+      val sparkType =
+        org.apache.paimon.spark.SparkTypeUtils.fromPaimonType(field.`type`())
+      try {
+        val parsed = spark.sessionState.sqlParser.parseExpression(defaultVal)
+        
SparkShimLoader.shim.classicApi.column(parsed).cast(sparkType).as(colName)
+      } catch {
+        case e: Exception =>
+          logWarning(
+            s"Failed to parse default value '$defaultVal' for column 
'$colName'; using NULL instead.",
+            e)
+          lit(null).cast(sparkType).as(colName)
+      }
+    } else {
+      lit(null).as(colName)
+    }
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoErrorHandler.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoErrorHandler.scala
new file mode 100644
index 0000000000..466134900f
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoErrorHandler.scala
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.execution
+
+import org.apache.paimon.spark.catalyst.plans.logical.FileFormatType
+import org.apache.paimon.spark.copyinto.{CopyIntoResultBuilder, 
CopyLoadHistoryManager, CopyLoadRecord}
+import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.types.DataField
+
+import org.apache.hadoop.fs.FileStatus
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+
+/**
+ * Handles error detection and result building for COPY INTO operations. 
Supports both row-level
+ * (CONTINUE) and file-level (SKIP_FILE) error handling.
+ */
+private[execution] class CopyIntoErrorHandler(
+    spark: org.apache.spark.sql.SparkSession,
+    castValidator: CopyIntoCastValidator,
+    dataFrameBuilder: CopyIntoDataFrameBuilder) {
+
+  /** Detect cast errors in Parquet files. Returns error statistics and good 
rows DataFrame. */
+  def detectParquetErrors(
+      targetColumns: Seq[String],
+      writableColumns: Seq[String],
+      fields: Seq[DataField])(rawDfWithFile: DataFrame, fileCol: String): 
ErrorDetectionResult = {
+
+    val totalRowsPerFile = CopyIntoUtils.countPerFile(rawDfWithFile, fileCol)
+
+    val castResult =
+      filterParquetCastErrors(rawDfWithFile, targetColumns, writableColumns, 
fields, fileCol)
+
+    ErrorDetectionResult(
+      totalRowsPerFile = totalRowsPerFile,
+      parseErrors = Map.empty, // Parquet has no parse errors
+      castErrors = castResult.errorsPerFile,
+      firstParseError = Map.empty,
+      firstCastError = castResult.firstErrorPerFile,
+      goodRowsDf = castResult.df
+    )
+  }
+
+  /**
+   * Detect parse and cast errors in text files (CSV/JSON). Returns error 
statistics and good rows
+   * DataFrame (already processed and cast).
+   */
+  def detectTextErrors(
+      targetColumns: Seq[String],
+      writableColumns: Seq[String],
+      fields: Seq[DataField],
+      stringSchema: StructType,
+      corruptCol: String)(rawDfWithFile: DataFrame, fileCol: String): 
ErrorDetectionResult = {
+
+    val totalRowsPerFile = CopyIntoUtils.countPerFile(rawDfWithFile, fileCol)
+
+    // Separate corrupt rows (parse errors) from valid rows
+    val corruptDf = rawDfWithFile.filter(col(corruptCol).isNotNull)
+    val validDf = rawDfWithFile.filter(col(corruptCol).isNull).drop(corruptCol)
+
+    val parseErrors = CopyIntoUtils.countPerFile(corruptDf, fileCol)
+    val firstParseError = if (parseErrors.nonEmpty) {
+      val samplesPerFile = corruptDf
+        .select(col(fileCol), col(corruptCol))
+        .dropDuplicates(fileCol)
+        .collect()
+      samplesPerFile.map {
+        row =>
+          CopyIntoUtils.extractBaseName(
+            row.getString(0)) -> s"Malformed record: ${row.getString(1)}"
+      }.toMap
+    } else Map.empty[String, String]
+
+    // Process valid rows: apply null transforms, build final DataFrame, 
detect cast errors
+    val processedDf = dataFrameBuilder.applyNullTransforms(validDf, 
stringSchema.fieldNames)
+    val finalDfWithFile = dataFrameBuilder.buildFinalDataFrame(
+      processedDf,
+      targetColumns,
+      writableColumns,
+      fields,
+      extraCols = Seq(fileCol))
+
+    val castResult = castAndFilterErrors(finalDfWithFile, writableColumns, 
fields, fileCol)
+
+    ErrorDetectionResult(
+      totalRowsPerFile = totalRowsPerFile,
+      parseErrors = parseErrors,
+      castErrors = castResult.errorsPerFile,
+      firstParseError = firstParseError,
+      firstCastError = castResult.firstErrorPerFile,
+      goodRowsDf = castResult.df
+    )
+  }
+
+  /**
+   * Build results for error-tolerant modes (CONTINUE/SKIP_FILE). Handles both 
row-level and
+   * file-level error reporting.
+   */
+  def buildErrorTolerantResults(
+      paimonTable: FileStoreTable,
+      filesToLoad: Array[FileStatus],
+      skippedFiles: Array[FileStatus],
+      errorResult: ErrorDetectionResult,
+      filesWithErrors: Set[String],
+      errorGranularity: ErrorGranularity): Seq[InternalRow] = {
+
+    errorGranularity match {
+      case ErrorGranularity.RowLevel =>
+        // CONTINUE mode: use existing buildContinueResults
+        CopyIntoResultBuilder.buildContinueResults(
+          paimonTable,
+          filesToLoad,
+          skippedFiles,
+          errorResult.totalRowsPerFile,
+          errorResult.parseErrors,
+          errorResult.castErrors,
+          errorResult.firstParseError,
+          errorResult.firstCastError
+        )
+
+      case ErrorGranularity.FileLevel =>
+        // SKIP_FILE mode: files are either fully loaded or fully failed
+        val paimonPath = new 
org.apache.paimon.fs.Path(paimonTable.location().toString)
+        val historyManager = new CopyLoadHistoryManager(paimonTable.fileIO(), 
paimonPath)
+        val snapshotId = paimonTable.snapshotManager().latestSnapshotId()
+        val loadedAt = System.currentTimeMillis()
+
+        val results = filesToLoad.map {
+          fileStatus =>
+            val baseName = fileStatus.getPath.getName
+            val fullPath = fileStatus.getPath.toString
+
+            if (filesWithErrors.contains(baseName)) {
+              // File had errors, mark as LOAD_FAILED
+              val parseErrorCount = 
errorResult.parseErrors.getOrElse(baseName, 0L)
+              val castErrorCount = errorResult.castErrors.getOrElse(baseName, 
0L)
+              val totalErrors = parseErrorCount + castErrorCount
+              val firstError = errorResult.firstParseError
+                .get(baseName)
+                .orElse(errorResult.firstCastError.get(baseName))
+
+              CopyIntoResultBuilder.buildResultRow(
+                baseName,
+                "LOAD_FAILED",
+                0L,
+                errorResult.totalRowsPerFile.getOrElse(baseName, 0L),
+                totalErrors,
+                firstError.orNull)
+            } else {
+              // File had no errors, mark as LOADED
+              val rowCount = errorResult.totalRowsPerFile.getOrElse(baseName, 
0L)
+              historyManager.recordLoaded(
+                CopyLoadRecord(
+                  filePath = fullPath,
+                  fileSize = fileStatus.getLen,
+                  lastModified = fileStatus.getModificationTime,
+                  loadedAt = loadedAt,
+                  snapshotId = snapshotId,
+                  rowsLoaded = rowCount
+                ))
+
+              CopyIntoResultBuilder.buildResultRow(baseName, "LOADED", 
rowCount, rowCount, 0L, null)
+            }
+        }.toSeq
+
+        results ++ CopyIntoResultBuilder.buildSkippedResults(skippedFiles)
+    }
+  }
+
+  /**
+   * Filter out rows with cast errors and collect per-file error stats. Shared 
by both Parquet and
+   * text CONTINUE paths.
+   */
+  private def filterCastErrors(setup: CastValidationSetup, fileCol: String): 
CastFilterResult = {
+    setup.badCastFilter match {
+      case Some(filter) =>
+        val badRowsDf = setup.validationDf.filter(filter)
+
+        // Count errors and sample first error per file
+        val sampleRows = badRowsDf.dropDuplicates(fileCol).collect()
+        val errorsPerFile = CopyIntoUtils.countPerFile(badRowsDf, fileCol)
+
+        val firstErrorPerFile = if (sampleRows.nonEmpty) {
+          sampleRows.map {
+            sampleRow =>
+              val fileName =
+                
CopyIntoUtils.extractBaseName(sampleRow.getString(sampleRow.fieldIndex(fileCol)))
+              val example = setup.castColMapping.find {
+                case (src, dst) =>
+                  val srcIdx = setup.validationDf.schema.fieldIndex(src)
+                  val dstIdx = setup.validationDf.schema.fieldIndex(dst)
+                  !sampleRow.isNullAt(srcIdx) && sampleRow.isNullAt(dstIdx)
+              }
+              fileName -> s"Cast failure in column 
'${example.map(_._1).getOrElse("unknown")}'. Source data contains values that 
cannot be converted to the target type."
+          }.toMap
+        } else Map.empty[String, String]
+
+        // Keep good rows and drop validation temp columns
+        var goodDf = setup.validationDf.filter(!filter)
+        setup.castColMapping.values.foreach(c => goodDf = goodDf.drop(c))
+
+        CastFilterResult(goodDf, errorsPerFile, firstErrorPerFile)
+      case None =>
+        CastFilterResult(setup.validationDf, Map.empty, Map.empty)
+    }
+  }
+
+  private def filterParquetCastErrors(
+      rawDfWithFile: DataFrame,
+      targetColumns: Seq[String],
+      writableColumns: Seq[String],
+      fields: Seq[DataField],
+      fileCol: String): CastFilterResult = {
+    val setup =
+      castValidator.buildParquetCastValidation(
+        rawDfWithFile,
+        targetColumns,
+        writableColumns,
+        fields,
+        excludeCols = Set(fileCol))
+    filterCastErrors(setup, fileCol)
+  }
+
+  private def castAndFilterErrors(
+      dfWithFile: DataFrame,
+      writableColumns: Seq[String],
+      fields: Seq[DataField],
+      fileCol: String): CastFilterResult = {
+    val setup = castValidator.buildTextCastValidation(dfWithFile, 
writableColumns, fields)
+    val result = filterCastErrors(setup, fileCol)
+    // Apply final cast to target types for good rows
+    CastFilterResult(
+      castValidator.castColumns(result.df, writableColumns, fields),
+      result.errorsPerFile,
+      result.firstErrorPerFile)
+  }
+}
+
+private[execution] case class CastFilterResult(
+    df: DataFrame,
+    errorsPerFile: Map[String, Long],
+    firstErrorPerFile: Map[String, String])
+
+/** Error granularity for error-tolerant modes (CONTINUE/SKIP_FILE). */
+sealed private[execution] trait ErrorGranularity
+private[execution] object ErrorGranularity {
+  case object RowLevel extends ErrorGranularity
+  case object FileLevel extends ErrorGranularity
+}
+
+/** Unified error detection result for both Parquet and text formats. */
+private[execution] case class ErrorDetectionResult(
+    totalRowsPerFile: Map[String, Long],
+    parseErrors: Map[String, Long],
+    castErrors: Map[String, Long],
+    firstParseError: Map[String, String],
+    firstCastError: Map[String, String],
+    goodRowsDf: DataFrame)
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoHelper.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoHelper.scala
new file mode 100644
index 0000000000..c00ac34f7b
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoHelper.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.execution
+
+import org.apache.paimon.spark.copyinto.CopyLoadHistoryManager
+import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.types.DataField
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.spark.sql.SparkSession
+
+/**
+ * Utility methods for COPY INTO operations. Provides helper functions for 
column resolution, file
+ * filtering, and validation.
+ */
+private[execution] object CopyIntoHelper {
+
+  /**
+   * Resolve target columns from user-specified column list or use all 
writable columns. Validates
+   * that specified columns exist in the table and are not duplicated.
+   */
+  def resolveTargetColumns(
+      spark: SparkSession,
+      columns: Option[Seq[String]],
+      writableColumns: Seq[String]): Seq[String] = {
+    columns match {
+      case Some(cols) =>
+        val resolver = spark.sessionState.conf.resolver
+        cols.indices.foreach {
+          i =>
+            cols.indices.filter(_ > i).foreach {
+              j =>
+                if (resolver(cols(i), cols(j))) {
+                  throw new IllegalArgumentException(
+                    s"Duplicate columns in column list: ${cols(i)}")
+                }
+            }
+        }
+        cols.map {
+          c =>
+            writableColumns.find(w => resolver(w, c)).getOrElse {
+              throw new IllegalArgumentException(
+                s"Column '$c' does not exist in target table. Available 
columns: ${writableColumns.mkString(", ")}")
+            }
+        }
+      case None => writableColumns
+    }
+  }
+
+  /**
+   * Validate that non-nullable columns without default values are included in 
the target column
+   * list. Throws exception if validation fails.
+   */
+  def validateNonNullableDefaults(
+      columns: Option[Seq[String]],
+      writableColumns: Seq[String],
+      targetColumns: Seq[String],
+      fields: Seq[DataField]): Unit = {
+    if (columns.isEmpty) return
+    val unmapped = writableColumns.filterNot(targetColumns.contains)
+    unmapped.foreach {
+      colName =>
+        val field = fields.find(_.name() == colName).get
+        if (!field.`type`().isNullable && field.defaultValue() == null) {
+          throw new IllegalArgumentException(
+            s"Non-nullable column '$colName' is not in the column list and has 
no default value")
+        }
+    }
+  }
+
+  /**
+   * List files from source path and filter based on pattern and load history. 
Returns (files to
+   * load, files to skip).
+   */
+  def listAndFilterFiles(
+      spark: SparkSession,
+      paimonTable: FileStoreTable,
+      sourcePath: String,
+      pattern: Option[String],
+      force: Boolean): (Array[FileStatus], Array[FileStatus]) = {
+    val hadoopConf = spark.sessionState.newHadoopConf()
+    val fsPath = new Path(sourcePath)
+    val fs = fsPath.getFileSystem(hadoopConf)
+    val allFiles = fs.listStatus(fsPath).filter(_.isFile)
+
+    val patternFiltered = pattern match {
+      case Some(p) =>
+        val regex = p.r
+        allFiles.filter(f => regex.findFirstIn(f.getPath.getName).isDefined)
+      case None => allFiles
+    }
+
+    if (patternFiltered.isEmpty) {
+      return (Array.empty, Array.empty)
+    }
+
+    val paimonPath = new 
org.apache.paimon.fs.Path(paimonTable.location().toString)
+    val historyManager = new CopyLoadHistoryManager(paimonTable.fileIO(), 
paimonPath)
+
+    if (!force) {
+      val (skip, load) = patternFiltered.partition {
+        f => historyManager.isLoaded(f.getPath.toString, f.getLen, 
f.getModificationTime)
+      }
+      (load, skip)
+    } else {
+      (patternFiltered, Array.empty[FileStatus])
+    }
+  }
+
+  /**
+   * Generate a safe temporary column name that doesn't conflict with existing 
columns.
+   * Case-insensitive conflict detection.
+   */
+  def safeTempCol(spark: SparkSession, baseName: String, existingColumns: 
Set[String]): String = {
+    val resolver = spark.sessionState.conf.resolver
+    var candidate = baseName
+    while (existingColumns.exists(c => resolver(c, candidate))) {
+      candidate = "_" + candidate
+    }
+    candidate
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoTableExec.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoTableExec.scala
index 5ef05b4188..4701262f13 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoTableExec.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoTableExec.scala
@@ -19,25 +19,22 @@
 package org.apache.paimon.spark.execution
 
 import org.apache.paimon.spark.SparkTable
-import org.apache.paimon.spark.catalyst.plans.logical.{CopyFileFormat, 
FileFormatType}
-import org.apache.paimon.spark.copyinto.{CopyLoadHistoryManager, 
CopyLoadRecord}
+import org.apache.paimon.spark.catalyst.plans.logical.{CopyFileFormat, 
FileFormatType, OnErrorMode}
+import org.apache.paimon.spark.copyinto.CopyIntoResultBuilder
 import org.apache.paimon.spark.leafnode.PaimonLeafV2CommandExec
 import org.apache.paimon.table.FileStoreTable
 import org.apache.paimon.types.DataField
 
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.FileStatus
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{Column, DataFrame, SparkSession}
+import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
-import org.apache.spark.sql.functions.{col, input_file_name, lit, when}
-import org.apache.spark.sql.paimon.shims.SparkShimLoader
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
-import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.sql.functions.{col, input_file_name, substring_index}
+import org.apache.spark.sql.types.{StringType, StructField}
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
 
 case class CopyIntoTableExec(
     spark: SparkSession,
@@ -48,10 +45,16 @@ case class CopyIntoTableExec(
     fileFormat: CopyFileFormat,
     pattern: Option[String],
     force: Boolean,
+    onError: OnErrorMode,
     out: Seq[Attribute])
   extends PaimonLeafV2CommandExec
   with Logging {
 
+  // Initialize helper classes
+  private val castValidator = new CopyIntoCastValidator(spark)
+  private val dataFrameBuilder = new CopyIntoDataFrameBuilder(spark, 
fileFormat, columns)
+  private val errorHandler = new CopyIntoErrorHandler(spark, castValidator, 
dataFrameBuilder)
+
   override def output: Seq[Attribute] = out
 
   override protected def run(): Seq[InternalRow] = {
@@ -63,14 +66,15 @@ case class CopyIntoTableExec(
     val tableSchema = paimonTable.schema()
     val writableColumns = tableSchema.fieldNames().asScala.toSeq
     val fields = tableSchema.fields().asScala.toSeq
-    val targetColumns = resolveTargetColumns(writableColumns)
+    val targetColumns = CopyIntoHelper.resolveTargetColumns(spark, columns, 
writableColumns)
 
-    validateNonNullableDefaults(writableColumns, targetColumns, fields)
+    CopyIntoHelper.validateNonNullableDefaults(columns, writableColumns, 
targetColumns, fields)
 
-    val (filesToLoad, skippedFiles) = listAndFilterFiles(paimonTable)
+    val (filesToLoad, skippedFiles) =
+      CopyIntoHelper.listAndFilterFiles(spark, paimonTable, sourcePath, 
pattern, force)
 
     if (filesToLoad.isEmpty) {
-      return buildSkippedResults(skippedFiles)
+      return CopyIntoResultBuilder.buildSkippedResults(skippedFiles)
     }
 
     val filePaths = filesToLoad.map(_.getPath.toString)
@@ -101,119 +105,173 @@ case class CopyIntoTableExec(
   }
 
   /**
-   * Parquet import pipeline. Unlike CSV/JSON which read as strings then cast, 
Parquet files already
-   * have typed columns, so the flow is:
-   *   1. Read source Parquet with native types
-   *   2. Project and cast columns to match target table schema (by column 
name, not position)
-   *   3. Validate that no non-null values become null after casting (detect 
type incompatibility)
-   *   4. Write to Paimon table
-   *   5. Record load history for idempotent re-runs (FORCE=FALSE dedup)
+   * Unified error-tolerant import for both CONTINUE and SKIP_FILE modes. Both 
start from
+   * `goodRowsDf`, the row-clean DataFrame produced by error detection (bad 
rows already removed
+   * and, for text formats, already cast to target types).
+   *   - CONTINUE: writes every good row, so a file with errors is partially 
loaded.
+   *   - SKIP_FILE: additionally drops every row belonging to a file that had 
any error, so such a
+   *     file is loaded all-or-nothing.
+   *
+   * Both modes use a single batch write (one commit) regardless of file count.
    */
-  private def runParquetImport(
+  private def runErrorTolerantMode(
       paimonTable: FileStoreTable,
-      filePaths: Array[String],
+      rawDf: DataFrame,
       targetColumns: Seq[String],
       writableColumns: Seq[String],
       fields: Seq[DataField],
       filesToLoad: Array[FileStatus],
       skippedFiles: Array[FileStatus],
-      readerOptions: Map[String, String]): Seq[InternalRow] = {
-    val rawDf = spark.read.options(readerOptions).parquet(filePaths: _*)
+      errorGranularity: ErrorGranularity,
+      detectErrors: (DataFrame, String) => ErrorDetectionResult): 
Seq[InternalRow] = {
+
+    val allTargetCols = writableColumns.toSet ++ targetColumns.toSet
+    val inputFileCol = CopyIntoHelper.safeTempCol(spark, "__input_file", 
allTargetCols)
+    val rawDfWithFile = rawDf.withColumn(inputFileCol, 
input_file_name()).cache()
+
+    try {
+      val errorResult = detectErrors(rawDfWithFile, inputFileCol)
+
+      // `goodRowsDf` carries `inputFileCol` (full path); error keys are base 
names, so compare on
+      // the base name extracted from the path.
+      val filesWithErrors = errorResult.parseErrors.keySet ++ 
errorResult.castErrors.keySet
+      val dfToWrite = errorGranularity match {
+        case ErrorGranularity.RowLevel =>
+          // CONTINUE: keep all good rows, including good rows from files that 
also had errors.
+          errorResult.goodRowsDf
+
+        case ErrorGranularity.FileLevel =>
+          // SKIP_FILE: drop every row whose file had any error.
+          if (filesWithErrors.nonEmpty) {
+            val baseName = substring_index(col(inputFileCol), "/", -1)
+            
errorResult.goodRowsDf.filter(!baseName.isin(filesWithErrors.toSeq: _*))
+          } else {
+            errorResult.goodRowsDf
+          }
+      }
+
+      val tableName = CopyIntoUtils.quoteIdentifier(catalog.name(), ident)
+      val finalDf = fileFormat.formatType match {
+        case FileFormatType.PARQUET =>
+          dataFrameBuilder.buildParquetDataFrame(dfToWrite, targetColumns, 
writableColumns, fields)
+        case _ =>
+          // For text formats, goodRowsDf is already processed and cast.
+          dfToWrite
+      }
 
-    val selectedDf = buildParquetDataFrame(rawDf, targetColumns, 
writableColumns, fields)
-    validateParquetCast(rawDf, targetColumns, writableColumns, fields)
+      
finalDf.drop(inputFileCol).write.format("paimon").mode("append").insertInto(tableName)
 
-    val tableName = CopyIntoUtils.quoteIdentifier(catalog.name(), ident)
-    selectedDf.write.format("paimon").mode("append").insertInto(tableName)
+      errorHandler.buildErrorTolerantResults(
+        paimonTable,
+        filesToLoad,
+        skippedFiles,
+        errorResult,
+        filesWithErrors,
+        errorGranularity)
 
-    val countDf = spark.read.options(readerOptions).parquet(filePaths: _*)
-    recordHistoryAndBuildResults(paimonTable, filesToLoad, skippedFiles, 
countDf)
+    } finally {
+      rawDfWithFile.unpersist()
+    }
   }
 
   /**
-   * Build the projection DataFrame for Parquet import. Maps source columns to 
target table columns
-   * by name (case-insensitive). For each writable column:
-   *   - If it's in targetColumns AND exists in source: cast source column to 
target type
-   *   - If it's in targetColumns but missing from source: fill with NULL
-   *   - If it's NOT in targetColumns (unmapped): fill with default value or 
NULL
+   * Import files in ABORT mode: read, validate, write, and return per-file 
row counts. Validation
+   * aborts the whole statement on the first parse or cast error, so either 
all files are written or
+   * none are.
    */
-  private def buildParquetDataFrame(
-      rawDf: DataFrame,
+  private def importAbort(
+      filePaths: Array[String],
       targetColumns: Seq[String],
       writableColumns: Seq[String],
-      fields: Seq[DataField]): DataFrame = {
-    val resolver = spark.sessionState.conf.resolver
-    val sourceColumns = rawDf.columns.toSeq
-
-    val selectExprs: Seq[Column] = writableColumns.map {
-      colName =>
-        if (targetColumns.exists(tc => resolver(tc, colName))) {
-          val srcCol = sourceColumns.find(s => resolver(s, colName))
-          srcCol match {
-            case Some(s) =>
-              val field = fields.find(_.name() == colName).get
-              val sparkType =
-                
org.apache.paimon.spark.SparkTypeUtils.fromPaimonType(field.`type`())
-              col(s).cast(sparkType).as(colName)
-            case None =>
-              val field = fields.find(_.name() == colName).get
-              val sparkType =
-                
org.apache.paimon.spark.SparkTypeUtils.fromPaimonType(field.`type`())
-              lit(null).cast(sparkType).as(colName)
-          }
-        } else {
-          resolveDefaultColumn(fields.find(_.name() == colName).get, colName)
+      fields: Seq[DataField],
+      readerOptions: Map[String, String],
+      tableName: String): Map[String, Long] = {
+    val allTargetCols = writableColumns.toSet ++ targetColumns.toSet
+    val fileCol = CopyIntoHelper.safeTempCol(spark, "__file__", allTargetCols)
+    fileFormat.formatType match {
+      case FileFormatType.PARQUET =>
+        val rawDf = spark.read.options(readerOptions).parquet(filePaths: _*)
+        castValidator.validateParquetCast(rawDf, targetColumns, 
writableColumns, fields)
+        val selectedDf =
+          dataFrameBuilder
+            .buildParquetDataFrame(rawDf, targetColumns, writableColumns, 
fields)
+            .withColumn(fileCol, input_file_name())
+            .cache()
+        try {
+          val counts = CopyIntoUtils.countPerFile(selectedDf, fileCol)
+          
selectedDf.drop(fileCol).write.format("paimon").mode("append").insertInto(tableName)
+          counts
+        } finally {
+          selectedDf.unpersist()
+        }
+      case _ =>
+        val stringSchema = dataFrameBuilder.buildStringSchema(targetColumns)
+        val sourceDf = dataFrameBuilder.readSourceData(filePaths, 
stringSchema, readerOptions)
+        val finalDf =
+          dataFrameBuilder.buildFinalDataFrame(sourceDf, targetColumns, 
writableColumns, fields)
+        val castedDf = castValidator
+          .castAndValidate(finalDf, writableColumns, fields)
+          .withColumn(fileCol, input_file_name())
+          .cache()
+        try {
+          val counts = CopyIntoUtils.countPerFile(castedDf, fileCol)
+          
castedDf.drop(fileCol).write.format("paimon").mode("append").insertInto(tableName)
+          counts
+        } finally {
+          castedDf.unpersist()
         }
     }
-    rawDf.select(selectExprs: _*)
   }
 
   /**
-   * Validate that casting Parquet source columns to target types does not 
silently lose data.
-   * Detection strategy: if a source value is non-null but becomes null after 
casting, the cast
-   * failed (e.g., a string "abc" cast to IntegerType → null). Aborts 
immediately on first failure.
+   * Parquet import pipeline. Unlike CSV/JSON which read as strings then cast, 
Parquet files already
+   * have typed columns, so the flow is:
+   *   1. Read source Parquet with native types
+   *   2. Project and cast columns to match target table schema (by column 
name, not position)
+   *   3. Validate that no non-null values become null after casting (detect 
type incompatibility)
+   *   4. Write to Paimon table
+   *   5. Record load history for idempotent re-runs (FORCE=FALSE dedup)
    */
-  private def validateParquetCast(
-      rawDf: DataFrame,
+  private def runParquetImport(
+      paimonTable: FileStoreTable,
+      filePaths: Array[String],
       targetColumns: Seq[String],
       writableColumns: Seq[String],
-      fields: Seq[DataField]): Unit = {
-    val resolver = spark.sessionState.conf.resolver
-    val sourceColumns = rawDf.columns.toSeq
-
-    val castCheckCols = ArrayBuffer[(String, String)]()
-    var validationDf = rawDf
-
-    writableColumns.zip(fields).foreach {
-      case (colName, field) =>
-        if (targetColumns.exists(tc => resolver(tc, colName))) {
-          sourceColumns.find(s => resolver(s, colName)).foreach {
-            srcColName =>
-              val sparkType =
-                
org.apache.paimon.spark.SparkTypeUtils.fromPaimonType(field.`type`())
-              val castColName = s"__pq_cv_$colName"
-              validationDf = validationDf.withColumn(castColName, 
col(srcColName).cast(sparkType))
-              castCheckCols += ((srcColName, castColName))
-          }
-        }
-    }
+      fields: Seq[DataField],
+      filesToLoad: Array[FileStatus],
+      skippedFiles: Array[FileStatus],
+      readerOptions: Map[String, String]): Seq[InternalRow] = {
+    val rawDf = spark.read.options(readerOptions).parquet(filePaths: _*)
 
-    if (castCheckCols.nonEmpty) {
-      val badCastFilter = castCheckCols
-        .map { case (src, dst) => col(src).isNotNull && col(dst).isNull }
-        .reduce(_ || _)
-      val badRows = validationDf.filter(badCastFilter).limit(1).collect()
-      if (badRows.nonEmpty) {
-        val example = castCheckCols.find {
-          case (src, dst) =>
-            val row = badRows(0)
-            val srcIdx = validationDf.schema.fieldIndex(src)
-            val dstIdx = validationDf.schema.fieldIndex(dst)
-            !row.isNullAt(srcIdx) && row.isNullAt(dstIdx)
+    onError match {
+      case OnErrorMode.Continue | OnErrorMode.SkipFile =>
+        val errorGranularity = if (onError == OnErrorMode.Continue) {
+          ErrorGranularity.RowLevel
+        } else {
+          ErrorGranularity.FileLevel
         }
-        throw new IllegalArgumentException(
-          s"ON_ERROR = ABORT_STATEMENT: Cast failure in column 
'${example.map(_._1).getOrElse("unknown")}'. Source data contains values that 
cannot be converted to the target type.")
-      }
+
+        runErrorTolerantMode(
+          paimonTable,
+          rawDf,
+          targetColumns,
+          writableColumns,
+          fields,
+          filesToLoad,
+          skippedFiles,
+          errorGranularity,
+          errorHandler.detectParquetErrors(targetColumns, writableColumns, 
fields)
+        )
+
+      case _ =>
+        val tableName = CopyIntoUtils.quoteIdentifier(catalog.name(), ident)
+        val countsPerFile =
+          importAbort(filePaths, targetColumns, writableColumns, fields, 
readerOptions, tableName)
+        CopyIntoResultBuilder.recordHistoryAndBuildResultsDirect(
+          paimonTable,
+          filesToLoad,
+          skippedFiles,
+          countsPerFile)
     }
   }
 
@@ -234,297 +292,84 @@ case class CopyIntoTableExec(
       filesToLoad: Array[FileStatus],
       skippedFiles: Array[FileStatus],
       readerOptions: Map[String, String]): Seq[InternalRow] = {
-    val stringSchema = buildStringSchema(targetColumns)
-
-    val sourceDf = readSourceData(filePaths, stringSchema, readerOptions)
-    val finalDf =
-      buildFinalDataFrame(sourceDf, targetColumns, writableColumns, fields)
-    val castedDf = castAndValidate(finalDf, writableColumns, fields)
-
-    val tableName = CopyIntoUtils.quoteIdentifier(catalog.name(), ident)
-    castedDf.write.format("paimon").mode("append").insertInto(tableName)
-
-    val countDf = fileFormat.formatType match {
-      case FileFormatType.JSON =>
-        spark.read.options(readerOptions).schema(stringSchema).json(filePaths: 
_*)
-      case _ =>
-        spark.read.options(readerOptions).schema(stringSchema).csv(filePaths: 
_*)
-    }
-    recordHistoryAndBuildResults(paimonTable, filesToLoad, skippedFiles, 
countDf)
-  }
-
-  private def buildStringSchema(targetColumns: Seq[String]): StructType = {
-    fileFormat.formatType match {
-      case FileFormatType.JSON =>
-        StructType(targetColumns.map(name => StructField(name, StringType, 
nullable = true)))
-      case _ =>
-        StructType(
-          (0 until targetColumns.size).map(i => StructField(s"_c$i", 
StringType, nullable = true)))
-    }
-  }
+    val stringSchema = dataFrameBuilder.buildStringSchema(targetColumns)
 
-  private def resolveTargetColumns(writableColumns: Seq[String]): Seq[String] 
= {
-    columns match {
-      case Some(cols) =>
-        val resolver = spark.sessionState.conf.resolver
-        cols.indices.foreach {
-          i =>
-            cols.indices.filter(_ > i).foreach {
-              j =>
-                if (resolver(cols(i), cols(j))) {
-                  throw new IllegalArgumentException(
-                    s"Duplicate columns in column list: ${cols(i)}")
-                }
-            }
-        }
-        cols.map {
-          c =>
-            writableColumns.find(w => resolver(w, c)).getOrElse {
-              throw new IllegalArgumentException(
-                s"Column '$c' does not exist in target table. Available 
columns: ${writableColumns.mkString(", ")}")
-            }
+    onError match {
+      case OnErrorMode.Continue | OnErrorMode.SkipFile =>
+        val errorGranularity = if (onError == OnErrorMode.Continue) {
+          ErrorGranularity.RowLevel
+        } else {
+          ErrorGranularity.FileLevel
         }
-      case None => writableColumns
-    }
-  }
 
-  private def validateNonNullableDefaults(
-      writableColumns: Seq[String],
-      targetColumns: Seq[String],
-      fields: Seq[DataField]): Unit = {
-    if (columns.isEmpty) return
-    val unmapped = writableColumns.filterNot(targetColumns.contains)
-    unmapped.foreach {
-      colName =>
-        val field = fields.find(_.name() == colName).get
-        if (!field.`type`().isNullable && field.defaultValue() == null) {
-          throw new IllegalArgumentException(
-            s"Non-nullable column '$colName' is not in the column list and has 
no default value")
+        // For error-tolerant modes, we need to read with corrupt record 
tracking
+        val allTargetCols = writableColumns.toSet ++ targetColumns.toSet
+        val corruptCol = CopyIntoHelper.safeTempCol(spark, "_corrupt_record", 
allTargetCols)
+        val schemaWithCorrupt =
+          stringSchema.add(StructField(corruptCol, StringType, nullable = 
true))
+        val corruptRecordOption =
+          Map("columnNameOfCorruptRecord" -> corruptCol, "mode" -> 
"PERMISSIVE")
+
+        val rawDf = fileFormat.formatType match {
+          case FileFormatType.JSON =>
+            spark.read
+              .options(readerOptions ++ corruptRecordOption)
+              .schema(schemaWithCorrupt)
+              .json(filePaths: _*)
+          case _ =>
+            spark.read
+              .options(readerOptions ++ corruptRecordOption)
+              .schema(schemaWithCorrupt)
+              .csv(filePaths: _*)
         }
-    }
-  }
-
-  private def listAndFilterFiles(
-      paimonTable: FileStoreTable): (Array[FileStatus], Array[FileStatus]) = {
-    val hadoopConf = spark.sessionState.newHadoopConf()
-    val fsPath = new Path(sourcePath)
-    val fs = fsPath.getFileSystem(hadoopConf)
-    val allFiles = fs.listStatus(fsPath).filter(_.isFile)
-
-    val patternFiltered = pattern match {
-      case Some(p) =>
-        val regex = p.r
-        allFiles.filter(f => regex.findFirstIn(f.getPath.getName).isDefined)
-      case None => allFiles
-    }
 
-    if (patternFiltered.isEmpty) {
-      return (Array.empty, Array.empty)
-    }
-
-    val paimonPath = new 
org.apache.paimon.fs.Path(paimonTable.location().toString)
-    val historyManager = new CopyLoadHistoryManager(paimonTable.fileIO(), 
paimonPath)
-
-    if (!force) {
-      val (skip, load) = patternFiltered.partition {
-        f => historyManager.isLoaded(f.getPath.toString, f.getLen, 
f.getModificationTime)
-      }
-      (load, skip)
-    } else {
-      (patternFiltered, Array.empty[FileStatus])
-    }
-  }
+        runErrorTolerantMode(
+          paimonTable,
+          rawDf,
+          targetColumns,
+          writableColumns,
+          fields,
+          filesToLoad,
+          skippedFiles,
+          errorGranularity,
+          errorHandler.detectTextErrors(
+            targetColumns,
+            writableColumns,
+            fields,
+            stringSchema,
+            corruptCol)
+        )
 
-  private def readSourceData(
-      filePaths: Array[String],
-      stringSchema: StructType,
-      readerOptions: Map[String, String]): DataFrame = {
-    var df = fileFormat.formatType match {
-      case FileFormatType.JSON =>
-        spark.read.options(readerOptions).schema(stringSchema).json(filePaths: 
_*)
       case _ =>
-        spark.read.options(readerOptions).schema(stringSchema).csv(filePaths: 
_*)
-    }
-
-    val nullIfVals = fileFormat.nullIfValues
-    if (nullIfVals.nonEmpty) {
-      df.columns.foreach {
-        colName =>
-          df = df.withColumn(
-            colName,
-            when(col(colName).isin(nullIfVals: _*), lit(null).cast(StringType))
-              .otherwise(col(colName)))
-      }
-    }
-
-    if (fileFormat.emptyFieldAsNull) {
-      df.columns.foreach {
-        colName =>
-          df = df.withColumn(
-            colName,
-            when(col(colName) === lit(""), lit(null).cast(StringType))
-              .otherwise(col(colName)))
-      }
+        runTextImportAbort(
+          paimonTable,
+          filePaths,
+          targetColumns,
+          writableColumns,
+          fields,
+          filesToLoad,
+          skippedFiles,
+          readerOptions)
     }
-
-    df
   }
 
-  private def buildFinalDataFrame(
-      sourceDf: DataFrame,
+  private def runTextImportAbort(
+      paimonTable: FileStoreTable,
+      filePaths: Array[String],
       targetColumns: Seq[String],
       writableColumns: Seq[String],
-      fields: Seq[DataField]): DataFrame = {
-    val renamedDf = fileFormat.formatType match {
-      case FileFormatType.JSON =>
-        sourceDf
-      case _ =>
-        targetColumns.zipWithIndex.foldLeft(sourceDf) {
-          case (df, (targetCol, idx)) => df.withColumnRenamed(s"_c$idx", 
targetCol)
-        }
-    }
-
-    if (columns.isDefined) {
-      val selectExprs: Seq[Column] = writableColumns.map {
-        colName =>
-          if (targetColumns.contains(colName)) {
-            col(colName)
-          } else {
-            resolveDefaultColumn(fields.find(_.name() == colName).get, colName)
-          }
-      }
-      renamedDf.select(selectExprs: _*)
-    } else {
-      renamedDf
-    }
-  }
-
-  private def castAndValidate(
-      finalDf: DataFrame,
-      writableColumns: Seq[String],
-      fields: Seq[DataField]): DataFrame = {
-    val nonStringCastCols = ArrayBuffer[String]()
-    var castedDf = finalDf
-    writableColumns.zip(fields).foreach {
-      case (colName, field) =>
-        val sparkType = 
org.apache.paimon.spark.SparkTypeUtils.fromPaimonType(field.`type`())
-        castedDf = castedDf.withColumn(colName, col(colName).cast(sparkType))
-        if (sparkType != StringType) {
-          nonStringCastCols += colName
-        }
-    }
-
-    if (nonStringCastCols.nonEmpty) {
-      val castSuffix = "__cv"
-      val validationDf = nonStringCastCols.zipWithIndex.foldLeft(finalDf) {
-        case (df, (colName, idx)) =>
-          val field = fields.find(_.name() == colName).get
-          val sparkType = 
org.apache.paimon.spark.SparkTypeUtils.fromPaimonType(field.`type`())
-          df.withColumn(castSuffix + idx, col(colName).cast(sparkType))
-      }
-      val badCastFilter = nonStringCastCols.zipWithIndex
-        .map { case (cn, idx) => col(cn).isNotNull && col(castSuffix + 
idx).isNull }
-        .reduce(_ || _)
-      val badRows = validationDf.filter(badCastFilter).limit(1).collect()
-      if (badRows.nonEmpty) {
-        val example = nonStringCastCols.zipWithIndex.find {
-          case (cn, idx) =>
-            val row = badRows(0)
-            val srcIdx = validationDf.schema.fieldIndex(cn)
-            val dstIdx = validationDf.schema.fieldIndex(castSuffix + idx)
-            !row.isNullAt(srcIdx) && row.isNullAt(dstIdx)
-        }
-        throw new IllegalArgumentException(
-          s"ON_ERROR = ABORT_STATEMENT: Cast failure in column 
'${example.map(_._1).getOrElse("unknown")}'. Source data contains values that 
cannot be converted to the target type.")
-      }
-    }
-
-    castedDf
-  }
-
-  /**
-   * Record successfully loaded files to load history (for FORCE=FALSE 
idempotent dedup), and build
-   * the result rows showing per-file load status. Accepts a pre-built countDf 
that will be grouped
-   * by input_file_name() to get per-file row counts — this allows both 
Parquet and text paths to
-   * share the same logic.
-   */
-  private def recordHistoryAndBuildResults(
-      paimonTable: FileStoreTable,
+      fields: Seq[DataField],
       filesToLoad: Array[FileStatus],
       skippedFiles: Array[FileStatus],
-      countDf: DataFrame): Seq[InternalRow] = {
-    val paimonPath = new 
org.apache.paimon.fs.Path(paimonTable.location().toString)
-    val historyManager = new CopyLoadHistoryManager(paimonTable.fileIO(), 
paimonPath)
-    val snapshotId = paimonTable.snapshotManager().latestSnapshotId()
-    val loadedAt = System.currentTimeMillis()
-
-    val rowCounts = countDf
-      .groupBy(input_file_name().as("file"))
-      .count()
-      .collect()
-
-    val fileCountMap = rowCounts.map {
-      row =>
-        val fullPath = row.getString(0)
-        val baseName = fullPath.substring(fullPath.lastIndexOf('/') + 1)
-        baseName -> row.getLong(1)
-    }.toMap
-
-    val loadedResults = filesToLoad.map {
-      fileStatus =>
-        val baseName = fileStatus.getPath.getName
-        val rowCount = fileCountMap.getOrElse(baseName, 0L)
-
-        historyManager.recordLoaded(
-          CopyLoadRecord(
-            filePath = fileStatus.getPath.toString,
-            fileSize = fileStatus.getLen,
-            lastModified = fileStatus.getModificationTime,
-            loadedAt = loadedAt,
-            snapshotId = snapshotId,
-            rowsLoaded = rowCount
-          ))
-
-        InternalRow(
-          UTF8String.fromString(baseName),
-          UTF8String.fromString("LOADED"),
-          rowCount,
-          rowCount)
-    }.toSeq
-
-    val skippedResults = buildSkippedResults(skippedFiles)
-    loadedResults ++ skippedResults
-  }
-
-  private def buildSkippedResults(files: Array[FileStatus]): Seq[InternalRow] 
= {
-    files.map {
-      f =>
-        InternalRow(
-          UTF8String.fromString(f.getPath.getName),
-          UTF8String.fromString("SKIPPED"),
-          0L,
-          0L)
-    }.toSeq
-  }
-
-  /** Resolve the default value expression for a column not populated from 
source data. */
-  private def resolveDefaultColumn(field: DataField, colName: String): Column 
= {
-    val defaultVal = field.defaultValue()
-    if (defaultVal != null) {
-      val sparkType =
-        org.apache.paimon.spark.SparkTypeUtils.fromPaimonType(field.`type`())
-      try {
-        val parsed = spark.sessionState.sqlParser.parseExpression(defaultVal)
-        
SparkShimLoader.shim.classicApi.column(parsed).cast(sparkType).as(colName)
-      } catch {
-        case e: Exception =>
-          logWarning(
-            s"Failed to parse default value '$defaultVal' for column 
'$colName': " +
-              s"${e.getMessage}. Using null instead.")
-          lit(null).cast(sparkType).as(colName)
-      }
-    } else {
-      lit(null).as(colName)
-    }
+      readerOptions: Map[String, String]): Seq[InternalRow] = {
+    val tableName = CopyIntoUtils.quoteIdentifier(catalog.name(), ident)
+    val countsPerFile =
+      importAbort(filePaths, targetColumns, writableColumns, fields, 
readerOptions, tableName)
+
+    CopyIntoResultBuilder.recordHistoryAndBuildResultsDirect(
+      paimonTable,
+      filesToLoad,
+      skippedFiles,
+      countsPerFile)
   }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoUtils.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoUtils.scala
index dd85af058d..fdf76b8dc6 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoUtils.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoUtils.scala
@@ -18,7 +18,9 @@
 
 package org.apache.paimon.spark.execution
 
+import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.functions.col
 
 object CopyIntoUtils {
 
@@ -28,4 +30,17 @@ object CopyIntoUtils {
       Seq(ident.name())
     parts.filter(_.nonEmpty).map(p => s"`${p.replace("`", 
"``")}`").mkString(".")
   }
+
+  def extractBaseName(fullPath: String): String = {
+    fullPath.substring(fullPath.lastIndexOf('/') + 1)
+  }
+
+  /** Count rows per file, keyed by base file name. */
+  def countPerFile(df: DataFrame, fileCol: String): Map[String, Long] = {
+    df.groupBy(col(fileCol))
+      .count()
+      .collect()
+      .map(row => extractBaseName(row.getString(0)) -> row.getLong(1))
+      .toMap
+  }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
index 76bd6beb15..6870ce8832 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
@@ -149,7 +149,7 @@ case class PaimonStrategy(spark: SparkSession)
           partitionPredicate: Option[PartitionPredicate]) =>
       TruncatePaimonTableWithFilterExec(table, partitionPredicate) :: Nil
 
-    case c @ CopyIntoTableCommand(PaimonCatalogAndIdentifier(catalog, ident), 
_, _, _, _, _) =>
+    case c @ CopyIntoTableCommand(PaimonCatalogAndIdentifier(catalog, ident), 
_, _, _, _, _, _) =>
       CopyIntoTableExec(
         spark,
         catalog,
@@ -159,6 +159,7 @@ case class PaimonStrategy(spark: SparkSession)
         c.fileFormat,
         c.pattern,
         c.force,
+        c.onError,
         c.output) :: Nil
 
     case c @ CopyIntoLocationCommand(_, PaimonCatalogAndIdentifier(catalog, 
ident), _, _) =>
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
index 3171190915..4fd533f401 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
@@ -78,6 +78,8 @@ abstract class AbstractPaimonSparkSqlExtensionsParser(val 
delegate: ParserInterf
     PaimonSqlExtensionsParser.FORCE,
     PaimonSqlExtensionsParser.ON_ERROR,
     PaimonSqlExtensionsParser.ABORT_STATEMENT,
+    PaimonSqlExtensionsParser.CONTINUE,
+    PaimonSqlExtensionsParser.SKIP_FILE,
     PaimonSqlExtensionsParser.OVERWRITE,
     PaimonSqlExtensionsParser.CSV
   )
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
index da716ced11..0d54c698e6 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
@@ -172,7 +172,15 @@ class PaimonSqlExtensionsAstBuilder(delegate: 
ParserInterface)
       val fileFormat = buildFileFormat(ctx.fileFormatClause())
       val pattern = Option(ctx.patternClause()).map(p => 
unquoteString(p.STRING().getText))
       val force = Option(ctx.forceClause()).exists(_.booleanValue().TRUE() != 
null)
-      logical.CopyIntoTableCommand(table, columns, sourcePath, fileFormat, 
pattern, force)
+      val onError = Option(ctx.onErrorClause())
+        .map {
+          clause =>
+            if (clause.CONTINUE() != null) OnErrorMode.Continue
+            else if (clause.SKIP_FILE() != null) OnErrorMode.SkipFile
+            else OnErrorMode.AbortStatement
+        }
+        .getOrElse(OnErrorMode.AbortStatement)
+      logical.CopyIntoTableCommand(table, columns, sourcePath, fileFormat, 
pattern, force, onError)
     }
 
   /** Create a COPY INTO LOCATION (export) logical command. */
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/execution/CopyIntoCastValidatorTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/execution/CopyIntoCastValidatorTest.scala
new file mode 100644
index 0000000000..ab006e2570
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/execution/CopyIntoCastValidatorTest.scala
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.execution
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.types.{DataField, DataTypes}
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, 
StructType}
+
+class CopyIntoCastValidatorTest extends PaimonSparkTestBase {
+
+  private lazy val validator = new CopyIntoCastValidator(spark)
+
+  test("buildParquetCastValidation: no validation needed when all columns are 
compatible") {
+    val schema = StructType(Seq(StructField("id", IntegerType), 
StructField("name", StringType)))
+    val df = spark.createDataFrame(
+      spark.sparkContext.parallelize(Seq(Row(1, "Alice"), Row(2, "Bob"))),
+      schema)
+
+    val targetColumns = Seq("id", "name")
+    val writableColumns = Seq("id", "name")
+    val fields = Seq(
+      new DataField(0, "id", DataTypes.INT()),
+      new DataField(1, "name", DataTypes.STRING())
+    )
+
+    val setup = validator.buildParquetCastValidation(df, targetColumns, 
writableColumns, fields)
+    assert(setup.castColMapping.nonEmpty)
+    assert(setup.badCastFilter.isDefined)
+  }
+
+  test("buildParquetCastValidation: exclude specified columns") {
+    val schema = StructType(
+      Seq(
+        StructField("id", IntegerType),
+        StructField("name", StringType),
+        StructField("__file__", StringType)))
+    val df = spark.createDataFrame(
+      spark.sparkContext.parallelize(Seq(Row(1, "Alice", "file1.csv"))),
+      schema)
+
+    val targetColumns = Seq("id", "name")
+    val writableColumns = Seq("id", "name")
+    val fields = Seq(
+      new DataField(0, "id", DataTypes.INT()),
+      new DataField(1, "name", DataTypes.STRING())
+    )
+
+    val setup = validator.buildParquetCastValidation(
+      df,
+      targetColumns,
+      writableColumns,
+      fields,
+      excludeCols = Set("__file__"))
+    assert(!setup.castColMapping.contains("__file__"))
+  }
+
+  test("buildTextCastValidation: no validation for string columns") {
+    val schema = StructType(Seq(StructField("id", StringType), 
StructField("name", StringType)))
+    val df = spark.createDataFrame(
+      spark.sparkContext.parallelize(Seq(Row("1", "Alice"), Row("2", "Bob"))),
+      schema)
+
+    val writableColumns = Seq("id", "name")
+    val fields = Seq(
+      new DataField(0, "id", DataTypes.STRING()),
+      new DataField(1, "name", DataTypes.STRING())
+    )
+
+    val setup = validator.buildTextCastValidation(df, writableColumns, fields)
+    assert(setup.castColMapping.isEmpty)
+    assert(setup.badCastFilter.isEmpty)
+  }
+
+  test("buildTextCastValidation: add validation columns for non-string types") 
{
+    val schema = StructType(Seq(StructField("id", StringType), 
StructField("age", StringType)))
+    val df = spark.createDataFrame(
+      spark.sparkContext.parallelize(Seq(Row("1", "30"), Row("2", "25"))),
+      schema)
+
+    val writableColumns = Seq("id", "age")
+    val fields = Seq(
+      new DataField(0, "id", DataTypes.INT()),
+      new DataField(1, "age", DataTypes.INT())
+    )
+
+    val setup = validator.buildTextCastValidation(df, writableColumns, fields)
+    assert(setup.castColMapping.size == 2)
+    assert(setup.badCastFilter.isDefined)
+    assert(setup.castColMapping.contains("id"))
+    assert(setup.castColMapping.contains("age"))
+  }
+
+  test("validateParquetCast: pass when all casts are valid") {
+    val schema = StructType(Seq(StructField("id", IntegerType), 
StructField("name", StringType)))
+    val df = spark.createDataFrame(
+      spark.sparkContext.parallelize(Seq(Row(1, "Alice"), Row(2, "Bob"))),
+      schema)
+
+    val targetColumns = Seq("id", "name")
+    val writableColumns = Seq("id", "name")
+    val fields = Seq(
+      new DataField(0, "id", DataTypes.INT()),
+      new DataField(1, "name", DataTypes.STRING())
+    )
+
+    // Should not throw
+    validator.validateParquetCast(df, targetColumns, writableColumns, fields)
+  }
+
+  test("validateParquetCast: throw exception when cast fails") {
+    val schema = StructType(Seq(StructField("id", StringType), 
StructField("name", StringType)))
+    val df = spark.createDataFrame(
+      spark.sparkContext.parallelize(Seq(Row("abc", "Alice"), Row("2", 
"Bob"))),
+      schema)
+
+    val targetColumns = Seq("id", "name")
+    val writableColumns = Seq("id", "name")
+    val fields = Seq(
+      new DataField(0, "id", DataTypes.INT()),
+      new DataField(1, "name", DataTypes.STRING())
+    )
+
+    val exception = intercept[IllegalArgumentException] {
+      validator.validateParquetCast(df, targetColumns, writableColumns, fields)
+    }
+    assert(exception.getMessage.contains("ON_ERROR = ABORT_STATEMENT"))
+    assert(exception.getMessage.contains("Cast failure"))
+  }
+
+  test("castColumns: cast all columns to target types") {
+    val schema = StructType(Seq(StructField("id", StringType), 
StructField("age", StringType)))
+    val df = spark.createDataFrame(
+      spark.sparkContext.parallelize(Seq(Row("1", "30"), Row("2", "25"))),
+      schema)
+
+    val writableColumns = Seq("id", "age")
+    val fields = Seq(
+      new DataField(0, "id", DataTypes.INT()),
+      new DataField(1, "age", DataTypes.INT())
+    )
+
+    val castedDf = validator.castColumns(df, writableColumns, fields)
+    assert(castedDf.schema("id").dataType == IntegerType)
+    assert(castedDf.schema("age").dataType == IntegerType)
+
+    val rows = castedDf.collect()
+    assert(rows(0).getInt(0) == 1)
+    assert(rows(0).getInt(1) == 30)
+  }
+
+  test("castAndValidate: pass when all casts are valid") {
+    val schema = StructType(Seq(StructField("id", StringType), 
StructField("age", StringType)))
+    val df = spark.createDataFrame(
+      spark.sparkContext.parallelize(Seq(Row("1", "30"), Row("2", "25"))),
+      schema)
+
+    val writableColumns = Seq("id", "age")
+    val fields = Seq(
+      new DataField(0, "id", DataTypes.INT()),
+      new DataField(1, "age", DataTypes.INT())
+    )
+
+    val castedDf = validator.castAndValidate(df, writableColumns, fields)
+    assert(castedDf.schema("id").dataType == IntegerType)
+    assert(castedDf.schema("age").dataType == IntegerType)
+  }
+
+  test("castAndValidate: throw exception when cast fails") {
+    val schema = StructType(Seq(StructField("id", StringType), 
StructField("age", StringType)))
+    val df = spark.createDataFrame(
+      spark.sparkContext.parallelize(Seq(Row("1", "invalid"), Row("2", "25"))),
+      schema)
+
+    val writableColumns = Seq("id", "age")
+    val fields = Seq(
+      new DataField(0, "id", DataTypes.INT()),
+      new DataField(1, "age", DataTypes.INT())
+    )
+
+    val exception = intercept[IllegalArgumentException] {
+      validator.castAndValidate(df, writableColumns, fields)
+    }
+    assert(exception.getMessage.contains("ON_ERROR = ABORT_STATEMENT"))
+    assert(exception.getMessage.contains("Cast failure"))
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/execution/CopyIntoDataFrameBuilderTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/execution/CopyIntoDataFrameBuilderTest.scala
new file mode 100644
index 0000000000..542932b53c
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/execution/CopyIntoDataFrameBuilderTest.scala
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.execution
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.spark.catalyst.plans.logical.{CopyFileFormat, 
FileFormatType}
+import org.apache.paimon.types.{DataField, DataTypes}
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, 
StructType}
+
+class CopyIntoDataFrameBuilderTest extends PaimonSparkTestBase {
+
+  private def createBuilder(
+      formatType: FileFormatType,
+      columns: Option[Seq[String]] = None): CopyIntoDataFrameBuilder = {
+    val fileFormat = new CopyFileFormat(formatType = formatType, options = 
Map.empty)
+    new CopyIntoDataFrameBuilder(spark, fileFormat, columns)
+  }
+
+  test("buildStringSchema: CSV format with positional columns") {
+    val builder = createBuilder(FileFormatType.CSV)
+    val targetColumns = Seq("id", "name", "age")
+    val schema = builder.buildStringSchema(targetColumns)
+
+    assert(schema.fields.length == 3)
+    assert(schema.fields(0).name == "_c0")
+    assert(schema.fields(1).name == "_c1")
+    assert(schema.fields(2).name == "_c2")
+    assert(schema.fields.forall(_.dataType == StringType))
+  }
+
+  test("buildStringSchema: JSON format with named columns") {
+    val builder = createBuilder(FileFormatType.JSON)
+    val targetColumns = Seq("id", "name", "age")
+    val schema = builder.buildStringSchema(targetColumns)
+
+    assert(schema.fields.length == 3)
+    assert(schema.fields(0).name == "id")
+    assert(schema.fields(1).name == "name")
+    assert(schema.fields(2).name == "age")
+    assert(schema.fields.forall(_.dataType == StringType))
+  }
+
+  test("buildParquetDataFrame: map source columns to target by name") {
+    val builder = createBuilder(FileFormatType.PARQUET)
+    val schema = StructType(Seq(StructField("id", IntegerType), 
StructField("name", StringType)))
+    val df = spark.createDataFrame(
+      spark.sparkContext.parallelize(Seq(Row(1, "Alice"), Row(2, "Bob"))),
+      schema)
+
+    val targetColumns = Seq("id", "name")
+    val writableColumns = Seq("id", "name")
+    val fields = Seq(
+      new DataField(0, "id", DataTypes.INT()),
+      new DataField(1, "name", DataTypes.STRING())
+    )
+
+    val result = builder.buildParquetDataFrame(df, targetColumns, 
writableColumns, fields)
+    assert(result.columns.toSeq == Seq("id", "name"))
+    assert(result.count() == 2)
+  }
+
+  test("buildParquetDataFrame: fill NULL for missing source columns") {
+    val builder = createBuilder(FileFormatType.PARQUET)
+    val schema = StructType(Seq(StructField("id", IntegerType)))
+    val df = spark.createDataFrame(spark.sparkContext.parallelize(Seq(Row(1), 
Row(2))), schema)
+
+    val targetColumns = Seq("id", "name")
+    val writableColumns = Seq("id", "name")
+    val fields = Seq(
+      new DataField(0, "id", DataTypes.INT()),
+      new DataField(1, "name", DataTypes.STRING())
+    )
+
+    val result = builder.buildParquetDataFrame(df, targetColumns, 
writableColumns, fields)
+    assert(result.columns.toSeq == Seq("id", "name"))
+    val rows = result.collect()
+    assert(rows(0).isNullAt(1)) // name should be NULL
+  }
+
+  test("buildParquetDataFrame: fill default value for unmapped columns") {
+    val builder = createBuilder(FileFormatType.PARQUET, Some(Seq("id")))
+    val schema = StructType(Seq(StructField("id", IntegerType)))
+    val df = spark.createDataFrame(spark.sparkContext.parallelize(Seq(Row(1), 
Row(2))), schema)
+
+    val targetColumns = Seq("id")
+    val writableColumns = Seq("id", "status")
+    val fields = Seq(
+      new DataField(0, "id", DataTypes.INT()),
+      new DataField(1, "status", DataTypes.STRING(), "'active'")
+    )
+
+    val result = builder.buildParquetDataFrame(df, targetColumns, 
writableColumns, fields)
+    assert(result.columns.toSeq == Seq("id", "status"))
+  }
+
+  test("buildFinalDataFrame: rename CSV positional columns to named columns") {
+    val builder = createBuilder(FileFormatType.CSV, Some(Seq("id", "name")))
+    val schema = StructType(Seq(StructField("_c0", StringType), 
StructField("_c1", StringType)))
+    val df = spark.createDataFrame(
+      spark.sparkContext.parallelize(Seq(Row("1", "Alice"), Row("2", "Bob"))),
+      schema)
+
+    val targetColumns = Seq("id", "name")
+    val writableColumns = Seq("id", "name")
+    val fields = Seq(
+      new DataField(0, "id", DataTypes.INT()),
+      new DataField(1, "name", DataTypes.STRING())
+    )
+
+    val result = builder.buildFinalDataFrame(df, targetColumns, 
writableColumns, fields)
+    assert(result.columns.toSeq == Seq("id", "name"))
+  }
+
+  test("buildFinalDataFrame: keep JSON named columns as-is") {
+    val builder = createBuilder(FileFormatType.JSON)
+    val schema = StructType(Seq(StructField("id", StringType), 
StructField("name", StringType)))
+    val df = spark.createDataFrame(
+      spark.sparkContext.parallelize(Seq(Row("1", "Alice"), Row("2", "Bob"))),
+      schema)
+
+    val targetColumns = Seq("id", "name")
+    val writableColumns = Seq("id", "name")
+    val fields = Seq(
+      new DataField(0, "id", DataTypes.INT()),
+      new DataField(1, "name", DataTypes.STRING())
+    )
+
+    val result = builder.buildFinalDataFrame(df, targetColumns, 
writableColumns, fields)
+    assert(result.columns.toSeq == Seq("id", "name"))
+  }
+
+  test("buildFinalDataFrame: preserve extra columns") {
+    val builder = createBuilder(FileFormatType.CSV, Some(Seq("id", "name")))
+    val schema = StructType(
+      Seq(
+        StructField("_c0", StringType),
+        StructField("_c1", StringType),
+        StructField("__file__", StringType)))
+    val df = spark.createDataFrame(
+      spark.sparkContext.parallelize(Seq(Row("1", "Alice", "file1.csv"))),
+      schema)
+
+    val targetColumns = Seq("id", "name")
+    val writableColumns = Seq("id", "name")
+    val fields = Seq(
+      new DataField(0, "id", DataTypes.INT()),
+      new DataField(1, "name", DataTypes.STRING())
+    )
+
+    val result =
+      builder.buildFinalDataFrame(df, targetColumns, writableColumns, fields, 
Seq("__file__"))
+    assert(result.columns.toSeq == Seq("id", "name", "__file__"))
+  }
+
+  test("applyNullTransforms: replace NULL_IF values with null") {
+    val fileFormat = new CopyFileFormat(
+      formatType = FileFormatType.CSV,
+      options = Map("NULL_IF" -> Seq("N/A", 
"NULL").mkString(CopyFileFormat.LIST_SEPARATOR)))
+    val builder = new CopyIntoDataFrameBuilder(spark, fileFormat, None)
+
+    val schema = StructType(Seq(StructField("col1", StringType), 
StructField("col2", StringType)))
+    val df = spark.createDataFrame(
+      spark.sparkContext.parallelize(Seq(Row("value", "N/A"), Row("NULL", 
"data"))),
+      schema)
+
+    val result = builder.applyNullTransforms(df, Seq("col1", "col2"))
+    val rows = result.collect()
+    assert(rows(0).getString(0) == "value")
+    assert(rows(0).isNullAt(1)) // "N/A" -> null
+    assert(rows(1).isNullAt(0)) // "NULL" -> null
+    assert(rows(1).getString(1) == "data")
+  }
+
+  test("applyNullTransforms: replace empty strings with null when 
EMPTY_FIELD_AS_NULL is true") {
+    val fileFormat = new CopyFileFormat(
+      formatType = FileFormatType.CSV,
+      options = Map("EMPTY_FIELD_AS_NULL" -> "TRUE"))
+    val builder = new CopyIntoDataFrameBuilder(spark, fileFormat, None)
+
+    val schema = StructType(Seq(StructField("col1", StringType), 
StructField("col2", StringType)))
+    val df = spark.createDataFrame(
+      spark.sparkContext.parallelize(Seq(Row("value", ""), Row("", "data"))),
+      schema)
+
+    val result = builder.applyNullTransforms(df, Seq("col1", "col2"))
+    val rows = result.collect()
+    assert(rows(0).getString(0) == "value")
+    assert(rows(0).isNullAt(1)) // "" -> null
+    assert(rows(1).isNullAt(0)) // "" -> null
+    assert(rows(1).getString(1) == "data")
+  }
+
+  test("applyNullTransforms: apply both NULL_IF and EMPTY_FIELD_AS_NULL") {
+    val fileFormat = new CopyFileFormat(
+      formatType = FileFormatType.CSV,
+      options = Map(
+        "NULL_IF" -> Seq("N/A").mkString(CopyFileFormat.LIST_SEPARATOR),
+        "EMPTY_FIELD_AS_NULL" -> "TRUE"))
+    val builder = new CopyIntoDataFrameBuilder(spark, fileFormat, None)
+
+    val schema = StructType(Seq(StructField("col1", StringType), 
StructField("col2", StringType)))
+    val df = spark.createDataFrame(
+      spark.sparkContext.parallelize(Seq(Row("N/A", ""), Row("value", 
"data"))),
+      schema)
+
+    val result = builder.applyNullTransforms(df, Seq("col1", "col2"))
+    val rows = result.collect()
+    assert(rows(0).isNullAt(0)) // "N/A" -> null
+    assert(rows(0).isNullAt(1)) // "" -> null
+    assert(rows(1).getString(0) == "value")
+    assert(rows(1).getString(1) == "data")
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/execution/CopyIntoHelperTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/execution/CopyIntoHelperTest.scala
new file mode 100644
index 0000000000..d09b374efb
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/execution/CopyIntoHelperTest.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.execution
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.types.{DataField, DataTypes}
+
+class CopyIntoHelperTest extends PaimonSparkTestBase {
+
+  test("resolveTargetColumns: use all writable columns when columns is None") {
+    val writableColumns = Seq("id", "name", "age")
+    val result = CopyIntoHelper.resolveTargetColumns(spark, None, 
writableColumns)
+    assert(result == writableColumns)
+  }
+
+  test("resolveTargetColumns: resolve specified columns case-insensitively") {
+    val writableColumns = Seq("id", "name", "age")
+    val columns = Some(Seq("ID", "Name"))
+    val result = CopyIntoHelper.resolveTargetColumns(spark, columns, 
writableColumns)
+    assert(result == Seq("id", "name"))
+  }
+
+  test("resolveTargetColumns: throw exception for non-existent column") {
+    val writableColumns = Seq("id", "name", "age")
+    val columns = Some(Seq("id", "invalid_col"))
+    val exception = intercept[IllegalArgumentException] {
+      CopyIntoHelper.resolveTargetColumns(spark, columns, writableColumns)
+    }
+    assert(exception.getMessage.contains("invalid_col"))
+    assert(exception.getMessage.contains("does not exist"))
+  }
+
+  test("resolveTargetColumns: throw exception for duplicate columns") {
+    val writableColumns = Seq("id", "name", "age")
+    val columns = Some(Seq("id", "ID"))
+    val exception = intercept[IllegalArgumentException] {
+      CopyIntoHelper.resolveTargetColumns(spark, columns, writableColumns)
+    }
+    assert(exception.getMessage.contains("Duplicate columns"))
+  }
+
+  test("validateNonNullableDefaults: pass when all non-nullable columns are 
mapped") {
+    val writableColumns = Seq("id", "name", "age")
+    val targetColumns = Seq("id", "name", "age")
+    val fields = Seq(
+      new DataField(0, "id", DataTypes.INT().notNull()),
+      new DataField(1, "name", DataTypes.STRING()),
+      new DataField(2, "age", DataTypes.INT())
+    )
+    // Should not throw
+    CopyIntoHelper.validateNonNullableDefaults(
+      Some(targetColumns),
+      writableColumns,
+      targetColumns,
+      fields)
+  }
+
+  test("validateNonNullableDefaults: pass when unmapped non-nullable column 
has default") {
+    val writableColumns = Seq("id", "name", "status")
+    val targetColumns = Seq("id", "name")
+    val fields = Seq(
+      new DataField(0, "id", DataTypes.INT().notNull()),
+      new DataField(1, "name", DataTypes.STRING()),
+      new DataField(2, "status", DataTypes.STRING().notNull(), null, "default 
value")
+    )
+    // Should not throw
+    CopyIntoHelper.validateNonNullableDefaults(
+      Some(targetColumns),
+      writableColumns,
+      targetColumns,
+      fields)
+  }
+
+  test("validateNonNullableDefaults: throw when unmapped non-nullable column 
has no default") {
+    val writableColumns = Seq("id", "name", "status")
+    val targetColumns = Seq("id", "name")
+    val fields = Seq(
+      new DataField(0, "id", DataTypes.INT().notNull()),
+      new DataField(1, "name", DataTypes.STRING()),
+      new DataField(2, "status", DataTypes.STRING().notNull())
+    )
+    val exception = intercept[IllegalArgumentException] {
+      CopyIntoHelper.validateNonNullableDefaults(
+        Some(targetColumns),
+        writableColumns,
+        targetColumns,
+        fields)
+    }
+    assert(exception.getMessage.contains("status"))
+    assert(exception.getMessage.contains("not in the column list"))
+    assert(exception.getMessage.contains("no default value"))
+  }
+
+  test("safeTempCol: generate unique column name") {
+    val existingColumns = Set("col1", "col2", "__temp")
+    val result = CopyIntoHelper.safeTempCol(spark, "__new", existingColumns)
+    assert(result == "__new")
+  }
+
+  test("safeTempCol: add prefix when name conflicts") {
+    val existingColumns = Set("col1", "col2", "__temp")
+    val result = CopyIntoHelper.safeTempCol(spark, "__temp", existingColumns)
+    assert(result == "___temp")
+  }
+
+  test("safeTempCol: handle case-insensitive conflicts") {
+    val existingColumns = Set("Col1", "COL2")
+    val result = CopyIntoHelper.safeTempCol(spark, "col1", existingColumns)
+    assert(result == "_col1")
+  }
+
+  test("safeTempCol: add multiple prefixes for multiple conflicts") {
+    val existingColumns = Set("__temp", "___temp")
+    val result = CopyIntoHelper.safeTempCol(spark, "__temp", existingColumns)
+    assert(result == "____temp")
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CatalogQualifiedCreateTableLikeTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CatalogQualifiedCreateTableLikeTest.scala
index 03e109919b..c4eb2cd644 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CatalogQualifiedCreateTableLikeTest.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CatalogQualifiedCreateTableLikeTest.scala
@@ -129,6 +129,16 @@ class CatalogQualifiedCreateTableLikeTest extends 
PaimonSparkTestBase {
     Assertions.assertEquals("tag", 
nonReservedIdentifierCommand.targetIdent.name())
     Assertions.assertEquals(Seq("test"), 
nonReservedIdentifierCommand.targetIdent.namespace().toSeq)
 
+    val continueCommand =
+      parseCreateTableLikeCommand("CREATE TABLE paimon.test.continue LIKE 
paimon.test.source_tbl")
+    Assertions.assertEquals("continue", continueCommand.targetIdent.name())
+    Assertions.assertEquals(Seq("test"), 
continueCommand.targetIdent.namespace().toSeq)
+
+    val skipFileCommand =
+      parseCreateTableLikeCommand("CREATE TABLE paimon.test.skip_file LIKE 
paimon.test.source_tbl")
+    Assertions.assertEquals("skip_file", skipFileCommand.targetIdent.name())
+    Assertions.assertEquals(Seq("test"), 
skipFileCommand.targetIdent.namespace().toSeq)
+
     val nestedIdentifierCommand =
       parseCreateTableLikeCommand(
         "CREATE TABLE paimon.test.extra.target_tbl LIKE 
paimon.test.extra.source_tbl")
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoOnErrorTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoOnErrorTest.scala
new file mode 100644
index 0000000000..11942c7dfd
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoOnErrorTest.scala
@@ -0,0 +1,527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.spark.sql.Row
+
+trait CopyIntoOnErrorTest { self: CopyIntoTestBase =>
+
+  test("COPY INTO: ON_ERROR = CONTINUE skips bad CSV rows") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_continue_csv")
+    spark.sql(s"CREATE TABLE $dbName0.copy_continue_csv (id INT, name STRING)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "data.csv", "1,Alice\nabc,Bob\n3,Charlie\n")
+
+        val result = spark.sql(s"""COPY INTO $dbName0.copy_continue_csv
+                                  |FROM '${dir.getAbsolutePath}'
+                                  |FILE_FORMAT = (TYPE = CSV)
+                                  |ON_ERROR = CONTINUE
+                                  |""".stripMargin)
+
+        val rows = result.collect()
+        assert(rows.length == 1)
+        assert(rows(0).getLong(4) > 0)
+        assert(rows(0).getString(1) == "PARTIALLY_LOADED")
+
+        val data = spark.sql(s"SELECT * FROM $dbName0.copy_continue_csv ORDER 
BY id").collect()
+        assert(
+          data.length == 2,
+          s"Expected 2 rows but got ${data.length} — bad row should NOT be in 
table")
+        assert(data(0).getInt(0) == 1 && data(0).getString(1) == "Alice")
+        assert(data(1).getInt(0) == 3 && data(1).getString(1) == "Charlie")
+        assert(
+          !data.exists(r => r.getString(1) == "Bob"),
+          "Bad row with 'abc' as id should not be in the table")
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_continue_csv")
+  }
+
+  test("COPY INTO: ON_ERROR = CONTINUE with no errors behaves like ABORT") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_continue_ok")
+    spark.sql(s"CREATE TABLE $dbName0.copy_continue_ok (id INT, name STRING)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "data.csv", "1,Alice\n2,Bob\n")
+
+        val result = spark.sql(s"""COPY INTO $dbName0.copy_continue_ok
+                                  |FROM '${dir.getAbsolutePath}'
+                                  |FILE_FORMAT = (TYPE = CSV)
+                                  |ON_ERROR = CONTINUE
+                                  |""".stripMargin)
+
+        val rows = result.collect()
+        assert(rows.length == 1)
+        assert(rows(0).getString(1) == "LOADED")
+        assert(rows(0).getLong(4) == 0L)
+
+        checkAnswer(
+          spark.sql(s"SELECT * FROM $dbName0.copy_continue_ok ORDER BY id"),
+          Seq(Row(1, "Alice"), Row(2, "Bob")))
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_continue_ok")
+  }
+
+  test("COPY INTO: ON_ERROR = SKIP_FILE skips file with bad data") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_skipfile")
+    spark.sql(s"CREATE TABLE $dbName0.copy_skipfile (id INT, name STRING)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "good.csv", "1,Alice\n2,Bob\n")
+        createCsvFile(dir, "bad.csv", "abc,Charlie\n")
+
+        val result = spark.sql(s"""COPY INTO $dbName0.copy_skipfile
+                                  |FROM '${dir.getAbsolutePath}'
+                                  |FILE_FORMAT = (TYPE = CSV)
+                                  |ON_ERROR = SKIP_FILE
+                                  |""".stripMargin)
+
+        val rows = result.collect()
+        val loaded = rows.filter(_.getString(1) == "LOADED")
+        val failed = rows.filter(_.getString(1) == "LOAD_FAILED")
+        assert(loaded.length == 1)
+        assert(failed.length == 1)
+        assert(failed(0).getLong(4) >= 1L)
+        assert(failed(0).getString(5) != null)
+
+        checkAnswer(
+          spark.sql(s"SELECT * FROM $dbName0.copy_skipfile ORDER BY id"),
+          Seq(Row(1, "Alice"), Row(2, "Bob")))
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_skipfile")
+  }
+
+  test("COPY INTO: ON_ERROR = SKIP_FILE discards good rows in a file that also 
has bad rows") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_skipfile_mixed")
+    spark.sql(s"CREATE TABLE $dbName0.copy_skipfile_mixed (id INT, name 
STRING)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "good.csv", "1,Alice\n2,Bob\n")
+        // 2 good rows (Carol, Eve) and 1 bad row (non-numeric id)
+        createCsvFile(dir, "mixed.csv", "3,Carol\nabc,Dave\n4,Eve\n")
+
+        val result = spark.sql(s"""COPY INTO $dbName0.copy_skipfile_mixed
+                                  |FROM '${dir.getAbsolutePath}'
+                                  |FILE_FORMAT = (TYPE = CSV)
+                                  |ON_ERROR = SKIP_FILE
+                                  |""".stripMargin)
+
+        val rows = result.collect()
+        assert(rows.count(_.getString(1) == "LOADED") == 1)
+        assert(rows.count(_.getString(1) == "LOAD_FAILED") == 1)
+
+        // The whole mixed.csv is rejected: its good rows (Carol, Eve) must 
not be loaded.
+        checkAnswer(
+          spark.sql(s"SELECT * FROM $dbName0.copy_skipfile_mixed ORDER BY id"),
+          Seq(Row(1, "Alice"), Row(2, "Bob")))
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_skipfile_mixed")
+  }
+
+  test("COPY INTO: ON_ERROR = SKIP_FILE with all good files") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_skipfile_ok")
+    spark.sql(s"CREATE TABLE $dbName0.copy_skipfile_ok (id INT, name STRING)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "a.csv", "1,Alice\n")
+        createCsvFile(dir, "b.csv", "2,Bob\n")
+
+        val result = spark.sql(s"""COPY INTO $dbName0.copy_skipfile_ok
+                                  |FROM '${dir.getAbsolutePath}'
+                                  |FILE_FORMAT = (TYPE = CSV)
+                                  |ON_ERROR = SKIP_FILE
+                                  |""".stripMargin)
+
+        val rows = result.collect()
+        assert(rows.forall(_.getString(1) == "LOADED"))
+        assert(rows.length == 2)
+
+        assert(spark.sql(s"SELECT * FROM $dbName0.copy_skipfile_ok").count() 
== 2)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_skipfile_ok")
+  }
+
+  test("COPY INTO: ON_ERROR = CONTINUE with JSON malformed records") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_continue_json")
+    spark.sql(s"CREATE TABLE $dbName0.copy_continue_json (id INT, name 
STRING)")
+
+    withJsonDir {
+      dir =>
+        createJsonFile(
+          dir,
+          "data.json",
+          """{"id": "1", "name": "Alice"}
+            |{bad json here}
+            |{"id": "3", "name": "Charlie"}
+            |""".stripMargin
+        )
+
+        val result = spark.sql(s"""COPY INTO $dbName0.copy_continue_json
+                                  |FROM '${dir.getAbsolutePath}'
+                                  |FILE_FORMAT = (TYPE = JSON)
+                                  |ON_ERROR = CONTINUE
+                                  |""".stripMargin)
+
+        val rows = result.collect()
+        assert(rows(0).getLong(4) > 0)
+
+        val data = spark.sql(s"SELECT * FROM $dbName0.copy_continue_json ORDER 
BY id").collect()
+        assert(
+          data.length == 2,
+          s"Expected 2 rows but got ${data.length} — malformed JSON should NOT 
be in table")
+        assert(data(0).getInt(0) == 1)
+        assert(data(1).getInt(0) == 3)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_continue_json")
+  }
+
+  test("COPY INTO: ON_ERROR = SKIP_FILE with JSON") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_skipfile_json")
+    spark.sql(s"CREATE TABLE $dbName0.copy_skipfile_json (id INT, name 
STRING)")
+
+    withJsonDir {
+      dir =>
+        createJsonFile(dir, "good.json", """{"id": "1", "name": "Alice"}""" + 
"\n")
+        createJsonFile(dir, "bad.json", "{bad json}\n")
+
+        val result = spark.sql(s"""COPY INTO $dbName0.copy_skipfile_json
+                                  |FROM '${dir.getAbsolutePath}'
+                                  |FILE_FORMAT = (TYPE = JSON)
+                                  |ON_ERROR = SKIP_FILE
+                                  |""".stripMargin)
+
+        val rows = result.collect()
+        val loaded = rows.filter(_.getString(1) == "LOADED")
+        val failed = rows.filter(_.getString(1) == "LOAD_FAILED")
+        assert(loaded.length == 1)
+        assert(failed.length == 1)
+
+        checkAnswer(spark.sql(s"SELECT * FROM $dbName0.copy_skipfile_json"), 
Seq(Row(1, "Alice")))
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_skipfile_json")
+  }
+
+  test("COPY INTO: ABORT_STATEMENT still fails on bad data") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_abort_explicit")
+    spark.sql(s"CREATE TABLE $dbName0.copy_abort_explicit (id INT, name 
STRING)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "data.csv", "abc,Alice\n")
+
+        val e = intercept[Exception] {
+          spark.sql(s"""COPY INTO $dbName0.copy_abort_explicit
+                       |FROM '${dir.getAbsolutePath}'
+                       |FILE_FORMAT = (TYPE = CSV)
+                       |ON_ERROR = ABORT_STATEMENT
+                       |""".stripMargin)
+        }
+        assert(
+          e.getMessage.contains("Cast failure") ||
+            e.getMessage.contains("ABORT_STATEMENT") ||
+            e.getMessage.contains("CAST_INVALID_INPUT") ||
+            e.getCause != null)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_abort_explicit")
+  }
+
+  test("COPY INTO: ON_ERROR = CONTINUE with Parquet cast errors") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_continue_pq")
+    spark.sql(s"CREATE TABLE $dbName0.copy_continue_pq (id INT, name STRING)")
+
+    withParquetDir {
+      dir =>
+        import org.apache.spark.sql.types._
+        val schema = StructType(Seq(StructField("id", StringType), 
StructField("name", StringType)))
+        createParquetSingleFile(
+          dir,
+          "data.parquet",
+          Seq(Row("1", "Alice"), Row("abc", "Bad"), Row("3", "Charlie")),
+          schema)
+
+        val result = spark.sql(s"""COPY INTO $dbName0.copy_continue_pq
+                                  |FROM '${dir.getAbsolutePath}'
+                                  |FILE_FORMAT = (TYPE = PARQUET)
+                                  |ON_ERROR = CONTINUE
+                                  |""".stripMargin)
+
+        val rows = result.collect()
+        assert(rows.length == 1)
+        assert(rows(0).getLong(4) > 0)
+
+        val data = spark.sql(s"SELECT * FROM $dbName0.copy_continue_pq ORDER 
BY id").collect()
+        assert(
+          data.length == 2,
+          s"Expected 2 rows but got ${data.length} — bad row should NOT be in 
table")
+        assert(data(0).getInt(0) == 1)
+        assert(data(1).getInt(0) == 3)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_continue_pq")
+  }
+
+  test("COPY INTO: ON_ERROR = SKIP_FILE with Parquet") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_skipfile_pq")
+    spark.sql(s"CREATE TABLE $dbName0.copy_skipfile_pq (id INT, name STRING)")
+
+    withParquetDir {
+      dir =>
+        import org.apache.spark.sql.types._
+        // Both files share the same physical schema (id STRING) so the 
directory read succeeds;
+        // good rows cast cleanly to the target INT column while bad.parquet's 
"abc" fails the cast,
+        // which is what SKIP_FILE must catch. Using a mixed INT/STRING footer 
here would instead
+        // crash Spark's vectorized Parquet reader before any cast validation 
runs.
+        val schema =
+          StructType(Seq(StructField("id", StringType), StructField("name", 
StringType)))
+        createParquetSingleFile(
+          dir,
+          "good.parquet",
+          Seq(Row("1", "Alice"), Row("2", "Bob")),
+          schema)
+
+        createParquetSingleFile(dir, "bad.parquet", Seq(Row("abc", "Bad")), 
schema)
+
+        val result = spark.sql(s"""COPY INTO $dbName0.copy_skipfile_pq
+                                  |FROM '${dir.getAbsolutePath}'
+                                  |FILE_FORMAT = (TYPE = PARQUET)
+                                  |ON_ERROR = SKIP_FILE
+                                  |""".stripMargin)
+
+        val rows = result.collect()
+        val loaded = rows.filter(_.getString(1) == "LOADED")
+        val failed = rows.filter(_.getString(1) == "LOAD_FAILED")
+        assert(loaded.length == 1)
+        assert(failed.length == 1)
+
+        checkAnswer(
+          spark.sql(s"SELECT * FROM $dbName0.copy_skipfile_pq ORDER BY id"),
+          Seq(Row(1, "Alice"), Row(2, "Bob")))
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_skipfile_pq")
+  }
+
+  test("COPY INTO: ON_ERROR = CONTINUE clean file has no first_error") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_continue_multi")
+    spark.sql(s"CREATE TABLE $dbName0.copy_continue_multi (id INT, name 
STRING)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "good.csv", "1,Alice\n2,Bob\n")
+        createCsvFile(dir, "bad.csv", "abc,Charlie\n3,Dave\n")
+
+        val result = spark.sql(s"""COPY INTO $dbName0.copy_continue_multi
+                                  |FROM '${dir.getAbsolutePath}'
+                                  |FILE_FORMAT = (TYPE = CSV)
+                                  |ON_ERROR = CONTINUE
+                                  |""".stripMargin)
+
+        val rows = result.collect()
+        val goodFile = rows.find(_.getString(0) == "good.csv").get
+        val badFile = rows.find(_.getString(0) == "bad.csv").get
+
+        assert(goodFile.getString(1) == "LOADED")
+        assert(goodFile.getLong(4) == 0L)
+        assert(goodFile.isNullAt(5), "Clean file should have null first_error")
+
+        assert(badFile.getString(1) == "PARTIALLY_LOADED")
+        assert(badFile.getLong(4) > 0)
+        assert(!badFile.isNullAt(5), "Bad file should have non-null 
first_error")
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_continue_multi")
+  }
+
+  test("COPY INTO: ON_ERROR = CONTINUE with JSON cast errors reports 
correctly") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_continue_json_cast")
+    spark.sql(s"CREATE TABLE $dbName0.copy_continue_json_cast (id INT, name 
STRING)")
+
+    withJsonDir {
+      dir =>
+        createJsonFile(
+          dir,
+          "data.json",
+          """{"id": "1", "name": "Alice"}
+            |{"id": "abc", "name": "Bad"}
+            |{"id": "3", "name": "Charlie"}
+            |""".stripMargin
+        )
+
+        val result = spark.sql(s"""COPY INTO $dbName0.copy_continue_json_cast
+                                  |FROM '${dir.getAbsolutePath}'
+                                  |FILE_FORMAT = (TYPE = JSON)
+                                  |ON_ERROR = CONTINUE
+                                  |""".stripMargin)
+
+        val rows = result.collect()
+        assert(rows.length == 1)
+        assert(rows(0).getLong(4) > 0, "Should report cast errors")
+        assert(
+          rows(0).getString(1) == "PARTIALLY_LOADED",
+          s"Expected PARTIALLY_LOADED but got ${rows(0).getString(1)}")
+
+        val data =
+          spark.sql(s"SELECT * FROM $dbName0.copy_continue_json_cast ORDER BY 
id").collect()
+        assert(data.length == 2, s"Expected 2 rows but got ${data.length}")
+        assert(data(0).getInt(0) == 1)
+        assert(data(1).getInt(0) == 3)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_continue_json_cast")
+  }
+
+  test("COPY INTO: ON_ERROR = CONTINUE all rows fail reports LOAD_FAILED") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_continue_allfail")
+    spark.sql(s"CREATE TABLE $dbName0.copy_continue_allfail (id INT, name 
STRING)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "allfail.csv", "abc,Alice\nxyz,Bob\n")
+
+        val result = spark.sql(s"""COPY INTO $dbName0.copy_continue_allfail
+                                  |FROM '${dir.getAbsolutePath}'
+                                  |FILE_FORMAT = (TYPE = CSV)
+                                  |ON_ERROR = CONTINUE
+                                  |""".stripMargin)
+
+        val rows = result.collect()
+        assert(rows.length == 1)
+        assert(
+          rows(0).getString(1) == "LOAD_FAILED",
+          s"Expected LOAD_FAILED when all rows fail, got 
${rows(0).getString(1)}")
+        assert(rows(0).getLong(2) == 0L, "rows_loaded should be 0")
+        assert(rows(0).getLong(3) == 2L, "rows_parsed should be 2")
+        assert(rows(0).getLong(4) == 2L, "errors_seen should be 2")
+
+        assert(
+          spark.sql(s"SELECT * FROM $dbName0.copy_continue_allfail").count() 
== 0,
+          "No rows should be in table")
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_continue_allfail")
+  }
+
+  test("COPY INTO: ON_ERROR = CONTINUE all rows fail does not block re-run") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_continue_retry")
+    spark.sql(s"CREATE TABLE $dbName0.copy_continue_retry (id INT, name 
STRING)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "data.csv", "abc,Alice\n")
+
+        // First run: all rows fail
+        val result1 = spark.sql(s"""COPY INTO $dbName0.copy_continue_retry
+                                   |FROM '${dir.getAbsolutePath}'
+                                   |FILE_FORMAT = (TYPE = CSV)
+                                   |ON_ERROR = CONTINUE
+                                   |""".stripMargin)
+        val rows1 = result1.collect()
+        assert(rows1(0).getString(1) == "LOAD_FAILED")
+        assert(spark.sql(s"SELECT * FROM 
$dbName0.copy_continue_retry").count() == 0)
+
+        // Re-run without FORCE: file should NOT be skipped since 0 rows were 
loaded
+        val result2 = spark.sql(s"""COPY INTO $dbName0.copy_continue_retry
+                                   |FROM '${dir.getAbsolutePath}'
+                                   |FILE_FORMAT = (TYPE = CSV)
+                                   |ON_ERROR = CONTINUE
+                                   |""".stripMargin)
+        val rows2 = result2.collect()
+        assert(
+          rows2(0).getString(1) != "SKIPPED",
+          "File with 0 rows loaded should not be skipped on re-run")
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_continue_retry")
+  }
+
+  test("COPY INTO: ON_ERROR = CONTINUE skips CSV rows with fewer columns") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_continue_fewer_cols")
+    spark.sql(s"CREATE TABLE $dbName0.copy_continue_fewer_cols (id INT, name 
STRING, age INT)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "data.csv", "1,Alice\n2,Bob,20\n")
+
+        val result = spark.sql(s"""COPY INTO $dbName0.copy_continue_fewer_cols
+                                  |FROM '${dir.getAbsolutePath}'
+                                  |FILE_FORMAT = (TYPE = CSV)
+                                  |ON_ERROR = CONTINUE
+                                  |""".stripMargin)
+
+        // A row with fewer columns than the target schema is a malformed 
record
+        // (Spark CSV PERMISSIVE mode), so CONTINUE skips it and keeps the 
well-formed row.
+        val rows = result.collect()
+        assert(rows.length == 1)
+        assert(rows(0).getString(1) == "PARTIALLY_LOADED")
+        assert(rows(0).getLong(2) == 1L, "rows_loaded should be 1")
+        assert(rows(0).getLong(4) == 1L, "errors_seen should be 1")
+
+        val data =
+          spark.sql(s"SELECT * FROM $dbName0.copy_continue_fewer_cols ORDER BY 
id").collect()
+        assert(data.length == 1)
+        assert(data(0).getInt(0) == 2 && data(0).getString(1) == "Bob" && 
data(0).getInt(2) == 20)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_continue_fewer_cols")
+  }
+
+  test("COPY INTO: ON_ERROR = CONTINUE skips CSV rows with extra columns") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_continue_extra_cols")
+    spark.sql(s"CREATE TABLE $dbName0.copy_continue_extra_cols (id INT, name 
STRING)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "data.csv", "1,Alice,extra\n2,Bob\n")
+
+        val result = spark.sql(s"""COPY INTO $dbName0.copy_continue_extra_cols
+                                  |FROM '${dir.getAbsolutePath}'
+                                  |FILE_FORMAT = (TYPE = CSV)
+                                  |ON_ERROR = CONTINUE
+                                  |""".stripMargin)
+
+        // A row with more columns than the target schema is a malformed record
+        // (Spark CSV PERMISSIVE mode), so CONTINUE skips it and keeps the 
well-formed row.
+        val rows = result.collect()
+        assert(rows.length == 1)
+        assert(rows(0).getString(1) == "PARTIALLY_LOADED")
+        assert(rows(0).getLong(2) == 1L, "rows_loaded should be 1")
+        assert(rows(0).getLong(4) == 1L, "errors_seen should be 1")
+
+        val data =
+          spark.sql(s"SELECT * FROM $dbName0.copy_continue_extra_cols ORDER BY 
id").collect()
+        assert(data.length == 1)
+        assert(data(0).getInt(0) == 2 && data(0).getString(1) == "Bob")
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_continue_extra_cols")
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala
index e0970362a0..23365e35e6 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala
@@ -27,7 +27,7 @@ import java.nio.file.Files
 
 class CopyIntoTestBase extends PaimonSparkTestBase {
 
-  private def createCsvFile(dir: File, name: String, content: String): File = {
+  protected def createCsvFile(dir: File, name: String, content: String): File 
= {
     val file = new File(dir, name)
     val writer = new PrintWriter(file)
     try writer.write(content)
@@ -35,13 +35,13 @@ class CopyIntoTestBase extends PaimonSparkTestBase {
     file
   }
 
-  private def withCsvDir(testBody: File => Unit): Unit = {
+  protected def withCsvDir(testBody: File => Unit): Unit = {
     val dir = Files.createTempDirectory("copy_into_test").toFile
     try testBody(dir)
     finally deleteRecursively(dir)
   }
 
-  private def deleteRecursively(file: File): Unit = {
+  protected def deleteRecursively(file: File): Unit = {
     if (file.isDirectory) {
       file.listFiles().foreach(deleteRecursively)
     }
@@ -648,7 +648,7 @@ class CopyIntoTestBase extends PaimonSparkTestBase {
     spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_dup_col")
   }
 
-  private def createJsonFile(dir: File, name: String, content: String): File = 
{
+  protected def createJsonFile(dir: File, name: String, content: String): File 
= {
     val file = new File(dir, name)
     val writer = new PrintWriter(file)
     try writer.write(content)
@@ -656,7 +656,7 @@ class CopyIntoTestBase extends PaimonSparkTestBase {
     file
   }
 
-  private def withJsonDir(testBody: File => Unit): Unit = {
+  protected def withJsonDir(testBody: File => Unit): Unit = {
     val dir = Files.createTempDirectory("copy_into_json_test").toFile
     try testBody(dir)
     finally deleteRecursively(dir)
@@ -1086,7 +1086,7 @@ class CopyIntoTestBase extends PaimonSparkTestBase {
 
   // ========== Parquet Tests ==========
 
-  private def withParquetDir(testBody: File => Unit): Unit = {
+  protected def withParquetDir(testBody: File => Unit): Unit = {
     val dir = Files.createTempDirectory("copy_into_parquet_test").toFile
     try testBody(dir)
     finally deleteRecursively(dir)
@@ -1101,7 +1101,7 @@ class CopyIntoTestBase extends PaimonSparkTestBase {
     df.coalesce(1).write.parquet(new File(dir, name).getAbsolutePath)
   }
 
-  private def createParquetSingleFile(
+  protected def createParquetSingleFile(
       dir: File,
       fileName: String,
       data: Seq[Row],

Reply via email to