This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a9ce9b8b64 [spark] Support JSON format in COPY INTO (#7993)
a9ce9b8b64 is described below

commit a9ce9b8b64beb280baf981825a2a463b226647a9
Author: Junrui Lee <[email protected]>
AuthorDate: Fri May 29 13:23:43 2026 +0800

    [spark] Support JSON format in COPY INTO (#7993)
    
    - Add JSON format support for `COPY INTO` import and export, alongside
    existing CSV support
    - JSON uses column-name matching (not positional), with options for
    `MULTI_LINE`, `NULL_IF`, `EMPTY_FIELD_AS_NULL`, and `COMPRESSION`
    - CSV-only options (e.g. `FIELD_DELIMITER`, `SKIP_HEADER`) are rejected
    for JSON format with clear error messages
---
 docs/docs/spark/sql-write.md                       |  89 ++++-
 .../PaimonSqlExtensions.g4                         |   2 +
 .../spark/catalyst/plans/logical/CopyOptions.scala | 117 ++++--
 .../spark/execution/CopyIntoLocationExec.scala     |   9 +-
 .../paimon/spark/execution/CopyIntoTableExec.scala |  54 ++-
 .../apache/paimon/spark/sql/CopyIntoTestBase.scala | 421 ++++++++++++++++++++-
 6 files changed, 629 insertions(+), 63 deletions(-)

diff --git a/docs/docs/spark/sql-write.md b/docs/docs/spark/sql-write.md
index 518bf0e5b4..449d8871ed 100644
--- a/docs/docs/spark/sql-write.md
+++ b/docs/docs/spark/sql-write.md
@@ -314,7 +314,7 @@ INSERT INTO t BY NAME SELECT 3 AS a, '3' AS b, 3 AS c;
 
 ## COPY INTO
 
-`COPY INTO` provides a SQL command for bulk loading CSV files into Paimon 
tables and writing table data to CSV files.
+`COPY INTO` provides a SQL command for bulk loading data files into Paimon 
tables and exporting table data to files. Supported formats: **CSV** and 
**JSON**.
 
 ### CSV Import
 
@@ -354,6 +354,35 @@ PATTERN = '.*\.csv'
 FORCE = FALSE;
 ```
 
+### JSON Import
+
+```sql
+COPY INTO table_name [(col1, col2, ...)]
+FROM 'source_path'
+FILE_FORMAT = (TYPE = JSON [, option = value, ...])
+[PATTERN = 'regex']
+[FORCE = TRUE|FALSE]
+[ON_ERROR = ABORT_STATEMENT]
+```
+
+**Basic import:**
+
+```sql
+COPY INTO my_db.my_table
+FROM '/data/json_files/'
+FILE_FORMAT = (TYPE = JSON);
+```
+
+**Import multi-line JSON array:**
+
+```sql
+COPY INTO my_db.events
+FROM '/data/events/'
+FILE_FORMAT = (TYPE = JSON, MULTI_LINE = TRUE);
+```
+
+JSON columns are matched **by column name** (not by position), so source field 
order does not matter.
+
 ### Write CSV Files
 
 ```sql
@@ -372,15 +401,33 @@ FILE_FORMAT = (TYPE = CSV, HEADER = TRUE, FIELD_DELIMITER 
= ',')
 OVERWRITE = TRUE;
 ```
 
+### Write JSON Files
+
+```sql
+COPY INTO 'target_path'
+FROM table_name
+FILE_FORMAT = (TYPE = JSON [, option = value, ...])
+[OVERWRITE = TRUE|FALSE]
+```
+
+**Basic JSON export:**
+
+```sql
+COPY INTO '/export/events_backup/'
+FROM my_db.events
+FILE_FORMAT = (TYPE = JSON)
+OVERWRITE = TRUE;
+```
+
 ### FILE_FORMAT Options
 
-`FILE_FORMAT` is required and must include `TYPE = CSV`.
+`FILE_FORMAT` is required and must include `TYPE = CSV` or `TYPE = JSON`.
 
-**Import options:**
+**CSV import options:**
 
 | Option | Description | Default |
 |--------|-------------|---------|
-| TYPE | File format type. Must be `CSV`. | (required) |
+| TYPE | File format type. `CSV` or `JSON`. | (required) |
 | FIELD_DELIMITER | Column delimiter character. | `,` |
 | SKIP_HEADER | Skip the first line as header. Only `0` or `1`. | `0` |
 | QUOTE | Quote character for enclosing fields. | `"` |
@@ -389,17 +436,36 @@ OVERWRITE = TRUE;
 | EMPTY_FIELD_AS_NULL | Treat empty fields as NULL. `TRUE` or `FALSE`. | 
`FALSE` |
 | COMPRESSION | Compression codec (e.g. `GZIP`). | `NONE` |
 
-**Write options:**
+**JSON import options:**
 
 | Option | Description | Default |
 |--------|-------------|---------|
-| TYPE | File format type. Must be `CSV`. | (required) |
+| TYPE | File format type. `CSV` or `JSON`. | (required) |
+| MULTI_LINE | Parse multi-line JSON (e.g. JSON arrays or pretty-printed 
objects). `TRUE` or `FALSE`. | `FALSE` |
+| NULL_IF | List of string values to interpret as NULL. | (none) |
+| EMPTY_FIELD_AS_NULL | Treat empty string values as NULL. `TRUE` or `FALSE`. | `FALSE` |
+| COMPRESSION | Compression codec (e.g. `GZIP`). | `NONE` |
+
+**CSV write options:**
+
+| Option | Description | Default |
+|--------|-------------|---------|
+| TYPE | File format type. `CSV` or `JSON`. | (required) |
 | FIELD_DELIMITER | Column delimiter character. | `,` |
 | HEADER | Write column names as the first line. `TRUE` or `FALSE`. | `FALSE` |
 | QUOTE | Quote character for enclosing fields. | `"` |
 | ESCAPE | Escape character within quoted fields. | `\` |
 | COMPRESSION | Compression codec (e.g. `GZIP`). | `NONE` |
 
+**JSON write options:**
+
+| Option | Description | Default |
+|--------|-------------|---------|
+| TYPE | File format type. `CSV` or `JSON`. | (required) |
+| DATE_FORMAT | Custom date format pattern. | Spark default |
+| TIMESTAMP_FORMAT | Custom timestamp format pattern. | Spark default |
+| COMPRESSION | Compression codec (e.g. `GZIP`). | `NONE` |
+
 ### Import Options
 
 | Option | Description | Default |
@@ -418,15 +484,16 @@ OVERWRITE = TRUE;
 
 When an explicit column list is provided (e.g., `COPY INTO t (col1, col2) FROM 
...`):
 
-- CSV columns are mapped **positionally** to the specified column list.
-- The number of CSV columns must match the column list length.
+- **CSV**: Columns are mapped **positionally** to the specified column list.
+- **JSON**: Columns are matched **by name** to the specified column list.
+- For CSV, the number of source columns must match the column list length. For 
JSON, missing fields in the source become NULL.
 - Columns not in the list are filled with their **DEFAULT value** (if defined 
in the table schema) or **NULL**.
 - Non-nullable columns without a default value that are not in the list will 
cause an error.
 
 When no column list is provided:
 
-- CSV columns are mapped positionally to all writable columns in the target 
table.
-- The number of CSV columns must match the number of writable columns.
+- **CSV**: Columns are mapped positionally to all writable columns in the 
target table. The number of CSV columns must match the number of writable 
columns.
+- **JSON**: Columns are matched by name to the writable columns. Missing 
fields in JSON become NULL.
 
 ### Repeated Imports
 
@@ -457,7 +524,7 @@ By default (`FORCE = FALSE`), COPY INTO tracks which files 
have been successfull
 
 ### Limitations
 
-- Only **CSV** format is supported.
+- Only **CSV** and **JSON** formats are supported.
 - Writing files only supports `FROM table_name`; `FROM (SELECT ...)` is not 
supported.
 - `ON_ERROR = CONTINUE` is not supported; any parse or cast error aborts the 
entire command.
 - `SINGLE = TRUE` (single-file output) is not supported.
diff --git 
a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
 
b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
index 12a5bc8c51..f87884634a 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
+++ 
b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
@@ -205,6 +205,7 @@ nonReserved
     | MAP
     | COPY | INTO | FROM | FILE_FORMAT | PATTERN | FORCE | ON_ERROR | 
ABORT_STATEMENT | OVERWRITE
     | CSV
+    | JSON
     ;
 
 ALTER: 'ALTER';
@@ -246,6 +247,7 @@ ON_ERROR: 'ON_ERROR';
 ABORT_STATEMENT: 'ABORT_STATEMENT';
 OVERWRITE: 'OVERWRITE';
 CSV: 'CSV';
+JSON: 'JSON';
 
 PLUS: '+';
 MINUS: '-';
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyOptions.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyOptions.scala
index 2e2f7e2ec1..ae5d56eaea 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyOptions.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyOptions.scala
@@ -22,6 +22,7 @@ sealed trait FileFormatType
 
 object FileFormatType {
   case object CSV extends FileFormatType
+  case object JSON extends FileFormatType
   case class Unsupported(name: String) extends FileFormatType
 }
 
@@ -29,33 +30,60 @@ case class CopyFileFormat(formatType: FileFormatType, 
options: Map[String, Strin
 
   def toSparkReaderOptions: Map[String, String] = {
     val mapped = scala.collection.mutable.Map[String, String]("mode" -> 
"FAILFAST")
-    options.foreach {
-      case (k, v) =>
-        k match {
-          case "FIELD_DELIMITER" => mapped("sep") = v
-          case "QUOTE" => mapped("quote") = v
-          case "ESCAPE" => mapped("escape") = v
-          case "COMPRESSION" => mapped("compression") = v
-          case "SKIP_HEADER" =>
-            mapped("header") = if (v == "1" || v.equalsIgnoreCase("TRUE")) 
"true" else "false"
-          case _ =>
+    formatType match {
+      case FileFormatType.CSV =>
+        options.foreach {
+          case (k, v) =>
+            k match {
+              case "FIELD_DELIMITER" => mapped("sep") = v
+              case "QUOTE" => mapped("quote") = v
+              case "ESCAPE" => mapped("escape") = v
+              case "COMPRESSION" => mapped("compression") = v
+              case "SKIP_HEADER" =>
+                mapped("header") = if (v == "1" || v.equalsIgnoreCase("TRUE")) 
"true" else "false"
+              case _ =>
+            }
         }
+      case FileFormatType.JSON =>
+        options.foreach {
+          case (k, v) =>
+            k match {
+              case "MULTI_LINE" => mapped("multiLine") = v.toLowerCase
+              case "COMPRESSION" => mapped("compression") = v
+              case _ =>
+            }
+        }
+      case _ =>
     }
     mapped.toMap
   }
 
   def toSparkWriterOptions: Map[String, String] = {
     val mapped = scala.collection.mutable.Map[String, String]()
-    options.foreach {
-      case (k, v) =>
-        k match {
-          case "FIELD_DELIMITER" => mapped("sep") = v
-          case "HEADER" => mapped("header") = v.toLowerCase
-          case "QUOTE" => mapped("quote") = v
-          case "ESCAPE" => mapped("escape") = v
-          case "COMPRESSION" => mapped("compression") = v
-          case _ =>
+    formatType match {
+      case FileFormatType.CSV =>
+        options.foreach {
+          case (k, v) =>
+            k match {
+              case "FIELD_DELIMITER" => mapped("sep") = v
+              case "HEADER" => mapped("header") = v.toLowerCase
+              case "QUOTE" => mapped("quote") = v
+              case "ESCAPE" => mapped("escape") = v
+              case "COMPRESSION" => mapped("compression") = v
+              case _ =>
+            }
+        }
+      case FileFormatType.JSON =>
+        options.foreach {
+          case (k, v) =>
+            k match {
+              case "COMPRESSION" => mapped("compression") = v
+              case "DATE_FORMAT" => mapped("dateFormat") = v
+              case "TIMESTAMP_FORMAT" => mapped("timestampFormat") = v
+              case _ =>
+            }
         }
+      case _ =>
     }
     mapped.toMap
   }
@@ -78,25 +106,35 @@ case class CopyFileFormat(formatType: FileFormatType, 
options: Map[String, Strin
       throw new IllegalArgumentException(
         "MODE cannot be specified in FILE_FORMAT options; it is reserved for 
ON_ERROR handling")
     }
-    val invalid = 
options.keys.filterNot(CopyFileFormat.VALID_IMPORT_KEYS.contains)
+    val validKeys = formatType match {
+      case FileFormatType.JSON => CopyFileFormat.VALID_JSON_IMPORT_KEYS
+      case _ => CopyFileFormat.VALID_CSV_IMPORT_KEYS
+    }
+    val invalid = options.keys.filterNot(validKeys.contains)
     if (invalid.nonEmpty) {
       throw new IllegalArgumentException(
         s"Unsupported FILE_FORMAT options for import: ${invalid.mkString(", 
")}")
     }
-    options.get("SKIP_HEADER").foreach {
-      v =>
-        val intVal =
-          try v.toInt
-          catch { case _: NumberFormatException => -1 }
-        if (intVal != 0 && intVal != 1) {
-          throw new IllegalArgumentException(s"SKIP_HEADER supports only 0 or 
1, got: $v")
-        }
+    if (formatType == FileFormatType.CSV) {
+      options.get("SKIP_HEADER").foreach {
+        v =>
+          val intVal =
+            try v.toInt
+            catch { case _: NumberFormatException => -1 }
+          if (intVal != 0 && intVal != 1) {
+            throw new IllegalArgumentException(s"SKIP_HEADER supports only 0 
or 1, got: $v")
+          }
+      }
     }
   }
 
   def validateForExport(): Unit = {
     validateFormatType()
-    val invalid = 
options.keys.filterNot(CopyFileFormat.VALID_EXPORT_KEYS.contains)
+    val validKeys = formatType match {
+      case FileFormatType.JSON => CopyFileFormat.VALID_JSON_EXPORT_KEYS
+      case _ => CopyFileFormat.VALID_CSV_EXPORT_KEYS
+    }
+    val invalid = options.keys.filterNot(validKeys.contains)
     if (invalid.nonEmpty) {
       throw new IllegalArgumentException(
         s"Unsupported FILE_FORMAT options for export: ${invalid.mkString(", 
")}")
@@ -106,16 +144,17 @@ case class CopyFileFormat(formatType: FileFormatType, 
options: Map[String, Strin
   private def validateFormatType(): Unit = {
     formatType match {
       case FileFormatType.CSV =>
+      case FileFormatType.JSON =>
       case FileFormatType.Unsupported(name) =>
         throw new IllegalArgumentException(
-          s"Unsupported file format type: $name. Only CSV is currently 
supported")
+          s"Unsupported file format type: $name. Supported types: CSV, JSON")
     }
   }
 }
 
 object CopyFileFormat {
 
-  val VALID_IMPORT_KEYS: Set[String] = Set(
+  val VALID_CSV_IMPORT_KEYS: Set[String] = Set(
     "FIELD_DELIMITER",
     "SKIP_HEADER",
     "QUOTE",
@@ -125,7 +164,14 @@ object CopyFileFormat {
     "COMPRESSION"
   )
 
-  val VALID_EXPORT_KEYS: Set[String] = Set(
+  val VALID_JSON_IMPORT_KEYS: Set[String] = Set(
+    "MULTI_LINE",
+    "COMPRESSION",
+    "NULL_IF",
+    "EMPTY_FIELD_AS_NULL"
+  )
+
+  val VALID_CSV_EXPORT_KEYS: Set[String] = Set(
     "FIELD_DELIMITER",
     "HEADER",
     "QUOTE",
@@ -133,12 +179,19 @@ object CopyFileFormat {
     "COMPRESSION"
   )
 
+  val VALID_JSON_EXPORT_KEYS: Set[String] = Set(
+    "COMPRESSION",
+    "DATE_FORMAT",
+    "TIMESTAMP_FORMAT"
+  )
+
   // Unit Separator (U+001F) used to encode multi-value lists in a single 
string
   val LIST_SEPARATOR: String = "\u001f"
 
   def parseFormatType(typeStr: String): FileFormatType = {
     typeStr.toUpperCase match {
       case "CSV" => FileFormatType.CSV
+      case "JSON" => FileFormatType.JSON
       case other => FileFormatType.Unsupported(other)
     }
   }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoLocationExec.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoLocationExec.scala
index f4f0720b28..69387a5071 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoLocationExec.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoLocationExec.scala
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.spark.execution
 
-import org.apache.paimon.spark.catalyst.plans.logical.CopyFileFormat
+import org.apache.paimon.spark.catalyst.plans.logical.{CopyFileFormat, 
FileFormatType}
 import org.apache.paimon.spark.leafnode.PaimonLeafV2CommandExec
 
 import org.apache.hadoop.fs.Path
@@ -51,7 +51,12 @@ case class CopyIntoLocationExec(
     val writerOptions = fileFormat.toSparkWriterOptions
     val saveMode = if (overwrite) SaveMode.Overwrite else 
SaveMode.ErrorIfExists
 
-    df.write.options(writerOptions).mode(saveMode).csv(targetPath)
+    fileFormat.formatType match {
+      case FileFormatType.JSON =>
+        df.write.options(writerOptions).mode(saveMode).json(targetPath)
+      case _ =>
+        df.write.options(writerOptions).mode(saveMode).csv(targetPath)
+    }
 
     val hadoopConf = spark.sessionState.newHadoopConf()
     val fsPath = new Path(targetPath)
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoTableExec.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoTableExec.scala
index 260add4349..4763d8c784 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoTableExec.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoTableExec.scala
@@ -19,7 +19,7 @@
 package org.apache.paimon.spark.execution
 
 import org.apache.paimon.spark.SparkTable
-import org.apache.paimon.spark.catalyst.plans.logical.CopyFileFormat
+import org.apache.paimon.spark.catalyst.plans.logical.{CopyFileFormat, 
FileFormatType}
 import org.apache.paimon.spark.copyinto.{CopyLoadHistoryManager, 
CopyLoadRecord}
 import org.apache.paimon.spark.leafnode.PaimonLeafV2CommandExec
 import org.apache.paimon.table.FileStoreTable
@@ -72,13 +72,12 @@ case class CopyIntoTableExec(
     }
 
     val filePaths = filesToLoad.map(_.getPath.toString)
-    val stringSchema = StructType(
-      (0 until targetColumns.size).map(i => StructField(s"_c$i", StringType, 
nullable = true)))
+    val stringSchema = buildStringSchema(targetColumns)
     val readerOptions = fileFormat.toSparkReaderOptions
 
-    val csvDf = readAndProcessCsv(filePaths, stringSchema, readerOptions)
+    val sourceDf = readSourceData(filePaths, stringSchema, readerOptions)
     val finalDf =
-      buildFinalDataFrame(csvDf, targetColumns, writableColumns, fields)
+      buildFinalDataFrame(sourceDf, targetColumns, writableColumns, fields)
     val castedDf = castAndValidate(finalDf, writableColumns, fields)
 
     val tableName = CopyIntoUtils.quoteIdentifier(catalog.name(), ident)
@@ -93,6 +92,16 @@ case class CopyIntoTableExec(
       readerOptions)
   }
 
+  private def buildStringSchema(targetColumns: Seq[String]): StructType = {
+    fileFormat.formatType match {
+      case FileFormatType.JSON =>
+        StructType(targetColumns.map(name => StructField(name, StringType, 
nullable = true)))
+      case _ =>
+        StructType(
+          (0 until targetColumns.size).map(i => StructField(s"_c$i", 
StringType, nullable = true)))
+    }
+  }
+
   private def resolveTargetColumns(writableColumns: Seq[String]): Seq[String] 
= {
     columns match {
       case Some(cols) =>
@@ -165,14 +174,16 @@ case class CopyIntoTableExec(
     }
   }
 
-  private def readAndProcessCsv(
+  private def readSourceData(
       filePaths: Array[String],
       stringSchema: StructType,
       readerOptions: Map[String, String]): DataFrame = {
-    var df = spark.read
-      .options(readerOptions)
-      .schema(stringSchema)
-      .csv(filePaths: _*)
+    var df = fileFormat.formatType match {
+      case FileFormatType.JSON =>
+        spark.read.options(readerOptions).schema(stringSchema).json(filePaths: 
_*)
+      case _ =>
+        spark.read.options(readerOptions).schema(stringSchema).csv(filePaths: 
_*)
+    }
 
     val nullIfVals = fileFormat.nullIfValues
     if (nullIfVals.nonEmpty) {
@@ -199,12 +210,17 @@ case class CopyIntoTableExec(
   }
 
   private def buildFinalDataFrame(
-      csvDf: DataFrame,
+      sourceDf: DataFrame,
       targetColumns: Seq[String],
       writableColumns: Seq[String],
       fields: Seq[DataField]): DataFrame = {
-    val renamedDf = targetColumns.zipWithIndex.foldLeft(csvDf) {
-      case (df, (targetCol, idx)) => df.withColumnRenamed(s"_c$idx", targetCol)
+    val renamedDf = fileFormat.formatType match {
+      case FileFormatType.JSON =>
+        sourceDf
+      case _ =>
+        targetColumns.zipWithIndex.foldLeft(sourceDf) {
+          case (df, (targetCol, idx)) => df.withColumnRenamed(s"_c$idx", 
targetCol)
+        }
     }
 
     if (columns.isDefined) {
@@ -290,10 +306,14 @@ case class CopyIntoTableExec(
     val snapshotId = paimonTable.snapshotManager().latestSnapshotId()
     val loadedAt = System.currentTimeMillis()
 
-    val rowCounts = spark.read
-      .options(readerOptions)
-      .schema(stringSchema)
-      .csv(filePaths: _*)
+    val countDf = fileFormat.formatType match {
+      case FileFormatType.JSON =>
+        spark.read.options(readerOptions).schema(stringSchema).json(filePaths: 
_*)
+      case _ =>
+        spark.read.options(readerOptions).schema(stringSchema).csv(filePaths: 
_*)
+    }
+
+    val rowCounts = countDf
       .groupBy(input_file_name().as("file"))
       .count()
       .collect()
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala
index f3fb75a8c6..aa764bd30e 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala
@@ -237,7 +237,9 @@ class CopyIntoTestBase extends PaimonSparkTestBase {
                        |FILE_FORMAT = (TYPE = PARQUET)
                        |""".stripMargin)
         }
-        assert(e.getMessage.contains("Unsupported file format type"))
+        assert(
+          e.getMessage.contains("Unsupported file format type") ||
+            e.getMessage.contains("Supported types"))
     }
 
     spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_unsup")
@@ -646,6 +648,423 @@ class CopyIntoTestBase extends PaimonSparkTestBase {
     spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_dup_col")
   }
 
+  private def createJsonFile(dir: File, name: String, content: String): File = 
{
+    val file = new File(dir, name)
+    val writer = new PrintWriter(file)
+    try writer.write(content)
+    finally writer.close()
+    file
+  }
+
+  private def withJsonDir(testBody: File => Unit): Unit = {
+    val dir = Files.createTempDirectory("copy_into_json_test").toFile
+    try testBody(dir)
+    finally deleteRecursively(dir)
+  }
+
+  test("COPY INTO: basic JSON import") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_basic")
+    spark.sql(s"CREATE TABLE $dbName0.copy_json_basic (id INT, name STRING, 
age INT)")
+
+    withJsonDir {
+      dir =>
+        createJsonFile(
+          dir,
+          "data.json",
+          """{"id":"1","name":"Alice","age":"30"}
+            |{"id":"2","name":"Bob","age":"25"}
+            |""".stripMargin)
+
+        val result = spark.sql(s"""COPY INTO $dbName0.copy_json_basic
+                                  |FROM '${dir.getAbsolutePath}'
+                                  |FILE_FORMAT = (TYPE = JSON)
+                                  |""".stripMargin)
+
+        assert(result.collect().length > 0)
+        assert(result.collect()(0).getString(1) == "LOADED")
+
+        checkAnswer(
+          spark.sql(s"SELECT * FROM $dbName0.copy_json_basic ORDER BY id"),
+          Seq(Row(1, "Alice", 30), Row(2, "Bob", 25)))
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_basic")
+  }
+
+  test("COPY INTO: JSON column name matching ignores order") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_order")
+    spark.sql(s"CREATE TABLE $dbName0.copy_json_order (id INT, name STRING, 
age INT)")
+
+    withJsonDir {
+      dir =>
+        createJsonFile(
+          dir,
+          "data.json",
+          """{"age":"30","name":"Alice","id":"1"}
+            |{"name":"Bob","id":"2","age":"25"}
+            |""".stripMargin)
+
+        spark.sql(s"""COPY INTO $dbName0.copy_json_order
+                     |FROM '${dir.getAbsolutePath}'
+                     |FILE_FORMAT = (TYPE = JSON)
+                     |""".stripMargin)
+
+        checkAnswer(
+          spark.sql(s"SELECT * FROM $dbName0.copy_json_order ORDER BY id"),
+          Seq(Row(1, "Alice", 30), Row(2, "Bob", 25)))
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_order")
+  }
+
+  test("COPY INTO: JSON with MULTI_LINE") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_ml")
+    spark.sql(s"CREATE TABLE $dbName0.copy_json_ml (id INT, name STRING)")
+
+    withJsonDir {
+      dir =>
+        createJsonFile(
+          dir,
+          "data.json",
+          """[
+            |  {"id":"1","name":"Alice"},
+            |  {"id":"2","name":"Bob"}
+            |]""".stripMargin)
+
+        spark.sql(s"""COPY INTO $dbName0.copy_json_ml
+                     |FROM '${dir.getAbsolutePath}'
+                     |FILE_FORMAT = (TYPE = JSON, MULTI_LINE = TRUE)
+                     |""".stripMargin)
+
+        checkAnswer(
+          spark.sql(s"SELECT * FROM $dbName0.copy_json_ml ORDER BY id"),
+          Seq(Row(1, "Alice"), Row(2, "Bob")))
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_ml")
+  }
+
+  test("COPY INTO: JSON with explicit column list") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_cols")
+    spark.sql(s"CREATE TABLE $dbName0.copy_json_cols (id INT, name STRING, age 
INT)")
+
+    withJsonDir {
+      dir =>
+        createJsonFile(
+          dir,
+          "data.json",
+          """{"id":"1","name":"Alice"}
+            |{"id":"2","name":"Bob"}
+            |""".stripMargin)
+
+        spark.sql(s"""COPY INTO $dbName0.copy_json_cols (id, name)
+                     |FROM '${dir.getAbsolutePath}'
+                     |FILE_FORMAT = (TYPE = JSON)
+                     |""".stripMargin)
+
+        checkAnswer(
+          spark.sql(s"SELECT * FROM $dbName0.copy_json_cols ORDER BY id"),
+          Seq(Row(1, "Alice", null), Row(2, "Bob", null)))
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_cols")
+  }
+
+  test("COPY INTO: JSON NULL_IF") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_null")
+    spark.sql(s"CREATE TABLE $dbName0.copy_json_null (id INT, name STRING)")
+
+    withJsonDir {
+      dir =>
+        createJsonFile(
+          dir,
+          "data.json",
+          """{"id":"1","name":"NULL"}
+            |{"id":"2","name":"\\N"}
+            |{"id":"3","name":"Alice"}
+            |""".stripMargin
+        )
+
+        spark.sql(s"""COPY INTO $dbName0.copy_json_null
+                     |FROM '${dir.getAbsolutePath}'
+                     |FILE_FORMAT = (TYPE = JSON, NULL_IF = ('NULL', '\\N'))
+                     |""".stripMargin)
+
+        checkAnswer(
+          spark.sql(s"SELECT * FROM $dbName0.copy_json_null ORDER BY id"),
+          Seq(Row(1, null), Row(2, null), Row(3, "Alice")))
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_null")
+  }
+
+  test("COPY INTO: JSON export") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_export")
+    spark.sql(s"CREATE TABLE $dbName0.copy_json_export (id INT, name STRING)")
+    spark.sql(s"INSERT INTO $dbName0.copy_json_export VALUES (1, 'Alice'), (2, 
'Bob')")
+
+    withJsonDir {
+      dir =>
+        val outputPath = new File(dir, "output").getAbsolutePath
+
+        val result = spark.sql(s"""COPY INTO '$outputPath'
+                                  |FROM $dbName0.copy_json_export
+                                  |FILE_FORMAT = (TYPE = JSON)
+                                  |""".stripMargin)
+
+        val row = result.collect()(0)
+        assert(row.getString(0) == outputPath)
+        assert(row.getLong(2) == 2L)
+
+        val exported = spark.read.json(outputPath)
+        assert(exported.count() == 2)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_export")
+  }
+
+  test("COPY INTO: JSON rejects CSV-only options") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_bad_opt")
+    spark.sql(s"CREATE TABLE $dbName0.copy_json_bad_opt (id INT)")
+
+    withJsonDir {
+      dir =>
+        createJsonFile(dir, "data.json", """{"id":"1"}""")
+
+        val e = intercept[Exception] {
+          spark.sql(s"""COPY INTO $dbName0.copy_json_bad_opt
+                       |FROM '${dir.getAbsolutePath}'
+                       |FILE_FORMAT = (TYPE = JSON, FIELD_DELIMITER = ',')
+                       |""".stripMargin)
+        }
+        assert(e.getMessage.contains("Unsupported FILE_FORMAT options"))
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_bad_opt")
+  }
+
+  test("COPY INTO: JSON with date and timestamp columns") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_date")
+    spark.sql(s"CREATE TABLE $dbName0.copy_json_date (id INT, dt DATE, ts 
TIMESTAMP)")
+
+    withJsonDir {
+      dir =>
+        createJsonFile(
+          dir,
+          "data.json",
+          """{"id":"1","dt":"2024-01-15","ts":"2024-01-15T10:30:00"}
+            |{"id":"2","dt":"2024-06-20","ts":"2024-06-20T14:45:30"}
+            |""".stripMargin
+        )
+
+        spark.sql(s"""COPY INTO $dbName0.copy_json_date
+                     |FROM '${dir.getAbsolutePath}'
+                     |FILE_FORMAT = (TYPE = JSON)
+                     |""".stripMargin)
+
+        val rows = spark.sql(s"SELECT * FROM $dbName0.copy_json_date ORDER BY 
id").collect()
+        assert(rows.length == 2)
+        assert(rows(0).getInt(0) == 1)
+        assert(rows(0).getDate(1).toString == "2024-01-15")
+        assert(rows(1).getInt(0) == 2)
+        assert(rows(1).getDate(1).toString == "2024-06-20")
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_date")
+  }
+
+  test("COPY INTO: JSON rows_loaded count is accurate") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_count")
+    spark.sql(s"CREATE TABLE $dbName0.copy_json_count (id INT, name STRING)")
+
+    withJsonDir {
+      dir =>
+        createJsonFile(
+          dir,
+          "data.json",
+          """{"id":"1","name":"Alice"}
+            |{"id":"2","name":"Bob"}
+            |{"id":"3","name":"Charlie"}
+            |""".stripMargin
+        )
+
+        val result = spark.sql(s"""COPY INTO $dbName0.copy_json_count
+                                  |FROM '${dir.getAbsolutePath}'
+                                  |FILE_FORMAT = (TYPE = JSON)
+                                  |""".stripMargin)
+
+        val rows = result.collect()
+        assert(rows.length == 1)
+        assert(rows(0).getString(1) == "LOADED")
+        assert(rows(0).getLong(2) == 3L)
+        assert(rows(0).getLong(3) == 3L)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_count")
+  }
+
+  // Round-trip check: rows exported from one table as JSON are loaded into a
+  // second table with the same schema, and the second table is then compared
+  // against the rows that were originally inserted.
+  test("COPY INTO: JSON export then import round-trip") {
+    val srcTable = s"$dbName0.copy_json_rt_src"
+    val dstTable = s"$dbName0.copy_json_rt_dst"
+    val schema = "(id INT, name STRING, score DOUBLE)"
+
+    // Start from a clean slate in case an earlier run left the tables behind.
+    Seq(srcTable, dstTable).foreach(t => spark.sql(s"DROP TABLE IF EXISTS $t"))
+    spark.sql(s"CREATE TABLE $srcTable $schema")
+    spark.sql(s"INSERT INTO $srcTable VALUES (1, 'Alice', 95.5), (2, 'Bob', 87.3)")
+    spark.sql(s"CREATE TABLE $dstTable $schema")
+
+    withJsonDir {
+      baseDir =>
+        val exportPath = new File(baseDir, "exported").getAbsolutePath
+
+        // Step 1: table -> JSON files under exportPath.
+        val exportSql =
+          s"""COPY INTO '$exportPath'
+             |FROM $srcTable
+             |FILE_FORMAT = (TYPE = JSON)
+             |""".stripMargin
+        spark.sql(exportSql)
+
+        // Step 2: JSON files -> destination table. The statement also passes a
+        // PATTERN option with a regex for names ending in ".json".
+        val importSql =
+          s"""COPY INTO $dstTable
+             |FROM '$exportPath'
+             |FILE_FORMAT = (TYPE = JSON)
+             |PATTERN = '.*\\.json'
+             |""".stripMargin
+        spark.sql(importSql)
+
+        val expectedRows = Seq(Row(1, "Alice", 95.5), Row(2, "Bob", 87.3))
+        checkAnswer(spark.sql(s"SELECT * FROM $dstTable ORDER BY id"), expectedRows)
+    }
+
+    Seq(srcTable, dstTable).foreach(t => spark.sql(s"DROP TABLE IF EXISTS $t"))
+  }
+
+  // JSON keys that have no matching table column must neither break the load
+  // nor show up in the loaded rows.
+  test("COPY INTO: JSON extra fields are ignored") {
+    val table = s"$dbName0.copy_json_extra"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING)")
+
+    // Both records carry keys ("extra_field", "another") the table does not declare.
+    val records =
+      """{"id":"1","name":"Alice","extra_field":"ignored","another":"also_ignored"}
+        |{"id":"2","name":"Bob","extra_field":"ignored2"}
+        |""".stripMargin
+
+    withJsonDir {
+      inputDir =>
+        createJsonFile(inputDir, "data.json", records)
+
+        val copySql =
+          s"""COPY INTO $table
+             |FROM '${inputDir.getAbsolutePath}'
+             |FILE_FORMAT = (TYPE = JSON)
+             |""".stripMargin
+        spark.sql(copySql)
+
+        // Only the declared columns are expected back.
+        val loaded = spark.sql(s"SELECT * FROM $table ORDER BY id")
+        checkAnswer(loaded, Seq(Row(1, "Alice"), Row(2, "Bob")))
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
+
+  // Table columns that a JSON record does not mention are expected to be
+  // loaded as NULL.
+  test("COPY INTO: JSON missing fields become null") {
+    val table = s"$dbName0.copy_json_missing"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING, age INT)")
+
+    // First record omits "age"; second record omits both "name" and "age".
+    val records =
+      """{"id":"1","name":"Alice"}
+        |{"id":"2"}
+        |""".stripMargin
+
+    withJsonDir {
+      inputDir =>
+        createJsonFile(inputDir, "data.json", records)
+
+        val copySql =
+          s"""COPY INTO $table
+             |FROM '${inputDir.getAbsolutePath}'
+             |FILE_FORMAT = (TYPE = JSON)
+             |""".stripMargin
+        spark.sql(copySql)
+
+        val expectedRows = Seq(Row(1, "Alice", null), Row(2, null, null))
+        checkAnswer(spark.sql(s"SELECT * FROM $table ORDER BY id"), expectedRows)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
+
+  // A file containing a line that is not valid JSON must make the statement
+  // throw and must leave the target table empty. No ON_ERROR option is passed,
+  // so this exercises whatever the default error handling is (the test name
+  // identifies it as ABORT_STATEMENT).
+  test("COPY INTO: JSON malformed data fails with ABORT_STATEMENT") {
+    val table = s"$dbName0.copy_json_malformed"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING)")
+
+    // One well-formed record followed by one unparseable line.
+    val records =
+      """{"id":"1","name":"Alice"}
+        |{this is not valid json}
+        |""".stripMargin
+
+    withJsonDir {
+      inputDir =>
+        createJsonFile(inputDir, "data.json", records)
+
+        val copySql =
+          s"""COPY INTO $table
+             |FROM '${inputDir.getAbsolutePath}'
+             |FILE_FORMAT = (TYPE = JSON)
+             |""".stripMargin
+        intercept[Exception] {
+          spark.sql(copySql)
+        }
+
+        // Not even the valid first record may have been written.
+        val rowsAfterFailure = spark.sql(s"SELECT * FROM $table").count()
+        assert(rowsAfterFailure == 0)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
+
+  // A JSON value that cannot be converted to the target column type must make
+  // the statement fail with a cast-related error and write nothing.
+  test("COPY INTO: JSON bad cast fails with ABORT_STATEMENT") {
+    val table = s"$dbName0.copy_json_badcast"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING)")
+
+    withJsonDir {
+      dir =>
+        // "id" is declared INT but the file supplies a non-numeric string.
+        createJsonFile(dir, "data.json", """{"id":"not_a_number","name":"Alice"}""")
+
+        val e = intercept[Exception] {
+          spark.sql(s"""COPY INTO $table
+                       |FROM '${dir.getAbsolutePath}'
+                       |FILE_FORMAT = (TYPE = JSON)
+                       |""".stripMargin)
+        }
+
+        // Collect the message of the thrown exception and of every cause below
+        // it. Searching the whole chain replaces the former
+        // `e.getCause != null` escape hatch, which accepted any wrapped
+        // exception regardless of what it said. Null messages are skipped so
+        // the check itself cannot throw a NullPointerException.
+        val messages = Iterator
+          .iterate[Throwable](e)(_.getCause)
+          .takeWhile(_ != null)
+          .flatMap(t => Option(t.getMessage))
+          .toList
+        val castMarkers =
+          Seq("Cast failure", "ABORT_STATEMENT", "CAST_INVALID_INPUT", "cannot be cast to")
+        assert(
+          messages.exists(m => castMarkers.exists(marker => m.contains(marker))),
+          s"expected a cast-related failure, got: ${messages.mkString(" <- ")}")
+
+        // The failed statement must not have written any rows.
+        assert(spark.sql(s"SELECT * FROM $table").count() == 0)
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
+
+  // Exporting with COMPRESSION = GZIP must report the exported rows and leave
+  // at least one ".gz" file in the output directory.
+  test("COPY INTO: JSON export with COMPRESSION") {
+    val table = s"$dbName0.copy_json_compress"
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+    spark.sql(s"CREATE TABLE $table (id INT, name STRING)")
+    spark.sql(s"INSERT INTO $table VALUES (1, 'Alice'), (2, 'Bob')")
+
+    withJsonDir {
+      dir =>
+        val outputPath = new File(dir, "compressed").getAbsolutePath
+
+        val result = spark.sql(s"""COPY INTO '$outputPath'
+                                  |FROM $table
+                                  |FILE_FORMAT = (TYPE = JSON, COMPRESSION = GZIP)
+                                  |OVERWRITE = TRUE
+                                  |""".stripMargin)
+
+        // Column 2 of the result row is compared with the number of inserted
+        // rows (presumably the exported-row count; confirm against the exec node).
+        assert(result.collect()(0).getLong(2) == 2L)
+
+        // File.listFiles() returns null when the path is not a directory or
+        // cannot be read; treat that as "no files" so the test fails with a
+        // readable assertion message instead of a NullPointerException.
+        val outputDir = new File(outputPath)
+        val written = Option(outputDir.listFiles()).getOrElse(Array.empty[File])
+        val gzFiles = written.filter(_.getName.endsWith(".gz"))
+        assert(
+          gzFiles.nonEmpty,
+          s"no .gz files under $outputPath; found: ${written.map(_.getName).mkString(", ")}")
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $table")
+  }
+
   test("COPY INTO: case-insensitive column matching") {
     spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_case")
     spark.sql(s"CREATE TABLE $dbName0.copy_case (id INT, name STRING, age 
INT)")


Reply via email to