This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e6fba9a68c [spark] Add COPY INTO support for CSV import and file
writing (#7926)
e6fba9a68c is described below
commit e6fba9a68c3260c492042f22f6bedde674d7e037
Author: Junrui Lee <[email protected]>
AuthorDate: Fri May 22 20:20:10 2026 +0800
[spark] Add COPY INTO support for CSV import and file writing (#7926)
This PR adds Spark SQL `COPY INTO` support for bulk CSV import and CSV
file writing.
Supported import syntax:
```sql
COPY INTO table_name [(col1, col2, ...)]
FROM 'source_path'
FILE_FORMAT = (TYPE = CSV [, option = value, ...])
[PATTERN = 'regex']
[FORCE = TRUE|FALSE]
[ON_ERROR = ABORT_STATEMENT]
```
Supported file writing syntax:
```sql
COPY INTO 'target_path'
FROM table_name
FILE_FORMAT = (TYPE = CSV [, option = value, ...])
[OVERWRITE = TRUE|FALSE]
```
---
docs/docs/spark/sql-write.md | 155 +++++
.../org/apache/paimon/spark/sql/CopyIntoTest.scala | 21 +
.../org/apache/paimon/spark/sql/CopyIntoTest.scala | 21 +
.../org/apache/paimon/spark/sql/CopyIntoTest.scala | 21 +
.../org/apache/paimon/spark/sql/CopyIntoTest.scala | 21 +
.../AbstractPaimonSparkSqlExtensionsParser.scala | 6 +-
.../org/apache/paimon/spark/sql/CopyIntoTest.scala | 21 +
.../org/apache/paimon/spark/sql/CopyIntoTest.scala | 21 +
.../PaimonSqlExtensions.g4 | 59 ++
.../plans/logical/CopyIntoLocationCommand.scala | 42 ++
.../plans/logical/CopyIntoTableCommand.scala | 45 ++
.../spark/catalyst/plans/logical/CopyOptions.scala | 145 +++++
.../spark/copyinto/CopyLoadHistoryManager.scala | 109 ++++
.../spark/execution/CopyIntoLocationExec.scala | 67 +++
.../paimon/spark/execution/CopyIntoTableExec.scala | 344 +++++++++++
.../paimon/spark/execution/CopyIntoUtils.scala | 31 +
.../paimon/spark/execution/PaimonStrategy.scala | 24 +-
.../AbstractPaimonSparkSqlExtensionsParser.scala | 6 +-
.../extensions/PaimonSqlExtensionsAstBuilder.scala | 96 +++
.../apache/paimon/spark/sql/CopyIntoTestBase.scala | 667 +++++++++++++++++++++
20 files changed, 1919 insertions(+), 3 deletions(-)
diff --git a/docs/docs/spark/sql-write.md b/docs/docs/spark/sql-write.md
index 74bec3f9e7..3a05500ccc 100644
--- a/docs/docs/spark/sql-write.md
+++ b/docs/docs/spark/sql-write.md
@@ -294,3 +294,158 @@ INSERT INTO t VALUES (1, '1'), (2, '2');
-- Need using `BY NAME` statement (requires Spark 3.5+)
INSERT INTO t BY NAME SELECT 3 AS a, '3' AS b, 3 AS c;
```
+
+## COPY INTO
+
+`COPY INTO` provides a SQL command for bulk loading CSV files into Paimon tables and for exporting table data to CSV files.
+
+### CSV Import
+
+```sql
+COPY INTO table_name [(col1, col2, ...)]
+FROM 'source_path'
+FILE_FORMAT = (TYPE = CSV [, option = value, ...])
+[PATTERN = 'regex']
+[FORCE = TRUE|FALSE]
+[ON_ERROR = ABORT_STATEMENT]
+```
+
+**Basic import:**
+
+```sql
+COPY INTO my_db.my_table
+FROM '/data/csv_files/'
+FILE_FORMAT = (TYPE = CSV);
+```
+
+**Import with explicit column mapping:**
+
+```sql
+-- Load only the listed columns; omitted columns get their DEFAULT value, or NULL if no default is defined
+COPY INTO my_db.users (id, name)
+FROM '/data/new_users/'
+FILE_FORMAT = (TYPE = CSV, SKIP_HEADER = 1);
+```
+
+**Import with NULL_IF and PATTERN:**
+
+```sql
+COPY INTO my_db.events
+FROM '/data/logs/'
+FILE_FORMAT = (TYPE = CSV, FIELD_DELIMITER = '|', NULL_IF = ('NULL', '\\N', ''))
+PATTERN = '.*\.csv'
+FORCE = FALSE;
+```
+
+### Write CSV Files
+
+```sql
+COPY INTO 'target_path'
+FROM table_name
+FILE_FORMAT = (TYPE = CSV [, option = value, ...])
+[OVERWRITE = TRUE|FALSE]
+```
+
+**Write with header and overwrite:**
+
+```sql
+COPY INTO '/export/users_backup/'
+FROM my_db.users
+FILE_FORMAT = (TYPE = CSV, HEADER = TRUE, FIELD_DELIMITER = ',')
+OVERWRITE = TRUE;
+```
+
+### FILE_FORMAT Options
+
+`FILE_FORMAT` is required and must include `TYPE = CSV`.
+
+**Import options:**
+
+| Option | Description | Default |
+|--------|-------------|---------|
+| TYPE | File format type. Must be `CSV`. | (required) |
+| FIELD_DELIMITER | Column delimiter character. | `,` |
+| SKIP_HEADER | Number of leading header lines to skip in each file. Only `0` or `1` is allowed. | `0` |
+| QUOTE | Quote character for enclosing fields. | `"` |
+| ESCAPE | Escape character within quoted fields. | `\` |
+| NULL_IF | List of string values to interpret as NULL, e.g. `('NULL', '\\N')`. | (none) |
+| EMPTY_FIELD_AS_NULL | Treat empty fields as NULL. `TRUE` or `FALSE`. | `FALSE` |
+| COMPRESSION | Compression codec (e.g. `GZIP`). | `NONE` |
+
+**Write options:**
+
+| Option | Description | Default |
+|--------|-------------|---------|
+| TYPE | File format type. Must be `CSV`. | (required) |
+| FIELD_DELIMITER | Column delimiter character. | `,` |
+| HEADER | Write column names as the first line. `TRUE` or `FALSE`. | `FALSE` |
+| QUOTE | Quote character for enclosing fields. | `"` |
+| ESCAPE | Escape character within quoted fields. | `\` |
+| COMPRESSION | Compression codec (e.g. `GZIP`). | `NONE` |
+
+### Import Options
+
+| Option | Description | Default |
+|--------|-------------|---------|
+| PATTERN | Regex used to filter source files by base file name; only matching files are loaded. | (all files) |
+| FORCE | `FALSE`: skip files that were already loaded (idempotent). `TRUE`: reload all matching files. | `FALSE` |
+| ON_ERROR | Error handling strategy. Only `ABORT_STATEMENT` is supported. | `ABORT_STATEMENT` |
+
+### File Write Options
+
+| Option | Description | Default |
+|--------|-------------|---------|
+| OVERWRITE | `FALSE`: fail if the target path already exists. `TRUE`: overwrite existing files. | `FALSE` |
+
+### Column Mapping
+
+When an explicit column list is provided (e.g., `COPY INTO t (col1, col2) FROM ...`):
+
+- CSV columns are mapped **positionally** to the specified column list.
+- The number of CSV columns must match the column list length.
+- Columns not in the list are filled with their **DEFAULT value** (if defined in the table schema) or **NULL**.
+- Non-nullable columns that are not in the list and have no default value cause an error.
+
+When no column list is provided:
+
+- CSV columns are mapped positionally to all writable columns in the target table.
+- The number of CSV columns must match the number of writable columns.
+
+### Repeated Imports
+
+By default (`FORCE = FALSE`), COPY INTO tracks which files have been successfully loaded. A file is identified by its path, size, and last-modified timestamp.
+
+- Re-running the same COPY INTO command will **skip** already-loaded files and return status `SKIPPED`.
+- If a source file is modified (its size or timestamp changes), it becomes eligible for re-loading.
+- `FORCE = TRUE` bypasses the load history and always re-imports all matching files.
+
+### Result Output
+
+**Import** returns one row per source file:
+
+| Column | Type | Description |
+|--------|------|-------------|
+| file_name | STRING | Source file name |
+| status | STRING | `LOADED` or `SKIPPED` |
+| rows_loaded | BIGINT | Number of rows written |
+| rows_parsed | BIGINT | Number of rows parsed from the file |
+
+**File write** returns a single row:
+
+| Column | Type | Description |
+|--------|------|-------------|
+| output_path | STRING | Target output path |
+| file_count | INT | Number of files written |
+| rows_written | BIGINT | Total rows written |
+
+### Limitations
+
+- Only **CSV** format is supported.
+- Writing files only supports `FROM table_name`; `FROM (SELECT ...)` is not supported.
+- `ON_ERROR = CONTINUE` is not supported; any parse or cast error aborts the entire command.
+- `SINGLE = TRUE` (single-file output) is not supported.
+- File format options must be specified inline in `FILE_FORMAT = (...)`.
+- File listing is **non-recursive**: only files directly under the source path are processed; subdirectories are ignored.
+- `PATTERN` matches the **base file name** only (not the full path).
+- Concurrent COPY INTO commands targeting the same table may produce duplicate data.
+- `SKIP_HEADER` only supports the values `0` and `1`.
diff --git
a/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
new file mode 100644
index 0000000000..d1515031e5
--- /dev/null
+++
b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+/** Runs the shared COPY INTO suite ([[CopyIntoTestBase]]) against this Spark module. */
+class CopyIntoTest extends CopyIntoTestBase
diff --git
a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
new file mode 100644
index 0000000000..d1515031e5
--- /dev/null
+++
b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+/** Runs the shared COPY INTO suite ([[CopyIntoTestBase]]) against this Spark module. */
+class CopyIntoTest extends CopyIntoTestBase
diff --git
a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
new file mode 100644
index 0000000000..d1515031e5
--- /dev/null
+++
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+/** Runs the shared COPY INTO suite ([[CopyIntoTestBase]]) against this Spark module. */
+class CopyIntoTest extends CopyIntoTestBase
diff --git
a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
new file mode 100644
index 0000000000..d1515031e5
--- /dev/null
+++
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+/** Runs the shared COPY INTO suite ([[CopyIntoTestBase]]) against this Spark module. */
+class CopyIntoTest extends CopyIntoTestBase
diff --git
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
index 0398df809f..67f72d953a 100644
---
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
+++
b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
@@ -122,7 +122,7 @@ abstract class AbstractPaimonSparkSqlExtensionsParser(val
delegate: ParserInterf
.replaceAll("/\\*.*?\\*/", " ")
.replaceAll("`", "")
.trim()
- isPaimonProcedure(normalized) || isTagRefDdl(normalized)
+ isPaimonProcedure(normalized) || isTagRefDdl(normalized) ||
isCopyInto(normalized)
}
// All builtin paimon procedures are under the 'sys' namespace
@@ -140,6 +140,10 @@ abstract class AbstractPaimonSparkSqlExtensionsParser(val
delegate: ParserInterf
normalized.contains("delete tag")))
}
+ /** Whether the normalized (lower-cased, comment-stripped) SQL text is a COPY INTO statement. */
+ private def isCopyInto(normalized: String): Boolean =
+   normalized.startsWith("copy into")
+
protected def parse[T](command: String)(toResult: PaimonSqlExtensionsParser
=> T): T = {
val lexer = new PaimonSqlExtensionsLexer(
new UpperCaseCharStream(CharStreams.fromString(command)))
diff --git
a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
new file mode 100644
index 0000000000..d1515031e5
--- /dev/null
+++
b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+/** Runs the shared COPY INTO suite ([[CopyIntoTestBase]]) against this Spark module. */
+class CopyIntoTest extends CopyIntoTestBase
diff --git
a/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
new file mode 100644
index 0000000000..d1515031e5
--- /dev/null
+++
b/paimon-spark/paimon-spark-4.1/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+/** Runs the shared COPY INTO suite ([[CopyIntoTestBase]]) against this Spark module. */
+class CopyIntoTest extends CopyIntoTestBase
diff --git
a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
index 207d973216..4255e530c1 100644
---
a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
+++
b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
@@ -74,6 +74,16 @@ statement
| ALTER TABLE multipartIdentifier createReplaceTagClause
#createOrReplaceTag
| ALTER TABLE multipartIdentifier DELETE TAG (IF EXISTS)? identifier
#deleteTag
| ALTER TABLE multipartIdentifier RENAME TAG identifier TO identifier
#renameTag
+ | COPY INTO multipartIdentifier ('(' columnList ')')?
+ FROM sourcePath=STRING
+ fileFormatClause
+ patternClause?
+ forceClause?
+ onErrorClause?
#copyIntoTable
+ | COPY INTO targetPath=STRING
+ FROM multipartIdentifier
+ fileFormatClause
+ overwriteClause?
#copyIntoLocation
;
callArgument
@@ -104,6 +114,42 @@ timeUnit
| MINUTES
;
+// Target column list of `COPY INTO table (col1, col2, ...)`; CSV columns are mapped to it by
+// position.
+columnList
+ : identifier (',' identifier)*
+ ;
+
+// FILE_FORMAT = (key = value, ...). Mandatory in both COPY INTO forms (#copyIntoTable and
+// #copyIntoLocation).
+fileFormatClause
+ : FILE_FORMAT '=' '(' fileFormatOption (',' fileFormatOption)* ')'
+ ;
+
+// A single `key = value` entry inside FILE_FORMAT.
+fileFormatOption
+ : key=identifier '=' fileFormatValue
+ ;
+
+// Option value: quoted string, bare word (e.g. CSV), boolean, integer, or a parenthesized list
+// of strings (used by NULL_IF).
+// NOTE(review): TRUE/FALSE are also nonReserved; if `identifier` accepts nonReserved keywords,
+// ANTLR may resolve them via #identFormatValue before #boolFormatValue — confirm that the
+// AST builder treats both alternatives the same way.
+fileFormatValue
+ : STRING
#stringFormatValue
+ | identifier
#identFormatValue
+ | booleanValue
#boolFormatValue
+ | INTEGER_VALUE
#intFormatValue
+ | '(' STRING (',' STRING)* ')'
#listFormatValue
+ ;
+
+// PATTERN = 'regex': selects which source files are loaded.
+patternClause
+ : PATTERN '=' STRING
+ ;
+
+// FORCE = TRUE|FALSE: whether to reload files already recorded in the load history.
+forceClause
+ : FORCE '=' booleanValue
+ ;
+
+// ON_ERROR: only ABORT_STATEMENT is accepted by the grammar.
+onErrorClause
+ : ON_ERROR '=' ABORT_STATEMENT
+ ;
+
+// OVERWRITE = TRUE|FALSE for the `COPY INTO 'path' FROM table` form.
+overwriteClause
+ : OVERWRITE '=' booleanValue
+ ;
+
expression
: constant
| stringMap
@@ -155,6 +201,8 @@ nonReserved
| REPLACE | RETAIN | VERSION | TAG
| TRUE | FALSE
| MAP
+ | COPY | INTO | FROM | FILE_FORMAT | PATTERN | FORCE | ON_ERROR |
ABORT_STATEMENT | OVERWRITE
+ | CSV
;
ALTER: 'ALTER';
@@ -185,6 +233,17 @@ FALSE: 'FALSE';
MAP: 'MAP';
+// COPY INTO keywords. The Scala parser feeds the lexer through UpperCaseCharStream, so these
+// match case-insensitively. They are also listed in nonReserved, presumably so they remain
+// usable as identifiers.
+COPY: 'COPY';
+INTO: 'INTO';
+FROM: 'FROM';
+FILE_FORMAT: 'FILE_FORMAT';
+PATTERN: 'PATTERN';
+FORCE: 'FORCE';
+ON_ERROR: 'ON_ERROR';
+ABORT_STATEMENT: 'ABORT_STATEMENT';
+OVERWRITE: 'OVERWRITE';
+CSV: 'CSV';
+
PLUS: '+';
MINUS: '-';
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyIntoLocationCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyIntoLocationCommand.scala
new file mode 100644
index 0000000000..3e5d0a0666
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyIntoLocationCommand.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalyst.plans.logical
+
+import org.apache.paimon.spark.leafnode.PaimonLeafCommand
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference}
+import org.apache.spark.sql.types.{IntegerType, LongType, StringType}
+
+/**
+ * Logical plan for `COPY INTO 'target_path' FROM table FILE_FORMAT = (...) [OVERWRITE = ...]`,
+ * which writes the rows of a table to files under `targetPath`.
+ *
+ * @param targetPath
+ *   destination directory for the written files
+ * @param table
+ *   multi-part name of the source table
+ * @param fileFormat
+ *   output file format and its options
+ * @param overwrite
+ *   whether existing output at `targetPath` may be replaced
+ */
+case class CopyIntoLocationCommand(
+    targetPath: String,
+    table: Seq[String],
+    fileFormat: CopyFileFormat,
+    overwrite: Boolean)
+  extends PaimonLeafCommand {
+
+  /**
+   * A single result row: (output_path, file_count, rows_written).
+   *
+   * NOTE(review): this is a `def`, so each call creates attributes with fresh expression IDs;
+   * consider a `lazy val` if this plan is ever transformed after analysis.
+   */
+  override def output: Seq[Attribute] = Seq(
+    AttributeReference("output_path", StringType, nullable = false)(),
+    AttributeReference("file_count", IntegerType, nullable = false)(),
+    AttributeReference("rows_written", LongType, nullable = false)()
+  )
+
+  // Render the table as `db.t` rather than the default `List(db, t)` Seq.toString.
+  override def simpleString(maxFields: Int): String =
+    s"CopyIntoLocation: target=$targetPath, source=${table.mkString(".")}"
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyIntoTableCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyIntoTableCommand.scala
new file mode 100644
index 0000000000..eedad9763e
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyIntoTableCommand.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalyst.plans.logical
+
+import org.apache.paimon.spark.leafnode.PaimonLeafCommand
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference}
+import org.apache.spark.sql.types.{LongType, StringType}
+
+/**
+ * Logical plan for `COPY INTO table [(columns)] FROM 'source_path' FILE_FORMAT = (...)`, which
+ * bulk-loads files from `sourcePath` into a table.
+ *
+ * @param table
+ *   multi-part name of the target table
+ * @param columns
+ *   optional explicit target column list; source columns are mapped to it by position
+ * @param sourcePath
+ *   location of the source files
+ * @param fileFormat
+ *   source file format and its options
+ * @param pattern
+ *   optional regex selecting which source files are loaded
+ * @param force
+ *   when true, reload files even if the load history records them as already loaded
+ */
+case class CopyIntoTableCommand(
+    table: Seq[String],
+    columns: Option[Seq[String]],
+    sourcePath: String,
+    fileFormat: CopyFileFormat,
+    pattern: Option[String],
+    force: Boolean)
+  extends PaimonLeafCommand {
+
+  /**
+   * One result row per source file: (file_name, status, rows_loaded, rows_parsed).
+   *
+   * NOTE(review): this is a `def`, so each call creates attributes with fresh expression IDs;
+   * consider a `lazy val` if this plan is ever transformed after analysis.
+   */
+  override def output: Seq[Attribute] = Seq(
+    AttributeReference("file_name", StringType, nullable = false)(),
+    AttributeReference("status", StringType, nullable = false)(),
+    AttributeReference("rows_loaded", LongType, nullable = false)(),
+    AttributeReference("rows_parsed", LongType, nullable = false)()
+  )
+
+  // Render the table as `db.t` rather than the default `List(db, t)` Seq.toString.
+  override def simpleString(maxFields: Int): String =
+    s"CopyIntoTable: table=${table.mkString(".")}, source=$sourcePath"
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyOptions.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyOptions.scala
new file mode 100644
index 0000000000..2e2f7e2ec1
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyOptions.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalyst.plans.logical
+
+/** Value of the `TYPE` key in a COPY INTO `FILE_FORMAT = (...)` clause. */
+sealed trait FileFormatType
+
+object FileFormatType {
+
+  /** Comma-separated values; currently the only format COPY INTO accepts. */
+  case object CSV extends FileFormatType
+
+  /** Any other `TYPE`; retained so validation can name the rejected format. */
+  final case class Unsupported(name: String) extends FileFormatType
+}
+
+/**
+ * Parsed `FILE_FORMAT = (...)` clause of a COPY INTO statement.
+ *
+ * @param formatType
+ *   the format named by the `TYPE` option
+ * @param options
+ *   the remaining format options; keys are matched in upper case below. Multi-valued options
+ *   (`NULL_IF`) are stored as one string joined with [[CopyFileFormat.LIST_SEPARATOR]].
+ */
+case class CopyFileFormat(formatType: FileFormatType, options: Map[String, String]) {
+
+  /**
+   * Translates the options into Spark CSV reader options. The reader always runs in `FAILFAST`
+   * mode so that any malformed record aborts the statement (ON_ERROR = ABORT_STATEMENT).
+   */
+  def toSparkReaderOptions: Map[String, String] = {
+    val translated: Map[String, String] = options.collect {
+      case ("FIELD_DELIMITER", v) => "sep" -> v
+      case ("QUOTE", v) => "quote" -> v
+      case ("ESCAPE", v) => "escape" -> v
+      case ("COMPRESSION", v) => "compression" -> v
+      case ("SKIP_HEADER", v) =>
+        "header" -> (if (v == "1" || v.equalsIgnoreCase("TRUE")) "true" else "false")
+    }
+    Map("mode" -> "FAILFAST") ++ translated
+  }
+
+  /** Translates the options into Spark CSV writer options; unknown keys are ignored. */
+  def toSparkWriterOptions: Map[String, String] =
+    options.collect {
+      case ("FIELD_DELIMITER", v) => "sep" -> v
+      case ("HEADER", v) => "header" -> v.toLowerCase(java.util.Locale.ROOT)
+      case ("QUOTE", v) => "quote" -> v
+      case ("ESCAPE", v) => "escape" -> v
+      case ("COMPRESSION", v) => "compression" -> v
+    }
+
+  /**
+   * Values listed in `NULL_IF`, decoded from the [[CopyFileFormat.LIST_SEPARATOR]]-joined form.
+   * Returns an empty sequence when `NULL_IF` is not given.
+   */
+  def nullIfValues: Seq[String] =
+    // An empty encoded value is a one-element list holding the empty string (e.g.
+    // NULL_IF = ('')), so it must not be dropped; split with limit -1 keeps empty entries.
+    options
+      .get("NULL_IF")
+      .fold(Seq.empty[String])(_.split(CopyFileFormat.LIST_SEPARATOR, -1).toSeq)
+
+  /** Whether `EMPTY_FIELD_AS_NULL` is set to TRUE (case-insensitive). */
+  def emptyFieldAsNull: Boolean =
+    options.get("EMPTY_FIELD_AS_NULL").exists(_.equalsIgnoreCase("TRUE"))
+
+  /**
+   * Validates the clause for `COPY INTO table FROM 'path'`.
+   *
+   * @throws IllegalArgumentException
+   *   if the format is not CSV, `MODE` is given, an option is not valid for import, or
+   *   `SKIP_HEADER` is not 0 or 1
+   */
+  def validateForImport(): Unit = {
+    validateFormatType()
+    if (options.contains("MODE")) {
+      throw new IllegalArgumentException(
+        "MODE cannot be specified in FILE_FORMAT options; it is reserved for ON_ERROR handling")
+    }
+    rejectUnknownKeys(CopyFileFormat.VALID_IMPORT_KEYS, "import")
+    options.get("SKIP_HEADER").foreach {
+      v =>
+        val intVal = scala.util.Try(v.toInt).getOrElse(-1)
+        if (intVal != 0 && intVal != 1) {
+          throw new IllegalArgumentException(s"SKIP_HEADER supports only 0 or 1, got: $v")
+        }
+    }
+  }
+
+  /**
+   * Validates the clause for `COPY INTO 'path' FROM table`.
+   *
+   * @throws IllegalArgumentException
+   *   if the format is not CSV or an option is not valid for export
+   */
+  def validateForExport(): Unit = {
+    validateFormatType()
+    rejectUnknownKeys(CopyFileFormat.VALID_EXPORT_KEYS, "export")
+  }
+
+  /** Fails when any option key is outside `validKeys`; `direction` is "import" or "export". */
+  private def rejectUnknownKeys(validKeys: Set[String], direction: String): Unit = {
+    // Sorted so the error message does not depend on Map iteration order.
+    val invalid = options.keys.filterNot(validKeys.contains).toSeq.sorted
+    if (invalid.nonEmpty) {
+      throw new IllegalArgumentException(
+        s"Unsupported FILE_FORMAT options for $direction: ${invalid.mkString(", ")}")
+    }
+  }
+
+  /** Fails unless the format type is CSV. */
+  private def validateFormatType(): Unit = {
+    formatType match {
+      case FileFormatType.CSV =>
+      case FileFormatType.Unsupported(name) =>
+        throw new IllegalArgumentException(
+          s"Unsupported file format type: $name. Only CSV is currently supported")
+    }
+  }
+}
+
+object CopyFileFormat {
+
+  /** FILE_FORMAT keys accepted by `COPY INTO table FROM 'path'`. */
+  val VALID_IMPORT_KEYS: Set[String] = Set(
+    "FIELD_DELIMITER",
+    "SKIP_HEADER",
+    "QUOTE",
+    "ESCAPE",
+    "NULL_IF",
+    "EMPTY_FIELD_AS_NULL",
+    "COMPRESSION"
+  )
+
+  /** FILE_FORMAT keys accepted by `COPY INTO 'path' FROM table`. */
+  val VALID_EXPORT_KEYS: Set[String] = Set(
+    "FIELD_DELIMITER",
+    "HEADER",
+    "QUOTE",
+    "ESCAPE",
+    "COMPRESSION"
+  )
+
+  // Unit Separator (U+001F) used to encode multi-value lists in a single string
+  val LIST_SEPARATOR: String = "\u001f"
+
+  /** Maps a `TYPE` value (case-insensitive) to a [[FileFormatType]]. */
+  def parseFormatType(typeStr: String): FileFormatType = {
+    // Locale.ROOT keeps the result independent of the JVM default locale.
+    typeStr.toUpperCase(java.util.Locale.ROOT) match {
+      case "CSV" => FileFormatType.CSV
+      case other => FileFormatType.Unsupported(other)
+    }
+  }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/copyinto/CopyLoadHistoryManager.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/copyinto/CopyLoadHistoryManager.scala
new file mode 100644
index 0000000000..c85a552151
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/copyinto/CopyLoadHistoryManager.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.copyinto
+
+import org.apache.paimon.fs.{FileIO, Path}
+import org.apache.paimon.utils.JsonSerdeUtil
+
+import java.nio.charset.StandardCharsets
+import java.security.MessageDigest
+import java.util.{LinkedHashMap => JLinkedHashMap}
+
+/**
+ * A single COPY INTO load-history entry describing a source file that was loaded into a table.
+ *
+ * @param filePath
+ *   path of the loaded source file
+ * @param fileSize
+ *   size of the source file when it was loaded
+ * @param lastModified
+ *   last-modified timestamp of the source file when it was loaded
+ * @param loadedAt
+ *   timestamp at which the load was recorded
+ * @param snapshotId
+ *   table snapshot id associated with the load (presumably the one it committed — confirm)
+ * @param rowsLoaded
+ *   number of rows written from this file
+ */
+case class CopyLoadRecord(
+    filePath: String,
+    fileSize: Long,
+    lastModified: Long,
+    loadedAt: Long,
+    snapshotId: Long,
+    rowsLoaded: Long) {
+
+  /** Serializes this record to a JSON object whose field names match the constructor. */
+  def toJson: String = {
+    val map = new JLinkedHashMap[String, AnyRef]()
+    map.put("filePath", filePath)
+    map.put("fileSize", java.lang.Long.valueOf(fileSize))
+    map.put("lastModified", java.lang.Long.valueOf(lastModified))
+    map.put("loadedAt", java.lang.Long.valueOf(loadedAt))
+    map.put("snapshotId", java.lang.Long.valueOf(snapshotId))
+    map.put("rowsLoaded", java.lang.Long.valueOf(rowsLoaded))
+    JsonSerdeUtil.toJson(map)
+  }
+}
+
+object CopyLoadRecord {
+
+  /**
+   * Parses a record produced by [[CopyLoadRecord#toJson]].
+   *
+   * @throws IllegalArgumentException
+   *   if a required field is missing or null (previously surfaced as a message-less NPE)
+   */
+  def fromJson(json: String): CopyLoadRecord = {
+    val map = JsonSerdeUtil.parseJsonMap(json, classOf[AnyRef])
+
+    // Values are untyped (AnyRef), so numeric fields are converted via their string form.
+    def field(name: String): String =
+      Option(map.get(name))
+        .map(_.toString)
+        .getOrElse(
+          throw new IllegalArgumentException(s"Missing field '$name' in COPY INTO load record"))
+
+    CopyLoadRecord(
+      filePath = field("filePath"),
+      fileSize = field("fileSize").toLong,
+      lastModified = field("lastModified").toLong,
+      loadedAt = field("loadedAt").toLong,
+      snapshotId = field("snapshotId").toLong,
+      rowsLoaded = field("rowsLoaded").toLong
+    )
+  }
+}
+
+/**
+ * Tracks which source files COPY INTO has already loaded into a table, so that repeated imports
+ * with FORCE = FALSE can skip them.
+ *
+ * Each load is stored as a small JSON file under `<tablePath>/copy-into/history`, named
+ * `load-<hash>-<loadedAt>`, where `<hash>` is a truncated SHA-256 of the source file path.
+ *
+ * @param fileIO
+ *   file system used to read and write history records
+ * @param tablePath
+ *   root path of the target table
+ */
+class CopyLoadHistoryManager(fileIO: FileIO, tablePath: Path) {
+
+  private val LOG = org.slf4j.LoggerFactory.getLogger(classOf[CopyLoadHistoryManager])
+
+  private val historyDir = new Path(tablePath, "copy-into/history")
+
+  /**
+   * Returns whether a file with the same path, size and last-modified time has already been
+   * recorded as loaded.
+   *
+   * The lookup is best-effort. Unreadable records and an unlistable history directory are logged
+   * and treated as "not loaded", so the file will be loaded again.
+   */
+  def isLoaded(filePath: String, fileSize: Long, lastModified: Long): Boolean = {
+    fileIO.exists(historyDir) && {
+      val prefix = s"load-${sha256(filePath)}-"
+      try {
+        fileIO
+          .listStatus(historyDir)
+          .exists(
+            status =>
+              status.getPath.getName.startsWith(prefix) &&
+                matchesRecord(status.getPath, filePath, fileSize, lastModified))
+      } catch {
+        case scala.util.control.NonFatal(e) =>
+          LOG.warn(s"Failed to list load history directory $historyDir", e)
+          false
+      }
+    }
+  }
+
+  /** Reads one history record and checks that it describes exactly this version of the file. */
+  private def matchesRecord(
+      recordPath: Path,
+      filePath: String,
+      fileSize: Long,
+      lastModified: Long): Boolean = {
+    try {
+      val record = CopyLoadRecord.fromJson(fileIO.readFileUtf8(recordPath))
+      // The file-name prefix only carries a 64-bit truncated hash, so compare the full path too
+      // to rule out a collision between two different source files.
+      record.filePath == filePath &&
+      record.fileSize == fileSize &&
+      record.lastModified == lastModified
+    } catch {
+      case scala.util.control.NonFatal(e) =>
+        LOG.warn(s"Failed to read load history record $recordPath", e)
+        false
+    }
+  }
+
+  /** Persists `record`, creating the history directory if needed. */
+  def recordLoaded(record: CopyLoadRecord): Unit = {
+    fileIO.mkdirs(historyDir)
+    val hash = sha256(record.filePath)
+    val recordPath = new Path(historyDir, s"load-$hash-${record.loadedAt}")
+    fileIO.overwriteFileUtf8(recordPath, record.toJson)
+  }
+
+  /** First 16 hex characters (64 bits) of the SHA-256 digest of `input` encoded as UTF-8. */
+  private def sha256(input: String): String = {
+    val digest = MessageDigest.getInstance("SHA-256")
+    val hash = digest.digest(input.getBytes(StandardCharsets.UTF_8))
+    hash.map("%02x".format(_)).mkString.take(16)
+  }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoLocationExec.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoLocationExec.scala
new file mode 100644
index 0000000000..f4f0720b28
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoLocationExec.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.execution
+
+import org.apache.paimon.spark.catalyst.plans.logical.CopyFileFormat
+import org.apache.paimon.spark.leafnode.PaimonLeafV2CommandExec
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.{SaveMode, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
+import org.apache.spark.unsafe.types.UTF8String
+
/**
 * Physical operator for `COPY INTO '<path>' FROM <table>`, which exports a table's rows as CSV
 * files under `targetPath`.
 *
 * Emits a single row `(targetPath, fileCount, rowCount)`. When `overwrite` is false the write
 * uses `SaveMode.ErrorIfExists`, so an existing target path makes the command fail.
 */
case class CopyIntoLocationExec(
    spark: SparkSession,
    catalog: TableCatalog,
    ident: Identifier,
    targetPath: String,
    fileFormat: CopyFileFormat,
    overwrite: Boolean,
    out: Seq[Attribute])
  extends PaimonLeafV2CommandExec {

  override def output: Seq[Attribute] = out

  override protected def run(): Seq[InternalRow] = {
    fileFormat.validateForExport()

    val tableName = CopyIntoUtils.quoteIdentifier(catalog.name(), ident)
    val df = spark.table(tableName)

    val rowCount = df.count()

    val writerOptions = fileFormat.toSparkWriterOptions
    val saveMode = if (overwrite) SaveMode.Overwrite else SaveMode.ErrorIfExists

    df.write.options(writerOptions).mode(saveMode).csv(targetPath)

    Seq(InternalRow(UTF8String.fromString(targetPath), countDataFiles(), rowCount))
  }

  /**
   * Counts the data files directly under `targetPath` (0 if the path does not exist).
   *
   * Entries whose names start with `_` or `.` are excluded. These are bookkeeping files written
   * alongside the CSV output (for example the `_SUCCESS` marker or local `.crc` checksum files),
   * not exported data.
   */
  private def countDataFiles(): Int = {
    val fsPath = new Path(targetPath)
    val fs = fsPath.getFileSystem(spark.sessionState.newHadoopConf())
    if (fs.exists(fsPath)) {
      fs.listStatus(fsPath).count(status => status.isFile && !isHiddenFile(status.getPath.getName))
    } else {
      0
    }
  }

  /** Hadoop/Spark convention: names starting with `_` or `.` are hidden, non-data files. */
  private def isHiddenFile(name: String): Boolean = name.startsWith("_") || name.startsWith(".")
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoTableExec.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoTableExec.scala
new file mode 100644
index 0000000000..260add4349
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoTableExec.scala
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.execution
+
+import org.apache.paimon.spark.SparkTable
+import org.apache.paimon.spark.catalyst.plans.logical.CopyFileFormat
+import org.apache.paimon.spark.copyinto.{CopyLoadHistoryManager,
CopyLoadRecord}
+import org.apache.paimon.spark.leafnode.PaimonLeafV2CommandExec
+import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.types.DataField
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.spark.sql.{Column, DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
+import org.apache.spark.sql.functions.{col, input_file_name, lit, when}
+import org.apache.spark.sql.paimon.shims.SparkShimLoader
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
/**
 * Physical operator for `COPY INTO <table> [(cols)] FROM '<path>'`, which bulk-loads CSV files
 * into a Paimon table.
 *
 * Loading proceeds in these steps:
 *  1. Plain files directly under `sourcePath` are listed, optionally filtered by the `PATTERN`
 *     regex.
 *  2. Unless `force` is set, files already recorded in the table's load history with an
 *     unchanged size and modification time are skipped.
 *  3. The remaining files are read with every field as a string, mapped onto the target columns
 *     and cast to the table's types. The command aborts on the first value that cannot be cast.
 *  4. The rows are appended to the table.
 *
 * One result row is emitted per file: `LOADED` with its row count, or `SKIPPED`.
 */
case class CopyIntoTableExec(
    spark: SparkSession,
    catalog: TableCatalog,
    ident: Identifier,
    sourcePath: String,
    columns: Option[Seq[String]],
    fileFormat: CopyFileFormat,
    pattern: Option[String],
    force: Boolean,
    out: Seq[Attribute])
  extends PaimonLeafV2CommandExec {

  override def output: Seq[Attribute] = out

  override protected def run(): Seq[InternalRow] = {
    fileFormat.validateForImport()

    val table = catalog.loadTable(ident)
    assert(table.isInstanceOf[SparkTable])
    val paimonTable = table.asInstanceOf[SparkTable].getTable.asInstanceOf[FileStoreTable]
    val tableSchema = paimonTable.schema()
    val writableColumns = tableSchema.fieldNames().asScala.toSeq
    val fields = tableSchema.fields().asScala.toSeq
    val targetColumns = resolveTargetColumns(writableColumns)

    validateNonNullableDefaults(writableColumns, targetColumns, fields)

    val (filesToLoad, skippedFiles) = listAndFilterFiles(paimonTable)

    if (filesToLoad.isEmpty) {
      buildSkippedResults(skippedFiles)
    } else {
      val filePaths = filesToLoad.map(_.getPath.toString)
      // Every CSV field is read as a nullable string; typing happens in castAndValidate.
      val stringSchema = StructType(
        targetColumns.indices.map(i => StructField(s"_c$i", StringType, nullable = true)))
      val readerOptions = fileFormat.toSparkReaderOptions

      val csvDf = readAndProcessCsv(filePaths, stringSchema, readerOptions)
      val finalDf = buildFinalDataFrame(csvDf, targetColumns, writableColumns, fields)
      val castedDf = castAndValidate(finalDf, writableColumns, fields)

      val tableName = CopyIntoUtils.quoteIdentifier(catalog.name(), ident)
      castedDf.write.format("paimon").mode("append").insertInto(tableName)

      recordHistoryAndBuildResults(
        paimonTable,
        filesToLoad,
        skippedFiles,
        filePaths,
        stringSchema,
        readerOptions)
    }
  }

  /**
   * Resolves the user's column list against the table using the session resolver. Returns the
   * table's own column names in the user's order, or all table columns when no list was given.
   *
   * @throws IllegalArgumentException on duplicate or unknown columns
   */
  private def resolveTargetColumns(writableColumns: Seq[String]): Seq[String] = {
    columns match {
      case Some(cols) =>
        val resolver = spark.sessionState.conf.resolver
        for {
          i <- cols.indices
          j <- (i + 1) until cols.size
          if resolver(cols(i), cols(j))
        } {
          throw new IllegalArgumentException(s"Duplicate columns in column list: ${cols(i)}")
        }
        cols.map {
          c =>
            writableColumns.find(w => resolver(w, c)).getOrElse {
              throw new IllegalArgumentException(
                s"Column '$c' does not exist in target table. Available columns: ${writableColumns.mkString(", ")}")
            }
        }
      case None => writableColumns
    }
  }

  /**
   * With an explicit column list, every table column left out of it must be nullable or have a
   * default value.
   */
  private def validateNonNullableDefaults(
      writableColumns: Seq[String],
      targetColumns: Seq[String],
      fields: Seq[DataField]): Unit = {
    if (columns.isDefined) {
      writableColumns.filterNot(targetColumns.contains).foreach {
        colName =>
          val field = fields.find(_.name() == colName).get
          if (!field.`type`().isNullable && field.defaultValue() == null) {
            throw new IllegalArgumentException(
              s"Non-nullable column '$colName' is not in the column list and has no default value")
          }
      }
    }
  }

  /**
   * Lists plain files directly under `sourcePath`, applies `pattern` to their names and splits
   * them into (files to load, files skipped because the load history says they are unchanged).
   */
  private def listAndFilterFiles(
      paimonTable: FileStoreTable): (Array[FileStatus], Array[FileStatus]) = {
    val hadoopConf = spark.sessionState.newHadoopConf()
    val fsPath = new Path(sourcePath)
    val fs = fsPath.getFileSystem(hadoopConf)
    val allFiles = fs.listStatus(fsPath).filter(_.isFile)

    val patternFiltered = pattern match {
      case Some(p) =>
        val regex = p.r
        allFiles.filter(f => regex.findFirstIn(f.getPath.getName).isDefined)
      case None => allFiles
    }

    if (patternFiltered.isEmpty) {
      (Array.empty[FileStatus], Array.empty[FileStatus])
    } else if (force) {
      (patternFiltered, Array.empty[FileStatus])
    } else {
      val paimonPath = new org.apache.paimon.fs.Path(paimonTable.location().toString)
      val historyManager = new CopyLoadHistoryManager(paimonTable.fileIO(), paimonPath)
      val (skip, load) = patternFiltered.partition {
        f => historyManager.isLoaded(f.getPath.toString, f.getLen, f.getModificationTime)
      }
      (load, skip)
    }
  }

  /**
   * Reads the CSV files with the all-string schema. It then maps `NULL_IF` values and, if
   * `EMPTY_FIELD_AS_NULL` is enabled, empty strings to SQL NULL.
   */
  private def readAndProcessCsv(
      filePaths: Array[String],
      stringSchema: StructType,
      readerOptions: Map[String, String]): DataFrame = {
    val rawDf = spark.read
      .options(readerOptions)
      .schema(stringSchema)
      .csv(filePaths: _*)

    val nullIfVals = fileFormat.nullIfValues
    val nullIfDf =
      if (nullIfVals.nonEmpty) {
        nullifyWhere(rawDf, c => col(c).isin(nullIfVals: _*))
      } else {
        rawDf
      }

    if (fileFormat.emptyFieldAsNull) {
      nullifyWhere(nullIfDf, c => col(c) === lit(""))
    } else {
      nullIfDf
    }
  }

  /** Replaces, in every column of `df`, the values matching `predicate` with a string NULL. */
  private def nullifyWhere(df: DataFrame, predicate: String => Column): DataFrame = {
    df.columns.foldLeft(df) {
      (acc, colName) =>
        acc.withColumn(
          colName,
          when(predicate(colName), lit(null).cast(StringType)).otherwise(col(colName)))
    }
  }

  /**
   * Names the positional CSV columns after the target columns. With an explicit column list, it
   * also adds the table columns the list omits, filled with their default value (or NULL), in
   * table order.
   */
  private def buildFinalDataFrame(
      csvDf: DataFrame,
      targetColumns: Seq[String],
      writableColumns: Seq[String],
      fields: Seq[DataField]): DataFrame = {
    // Rename all positional `_cN` columns in one step. Sequential withColumnRenamed calls break
    // when a target column is itself named like a placeholder (e.g. `_c1`): an intermediate
    // rename creates two columns with the same name.
    val renamedDf = csvDf.toDF(targetColumns: _*)

    columns match {
      case Some(_) =>
        val selectExprs: Seq[Column] = writableColumns.map {
          colName =>
            if (targetColumns.contains(colName)) {
              col(colName)
            } else {
              defaultValueColumn(fields.find(_.name() == colName).get, colName)
            }
        }
        renamedDf.select(selectExprs: _*)
      case None => renamedDf
    }
  }

  /**
   * Builds the column for a table column omitted from the column list: its parsed default
   * expression cast to the column type, or NULL. A default that fails to parse falls back to
   * a typed NULL.
   */
  private def defaultValueColumn(field: DataField, colName: String): Column = {
    val defaultVal = field.defaultValue()
    if (defaultVal != null) {
      val sparkType = org.apache.paimon.spark.SparkTypeUtils.fromPaimonType(field.`type`())
      try {
        val parsed = spark.sessionState.sqlParser.parseExpression(defaultVal)
        SparkShimLoader.shim.classicApi.column(parsed).cast(sparkType).as(colName)
      } catch {
        case _: Exception => lit(null).cast(sparkType).as(colName)
      }
    } else {
      lit(null).as(colName)
    }
  }

  /**
   * Casts every column to the table's Spark type. Before returning, it checks that no non-null
   * source value became NULL through a cast to a non-string type; such a value is not
   * convertible and aborts the statement.
   */
  private def castAndValidate(
      finalDf: DataFrame,
      writableColumns: Seq[String],
      fields: Seq[DataField]): DataFrame = {
    val nonStringCastCols = ArrayBuffer[String]()
    var castedDf = finalDf
    writableColumns.zip(fields).foreach {
      case (colName, field) =>
        val sparkType = org.apache.paimon.spark.SparkTypeUtils.fromPaimonType(field.`type`())
        castedDf = castedDf.withColumn(colName, col(colName).cast(sparkType))
        if (sparkType != StringType) {
          nonStringCastCols += colName
        }
    }

    if (nonStringCastCols.nonEmpty) {
      // Keep the source columns and add a casted copy of each, so both values can be compared.
      val castSuffix = "__cv"
      val validationDf = nonStringCastCols.zipWithIndex.foldLeft(finalDf) {
        case (df, (colName, idx)) =>
          val field = fields.find(_.name() == colName).get
          val sparkType = org.apache.paimon.spark.SparkTypeUtils.fromPaimonType(field.`type`())
          df.withColumn(castSuffix + idx, col(colName).cast(sparkType))
      }
      val badCastFilter = nonStringCastCols.zipWithIndex
        .map { case (cn, idx) => col(cn).isNotNull && col(castSuffix + idx).isNull }
        .reduce(_ || _)
      val badRows = validationDf.filter(badCastFilter).limit(1).collect()
      if (badRows.nonEmpty) {
        val row = badRows(0)
        val example = nonStringCastCols.zipWithIndex.find {
          case (cn, idx) =>
            val srcIdx = validationDf.schema.fieldIndex(cn)
            val dstIdx = validationDf.schema.fieldIndex(castSuffix + idx)
            !row.isNullAt(srcIdx) && row.isNullAt(dstIdx)
        }
        throw new IllegalArgumentException(
          s"ON_ERROR = ABORT_STATEMENT: Cast failure in column '${example.map(_._1).getOrElse("unknown")}'. Source data contains values that cannot be converted to the target type.")
      }
    }

    castedDf
  }

  /**
   * Counts the rows of each loaded file and writes one load-history record per file. Returns
   * the `LOADED` result rows followed by the `SKIPPED` ones.
   */
  private def recordHistoryAndBuildResults(
      paimonTable: FileStoreTable,
      filesToLoad: Array[FileStatus],
      skippedFiles: Array[FileStatus],
      filePaths: Array[String],
      stringSchema: StructType,
      readerOptions: Map[String, String]): Seq[InternalRow] = {
    val paimonPath = new org.apache.paimon.fs.Path(paimonTable.location().toString)
    val historyManager = new CopyLoadHistoryManager(paimonTable.fileIO(), paimonPath)
    val snapshotId = paimonTable.snapshotManager().latestSnapshotId()
    val loadedAt = System.currentTimeMillis()

    val fileCountMap = spark.read
      .options(readerOptions)
      .schema(stringSchema)
      .csv(filePaths: _*)
      .groupBy(input_file_name().as("file"))
      .count()
      .collect()
      .map(row => fileNameFromInputFile(row.getString(0)) -> row.getLong(1))
      .toMap

    val loadedResults = filesToLoad.map {
      fileStatus =>
        val baseName = fileStatus.getPath.getName
        val rowCount = fileCountMap.getOrElse(baseName, 0L)

        historyManager.recordLoaded(
          CopyLoadRecord(
            filePath = fileStatus.getPath.toString,
            fileSize = fileStatus.getLen,
            lastModified = fileStatus.getModificationTime,
            loadedAt = loadedAt,
            snapshotId = snapshotId,
            rowsLoaded = rowCount
          ))

        InternalRow(
          UTF8String.fromString(baseName),
          UTF8String.fromString("LOADED"),
          rowCount,
          rowCount)
    }.toSeq

    loadedResults ++ buildSkippedResults(skippedFiles)
  }

  /**
   * Extracts the decoded file name from an `input_file_name()` value.
   *
   * Spark reports that value as a URI string, which percent-encodes characters such as spaces.
   * `FileStatus` names are decoded, so the value is decoded before matching. If the value is not
   * a valid URI, the text after the last `/` is used.
   */
  private def fileNameFromInputFile(inputFile: String): String = {
    try {
      new Path(new java.net.URI(inputFile)).getName
    } catch {
      case _: java.net.URISyntaxException | _: IllegalArgumentException =>
        inputFile.substring(inputFile.lastIndexOf('/') + 1)
    }
  }

  /** One `SKIPPED` result row, with zero counts, per file. */
  private def buildSkippedResults(files: Array[FileStatus]): Seq[InternalRow] = {
    files.map {
      f =>
        InternalRow(
          UTF8String.fromString(f.getPath.getName),
          UTF8String.fromString("SKIPPED"),
          0L,
          0L)
    }.toSeq
  }
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoUtils.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoUtils.scala
new file mode 100644
index 0000000000..dd85af058d
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoUtils.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.execution
+
+import org.apache.spark.sql.connector.catalog.Identifier
+
/** Helpers shared by the COPY INTO physical operators. */
object CopyIntoUtils {

  /**
   * Renders `catalog.namespace...name` as a back-quoted multipart identifier that can be passed
   * to `spark.table`.
   *
   * Embedded backticks are doubled, and empty parts are left out.
   */
  def quoteIdentifier(catalogName: String, ident: Identifier): String = {
    val allParts = (catalogName +: ident.namespace().toSeq) :+ ident.name()
    allParts
      .filterNot(_.isEmpty)
      .map {
        part =>
          val escaped = part.replace("`", "``")
          s"`$escaped`"
      }
      .mkString(".")
  }
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
index 63c61a16e8..76bd6beb15 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
@@ -22,7 +22,7 @@ import org.apache.paimon.partition.PartitionPredicate
import org.apache.paimon.spark.{SparkCatalog, SparkGenericCatalog, SparkTable,
SparkUtils}
import org.apache.paimon.spark.catalog.{SparkBaseCatalog, SupportView}
import org.apache.paimon.spark.catalyst.analysis.ResolvedPaimonView
-import
org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand,
CreatePaimonView, DeleteTagCommand, DropPaimonView, PaimonCallCommand,
PaimonDropPartitions, RenameTagCommand, ResolvedIdentifier, ShowPaimonViews,
ShowTagsCommand, TruncatePaimonTableWithFilter}
+import
org.apache.paimon.spark.catalyst.plans.logical.{CopyIntoLocationCommand,
CopyIntoTableCommand, CreateOrReplaceTagCommand, CreatePaimonView,
DeleteTagCommand, DropPaimonView, PaimonCallCommand, PaimonDropPartitions,
RenameTagCommand, ResolvedIdentifier, ShowPaimonViews, ShowTagsCommand,
TruncatePaimonTableWithFilter}
import org.apache.paimon.table.Table
import org.apache.spark.sql.SparkSession
@@ -149,6 +149,28 @@ case class PaimonStrategy(spark: SparkSession)
partitionPredicate: Option[PartitionPredicate]) =>
TruncatePaimonTableWithFilterExec(table, partitionPredicate) :: Nil
+ case c @ CopyIntoTableCommand(PaimonCatalogAndIdentifier(catalog, ident),
_, _, _, _, _) =>
+ CopyIntoTableExec(
+ spark,
+ catalog,
+ ident,
+ c.sourcePath,
+ c.columns,
+ c.fileFormat,
+ c.pattern,
+ c.force,
+ c.output) :: Nil
+
+ case c @ CopyIntoLocationCommand(_, PaimonCatalogAndIdentifier(catalog,
ident), _, _) =>
+ CopyIntoLocationExec(
+ spark,
+ catalog,
+ ident,
+ c.targetPath,
+ c.fileFormat,
+ c.overwrite,
+ c.output) :: Nil
+
case _ => Nil
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
index 0398df809f..67f72d953a 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
@@ -122,7 +122,7 @@ abstract class AbstractPaimonSparkSqlExtensionsParser(val
delegate: ParserInterf
.replaceAll("/\\*.*?\\*/", " ")
.replaceAll("`", "")
.trim()
- isPaimonProcedure(normalized) || isTagRefDdl(normalized)
+ isPaimonProcedure(normalized) || isTagRefDdl(normalized) ||
isCopyInto(normalized)
}
// All builtin paimon procedures are under the 'sys' namespace
@@ -140,6 +140,10 @@ abstract class AbstractPaimonSparkSqlExtensionsParser(val
delegate: ParserInterf
normalized.contains("delete tag")))
}
+ private def isCopyInto(normalized: String): Boolean = {
+ normalized.startsWith("copy into")
+ }
+
protected def parse[T](command: String)(toResult: PaimonSqlExtensionsParser
=> T): T = {
val lexer = new PaimonSqlExtensionsLexer(
new UpperCaseCharStream(CharStreams.fromString(command)))
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
index a1289a5f0b..dd3f3c4d15 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
@@ -33,6 +33,7 @@ import
org.apache.spark.sql.catalyst.parser.extensions.PaimonSqlExtensionsParser
import org.apache.spark.sql.catalyst.plans.logical._
import scala.collection.JavaConverters._
+import scala.collection.mutable
/* This file is based on source code from the Iceberg Project
(http://iceberg.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the
NOTICE file distributed with this work for
@@ -153,6 +154,101 @@ class PaimonSqlExtensionsAstBuilder(delegate:
ParserInterface)
ctx.identifier(1).getText)
}
+ /** Create a COPY INTO TABLE (import) logical command. */
+ override def visitCopyIntoTable(ctx: CopyIntoTableContext):
logical.CopyIntoTableCommand =
+ withOrigin(ctx) {
+ val table = typedVisit[Seq[String]](ctx.multipartIdentifier)
+ val columns =
Option(ctx.columnList()).map(_.identifier().asScala.map(_.getText).toSeq)
+ val sourcePath = unquoteString(ctx.sourcePath.getText)
+ val fileFormat = buildFileFormat(ctx.fileFormatClause())
+ val pattern = Option(ctx.patternClause()).map(p =>
unquoteString(p.STRING().getText))
+ val force = Option(ctx.forceClause()).exists(_.booleanValue().TRUE() !=
null)
+ logical.CopyIntoTableCommand(table, columns, sourcePath, fileFormat,
pattern, force)
+ }
+
+ /** Create a COPY INTO LOCATION (export) logical command. */
+ override def visitCopyIntoLocation(
+ ctx: CopyIntoLocationContext): logical.CopyIntoLocationCommand =
withOrigin(ctx) {
+ val targetPath = unquoteString(ctx.targetPath.getText)
+ val table = typedVisit[Seq[String]](ctx.multipartIdentifier)
+ val fileFormat = buildFileFormat(ctx.fileFormatClause())
+ val overwrite =
Option(ctx.overwriteClause()).exists(_.booleanValue().TRUE() != null)
+ logical.CopyIntoLocationCommand(targetPath, table, fileFormat, overwrite)
+ }
+
+ private def buildFileFormat(ctx: FileFormatClauseContext): CopyFileFormat = {
+ val opts = ctx.fileFormatOption().asScala.toSeq
+ val seen = mutable.Set[String]()
+ val optionsBuilder = mutable.LinkedHashMap[String, String]()
+
+ opts.foreach {
+ opt =>
+ val key = opt.key.getText.toUpperCase
+ if (!seen.add(key)) {
+ throw new IllegalArgumentException(s"Duplicate FILE_FORMAT option:
$key")
+ }
+ val value = extractFormatValue(opt.fileFormatValue())
+ optionsBuilder(key) = value
+ }
+
+ val typeValue = optionsBuilder.remove("TYPE")
+ if (typeValue.isEmpty) {
+ throw new IllegalArgumentException("FILE_FORMAT must include TYPE")
+ }
+
+ val formatType = CopyFileFormat.parseFormatType(typeValue.get)
+
+ CopyFileFormat(formatType, optionsBuilder.toMap)
+ }
+
+ private def extractFormatValue(ctx: FileFormatValueContext): String = {
+ ctx match {
+ case c: StringFormatValueContext =>
+ unquoteString(c.STRING().getText)
+ case c: IdentFormatValueContext =>
+ c.identifier().getText
+ case c: BoolFormatValueContext =>
+ if (c.booleanValue().TRUE() != null) "TRUE" else "FALSE"
+ case c: IntFormatValueContext =>
+ c.INTEGER_VALUE().getText
+ case c: ListFormatValueContext =>
+ c.STRING()
+ .asScala
+ .map(s => unquoteString(s.getText))
+ .mkString(CopyFileFormat.LIST_SEPARATOR)
+ }
+ }
+
+ private def unquoteString(s: String): String = {
+ if (s == null || s.length < 2) return s
+ val first = s.charAt(0)
+ if ((first == '\'' || first == '"') && s.charAt(s.length - 1) == first) {
+ val inner = s.substring(1, s.length - 1)
+ val sb = new StringBuilder
+ var i = 0
+ while (i < inner.length) {
+ val c = inner.charAt(i)
+ if (c == '\\' && i + 1 < inner.length) {
+ inner.charAt(i + 1) match {
+ case 'n' => sb.append('\n'); i += 2
+ case 't' => sb.append('\t'); i += 2
+ case 'r' => sb.append('\r'); i += 2
+ case '\\' => sb.append('\\'); i += 2
+ case q if q == first => sb.append(q); i += 2
+ case other => sb.append('\\'); sb.append(other); i += 2
+ }
+ } else if (c == first && i + 1 < inner.length && inner.charAt(i + 1)
== first) {
+ sb.append(first); i += 2
+ } else {
+ sb.append(c); i += 1
+ }
+ }
+ sb.toString()
+ } else {
+ s
+ }
+ }
+
  // Wraps a Java list as a Scala collection via JavaConverters' asScala.
  private def toBuffer[T](list: java.util.List[T]) = list.asScala
  // Same as toBuffer; its result type is whatever asScala returns, not an immutable Seq.
  private def toSeq[T](list: java.util.List[T]) = toBuffer(list)
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala
new file mode 100644
index 0000000000..f3fb75a8c6
--- /dev/null
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala
@@ -0,0 +1,667 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.Row
+
+import java.io.{File, PrintWriter}
+import java.nio.file.Files
+
+class CopyIntoTestBase extends PaimonSparkTestBase {
+
+ private def createCsvFile(dir: File, name: String, content: String): File = {
+ val file = new File(dir, name)
+ val writer = new PrintWriter(file)
+ try writer.write(content)
+ finally writer.close()
+ file
+ }
+
+ private def withCsvDir(testBody: File => Unit): Unit = {
+ val dir = Files.createTempDirectory("copy_into_test").toFile
+ try testBody(dir)
+ finally deleteRecursively(dir)
+ }
+
+ private def deleteRecursively(file: File): Unit = {
+ if (file.isDirectory) {
+ file.listFiles().foreach(deleteRecursively)
+ }
+ file.delete()
+ }
+
+ test("COPY INTO: basic CSV import") {
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_basic")
+ spark.sql(s"CREATE TABLE $dbName0.copy_basic (id INT, name STRING, age
INT)")
+
+ withCsvDir {
+ dir =>
+ createCsvFile(dir, "data.csv", "1,Alice,30\n2,Bob,25\n")
+
+ val result = spark.sql(s"""COPY INTO $dbName0.copy_basic
+ |FROM '${dir.getAbsolutePath}'
+ |FILE_FORMAT = (TYPE = CSV)
+ |""".stripMargin)
+
+ assert(result.collect().length > 0)
+ val row = result.collect()(0)
+ assert(row.getString(1) == "LOADED")
+
+ checkAnswer(
+ spark.sql(s"SELECT * FROM $dbName0.copy_basic ORDER BY id"),
+ Seq(Row(1, "Alice", 30), Row(2, "Bob", 25)))
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_basic")
+ }
+
+ test("COPY INTO: CSV with custom delimiter") {
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_delim")
+ spark.sql(s"CREATE TABLE $dbName0.copy_delim (id INT, name STRING)")
+
+ withCsvDir {
+ dir =>
+ createCsvFile(dir, "data.csv", "1|Alice\n2|Bob\n")
+
+ spark.sql(s"""COPY INTO $dbName0.copy_delim
+ |FROM '${dir.getAbsolutePath}'
+ |FILE_FORMAT = (TYPE = CSV, FIELD_DELIMITER = '|')
+ |""".stripMargin)
+
+ checkAnswer(
+ spark.sql(s"SELECT * FROM $dbName0.copy_delim ORDER BY id"),
+ Seq(Row(1, "Alice"), Row(2, "Bob")))
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_delim")
+ }
+
+ test("COPY INTO: CSV with SKIP_HEADER") {
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_header")
+ spark.sql(s"CREATE TABLE $dbName0.copy_header (id INT, name STRING)")
+
+ withCsvDir {
+ dir =>
+ createCsvFile(dir, "data.csv", "id,name\n1,Alice\n2,Bob\n")
+
+ spark.sql(s"""COPY INTO $dbName0.copy_header
+ |FROM '${dir.getAbsolutePath}'
+ |FILE_FORMAT = (TYPE = CSV, SKIP_HEADER = 1)
+ |""".stripMargin)
+
+ checkAnswer(
+ spark.sql(s"SELECT * FROM $dbName0.copy_header ORDER BY id"),
+ Seq(Row(1, "Alice"), Row(2, "Bob")))
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_header")
+ }
+
+ test("COPY INTO: CSV with quote and escape") {
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_quote")
+ spark.sql(s"CREATE TABLE $dbName0.copy_quote (id INT, name STRING)")
+
+ withCsvDir {
+ dir =>
+ createCsvFile(dir, "data.csv", "1,\"Alice \"\"the
great\"\"\"\n2,\"Bob\"\n")
+
+ spark.sql(s"""COPY INTO $dbName0.copy_quote
+ |FROM '${dir.getAbsolutePath}'
+ |FILE_FORMAT = (TYPE = CSV, QUOTE = '"', ESCAPE = '"')
+ |""".stripMargin)
+
+ checkAnswer(
+ spark.sql(s"SELECT * FROM $dbName0.copy_quote ORDER BY id"),
+ Seq(Row(1, "Alice \"the great\""), Row(2, "Bob")))
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_quote")
+ }
+
+ test("COPY INTO: NULL_IF with multiple values") {
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_null")
+ spark.sql(s"CREATE TABLE $dbName0.copy_null (id INT, name STRING)")
+
+ withCsvDir {
+ dir =>
+ createCsvFile(dir, "data.csv", "1,NULL\n2,\\N\n3,Alice\n")
+
+ spark.sql(s"""COPY INTO $dbName0.copy_null
+ |FROM '${dir.getAbsolutePath}'
+ |FILE_FORMAT = (TYPE = CSV, NULL_IF = ('NULL', '\\N'))
+ |""".stripMargin)
+
+ checkAnswer(
+ spark.sql(s"SELECT * FROM $dbName0.copy_null ORDER BY id"),
+ Seq(Row(1, null), Row(2, null), Row(3, "Alice")))
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_null")
+ }
+
+ test("COPY INTO: EMPTY_FIELD_AS_NULL") {
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_empty")
+ spark.sql(s"CREATE TABLE $dbName0.copy_empty (id INT, name STRING)")
+
+ withCsvDir {
+ dir =>
+ createCsvFile(dir, "data.csv", "1,\n2,Alice\n")
+
+ spark.sql(s"""COPY INTO $dbName0.copy_empty
+ |FROM '${dir.getAbsolutePath}'
+ |FILE_FORMAT = (TYPE = CSV, EMPTY_FIELD_AS_NULL = TRUE)
+ |""".stripMargin)
+
+ checkAnswer(
+ spark.sql(s"SELECT * FROM $dbName0.copy_empty ORDER BY id"),
+ Seq(Row(1, null), Row(2, "Alice")))
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_empty")
+ }
+
+ test("COPY INTO: PATTERN filters files") {
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_pattern")
+ spark.sql(s"CREATE TABLE $dbName0.copy_pattern (id INT, name STRING)")
+
+ withCsvDir {
+ dir =>
+ createCsvFile(dir, "data1.csv", "1,Alice\n")
+ createCsvFile(dir, "data2.csv", "2,Bob\n")
+ createCsvFile(dir, "ignore.txt", "3,Charlie\n")
+
+ spark.sql(s"""COPY INTO $dbName0.copy_pattern
+ |FROM '${dir.getAbsolutePath}'
+ |FILE_FORMAT = (TYPE = CSV)
+ |PATTERN = '.*\\.csv'
+ |""".stripMargin)
+
+ checkAnswer(
+ spark.sql(s"SELECT * FROM $dbName0.copy_pattern ORDER BY id"),
+ Seq(Row(1, "Alice"), Row(2, "Bob")))
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_pattern")
+ }
+
+ test("COPY INTO: explicit column list") {
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_cols")
+ spark.sql(s"CREATE TABLE $dbName0.copy_cols (id INT, name STRING, age
INT)")
+
+ withCsvDir {
+ dir =>
+ createCsvFile(dir, "data.csv", "1,Alice\n2,Bob\n")
+
+ spark.sql(s"""COPY INTO $dbName0.copy_cols (id, name)
+ |FROM '${dir.getAbsolutePath}'
+ |FILE_FORMAT = (TYPE = CSV)
+ |""".stripMargin)
+
+ checkAnswer(
+ spark.sql(s"SELECT * FROM $dbName0.copy_cols ORDER BY id"),
+ Seq(Row(1, "Alice", null), Row(2, "Bob", null)))
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_cols")
+ }
+
+ test("COPY INTO: unsupported TYPE errors at validation time") {
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_unsup")
+ spark.sql(s"CREATE TABLE $dbName0.copy_unsup (id INT)")
+
+ withCsvDir {
+ dir =>
+ createCsvFile(dir, "data.csv", "1\n")
+
+ val e = intercept[IllegalArgumentException] {
+ spark.sql(s"""COPY INTO $dbName0.copy_unsup
+ |FROM '${dir.getAbsolutePath}'
+ |FILE_FORMAT = (TYPE = PARQUET)
+ |""".stripMargin)
+ }
+ assert(e.getMessage.contains("Unsupported file format type"))
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_unsup")
+ }
+
+ test("COPY INTO: missing TYPE errors") {
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_notype")
+ spark.sql(s"CREATE TABLE $dbName0.copy_notype (id INT)")
+
+ withCsvDir {
+ dir =>
+ createCsvFile(dir, "data.csv", "1\n")
+
+ val e = intercept[IllegalArgumentException] {
+ spark.sql(s"""COPY INTO $dbName0.copy_notype
+ |FROM '${dir.getAbsolutePath}'
+ |FILE_FORMAT = (FIELD_DELIMITER = ',')
+ |""".stripMargin)
+ }
+ assert(e.getMessage.contains("FILE_FORMAT must include TYPE"))
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_notype")
+ }
+
+ test("COPY INTO: duplicate option key errors") {
+ // Repeating a FILE_FORMAT key (TYPE given twice) must be rejected with an
+ // IllegalArgumentException.
+ val table = s"$dbName0.copy_dup"
+ spark.sql(s"DROP TABLE IF EXISTS $table")
+ spark.sql(s"CREATE TABLE $table (id INT)")
+
+ withCsvDir {
+ csvDir =>
+ createCsvFile(csvDir, "data.csv", "1\n")
+ val copySql =
+ s"""COPY INTO $table
+ |FROM '${csvDir.getAbsolutePath}'
+ |FILE_FORMAT = (TYPE = CSV, TYPE = CSV)
+ |""".stripMargin
+
+ val thrown = intercept[IllegalArgumentException](spark.sql(copySql))
+ assert(thrown.getMessage.contains("Duplicate FILE_FORMAT option"))
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $table")
+ }
+
+ test("COPY INTO: export table to CSV") {
+ // Exports a two-row table with HEADER = TRUE. The test checks the command's
+ // result row and also the actual contents written to the target directory.
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export")
+ spark.sql(s"CREATE TABLE $dbName0.copy_export (id INT, name STRING)")
+ spark.sql(s"INSERT INTO $dbName0.copy_export VALUES (1, 'Alice'), (2, 'Bob')")
+
+ withCsvDir {
+ dir =>
+ val outputPath = new File(dir, "output").getAbsolutePath
+
+ val result = spark.sql(s"""COPY INTO '$outputPath'
+ |FROM $dbName0.copy_export
+ |FILE_FORMAT = (TYPE = CSV, HEADER = TRUE)
+ |""".stripMargin)
+
+ // Result row: column 0 echoes the target path, column 2 is the exported row count.
+ val row = result.collect()(0)
+ assert(row.getString(0) == outputPath)
+ assert(row.getLong(2) == 2L)
+
+ // Verify the exported values, not just how many rows came back. Without
+ // inferSchema the CSV reader returns every column as STRING; the header
+ // written by HEADER = TRUE supplies the column names.
+ val exported = spark.read.option("header", "true").csv(outputPath)
+ assert(exported.count() == 2)
+ checkAnswer(exported.orderBy("id"), Seq(Row("1", "Alice"), Row("2", "Bob")))
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export")
+ }
+
+ test("COPY INTO: export OVERWRITE=FALSE fails on existing path") {
+ // Exporting into a directory that already contains a file must fail both when
+ // OVERWRITE is omitted (the default) and when OVERWRITE = FALSE is spelled out.
+ // In either case the pre-existing file must be left in place.
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_fail")
+ spark.sql(s"CREATE TABLE $dbName0.copy_export_fail (id INT)")
+ spark.sql(s"INSERT INTO $dbName0.copy_export_fail VALUES (1)")
+
+ withCsvDir {
+ dir =>
+ createCsvFile(dir, "existing.csv", "data\n")
+
+ // Default behaviour: no OVERWRITE clause.
+ intercept[Exception] {
+ spark.sql(s"""COPY INTO '${dir.getAbsolutePath}'
+ |FROM $dbName0.copy_export_fail
+ |FILE_FORMAT = (TYPE = CSV)
+ |""".stripMargin)
+ }
+
+ // Explicit OVERWRITE = FALSE, which is what the test name promises.
+ intercept[Exception] {
+ spark.sql(s"""COPY INTO '${dir.getAbsolutePath}'
+ |FROM $dbName0.copy_export_fail
+ |FILE_FORMAT = (TYPE = CSV)
+ |OVERWRITE = FALSE
+ |""".stripMargin)
+ }
+
+ // A rejected export must not have removed the data already in the directory.
+ assert(new File(dir, "existing.csv").exists())
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_fail")
+ }
+
+ test("COPY INTO: export OVERWRITE=TRUE succeeds") {
+ // With OVERWRITE = TRUE, exporting into a directory that already holds a file
+ // succeeds, and the third result column reports one exported row.
+ val table = s"$dbName0.copy_export_ow"
+ spark.sql(s"DROP TABLE IF EXISTS $table")
+ spark.sql(s"CREATE TABLE $table (id INT, name STRING)")
+ spark.sql(s"INSERT INTO $table VALUES (1, 'Alice')")
+
+ withCsvDir {
+ csvDir =>
+ createCsvFile(csvDir, "existing.csv", "old data\n")
+ val exportSql =
+ s"""COPY INTO '${csvDir.getAbsolutePath}'
+ |FROM $table
+ |FILE_FORMAT = (TYPE = CSV)
+ |OVERWRITE = TRUE
+ |""".stripMargin
+
+ val firstRow = spark.sql(exportSql).collect()(0)
+ assert(firstRow.getLong(2) == 1L)
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $table")
+ }
+
+ test("COPY INTO: FORCE=FALSE skips already-loaded files") {
+ // A file imported by an earlier COPY INTO is reported as SKIPPED when the
+ // statement is repeated with FORCE = FALSE, and the table contents are unchanged.
+ val table = s"$dbName0.copy_force"
+ spark.sql(s"DROP TABLE IF EXISTS $table")
+ spark.sql(s"CREATE TABLE $table (id INT, name STRING)")
+
+ withCsvDir {
+ csvDir =>
+ createCsvFile(csvDir, "data.csv", "1,Alice\n2,Bob\n")
+ val source = csvDir.getAbsolutePath
+
+ // Initial import of data.csv.
+ spark.sql(s"""COPY INTO $table
+ |FROM '$source'
+ |FILE_FORMAT = (TYPE = CSV)
+ |""".stripMargin)
+
+ // Repeat with FORCE = FALSE (also the default): the file must be skipped.
+ val skipped = spark
+ .sql(s"""COPY INTO $table
+ |FROM '$source'
+ |FILE_FORMAT = (TYPE = CSV)
+ |FORCE = FALSE
+ |""".stripMargin)
+ .collect()
+
+ assert(skipped.length == 1)
+ assert(skipped.head.getString(1) == "SKIPPED")
+
+ // Only the two rows from the first import are present.
+ checkAnswer(
+ spark.sql(s"SELECT * FROM $table ORDER BY id"),
+ Seq(Row(1, "Alice"), Row(2, "Bob")))
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $table")
+ }
+
+ test("COPY INTO: FORCE=TRUE reloads already-loaded files") {
+ // FORCE = TRUE re-imports a file that was already loaded. It is reported as
+ // LOADED again, and its row ends up in the table twice.
+ val table = s"$dbName0.copy_force_true"
+ spark.sql(s"DROP TABLE IF EXISTS $table")
+ spark.sql(s"CREATE TABLE $table (id INT, name STRING)")
+
+ withCsvDir {
+ csvDir =>
+ createCsvFile(csvDir, "data.csv", "1,Alice\n")
+ val source = csvDir.getAbsolutePath
+
+ // Initial import of data.csv.
+ spark.sql(s"""COPY INTO $table
+ |FROM '$source'
+ |FILE_FORMAT = (TYPE = CSV)
+ |""".stripMargin)
+
+ // Forced re-import of the same file.
+ val reloaded = spark
+ .sql(s"""COPY INTO $table
+ |FROM '$source'
+ |FILE_FORMAT = (TYPE = CSV)
+ |FORCE = TRUE
+ |""".stripMargin)
+ .collect()
+
+ assert(reloaded.length == 1)
+ assert(reloaded.head.getString(1) == "LOADED")
+
+ // One row from each import: two in total.
+ val totalRows = spark.sql(s"SELECT * FROM $table").count()
+ assert(totalRows == 2)
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $table")
+ }
+
+ test("COPY INTO: bad numeric cast fails with ABORT_STATEMENT") {
+ // No ON_ERROR clause is given, so this exercises the default error mode
+ // (ABORT_STATEMENT per the test name). A CSV value that cannot be cast to the
+ // INT column must fail the whole statement and leave the table empty.
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_badcast")
+ spark.sql(s"CREATE TABLE $dbName0.copy_badcast (id INT, name STRING)")
+
+ withCsvDir {
+ dir =>
+ createCsvFile(dir, "data.csv", "abc,Alice\n")
+
+ val e = intercept[Exception] {
+ spark.sql(s"""COPY INTO $dbName0.copy_badcast
+ |FROM '${dir.getAbsolutePath}'
+ |FILE_FORMAT = (TYPE = CSV)
+ |""".stripMargin)
+ }
+ // Several message markers are accepted, presumably because the wording
+ // differs across Spark versions and cast paths. A wrapped cause is kept as the
+ // last-resort fallback. getMessage may be null, so guard before contains().
+ val msg = Option(e.getMessage).getOrElse("")
+ assert(
+ msg.contains("Cast failure") ||
+ msg.contains("ABORT_STATEMENT") ||
+ msg.contains("CAST_INVALID_INPUT") ||
+ msg.contains("cannot be cast to") ||
+ e.getCause != null)
+
+ // The message check above is loose, so also pin the observable effect of
+ // aborting: nothing from the failed statement may have been committed.
+ assert(spark.sql(s"SELECT * FROM $dbName0.copy_badcast").count() == 0)
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_badcast")
+ }
+
+ test("COPY INTO: no namespace table works") {
+ // The target table is referenced without a database qualifier, so it resolves
+ // against the current namespace.
+ val table = "copy_no_ns"
+ spark.sql(s"DROP TABLE IF EXISTS $table")
+ spark.sql(s"CREATE TABLE $table (id INT, name STRING)")
+
+ withCsvDir {
+ csvDir =>
+ createCsvFile(csvDir, "data.csv", "1,Alice\n")
+ val copySql =
+ s"""COPY INTO $table
+ |FROM '${csvDir.getAbsolutePath}'
+ |FILE_FORMAT = (TYPE = CSV)
+ |""".stripMargin
+
+ spark.sql(copySql)
+ checkAnswer(spark.sql(s"SELECT * FROM $table"), Seq(Row(1, "Alice")))
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $table")
+ }
+
+ test("COPY INTO: lowercase options are accepted") {
+ // FILE_FORMAT keys and values written in lowercase must be honoured. Here a
+ // custom '|' delimiter is used to split the record.
+ val table = s"$dbName0.copy_lcase"
+ spark.sql(s"DROP TABLE IF EXISTS $table")
+ spark.sql(s"CREATE TABLE $table (id INT, name STRING)")
+
+ withCsvDir {
+ csvDir =>
+ createCsvFile(csvDir, "data.csv", "1|Alice\n")
+ val copySql =
+ s"""COPY INTO $table
+ |FROM '${csvDir.getAbsolutePath}'
+ |FILE_FORMAT = (type = csv, field_delimiter = '|')
+ |""".stripMargin
+
+ spark.sql(copySql)
+ checkAnswer(spark.sql(s"SELECT * FROM $table"), Seq(Row(1, "Alice")))
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $table")
+ }
+
+ test("COPY INTO: unknown option key errors") {
+ // An unrecognised FILE_FORMAT key (BOGUS_OPTION) must fail the statement.
+ val table = s"$dbName0.copy_unknown_opt"
+ spark.sql(s"DROP TABLE IF EXISTS $table")
+ spark.sql(s"CREATE TABLE $table (id INT)")
+
+ withCsvDir {
+ csvDir =>
+ createCsvFile(csvDir, "data.csv", "1\n")
+ val copySql =
+ s"""COPY INTO $table
+ |FROM '${csvDir.getAbsolutePath}'
+ |FILE_FORMAT = (TYPE = CSV, BOGUS_OPTION = TRUE)
+ |""".stripMargin
+
+ val thrown = intercept[Exception](spark.sql(copySql))
+ assert(thrown.getMessage.contains("Unsupported FILE_FORMAT options"))
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $table")
+ }
+
+ test("COPY INTO: SKIP_HEADER > 1 errors") {
+ // Only SKIP_HEADER values 0 and 1 are allowed; 2 must be rejected.
+ val table = s"$dbName0.copy_skip2"
+ spark.sql(s"DROP TABLE IF EXISTS $table")
+ spark.sql(s"CREATE TABLE $table (id INT)")
+
+ withCsvDir {
+ csvDir =>
+ createCsvFile(csvDir, "data.csv", "h1\nh2\n1\n")
+ val copySql =
+ s"""COPY INTO $table
+ |FROM '${csvDir.getAbsolutePath}'
+ |FILE_FORMAT = (TYPE = CSV, SKIP_HEADER = 2)
+ |""".stripMargin
+
+ val thrown = intercept[Exception](spark.sql(copySql))
+ assert(thrown.getMessage.contains("SKIP_HEADER supports only 0 or 1"))
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $table")
+ }
+
+ test("COPY INTO: export rejects import-only options") {
+ // SKIP_HEADER only applies to imports; using it in a COPY INTO '<path>' export
+ // must fail the statement.
+ val table = s"$dbName0.copy_export_bad"
+ spark.sql(s"DROP TABLE IF EXISTS $table")
+ spark.sql(s"CREATE TABLE $table (id INT)")
+ spark.sql(s"INSERT INTO $table VALUES (1)")
+
+ withCsvDir {
+ csvDir =>
+ val target = new File(csvDir, "out").getAbsolutePath
+ val exportSql =
+ s"""COPY INTO '$target'
+ |FROM $table
+ |FILE_FORMAT = (TYPE = CSV, SKIP_HEADER = 1)
+ |""".stripMargin
+
+ val thrown = intercept[Exception](spark.sql(exportSql))
+ assert(thrown.getMessage.contains("Unsupported FILE_FORMAT options for export"))
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $table")
+ }
+
+ test("COPY INTO: explicit column list with default value column") {
+ // Column DEFAULT values need Spark 3.4+. A column left out of the COPY INTO
+ // column list must be filled from its declared DEFAULT ('active').
+ assume(gteqSpark3_4)
+ val table = s"$dbName0.copy_default"
+ spark.sql(s"DROP TABLE IF EXISTS $table")
+ spark.sql(s"CREATE TABLE $table (id INT, name STRING, status STRING DEFAULT 'active')")
+
+ withCsvDir {
+ csvDir =>
+ createCsvFile(csvDir, "data.csv", "1,Alice\n2,Bob\n")
+ val copySql =
+ s"""COPY INTO $table (id, name)
+ |FROM '${csvDir.getAbsolutePath}'
+ |FILE_FORMAT = (TYPE = CSV)
+ |""".stripMargin
+
+ spark.sql(copySql)
+ val expected = Seq(Row(1, "Alice", "active"), Row(2, "Bob", "active"))
+ checkAnswer(spark.sql(s"SELECT * FROM $table ORDER BY id"), expected)
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $table")
+ }
+
+ test("COPY INTO: too many CSV columns fails") {
+ // A record with more fields than the table has columns fails the statement,
+ // and no rows are left in the table.
+ val table = s"$dbName0.copy_toomany"
+ spark.sql(s"DROP TABLE IF EXISTS $table")
+ spark.sql(s"CREATE TABLE $table (id INT, name STRING)")
+
+ withCsvDir {
+ csvDir =>
+ createCsvFile(csvDir, "data.csv", "1,Alice,extra,columns\n")
+ val copySql =
+ s"""COPY INTO $table
+ |FROM '${csvDir.getAbsolutePath}'
+ |FILE_FORMAT = (TYPE = CSV)
+ |""".stripMargin
+
+ intercept[Exception](spark.sql(copySql))
+ val remaining = spark.sql(s"SELECT * FROM $table").count()
+ assert(remaining == 0)
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $table")
+ }
+
+ test("COPY INTO: too few CSV columns fails") {
+ // A record with fewer fields than the table has columns, and no explicit column
+ // list, fails the statement. No rows are left in the table.
+ val table = s"$dbName0.copy_toofew"
+ spark.sql(s"DROP TABLE IF EXISTS $table")
+ spark.sql(s"CREATE TABLE $table (id INT, name STRING, age INT)")
+
+ withCsvDir {
+ csvDir =>
+ createCsvFile(csvDir, "data.csv", "1,Alice\n")
+ val copySql =
+ s"""COPY INTO $table
+ |FROM '${csvDir.getAbsolutePath}'
+ |FILE_FORMAT = (TYPE = CSV)
+ |""".stripMargin
+
+ intercept[Exception](spark.sql(copySql))
+ val remaining = spark.sql(s"SELECT * FROM $table").count()
+ assert(remaining == 0)
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $table")
+ }
+
+ test("COPY INTO: rows_loaded count is accurate") {
+ // A single three-record CSV file yields one result row with status LOADED, and
+ // both count columns (indices 2 and 3) report 3.
+ val table = s"$dbName0.copy_count"
+ spark.sql(s"DROP TABLE IF EXISTS $table")
+ spark.sql(s"CREATE TABLE $table (id INT, name STRING)")
+
+ withCsvDir {
+ csvDir =>
+ createCsvFile(csvDir, "data.csv", "1,Alice\n2,Bob\n3,Charlie\n")
+ val copySql =
+ s"""COPY INTO $table
+ |FROM '${csvDir.getAbsolutePath}'
+ |FILE_FORMAT = (TYPE = CSV)
+ |""".stripMargin
+
+ val stats = spark.sql(copySql).collect()
+ assert(stats.length == 1)
+ val fileRow = stats.head
+ assert(fileRow.getString(1) == "LOADED")
+ assert(fileRow.getLong(2) == 3L)
+ assert(fileRow.getLong(3) == 3L)
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $table")
+ }
+
+ test("COPY INTO: duplicate column list errors") {
+ // Naming the same target column twice in the column list is rejected with an
+ // IllegalArgumentException.
+ val table = s"$dbName0.copy_dup_col"
+ spark.sql(s"DROP TABLE IF EXISTS $table")
+ spark.sql(s"CREATE TABLE $table (id INT, name STRING)")
+
+ withCsvDir {
+ csvDir =>
+ createCsvFile(csvDir, "data.csv", "1,Alice\n")
+ val copySql =
+ s"""COPY INTO $table (id, id)
+ |FROM '${csvDir.getAbsolutePath}'
+ |FILE_FORMAT = (TYPE = CSV)
+ |""".stripMargin
+
+ val thrown = intercept[IllegalArgumentException](spark.sql(copySql))
+ assert(thrown.getMessage.contains("Duplicate columns"))
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $table")
+ }
+
+ test("COPY INTO: case-insensitive column matching") {
+ // Upper-case names in the column list (ID, NAME) resolve to the lower-case
+ // table columns; the unlisted age column is left null.
+ val table = s"$dbName0.copy_case"
+ spark.sql(s"DROP TABLE IF EXISTS $table")
+ spark.sql(s"CREATE TABLE $table (id INT, name STRING, age INT)")
+
+ withCsvDir {
+ csvDir =>
+ createCsvFile(csvDir, "data.csv", "1,Alice\n")
+ val copySql =
+ s"""COPY INTO $table (ID, NAME)
+ |FROM '${csvDir.getAbsolutePath}'
+ |FILE_FORMAT = (TYPE = CSV)
+ |""".stripMargin
+
+ spark.sql(copySql)
+ checkAnswer(spark.sql(s"SELECT * FROM $table"), Seq(Row(1, "Alice", null)))
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $table")
+ }
+}