This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a9ce9b8b64 [spark] Support JSON format in COPY INTO (#7993)
a9ce9b8b64 is described below
commit a9ce9b8b64beb280baf981825a2a463b226647a9
Author: Junrui Lee <[email protected]>
AuthorDate: Fri May 29 13:23:43 2026 +0800
[spark] Support JSON format in COPY INTO (#7993)
- Add JSON format support for `COPY INTO` import and export, alongside
existing CSV support
- JSON uses column-name matching (not positional), with options for
`MULTI_LINE`, `NULL_IF`, `EMPTY_FIELD_AS_NULL`, and `COMPRESSION`
- CSV-only options (e.g. `FIELD_DELIMITER`, `SKIP_HEADER`) are rejected
for JSON format with clear error messages
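The option check described in the last bullet can be sketched in plain Scala. This is a simplified model, not the code in `CopyOptions.scala`. The object and method names are hypothetical, and only the JSON key sets are modeled; they are copied from `VALID_JSON_IMPORT_KEYS` and `VALID_JSON_EXPORT_KEYS` in this commit.

```scala
// Simplified, illustrative model of the per-format FILE_FORMAT key check.
// Key sets are copied from this commit; object/method names are hypothetical.
object JsonCopyOptionCheck {
  val JsonImportKeys: Set[String] =
    Set("MULTI_LINE", "COMPRESSION", "NULL_IF", "EMPTY_FIELD_AS_NULL")
  val JsonExportKeys: Set[String] =
    Set("COMPRESSION", "DATE_FORMAT", "TIMESTAMP_FORMAT")

  // Keys the user supplied that are not allowed for this format/direction,
  // sorted so the resulting error message is deterministic.
  def rejectedKeys(options: Map[String, String], allowed: Set[String]): Seq[String] =
    options.keys.filterNot(allowed.contains).toSeq.sorted

  // Fails fast with a message shaped like the one the commit produces.
  def validate(options: Map[String, String], allowed: Set[String], direction: String): Unit = {
    val invalid = rejectedKeys(options, allowed)
    if (invalid.nonEmpty) {
      throw new IllegalArgumentException(
        s"Unsupported FILE_FORMAT options for $direction: ${invalid.mkString(", ")}")
    }
  }
}
```

With this model, `FILE_FORMAT = (TYPE = JSON, FIELD_DELIMITER = ',')` fails because `FIELD_DELIMITER` is outside the JSON import set, which is the behavior the `JSON rejects CSV-only options` test in this commit checks.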
---
docs/docs/spark/sql-write.md | 89 ++++-
.../PaimonSqlExtensions.g4 | 2 +
.../spark/catalyst/plans/logical/CopyOptions.scala | 117 ++++--
.../spark/execution/CopyIntoLocationExec.scala | 9 +-
.../paimon/spark/execution/CopyIntoTableExec.scala | 54 ++-
.../apache/paimon/spark/sql/CopyIntoTestBase.scala | 421 ++++++++++++++++++++-
6 files changed, 629 insertions(+), 63 deletions(-)
diff --git a/docs/docs/spark/sql-write.md b/docs/docs/spark/sql-write.md
index 518bf0e5b4..449d8871ed 100644
--- a/docs/docs/spark/sql-write.md
+++ b/docs/docs/spark/sql-write.md
@@ -314,7 +314,7 @@ INSERT INTO t BY NAME SELECT 3 AS a, '3' AS b, 3 AS c;
## COPY INTO
-`COPY INTO` provides a SQL command for bulk loading CSV files into Paimon tables and writing table data to CSV files.
+`COPY INTO` provides a SQL command for bulk loading data files into Paimon tables and exporting table data to files. Supported formats: **CSV** and **JSON**.
### CSV Import
@@ -354,6 +354,35 @@ PATTERN = '.*\.csv'
FORCE = FALSE;
```
+### JSON Import
+
+```sql
+COPY INTO table_name [(col1, col2, ...)]
+FROM 'source_path'
+FILE_FORMAT = (TYPE = JSON [, option = value, ...])
+[PATTERN = 'regex']
+[FORCE = TRUE|FALSE]
+[ON_ERROR = ABORT_STATEMENT]
+```
+
+**Basic import:**
+
+```sql
+COPY INTO my_db.my_table
+FROM '/data/json_files/'
+FILE_FORMAT = (TYPE = JSON);
+```
+
+**Import multi-line JSON array:**
+
+```sql
+COPY INTO my_db.events
+FROM '/data/events/'
+FILE_FORMAT = (TYPE = JSON, MULTI_LINE = TRUE);
+```
+
+JSON columns are matched **by column name** (not by position), so source field
order does not matter.
+
### Write CSV Files
```sql
@@ -372,15 +401,33 @@ FILE_FORMAT = (TYPE = CSV, HEADER = TRUE, FIELD_DELIMITER = ',')
OVERWRITE = TRUE;
```
+### Write JSON Files
+
+```sql
+COPY INTO 'target_path'
+FROM table_name
+FILE_FORMAT = (TYPE = JSON [, option = value, ...])
+[OVERWRITE = TRUE|FALSE]
+```
+
+**Basic JSON export:**
+
+```sql
+COPY INTO '/export/events_backup/'
+FROM my_db.events
+FILE_FORMAT = (TYPE = JSON)
+OVERWRITE = TRUE;
+```
+
### FILE_FORMAT Options
-`FILE_FORMAT` is required and must include `TYPE = CSV`.
+`FILE_FORMAT` is required and must include `TYPE = CSV` or `TYPE = JSON`.
-**Import options:**
+**CSV import options:**
| Option | Description | Default |
|--------|-------------|---------|
-| TYPE | File format type. Must be `CSV`. | (required) |
+| TYPE | File format type. `CSV` or `JSON`. | (required) |
| FIELD_DELIMITER | Column delimiter character. | `,` |
| SKIP_HEADER | Skip the first line as header. Only `0` or `1`. | `0` |
| QUOTE | Quote character for enclosing fields. | `"` |
@@ -389,17 +436,36 @@ OVERWRITE = TRUE;
| EMPTY_FIELD_AS_NULL | Treat empty fields as NULL. `TRUE` or `FALSE`. | `FALSE` |
| COMPRESSION | Compression codec (e.g. `GZIP`). | `NONE` |
-**Write options:**
+**JSON import options:**
| Option | Description | Default |
|--------|-------------|---------|
-| TYPE | File format type. Must be `CSV`. | (required) |
+| TYPE | File format type. `CSV` or `JSON`. | (required) |
+| MULTI_LINE | Parse multi-line JSON (e.g. JSON arrays or pretty-printed objects). | `FALSE` |
+| NULL_IF | List of string values to interpret as NULL. | (none) |
+| EMPTY_FIELD_AS_NULL | Treat empty string values as NULL. | `FALSE` |
+| COMPRESSION | Compression codec (e.g. `GZIP`). | `NONE` |
+
+**CSV write options:**
+
+| Option | Description | Default |
+|--------|-------------|---------|
+| TYPE | File format type. `CSV` or `JSON`. | (required) |
| FIELD_DELIMITER | Column delimiter character. | `,` |
| HEADER | Write column names as the first line. `TRUE` or `FALSE`. | `FALSE` |
| QUOTE | Quote character for enclosing fields. | `"` |
| ESCAPE | Escape character within quoted fields. | `\` |
| COMPRESSION | Compression codec (e.g. `GZIP`). | `NONE` |
+**JSON write options:**
+
+| Option | Description | Default |
+|--------|-------------|---------|
+| TYPE | File format type. `CSV` or `JSON`. | (required) |
+| DATE_FORMAT | Custom date format pattern. | Spark default |
+| TIMESTAMP_FORMAT | Custom timestamp format pattern. | Spark default |
+| COMPRESSION | Compression codec (e.g. `GZIP`). | `NONE` |
+
### Import Options
| Option | Description | Default |
@@ -418,15 +484,16 @@ OVERWRITE = TRUE;
When an explicit column list is provided (e.g., `COPY INTO t (col1, col2) FROM ...`):
-- CSV columns are mapped **positionally** to the specified column list.
-- The number of CSV columns must match the column list length.
+- **CSV**: Columns are mapped **positionally** to the specified column list.
+- **JSON**: Columns are matched **by name** to the specified column list.
+- The number of source columns must match the column list length (CSV). For JSON, missing fields in the source become NULL.
- Columns not in the list are filled with their **DEFAULT value** (if defined in the table schema) or **NULL**.
- Non-nullable columns without a default value that are not in the list will cause an error.
When no column list is provided:
-- CSV columns are mapped positionally to all writable columns in the target table.
-- The number of CSV columns must match the number of writable columns.
+- **CSV**: Columns are mapped positionally to all writable columns in the target table. The number of CSV columns must match the number of writable columns.
+- **JSON**: Columns are matched by name to the writable columns. Missing fields in JSON become NULL.
### Repeated Imports
@@ -457,7 +524,7 @@ By default (`FORCE = FALSE`), COPY INTO tracks which files have been successfull
### Limitations
-- Only **CSV** format is supported.
+- Only **CSV** and **JSON** formats are supported.
- Writing files only supports `FROM table_name`; `FROM (SELECT ...)` is not supported.
- `ON_ERROR = CONTINUE` is not supported; any parse or cast error aborts the entire command.
- `SINGLE = TRUE` (single-file output) is not supported.
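The column-mapping rules documented above (CSV by position, JSON by name) can be illustrated with a small, Spark-free Scala model. It is a sketch under stated assumptions, not Paimon's implementation: the names are hypothetical, `None` stands in for NULL, and DEFAULT values, casting and case-insensitive name matching are not modeled.

```scala
// Illustrative model only (not Paimon code): how COPY INTO lines source
// values up with target columns for each format. None stands for NULL.
object ColumnMappingSketch {
  // CSV: value i goes to target column i, and the counts must match.
  def mapCsv(values: Seq[String], targetColumns: Seq[String]): Map[String, Option[String]] = {
    require(
      values.size == targetColumns.size,
      s"expected ${targetColumns.size} CSV columns, got ${values.size}")
    targetColumns.zip(values.map(v => Option(v))).toMap
  }

  // JSON: each target column is looked up by field name. Missing fields
  // become None and fields outside the target list are ignored. Names are
  // compared exactly here; case-insensitivity is not modeled.
  def mapJson(record: Map[String, String], targetColumns: Seq[String]): Map[String, Option[String]] =
    targetColumns.map(c => c -> record.get(c)).toMap
}
```

In this model, reordering the fields of a JSON record does not change the result, which matches the documented "source field order does not matter" behavior, while a CSV row with the wrong number of values is rejected.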
diff --git a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4 b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
index 12a5bc8c51..f87884634a 100644
--- a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
+++ b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
@@ -205,6 +205,7 @@ nonReserved
| MAP
| COPY | INTO | FROM | FILE_FORMAT | PATTERN | FORCE | ON_ERROR | ABORT_STATEMENT | OVERWRITE
| CSV
+ | JSON
;
ALTER: 'ALTER';
@@ -246,6 +247,7 @@ ON_ERROR: 'ON_ERROR';
ABORT_STATEMENT: 'ABORT_STATEMENT';
OVERWRITE: 'OVERWRITE';
CSV: 'CSV';
+JSON: 'JSON';
PLUS: '+';
MINUS: '-';
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyOptions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyOptions.scala
index 2e2f7e2ec1..ae5d56eaea 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyOptions.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyOptions.scala
@@ -22,6 +22,7 @@ sealed trait FileFormatType
object FileFormatType {
case object CSV extends FileFormatType
+ case object JSON extends FileFormatType
case class Unsupported(name: String) extends FileFormatType
}
@@ -29,33 +30,60 @@ case class CopyFileFormat(formatType: FileFormatType, options: Map[String, Strin
def toSparkReaderOptions: Map[String, String] = {
val mapped = scala.collection.mutable.Map[String, String]("mode" -> "FAILFAST")
- options.foreach {
- case (k, v) =>
- k match {
- case "FIELD_DELIMITER" => mapped("sep") = v
- case "QUOTE" => mapped("quote") = v
- case "ESCAPE" => mapped("escape") = v
- case "COMPRESSION" => mapped("compression") = v
- case "SKIP_HEADER" =>
- mapped("header") = if (v == "1" || v.equalsIgnoreCase("TRUE")) "true" else "false"
- case _ =>
+ formatType match {
+ case FileFormatType.CSV =>
+ options.foreach {
+ case (k, v) =>
+ k match {
+ case "FIELD_DELIMITER" => mapped("sep") = v
+ case "QUOTE" => mapped("quote") = v
+ case "ESCAPE" => mapped("escape") = v
+ case "COMPRESSION" => mapped("compression") = v
+ case "SKIP_HEADER" =>
+ mapped("header") = if (v == "1" || v.equalsIgnoreCase("TRUE")) "true" else "false"
+ case _ =>
+ }
}
+ case FileFormatType.JSON =>
+ options.foreach {
+ case (k, v) =>
+ k match {
+ case "MULTI_LINE" => mapped("multiLine") = v.toLowerCase
+ case "COMPRESSION" => mapped("compression") = v
+ case _ =>
+ }
+ }
+ case _ =>
}
mapped.toMap
}
def toSparkWriterOptions: Map[String, String] = {
val mapped = scala.collection.mutable.Map[String, String]()
- options.foreach {
- case (k, v) =>
- k match {
- case "FIELD_DELIMITER" => mapped("sep") = v
- case "HEADER" => mapped("header") = v.toLowerCase
- case "QUOTE" => mapped("quote") = v
- case "ESCAPE" => mapped("escape") = v
- case "COMPRESSION" => mapped("compression") = v
- case _ =>
+ formatType match {
+ case FileFormatType.CSV =>
+ options.foreach {
+ case (k, v) =>
+ k match {
+ case "FIELD_DELIMITER" => mapped("sep") = v
+ case "HEADER" => mapped("header") = v.toLowerCase
+ case "QUOTE" => mapped("quote") = v
+ case "ESCAPE" => mapped("escape") = v
+ case "COMPRESSION" => mapped("compression") = v
+ case _ =>
+ }
+ }
+ case FileFormatType.JSON =>
+ options.foreach {
+ case (k, v) =>
+ k match {
+ case "COMPRESSION" => mapped("compression") = v
+ case "DATE_FORMAT" => mapped("dateFormat") = v
+ case "TIMESTAMP_FORMAT" => mapped("timestampFormat") = v
+ case _ =>
+ }
}
+ case _ =>
}
mapped.toMap
}
@@ -78,25 +106,35 @@ case class CopyFileFormat(formatType: FileFormatType, options: Map[String, Strin
throw new IllegalArgumentException(
"MODE cannot be specified in FILE_FORMAT options; it is reserved for ON_ERROR handling")
}
- val invalid = options.keys.filterNot(CopyFileFormat.VALID_IMPORT_KEYS.contains)
+ val validKeys = formatType match {
+ case FileFormatType.JSON => CopyFileFormat.VALID_JSON_IMPORT_KEYS
+ case _ => CopyFileFormat.VALID_CSV_IMPORT_KEYS
+ }
+ val invalid = options.keys.filterNot(validKeys.contains)
if (invalid.nonEmpty) {
throw new IllegalArgumentException(
s"Unsupported FILE_FORMAT options for import: ${invalid.mkString(", ")}")
}
- options.get("SKIP_HEADER").foreach {
- v =>
- val intVal =
- try v.toInt
- catch { case _: NumberFormatException => -1 }
- if (intVal != 0 && intVal != 1) {
- throw new IllegalArgumentException(s"SKIP_HEADER supports only 0 or 1, got: $v")
- }
+ if (formatType == FileFormatType.CSV) {
+ options.get("SKIP_HEADER").foreach {
+ v =>
+ val intVal =
+ try v.toInt
+ catch { case _: NumberFormatException => -1 }
+ if (intVal != 0 && intVal != 1) {
+ throw new IllegalArgumentException(s"SKIP_HEADER supports only 0 or 1, got: $v")
+ }
+ }
}
}
def validateForExport(): Unit = {
validateFormatType()
- val invalid = options.keys.filterNot(CopyFileFormat.VALID_EXPORT_KEYS.contains)
+ val validKeys = formatType match {
+ case FileFormatType.JSON => CopyFileFormat.VALID_JSON_EXPORT_KEYS
+ case _ => CopyFileFormat.VALID_CSV_EXPORT_KEYS
+ }
+ val invalid = options.keys.filterNot(validKeys.contains)
if (invalid.nonEmpty) {
throw new IllegalArgumentException(
s"Unsupported FILE_FORMAT options for export: ${invalid.mkString(", ")}")
@@ -106,16 +144,17 @@ case class CopyFileFormat(formatType: FileFormatType, options: Map[String, Strin
private def validateFormatType(): Unit = {
formatType match {
case FileFormatType.CSV =>
+ case FileFormatType.JSON =>
case FileFormatType.Unsupported(name) =>
throw new IllegalArgumentException(
- s"Unsupported file format type: $name. Only CSV is currently supported")
+ s"Unsupported file format type: $name. Supported types: CSV, JSON")
}
}
}
object CopyFileFormat {
- val VALID_IMPORT_KEYS: Set[String] = Set(
+ val VALID_CSV_IMPORT_KEYS: Set[String] = Set(
"FIELD_DELIMITER",
"SKIP_HEADER",
"QUOTE",
@@ -125,7 +164,14 @@ object CopyFileFormat {
"COMPRESSION"
)
- val VALID_EXPORT_KEYS: Set[String] = Set(
+ val VALID_JSON_IMPORT_KEYS: Set[String] = Set(
+ "MULTI_LINE",
+ "COMPRESSION",
+ "NULL_IF",
+ "EMPTY_FIELD_AS_NULL"
+ )
+
+ val VALID_CSV_EXPORT_KEYS: Set[String] = Set(
"FIELD_DELIMITER",
"HEADER",
"QUOTE",
@@ -133,12 +179,19 @@ object CopyFileFormat {
"COMPRESSION"
)
+ val VALID_JSON_EXPORT_KEYS: Set[String] = Set(
+ "COMPRESSION",
+ "DATE_FORMAT",
+ "TIMESTAMP_FORMAT"
+ )
+
// Unit Separator (U+001F) used to encode multi-value lists in a single string
val LIST_SEPARATOR: String = "\u001f"
def parseFormatType(typeStr: String): FileFormatType = {
typeStr.toUpperCase match {
case "CSV" => FileFormatType.CSV
+ case "JSON" => FileFormatType.JSON
case other => FileFormatType.Unsupported(other)
}
}
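For readers less familiar with Spark's data source option names, the JSON branches of `toSparkReaderOptions` and `toSparkWriterOptions` above reduce to the translation below. This is a hypothetical immutable re-statement for illustration only; the object name is invented, and the real methods build a mutable map.

```scala
// Hypothetical re-statement of the JSON branches of toSparkReaderOptions and
// toSparkWriterOptions from this commit, using immutable Maps.
object JsonOptionTranslation {
  // Reader: FAILFAST is always set; MULTI_LINE is lower-cased into Spark's
  // multiLine option. NULL_IF and EMPTY_FIELD_AS_NULL are not forwarded.
  def readerOptions(options: Map[String, String]): Map[String, String] =
    Map("mode" -> "FAILFAST") ++ options.collect {
      case ("MULTI_LINE", v)  => "multiLine" -> v.toLowerCase
      case ("COMPRESSION", v) => "compression" -> v
    }

  // Writer: only COMPRESSION, DATE_FORMAT and TIMESTAMP_FORMAT are mapped.
  def writerOptions(options: Map[String, String]): Map[String, String] =
    options.collect {
      case ("COMPRESSION", v)      => "compression" -> v
      case ("DATE_FORMAT", v)      => "dateFormat" -> v
      case ("TIMESTAMP_FORMAT", v) => "timestampFormat" -> v
    }.toMap
}
```

Keeping `mode` fixed to `FAILFAST` is what makes malformed JSON abort the whole command, consistent with `ON_ERROR = ABORT_STATEMENT` being the only supported error mode.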
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoLocationExec.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoLocationExec.scala
index f4f0720b28..69387a5071 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoLocationExec.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoLocationExec.scala
@@ -18,7 +18,7 @@
package org.apache.paimon.spark.execution
-import org.apache.paimon.spark.catalyst.plans.logical.CopyFileFormat
+import org.apache.paimon.spark.catalyst.plans.logical.{CopyFileFormat, FileFormatType}
import org.apache.paimon.spark.leafnode.PaimonLeafV2CommandExec
import org.apache.hadoop.fs.Path
@@ -51,7 +51,12 @@ case class CopyIntoLocationExec(
val writerOptions = fileFormat.toSparkWriterOptions
val saveMode = if (overwrite) SaveMode.Overwrite else SaveMode.ErrorIfExists
- df.write.options(writerOptions).mode(saveMode).csv(targetPath)
+ fileFormat.formatType match {
+ case FileFormatType.JSON =>
+ df.write.options(writerOptions).mode(saveMode).json(targetPath)
+ case _ =>
+ df.write.options(writerOptions).mode(saveMode).csv(targetPath)
+ }
val hadoopConf = spark.sessionState.newHadoopConf()
val fsPath = new Path(targetPath)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoTableExec.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoTableExec.scala
index 260add4349..4763d8c784 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoTableExec.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoTableExec.scala
@@ -19,7 +19,7 @@
package org.apache.paimon.spark.execution
import org.apache.paimon.spark.SparkTable
-import org.apache.paimon.spark.catalyst.plans.logical.CopyFileFormat
+import org.apache.paimon.spark.catalyst.plans.logical.{CopyFileFormat, FileFormatType}
import org.apache.paimon.spark.copyinto.{CopyLoadHistoryManager, CopyLoadRecord}
import org.apache.paimon.spark.leafnode.PaimonLeafV2CommandExec
import org.apache.paimon.table.FileStoreTable
@@ -72,13 +72,12 @@ case class CopyIntoTableExec(
}
val filePaths = filesToLoad.map(_.getPath.toString)
- val stringSchema = StructType(
- (0 until targetColumns.size).map(i => StructField(s"_c$i", StringType, nullable = true)))
+ val stringSchema = buildStringSchema(targetColumns)
val readerOptions = fileFormat.toSparkReaderOptions
- val csvDf = readAndProcessCsv(filePaths, stringSchema, readerOptions)
+ val sourceDf = readSourceData(filePaths, stringSchema, readerOptions)
val finalDf =
- buildFinalDataFrame(csvDf, targetColumns, writableColumns, fields)
+ buildFinalDataFrame(sourceDf, targetColumns, writableColumns, fields)
val castedDf = castAndValidate(finalDf, writableColumns, fields)
val tableName = CopyIntoUtils.quoteIdentifier(catalog.name(), ident)
@@ -93,6 +92,16 @@ case class CopyIntoTableExec(
readerOptions)
}
+ private def buildStringSchema(targetColumns: Seq[String]): StructType = {
+ fileFormat.formatType match {
+ case FileFormatType.JSON =>
+ StructType(targetColumns.map(name => StructField(name, StringType, nullable = true)))
+ case _ =>
+ StructType(
+ (0 until targetColumns.size).map(i => StructField(s"_c$i", StringType, nullable = true)))
+ }
+ }
+
private def resolveTargetColumns(writableColumns: Seq[String]): Seq[String] = {
columns match {
case Some(cols) =>
@@ -165,14 +174,16 @@ case class CopyIntoTableExec(
}
}
- private def readAndProcessCsv(
+ private def readSourceData(
filePaths: Array[String],
stringSchema: StructType,
readerOptions: Map[String, String]): DataFrame = {
- var df = spark.read
- .options(readerOptions)
- .schema(stringSchema)
- .csv(filePaths: _*)
+ var df = fileFormat.formatType match {
+ case FileFormatType.JSON =>
+ spark.read.options(readerOptions).schema(stringSchema).json(filePaths: _*)
+ case _ =>
+ spark.read.options(readerOptions).schema(stringSchema).csv(filePaths: _*)
+ }
val nullIfVals = fileFormat.nullIfValues
if (nullIfVals.nonEmpty) {
@@ -199,12 +210,17 @@ case class CopyIntoTableExec(
}
private def buildFinalDataFrame(
- csvDf: DataFrame,
+ sourceDf: DataFrame,
targetColumns: Seq[String],
writableColumns: Seq[String],
fields: Seq[DataField]): DataFrame = {
- val renamedDf = targetColumns.zipWithIndex.foldLeft(csvDf) {
- case (df, (targetCol, idx)) => df.withColumnRenamed(s"_c$idx", targetCol)
+ val renamedDf = fileFormat.formatType match {
+ case FileFormatType.JSON =>
+ sourceDf
+ case _ =>
+ targetColumns.zipWithIndex.foldLeft(sourceDf) {
+ case (df, (targetCol, idx)) => df.withColumnRenamed(s"_c$idx", targetCol)
+ }
}
if (columns.isDefined) {
@@ -290,10 +306,14 @@ case class CopyIntoTableExec(
val snapshotId = paimonTable.snapshotManager().latestSnapshotId()
val loadedAt = System.currentTimeMillis()
- val rowCounts = spark.read
- .options(readerOptions)
- .schema(stringSchema)
- .csv(filePaths: _*)
+ val countDf = fileFormat.formatType match {
+ case FileFormatType.JSON =>
+ spark.read.options(readerOptions).schema(stringSchema).json(filePaths: _*)
+ case _ =>
+ spark.read.options(readerOptions).schema(stringSchema).csv(filePaths: _*)
+ }
+
+ val rowCounts = countDf
.groupBy(input_file_name().as("file"))
.count()
.collect()
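The format-dependent read schema built by `buildStringSchema`, together with the NULL_IF handling that `readSourceData` applies before casting, can be modeled roughly as follows. The names are hypothetical, and the NULL_IF rule is taken from the documentation table (listed strings are read as NULL) because the diff does not show its implementation.

```scala
// Illustrative model (hypothetical names): field names of the all-STRING
// read schema built by buildStringSchema, plus NULL_IF as the docs define it.
object StringSchemaSketch {
  sealed trait Format
  case object Csv extends Format
  case object Json extends Format

  // JSON reads fields under the target column names so Spark matches them
  // by name; CSV reads positional _c0, _c1, ... that are renamed afterwards.
  def readFieldNames(format: Format, targetColumns: Seq[String]): Seq[String] =
    format match {
      case Json => targetColumns
      case Csv  => targetColumns.indices.map(i => s"_c$i")
    }

  // A raw string listed in NULL_IF becomes NULL (None) before casting.
  def applyNullIf(raw: String, nullIf: Set[String]): Option[String] =
    Option(raw).filterNot(nullIf.contains)
}
```

Naming the JSON read fields after the target columns is what lets Spark's JSON reader select fields by name and leave absent ones as NULL, while CSV keeps positional placeholders that `buildFinalDataFrame` renames.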
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala
index f3fb75a8c6..aa764bd30e 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala
@@ -237,7 +237,9 @@ class CopyIntoTestBase extends PaimonSparkTestBase {
|FILE_FORMAT = (TYPE = PARQUET)
|""".stripMargin)
}
- assert(e.getMessage.contains("Unsupported file format type"))
+ assert(
+ e.getMessage.contains("Unsupported file format type") ||
+ e.getMessage.contains("Supported types"))
}
spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_unsup")
@@ -646,6 +648,423 @@ class CopyIntoTestBase extends PaimonSparkTestBase {
spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_dup_col")
}
+ private def createJsonFile(dir: File, name: String, content: String): File = {
+ val file = new File(dir, name)
+ val writer = new PrintWriter(file)
+ try writer.write(content)
+ finally writer.close()
+ file
+ }
+
+ private def withJsonDir(testBody: File => Unit): Unit = {
+ val dir = Files.createTempDirectory("copy_into_json_test").toFile
+ try testBody(dir)
+ finally deleteRecursively(dir)
+ }
+
+ test("COPY INTO: basic JSON import") {
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_basic")
+ spark.sql(s"CREATE TABLE $dbName0.copy_json_basic (id INT, name STRING, age INT)")
+
+ withJsonDir {
+ dir =>
+ createJsonFile(
+ dir,
+ "data.json",
+ """{"id":"1","name":"Alice","age":"30"}
+ |{"id":"2","name":"Bob","age":"25"}
+ |""".stripMargin)
+
+ val result = spark.sql(s"""COPY INTO $dbName0.copy_json_basic
+ |FROM '${dir.getAbsolutePath}'
+ |FILE_FORMAT = (TYPE = JSON)
+ |""".stripMargin)
+
+ assert(result.collect().length > 0)
+ assert(result.collect()(0).getString(1) == "LOADED")
+
+ checkAnswer(
+ spark.sql(s"SELECT * FROM $dbName0.copy_json_basic ORDER BY id"),
+ Seq(Row(1, "Alice", 30), Row(2, "Bob", 25)))
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_basic")
+ }
+
+ test("COPY INTO: JSON column name matching ignores order") {
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_order")
+ spark.sql(s"CREATE TABLE $dbName0.copy_json_order (id INT, name STRING, age INT)")
+
+ withJsonDir {
+ dir =>
+ createJsonFile(
+ dir,
+ "data.json",
+ """{"age":"30","name":"Alice","id":"1"}
+ |{"name":"Bob","id":"2","age":"25"}
+ |""".stripMargin)
+
+ spark.sql(s"""COPY INTO $dbName0.copy_json_order
+ |FROM '${dir.getAbsolutePath}'
+ |FILE_FORMAT = (TYPE = JSON)
+ |""".stripMargin)
+
+ checkAnswer(
+ spark.sql(s"SELECT * FROM $dbName0.copy_json_order ORDER BY id"),
+ Seq(Row(1, "Alice", 30), Row(2, "Bob", 25)))
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_order")
+ }
+
+ test("COPY INTO: JSON with MULTI_LINE") {
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_ml")
+ spark.sql(s"CREATE TABLE $dbName0.copy_json_ml (id INT, name STRING)")
+
+ withJsonDir {
+ dir =>
+ createJsonFile(
+ dir,
+ "data.json",
+ """[
+ | {"id":"1","name":"Alice"},
+ | {"id":"2","name":"Bob"}
+ |]""".stripMargin)
+
+ spark.sql(s"""COPY INTO $dbName0.copy_json_ml
+ |FROM '${dir.getAbsolutePath}'
+ |FILE_FORMAT = (TYPE = JSON, MULTI_LINE = TRUE)
+ |""".stripMargin)
+
+ checkAnswer(
+ spark.sql(s"SELECT * FROM $dbName0.copy_json_ml ORDER BY id"),
+ Seq(Row(1, "Alice"), Row(2, "Bob")))
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_ml")
+ }
+
+ test("COPY INTO: JSON with explicit column list") {
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_cols")
+ spark.sql(s"CREATE TABLE $dbName0.copy_json_cols (id INT, name STRING, age INT)")
+
+ withJsonDir {
+ dir =>
+ createJsonFile(
+ dir,
+ "data.json",
+ """{"id":"1","name":"Alice"}
+ |{"id":"2","name":"Bob"}
+ |""".stripMargin)
+
+ spark.sql(s"""COPY INTO $dbName0.copy_json_cols (id, name)
+ |FROM '${dir.getAbsolutePath}'
+ |FILE_FORMAT = (TYPE = JSON)
+ |""".stripMargin)
+
+ checkAnswer(
+ spark.sql(s"SELECT * FROM $dbName0.copy_json_cols ORDER BY id"),
+ Seq(Row(1, "Alice", null), Row(2, "Bob", null)))
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_cols")
+ }
+
+ test("COPY INTO: JSON NULL_IF") {
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_null")
+ spark.sql(s"CREATE TABLE $dbName0.copy_json_null (id INT, name STRING)")
+
+ withJsonDir {
+ dir =>
+ createJsonFile(
+ dir,
+ "data.json",
+ """{"id":"1","name":"NULL"}
+ |{"id":"2","name":"\\N"}
+ |{"id":"3","name":"Alice"}
+ |""".stripMargin
+ )
+
+ spark.sql(s"""COPY INTO $dbName0.copy_json_null
+ |FROM '${dir.getAbsolutePath}'
+ |FILE_FORMAT = (TYPE = JSON, NULL_IF = ('NULL', '\\N'))
+ |""".stripMargin)
+
+ checkAnswer(
+ spark.sql(s"SELECT * FROM $dbName0.copy_json_null ORDER BY id"),
+ Seq(Row(1, null), Row(2, null), Row(3, "Alice")))
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_null")
+ }
+
+ test("COPY INTO: JSON export") {
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_export")
+ spark.sql(s"CREATE TABLE $dbName0.copy_json_export (id INT, name STRING)")
+ spark.sql(s"INSERT INTO $dbName0.copy_json_export VALUES (1, 'Alice'), (2, 'Bob')")
+
+ withJsonDir {
+ dir =>
+ val outputPath = new File(dir, "output").getAbsolutePath
+
+ val result = spark.sql(s"""COPY INTO '$outputPath'
+ |FROM $dbName0.copy_json_export
+ |FILE_FORMAT = (TYPE = JSON)
+ |""".stripMargin)
+
+ val row = result.collect()(0)
+ assert(row.getString(0) == outputPath)
+ assert(row.getLong(2) == 2L)
+
+ val exported = spark.read.json(outputPath)
+ assert(exported.count() == 2)
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_export")
+ }
+
+ test("COPY INTO: JSON rejects CSV-only options") {
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_bad_opt")
+ spark.sql(s"CREATE TABLE $dbName0.copy_json_bad_opt (id INT)")
+
+ withJsonDir {
+ dir =>
+ createJsonFile(dir, "data.json", """{"id":"1"}""")
+
+ val e = intercept[Exception] {
+ spark.sql(s"""COPY INTO $dbName0.copy_json_bad_opt
+ |FROM '${dir.getAbsolutePath}'
+ |FILE_FORMAT = (TYPE = JSON, FIELD_DELIMITER = ',')
+ |""".stripMargin)
+ }
+ assert(e.getMessage.contains("Unsupported FILE_FORMAT options"))
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_bad_opt")
+ }
+
+ test("COPY INTO: JSON with date and timestamp columns") {
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_date")
+ spark.sql(s"CREATE TABLE $dbName0.copy_json_date (id INT, dt DATE, ts TIMESTAMP)")
+
+ withJsonDir {
+ dir =>
+ createJsonFile(
+ dir,
+ "data.json",
+ """{"id":"1","dt":"2024-01-15","ts":"2024-01-15T10:30:00"}
+ |{"id":"2","dt":"2024-06-20","ts":"2024-06-20T14:45:30"}
+ |""".stripMargin
+ )
+
+ spark.sql(s"""COPY INTO $dbName0.copy_json_date
+ |FROM '${dir.getAbsolutePath}'
+ |FILE_FORMAT = (TYPE = JSON)
+ |""".stripMargin)
+
+ val rows = spark.sql(s"SELECT * FROM $dbName0.copy_json_date ORDER BY id").collect()
+ assert(rows.length == 2)
+ assert(rows(0).getInt(0) == 1)
+ assert(rows(0).getDate(1).toString == "2024-01-15")
+ assert(rows(1).getInt(0) == 2)
+ assert(rows(1).getDate(1).toString == "2024-06-20")
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_date")
+ }
+
+ test("COPY INTO: JSON rows_loaded count is accurate") {
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_count")
+ spark.sql(s"CREATE TABLE $dbName0.copy_json_count (id INT, name STRING)")
+
+ withJsonDir {
+ dir =>
+ createJsonFile(
+ dir,
+ "data.json",
+ """{"id":"1","name":"Alice"}
+ |{"id":"2","name":"Bob"}
+ |{"id":"3","name":"Charlie"}
+ |""".stripMargin
+ )
+
+ val result = spark.sql(s"""COPY INTO $dbName0.copy_json_count
+ |FROM '${dir.getAbsolutePath}'
+ |FILE_FORMAT = (TYPE = JSON)
+ |""".stripMargin)
+
+ val rows = result.collect()
+ assert(rows.length == 1)
+ assert(rows(0).getString(1) == "LOADED")
+ assert(rows(0).getLong(2) == 3L)
+ assert(rows(0).getLong(3) == 3L)
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_count")
+ }
+
+ test("COPY INTO: JSON export then import round-trip") {
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_rt_src")
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_rt_dst")
+ spark.sql(s"CREATE TABLE $dbName0.copy_json_rt_src (id INT, name STRING, score DOUBLE)")
+ spark.sql(s"INSERT INTO $dbName0.copy_json_rt_src VALUES (1, 'Alice', 95.5), (2, 'Bob', 87.3)")
+ spark.sql(s"CREATE TABLE $dbName0.copy_json_rt_dst (id INT, name STRING, score DOUBLE)")
+
+ withJsonDir {
+ dir =>
+ val exportPath = new File(dir, "exported").getAbsolutePath
+
+ spark.sql(s"""COPY INTO '$exportPath'
+ |FROM $dbName0.copy_json_rt_src
+ |FILE_FORMAT = (TYPE = JSON)
+ |""".stripMargin)
+
+ spark.sql(s"""COPY INTO $dbName0.copy_json_rt_dst
+ |FROM '$exportPath'
+ |FILE_FORMAT = (TYPE = JSON)
+ |PATTERN = '.*\\.json'
+ |""".stripMargin)
+
+ checkAnswer(
+ spark.sql(s"SELECT * FROM $dbName0.copy_json_rt_dst ORDER BY id"),
+ Seq(Row(1, "Alice", 95.5), Row(2, "Bob", 87.3)))
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_rt_src")
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_rt_dst")
+ }
+
+ test("COPY INTO: JSON extra fields are ignored") {
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_extra")
+ spark.sql(s"CREATE TABLE $dbName0.copy_json_extra (id INT, name STRING)")
+
+ withJsonDir {
+ dir =>
+ createJsonFile(
+ dir,
+ "data.json",
+ """{"id":"1","name":"Alice","extra_field":"ignored","another":"also_ignored"}
+ |{"id":"2","name":"Bob","extra_field":"ignored2"}
+ |""".stripMargin
+ )
+
+ spark.sql(s"""COPY INTO $dbName0.copy_json_extra
+ |FROM '${dir.getAbsolutePath}'
+ |FILE_FORMAT = (TYPE = JSON)
+ |""".stripMargin)
+
+ checkAnswer(
+ spark.sql(s"SELECT * FROM $dbName0.copy_json_extra ORDER BY id"),
+ Seq(Row(1, "Alice"), Row(2, "Bob")))
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_extra")
+ }
+
+ test("COPY INTO: JSON missing fields become null") {
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_missing")
+ spark.sql(s"CREATE TABLE $dbName0.copy_json_missing (id INT, name STRING, age INT)")
+
+ withJsonDir {
+ dir =>
+ createJsonFile(
+ dir,
+ "data.json",
+ """{"id":"1","name":"Alice"}
+ |{"id":"2"}
+ |""".stripMargin
+ )
+
+ spark.sql(s"""COPY INTO $dbName0.copy_json_missing
+ |FROM '${dir.getAbsolutePath}'
+ |FILE_FORMAT = (TYPE = JSON)
+ |""".stripMargin)
+
+ checkAnswer(
+ spark.sql(s"SELECT * FROM $dbName0.copy_json_missing ORDER BY id"),
+ Seq(Row(1, "Alice", null), Row(2, null, null)))
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_missing")
+ }
+
+ test("COPY INTO: JSON malformed data fails with ABORT_STATEMENT") {
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_malformed")
+ spark.sql(s"CREATE TABLE $dbName0.copy_json_malformed (id INT, name STRING)")
+
+ withJsonDir {
+ dir =>
+ createJsonFile(
+ dir,
+ "data.json",
+ """{"id":"1","name":"Alice"}
+ |{this is not valid json}
+ |""".stripMargin
+ )
+
+ intercept[Exception] {
+ spark.sql(s"""COPY INTO $dbName0.copy_json_malformed
+ |FROM '${dir.getAbsolutePath}'
+ |FILE_FORMAT = (TYPE = JSON)
+ |""".stripMargin)
+ }
+ assert(spark.sql(s"SELECT * FROM $dbName0.copy_json_malformed").count() == 0)
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_malformed")
+ }
+
+ test("COPY INTO: JSON bad cast fails with ABORT_STATEMENT") {
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_badcast")
+ spark.sql(s"CREATE TABLE $dbName0.copy_json_badcast (id INT, name STRING)")
+
+ withJsonDir {
+ dir =>
+ createJsonFile(dir, "data.json", """{"id":"not_a_number","name":"Alice"}""")
+
+ val e = intercept[Exception] {
+ spark.sql(s"""COPY INTO $dbName0.copy_json_badcast
+ |FROM '${dir.getAbsolutePath}'
+ |FILE_FORMAT = (TYPE = JSON)
+ |""".stripMargin)
+ }
+ val msg = e.getMessage
+ assert(
+ msg.contains("Cast failure") ||
+ msg.contains("ABORT_STATEMENT") ||
+ msg.contains("CAST_INVALID_INPUT") ||
+ msg.contains("cannot be cast to") ||
+ e.getCause != null)
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_badcast")
+ }
+
+ test("COPY INTO: JSON export with COMPRESSION") {
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_compress")
+ spark.sql(s"CREATE TABLE $dbName0.copy_json_compress (id INT, name STRING)")
+ spark.sql(s"INSERT INTO $dbName0.copy_json_compress VALUES (1, 'Alice'), (2, 'Bob')")
+
+ withJsonDir {
+ dir =>
+ val outputPath = new File(dir, "compressed").getAbsolutePath
+
+ val result = spark.sql(s"""COPY INTO '$outputPath'
+ |FROM $dbName0.copy_json_compress
+ |FILE_FORMAT = (TYPE = JSON, COMPRESSION = GZIP)
+ |OVERWRITE = TRUE
+ |""".stripMargin)
+
+ assert(result.collect()(0).getLong(2) == 2L)
+
+ val outputDir = new File(outputPath)
+ val gzFiles = outputDir.listFiles().filter(_.getName.endsWith(".gz"))
+ assert(gzFiles.nonEmpty)
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_json_compress")
+ }
+
test("COPY INTO: case-insensitive column matching") {
spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_case")
spark.sql(s"CREATE TABLE $dbName0.copy_case (id INT, name STRING, age INT)")