This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6b887dbd54 [spark] Support FROM (query) export in COPY INTO location
(#8096)
6b887dbd54 is described below
commit 6b887dbd54900132f2c4443aa4b6cdbc7bfb9bf1
Author: Junrui Lee <[email protected]>
AuthorDate: Sun Jun 7 21:14:24 2026 +0800
[spark] Support FROM (query) export in COPY INTO location (#8096)
Extend `COPY INTO <location>` (export) to accept an inline query as the
source, not just a table:
```sql
COPY INTO '/export/active_users/'
FROM (SELECT id, name FROM my_db.users WHERE active = TRUE)
FILE_FORMAT = (TYPE = CSV, HEADER = TRUE);
```
Previously only `FROM table_name` was supported. The inline query is
parsed through the session (Paimon) parser, so it behaves exactly like
the same query run via `spark.sql`, including Paimon parser rules such
as the v1 function rewrite.
---
docs/docs/spark/sql-write.md | 33 ++-
.../PaimonSqlExtensions.g4 | 12 ++
.../catalyst/analysis/PaimonViewResolver.scala | 10 +-
.../plans/logical/CopyIntoLocationCommand.scala | 27 ++-
.../spark/execution/CopyIntoLocationExec.scala | 33 ++-
.../paimon/spark/execution/CopyIntoUtils.scala | 52 ++++-
.../paimon/spark/execution/PaimonStrategy.scala | 20 +-
.../scala/org/apache/spark/sql/PaimonUtils.scala | 16 ++
.../extensions/PaimonSqlExtensionsAstBuilder.scala | 41 +++-
.../apache/paimon/spark/sql/CopyIntoTestBase.scala | 235 +++++++++++++++++++++
.../spark/sql/PaimonV1FunctionTestBase.scala | 31 +++
11 files changed, 483 insertions(+), 27 deletions(-)
diff --git a/docs/docs/spark/sql-write.md b/docs/docs/spark/sql-write.md
index 703976857a..1cc6c0bc69 100644
--- a/docs/docs/spark/sql-write.md
+++ b/docs/docs/spark/sql-write.md
@@ -431,7 +431,7 @@ Parquet columns are matched **by column name** (not by
position). Extra columns
```sql
COPY INTO 'target_path'
-FROM table_name
+FROM { table_name | (SELECT ...) }
FILE_FORMAT = (TYPE = CSV [, option = value, ...])
[OVERWRITE = TRUE|FALSE]
```
@@ -445,11 +445,19 @@ FILE_FORMAT = (TYPE = CSV, HEADER = TRUE, FIELD_DELIMITER
= ',')
OVERWRITE = TRUE;
```
+**Write from query:**
+
+```sql
+COPY INTO '/export/active_users/'
+FROM (SELECT id, name FROM my_db.users WHERE active = TRUE)
+FILE_FORMAT = (TYPE = CSV, HEADER = TRUE);
+```
+
#### Write JSON Files
```sql
COPY INTO 'target_path'
-FROM table_name
+FROM { table_name | (SELECT ...) }
FILE_FORMAT = (TYPE = JSON [, option = value, ...])
[OVERWRITE = TRUE|FALSE]
```
@@ -463,11 +471,19 @@ FILE_FORMAT = (TYPE = JSON)
OVERWRITE = TRUE;
```
+**JSON export from query:**
+
+```sql
+COPY INTO '/export/recent_events/'
+FROM (SELECT * FROM my_db.events WHERE event_date > '2024-01-01')
+FILE_FORMAT = (TYPE = JSON);
+```
+
#### Write Parquet Files
```sql
COPY INTO 'target_path'
-FROM table_name
+FROM { table_name | (SELECT ...) }
FILE_FORMAT = (TYPE = PARQUET [, option = value, ...])
[OVERWRITE = TRUE|FALSE]
```
@@ -490,6 +506,14 @@ FILE_FORMAT = (TYPE = PARQUET, COMPRESSION = GZIP)
OVERWRITE = TRUE;
```
+**Parquet export from aggregation query:**
+
+```sql
+COPY INTO '/export/summary/'
+FROM (SELECT dept, COUNT(*) AS cnt FROM my_db.employees GROUP BY dept)
+FILE_FORMAT = (TYPE = PARQUET);
+```
+
#### FILE_FORMAT Options
`FILE_FORMAT` is required and must include `TYPE = CSV`, `TYPE = JSON`, or
`TYPE = PARQUET`.
@@ -614,10 +638,11 @@ By default (`FORCE = FALSE`), COPY INTO tracks which
files have been successfull
- **CSV column-count mismatch**: Rows with fewer or more columns than the
target schema are treated as malformed records. With `ON_ERROR = CONTINUE`,
these rows are skipped and counted as errors.
- Only **CSV**, **JSON**, and **Parquet** formats are supported.
-- Writing files only supports `FROM table_name`; `FROM (SELECT ...)` is not
supported.
- `SINGLE = TRUE` (single-file output) is not supported.
- File format options must be specified inline in `FILE_FORMAT = (...)`.
- File listing is **non-recursive**: only direct files under the source path
are processed. Subdirectories are ignored.
- `PATTERN` matches the **base file name** only (not the full path).
- Concurrent COPY INTO commands targeting the same table may produce duplicate
data.
- `SKIP_HEADER` only supports values `0` or `1`.
+- `FROM (...)` accepts any read-only query (e.g. `SELECT`, `WITH ... SELECT`,
`VALUES`); statements with side effects (e.g. `INSERT`, `INSERT OVERWRITE
DIRECTORY`, DDL) are rejected.
+- For a `FROM (...)` export, `rows_written` is an execution-time statistic
counted by a separate pass before the files are written. Because the DataFrame
is lazy and not cached, writing re-executes the query a second time; if the
query is non-deterministic (e.g. uses `rand()`, `current_timestamp()`, or reads
a volatile source), the two runs can produce different rows, so `rows_written`
may not match the actual file contents. The result is intentionally not staged,
so the export does not [...]
diff --git
a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
index 620a2bb95a..8c2e45b34e 100644
---
a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
+++
b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4
@@ -86,6 +86,18 @@ statement
overwriteClause?
#copyIntoLocation
| CREATE TABLE (IF NOT EXISTS)? target=multipartIdentifier
LIKE source=multipartIdentifier ( . )*?
#createTableLike
+ | COPY INTO targetPath=STRING
+ FROM query=parenBlock
+ fileFormatClause
+ overwriteClause?
#copyIntoLocationFromQuery
+ ;
+
+// A parenthesized block with balanced parentheses, used to capture an inline
subquery verbatim,
+// e.g. the (SELECT ...) in `COPY INTO <location> FROM (SELECT ...)`. A
recursive rule is required
+// (rather than '(' .*? ')') so that nested parentheses such as `WHERE x IN
(1, 2)` are matched
+// correctly. The raw subquery text is later extracted from the token stream
by the AST builder.
+parenBlock
+ : '(' ( parenBlock | ~('(' | ')') )* ')'
;
callArgument
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonViewResolver.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonViewResolver.scala
index 8e19c5ddbe..d60e9ab8f3 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonViewResolver.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonViewResolver.scala
@@ -23,7 +23,7 @@ import org.apache.paimon.spark.SparkTypeUtils
import org.apache.paimon.spark.catalog.SupportView
import org.apache.paimon.view.View
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{PaimonUtils, SparkSession}
import org.apache.spark.sql.catalyst.analysis.{GetColumnByOrdinal,
UnresolvedRelation, UnresolvedTableOrView}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, UpCast}
import org.apache.spark.sql.catalyst.parser.ParseException
@@ -91,13 +91,7 @@ case class PaimonViewResolver(spark: SparkSession)
)
try {
CurrentOrigin.withOrigin(origin) {
- try {
- spark.sessionState.sqlParser.parseQuery(viewText)
- } catch {
- // For compatibility with Spark 3.2 and below
- case _: NoSuchMethodError =>
- spark.sessionState.sqlParser.parsePlan(viewText)
- }
+ PaimonUtils.parseQueryCompat(spark.sessionState.sqlParser, viewText)
}
} catch {
case _: ParseException =>
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyIntoLocationCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyIntoLocationCommand.scala
index 3e5d0a0666..dd04754c81 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyIntoLocationCommand.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/CopyIntoLocationCommand.scala
@@ -23,9 +23,26 @@ import org.apache.paimon.spark.leafnode.PaimonLeafCommand
import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference}
import org.apache.spark.sql.types.{IntegerType, LongType, StringType}
+/**
+ * The (still unresolved) source to export: either a named table or an inline
read-only query. Using
+ * an ADT keeps the two mutually exclusive, so impossible states such as
"table name and query both
+ * present" cannot be constructed. The table name is resolved to a
catalog/identifier later, during
+ * planning.
+ */
+sealed trait CopyIntoLocationSource
+
+object CopyIntoLocationSource {
+
+ /** Export the named table; `nameParts` is the multipart identifier,
unresolved at this point. */
+ case class TableName(nameParts: Seq[String]) extends CopyIntoLocationSource
+
+ /** Export the result of the inline `FROM (<query>)` read-only query. */
+ case class Query(query: String) extends CopyIntoLocationSource
+}
+
case class CopyIntoLocationCommand(
targetPath: String,
- table: Seq[String],
+ source: CopyIntoLocationSource,
fileFormat: CopyFileFormat,
overwrite: Boolean)
extends PaimonLeafCommand {
@@ -37,6 +54,12 @@ case class CopyIntoLocationCommand(
)
override def simpleString(maxFields: Int): String = {
- s"CopyIntoLocation: target=$targetPath, source=$table"
+ val sourceDesc = source match {
+ case CopyIntoLocationSource.Query(q) =>
+ val truncated = if (q.length > 100) q.take(100) + "..." else q
+ s"query=$truncated"
+ case CopyIntoLocationSource.TableName(nameParts) => s"table=$nameParts"
+ }
+ s"CopyIntoLocation: target=$targetPath, source=$sourceDesc"
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoLocationExec.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoLocationExec.scala
index 45471bab84..76d147162a 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoLocationExec.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoLocationExec.scala
@@ -28,10 +28,21 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
import org.apache.spark.unsafe.types.UTF8String
+/** The source to export from: either a Paimon table or an inline read-only
query. */
+sealed trait CopyIntoSource
+
+object CopyIntoSource {
+
+ /** Export an existing Paimon table identified by `catalog`/`ident`. */
+ case class TableSource(catalog: TableCatalog, ident: Identifier) extends
CopyIntoSource
+
+ /** Export the result of an inline `FROM (<query>)` read-only query. */
+ case class QuerySource(query: String) extends CopyIntoSource
+}
+
case class CopyIntoLocationExec(
spark: SparkSession,
- catalog: TableCatalog,
- ident: Identifier,
+ source: CopyIntoSource,
targetPath: String,
fileFormat: CopyFileFormat,
overwrite: Boolean,
@@ -43,14 +54,21 @@ case class CopyIntoLocationExec(
override protected def run(): Seq[InternalRow] = {
fileFormat.validateForExport()
- val tableName = CopyIntoUtils.quoteIdentifier(catalog.name(), ident)
- val df = spark.table(tableName)
-
- val rowCount = df.count()
+ val df = source match {
+ case CopyIntoSource.QuerySource(query) =>
CopyIntoUtils.queryToDataFrame(spark, query)
+ case CopyIntoSource.TableSource(catalog, ident) =>
+ spark.table(CopyIntoUtils.quoteIdentifier(catalog.name(), ident))
+ }
val writerOptions = fileFormat.toSparkWriterOptions
val saveMode = if (overwrite) SaveMode.Overwrite else
SaveMode.ErrorIfExists
+ // `rows_written` is counted by a separate `count()` action before the
write. The DataFrame is
+ // lazy and not cached, so the write re-executes the query a second time;
for a non-deterministic
+ // query (e.g. `rand()`, `current_timestamp()`, or a volatile source) the
two runs can yield
+ // different rows, so this count may not match the files (see the docs).
We accept this rather
+ // than stage the whole result to disk just to make the count exact.
+ val rowCount = df.count()
fileFormat.formatType match {
case FileFormatType.JSON =>
df.write.options(writerOptions).mode(saveMode).json(targetPath)
@@ -63,8 +81,9 @@ case class CopyIntoLocationExec(
val hadoopConf = spark.sessionState.newHadoopConf()
val fsPath = new Path(targetPath)
val fs = fsPath.getFileSystem(hadoopConf)
+ // Count only data files (part-*); committer side files such as _SUCCESS
are not data output.
val fileCount = if (fs.exists(fsPath)) {
- fs.listStatus(fsPath).count(_.isFile)
+ fs.listStatus(fsPath).count(s => s.isFile &&
s.getPath.getName.startsWith("part-"))
} else {
0
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoUtils.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoUtils.scala
index fdf76b8dc6..32aa730780 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoUtils.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/CopyIntoUtils.scala
@@ -18,12 +18,62 @@
package org.apache.paimon.spark.execution
-import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.catalyst.plans.logical.{Command, InsertIntoDir,
LogicalPlan, ParsedStatement}
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.paimon.shims.SparkShimLoader
object CopyIntoUtils {
+ /**
+ * Build a [[DataFrame]] from the inline query of `COPY INTO <location> FROM
(<query>)`.
+ *
+ * The query is parsed with the session parser (the Paimon parser) via
`parsePlan`, so it goes
+ * through the exact same path as `spark.sql(<query>)`, including Paimon's
parser rules (e.g. the
+ * v1 function rewrite). Any read-only query is accepted (`SELECT`, `WITH
... SELECT`, `VALUES`,
+ * etc.); the only restriction is that it must have no side effects,
enforced by the
+ * [[hasSideEffect]] guard on the resulting plan: the Paimon parser does not
reject statements at
+ * parse time (its `parseQuery` just delegates to `parsePlan`), so DDL/DML
reaches us as a plan
+ * and must be rejected by inspecting the tree.
+ */
+ def queryToDataFrame(spark: SparkSession, query: String): DataFrame = {
+ val plan =
+ try {
+ spark.sessionState.sqlParser.parsePlan(query)
+ } catch {
+ case e: ParseException =>
+ throw new IllegalArgumentException(
+ s"COPY INTO <location> FROM (<query>) only supports read-only
queries: $query",
+ e)
+ }
+ if (hasSideEffect(plan)) {
+ throw new IllegalArgumentException(
+ "COPY INTO <location> FROM (<query>) only supports read-only queries,
" +
+ s"but got a statement with side effects: $query")
+ }
+ SparkShimLoader.shim.classicApi.createDataset(spark, plan)
+ }
+
+ /**
+ * Whether `plan` contains a node with side effects anywhere in its tree:
+ * - `Command` covers resolved commands such as `DROP TABLE`.
+ * - `ParsedStatement` covers parsed-but-unresolved DDL/DML such as
`INSERT`, CTAS,
+ * `CREATE VIEW` (it is the parent of `InsertIntoStatement`); a pure
SELECT never contains
+ * such a node.
+ * - `InsertIntoDir` (the `INSERT OVERWRITE DIRECTORY` plan) is neither of
the above, so it is
+ * matched explicitly.
+ *
+ * `find` is used rather than `exists` because `TreeNode.exists` is absent
on Spark 3.2.
+ */
+ private def hasSideEffect(plan: LogicalPlan): Boolean = {
+ plan.find {
+ case _: Command | _: ParsedStatement | _: InsertIntoDir => true
+ case _ => false
+ }.isDefined
+ }
+
def quoteIdentifier(catalogName: String, ident: Identifier): String = {
val parts = Seq(catalogName) ++
ident.namespace().toSeq ++
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
index 6870ce8832..321e61f2cb 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
@@ -22,7 +22,7 @@ import org.apache.paimon.partition.PartitionPredicate
import org.apache.paimon.spark.{SparkCatalog, SparkGenericCatalog, SparkTable,
SparkUtils}
import org.apache.paimon.spark.catalog.{SparkBaseCatalog, SupportView}
import org.apache.paimon.spark.catalyst.analysis.ResolvedPaimonView
-import
org.apache.paimon.spark.catalyst.plans.logical.{CopyIntoLocationCommand,
CopyIntoTableCommand, CreateOrReplaceTagCommand, CreatePaimonView,
DeleteTagCommand, DropPaimonView, PaimonCallCommand, PaimonDropPartitions,
RenameTagCommand, ResolvedIdentifier, ShowPaimonViews, ShowTagsCommand,
TruncatePaimonTableWithFilter}
+import
org.apache.paimon.spark.catalyst.plans.logical.{CopyIntoLocationCommand,
CopyIntoLocationSource, CopyIntoTableCommand, CreateOrReplaceTagCommand,
CreatePaimonView, DeleteTagCommand, DropPaimonView, PaimonCallCommand,
PaimonDropPartitions, RenameTagCommand, ResolvedIdentifier, ShowPaimonViews,
ShowTagsCommand, TruncatePaimonTableWithFilter}
import org.apache.paimon.table.Table
import org.apache.spark.sql.SparkSession
@@ -162,11 +162,23 @@ case class PaimonStrategy(spark: SparkSession)
c.onError,
c.output) :: Nil
- case c @ CopyIntoLocationCommand(_, PaimonCatalogAndIdentifier(catalog,
ident), _, _) =>
+ case c @ CopyIntoLocationCommand(_, CopyIntoLocationSource.Query(query),
_, _) =>
CopyIntoLocationExec(
spark,
- catalog,
- ident,
+ CopyIntoSource.QuerySource(query),
+ c.targetPath,
+ c.fileFormat,
+ c.overwrite,
+ c.output) :: Nil
+
+ case c @ CopyIntoLocationCommand(
+ _,
+ CopyIntoLocationSource.TableName(PaimonCatalogAndIdentifier(catalog,
ident)),
+ _,
+ _) =>
+ CopyIntoLocationExec(
+ spark,
+ CopyIntoSource.TableSource(catalog, ident),
c.targetPath,
c.fileFormat,
c.overwrite,
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
index 16d34ecf4c..18dbdfd498 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
@@ -23,6 +23,7 @@ import org.apache.spark.rdd.InputFileBlockHolder
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.expressions.FieldReference
import org.apache.spark.sql.connector.expressions.filter.Predicate
@@ -63,6 +64,21 @@ object PaimonUtils {
SparkShimLoader.shim.classicApi.createDataset(sparkSession, logicalPlan)
}
+ /**
+ * Parse a query, preferring [[ParserInterface.parseQuery]] (Spark 3.3+) and falling back to
+ * [[ParserInterface.parsePlan]] on Spark 3.2, where `parseQuery` is absent. Whether non-query
+ * statements are rejected at parse time depends on the parser implementation, so callers must
+ * validate the returned plan themselves and handle any
+ * [[org.apache.spark.sql.catalyst.parser.ParseException]].
+ */
+ def parseQueryCompat(parser: ParserInterface, sqlText: String): LogicalPlan
= {
+ try {
+ parser.parseQuery(sqlText)
+ } catch {
+ case _: NoSuchMethodError => parser.parsePlan(sqlText)
+ }
+ }
+
def normalizeExprs(exprs: Seq[Expression], attributes: Seq[Attribute]):
Seq[Expression] = {
DataSourceStrategy.normalizeExprs(exprs, attributes)
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
index 0d54c698e6..f5986492b6 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala
@@ -190,7 +190,46 @@ class PaimonSqlExtensionsAstBuilder(delegate:
ParserInterface)
val table = typedVisit[Seq[String]](ctx.multipartIdentifier)
val fileFormat = buildFileFormat(ctx.fileFormatClause())
val overwrite =
Option(ctx.overwriteClause()).exists(_.booleanValue().TRUE() != null)
- logical.CopyIntoLocationCommand(targetPath, table, fileFormat, overwrite)
+ logical.CopyIntoLocationCommand(
+ targetPath,
+ logical.CopyIntoLocationSource.TableName(table),
+ fileFormat,
+ overwrite)
+ }
+
+ /** Create a COPY INTO LOCATION FROM (query) (export) logical command. */
+ override def visitCopyIntoLocationFromQuery(
+ ctx: CopyIntoLocationFromQueryContext): logical.CopyIntoLocationCommand
= withOrigin(ctx) {
+ val targetPath = unquoteString(ctx.targetPath.getText)
+ val query = extractParenBlockInner(ctx.query)
+ val fileFormat = buildFileFormat(ctx.fileFormatClause())
+ val overwrite =
Option(ctx.overwriteClause()).exists(_.booleanValue().TRUE() != null)
+ logical.CopyIntoLocationCommand(
+ targetPath,
+ logical.CopyIntoLocationSource.Query(query),
+ fileFormat,
+ overwrite)
+ }
+
+ /**
+ * Extract the raw subquery text inside a [[ParenBlockContext]], i.e. the
`SELECT ...` between the
+ * outer parentheses of `FROM (SELECT ...)`. The text is taken verbatim from
the original input
+ * stream (not unquoted) so that the inline query is later re-parsed exactly
as the user wrote it.
+ */
+ private def extractParenBlockInner(ctx: ParenBlockContext): String = {
+ val open = ctx.getStart.getStartIndex // '('
+ val close = ctx.getStop.getStopIndex // ')'
+ val inner =
+ if (close - 1 < open + 1) {
+ ""
+ } else {
+ ctx.getStart.getInputStream.getText(Interval.of(open + 1, close -
1)).trim
+ }
+ if (inner.isEmpty) {
+ throw new IllegalArgumentException(
+ "COPY INTO <location> FROM (<query>) requires a non-empty query")
+ }
+ inner
}
private def buildFileFormat(ctx: FileFormatClauseContext): CopyFileFormat = {
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala
index 23365e35e6..bb127854d7 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/CopyIntoTestBase.scala
@@ -1422,4 +1422,239 @@ class CopyIntoTestBase extends PaimonSparkTestBase {
spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_parquet_count")
}
+
+ // ==================== FROM (SELECT ...) export tests ====================
+
+ test("COPY INTO location: FROM (SELECT ...) with CSV") {
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_query")
+ spark.sql(s"CREATE TABLE $dbName0.copy_export_query (id INT, name STRING,
age INT)")
+ spark.sql(
+ s"INSERT INTO $dbName0.copy_export_query VALUES (1, 'Alice', 30), (2,
'Bob', 25), (3, 'Carol', 35)")
+
+ withCsvDir {
+ dir =>
+ val exportPath = new File(dir, "query_export").getAbsolutePath
+ val result =
+ spark.sql(s"""COPY INTO '$exportPath'
+ |FROM (SELECT id, name FROM $dbName0.copy_export_query
WHERE age > 28)
+ |FILE_FORMAT = (TYPE = CSV)
+ |""".stripMargin)
+
+ val rows = result.collect()
+ assert(rows.length == 1)
+ assert(rows(0).getInt(1) >= 1, "file_count should count at least one
data file")
+ assert(rows(0).getLong(2) == 2, "Should export 2 rows (Alice age 30
and Carol age 35)")
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_query")
+ }
+
+ test("COPY INTO location: FROM (SELECT ...) with JSON") {
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_query_json")
+ spark.sql(s"CREATE TABLE $dbName0.copy_export_query_json (id INT, name
STRING)")
+ spark.sql(s"INSERT INTO $dbName0.copy_export_query_json VALUES (1,
'Alice'), (2, 'Bob')")
+
+ withJsonDir {
+ dir =>
+ val exportPath = new File(dir, "json_export").getAbsolutePath
+ val result =
+ spark.sql(s"""COPY INTO '$exportPath'
+ |FROM (SELECT * FROM $dbName0.copy_export_query_json
WHERE id = 1)
+ |FILE_FORMAT = (TYPE = JSON)
+ |""".stripMargin)
+
+ val rows = result.collect()
+ assert(rows.length == 1)
+ assert(rows(0).getLong(2) == 1, "Should export 1 row")
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_query_json")
+ }
+
+ test("COPY INTO location: FROM (SELECT ...) with aggregation") {
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_query_agg")
+ spark.sql(s"CREATE TABLE $dbName0.copy_export_query_agg (dept STRING,
salary INT)")
+ spark.sql(
+ s"INSERT INTO $dbName0.copy_export_query_agg VALUES ('A', 100), ('A',
200), ('B', 150)")
+
+ withCsvDir {
+ dir =>
+ val exportPath = new File(dir, "agg_export").getAbsolutePath
+ val result = spark.sql(
+ s"""COPY INTO '$exportPath'
+ |FROM (SELECT dept, SUM(salary) AS total FROM
$dbName0.copy_export_query_agg GROUP BY dept)
+ |FILE_FORMAT = (TYPE = CSV)
+ |""".stripMargin)
+
+ val rows = result.collect()
+ assert(rows.length == 1)
+ assert(rows(0).getLong(2) == 2, "Should export 2 aggregated rows")
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_query_agg")
+ }
+
+ test("COPY INTO location: FROM (SELECT ...) with nested parentheses") {
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_nested")
+ spark.sql(s"CREATE TABLE $dbName0.copy_export_nested (id INT, name
STRING)")
+ spark.sql(
+ s"INSERT INTO $dbName0.copy_export_nested VALUES (1, 'Alice'), (2,
'Bob'), (3, 'Carol')")
+
+ withCsvDir {
+ dir =>
+ val exportPath = new File(dir, "nested_export").getAbsolutePath
+ // The subquery contains nested parentheses (IN (...)), which only
parse correctly with the
+ // balanced-paren grammar rule.
+ val result =
+ spark.sql(s"""COPY INTO '$exportPath'
+ |FROM (SELECT id, name FROM $dbName0.copy_export_nested
WHERE id IN (1, 3))
+ |FILE_FORMAT = (TYPE = CSV)
+ |""".stripMargin)
+
+ val rows = result.collect()
+ assert(rows.length == 1)
+ assert(rows(0).getLong(2) == 2, "Should export 2 rows (id 1 and 3)")
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_nested")
+ }
+
+ test("COPY INTO location: FROM (SELECT ...) with Parquet") {
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_parquet")
+ spark.sql(s"CREATE TABLE $dbName0.copy_export_parquet (dept STRING, salary
INT)")
+ spark.sql(s"INSERT INTO $dbName0.copy_export_parquet VALUES ('A', 100),
('A', 200), ('B', 150)")
+
+ withParquetDir {
+ dir =>
+ val exportPath = new File(dir, "parquet_export").getAbsolutePath
+ val result = spark.sql(
+ s"""COPY INTO '$exportPath'
+ |FROM (SELECT dept, COUNT(*) AS cnt FROM
$dbName0.copy_export_parquet GROUP BY dept)
+ |FILE_FORMAT = (TYPE = PARQUET)
+ |""".stripMargin)
+
+ val rows = result.collect()
+ assert(rows.length == 1)
+ assert(rows(0).getLong(2) == 2, "Should export 2 aggregated rows")
+
+ val exported = spark.read.parquet(exportPath).collect()
+ assert(exported.length == 2, "Parquet output should contain the 2
aggregated rows")
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_parquet")
+ }
+
+ test("COPY INTO location: FROM (SELECT ...) with OVERWRITE = TRUE") {
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_query_ow")
+ spark.sql(s"CREATE TABLE $dbName0.copy_export_query_ow (id INT)")
+ spark.sql(s"INSERT INTO $dbName0.copy_export_query_ow VALUES (1), (2),
(3)")
+
+ withCsvDir {
+ dir =>
+ val exportPath = new File(dir, "ow_export").getAbsolutePath
+
+ // First export writes 3 rows.
+ spark.sql(s"""COPY INTO '$exportPath'
+ |FROM (SELECT id FROM $dbName0.copy_export_query_ow)
+ |FILE_FORMAT = (TYPE = CSV)
+ |""".stripMargin)
+
+ // OVERWRITE = TRUE replaces the previous output with a 1-row result.
+ val result = spark.sql(s"""COPY INTO '$exportPath'
+ |FROM (SELECT id FROM
$dbName0.copy_export_query_ow WHERE id = 1)
+ |FILE_FORMAT = (TYPE = CSV)
+ |OVERWRITE = TRUE
+ |""".stripMargin)
+
+ val rows = result.collect()
+ assert(rows.length == 1)
+ assert(rows(0).getLong(2) == 1, "Overwrite export should report 1 row")
+
+ val exported = spark.read.csv(exportPath).collect()
+ assert(exported.length == 1, "Overwrite must replace the previous
3-row output")
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_query_ow")
+ }
+
+ test("COPY INTO location: FROM () empty query is rejected") {
+ withCsvDir {
+ dir =>
+ val exportPath = new File(dir, "empty_export").getAbsolutePath
+ val error = intercept[IllegalArgumentException] {
+ spark.sql(s"""COPY INTO '$exportPath'
+ |FROM ()
+ |FILE_FORMAT = (TYPE = CSV)
+ |""".stripMargin)
+ }
+ assert(error.getMessage.contains("requires a non-empty query"))
+ }
+ }
+
+ test("COPY INTO location: FROM (query) accepts read-only non-SELECT queries
(VALUES)") {
+ withCsvDir {
+ dir =>
+ val exportPath = new File(dir, "values_export").getAbsolutePath
+ // A read-only query that is not a plain SELECT (here a VALUES list)
is a valid export
+ // source: the only restriction is "no side effects", not "must be a
SELECT".
+ val result =
+ spark.sql(s"""COPY INTO '$exportPath'
+ |FROM (VALUES (1, 'a'), (2, 'b'))
+ |FILE_FORMAT = (TYPE = CSV)
+ |""".stripMargin)
+
+ val rows = result.collect()
+ assert(rows.length == 1)
+ assert(rows(0).getLong(2) == 2, "Should export 2 rows from the VALUES
list")
+ }
+ }
+
+ test("COPY INTO location: FROM (query) rejects statements with side
effects") {
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_reject")
+ spark.sql(s"CREATE TABLE $dbName0.copy_export_reject (id INT)")
+ spark.sql(s"INSERT INTO $dbName0.copy_export_reject VALUES (1), (2)")
+
+ withCsvDir {
+ dir =>
+ val exportPath = new File(dir, "reject_export").getAbsolutePath
+
+ // A DDL statement must be rejected and must NOT actually drop the
table.
+ val dropError = intercept[IllegalArgumentException] {
+ spark.sql(s"""COPY INTO '$exportPath'
+ |FROM (DROP TABLE $dbName0.copy_export_reject)
+ |FILE_FORMAT = (TYPE = CSV)
+ |""".stripMargin)
+ }
+ assert(dropError.getMessage.contains("only supports read-only
queries"))
+
+ // An INSERT statement must be rejected as well.
+ val insertError = intercept[IllegalArgumentException] {
+ spark.sql(s"""COPY INTO '$exportPath'
+ |FROM (INSERT INTO $dbName0.copy_export_reject VALUES
(3))
+ |FILE_FORMAT = (TYPE = CSV)
+ |""".stripMargin)
+ }
+ assert(insertError.getMessage.contains("only supports read-only
queries"))
+
+ // INSERT OVERWRITE DIRECTORY must be rejected (on Spark 3.2 it parses
to InsertIntoDir,
+ // which is neither a Command nor a ParsedStatement, so it is rejected
explicitly) and must
+ // NOT write any files.
+ val outDir = new File(dir, "evil_dir").getAbsolutePath
+ val dirError = intercept[IllegalArgumentException] {
+ spark.sql(
+ s"""COPY INTO '$exportPath'
+ |FROM (INSERT OVERWRITE DIRECTORY '$outDir' USING csv SELECT *
FROM $dbName0.copy_export_reject)
+ |FILE_FORMAT = (TYPE = CSV)
+ |""".stripMargin)
+ }
+ assert(dirError.getMessage.contains("only supports read-only queries"))
+ assert(!new File(outDir).exists(), "INSERT OVERWRITE DIRECTORY must
not have written files")
+
+ // The table and its original contents are untouched.
+ val rows = spark.sql(s"SELECT * FROM
$dbName0.copy_export_reject").collect()
+ assert(rows.length == 2, "Source table must be unchanged by the
rejected statements")
+ }
+
+ spark.sql(s"DROP TABLE IF EXISTS $dbName0.copy_export_reject")
+ }
}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala
index 88449fad5e..367327ed47 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonV1FunctionTestBase.scala
@@ -24,6 +24,8 @@ import org.apache.paimon.spark.function.FunctionResources._
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
+import java.io.File
+
abstract class PaimonV1FunctionTestBase extends
PaimonSparkTestWithRestCatalogBase {
test("Paimon V1 Function: create or replace function") {
@@ -266,6 +268,35 @@ abstract class PaimonV1FunctionTestBase extends
PaimonSparkTestWithRestCatalogBa
}.getMessage.contains("udf_add2 is a built-in/temporary function"))
}
}
+
+ test("Paimon V1 Function: COPY INTO location FROM (SELECT udf(...))") {
+ withUserDefinedFunction("udf_add2" -> false) {
+ sql(s"""
+ |CREATE FUNCTION udf_add2 AS '$UDFExampleAdd2Class'
+ |USING JAR '$testUDFJarPath'
+ |""".stripMargin)
+ withTable("t") {
+ sql("CREATE TABLE t (a INT, b INT)")
+ sql("INSERT INTO t VALUES (1, 2), (3, 4)")
+
+ withTempDir {
+ dir =>
+ val exportPath = new File(dir, "udf_export").getAbsolutePath
+ // The inline query references a Paimon v1 function. This only
resolves if the query is
+ // parsed through the session (Paimon) parser, which applies the
v1 function rewrite.
+ val result = sql(s"""
+ |COPY INTO '$exportPath'
+ |FROM (SELECT udf_add2(a, b) AS c FROM t)
+ |FILE_FORMAT = (TYPE = CSV)
+ |""".stripMargin)
+ checkAnswer(result.selectExpr("rows_written"), Row(2L))
+ checkAnswer(
+ spark.read.csv(exportPath).selectExpr("CAST(_c0 AS INT)"),
+ Seq(Row(3), Row(7)))
+ }
+ }
+ }
+ }
}
class DisablePaimonV1FunctionTest extends PaimonSparkTestWithRestCatalogBase {