This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b887dbd54 [spark] Support FROM (query) export in COPY INTO location (#8096)
6b887dbd54 is described below

commit 6b887dbd54900132f2c4443aa4b6cdbc7bfb9bf1
Author: Junrui Lee <[email protected]>
AuthorDate: Sun Jun 7 21:14:24 2026 +0800

    [spark] Support FROM (query) export in COPY INTO location (#8096)
    
    Extend `COPY INTO <location>` (export) to accept an inline query as the
    source, not just a table:
    
    ```sql
    COPY INTO '/export/active_users/'
    FROM (SELECT id, name FROM my_db.users WHERE active = TRUE)
    FILE_FORMAT = (TYPE = CSV, HEADER = TRUE);
    ```
    
    Previously only `FROM table_name` was supported. The inline query is
    parsed through the session (Paimon) parser, so it behaves exactly like
    the same query run via `spark.sql`, including Paimon parser rules such
    as the v1 function rewrite.
---
 docs/docs/spark/sql-write.md                       |  33 ++-
 .../PaimonSqlExtensions.g4                         |  12 ++
 .../catalyst/analysis/PaimonViewResolver.scala     |  10 +-
 .../plans/logical/CopyIntoLocationCommand.scala    |  27 ++-
 .../spark/execution/CopyIntoLocationExec.scala     |  33 ++-
 .../paimon/spark/execution/CopyIntoUtils.scala     |  52 ++++-
 .../paimon/spark/execution/PaimonStrategy.scala    |  20 +-
 .../scala/org/apache/spark/sql/PaimonUtils.scala   |  16 ++
 .../extensions/PaimonSqlExtensionsAstBuilder.scala |  41 +++-
 .../apache/paimon/spark/sql/CopyIntoTestBase.scala | 235 +++++++++++++++++++++
 .../spark/sql/PaimonV1FunctionTestBase.scala       |  31 +++
 11 files changed, 483 insertions(+), 27 deletions(-)

diff --git a/docs/docs/spark/sql-write.md b/docs/docs/spark/sql-write.md
index 703976857a..1cc6c0bc69 100644
--- a/docs/docs/spark/sql-write.md
+++ b/docs/docs/spark/sql-write.md
@@ -431,7 +431,7 @@ Parquet columns are matched **by column name** (not by 
position). Extra columns
 
 ```sql
 COPY INTO 'target_path'
-FROM table_name
+FROM { table_name | (SELECT ...) }
 FILE_FORMAT = (TYPE = CSV [, option = value, ...])
 [OVERWRITE = TRUE|FALSE]
 ```
@@ -445,11 +445,19 @@ FILE_FORMAT = (TYPE = CSV, HEADER = TRUE, FIELD_DELIMITER 
= ',')
 OVERWRITE = TRUE;
 ```
 
+**CSV export from query:**
+
+```sql
+COPY INTO '/export/active_users/'
+FROM (SELECT id, name FROM my_db.users WHERE active = TRUE)
+FILE_FORMAT = (TYPE = CSV, HEADER = TRUE);
+```
+
 #### Write JSON Files
 
 ```sql
 COPY INTO 'target_path'
-FROM table_name
+FROM { table_name | (SELECT ...) }
 FILE_FORMAT = (TYPE = JSON [, option = value, ...])
 [OVERWRITE = TRUE|FALSE]
 ```
@@ -463,11 +471,19 @@ FILE_FORMAT = (TYPE = JSON)
 OVERWRITE = TRUE;
 ```
 
+**JSON export from query:**
+
+```sql
+COPY INTO '/export/recent_events/'
+FROM (SELECT * FROM my_db.events WHERE event_date > '2024-01-01')
+FILE_FORMAT = (TYPE = JSON);
+```
+
 #### Write Parquet Files
 
 ```sql
 COPY INTO 'target_path'
-FROM table_name
+FROM { table_name | (SELECT ...) }
 FILE_FORMAT = (TYPE = PARQUET [, option = value, ...])
 [OVERWRITE = TRUE|FALSE]
 ```
@@ -490,6 +506,14 @@ FILE_FORMAT = (TYPE = PARQUET, COMPRESSION = GZIP)
 OVERWRITE = TRUE;
 ```
 
+**Parquet export from aggregation query:**
+
+```sql
+COPY INTO '/export/summary/'
+FROM (SELECT dept, COUNT(*) AS cnt FROM my_db.employees GROUP BY dept)
+FILE_FORMAT = (TYPE = PARQUET);
+```
+
 #### FILE_FORMAT Options
 
 `FILE_FORMAT` is required and must include `TYPE = CSV`, `TYPE = JSON`, or 
`TYPE = PARQUET`.
@@ -614,10 +638,11 @@ By default (`FORCE = FALSE`), COPY INTO tracks which 
files have been successfull
 
 - **CSV column-count mismatch**: Rows with fewer or more columns than the 
target schema are treated as malformed records. With `ON_ERROR = CONTINUE`, 
these rows are skipped and counted as errors.
 - Only **CSV**, **JSON**, and **Parquet** formats are supported.
-- Writing files only supports `FROM table_name`; `FROM (SELECT ...)` is not 
supported.
 - `SINGLE = TRUE` (single-file output) is not supported.
 - File format options must be specified inline in `FILE_FORMAT = (...)`.
 - File listing is **non-recursive**: only direct files under the source path 
are processed. Subdirectories are ignored.
 - `PATTERN` matches the **base file name** only (not the full path).
 - Concurrent COPY INTO commands targeting the same table may produce duplicate 
data.
 - `SKIP_HEADER` only supports values `0` or `1`.
+- `FROM (...)` accepts any read-only query (e.g. `SELECT`, `WITH ... SELECT`, 
`VALUES`); statements with side effects (e.g. `INSERT`, `INSERT OVERWRITE 
DIRECTORY`, DDL) are rejected.
+- For a `FROM (...)` export, `rows_written` is an execution-time statistic 
counted by a separate pass before the files are written. Because the DataFrame 
is lazy and not cached, writing re-executes the query a second time; if the 
query is non-deterministic (e.g. uses `rand()`, `current_timestamp()`, or reads 
a volatile source), the two runs can produce different rows, so `rows_written` 
may not match the actual file contents. The result is intentionally not staged, 
so the export does not  [...]
diff --git 
a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
 
b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
index 620a2bb95a..8c2e45b34e 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
+++ 
b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
@@ -86,6 +86,18 @@ statement
       overwriteClause?                                                         
             #copyIntoLocation
     | CREATE TABLE (IF NOT EXISTS)? target=multipartIdentifier
         LIKE source=multipartIdentifier ( . )*?                                
             #createTableLike
+    | COPY INTO targetPath=STRING
+      FROM query=parenBlock
+      fileFormatClause
+      overwriteClause?                                                         
             #copyIntoLocationFromQuery
+  ;
+
+// A parenthesized block with balanced parentheses, used to capture an inline 
subquery verbatim,
+// e.g. the (SELECT ...) in `COPY INTO <location> FROM (SELECT ...)`. A 
recursive rule is required
+// (rather than '(' .*? ')') so that nested parentheses such as `WHERE x IN 
(1, 2)` are matched
+// correctly. The raw subquery text is later extracted from the token stream 
by the AST builder.
+parenBlock
+  : '(' ( parenBlock | ~('(' | ')') )* ')'
   ;
 
 callArgument
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonViewResolver.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonViewResolver.scala
index 8e19c5ddbe..d60e9ab8f3 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonViewResolver.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonViewResolver.scala
@@ -23,7 +23,7 @@ import org.apache.paimon.spark.SparkTypeUtils
 import org.apache.paimon.spark.catalog.SupportView
 import org.apache.paimon.view.View
 
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{PaimonUtils, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.{GetColumnByOrdinal, 
UnresolvedRelation, UnresolvedTableOrView}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, UpCast}
 import org.apache.spark.sql.catalyst.parser.ParseException
@@ -91,13 +91,7 @@ case class PaimonViewResolver(spark: SparkSession)
     )
     try {
       CurrentOrigin.withOrigin(origin) {
-        try {
-          spark.sessionState.sqlParser.parseQuery(viewText)
-        } catch {
-          // For compatibility with Spark 3.2 and below
-          case _: NoSuchMethodError =>
-            spark.sessionState.sqlParser.parsePlan(viewText)
-        }
+        PaimonUtils.parseQueryCompat(spark.sessionState.sqlParser, viewText)
       }
     } catch {
       case _: ParseException =>
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyIntoLocationCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyIntoLocationCommand.scala
index 3e5d0a0666..dd04754c81 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyIntoLocationCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyIntoLocationCommand.scala
@@ -23,9 +23,26 @@ import org.apache.paimon.spark.leafnode.PaimonLeafCommand
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference}
 import org.apache.spark.sql.types.{IntegerType, LongType, StringType}
 
+/**
+ * The (still unresolved) source to export: either a named table or an inline 
read-only query. Using
+ * an ADT keeps the two mutually exclusive, so impossible states such as 
"table name and query both
+ * present" cannot be constructed. The table name is resolved to a 
catalog/identifier later, during
+ * planning.
+ */
+sealed trait CopyIntoLocationSource
+
+object CopyIntoLocationSource {
+
+  /** Export the named table; `nameParts` is the multipart identifier, 
unresolved at this point. */
+  case class TableName(nameParts: Seq[String]) extends CopyIntoLocationSource
+
+  /** Export the result of the inline `FROM (<query>)` read-only query. */
+  case class Query(query: String) extends CopyIntoLocationSource
+}
+
 case class CopyIntoLocationCommand(
     targetPath: String,
-    table: Seq[String],
+    source: CopyIntoLocationSource,
     fileFormat: CopyFileFormat,
     overwrite: Boolean)
   extends PaimonLeafCommand {
@@ -37,6 +54,12 @@ case class CopyIntoLocationCommand(
   )
 
   override def simpleString(maxFields: Int): String = {
-    s"CopyIntoLocation: target=$targetPath, source=$table"
+    val sourceDesc = source match {
+      case CopyIntoLocationSource.Query(q) =>
+        val truncated = if (q.length > 100) q.take(100) + "..." else q
+        s"query=$truncated"
+      case CopyIntoLocationSource.TableName(nameParts) => s"table=$nameParts"
+    }
+    s"CopyIntoLocation: target=$targetPath, source=$sourceDesc"
   }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoLocationExec.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoLocationExec.scala
index 45471bab84..76d147162a 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoLocationExec.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoLocationExec.scala
@@ -28,10 +28,21 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
 import org.apache.spark.unsafe.types.UTF8String
 
+/** The source to export from: either a Paimon table or an inline read-only 
query. */
+sealed trait CopyIntoSource
+
+object CopyIntoSource {
+
+  /** Export an existing Paimon table identified by `catalog`/`ident`. */
+  case class TableSource(catalog: TableCatalog, ident: Identifier) extends 
CopyIntoSource
+
+  /** Export the result of an inline `FROM (<query>)` read-only query. */
+  case class QuerySource(query: String) extends CopyIntoSource
+}
+
 case class CopyIntoLocationExec(
     spark: SparkSession,
-    catalog: TableCatalog,
-    ident: Identifier,
+    source: CopyIntoSource,
     targetPath: String,
     fileFormat: CopyFileFormat,
     overwrite: Boolean,
@@ -43,14 +54,21 @@ case class CopyIntoLocationExec(
   override protected def run(): Seq[InternalRow] = {
     fileFormat.validateForExport()
 
-    val tableName = CopyIntoUtils.quoteIdentifier(catalog.name(), ident)
-    val df = spark.table(tableName)
-
-    val rowCount = df.count()
+    val df = source match {
+      case CopyIntoSource.QuerySource(query) => 
CopyIntoUtils.queryToDataFrame(spark, query)
+      case CopyIntoSource.TableSource(catalog, ident) =>
+        spark.table(CopyIntoUtils.quoteIdentifier(catalog.name(), ident))
+    }
 
     val writerOptions = fileFormat.toSparkWriterOptions
     val saveMode = if (overwrite) SaveMode.Overwrite else 
SaveMode.ErrorIfExists
 
+    // `rows_written` is counted by a separate `count()` action before the 
write. The DataFrame is
+    // lazy and not cached, so the write re-executes the query a second time; 
for a non-deterministic
+    // query (e.g. `rand()`, `current_timestamp()`, or a volatile source) the 
two runs can yield
+    // different rows, so this count may not match the files (see the docs). 
We accept this rather
+    // than stage the whole result to disk just to make the count exact.
+    val rowCount = df.count()
     fileFormat.formatType match {
       case FileFormatType.JSON =>
         df.write.options(writerOptions).mode(saveMode).json(targetPath)
@@ -63,8 +81,9 @@ case class CopyIntoLocationExec(
     val hadoopConf = spark.sessionState.newHadoopConf()
     val fsPath = new Path(targetPath)
     val fs = fsPath.getFileSystem(hadoopConf)
+    // Count only data files (part-*); committer side files such as _SUCCESS 
are not data output.
     val fileCount = if (fs.exists(fsPath)) {
-      fs.listStatus(fsPath).count(_.isFile)
+      fs.listStatus(fsPath).count(s => s.isFile && 
s.getPath.getName.startsWith("part-"))
     } else {
       0
     }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoUtils.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoUtils.scala
index fdf76b8dc6..32aa730780 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoUtils.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoUtils.scala
@@ -18,12 +18,62 @@
 
 package org.apache.paimon.spark.execution
 
-import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.catalyst.plans.logical.{Command, InsertIntoDir, 
LogicalPlan, ParsedStatement}
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.paimon.shims.SparkShimLoader
 
 object CopyIntoUtils {
 
+  /**
+   * Build a [[DataFrame]] from the inline query of `COPY INTO <location> FROM 
(<query>)`.
+   *
+   * The query is parsed with the session parser (the Paimon parser) via 
`parsePlan`, so it goes
+   * through the exact same path as `spark.sql(<query>)`, including Paimon's 
parser rules (e.g. the
+   * v1 function rewrite). Any read-only query is accepted (`SELECT`, `WITH 
... SELECT`, `VALUES`,
+   * etc.); the only restriction is that it must have no side effects, 
enforced by the
+   * [[hasSideEffect]] guard on the resulting plan: the Paimon parser does not 
reject statements at
+   * parse time (its `parseQuery` just delegates to `parsePlan`), so DDL/DML 
reaches us as a plan
+   * and must be rejected by inspecting the tree.
+   */
+  def queryToDataFrame(spark: SparkSession, query: String): DataFrame = {
+    val plan =
+      try {
+        spark.sessionState.sqlParser.parsePlan(query)
+      } catch {
+        case e: ParseException =>
+          throw new IllegalArgumentException(
+            s"COPY INTO <location> FROM (<query>) only supports read-only 
queries: $query",
+            e)
+      }
+    if (hasSideEffect(plan)) {
+      throw new IllegalArgumentException(
+        "COPY INTO <location> FROM (<query>) only supports read-only queries, 
" +
+          s"but got a statement with side effects: $query")
+    }
+    SparkShimLoader.shim.classicApi.createDataset(spark, plan)
+  }
+
+  /**
+   * Whether `plan` contains a node with side effects anywhere in its tree:
+   *   - `Command` covers resolved commands such as `DROP TABLE`.
+   *   - `ParsedStatement` covers parsed-but-unresolved DDL/DML such as 
`INSERT`, CTAS,
+   *     `CREATE VIEW` (it is the parent of `InsertIntoStatement`); a pure 
SELECT never contains
+   *     such a node.
+   *   - `InsertIntoDir` (the `INSERT OVERWRITE DIRECTORY` plan) is neither of 
the above, so it is
+   *     matched explicitly.
+   *
+   * `find` is used rather than `exists` because `TreeNode.exists` is absent 
on Spark 3.2.
+   */
+  private def hasSideEffect(plan: LogicalPlan): Boolean = {
+    plan.find {
+      case _: Command | _: ParsedStatement | _: InsertIntoDir => true
+      case _ => false
+    }.isDefined
+  }
+
   def quoteIdentifier(catalogName: String, ident: Identifier): String = {
     val parts = Seq(catalogName) ++
       ident.namespace().toSeq ++
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
index 6870ce8832..321e61f2cb 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
@@ -22,7 +22,7 @@ import org.apache.paimon.partition.PartitionPredicate
 import org.apache.paimon.spark.{SparkCatalog, SparkGenericCatalog, SparkTable, 
SparkUtils}
 import org.apache.paimon.spark.catalog.{SparkBaseCatalog, SupportView}
 import org.apache.paimon.spark.catalyst.analysis.ResolvedPaimonView
-import 
org.apache.paimon.spark.catalyst.plans.logical.{CopyIntoLocationCommand, 
CopyIntoTableCommand, CreateOrReplaceTagCommand, CreatePaimonView, 
DeleteTagCommand, DropPaimonView, PaimonCallCommand, PaimonDropPartitions, 
RenameTagCommand, ResolvedIdentifier, ShowPaimonViews, ShowTagsCommand, 
TruncatePaimonTableWithFilter}
+import 
org.apache.paimon.spark.catalyst.plans.logical.{CopyIntoLocationCommand, 
CopyIntoLocationSource, CopyIntoTableCommand, CreateOrReplaceTagCommand, 
CreatePaimonView, DeleteTagCommand, DropPaimonView, PaimonCallCommand, 
PaimonDropPartitions, RenameTagCommand, ResolvedIdentifier, ShowPaimonViews, 
ShowTagsCommand, TruncatePaimonTableWithFilter}
 import org.apache.paimon.table.Table
 
 import org.apache.spark.sql.SparkSession
@@ -162,11 +162,23 @@ case class PaimonStrategy(spark: SparkSession)
         c.onError,
         c.output) :: Nil
 
-    case c @ CopyIntoLocationCommand(_, PaimonCatalogAndIdentifier(catalog, 
ident), _, _) =>
+    case c @ CopyIntoLocationCommand(_, CopyIntoLocationSource.Query(query), 
_, _) =>
       CopyIntoLocationExec(
         spark,
-        catalog,
-        ident,
+        CopyIntoSource.QuerySource(query),
+        c.targetPath,
+        c.fileFormat,
+        c.overwrite,
+        c.output) :: Nil
+
+    case c @ CopyIntoLocationCommand(
+          _,
+          CopyIntoLocationSource.TableName(PaimonCatalogAndIdentifier(catalog, 
ident)),
+          _,
+          _) =>
+      CopyIntoLocationExec(
+        spark,
+        CopyIntoSource.TableSource(catalog, ident),
         c.targetPath,
         c.fileFormat,
         c.overwrite,
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
index 16d34ecf4c..18dbdfd498 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
@@ -23,6 +23,7 @@ import org.apache.spark.rdd.InputFileBlockHolder
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.connector.expressions.FieldReference
 import org.apache.spark.sql.connector.expressions.filter.Predicate
@@ -63,6 +64,21 @@ object PaimonUtils {
     SparkShimLoader.shim.classicApi.createDataset(sparkSession, logicalPlan)
   }
 
+  /**
+   * Parse a read-only query, preferring [[ParserInterface.parseQuery]] which 
rejects non-query
+   * statements at parse time. `parseQuery` was added in Spark 3.3, so on 
Spark 3.2 (where it is
+   * absent) we fall back to [[ParserInterface.parsePlan]]. Callers are 
responsible for handling any
+   * [[org.apache.spark.sql.catalyst.parser.ParseException]] and for any 
further validation of the
+   * returned plan.
+   */
+  def parseQueryCompat(parser: ParserInterface, sqlText: String): LogicalPlan 
= {
+    try {
+      parser.parseQuery(sqlText)
+    } catch {
+      case _: NoSuchMethodError => parser.parsePlan(sqlText)
+    }
+  }
+
   def normalizeExprs(exprs: Seq[Expression], attributes: Seq[Attribute]): 
Seq[Expression] = {
     DataSourceStrategy.normalizeExprs(exprs, attributes)
   }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
index 0d54c698e6..f5986492b6 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
@@ -190,7 +190,46 @@ class PaimonSqlExtensionsAstBuilder(delegate: 
ParserInterface)
     val table = typedVisit[Seq[String]](ctx.multipartIdentifier)
     val fileFormat = buildFileFormat(ctx.fileFormatClause())
     val overwrite = 
Option(ctx.overwriteClause()).exists(_.booleanValue().TRUE() != null)
-    logical.CopyIntoLocationCommand(targetPath, table, fileFormat, overwrite)
+    logical.CopyIntoLocationCommand(
+      targetPath,
+      logical.CopyIntoLocationSource.TableName(table),
+      fileFormat,
+      overwrite)
+  }
+
+  /** Create a COPY INTO LOCATION FROM (query) (export) logical command. */
+  override def visitCopyIntoLocationFromQuery(
+      ctx: CopyIntoLocationFromQueryContext): logical.CopyIntoLocationCommand 
= withOrigin(ctx) {
+    val targetPath = unquoteString(ctx.targetPath.getText)
+    val query = extractParenBlockInner(ctx.query)
+    val fileFormat = buildFileFormat(ctx.fileFormatClause())
+    val overwrite = 
Option(ctx.overwriteClause()).exists(_.booleanValue().TRUE() != null)
+    logical.CopyIntoLocationCommand(
+      targetPath,
+      logical.CopyIntoLocationSource.Query(query),
+      fileFormat,
+      overwrite)
+  }
+
+  /**
+   * Extract the raw subquery text inside a [[ParenBlockContext]], i.e. the 
`SELECT ...` between the
+   * outer parentheses of `FROM (SELECT ...)`. The text is taken verbatim from 
the original input
+   * stream (not unquoted) so that the inline query is later re-parsed exactly 
as the user wrote it.
+   */
+  private def extractParenBlockInner(ctx: ParenBlockContext): String = {
+    val open = ctx.getStart.getStartIndex // '('
+    val close = ctx.getStop.getStopIndex // ')'
+    val inner =
+      if (close - 1 < open + 1) {
+        ""
+      } else {
+        ctx.getStart.getInputStream.getText(Interval.of(open + 1, close - 
1)).trim
+      }
+    if (inner.isEmpty) {
+      throw new IllegalArgumentException(
+        "COPY INTO <location> FROM (<query>) requires a non-empty query")
+    }
+    inner
   }
 
   private def buildFileFormat(ctx: FileFormatClauseContext): CopyFileFormat = {
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala
index 23365e35e6..bb127854d7 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala
@@ -1422,4 +1422,239 @@ class CopyIntoTestBase extends PaimonSparkTestBase {
 
     spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_parquet_count")
   }
+
+  // ==================== FROM (SELECT ...) export tests ====================
+
+  test("COPY INTO location: FROM (SELECT ...) with CSV") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_query")
+    spark.sql(s"CREATE TABLE $dbName0.copy_export_query (id INT, name STRING, 
age INT)")
+    spark.sql(
+      s"INSERT INTO $dbName0.copy_export_query VALUES (1, 'Alice', 30), (2, 
'Bob', 25), (3, 'Carol', 35)")
+
+    withCsvDir {
+      dir =>
+        val exportPath = new File(dir, "query_export").getAbsolutePath
+        val result =
+          spark.sql(s"""COPY INTO '$exportPath'
+                       |FROM (SELECT id, name FROM $dbName0.copy_export_query 
WHERE age > 28)
+                       |FILE_FORMAT = (TYPE = CSV)
+                       |""".stripMargin)
+
+        val rows = result.collect()
+        assert(rows.length == 1)
+        assert(rows(0).getInt(1) >= 1, "file_count should count at least one 
data file")
+        assert(rows(0).getLong(2) == 2, "Should export 2 rows (Alice age 30 
and Carol age 35)")
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_query")
+  }
+
+  test("COPY INTO location: FROM (SELECT ...) with JSON") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_query_json")
+    spark.sql(s"CREATE TABLE $dbName0.copy_export_query_json (id INT, name 
STRING)")
+    spark.sql(s"INSERT INTO $dbName0.copy_export_query_json VALUES (1, 
'Alice'), (2, 'Bob')")
+
+    withJsonDir {
+      dir =>
+        val exportPath = new File(dir, "json_export").getAbsolutePath
+        val result =
+          spark.sql(s"""COPY INTO '$exportPath'
+                       |FROM (SELECT * FROM $dbName0.copy_export_query_json 
WHERE id = 1)
+                       |FILE_FORMAT = (TYPE = JSON)
+                       |""".stripMargin)
+
+        val rows = result.collect()
+        assert(rows.length == 1)
+        assert(rows(0).getLong(2) == 1, "Should export 1 row")
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_query_json")
+  }
+
+  test("COPY INTO location: FROM (SELECT ...) with aggregation") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_query_agg")
+    spark.sql(s"CREATE TABLE $dbName0.copy_export_query_agg (dept STRING, 
salary INT)")
+    spark.sql(
+      s"INSERT INTO $dbName0.copy_export_query_agg VALUES ('A', 100), ('A', 
200), ('B', 150)")
+
+    withCsvDir {
+      dir =>
+        val exportPath = new File(dir, "agg_export").getAbsolutePath
+        val result = spark.sql(
+          s"""COPY INTO '$exportPath'
+             |FROM (SELECT dept, SUM(salary) AS total FROM 
$dbName0.copy_export_query_agg GROUP BY dept)
+             |FILE_FORMAT = (TYPE = CSV)
+             |""".stripMargin)
+
+        val rows = result.collect()
+        assert(rows.length == 1)
+        assert(rows(0).getLong(2) == 2, "Should export 2 aggregated rows")
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_query_agg")
+  }
+
+  test("COPY INTO location: FROM (SELECT ...) with nested parentheses") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_nested")
+    spark.sql(s"CREATE TABLE $dbName0.copy_export_nested (id INT, name 
STRING)")
+    spark.sql(
+      s"INSERT INTO $dbName0.copy_export_nested VALUES (1, 'Alice'), (2, 
'Bob'), (3, 'Carol')")
+
+    withCsvDir {
+      dir =>
+        val exportPath = new File(dir, "nested_export").getAbsolutePath
+        // The subquery contains nested parentheses (IN (...)), which only 
parse correctly with the
+        // balanced-paren grammar rule.
+        val result =
+          spark.sql(s"""COPY INTO '$exportPath'
+                       |FROM (SELECT id, name FROM $dbName0.copy_export_nested 
WHERE id IN (1, 3))
+                       |FILE_FORMAT = (TYPE = CSV)
+                       |""".stripMargin)
+
+        val rows = result.collect()
+        assert(rows.length == 1)
+        assert(rows(0).getLong(2) == 2, "Should export 2 rows (id 1 and 3)")
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_nested")
+  }
+
+  test("COPY INTO location: FROM (SELECT ...) with Parquet") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_parquet")
+    spark.sql(s"CREATE TABLE $dbName0.copy_export_parquet (dept STRING, salary 
INT)")
+    spark.sql(s"INSERT INTO $dbName0.copy_export_parquet VALUES ('A', 100), 
('A', 200), ('B', 150)")
+
+    withParquetDir {
+      dir =>
+        val exportPath = new File(dir, "parquet_export").getAbsolutePath
+        val result = spark.sql(
+          s"""COPY INTO '$exportPath'
+             |FROM (SELECT dept, COUNT(*) AS cnt FROM 
$dbName0.copy_export_parquet GROUP BY dept)
+             |FILE_FORMAT = (TYPE = PARQUET)
+             |""".stripMargin)
+
+        val rows = result.collect()
+        assert(rows.length == 1)
+        assert(rows(0).getLong(2) == 2, "Should export 2 aggregated rows")
+
+        val exported = spark.read.parquet(exportPath).collect()
+        assert(exported.length == 2, "Parquet output should contain the 2 
aggregated rows")
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_parquet")
+  }
+
+  test("COPY INTO location: FROM (SELECT ...) with OVERWRITE = TRUE") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_query_ow")
+    spark.sql(s"CREATE TABLE $dbName0.copy_export_query_ow (id INT)")
+    spark.sql(s"INSERT INTO $dbName0.copy_export_query_ow VALUES (1), (2), 
(3)")
+
+    withCsvDir {
+      dir =>
+        val exportPath = new File(dir, "ow_export").getAbsolutePath
+
+        // First export writes 3 rows.
+        spark.sql(s"""COPY INTO '$exportPath'
+                     |FROM (SELECT id FROM $dbName0.copy_export_query_ow)
+                     |FILE_FORMAT = (TYPE = CSV)
+                     |""".stripMargin)
+
+        // OVERWRITE = TRUE replaces the previous output with a 1-row result.
+        val result = spark.sql(s"""COPY INTO '$exportPath'
+                                  |FROM (SELECT id FROM 
$dbName0.copy_export_query_ow WHERE id = 1)
+                                  |FILE_FORMAT = (TYPE = CSV)
+                                  |OVERWRITE = TRUE
+                                  |""".stripMargin)
+
+        val rows = result.collect()
+        assert(rows.length == 1)
+        assert(rows(0).getLong(2) == 1, "Overwrite export should report 1 row")
+
+        val exported = spark.read.csv(exportPath).collect()
+        assert(exported.length == 1, "Overwrite must replace the previous 
3-row output")
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_query_ow")
+  }
+
+  test("COPY INTO location: FROM () empty query is rejected") {
+    withCsvDir {
+      dir =>
+        val exportPath = new File(dir, "empty_export").getAbsolutePath
+        val error = intercept[IllegalArgumentException] {
+          spark.sql(s"""COPY INTO '$exportPath'
+                       |FROM ()
+                       |FILE_FORMAT = (TYPE = CSV)
+                       |""".stripMargin)
+        }
+        assert(error.getMessage.contains("requires a non-empty query"))
+    }
+  }
+
+  test("COPY INTO location: FROM (query) accepts read-only non-SELECT queries 
(VALUES)") {
+    withCsvDir {
+      dir =>
+        val exportPath = new File(dir, "values_export").getAbsolutePath
+        // A read-only query that is not a plain SELECT (here a VALUES list) 
is a valid export
+        // source: the only restriction is "no side effects", not "must be a 
SELECT".
+        val result =
+          spark.sql(s"""COPY INTO '$exportPath'
+                       |FROM (VALUES (1, 'a'), (2, 'b'))
+                       |FILE_FORMAT = (TYPE = CSV)
+                       |""".stripMargin)
+
+        val rows = result.collect()
+        assert(rows.length == 1)
+        assert(rows(0).getLong(2) == 2, "Should export 2 rows from the VALUES 
list")
+    }
+  }
+
+  test("COPY INTO location: FROM (query) rejects statements with side 
effects") {
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_reject")
+    spark.sql(s"CREATE TABLE $dbName0.copy_export_reject (id INT)")
+    spark.sql(s"INSERT INTO $dbName0.copy_export_reject VALUES (1), (2)")
+
+    withCsvDir {
+      dir =>
+        val exportPath = new File(dir, "reject_export").getAbsolutePath
+
+        // A DDL statement must be rejected and must NOT actually drop the 
table.
+        val dropError = intercept[IllegalArgumentException] {
+          spark.sql(s"""COPY INTO '$exportPath'
+                       |FROM (DROP TABLE $dbName0.copy_export_reject)
+                       |FILE_FORMAT = (TYPE = CSV)
+                       |""".stripMargin)
+        }
+        assert(dropError.getMessage.contains("only supports read-only 
queries"))
+
+        // An INSERT statement must be rejected as well.
+        val insertError = intercept[IllegalArgumentException] {
+          spark.sql(s"""COPY INTO '$exportPath'
+                       |FROM (INSERT INTO $dbName0.copy_export_reject VALUES 
(3))
+                       |FILE_FORMAT = (TYPE = CSV)
+                       |""".stripMargin)
+        }
+        assert(insertError.getMessage.contains("only supports read-only 
queries"))
+
+        // INSERT OVERWRITE DIRECTORY must be rejected (on Spark 3.2 it parses 
to InsertIntoDir,
+        // which is neither a Command nor a ParsedStatement, so it is rejected 
explicitly) and must
+        // NOT write any files.
+        val outDir = new File(dir, "evil_dir").getAbsolutePath
+        val dirError = intercept[IllegalArgumentException] {
+          spark.sql(
+            s"""COPY INTO '$exportPath'
+               |FROM (INSERT OVERWRITE DIRECTORY '$outDir' USING csv SELECT * 
FROM $dbName0.copy_export_reject)
+               |FILE_FORMAT = (TYPE = CSV)
+               |""".stripMargin)
+        }
+        assert(dirError.getMessage.contains("only supports read-only queries"))
+        assert(!new File(outDir).exists(), "INSERT OVERWRITE DIRECTORY must 
not have written files")
+
+        // The table and its original contents are untouched.
+        val rows = spark.sql(s"SELECT * FROM 
$dbName0.copy_export_reject").collect()
+        assert(rows.length == 2, "Source table must be unchanged by the 
rejected statements")
+    }
+
+    spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_reject")
+  }
 }
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala
index 88449fad5e..367327ed47 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala
@@ -24,6 +24,8 @@ import org.apache.paimon.spark.function.FunctionResources._
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.Row
 
+import java.io.File
+
 abstract class PaimonV1FunctionTestBase extends 
PaimonSparkTestWithRestCatalogBase {
 
   test("Paimon V1 Function: create or replace function") {
@@ -266,6 +268,35 @@ abstract class PaimonV1FunctionTestBase extends 
PaimonSparkTestWithRestCatalogBa
       }.getMessage.contains("udf_add2 is a built-in/temporary function"))
     }
   }
+
+  test("Paimon V1 Function: COPY INTO location FROM (SELECT udf(...))") {
+    withUserDefinedFunction("udf_add2" -> false) {
+      sql(s"""
+             |CREATE FUNCTION udf_add2 AS '$UDFExampleAdd2Class'
+             |USING JAR '$testUDFJarPath'
+             |""".stripMargin)
+      withTable("t") {
+        sql("CREATE TABLE t (a INT, b INT)")
+        sql("INSERT INTO t VALUES (1, 2), (3, 4)")
+
+        withTempDir {
+          dir =>
+            val exportPath = new File(dir, "udf_export").getAbsolutePath
+            // The inline query references a Paimon v1 function. This only 
resolves if the query is
+            // parsed through the session (Paimon) parser, which applies the 
v1 function rewrite.
+            val result = sql(s"""
+                                |COPY INTO '$exportPath'
+                                |FROM (SELECT udf_add2(a, b) AS c FROM t)
+                                |FILE_FORMAT = (TYPE = CSV)
+                                |""".stripMargin)
+            checkAnswer(result.selectExpr("rows_written"), Row(2L))
+            checkAnswer(
+              spark.read.csv(exportPath).selectExpr("CAST(_c0 AS INT)"),
+              Seq(Row(3), Row(7)))
+        }
+      }
+    }
+  }
 }
 
 class DisablePaimonV1FunctionTest extends PaimonSparkTestWithRestCatalogBase {


Reply via email to