This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e6fba9a68c [spark] Add COPY INTO support for CSV import and file 
writing (#7926)
e6fba9a68c is described below

commit e6fba9a68c3260c492042f22f6bedde674d7e037
Author: Junrui Lee <[email protected]>
AuthorDate: Fri May 22 20:20:10 2026 +0800

    [spark] Add COPY INTO support for CSV import and file writing (#7926)
    
    This PR adds Spark SQL `COPY INTO` support for bulk CSV import and CSV
    file writing.
    
    Supported import syntax:
    
    ```sql
    COPY INTO table_name [(col1, col2, ...)]
    FROM 'source_path'
    FILE_FORMAT = (TYPE = CSV [, option = value, ...])
    [PATTERN = 'regex']
    [FORCE = TRUE|FALSE]
    [ON_ERROR = ABORT_STATEMENT]
    ```
    
    Supported file writing syntax:
    
    ```sql
    COPY INTO 'target_path'
    FROM table_name
    FILE_FORMAT = (TYPE = CSV [, option = value, ...])
    [OVERWRITE = TRUE|FALSE]
    ```
---
 docs/docs/spark/sql-write.md                       | 155 +++++
 .../org/apache/paimon/spark/sql/CopyIntoTest.scala |  21 +
 .../org/apache/paimon/spark/sql/CopyIntoTest.scala |  21 +
 .../org/apache/paimon/spark/sql/CopyIntoTest.scala |  21 +
 .../org/apache/paimon/spark/sql/CopyIntoTest.scala |  21 +
 .../AbstractPaimonSparkSqlExtensionsParser.scala   |   6 +-
 .../org/apache/paimon/spark/sql/CopyIntoTest.scala |  21 +
 .../org/apache/paimon/spark/sql/CopyIntoTest.scala |  21 +
 .../PaimonSqlExtensions.g4                         |  59 ++
 .../plans/logical/CopyIntoLocationCommand.scala    |  42 ++
 .../plans/logical/CopyIntoTableCommand.scala       |  45 ++
 .../spark/catalyst/plans/logical/CopyOptions.scala | 145 +++++
 .../spark/copyinto/CopyLoadHistoryManager.scala    | 109 ++++
 .../spark/execution/CopyIntoLocationExec.scala     |  67 +++
 .../paimon/spark/execution/CopyIntoTableExec.scala | 344 +++++++++++
 .../paimon/spark/execution/CopyIntoUtils.scala     |  31 +
 .../paimon/spark/execution/PaimonStrategy.scala    |  24 +-
 .../AbstractPaimonSparkSqlExtensionsParser.scala   |   6 +-
 .../extensions/PaimonSqlExtensionsAstBuilder.scala |  96 +++
 .../apache/paimon/spark/sql/CopyIntoTestBase.scala | 667 +++++++++++++++++++++
 20 files changed, 1919 insertions(+), 3 deletions(-)

diff --git a/docs/docs/spark/sql-write.md b/docs/docs/spark/sql-write.md
index 74bec3f9e7..3a05500ccc 100644
--- a/docs/docs/spark/sql-write.md
+++ b/docs/docs/spark/sql-write.md
@@ -294,3 +294,158 @@ INSERT INTO t VALUES (1, '1'), (2, '2');
 -- Need using `BY NAME` statement (requires Spark 3.5+)
 INSERT INTO t BY NAME SELECT 3 AS a, '3' AS b, 3 AS c;
 ```
+
+## COPY INTO
+
+`COPY INTO` provides a SQL command for bulk loading CSV files into Paimon 
tables and writing table data to CSV files.
+
+### CSV Import
+
+```sql
+COPY INTO table_name [(col1, col2, ...)]
+FROM 'source_path'
+FILE_FORMAT = (TYPE = CSV [, option = value, ...])
+[PATTERN = 'regex']
+[FORCE = TRUE|FALSE]
+[ON_ERROR = ABORT_STATEMENT]
+```
+
+**Basic import:**
+
+```sql
+COPY INTO my_db.my_table
+FROM '/data/csv_files/'
+FILE_FORMAT = (TYPE = CSV);
+```
+
+**Import with explicit column mapping:**
+
+```sql
+-- Only load into specified columns; omitted columns use their DEFAULT value 
or NULL
+COPY INTO my_db.users (id, name)
+FROM '/data/new_users/'
+FILE_FORMAT = (TYPE = CSV, SKIP_HEADER = 1);
+```
+
+**Import with NULL_IF and PATTERN:**
+
+```sql
+COPY INTO my_db.events
+FROM '/data/logs/'
+FILE_FORMAT = (TYPE = CSV, FIELD_DELIMITER = '|', NULL_IF = ('NULL', '\\N', 
''))
+PATTERN = '.*\.csv'
+FORCE = FALSE;
+```
+
+### Write CSV Files
+
+```sql
+COPY INTO 'target_path'
+FROM table_name
+FILE_FORMAT = (TYPE = CSV [, option = value, ...])
+[OVERWRITE = TRUE|FALSE]
+```
+
+**Write with header and overwrite:**
+
+```sql
+COPY INTO '/export/users_backup/'
+FROM my_db.users
+FILE_FORMAT = (TYPE = CSV, HEADER = TRUE, FIELD_DELIMITER = ',')
+OVERWRITE = TRUE;
+```
+
+### FILE_FORMAT Options
+
+`FILE_FORMAT` is required and must include `TYPE = CSV`.
+
+**Import options:**
+
+| Option | Description | Default |
+|--------|-------------|---------|
+| TYPE | File format type. Must be `CSV`. | (required) |
+| FIELD_DELIMITER | Column delimiter character. | `,` |
+| SKIP_HEADER | Skip the first line as header. Only `0` or `1`. | `0` |
+| QUOTE | Quote character for enclosing fields. | `"` |
+| ESCAPE | Escape character within quoted fields. | `\` |
+| NULL_IF | List of string values to interpret as NULL, e.g. `('NULL', 
'\\N')`. | (none) |
+| EMPTY_FIELD_AS_NULL | Treat empty fields as NULL. `TRUE` or `FALSE`. | 
`FALSE` |
+| COMPRESSION | Compression codec (e.g. `GZIP`). | `NONE` |
+
+**Write options:**
+
+| Option | Description | Default |
+|--------|-------------|---------|
+| TYPE | File format type. Must be `CSV`. | (required) |
+| FIELD_DELIMITER | Column delimiter character. | `,` |
+| HEADER | Write column names as the first line. `TRUE` or `FALSE`. | `FALSE` |
+| QUOTE | Quote character for enclosing fields. | `"` |
+| ESCAPE | Escape character within quoted fields. | `\` |
+| COMPRESSION | Compression codec (e.g. `GZIP`). | `NONE` |
+
+### Import Options
+
+| Option | Description | Default |
+|--------|-------------|---------|
+| PATTERN | Regex to filter source files by base file name. Only matching 
files are loaded. | (all files) |
+| FORCE | `FALSE`: skip files already loaded (idempotent). `TRUE`: reload all 
files. | `FALSE` |
+| ON_ERROR | Error handling strategy. Only `ABORT_STATEMENT` is supported. | 
`ABORT_STATEMENT` |
+
+### File Write Options
+
+| Option | Description | Default |
+|--------|-------------|---------|
+| OVERWRITE | `FALSE`: fail if target path exists. `TRUE`: overwrite existing 
files. | `FALSE` |
+
+### Column Mapping
+
+When an explicit column list is provided (e.g., `COPY INTO t (col1, col2) FROM 
...`):
+
+- CSV columns are mapped **positionally** to the specified column list.
+- The number of CSV columns must match the column list length.
+- Columns not in the list are filled with their **DEFAULT value** (if defined 
in the table schema) or **NULL**.
+- Non-nullable columns without a default value that are not in the list will 
cause an error.
+
+When no column list is provided:
+
+- CSV columns are mapped positionally to all writable columns in the target 
table.
+- The number of CSV columns must match the number of writable columns.
+
+### Repeated Imports
+
+By default (`FORCE = FALSE`), COPY INTO tracks which files have been 
successfully loaded. A file is identified by its path, size, and last-modified 
timestamp.
+
+- Re-running the same COPY INTO command will **skip** already-loaded files and 
return status `SKIPPED`.
+- If a source file is modified (size or timestamp changes), it becomes 
eligible for re-loading.
+- `FORCE = TRUE` bypasses load history and always re-imports all matching 
files.
+
+### Result Output
+
+**Import** returns one row per source file:
+
+| Column | Type | Description |
+|--------|------|-------------|
+| file_name | STRING | Source file name |
+| status | STRING | `LOADED` or `SKIPPED` |
+| rows_loaded | BIGINT | Number of rows written |
+| rows_parsed | BIGINT | Number of rows parsed from the file |
+
+**File write** returns a single row:
+
+| Column | Type | Description |
+|--------|------|-------------|
+| output_path | STRING | Target output path |
+| file_count | INT | Number of files written |
+| rows_written | BIGINT | Total rows written |
+
+### Limitations
+
+- Only **CSV** format is supported.
+- Writing files only supports `FROM table_name`; `FROM (SELECT ...)` is not 
supported.
+- `ON_ERROR = CONTINUE` is not supported; any parse or cast error aborts the 
entire command.
+- `SINGLE = TRUE` (single-file output) is not supported.
+- File format options must be specified inline in `FILE_FORMAT = (...)`.
+- File listing is **non-recursive**: only direct files under the source path 
are processed. Subdirectories are ignored.
+- `PATTERN` matches the **base file name** only (not the full path).
+- Concurrent COPY INTO commands targeting the same table may produce duplicate 
data.
+- `SKIP_HEADER` only supports values `0` or `1`.
diff --git 
a/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
 
b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
new file mode 100644
index 0000000000..d1515031e5
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class CopyIntoTest extends CopyIntoTestBase {}
diff --git 
a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
 
b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
new file mode 100644
index 0000000000..d1515031e5
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class CopyIntoTest extends CopyIntoTestBase {}
diff --git 
a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
 
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
new file mode 100644
index 0000000000..d1515031e5
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class CopyIntoTest extends CopyIntoTestBase {}
diff --git 
a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
 
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
new file mode 100644
index 0000000000..d1515031e5
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class CopyIntoTest extends CopyIntoTestBase {}
diff --git 
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
 
b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
index 0398df809f..67f72d953a 100644
--- 
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
+++ 
b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
@@ -122,7 +122,7 @@ abstract class AbstractPaimonSparkSqlExtensionsParser(val 
delegate: ParserInterf
       .replaceAll("/\\*.*?\\*/", " ")
       .replaceAll("`", "")
       .trim()
-    isPaimonProcedure(normalized) || isTagRefDdl(normalized)
+    isPaimonProcedure(normalized) || isTagRefDdl(normalized) || 
isCopyInto(normalized)
   }
 
   // All builtin paimon procedures are under the 'sys' namespace
@@ -140,6 +140,10 @@ abstract class AbstractPaimonSparkSqlExtensionsParser(val 
delegate: ParserInterf
         normalized.contains("delete tag")))
   }
 
+  private def isCopyInto(normalized: String): Boolean = {
+    normalized.startsWith("copy into")
+  }
+
   protected def parse[T](command: String)(toResult: PaimonSqlExtensionsParser 
=> T): T = {
     val lexer = new PaimonSqlExtensionsLexer(
       new UpperCaseCharStream(CharStreams.fromString(command)))
diff --git 
a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
 
b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
new file mode 100644
index 0000000000..d1515031e5
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class CopyIntoTest extends CopyIntoTestBase {}
diff --git 
a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
 
b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
new file mode 100644
index 0000000000..d1515031e5
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class CopyIntoTest extends CopyIntoTestBase {}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
 
b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
index 207d973216..4255e530c1 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
+++ 
b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
@@ -74,6 +74,16 @@ statement
     | ALTER TABLE multipartIdentifier createReplaceTagClause                   
             #createOrReplaceTag
     | ALTER TABLE multipartIdentifier DELETE TAG (IF EXISTS)? identifier       
             #deleteTag
     | ALTER TABLE multipartIdentifier RENAME TAG identifier TO identifier      
             #renameTag
+    | COPY INTO multipartIdentifier ('(' columnList ')')?
+      FROM sourcePath=STRING
+      fileFormatClause
+      patternClause?
+      forceClause?
+      onErrorClause?                                                           
             #copyIntoTable
+    | COPY INTO targetPath=STRING
+      FROM multipartIdentifier
+      fileFormatClause
+      overwriteClause?                                                         
             #copyIntoLocation
   ;
 
 callArgument
@@ -104,6 +114,42 @@ timeUnit
   | MINUTES
   ;
 
+columnList
+  : identifier (',' identifier)*
+  ;
+
+fileFormatClause
+  : FILE_FORMAT '=' '(' fileFormatOption (',' fileFormatOption)* ')'
+  ;
+
+fileFormatOption
+  : key=identifier '=' fileFormatValue
+  ;
+
+fileFormatValue
+  : STRING                                                             
#stringFormatValue
+  | identifier                                                         
#identFormatValue
+  | booleanValue                                                       
#boolFormatValue
+  | INTEGER_VALUE                                                      
#intFormatValue
+  | '(' STRING (',' STRING)* ')'                                       
#listFormatValue
+  ;
+
+patternClause
+  : PATTERN '=' STRING
+  ;
+
+forceClause
+  : FORCE '=' booleanValue
+  ;
+
+onErrorClause
+  : ON_ERROR '=' ABORT_STATEMENT
+  ;
+
+overwriteClause
+  : OVERWRITE '=' booleanValue
+  ;
+
 expression
     : constant
     | stringMap
@@ -155,6 +201,8 @@ nonReserved
     | REPLACE | RETAIN | VERSION | TAG
     | TRUE | FALSE
     | MAP
+    | COPY | INTO | FROM | FILE_FORMAT | PATTERN | FORCE | ON_ERROR | 
ABORT_STATEMENT | OVERWRITE
+    | CSV
     ;
 
 ALTER: 'ALTER';
@@ -185,6 +233,17 @@ FALSE: 'FALSE';
 
 MAP: 'MAP';
 
+COPY: 'COPY';
+INTO: 'INTO';
+FROM: 'FROM';
+FILE_FORMAT: 'FILE_FORMAT';
+PATTERN: 'PATTERN';
+FORCE: 'FORCE';
+ON_ERROR: 'ON_ERROR';
+ABORT_STATEMENT: 'ABORT_STATEMENT';
+OVERWRITE: 'OVERWRITE';
+CSV: 'CSV';
+
 PLUS: '+';
 MINUS: '-';
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyIntoLocationCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyIntoLocationCommand.scala
new file mode 100644
index 0000000000..3e5d0a0666
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyIntoLocationCommand.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalyst.plans.logical
+
+import org.apache.paimon.spark.leafnode.PaimonLeafCommand
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
+import org.apache.spark.sql.types.{IntegerType, LongType, StringType}
+
+case class CopyIntoLocationCommand(
+    targetPath: String,
+    table: Seq[String],
+    fileFormat: CopyFileFormat,
+    overwrite: Boolean)
+  extends PaimonLeafCommand {
+
+  override def output: Seq[Attribute] = Seq(
+    AttributeReference("output_path", StringType, nullable = false)(),
+    AttributeReference("file_count", IntegerType, nullable = false)(),
+    AttributeReference("rows_written", LongType, nullable = false)()
+  )
+
+  override def simpleString(maxFields: Int): String = {
+    s"CopyIntoLocation: target=$targetPath, source=$table"
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyIntoTableCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyIntoTableCommand.scala
new file mode 100644
index 0000000000..eedad9763e
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyIntoTableCommand.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalyst.plans.logical
+
+import org.apache.paimon.spark.leafnode.PaimonLeafCommand
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
+import org.apache.spark.sql.types.{LongType, StringType}
+
+case class CopyIntoTableCommand(
+    table: Seq[String],
+    columns: Option[Seq[String]],
+    sourcePath: String,
+    fileFormat: CopyFileFormat,
+    pattern: Option[String],
+    force: Boolean)
+  extends PaimonLeafCommand {
+
+  override def output: Seq[Attribute] = Seq(
+    AttributeReference("file_name", StringType, nullable = false)(),
+    AttributeReference("status", StringType, nullable = false)(),
+    AttributeReference("rows_loaded", LongType, nullable = false)(),
+    AttributeReference("rows_parsed", LongType, nullable = false)()
+  )
+
+  override def simpleString(maxFields: Int): String = {
+    s"CopyIntoTable: table=$table, source=$sourcePath"
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyOptions.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyOptions.scala
new file mode 100644
index 0000000000..2e2f7e2ec1
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyOptions.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalyst.plans.logical
+
+sealed trait FileFormatType
+
+object FileFormatType {
+  case object CSV extends FileFormatType
+  case class Unsupported(name: String) extends FileFormatType
+}
+
+case class CopyFileFormat(formatType: FileFormatType, options: Map[String, 
String]) {
+
+  def toSparkReaderOptions: Map[String, String] = {
+    val mapped = scala.collection.mutable.Map[String, String]("mode" -> 
"FAILFAST")
+    options.foreach {
+      case (k, v) =>
+        k match {
+          case "FIELD_DELIMITER" => mapped("sep") = v
+          case "QUOTE" => mapped("quote") = v
+          case "ESCAPE" => mapped("escape") = v
+          case "COMPRESSION" => mapped("compression") = v
+          case "SKIP_HEADER" =>
+            mapped("header") = if (v == "1" || v.equalsIgnoreCase("TRUE")) 
"true" else "false"
+          case _ =>
+        }
+    }
+    mapped.toMap
+  }
+
+  def toSparkWriterOptions: Map[String, String] = {
+    val mapped = scala.collection.mutable.Map[String, String]()
+    options.foreach {
+      case (k, v) =>
+        k match {
+          case "FIELD_DELIMITER" => mapped("sep") = v
+          case "HEADER" => mapped("header") = v.toLowerCase
+          case "QUOTE" => mapped("quote") = v
+          case "ESCAPE" => mapped("escape") = v
+          case "COMPRESSION" => mapped("compression") = v
+          case _ =>
+        }
+    }
+    mapped.toMap
+  }
+
+  def nullIfValues: Seq[String] = {
+    options.get("NULL_IF") match {
+      case Some(v) if v.nonEmpty =>
+        v.split(CopyFileFormat.LIST_SEPARATOR, -1).toSeq
+      case _ => Seq.empty
+    }
+  }
+
+  def emptyFieldAsNull: Boolean = {
+    options.get("EMPTY_FIELD_AS_NULL").exists(v => v == "TRUE" || 
v.equalsIgnoreCase("TRUE"))
+  }
+
+  def validateForImport(): Unit = {
+    validateFormatType()
+    if (options.contains("MODE")) {
+      throw new IllegalArgumentException(
+        "MODE cannot be specified in FILE_FORMAT options; it is reserved for 
ON_ERROR handling")
+    }
+    val invalid = 
options.keys.filterNot(CopyFileFormat.VALID_IMPORT_KEYS.contains)
+    if (invalid.nonEmpty) {
+      throw new IllegalArgumentException(
+        s"Unsupported FILE_FORMAT options for import: ${invalid.mkString(", 
")}")
+    }
+    options.get("SKIP_HEADER").foreach {
+      v =>
+        val intVal =
+          try v.toInt
+          catch { case _: NumberFormatException => -1 }
+        if (intVal != 0 && intVal != 1) {
+          throw new IllegalArgumentException(s"SKIP_HEADER supports only 0 or 
1, got: $v")
+        }
+    }
+  }
+
+  def validateForExport(): Unit = {
+    validateFormatType()
+    val invalid = 
options.keys.filterNot(CopyFileFormat.VALID_EXPORT_KEYS.contains)
+    if (invalid.nonEmpty) {
+      throw new IllegalArgumentException(
+        s"Unsupported FILE_FORMAT options for export: ${invalid.mkString(", 
")}")
+    }
+  }
+
+  private def validateFormatType(): Unit = {
+    formatType match {
+      case FileFormatType.CSV =>
+      case FileFormatType.Unsupported(name) =>
+        throw new IllegalArgumentException(
+          s"Unsupported file format type: $name. Only CSV is currently 
supported")
+    }
+  }
+}
+
+object CopyFileFormat {
+
+  val VALID_IMPORT_KEYS: Set[String] = Set(
+    "FIELD_DELIMITER",
+    "SKIP_HEADER",
+    "QUOTE",
+    "ESCAPE",
+    "NULL_IF",
+    "EMPTY_FIELD_AS_NULL",
+    "COMPRESSION"
+  )
+
+  val VALID_EXPORT_KEYS: Set[String] = Set(
+    "FIELD_DELIMITER",
+    "HEADER",
+    "QUOTE",
+    "ESCAPE",
+    "COMPRESSION"
+  )
+
+  // Unit Separator (U+001F) used to encode multi-value lists in a single 
string
+  val LIST_SEPARATOR: String = "\u001f"
+
+  def parseFormatType(typeStr: String): FileFormatType = {
+    typeStr.toUpperCase match {
+      case "CSV" => FileFormatType.CSV
+      case other => FileFormatType.Unsupported(other)
+    }
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/copyinto/CopyLoadHistoryManager.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/copyinto/CopyLoadHistoryManager.scala
new file mode 100644
index 0000000000..c85a552151
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/copyinto/CopyLoadHistoryManager.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.copyinto
+
+import org.apache.paimon.fs.{FileIO, Path}
+import org.apache.paimon.utils.JsonSerdeUtil
+
+import java.nio.charset.StandardCharsets
+import java.security.MessageDigest
+import java.util.{LinkedHashMap => JLinkedHashMap}
+
+case class CopyLoadRecord(
+    filePath: String,
+    fileSize: Long,
+    lastModified: Long,
+    loadedAt: Long,
+    snapshotId: Long,
+    rowsLoaded: Long) {
+
+  def toJson: String = {
+    val map = new JLinkedHashMap[String, AnyRef]()
+    map.put("filePath", filePath)
+    map.put("fileSize", java.lang.Long.valueOf(fileSize))
+    map.put("lastModified", java.lang.Long.valueOf(lastModified))
+    map.put("loadedAt", java.lang.Long.valueOf(loadedAt))
+    map.put("snapshotId", java.lang.Long.valueOf(snapshotId))
+    map.put("rowsLoaded", java.lang.Long.valueOf(rowsLoaded))
+    JsonSerdeUtil.toJson(map)
+  }
+}
+
+object CopyLoadRecord {
+  def fromJson(json: String): CopyLoadRecord = {
+    val map = JsonSerdeUtil.parseJsonMap(json, classOf[AnyRef])
+    CopyLoadRecord(
+      filePath = map.get("filePath").toString,
+      fileSize = map.get("fileSize").toString.toLong,
+      lastModified = map.get("lastModified").toString.toLong,
+      loadedAt = map.get("loadedAt").toString.toLong,
+      snapshotId = map.get("snapshotId").toString.toLong,
+      rowsLoaded = map.get("rowsLoaded").toString.toLong
+    )
+  }
+}
+
+class CopyLoadHistoryManager(fileIO: FileIO, tablePath: Path) {
+
+  private val LOG = 
org.slf4j.LoggerFactory.getLogger(classOf[CopyLoadHistoryManager])
+
+  private val historyDir = new Path(tablePath, "copy-into/history")
+
+  def isLoaded(filePath: String, fileSize: Long, lastModified: Long): Boolean 
= {
+    val prefix = s"load-${sha256(filePath)}-"
+    if (!fileIO.exists(historyDir)) return false
+    try {
+      val files = fileIO.listStatus(historyDir)
+      files.exists {
+        status =>
+          val name = status.getPath.getName
+          if (name.startsWith(prefix)) {
+            try {
+              val content = fileIO.readFileUtf8(status.getPath)
+              val record = CopyLoadRecord.fromJson(content)
+              record.fileSize == fileSize && record.lastModified == 
lastModified
+            } catch {
+              case e: Exception =>
+                LOG.warn(s"Failed to read load history record 
${status.getPath}: ${e.getMessage}")
+                false
+            }
+          } else {
+            false
+          }
+      }
+    } catch {
+      case e: Exception =>
+        LOG.warn(s"Failed to list load history directory $historyDir: 
${e.getMessage}")
+        false
+    }
+  }
+
+  def recordLoaded(record: CopyLoadRecord): Unit = {
+    fileIO.mkdirs(historyDir)
+    val hash = sha256(record.filePath)
+    val recordPath = new Path(historyDir, s"load-$hash-${record.loadedAt}")
+    fileIO.overwriteFileUtf8(recordPath, record.toJson)
+  }
+
+  private def sha256(input: String): String = {
+    val digest = MessageDigest.getInstance("SHA-256")
+    val hash = digest.digest(input.getBytes(StandardCharsets.UTF_8))
+    hash.map("%02x".format(_)).mkString.take(16)
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoLocationExec.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoLocationExec.scala
new file mode 100644
index 0000000000..f4f0720b28
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoLocationExec.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.execution
+
+import org.apache.paimon.spark.catalyst.plans.logical.CopyFileFormat
+import org.apache.paimon.spark.leafnode.PaimonLeafV2CommandExec
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.{SaveMode, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
+import org.apache.spark.unsafe.types.UTF8String
+
+case class CopyIntoLocationExec(
+    spark: SparkSession,
+    catalog: TableCatalog,
+    ident: Identifier,
+    targetPath: String,
+    fileFormat: CopyFileFormat,
+    overwrite: Boolean,
+    out: Seq[Attribute])
+  extends PaimonLeafV2CommandExec {
+
+  override def output: Seq[Attribute] = out
+
+  override protected def run(): Seq[InternalRow] = {
+    fileFormat.validateForExport()
+
+    val tableName = CopyIntoUtils.quoteIdentifier(catalog.name(), ident)
+    val df = spark.table(tableName)
+
+    val rowCount = df.count()
+
+    val writerOptions = fileFormat.toSparkWriterOptions
+    val saveMode = if (overwrite) SaveMode.Overwrite else 
SaveMode.ErrorIfExists
+
+    df.write.options(writerOptions).mode(saveMode).csv(targetPath)
+
+    val hadoopConf = spark.sessionState.newHadoopConf()
+    val fsPath = new Path(targetPath)
+    val fs = fsPath.getFileSystem(hadoopConf)
+    val fileCount = if (fs.exists(fsPath)) {
+      fs.listStatus(fsPath).count(_.isFile)
+    } else {
+      0
+    }
+
+    Seq(InternalRow(UTF8String.fromString(targetPath), fileCount, rowCount))
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoTableExec.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoTableExec.scala
new file mode 100644
index 0000000000..260add4349
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoTableExec.scala
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.execution
+
+import org.apache.paimon.spark.SparkTable
+import org.apache.paimon.spark.catalyst.plans.logical.CopyFileFormat
+import org.apache.paimon.spark.copyinto.{CopyLoadHistoryManager, 
CopyLoadRecord}
+import org.apache.paimon.spark.leafnode.PaimonLeafV2CommandExec
+import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.types.DataField
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.spark.sql.{Column, DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
+import org.apache.spark.sql.functions.{col, input_file_name, lit, when}
+import org.apache.spark.sql.paimon.shims.SparkShimLoader
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+case class CopyIntoTableExec(
+    spark: SparkSession,
+    catalog: TableCatalog,
+    ident: Identifier,
+    sourcePath: String,
+    columns: Option[Seq[String]],
+    fileFormat: CopyFileFormat,
+    pattern: Option[String],
+    force: Boolean,
+    out: Seq[Attribute])
+  extends PaimonLeafV2CommandExec {
+
+  override def output: Seq[Attribute] = out
+
+  override protected def run(): Seq[InternalRow] = {
+    fileFormat.validateForImport()
+
+    val table = catalog.loadTable(ident)
+    assert(table.isInstanceOf[SparkTable])
+    val paimonTable = 
table.asInstanceOf[SparkTable].getTable.asInstanceOf[FileStoreTable]
+    val tableSchema = paimonTable.schema()
+    val writableColumns = tableSchema.fieldNames().asScala.toSeq
+    val fields = tableSchema.fields().asScala.toSeq
+    val targetColumns = resolveTargetColumns(writableColumns)
+
+    validateNonNullableDefaults(writableColumns, targetColumns, fields)
+
+    val (filesToLoad, skippedFiles) = listAndFilterFiles(paimonTable)
+
+    if (filesToLoad.isEmpty) {
+      return buildSkippedResults(skippedFiles)
+    }
+
+    val filePaths = filesToLoad.map(_.getPath.toString)
+    val stringSchema = StructType(
+      (0 until targetColumns.size).map(i => StructField(s"_c$i", StringType, 
nullable = true)))
+    val readerOptions = fileFormat.toSparkReaderOptions
+
+    val csvDf = readAndProcessCsv(filePaths, stringSchema, readerOptions)
+    val finalDf =
+      buildFinalDataFrame(csvDf, targetColumns, writableColumns, fields)
+    val castedDf = castAndValidate(finalDf, writableColumns, fields)
+
+    val tableName = CopyIntoUtils.quoteIdentifier(catalog.name(), ident)
+    castedDf.write.format("paimon").mode("append").insertInto(tableName)
+
+    recordHistoryAndBuildResults(
+      paimonTable,
+      filesToLoad,
+      skippedFiles,
+      filePaths,
+      stringSchema,
+      readerOptions)
+  }
+
+  private def resolveTargetColumns(writableColumns: Seq[String]): Seq[String] 
= {
+    columns match {
+      case Some(cols) =>
+        val resolver = spark.sessionState.conf.resolver
+        cols.indices.foreach {
+          i =>
+            cols.indices.filter(_ > i).foreach {
+              j =>
+                if (resolver(cols(i), cols(j))) {
+                  throw new IllegalArgumentException(
+                    s"Duplicate columns in column list: ${cols(i)}")
+                }
+            }
+        }
+        cols.map {
+          c =>
+            writableColumns.find(w => resolver(w, c)).getOrElse {
+              throw new IllegalArgumentException(
+                s"Column '$c' does not exist in target table. Available 
columns: ${writableColumns.mkString(", ")}")
+            }
+        }
+      case None => writableColumns
+    }
+  }
+
+  private def validateNonNullableDefaults(
+      writableColumns: Seq[String],
+      targetColumns: Seq[String],
+      fields: Seq[DataField]): Unit = {
+    if (columns.isEmpty) return
+    val unmapped = writableColumns.filterNot(targetColumns.contains)
+    unmapped.foreach {
+      colName =>
+        val field = fields.find(_.name() == colName).get
+        if (!field.`type`().isNullable && field.defaultValue() == null) {
+          throw new IllegalArgumentException(
+            s"Non-nullable column '$colName' is not in the column list and has 
no default value")
+        }
+    }
+  }
+
+  private def listAndFilterFiles(
+      paimonTable: FileStoreTable): (Array[FileStatus], Array[FileStatus]) = {
+    val hadoopConf = spark.sessionState.newHadoopConf()
+    val fsPath = new Path(sourcePath)
+    val fs = fsPath.getFileSystem(hadoopConf)
+    val allFiles = fs.listStatus(fsPath).filter(_.isFile)
+
+    val patternFiltered = pattern match {
+      case Some(p) =>
+        val regex = p.r
+        allFiles.filter(f => regex.findFirstIn(f.getPath.getName).isDefined)
+      case None => allFiles
+    }
+
+    if (patternFiltered.isEmpty) {
+      return (Array.empty, Array.empty)
+    }
+
+    val paimonPath = new 
org.apache.paimon.fs.Path(paimonTable.location().toString)
+    val historyManager = new CopyLoadHistoryManager(paimonTable.fileIO(), 
paimonPath)
+
+    if (!force) {
+      val (skip, load) = patternFiltered.partition {
+        f => historyManager.isLoaded(f.getPath.toString, f.getLen, 
f.getModificationTime)
+      }
+      (load, skip)
+    } else {
+      (patternFiltered, Array.empty[FileStatus])
+    }
+  }
+
+  private def readAndProcessCsv(
+      filePaths: Array[String],
+      stringSchema: StructType,
+      readerOptions: Map[String, String]): DataFrame = {
+    var df = spark.read
+      .options(readerOptions)
+      .schema(stringSchema)
+      .csv(filePaths: _*)
+
+    val nullIfVals = fileFormat.nullIfValues
+    if (nullIfVals.nonEmpty) {
+      df.columns.foreach {
+        colName =>
+          df = df.withColumn(
+            colName,
+            when(col(colName).isin(nullIfVals: _*), lit(null).cast(StringType))
+              .otherwise(col(colName)))
+      }
+    }
+
+    if (fileFormat.emptyFieldAsNull) {
+      df.columns.foreach {
+        colName =>
+          df = df.withColumn(
+            colName,
+            when(col(colName) === lit(""), lit(null).cast(StringType))
+              .otherwise(col(colName)))
+      }
+    }
+
+    df
+  }
+
+  private def buildFinalDataFrame(
+      csvDf: DataFrame,
+      targetColumns: Seq[String],
+      writableColumns: Seq[String],
+      fields: Seq[DataField]): DataFrame = {
+    val renamedDf = targetColumns.zipWithIndex.foldLeft(csvDf) {
+      case (df, (targetCol, idx)) => df.withColumnRenamed(s"_c$idx", targetCol)
+    }
+
+    if (columns.isDefined) {
+      val selectExprs: Seq[Column] = writableColumns.map {
+        colName =>
+          if (targetColumns.contains(colName)) {
+            col(colName)
+          } else {
+            val field = fields.find(_.name() == colName).get
+            val defaultVal = field.defaultValue()
+            if (defaultVal != null) {
+              val sparkType =
+                
org.apache.paimon.spark.SparkTypeUtils.fromPaimonType(field.`type`())
+              try {
+                val parsed = 
spark.sessionState.sqlParser.parseExpression(defaultVal)
+                
SparkShimLoader.shim.classicApi.column(parsed).cast(sparkType).as(colName)
+              } catch {
+                case _: Exception => lit(null).cast(sparkType).as(colName)
+              }
+            } else {
+              lit(null).as(colName)
+            }
+          }
+      }
+      renamedDf.select(selectExprs: _*)
+    } else {
+      renamedDf
+    }
+  }
+
+  private def castAndValidate(
+      finalDf: DataFrame,
+      writableColumns: Seq[String],
+      fields: Seq[DataField]): DataFrame = {
+    val nonStringCastCols = ArrayBuffer[String]()
+    var castedDf = finalDf
+    writableColumns.zip(fields).foreach {
+      case (colName, field) =>
+        val sparkType = 
org.apache.paimon.spark.SparkTypeUtils.fromPaimonType(field.`type`())
+        castedDf = castedDf.withColumn(colName, col(colName).cast(sparkType))
+        if (sparkType != StringType) {
+          nonStringCastCols += colName
+        }
+    }
+
+    if (nonStringCastCols.nonEmpty) {
+      val castSuffix = "__cv"
+      val validationDf = nonStringCastCols.zipWithIndex.foldLeft(finalDf) {
+        case (df, (colName, idx)) =>
+          val field = fields.find(_.name() == colName).get
+          val sparkType = 
org.apache.paimon.spark.SparkTypeUtils.fromPaimonType(field.`type`())
+          df.withColumn(castSuffix + idx, col(colName).cast(sparkType))
+      }
+      val badCastFilter = nonStringCastCols.zipWithIndex
+        .map { case (cn, idx) => col(cn).isNotNull && col(castSuffix + 
idx).isNull }
+        .reduce(_ || _)
+      val badRows = validationDf.filter(badCastFilter).limit(1).collect()
+      if (badRows.nonEmpty) {
+        val example = nonStringCastCols.zipWithIndex.find {
+          case (cn, idx) =>
+            val row = badRows(0)
+            val srcIdx = validationDf.schema.fieldIndex(cn)
+            val dstIdx = validationDf.schema.fieldIndex(castSuffix + idx)
+            !row.isNullAt(srcIdx) && row.isNullAt(dstIdx)
+        }
+        throw new IllegalArgumentException(
+          s"ON_ERROR = ABORT_STATEMENT: Cast failure in column 
'${example.map(_._1).getOrElse("unknown")}'. Source data contains values that 
cannot be converted to the target type.")
+      }
+    }
+
+    castedDf
+  }
+
+  private def recordHistoryAndBuildResults(
+      paimonTable: FileStoreTable,
+      filesToLoad: Array[FileStatus],
+      skippedFiles: Array[FileStatus],
+      filePaths: Array[String],
+      stringSchema: StructType,
+      readerOptions: Map[String, String]): Seq[InternalRow] = {
+    val paimonPath = new 
org.apache.paimon.fs.Path(paimonTable.location().toString)
+    val historyManager = new CopyLoadHistoryManager(paimonTable.fileIO(), 
paimonPath)
+    val snapshotId = paimonTable.snapshotManager().latestSnapshotId()
+    val loadedAt = System.currentTimeMillis()
+
+    val rowCounts = spark.read
+      .options(readerOptions)
+      .schema(stringSchema)
+      .csv(filePaths: _*)
+      .groupBy(input_file_name().as("file"))
+      .count()
+      .collect()
+
+    val fileCountMap = rowCounts.map {
+      row =>
+        val fullPath = row.getString(0)
+        val baseName = fullPath.substring(fullPath.lastIndexOf('/') + 1)
+        baseName -> row.getLong(1)
+    }.toMap
+
+    val loadedResults = filesToLoad.map {
+      fileStatus =>
+        val baseName = fileStatus.getPath.getName
+        val rowCount = fileCountMap.getOrElse(baseName, 0L)
+
+        historyManager.recordLoaded(
+          CopyLoadRecord(
+            filePath = fileStatus.getPath.toString,
+            fileSize = fileStatus.getLen,
+            lastModified = fileStatus.getModificationTime,
+            loadedAt = loadedAt,
+            snapshotId = snapshotId,
+            rowsLoaded = rowCount
+          ))
+
+        InternalRow(
+          UTF8String.fromString(baseName),
+          UTF8String.fromString("LOADED"),
+          rowCount,
+          rowCount)
+    }.toSeq
+
+    val skippedResults = buildSkippedResults(skippedFiles)
+    loadedResults ++ skippedResults
+  }
+
+  private def buildSkippedResults(files: Array[FileStatus]): Seq[InternalRow] 
= {
+    files.map {
+      f =>
+        InternalRow(
+          UTF8String.fromString(f.getPath.getName),
+          UTF8String.fromString("SKIPPED"),
+          0L,
+          0L)
+    }.toSeq
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoUtils.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoUtils.scala
new file mode 100644
index 0000000000..dd85af058d
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoUtils.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.execution
+
+import org.apache.spark.sql.connector.catalog.Identifier
+
+object CopyIntoUtils {
+
+  def quoteIdentifier(catalogName: String, ident: Identifier): String = {
+    val parts = Seq(catalogName) ++
+      ident.namespace().toSeq ++
+      Seq(ident.name())
+    parts.filter(_.nonEmpty).map(p => s"`${p.replace("`", 
"``")}`").mkString(".")
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
index 63c61a16e8..76bd6beb15 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
@@ -22,7 +22,7 @@ import org.apache.paimon.partition.PartitionPredicate
 import org.apache.paimon.spark.{SparkCatalog, SparkGenericCatalog, SparkTable, 
SparkUtils}
 import org.apache.paimon.spark.catalog.{SparkBaseCatalog, SupportView}
 import org.apache.paimon.spark.catalyst.analysis.ResolvedPaimonView
-import 
org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand, 
CreatePaimonView, DeleteTagCommand, DropPaimonView, PaimonCallCommand, 
PaimonDropPartitions, RenameTagCommand, ResolvedIdentifier, ShowPaimonViews, 
ShowTagsCommand, TruncatePaimonTableWithFilter}
+import 
org.apache.paimon.spark.catalyst.plans.logical.{CopyIntoLocationCommand, 
CopyIntoTableCommand, CreateOrReplaceTagCommand, CreatePaimonView, 
DeleteTagCommand, DropPaimonView, PaimonCallCommand, PaimonDropPartitions, 
RenameTagCommand, ResolvedIdentifier, ShowPaimonViews, ShowTagsCommand, 
TruncatePaimonTableWithFilter}
 import org.apache.paimon.table.Table
 
 import org.apache.spark.sql.SparkSession
@@ -149,6 +149,28 @@ case class PaimonStrategy(spark: SparkSession)
           partitionPredicate: Option[PartitionPredicate]) =>
       TruncatePaimonTableWithFilterExec(table, partitionPredicate) :: Nil
 
+    case c @ CopyIntoTableCommand(PaimonCatalogAndIdentifier(catalog, ident), 
_, _, _, _, _) =>
+      CopyIntoTableExec(
+        spark,
+        catalog,
+        ident,
+        c.sourcePath,
+        c.columns,
+        c.fileFormat,
+        c.pattern,
+        c.force,
+        c.output) :: Nil
+
+    case c @ CopyIntoLocationCommand(_, PaimonCatalogAndIdentifier(catalog, 
ident), _, _) =>
+      CopyIntoLocationExec(
+        spark,
+        catalog,
+        ident,
+        c.targetPath,
+        c.fileFormat,
+        c.overwrite,
+        c.output) :: Nil
+
     case _ => Nil
   }
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
index 0398df809f..67f72d953a 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
@@ -122,7 +122,7 @@ abstract class AbstractPaimonSparkSqlExtensionsParser(val 
delegate: ParserInterf
       .replaceAll("/\\*.*?\\*/", " ")
       .replaceAll("`", "")
       .trim()
-    isPaimonProcedure(normalized) || isTagRefDdl(normalized)
+    isPaimonProcedure(normalized) || isTagRefDdl(normalized) || 
isCopyInto(normalized)
   }
 
   // All builtin paimon procedures are under the 'sys' namespace
@@ -140,6 +140,10 @@ abstract class AbstractPaimonSparkSqlExtensionsParser(val 
delegate: ParserInterf
         normalized.contains("delete tag")))
   }
 
+  private def isCopyInto(normalized: String): Boolean = {
+    normalized.startsWith("copy into")
+  }
+
   protected def parse[T](command: String)(toResult: PaimonSqlExtensionsParser 
=> T): T = {
     val lexer = new PaimonSqlExtensionsLexer(
       new UpperCaseCharStream(CharStreams.fromString(command)))
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
index a1289a5f0b..dd3f3c4d15 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
@@ -33,6 +33,7 @@ import 
org.apache.spark.sql.catalyst.parser.extensions.PaimonSqlExtensionsParser
 import org.apache.spark.sql.catalyst.plans.logical._
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 /* This file is based on source code from the Iceberg Project 
(http://iceberg.apache.org/), licensed by the Apache
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
@@ -153,6 +154,101 @@ class PaimonSqlExtensionsAstBuilder(delegate: 
ParserInterface)
       ctx.identifier(1).getText)
   }
 
+  /** Create a COPY INTO TABLE (import) logical command. */
+  override def visitCopyIntoTable(ctx: CopyIntoTableContext): 
logical.CopyIntoTableCommand =
+    withOrigin(ctx) {
+      val table = typedVisit[Seq[String]](ctx.multipartIdentifier)
+      val columns = 
Option(ctx.columnList()).map(_.identifier().asScala.map(_.getText).toSeq)
+      val sourcePath = unquoteString(ctx.sourcePath.getText)
+      val fileFormat = buildFileFormat(ctx.fileFormatClause())
+      val pattern = Option(ctx.patternClause()).map(p => 
unquoteString(p.STRING().getText))
+      val force = Option(ctx.forceClause()).exists(_.booleanValue().TRUE() != 
null)
+      logical.CopyIntoTableCommand(table, columns, sourcePath, fileFormat, 
pattern, force)
+    }
+
+  /** Create a COPY INTO LOCATION (export) logical command. */
+  override def visitCopyIntoLocation(
+      ctx: CopyIntoLocationContext): logical.CopyIntoLocationCommand = 
withOrigin(ctx) {
+    val targetPath = unquoteString(ctx.targetPath.getText)
+    val table = typedVisit[Seq[String]](ctx.multipartIdentifier)
+    val fileFormat = buildFileFormat(ctx.fileFormatClause())
+    val overwrite = 
Option(ctx.overwriteClause()).exists(_.booleanValue().TRUE() != null)
+    logical.CopyIntoLocationCommand(targetPath, table, fileFormat, overwrite)
+  }
+
+  private def buildFileFormat(ctx: FileFormatClauseContext): CopyFileFormat = {
+    val opts = ctx.fileFormatOption().asScala.toSeq
+    val seen = mutable.Set[String]()
+    val optionsBuilder = mutable.LinkedHashMap[String, String]()
+
+    opts.foreach {
+      opt =>
+        val key = opt.key.getText.toUpperCase
+        if (!seen.add(key)) {
+          throw new IllegalArgumentException(s"Duplicate FILE_FORMAT option: 
$key")
+        }
+        val value = extractFormatValue(opt.fileFormatValue())
+        optionsBuilder(key) = value
+    }
+
+    val typeValue = optionsBuilder.remove("TYPE")
+    if (typeValue.isEmpty) {
+      throw new IllegalArgumentException("FILE_FORMAT must include TYPE")
+    }
+
+    val formatType = CopyFileFormat.parseFormatType(typeValue.get)
+
+    CopyFileFormat(formatType, optionsBuilder.toMap)
+  }
+
+  private def extractFormatValue(ctx: FileFormatValueContext): String = {
+    ctx match {
+      case c: StringFormatValueContext =>
+        unquoteString(c.STRING().getText)
+      case c: IdentFormatValueContext =>
+        c.identifier().getText
+      case c: BoolFormatValueContext =>
+        if (c.booleanValue().TRUE() != null) "TRUE" else "FALSE"
+      case c: IntFormatValueContext =>
+        c.INTEGER_VALUE().getText
+      case c: ListFormatValueContext =>
+        c.STRING()
+          .asScala
+          .map(s => unquoteString(s.getText))
+          .mkString(CopyFileFormat.LIST_SEPARATOR)
+    }
+  }
+
+  private def unquoteString(s: String): String = {
+    if (s == null || s.length < 2) return s
+    val first = s.charAt(0)
+    if ((first == '\'' || first == '"') && s.charAt(s.length - 1) == first) {
+      val inner = s.substring(1, s.length - 1)
+      val sb = new StringBuilder
+      var i = 0
+      while (i < inner.length) {
+        val c = inner.charAt(i)
+        if (c == '\\' && i + 1 < inner.length) {
+          inner.charAt(i + 1) match {
+            case 'n' => sb.append('\n'); i += 2
+            case 't' => sb.append('\t'); i += 2
+            case 'r' => sb.append('\r'); i += 2
+            case '\\' => sb.append('\\'); i += 2
+            case q if q == first => sb.append(q); i += 2
+            case other => sb.append('\\'); sb.append(other); i += 2
+          }
+        } else if (c == first && i + 1 < inner.length && inner.charAt(i + 1) 
== first) {
+          sb.append(first); i += 2
+        } else {
+          sb.append(c); i += 1
+        }
+      }
+      sb.toString()
+    } else {
+      s
+    }
+  }
+
   private def toBuffer[T](list: java.util.List[T]) = list.asScala
 
   private def toSeq[T](list: java.util.List[T]) = toBuffer(list)
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala
new file mode 100644
index 0000000000..f3fb75a8c6
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala
@@ -0,0 +1,667 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.Row
+
+import java.io.{File, PrintWriter}
+import java.nio.file.Files
+
+class CopyIntoTestBase extends PaimonSparkTestBase {
+
+  private def createCsvFile(dir: File, name: String, content: String): File = {
+    val file = new File(dir, name)
+    val writer = new PrintWriter(file)
+    try writer.write(content)
+    finally writer.close()
+    file
+  }
+
+  private def withCsvDir(testBody: File => Unit): Unit = {
+    val dir = Files.createTempDirectory("copy_into_test").toFile
+    try testBody(dir)
+    finally deleteRecursively(dir)
+  }
+
+  private def deleteRecursively(file: File): Unit = {
+    if (file.isDirectory) {
+      file.listFiles().foreach(deleteRecursively)
+    }
+    file.delete()
+  }
+
+  test("COPY INTO: basic CSV import") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_basic")
+    spark.sql(s"CREATE TABLE $dbName0.copy_basic (id INT, name STRING, age 
INT)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "data.csv", "1,Alice,30\n2,Bob,25\n")
+
+        val result = spark.sql(s"""COPY INTO $dbName0.copy_basic
+                                  |FROM '${dir.getAbsolutePath}'
+                                  |FILE_FORMAT = (TYPE = CSV)
+                                  |""".stripMargin)
+
+        assert(result.collect().length > 0)
+        val row = result.collect()(0)
+        assert(row.getString(1) == "LOADED")
+
+        checkAnswer(
+          spark.sql(s"SELECT * FROM $dbName0.copy_basic ORDER BY id"),
+          Seq(Row(1, "Alice", 30), Row(2, "Bob", 25)))
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_basic")
+  }
+
+  test("COPY INTO: CSV with custom delimiter") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_delim")
+    spark.sql(s"CREATE TABLE $dbName0.copy_delim (id INT, name STRING)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "data.csv", "1|Alice\n2|Bob\n")
+
+        spark.sql(s"""COPY INTO $dbName0.copy_delim
+                     |FROM '${dir.getAbsolutePath}'
+                     |FILE_FORMAT = (TYPE = CSV, FIELD_DELIMITER = '|')
+                     |""".stripMargin)
+
+        checkAnswer(
+          spark.sql(s"SELECT * FROM $dbName0.copy_delim ORDER BY id"),
+          Seq(Row(1, "Alice"), Row(2, "Bob")))
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_delim")
+  }
+
+  test("COPY INTO: CSV with SKIP_HEADER") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_header")
+    spark.sql(s"CREATE TABLE $dbName0.copy_header (id INT, name STRING)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "data.csv", "id,name\n1,Alice\n2,Bob\n")
+
+        spark.sql(s"""COPY INTO $dbName0.copy_header
+                     |FROM '${dir.getAbsolutePath}'
+                     |FILE_FORMAT = (TYPE = CSV, SKIP_HEADER = 1)
+                     |""".stripMargin)
+
+        checkAnswer(
+          spark.sql(s"SELECT * FROM $dbName0.copy_header ORDER BY id"),
+          Seq(Row(1, "Alice"), Row(2, "Bob")))
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_header")
+  }
+
+  test("COPY INTO: CSV with quote and escape") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_quote")
+    spark.sql(s"CREATE TABLE $dbName0.copy_quote (id INT, name STRING)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "data.csv", "1,\"Alice \"\"the 
great\"\"\"\n2,\"Bob\"\n")
+
+        spark.sql(s"""COPY INTO $dbName0.copy_quote
+                     |FROM '${dir.getAbsolutePath}'
+                     |FILE_FORMAT = (TYPE = CSV, QUOTE = '"', ESCAPE = '"')
+                     |""".stripMargin)
+
+        checkAnswer(
+          spark.sql(s"SELECT * FROM $dbName0.copy_quote ORDER BY id"),
+          Seq(Row(1, "Alice \"the great\""), Row(2, "Bob")))
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_quote")
+  }
+
+  test("COPY INTO: NULL_IF with multiple values") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_null")
+    spark.sql(s"CREATE TABLE $dbName0.copy_null (id INT, name STRING)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "data.csv", "1,NULL\n2,\\N\n3,Alice\n")
+
+        spark.sql(s"""COPY INTO $dbName0.copy_null
+                     |FROM '${dir.getAbsolutePath}'
+                     |FILE_FORMAT = (TYPE = CSV, NULL_IF = ('NULL', '\\N'))
+                     |""".stripMargin)
+
+        checkAnswer(
+          spark.sql(s"SELECT * FROM $dbName0.copy_null ORDER BY id"),
+          Seq(Row(1, null), Row(2, null), Row(3, "Alice")))
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_null")
+  }
+
+  test("COPY INTO: EMPTY_FIELD_AS_NULL") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_empty")
+    spark.sql(s"CREATE TABLE $dbName0.copy_empty (id INT, name STRING)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "data.csv", "1,\n2,Alice\n")
+
+        spark.sql(s"""COPY INTO $dbName0.copy_empty
+                     |FROM '${dir.getAbsolutePath}'
+                     |FILE_FORMAT = (TYPE = CSV, EMPTY_FIELD_AS_NULL = TRUE)
+                     |""".stripMargin)
+
+        checkAnswer(
+          spark.sql(s"SELECT * FROM $dbName0.copy_empty ORDER BY id"),
+          Seq(Row(1, null), Row(2, "Alice")))
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_empty")
+  }
+
+  test("COPY INTO: PATTERN filters files") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_pattern")
+    spark.sql(s"CREATE TABLE $dbName0.copy_pattern (id INT, name STRING)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "data1.csv", "1,Alice\n")
+        createCsvFile(dir, "data2.csv", "2,Bob\n")
+        createCsvFile(dir, "ignore.txt", "3,Charlie\n")
+
+        spark.sql(s"""COPY INTO $dbName0.copy_pattern
+                     |FROM '${dir.getAbsolutePath}'
+                     |FILE_FORMAT = (TYPE = CSV)
+                     |PATTERN = '.*\\.csv'
+                     |""".stripMargin)
+
+        checkAnswer(
+          spark.sql(s"SELECT * FROM $dbName0.copy_pattern ORDER BY id"),
+          Seq(Row(1, "Alice"), Row(2, "Bob")))
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_pattern")
+  }
+
+  test("COPY INTO: explicit column list") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_cols")
+    spark.sql(s"CREATE TABLE $dbName0.copy_cols (id INT, name STRING, age 
INT)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "data.csv", "1,Alice\n2,Bob\n")
+
+        spark.sql(s"""COPY INTO $dbName0.copy_cols (id, name)
+                     |FROM '${dir.getAbsolutePath}'
+                     |FILE_FORMAT = (TYPE = CSV)
+                     |""".stripMargin)
+
+        checkAnswer(
+          spark.sql(s"SELECT * FROM $dbName0.copy_cols ORDER BY id"),
+          Seq(Row(1, "Alice", null), Row(2, "Bob", null)))
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_cols")
+  }
+
+  test("COPY INTO: unsupported TYPE errors at validation time") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_unsup")
+    spark.sql(s"CREATE TABLE $dbName0.copy_unsup (id INT)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "data.csv", "1\n")
+
+        val e = intercept[IllegalArgumentException] {
+          spark.sql(s"""COPY INTO $dbName0.copy_unsup
+                       |FROM '${dir.getAbsolutePath}'
+                       |FILE_FORMAT = (TYPE = PARQUET)
+                       |""".stripMargin)
+        }
+        assert(e.getMessage.contains("Unsupported file format type"))
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_unsup")
+  }
+
+  test("COPY INTO: missing TYPE errors") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_notype")
+    spark.sql(s"CREATE TABLE $dbName0.copy_notype (id INT)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "data.csv", "1\n")
+
+        val e = intercept[IllegalArgumentException] {
+          spark.sql(s"""COPY INTO $dbName0.copy_notype
+                       |FROM '${dir.getAbsolutePath}'
+                       |FILE_FORMAT = (FIELD_DELIMITER = ',')
+                       |""".stripMargin)
+        }
+        assert(e.getMessage.contains("FILE_FORMAT must include TYPE"))
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_notype")
+  }
+
+  test("COPY INTO: duplicate option key errors") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_dup")
+    spark.sql(s"CREATE TABLE $dbName0.copy_dup (id INT)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "data.csv", "1\n")
+
+        val e = intercept[IllegalArgumentException] {
+          spark.sql(s"""COPY INTO $dbName0.copy_dup
+                       |FROM '${dir.getAbsolutePath}'
+                       |FILE_FORMAT = (TYPE = CSV, TYPE = CSV)
+                       |""".stripMargin)
+        }
+        assert(e.getMessage.contains("Duplicate FILE_FORMAT option"))
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_dup")
+  }
+
+  test("COPY INTO: export table to CSV") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export")
+    spark.sql(s"CREATE TABLE $dbName0.copy_export (id INT, name STRING)")
+    spark.sql(s"INSERT INTO $dbName0.copy_export VALUES (1, 'Alice'), (2, 
'Bob')")
+
+    withCsvDir {
+      dir =>
+        val outputPath = new File(dir, "output").getAbsolutePath
+
+        val result = spark.sql(s"""COPY INTO '$outputPath'
+                                  |FROM $dbName0.copy_export
+                                  |FILE_FORMAT = (TYPE = CSV, HEADER = TRUE)
+                                  |""".stripMargin)
+
+        val row = result.collect()(0)
+        assert(row.getString(0) == outputPath)
+        assert(row.getLong(2) == 2L)
+
+        // Verify exported data
+        val exported = spark.read.option("header", "true").csv(outputPath)
+        assert(exported.count() == 2)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export")
+  }
+
+  test("COPY INTO: export OVERWRITE=FALSE fails on existing path") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_fail")
+    spark.sql(s"CREATE TABLE $dbName0.copy_export_fail (id INT)")
+    spark.sql(s"INSERT INTO $dbName0.copy_export_fail VALUES (1)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "existing.csv", "data\n")
+
+        intercept[Exception] {
+          spark.sql(s"""COPY INTO '${dir.getAbsolutePath}'
+                       |FROM $dbName0.copy_export_fail
+                       |FILE_FORMAT = (TYPE = CSV)
+                       |""".stripMargin)
+        }
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_fail")
+  }
+
+  test("COPY INTO: export OVERWRITE=TRUE succeeds") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_ow")
+    spark.sql(s"CREATE TABLE $dbName0.copy_export_ow (id INT, name STRING)")
+    spark.sql(s"INSERT INTO $dbName0.copy_export_ow VALUES (1, 'Alice')")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "existing.csv", "old data\n")
+
+        val result = spark.sql(s"""COPY INTO '${dir.getAbsolutePath}'
+                                  |FROM $dbName0.copy_export_ow
+                                  |FILE_FORMAT = (TYPE = CSV)
+                                  |OVERWRITE = TRUE
+                                  |""".stripMargin)
+
+        assert(result.collect()(0).getLong(2) == 1L)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_ow")
+  }
+
+  test("COPY INTO: FORCE=FALSE skips already-loaded files") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_force")
+    spark.sql(s"CREATE TABLE $dbName0.copy_force (id INT, name STRING)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "data.csv", "1,Alice\n2,Bob\n")
+
+        // First load
+        spark.sql(s"""COPY INTO $dbName0.copy_force
+                     |FROM '${dir.getAbsolutePath}'
+                     |FILE_FORMAT = (TYPE = CSV)
+                     |""".stripMargin)
+
+        // Second load with FORCE=FALSE (default) should skip
+        val result = spark.sql(s"""COPY INTO $dbName0.copy_force
+                                  |FROM '${dir.getAbsolutePath}'
+                                  |FILE_FORMAT = (TYPE = CSV)
+                                  |FORCE = FALSE
+                                  |""".stripMargin)
+
+        val rows = result.collect()
+        assert(rows.length == 1)
+        assert(rows(0).getString(1) == "SKIPPED")
+
+        // Table should still have only original 2 rows
+        checkAnswer(
+          spark.sql(s"SELECT * FROM $dbName0.copy_force ORDER BY id"),
+          Seq(Row(1, "Alice"), Row(2, "Bob")))
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_force")
+  }
+
+  test("COPY INTO: FORCE=TRUE reloads already-loaded files") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_force_true")
+    spark.sql(s"CREATE TABLE $dbName0.copy_force_true (id INT, name STRING)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "data.csv", "1,Alice\n")
+
+        // First load
+        spark.sql(s"""COPY INTO $dbName0.copy_force_true
+                     |FROM '${dir.getAbsolutePath}'
+                     |FILE_FORMAT = (TYPE = CSV)
+                     |""".stripMargin)
+
+        // Second load with FORCE=TRUE should re-import
+        val result = spark.sql(s"""COPY INTO $dbName0.copy_force_true
+                                  |FROM '${dir.getAbsolutePath}'
+                                  |FILE_FORMAT = (TYPE = CSV)
+                                  |FORCE = TRUE
+                                  |""".stripMargin)
+
+        val rows = result.collect()
+        assert(rows.length == 1)
+        assert(rows(0).getString(1) == "LOADED")
+
+        // Table should have duplicated data (2 rows total)
+        assert(spark.sql(s"SELECT * FROM $dbName0.copy_force_true").count() == 
2)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_force_true")
+  }
+
+  test("COPY INTO: bad numeric cast fails with ABORT_STATEMENT") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_badcast")
+    spark.sql(s"CREATE TABLE $dbName0.copy_badcast (id INT, name STRING)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "data.csv", "abc,Alice\n")
+
+        val e = intercept[Exception] {
+          spark.sql(s"""COPY INTO $dbName0.copy_badcast
+                       |FROM '${dir.getAbsolutePath}'
+                       |FILE_FORMAT = (TYPE = CSV)
+                       |""".stripMargin)
+        }
+        val msg = e.getMessage
+        assert(
+          msg.contains("Cast failure") ||
+            msg.contains("ABORT_STATEMENT") ||
+            msg.contains("CAST_INVALID_INPUT") ||
+            msg.contains("cannot be cast to") ||
+            e.getCause != null)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_badcast")
+  }
+
+  test("COPY INTO: no namespace table works") {
+    spark.sql(s"DROP TABLE IF EXISTS copy_no_ns")
+    spark.sql(s"CREATE TABLE copy_no_ns (id INT, name STRING)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "data.csv", "1,Alice\n")
+
+        spark.sql(s"""COPY INTO copy_no_ns
+                     |FROM '${dir.getAbsolutePath}'
+                     |FILE_FORMAT = (TYPE = CSV)
+                     |""".stripMargin)
+
+        checkAnswer(spark.sql(s"SELECT * FROM copy_no_ns"), Seq(Row(1, 
"Alice")))
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS copy_no_ns")
+  }
+
+  test("COPY INTO: lowercase options are accepted") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_lcase")
+    spark.sql(s"CREATE TABLE $dbName0.copy_lcase (id INT, name STRING)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "data.csv", "1|Alice\n")
+
+        spark.sql(s"""COPY INTO $dbName0.copy_lcase
+                     |FROM '${dir.getAbsolutePath}'
+                     |FILE_FORMAT = (type = csv, field_delimiter = '|')
+                     |""".stripMargin)
+
+        checkAnswer(spark.sql(s"SELECT * FROM $dbName0.copy_lcase"), 
Seq(Row(1, "Alice")))
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_lcase")
+  }
+
+  test("COPY INTO: unknown option key errors") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_unknown_opt")
+    spark.sql(s"CREATE TABLE $dbName0.copy_unknown_opt (id INT)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "data.csv", "1\n")
+
+        val e = intercept[Exception] {
+          spark.sql(s"""COPY INTO $dbName0.copy_unknown_opt
+                       |FROM '${dir.getAbsolutePath}'
+                       |FILE_FORMAT = (TYPE = CSV, BOGUS_OPTION = TRUE)
+                       |""".stripMargin)
+        }
+        assert(e.getMessage.contains("Unsupported FILE_FORMAT options"))
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_unknown_opt")
+  }
+
+  test("COPY INTO: SKIP_HEADER > 1 errors") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_skip2")
+    spark.sql(s"CREATE TABLE $dbName0.copy_skip2 (id INT)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "data.csv", "h1\nh2\n1\n")
+
+        val e = intercept[Exception] {
+          spark.sql(s"""COPY INTO $dbName0.copy_skip2
+                       |FROM '${dir.getAbsolutePath}'
+                       |FILE_FORMAT = (TYPE = CSV, SKIP_HEADER = 2)
+                       |""".stripMargin)
+        }
+        assert(e.getMessage.contains("SKIP_HEADER supports only 0 or 1"))
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_skip2")
+  }
+
+  test("COPY INTO: export rejects import-only options") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_bad")
+    spark.sql(s"CREATE TABLE $dbName0.copy_export_bad (id INT)")
+    spark.sql(s"INSERT INTO $dbName0.copy_export_bad VALUES (1)")
+
+    withCsvDir {
+      dir =>
+        val outputPath = new File(dir, "out").getAbsolutePath
+        val e = intercept[Exception] {
+          spark.sql(s"""COPY INTO '$outputPath'
+                       |FROM $dbName0.copy_export_bad
+                       |FILE_FORMAT = (TYPE = CSV, SKIP_HEADER = 1)
+                       |""".stripMargin)
+        }
+        assert(e.getMessage.contains("Unsupported FILE_FORMAT options for 
export"))
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_bad")
+  }
+
+  test("COPY INTO: explicit column list with default value column") {
+    assume(gteqSpark3_4)
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_default")
+    spark.sql(
+      s"CREATE TABLE $dbName0.copy_default (id INT, name STRING, status STRING 
DEFAULT 'active')")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "data.csv", "1,Alice\n2,Bob\n")
+
+        spark.sql(s"""COPY INTO $dbName0.copy_default (id, name)
+                     |FROM '${dir.getAbsolutePath}'
+                     |FILE_FORMAT = (TYPE = CSV)
+                     |""".stripMargin)
+
+        checkAnswer(
+          spark.sql(s"SELECT * FROM $dbName0.copy_default ORDER BY id"),
+          Seq(Row(1, "Alice", "active"), Row(2, "Bob", "active")))
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_default")
+  }
+
+  test("COPY INTO: too many CSV columns fails") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_toomany")
+    spark.sql(s"CREATE TABLE $dbName0.copy_toomany (id INT, name STRING)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "data.csv", "1,Alice,extra,columns\n")
+
+        intercept[Exception] {
+          spark.sql(s"""COPY INTO $dbName0.copy_toomany
+                       |FROM '${dir.getAbsolutePath}'
+                       |FILE_FORMAT = (TYPE = CSV)
+                       |""".stripMargin)
+        }
+        assert(spark.sql(s"SELECT * FROM $dbName0.copy_toomany").count() == 0)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_toomany")
+  }
+
+  test("COPY INTO: too few CSV columns fails") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_toofew")
+    spark.sql(s"CREATE TABLE $dbName0.copy_toofew (id INT, name STRING, age 
INT)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "data.csv", "1,Alice\n")
+
+        intercept[Exception] {
+          spark.sql(s"""COPY INTO $dbName0.copy_toofew
+                       |FROM '${dir.getAbsolutePath}'
+                       |FILE_FORMAT = (TYPE = CSV)
+                       |""".stripMargin)
+        }
+        assert(spark.sql(s"SELECT * FROM $dbName0.copy_toofew").count() == 0)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_toofew")
+  }
+
+  test("COPY INTO: rows_loaded count is accurate") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_count")
+    spark.sql(s"CREATE TABLE $dbName0.copy_count (id INT, name STRING)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "data.csv", "1,Alice\n2,Bob\n3,Charlie\n")
+
+        val result = spark.sql(s"""COPY INTO $dbName0.copy_count
+                                  |FROM '${dir.getAbsolutePath}'
+                                  |FILE_FORMAT = (TYPE = CSV)
+                                  |""".stripMargin)
+
+        val rows = result.collect()
+        assert(rows.length == 1)
+        assert(rows(0).getString(1) == "LOADED")
+        assert(rows(0).getLong(2) == 3L)
+        assert(rows(0).getLong(3) == 3L)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_count")
+  }
+
+  test("COPY INTO: duplicate column list errors") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_dup_col")
+    spark.sql(s"CREATE TABLE $dbName0.copy_dup_col (id INT, name STRING)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "data.csv", "1,Alice\n")
+
+        val e = intercept[IllegalArgumentException] {
+          spark.sql(s"""COPY INTO $dbName0.copy_dup_col (id, id)
+                       |FROM '${dir.getAbsolutePath}'
+                       |FILE_FORMAT = (TYPE = CSV)
+                       |""".stripMargin)
+        }
+        assert(e.getMessage.contains("Duplicate columns"))
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_dup_col")
+  }
+
+  test("COPY INTO: case-insensitive column matching") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_case")
+    spark.sql(s"CREATE TABLE $dbName0.copy_case (id INT, name STRING, age 
INT)")
+
+    withCsvDir {
+      dir =>
+        createCsvFile(dir, "data.csv", "1,Alice\n")
+
+        spark.sql(s"""COPY INTO $dbName0.copy_case (ID, NAME)
+                     |FROM '${dir.getAbsolutePath}'
+                     |FILE_FORMAT = (TYPE = CSV)
+                     |""".stripMargin)
+
+        checkAnswer(spark.sql(s"SELECT * FROM $dbName0.copy_case"), Seq(Row(1, 
"Alice", null)))
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_case")
+  }
+}

Reply via email to