This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f2327257b [spark] Support UPDATE for append table (#3129)
f2327257b is described below
commit f2327257bb1410c7900ea01130052c7773e86614
Author: Yann Byron <[email protected]>
AuthorDate: Mon Apr 1 20:41:59 2024 +0800
[spark] Support UPDATE for append table (#3129)
---
.../apache/paimon/utils/FileStorePathFactory.java | 12 +-
.../paimon/spark/procedure/CompactProcedure.java | 8 +-
...hFileStoreTable.scala => PaimonSplitScan.scala} | 17 ++-
.../catalyst/analysis/PaimonDeleteTable.scala | 2 +-
.../catalyst/analysis/PaimonMergeIntoBase.scala | 2 +-
.../catalyst/analysis/PaimonUpdateTable.scala | 38 +++--
.../spark/catalyst/analysis/RowLevelHelper.scala | 23 +--
.../spark/catalyst/analysis/RowLevelOp.scala | 36 ++++-
.../analysis/expressions/ExpressionHelper.scala | 53 ++++---
.../analysis/expressions/ExpressionUtils.scala} | 11 +-
.../commands/DeleteFromPaimonTableCommand.scala | 1 -
.../paimon/spark/commands/PaimonCommand.scala | 4 +-
.../paimon/spark/commands/PaimonSparkWriter.scala | 17 ++-
.../paimon/spark/commands/SparkDataFileMeta.scala | 64 ++++++++
.../spark/commands/UpdatePaimonTableCommand.scala | 168 ++++++++++++++++++---
.../paimon/spark/commands/WithFileStoreTable.scala | 4 +
.../spark/commands/WriteIntoPaimonTable.scala | 17 +--
.../paimon/spark/sql/MergeIntoTableTestBase.scala | 2 +-
.../apache/paimon/spark/sql/UpdateTableTest.scala | 79 +++++++++-
19 files changed, 437 insertions(+), 121 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
index 903a814e1..0f3ad7fec 100644
---
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
@@ -101,8 +101,16 @@ public class FileStorePathFactory {
}
public Path bucketPath(BinaryRow partition, int bucket) {
- return new Path(
- root + "/" + getPartitionString(partition) + "/" +
BUCKET_PATH_PREFIX + bucket);
+ return new Path(root + "/" + relativePartitionAndBucketPath(partition,
bucket));
+ }
+
+ public Path relativePartitionAndBucketPath(BinaryRow partition, int
bucket) {
+ String partitionPath = getPartitionString(partition); // computed once, reused by both branches below
+ if (partitionPath.isEmpty()) {
+ return new Path(BUCKET_PATH_PREFIX + bucket);
+ } else {
+ return new Path(partitionPath + "/" + BUCKET_PATH_PREFIX + bucket);
+ }
}
/** IMPORTANT: This method is NOT THREAD SAFE. */
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 111e9f75c..639633644 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -28,7 +28,7 @@ import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.spark.DynamicOverWrite$;
import org.apache.paimon.spark.SparkUtils;
-import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper;
+import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionUtils;
import org.apache.paimon.spark.commands.WriteIntoPaimonTable;
import org.apache.paimon.spark.sort.TableSorter;
import org.apache.paimon.table.BucketMode;
@@ -143,9 +143,9 @@ public class CompactProcedure extends BaseProcedure {
LogicalPlan relation = createRelation(tableIdent);
Expression condition = null;
if (!StringUtils.isBlank(finalWhere)) {
- condition = ExpressionHelper.resolveFilter(spark(),
relation, finalWhere);
+ condition = ExpressionUtils.resolveFilter(spark(),
relation, finalWhere);
checkArgument(
- ExpressionHelper.onlyHasPartitionPredicate(
+ ExpressionUtils.isValidPredicate(
spark(),
condition,
table.partitionKeys().toArray(new
String[0])),
@@ -188,7 +188,7 @@ public class CompactProcedure extends BaseProcedure {
Predicate filter =
condition == null
? null
- :
ExpressionHelper.convertConditionToPaimonPredicate(
+ :
ExpressionUtils.convertConditionToPaimonPredicate(
condition, relation.output(),
table.rowType());
switch (bucketMode) {
case FIXED:
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
similarity index 59%
copy from
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala
copy to
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
index 55be49498..e86f4caf6 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
@@ -16,12 +16,21 @@
* limitations under the License.
*/
-package org.apache.paimon.spark.commands
+package org.apache.paimon.spark
-import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.table.Table
+import org.apache.paimon.table.source.{DataSplit, Split}
-private[spark] trait WithFileStoreTable {
+import org.apache.spark.sql.connector.read.{Batch, Scan}
+import org.apache.spark.sql.types.StructType
- def table: FileStoreTable
+/** For internal use only. */
+case class PaimonSplitScan(table: Table, dataSplits: Array[DataSplit]) extends
Scan {
+
+ override def readSchema(): StructType =
SparkTypeUtils.fromPaimonRowType(table.rowType())
+
+ override def toBatch: Batch = {
+ PaimonBatch(dataSplits.asInstanceOf[Array[Split]], table.newReadBuilder)
+ }
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
index 8ef9de1cc..f2800d742 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
@@ -30,7 +30,7 @@ object PaimonDeleteTable extends Rule[LogicalPlan] with
RowLevelHelper {
override def apply(plan: LogicalPlan): LogicalPlan = {
plan.resolveOperators {
case d @ DeleteFromTable(PaimonRelation(table), condition) if d.resolved
=>
- checkPaimonTable(table)
+ checkPaimonTable(table.getTable)
DeleteFromPaimonTableCommand(table, d)
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala
index f95185699..c07b58399 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala
@@ -46,7 +46,7 @@ trait PaimonMergeIntoBase
val v2Table = relation.table.asInstanceOf[SparkTable]
val targetOutput = relation.output
- checkPaimonTable(v2Table)
+ checkPaimonTable(v2Table.getTable)
checkCondition(merge.mergeCondition)
merge.matchedActions.flatMap(_.condition).foreach(checkCondition)
merge.notMatchedActions.flatMap(_.condition).foreach(checkCondition)
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala
index 940cb0e14..e369c46e2 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala
@@ -18,27 +18,47 @@
package org.apache.paimon.spark.catalyst.analysis
-import org.apache.paimon.CoreOptions
import org.apache.paimon.spark.commands.UpdatePaimonTableCommand
+import org.apache.paimon.table.FileStoreTable
+import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UpdateTable}
import org.apache.spark.sql.catalyst.rules.Rule
-object PaimonUpdateTable extends Rule[LogicalPlan] with RowLevelHelper {
+import scala.collection.JavaConverters._
+
+object PaimonUpdateTable
+ extends Rule[LogicalPlan]
+ with RowLevelHelper
+ with AssignmentAlignmentHelper {
override val operation: RowLevelOp = Update
override def apply(plan: LogicalPlan): LogicalPlan = {
plan.resolveOperators {
- case u @ UpdateTable(PaimonRelation(table), assignments, _) if
u.resolved =>
- checkPaimonTable(table)
+ case u @ UpdateTable(PaimonRelation(table), assignments, condition) if
u.resolved =>
+ checkPaimonTable(table.getTable)
- val primaryKeys =
table.properties().get(CoreOptions.PRIMARY_KEY.key).split(",")
- if (!validUpdateAssignment(u.table.outputSet, primaryKeys,
assignments)) {
- throw new RuntimeException("Can't update the primary key column.")
- }
+ table.getTable match {
+ case paimonTable: FileStoreTable =>
+ val primaryKeys = paimonTable.primaryKeys().asScala
+ if (primaryKeys.isEmpty) {
+ condition.foreach(checkSubquery)
+ }
+ if (!validUpdateAssignment(u.table.outputSet, primaryKeys,
assignments)) {
+ throw new RuntimeException("Can't update the primary key
column.")
+ }
- UpdatePaimonTableCommand(u)
+ val relation = PaimonRelation.getPaimonRelation(u.table)
+ UpdatePaimonTableCommand(
+ relation,
+ paimonTable,
+ condition.getOrElse(TrueLiteral),
+ assignments)
+
+ case _ =>
+ throw new RuntimeException("Update Operation is only supported for
FileStoreTable.")
+ }
}
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
index 659d84dab..9981e7d3c 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
@@ -18,33 +18,18 @@
package org.apache.paimon.spark.catalyst.analysis
-import org.apache.paimon.CoreOptions.MERGE_ENGINE
-import org.apache.paimon.options.Options
-import org.apache.paimon.spark.SparkTable
+import org.apache.paimon.table.Table
import org.apache.spark.sql.catalyst.SQLConfHelper
-import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, AttributeSet, BinaryExpression, EqualTo, Expression,
SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference,
AttributeSet, BinaryExpression, EqualTo, Expression, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.logical.Assignment
trait RowLevelHelper extends SQLConfHelper {
val operation: RowLevelOp
- protected def checkPaimonTable(table: SparkTable): Unit = {
- val paimonTable = if (table.getTable.primaryKeys().size() > 0) {
- table.getTable
- } else {
- throw new UnsupportedOperationException(
- s"Only support to $operation table with primary keys.")
- }
-
- val options = Options.fromMap(paimonTable.options)
- val mergeEngine = options.get(MERGE_ENGINE)
- if (!operation.supportedMergeEngine.contains(mergeEngine)) {
- throw new UnsupportedOperationException(
- s"merge engine $mergeEngine can not support $operation, currently only
${operation.supportedMergeEngine
- .mkString(", ")} can support $operation.")
- }
+ protected def checkPaimonTable(table: Table): Unit = {
+ operation.checkValidity(table)
}
protected def checkSubquery(condition: Expression): Unit = {
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelOp.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelOp.scala
index 3d4fe0889..f83cf91d8 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelOp.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelOp.scala
@@ -18,28 +18,54 @@
package org.apache.paimon.spark.catalyst.analysis
-import org.apache.paimon.CoreOptions.MergeEngine
+import org.apache.paimon.CoreOptions.{MERGE_ENGINE, MergeEngine}
+import org.apache.paimon.options.Options
+import org.apache.paimon.table.Table
sealed trait RowLevelOp {
- val supportedMergeEngine: Seq[MergeEngine]
+
+ val name: String = this.getClass.getSimpleName.stripSuffix("$")
+
+ protected val supportedMergeEngine: Seq[MergeEngine]
+
+ protected val supportAppendOnlyTable: Boolean
+
+ def checkValidity(table: Table): Unit = {
+ if (!supportAppendOnlyTable && table.primaryKeys().isEmpty) {
+ throw new UnsupportedOperationException(s"Only support to $name table
with primary keys.")
+ }
+
+ val mergeEngine = Options.fromMap(table.options).get(MERGE_ENGINE)
+ if (!supportedMergeEngine.contains(mergeEngine)) {
+ throw new UnsupportedOperationException(
+ s"merge engine $mergeEngine can not support $name, currently only
${supportedMergeEngine
+ .mkString(", ")} can support $name.")
+ }
+ }
}
case object Delete extends RowLevelOp {
- override def toString: String = "delete"
override val supportedMergeEngine: Seq[MergeEngine] =
Seq(MergeEngine.DEDUPLICATE)
+
+ override val supportAppendOnlyTable: Boolean = false
+
}
case object Update extends RowLevelOp {
- override def toString: String = "update"
override val supportedMergeEngine: Seq[MergeEngine] =
Seq(MergeEngine.DEDUPLICATE, MergeEngine.PARTIAL_UPDATE)
+
+ override val supportAppendOnlyTable: Boolean = true
+
}
case object MergeInto extends RowLevelOp {
- override def toString: String = "merge into"
override val supportedMergeEngine: Seq[MergeEngine] =
Seq(MergeEngine.DEDUPLICATE, MergeEngine.PARTIAL_UPDATE)
+
+ override val supportAppendOnlyTable: Boolean = false
+
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
index 65f2a04bd..3e09557d5 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
@@ -24,6 +24,7 @@ import org.apache.paimon.types.RowType
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Utils.{normalizeExprs, translateFilter}
+import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, Cast,
Expression, GetStructField, Literal, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.internal.SQLConf
@@ -100,35 +101,37 @@ trait ExpressionHelper extends PredicateHelper {
throw new UnsupportedOperationException(
s"Unsupported update expression: $other, only support update with
PrimitiveType and StructType.")
}
-}
-
-object ExpressionHelper {
-
- case class FakeLogicalPlan(exprs: Seq[Expression], children:
Seq[LogicalPlan])
- extends LogicalPlan {
- override def output: Seq[Attribute] = Nil
- override protected def withNewChildrenInternal(
- newChildren: IndexedSeq[LogicalPlan]): FakeLogicalPlan = copy(children
= newChildren)
- }
-
- def resolveFilter(spark: SparkSession, plan: LogicalPlan, where: String):
Expression = {
- val unResolvedExpression =
spark.sessionState.sqlParser.parseExpression(where)
+ def resolveFilter(spark: SparkSession, plan: LogicalPlan, conditionSql:
String): Expression = {
+ val unResolvedExpression =
spark.sessionState.sqlParser.parseExpression(conditionSql)
val filter = Filter(unResolvedExpression, plan)
spark.sessionState.analyzer.execute(filter) match {
case filter: Filter => filter.condition
- case _ => throw new RuntimeException(s"Could not resolve expression
$where in plan: $plan")
+ case _ =>
+ throw new RuntimeException(s"Could not resolve expression
$conditionSql in plan: $plan")
}
}
- def onlyHasPartitionPredicate(
+ def isPredicatePartitionColumnsOnly(
+ condition: Expression,
+ partitionColumns: Seq[String],
+ resolver: Resolver
+ ): Boolean = {
+ condition.references.forall(r => partitionColumns.exists(resolver(r.name,
_)))
+ }
+
+ /**
+ * A valid predicate should meet two requirements: 1) This predicate only
contains partition
+ * columns. 2) This predicate doesn't contain subquery.
+ */
+ def isValidPredicate(
spark: SparkSession,
expr: Expression,
partitionCols: Array[String]): Boolean = {
- val resolvedNameEquals = spark.sessionState.analyzer.resolver
+ val resolver = spark.sessionState.analyzer.resolver
splitConjunctivePredicates(expr).forall(
e =>
- e.references.forall(r =>
partitionCols.exists(resolvedNameEquals(r.name, _))) &&
+ isPredicatePartitionColumnsOnly(e, partitionCols, resolver) &&
!SubqueryExpression.hasSubquery(expr))
}
@@ -148,12 +151,16 @@ object ExpressionHelper {
val predicates = filters.map(converter.convert)
PredicateBuilder.and(predicates: _*)
}
+}
- def splitConjunctivePredicates(condition: Expression): Seq[Expression] = {
- condition match {
- case And(cond1, cond2) =>
- splitConjunctivePredicates(cond1) ++ splitConjunctivePredicates(cond2)
- case other => other :: Nil
- }
+object ExpressionHelper {
+
+ case class FakeLogicalPlan(exprs: Seq[Expression], children:
Seq[LogicalPlan])
+ extends LogicalPlan {
+ override def output: Seq[Attribute] = Nil
+
+ override protected def withNewChildrenInternal(
+ newChildren: IndexedSeq[LogicalPlan]): FakeLogicalPlan = copy(children
= newChildren)
}
+
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionUtils.scala
similarity index 82%
copy from
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala
copy to
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionUtils.scala
index 55be49498..b0c97c620 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionUtils.scala
@@ -16,12 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.spark.commands
+package org.apache.paimon.spark.catalyst.analysis.expressions
-import org.apache.paimon.table.FileStoreTable
-
-private[spark] trait WithFileStoreTable {
-
- def table: FileStoreTable
-
-}
+/** This wrapper is only used in java code, e.g. Procedure. */
+object ExpressionUtils extends ExpressionHelper
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index 9f79664be..e4bf22d0f 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -21,7 +21,6 @@ package org.apache.paimon.spark.commands
import org.apache.paimon.options.Options
import org.apache.paimon.predicate.OnlyPartitionKeyEqualVisitor
import org.apache.paimon.spark.{InsertInto, SparkTable}
-import
org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper.convertConditionToPaimonPredicate
import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
import org.apache.paimon.table.FileStoreTable
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index 02a0e0cc2..ba404704b 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -19,13 +19,13 @@
package org.apache.paimon.spark.commands
import org.apache.paimon.spark.SparkFilterConverter
+import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
import org.apache.paimon.types.RowType
-import org.apache.spark.sql.catalyst.expressions.PredicateHelper
import org.apache.spark.sql.sources.{AlwaysTrue, And, EqualNullSafe, Filter}
/** Helper trait for all paimon commands. */
-trait PaimonCommand extends WithFileStoreTable with PredicateHelper {
+trait PaimonCommand extends WithFileStoreTable with ExpressionHelper {
/**
* For the 'INSERT OVERWRITE' semantics of SQL, Spark DataSourceV2 will call
the `truncate`
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index edccb4989..da269d486 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -35,7 +35,7 @@ import java.io.IOException
import scala.collection.JavaConverters._
-trait PaimonSparkWriter extends WithFileStoreTable {
+case class PaimonSparkWriter(table: FileStoreTable) {
private lazy val tableSchema = table.schema
@@ -52,7 +52,9 @@ trait PaimonSparkWriter extends WithFileStoreTable {
private lazy val serializer = new CommitMessageSerializer
- def write(data: Dataset[_], writeBuilder: BatchWriteBuilder):
Seq[CommitMessage] = {
+ val writeBuilder: BatchWriteBuilder = table.newBatchWriteBuilder()
+
+ def write(data: Dataset[_]): Seq[CommitMessage] = {
val sparkSession = data.sparkSession
import sparkSession.implicits._
@@ -101,6 +103,17 @@ trait PaimonSparkWriter extends WithFileStoreTable {
commitMessages.toSeq
}
+ def commit(commitMessages: Seq[CommitMessage]): Unit = {
+ val tableCommit = writeBuilder.newCommit()
+ try {
+ tableCommit.commit(commitMessages.toList.asJava)
+ } catch {
+ case e: Throwable => throw new RuntimeException(e);
+ } finally {
+ tableCommit.close()
+ }
+ }
+
/** assign a valid bucket id for each of record. */
private def assignBucketId(
sparkSession: SparkSession,
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
new file mode 100644
index 000000000..cb3266157
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.commands
+
+import org.apache.paimon.data.BinaryRow
+import org.apache.paimon.io.DataFileMeta
+import org.apache.paimon.table.source.DataSplit
+import org.apache.paimon.utils.FileStorePathFactory
+
+import scala.collection.JavaConverters._
+
+case class SparkDataFileMeta(
+ partition: BinaryRow,
+ bucket: Int,
+ totalBuckets: Int,
+ dataFileMeta: DataFileMeta) {
+
+ def relativePath(fileStorePathFactory: FileStorePathFactory): String = {
+ fileStorePathFactory
+ .relativePartitionAndBucketPath(partition, bucket)
+ .toUri
+ .toString + "/" + dataFileMeta.fileName()
+ }
+}
+
+object SparkDataFileMeta {
+ def convertToSparkDataFileMeta(
+ dataSplit: DataSplit,
+ totalBuckets: Int): Seq[SparkDataFileMeta] = {
+ dataSplit.dataFiles().asScala.map {
+ file => SparkDataFileMeta(dataSplit.partition, dataSplit.bucket,
totalBuckets, file)
+ }
+ }
+
+ def convertToDataSplits(sparkDataFiles: Array[SparkDataFileMeta]):
Array[DataSplit] = {
+ sparkDataFiles
+ .groupBy(file => (file.partition, file.bucket))
+ .map {
+ case ((partition, bucket), files) =>
+ new DataSplit.Builder()
+ .withPartition(partition)
+ .withBucket(bucket)
+ .withDataFiles(files.map(_.dataFileMeta).toList.asJava)
+ .build()
+ }
+ .toArray
+ }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
index 45ef870f2..3bc236a86 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
@@ -18,45 +18,169 @@
package org.apache.paimon.spark.commands
-import org.apache.paimon.options.Options
-import org.apache.paimon.spark.{InsertInto, SparkTable}
-import org.apache.paimon.spark.catalyst.analysis.{AssignmentAlignmentHelper,
PaimonRelation}
+import org.apache.paimon.index.IndexFileMeta
+import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement,
IndexIncrement}
+import org.apache.paimon.spark.PaimonSplitScan
+import org.apache.paimon.spark.catalyst.analysis.AssignmentAlignmentHelper
+import
org.apache.paimon.spark.commands.SparkDataFileMeta.convertToSparkDataFileMeta
import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl}
+import org.apache.paimon.table.source.DataSplit
import org.apache.paimon.types.RowKind
-import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.{Column, Row, SparkSession}
import org.apache.spark.sql.Utils.createDataset
-import org.apache.spark.sql.catalyst.expressions.Alias
+import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, If}
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, Project,
UpdateTable}
-import org.apache.spark.sql.functions.lit
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment, Filter,
Project, SupportsSubquery}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation,
DataSourceV2ScanRelation}
+import org.apache.spark.sql.functions.{input_file_name, lit}
-case class UpdatePaimonTableCommand(u: UpdateTable)
+import java.net.URI
+import java.util.Collections
+
+import scala.collection.JavaConverters._
+
+case class UpdatePaimonTableCommand(
+ relation: DataSourceV2Relation,
+ override val table: FileStoreTable,
+ condition: Expression,
+ assignments: Seq[Assignment])
extends PaimonLeafRunnableCommand
- with AssignmentAlignmentHelper {
+ with PaimonCommand
+ with AssignmentAlignmentHelper
+ with SupportsSubquery {
- override def run(sparkSession: SparkSession): Seq[Row] = {
+ private lazy val writer = PaimonSparkWriter(table)
- val relation = PaimonRelation.getPaimonRelation(u.table)
+ private lazy val updateExpressions = {
+ generateAlignedExpressions(relation.output,
assignments).zip(relation.output).map {
+ case (expr, attr) => Alias(expr, attr.name)()
+ }
+ }
- val updatedExprs: Seq[Alias] =
- generateAlignedExpressions(relation.output,
u.assignments).zip(relation.output).map {
- case (expr, attr) => Alias(expr, attr.name)()
- }
+ override def run(sparkSession: SparkSession): Seq[Row] = {
- val updatedPlan = Project(updatedExprs,
Filter(u.condition.getOrElse(TrueLiteral), relation))
+ val commitMessages = if (withPrimaryKeys) {
+ performUpdateForPkTable(sparkSession)
+ } else {
+ performUpdateForNonPkTable(sparkSession)
+ }
+ writer.commit(commitMessages)
+ Seq.empty[Row]
+ }
+
+ /** Update for table with primary keys */
+ private def performUpdateForPkTable(sparkSession: SparkSession):
Seq[CommitMessage] = {
+ val updatedPlan = Project(updateExpressions, Filter(condition, relation))
val df = createDataset(sparkSession, updatedPlan)
.withColumn(ROW_KIND_COL, lit(RowKind.UPDATE_AFTER.toByteValue))
+ writer.write(df)
+ }
- WriteIntoPaimonTable(
-
relation.table.asInstanceOf[SparkTable].getTable.asInstanceOf[FileStoreTable],
- InsertInto,
- df,
- Options.fromMap(relation.options)).run(sparkSession)
+ /** Update for table without primary keys */
+ private def performUpdateForNonPkTable(sparkSession: SparkSession):
Seq[CommitMessage] = {
+ // Step1: the candidate data splits which are filtered by Paimon Predicate.
+ val candidateDataSplits = findCandidateDataSplits()
- Seq.empty[Row]
+ val commitMessages = if (candidateDataSplits.isEmpty) {
+ // no data split needs to be rewritten
+ logDebug("No file needs to be rewritten. It's an empty Commit.")
+ Seq.empty[CommitMessage]
+ } else {
+ import sparkSession.implicits._
+
+ // Step2: extract the exact files which contain at least one record to be updated.
+ val scan = PaimonSplitScan(table, candidateDataSplits.toArray)
+ val filteredRelation =
+ Filter(condition, DataSourceV2ScanRelation(relation, scan,
relation.output))
+ val touchedFilePaths = createDataset(sparkSession, filteredRelation)
+ .select(input_file_name())
+ .distinct()
+ .as[String]
+ .collect()
+ .map(relativePath)
+
+ // Step3: build a new list of data splits composed of only those files.
+ // These are expected to be the smallest set of data files that need to be rewritten.
+ val totalBuckets = table.coreOptions().bucket()
+ val candidateDataFiles = candidateDataSplits
+ .flatMap(dataSplit => convertToSparkDataFileMeta(dataSplit,
totalBuckets))
+ val fileStorePathFactory = table.store().pathFactory()
+ val fileNameToMeta =
+ candidateDataFiles
+ .map(file => (file.relativePath(fileStorePathFactory), file))
+ .toMap
+ val touchedFiles: Array[SparkDataFileMeta] = touchedFilePaths.map {
+ file => fileNameToMeta.getOrElse(file, throw new
RuntimeException(s"Missing file: $file"))
+ }
+ val touchedDataSplits =
SparkDataFileMeta.convertToDataSplits(touchedFiles)
+
+ // Step4: build a dataframe that contains both the unchanged and the updated data, and write it out.
+ val columns = updateExpressions.zip(relation.output).map {
+ case (update, origin) =>
+ val updated = if (condition == TrueLiteral) {
+ update
+ } else {
+ If(condition, update, origin)
+ }
+ new Column(updated).as(origin.name, origin.metadata)
+ }
+ val toUpdateScanRelation = DataSourceV2ScanRelation(
+ relation,
+ PaimonSplitScan(table, touchedDataSplits),
+ relation.output)
+ val data = createDataset(sparkSession,
toUpdateScanRelation).select(columns: _*)
+ val addCommitMessage = writer.write(data)
+
+ // Step5: convert the touched (rewritten) files into delete commit messages.
+ val deletedCommitMessage = touchedFiles
+ .groupBy(f => (f.partition, f.bucket))
+ .map {
+ case ((partition, bucket), files) =>
+ val deletedFiles = files.map(_.dataFileMeta).toList.asJava
+ val newFilesIncrement = new DataIncrement(
+ Collections.emptyList[DataFileMeta],
+ deletedFiles,
+ Collections.emptyList[DataFileMeta])
+ buildCommitMessage(
+ new CommitMessageImpl(partition, bucket, newFilesIncrement,
null, null))
+ }
+ .toSeq
+
+ addCommitMessage ++ deletedCommitMessage
+ }
+ commitMessages
+ }
+
+ private def findCandidateDataSplits(): Seq[DataSplit] = {
+ val snapshotReader = table.newSnapshotReader()
+ if (condition != TrueLiteral) { // push down only a real WHERE condition; TRUE needs no filtering
+ val filter = convertConditionToPaimonPredicate(condition,
relation.output, rowType)
+ snapshotReader.withFilter(filter)
+ }
+
+ snapshotReader.read().splits().asScala.collect { case s: DataSplit => s }
+ }
+
+ /** Gets a relative path against the table path. */
+ private def relativePath(absolutePath: String): String = {
+ val location = table.location().toUri
+ location.relativize(new URI(absolutePath)).toString
+ }
+
+ private def buildCommitMessage(o: CommitMessageImpl): CommitMessage = {
+ new CommitMessageImpl(
+ o.partition,
+ o.bucket,
+ o.newFilesIncrement,
+ new CompactIncrement(
+ Collections.emptyList[DataFileMeta],
+ Collections.emptyList[DataFileMeta],
+ Collections.emptyList[DataFileMeta]),
+ new IndexIncrement(Collections.emptyList[IndexFileMeta]));
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala
index 55be49498..1d447281e 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala
@@ -19,9 +19,13 @@
package org.apache.paimon.spark.commands
import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.types.RowType
+/** Mixin exposing a Paimon [[FileStoreTable]] together with convenience accessors derived from it. */
private[spark] trait WithFileStoreTable {
+ /** The Paimon table; supplied by the implementing class. */
def table: FileStoreTable
+ /** Whether the table declares at least one primary key (`table.primaryKeys()` is non-empty). */
+ def withPrimaryKeys: Boolean = !table.primaryKeys().isEmpty
+
+ /** The table's row type, as returned by `table.rowType()`. */
+ def rowType: RowType = table.rowType()
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
index b3077eb8a..905c9cdfb 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
@@ -39,7 +39,6 @@ case class WriteIntoPaimonTable(
options: Options)
extends RunnableCommand
with PaimonCommand
- with PaimonSparkWriter
with SchemaHelper
with Logging {
@@ -57,20 +56,12 @@ case class WriteIntoPaimonTable(
updateTableWithOptions(
Map(DYNAMIC_PARTITION_OVERWRITE.key ->
dynamicPartitionOverwriteMode.toString))
- val writeBuilder = table.newBatchWriteBuilder()
+ val writer = PaimonSparkWriter(table)
if (overwritePartition != null) {
- writeBuilder.withOverwrite(overwritePartition.asJava)
- }
-
- val commitMessages = write(data, writeBuilder)
- val tableCommit = writeBuilder.newCommit()
- try {
- tableCommit.commit(commitMessages.toList.asJava)
- } catch {
- case e: Throwable => throw new RuntimeException(e);
- } finally {
- tableCommit.close()
+ writer.writeBuilder.withOverwrite(overwritePartition.asJava)
}
+ val commitMessages = writer.write(data)
+ writer.commit(commitMessages)
Seq.empty
}
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
index a853bcb3b..bf8e72f79 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
@@ -618,7 +618,7 @@ abstract class MergeIntoTableTestBase extends
PaimonSparkTestBase {
|THEN INSERT (a, b, c) values (a, b, c)
|""".stripMargin)
}.getMessage
- assert(error.contains("Only support to merge into table with primary
keys."))
+ assert(error.contains("Only support to MergeInto table with primary
keys."))
}
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
index 65261f3de..cc95e7a90 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
@@ -26,15 +26,86 @@ import org.assertj.core.api.Assertions.{assertThat,
assertThatThrownBy}
class UpdateTableTest extends PaimonSparkTestBase {
- test(s"test update append only table") {
+ import testImplicits._
+
+  // Verifies UPDATE on a non-partitioned append-only table: row contents after each statement,
+  // one new snapshot for an UPDATE that matches rows, and no new snapshot when nothing matches.
+  test("Paimon Update: append-only table") {
spark.sql(s"""
|CREATE TABLE T (id INT, name STRING, dt STRING)
|""".stripMargin)
- spark.sql("INSERT INTO T VALUES (1, 'a', '11'), (2, 'b', '22')")
+    spark.sql("""
+                |INSERT INTO T
+                |VALUES (1, 'a', '2024'), (2, 'b', '2024'), (3, 'c', '2025'), (4, 'd', '2025')
+                |""".stripMargin)
+
+    spark.sql("UPDATE T SET name = 'a_new' WHERE id = 1")
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id"),
+      Seq((1, "a_new", "2024"), (2, "b", "2024"), (3, "c", "2025"), (4, "d", "2025")).toDF()
+    )
+
+    val snapshotManager = loadTable("T").snapshotManager()
+
+    // An UPDATE that matches rows is expected to add exactly one snapshot.
+    val snapshotBeforeUpdate = snapshotManager.latestSnapshotId()
+    spark.sql("UPDATE T SET name = concat(name, '2') WHERE id % 2 == 0")
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id"),
+      Seq((1, "a_new", "2024"), (2, "b2", "2024"), (3, "c", "2025"), (4, "d2", "2025")).toDF()
+    )
+    // AssertJ convention: assertThat(actual).isEqualTo(expected).
+    assertThat(snapshotManager.latestSnapshotId()).isEqualTo(snapshotBeforeUpdate + 1)
+
+    // No row matches the condition, so no new snapshot is expected.
+    val snapshotBeforeEmptyUpdate = snapshotManager.latestSnapshotId()
+    spark.sql("UPDATE T SET name = 'empty_commit' WHERE id > 100")
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id"),
+      Seq((1, "a_new", "2024"), (2, "b2", "2024"), (3, "c", "2025"), (4, "d2", "2025")).toDF()
+    )
+    assertThat(snapshotManager.latestSnapshotId()).isEqualTo(snapshotBeforeEmptyUpdate)
+  }
+
+  // UPDATE on an append-only table partitioned by `dt`, driven by partition-only, mixed and
+  // non-partition predicates; each statement is followed by a full-table check.
+  test(s"Paimon Update: append-only table with partition") {
+    spark.sql(s"""
+                 |CREATE TABLE T (id INT, name STRING, dt STRING) PARTITIONED BY (dt)
+                 |""".stripMargin)
+
+    spark.sql("""
+                |INSERT INTO T
+                |VALUES (1, 'a', '2024'), (2, 'b', '2024'), (3, 'c', '2025'), (4, 'd', '2025')
+                |""".stripMargin)
+
+    // Executes one UPDATE statement, then compares the whole table (ordered by id) to `expected`.
+    def updateAndCheck(updateSql: String, expected: Seq[(Int, String, String)]): Unit = {
+      spark.sql(updateSql)
+      checkAnswer(spark.sql("SELECT * FROM T ORDER BY id"), expected.toDF())
+    }
+
+    // Partition-only predicate: touches the '2024' partition.
+    updateAndCheck(
+      "UPDATE T SET name = concat(name, '2') WHERE dt <= '2024'",
+      Seq((1, "a2", "2024"), (2, "b2", "2024"), (3, "c", "2025"), (4, "d", "2025")))
+
+    // Partition predicate combined with a non-partition predicate.
+    updateAndCheck(
+      "UPDATE T SET name = concat(name, '3') WHERE dt = '2025' and id % 2 == 1",
+      Seq((1, "a2", "2024"), (2, "b2", "2024"), (3, "c3", "2025"), (4, "d", "2025")))
+
+    // Non-partition predicate whose matches span both partitions.
+    updateAndCheck(
+      "UPDATE T SET name = concat(name, '4') WHERE id % 2 == 0",
+      Seq((1, "a2", "2024"), (2, "b24", "2024"), (3, "c3", "2025"), (4, "d4", "2025")))
+  }
+
+ // Verifies that UPDATE on an append-only table fails with "Subqueries are not supported"
+ // when its WHERE clause contains an IN-subquery.
+ test("Paimon Update: append-only table, condition contains subquery") {
+ spark.sql(s"""
+ |CREATE TABLE T (id INT, name STRING, dt STRING) PARTITIONED
BY (dt)
+ |""".stripMargin)
+
+ spark.sql("""
+ |INSERT INTO T
+ |VALUES (1, 'a', '2024'), (2, 'b', '2024'), (3, 'c', '2025'),
(4, 'd', '2025')
+ |""".stripMargin)
- assertThatThrownBy(() => spark.sql("UPDATE T SET name = 'a_new' WHERE id =
1"))
- .hasMessageContaining("Only support to update table with primary keys.")
+ // Temp view providing the ids referenced by the IN-subquery below.
+ Seq(1, 2).toDF("id").createOrReplaceTempView("updated_ids")
+ assertThatThrownBy(
+ () => spark.sql("UPDATE T set name = 'in_new' WHERE id IN (SELECT * FROM
updated_ids)"))
+ .hasMessageContaining("Subqueries are not supported")
}
CoreOptions.MergeEngine.values().foreach {