aokolnychyi commented on a change in pull request #2132:
URL: https://github.com/apache/iceberg/pull/2132#discussion_r562257851
##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -170,6 +170,6 @@ private TableProperties() {
public static final String MERGE_MODE = "write.merge.mode";
public static final String MERGE_MODE_DEFAULT = "copy-on-write";
- public static final String MERGE_WRITE_CARDINALITY_CHECK = "write.merge.cardinality-check.enabled";
- public static final boolean MERGE_WRITE_CARDINALITY_CHECK_DEFAULT = true;
+ public static final String MERGE_CARDINALITY_CHECK_ENABLED = "write.merge.cardinality-check.enabled";
Review comment:
Renamed the constant to match the property name above.
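As a side note for readers of this thread, here is a minimal sketch of how a writer might resolve this flag from table properties. The property key matches the diff above; the default constant name and the helper are illustrative assumptions, not Iceberg's actual code.

```scala
import java.util.{Map => JMap}

object MergePropertiesSketch {
  // property key from the diff above
  val MERGE_CARDINALITY_CHECK_ENABLED = "write.merge.cardinality-check.enabled"
  // assumed name for the matching default; only the String constant is shown in the diff
  val MERGE_CARDINALITY_CHECK_ENABLED_DEFAULT = true

  // hypothetical helper: read the flag from table properties, falling back to the default
  def isCardinalityCheckEnabled(props: JMap[String, String]): Boolean =
    Option(props.get(MERGE_CARDINALITY_CHECK_ENABLED))
      .map(_.toBoolean)
      .getOrElse(MERGE_CARDINALITY_CHECK_ENABLED_DEFAULT)
}
```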
##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -104,7 +103,7 @@ case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with
case MergeIntoTable(target: DataSourceV2Relation, source: LogicalPlan, cond, matchedActions, notMatchedActions)
    if notMatchedActions.isEmpty =>
- val (mergeBuilder, targetTableScan) = buildDynamicFilterTargetScan(target, source, cond, matchedActions)
+ val mergeBuilder = target.table.asMergeable.newMergeBuilder("merge", newWriteInfo(target.schema))
Review comment:
I think we'd better move the merge builder creation out of `buildDynamicFilterTargetScan`.
##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteMergeInto.scala
##########
@@ -133,16 +133,17 @@ case class RewriteMergeInto(spark: SparkSession) extends Rule[LogicalPlan] with
case MergeIntoTable(target: DataSourceV2Relation, source: LogicalPlan, cond, matchedActions, notMatchedActions) =>
- val (mergeBuilder, targetTableScan) = buildDynamicFilterTargetScan(target, source, cond, matchedActions)
+ val mergeBuilder = target.table.asMergeable.newMergeBuilder("merge", newWriteInfo(target.schema))
  // rewrite the matched actions to ensure there is always an action to produce the output row
  val (matchedConditions, matchedOutputs) = rewriteMatchedActions(matchedActions, target.output)
  // use a full outer join because there are both matched and not matched actions
  val sourceTableProj = source.output ++ Seq(Alias(TRUE_LITERAL, ROW_FROM_SOURCE)())
+ val newSourceTableScan = Project(sourceTableProj, source)
+ val targetTableScan = buildDynamicFilterTargetScan(mergeBuilder, target, source, cond, matchedActions)
  val targetTableProj = targetTableScan.output ++ Seq(Alias(TRUE_LITERAL, ROW_FROM_TARGET)())
  val newTargetTableScan = Project(targetTableProj, targetTableScan)
- val newSourceTableScan = Project(sourceTableProj, source)
Review comment:
Grouped source and target plans.
##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DynamicFileFilter.scala
##########
@@ -42,12 +42,11 @@ case class DynamicFileFilter(
}
}
-case class DynamicFileFilterWithCountCheck(
+case class DynamicFileFilterWithCardinalityCheck(
Review comment:
I think `WithCardinalityCheck` is slightly better.
##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/utils/RewriteRowLevelOperationHelper.scala
##########
@@ -87,7 +87,8 @@ trait RewriteRowLevelOperationHelper extends PredicateHelper with Logging {
  protected val FILE_NAME_COL = "_file"
  protected val ROW_POS_COL = "_pos"
- protected val AFFECTED_FILES_ACC_NAME = "affectedFiles"
+ // `internal.metrics` prefix ensures the accumulator state is not tracked by Spark UI
+ protected val AFFECTED_FILES_ACC_NAME = "internal.metrics.merge.affectedFiles"
Review comment:
I guess this one is important: without the `internal.metrics` prefix, the accumulator state would be tracked and displayed in the Spark UI.
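To make the naming convention concrete, here is a small, self-contained sketch using Spark's built-in `CollectionAccumulator` instead of Iceberg's `SetAccumulator`; only the accumulator name comes from the diff, the rest is illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.CollectionAccumulator

object AccumulatorNamingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("acc-naming-sketch").getOrCreate()

    // Accumulators whose name starts with "internal.metrics." are treated as internal
    // metrics by Spark, so their state is not shown as a named accumulable in the UI.
    val affectedFiles = new CollectionAccumulator[String]
    spark.sparkContext.register(affectedFiles, "internal.metrics.merge.affectedFiles")

    spark.sparkContext
      .parallelize(Seq("part-00000.parquet", "part-00001.parquet"))
      .foreach(file => affectedFiles.add(file))

    // The driver can still read the collected values programmatically.
    println(affectedFiles.value)

    spark.stop()
  }
}
```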
##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DynamicFileFilterExec.scala
##########
@@ -67,21 +66,21 @@ case class DynamicFileFilterExec(
  }
}
-case class DynamicFileFilterWithCountCheckExec(
+case class DynamicFileFilterWithCardinalityCheckExec(
    scanExec: SparkPlan,
    fileFilterExec: SparkPlan,
    @transient filterable: SupportsFileFilter,
-   filesAccumulator: SetAccumulator[String],
-   @transient targetTableName: String)
-  extends DynamicFileFilterExecBase(scanExec, fileFilterExec, filterable) {
+   filesAccumulator: SetAccumulator[String])
+  extends DynamicFileFilterExecBase(scanExec, fileFilterExec) {
  override protected def doPrepare(): Unit = {
    val rows = fileFilterExec.executeCollect()
-   if (rows.size > 0) {
-     val msg =
-       s"""The same row of target table `$targetTableName` was identified more than
-          | once for an update, delete or insert operation of the MERGE statement.""".stripMargin
-     throw new SparkException(msg)
+   if (rows.length > 0) {
+     throw new SparkException(
+       "The ON search condition of the MERGE statement matched a single row from " +
Review comment:
The new message wording is borrowed from DB2's error for this case.
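For context, a small, self-contained illustration of the situation this error describes, with plain Spark DataFrames standing in for the MERGE target and source (the data and join are made up; Iceberg's actual check works on the matched target files and row positions via the accumulator discussed above):

```scala
import org.apache.spark.sql.SparkSession

object CardinalityViolationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("merge-cardinality-sketch").getOrCreate()
    import spark.implicits._

    // The target has one row per id, but the source has two rows with id = 1,
    // so an ON condition of target.id = source.id matches the same target row
    // twice, which is the ambiguity the cardinality check rejects.
    val target = Seq((1, "a"), (2, "b")).toDF("id", "value")
    val source = Seq((1, "x"), (1, "y"), (2, "z")).toDF("id", "value")

    val duplicateMatches = target.join(source, Seq("id"))
      .groupBy("id")
      .count()
      .filter($"count" > 1)

    // Prints id = 1: the target row that was matched more than once.
    duplicateMatches.show()

    spark.stop()
  }
}
```

When the check is enabled (the default, per the first diff in this review), the MERGE fails with the message above instead of producing an ambiguous result.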
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]