aokolnychyi commented on a change in pull request #4047:
URL: https://github.com/apache/iceberg/pull/4047#discussion_r806228530
##########
File path: spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
##########
@@ -226,6 +231,82 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand {
     ReplaceData(writeRelation, mergeRows, relation)
   }
+  // build a rewrite plan for sources that support row deltas
+  private def buildWriteDeltaPlan(
+      relation: DataSourceV2Relation,
+      operationTable: RowLevelOperationTable,
+      source: LogicalPlan,
+      cond: Expression,
+      matchedActions: Seq[MergeAction],
+      notMatchedActions: Seq[MergeAction]): WriteDelta = {
+
+    // resolve all needed attrs (e.g. row ID and any required metadata attrs)
+    val rowAttrs = relation.output
+    val rowIdAttrs = resolveRowIdAttrs(relation, operationTable.operation)
+    val metadataAttrs = resolveRequiredMetadataAttrs(relation, operationTable.operation)
+
+    // construct a scan relation and include all required metadata columns
+    val readRelation = buildReadRelation(relation, operationTable, metadataAttrs, rowIdAttrs)
+    val readAttrs = readRelation.output
+
+    // project an extra column to check if a target row exists after the join
+    val targetTableProjExprs = readAttrs :+ Alias(TrueLiteral, ROW_FROM_TARGET)()
+    val targetTableProj = Project(targetTableProjExprs, readRelation)
+
+    // project an extra column to check if a source row exists after the join
+    val sourceTableProjExprs = source.output :+ Alias(TrueLiteral, ROW_FROM_SOURCE)()
+    val sourceTableProj = Project(sourceTableProjExprs, source)
+
+    // use inner join if there is no NOT MATCHED action, unmatched source rows can be discarded
+    // use right outer join in all other cases, unmatched source rows may be needed
+    // also disable broadcasts for the target table to perform the cardinality check later
+    val joinType = if (notMatchedActions.isEmpty) Inner else RightOuter
+    val joinHint = JoinHint(leftHint = Some(HintInfo(Some(NO_BROADCAST_HASH))), rightHint = None)
+    val joinPlan = Join(NoStatsUnaryNode(targetTableProj), sourceTableProj, joinType, Some(cond), joinHint)
+
+    val deleteRowValues = buildDeltaDeleteRowValues(rowAttrs, rowIdAttrs)
+    val metadataReadAttrs = readAttrs.filterNot(relation.outputSet.contains)
+
+    val matchedConditions = matchedActions.map(actionCondition)
+    val matchedOutputs = matchedActions.map { action =>
Review comment:
I like it on a single line too. It's just that the line limit in Spark is
100 (instead of 120 in Iceberg) and I copied the code from there. I'll make it
consistent in Iceberg, at least.
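
For anyone following along, the ROW_FROM_TARGET / ROW_FROM_SOURCE trick in the
plan above can be sketched with plain DataFrames (a minimal, self-contained
illustration; all names below are made up for the example and are not the rule's
actual internals): each side projects a literal TRUE flag, and after the right
outer join a NULL target flag means the source row matched nothing, so NOT
MATCHED actions apply.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.lit

    object RowFlagDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("row-flag-demo").getOrCreate()
        import spark.implicits._

        // project a literal TRUE flag on each side of the join
        val target = Seq((1, "a"), (2, "b")).toDF("id", "data")
          .withColumn("__row_from_target", lit(true))
        val source = Seq((2, "b2"), (3, "c")).toDF("id", "new_data")
          .withColumn("__row_from_source", lit(true))

        // right outer join keeps unmatched source rows (needed for NOT MATCHED actions)
        val joined = target.join(source, Seq("id"), "right_outer")

        // rows where __row_from_target is NULL had no match in the target
        joined.show(false)

        spark.stop()
      }
    }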
##########
File path: spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
##########
@@ -1567,6 +1567,370 @@ public void testRangePositionDeltaUpdatePartitionedTable() {
        table, UPDATE, expectedDistribution, SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
   }
+  // ==================================================================================
+  // Distribution and ordering for merge-on-read MERGE operations with position deletes
+  // ==================================================================================
+  //
+  // UNPARTITIONED UNORDERED
+  // -------------------------------------------------------------------------
+  // merge mode is NOT SET -> rely on write distribution and ordering as a basis
Review comment:
Correct, I just did not want to duplicate all possible values of the
write property. If the merge mode is not set, use the write mode and adapt it
to cover deletes.
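
As an illustration of that fallback (a simplified sketch only; the real
resolution logic lives elsewhere in the Spark/Iceberg integration), using the
standard Iceberg table property names:

    // Rough sketch of the fallback, assuming a plain property lookup.
    // The default value here is just for the sketch.
    def mergeDistributionMode(props: Map[String, String]): String = {
      props.get("write.merge.distribution-mode")        // MERGE-specific mode, if set
        .orElse(props.get("write.distribution-mode"))   // otherwise fall back to the write mode
        .getOrElse("none")                              // neither is set
    }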
##########
File path: spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
##########
@@ -1567,6 +1567,370 @@ public void testRangePositionDeltaUpdatePartitionedTable() {
        table, UPDATE, expectedDistribution, SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
   }
+  // ==================================================================================
+  // Distribution and ordering for merge-on-read MERGE operations with position deletes
+  // ==================================================================================
+  //
+  // UNPARTITIONED UNORDERED
+  // -------------------------------------------------------------------------
+  // merge mode is NOT SET -> rely on write distribution and ordering as a basis
+  // merge mode is NONE -> unspecified distribution + LOCALLY ORDER BY _spec_id, _partition, _file, _pos
+  // merge mode is HASH -> unspecified distribution + LOCALLY ORDER BY _spec_id, _partition, _file, _pos
+  // merge mode is RANGE -> unspecified distribution + LOCALLY ORDER BY _spec_id, _partition, _file, _pos
+  //
+  // UNPARTITIONED ORDERED BY id, data
+  // -------------------------------------------------------------------------
+  // merge mode is NOT SET -> rely on write distribution and ordering as a basis
+  // merge mode is NONE -> unspecified distribution +
+  //                       LOCALLY ORDER BY _spec_id, _partition, _file, _pos, id, data
Review comment:
Good idea, will add a comment.
##########
File path: spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
##########
@@ -1567,6 +1567,370 @@ public void testRangePositionDeltaUpdatePartitionedTable() {
        table, UPDATE, expectedDistribution, SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
   }
+  // ==================================================================================
+  // Distribution and ordering for merge-on-read MERGE operations with position deletes
+  // ==================================================================================
+  //
+  // UNPARTITIONED UNORDERED
+  // -------------------------------------------------------------------------
+  // merge mode is NOT SET -> rely on write distribution and ordering as a basis
+  // merge mode is NONE -> unspecified distribution + LOCALLY ORDER BY _spec_id, _partition, _file, _pos
+  // merge mode is HASH -> unspecified distribution + LOCALLY ORDER BY _spec_id, _partition, _file, _pos
+  // merge mode is RANGE -> unspecified distribution + LOCALLY ORDER BY _spec_id, _partition, _file, _pos
+  //
+  // UNPARTITIONED ORDERED BY id, data
+  // -------------------------------------------------------------------------
+  // merge mode is NOT SET -> rely on write distribution and ordering as a basis
+  // merge mode is NONE -> unspecified distribution +
+  //                       LOCALLY ORDER BY _spec_id, _partition, _file, _pos, id, data
+  // merge mode is HASH -> unspecified distribution +
Review comment:
Well, I am not sure. I like that our merge and write logic are
consistent right now. My hope was that AQE would coalesce tasks as needed to
avoid a huge number of small writing tasks (and hence a huge number of delete
files). I think AQE should behave better than a round-robin distribution. This
case is about unpartitioned tables, so we will most likely produce at most a
single delete file per writing task (that shouldn't be too bad). As long as we
don't have a huge number of writing tasks, we should be fine, I guess?
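
To make the AQE assumption here concrete (an illustrative configuration sketch,
not something prescribed by this PR; the advisory size is an arbitrary example
value), these are the standard Spark 3.2 settings that control post-shuffle
partition coalescing, which is what keeps the number of writing tasks, and
hence delete files, down:

    import org.apache.spark.sql.SparkSession

    // Enable adaptive execution and let Spark merge small shuffle partitions
    // up to an advisory target size before the write stage runs.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("aqe-coalesce-demo")
      .config("spark.sql.adaptive.enabled", "true")
      .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
      .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64m")
      .getOrCreate()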
##########
File path: spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
##########
@@ -84,17 +86,22 @@ public void testMergeWithStaticPredicatePushDown() {
     sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName);
-    append(tableName,
-        "{ \"id\": 1, \"dep\": \"software\" }\n" +
-        "{ \"id\": 11, \"dep\": \"software\" }\n" +
-        "{ \"id\": 1, \"dep\": \"hr\" }");
+    // add a data file to the 'software' partition
+    append(tableName, "{ \"id\": 1, \"dep\": \"software\" }");
+
+    // add a data file to the 'hr' partition
+    append(tableName, "{ \"id\": 1, \"dep\": \"hr\" }");
     Table table = validationCatalog.loadTable(tableIdent);
     Snapshot snapshot = table.currentSnapshot();
     String dataFilesCount = snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP);
     Assert.assertEquals("Must have 2 files before MERGE", "2", dataFilesCount);
+    // remove the data file from the 'hr' partition to ensure it is not scanned
+    DataFile dataFile = Iterables.getOnlyElement(snapshot.addedFiles());
+    table.io().deleteFile(dataFile.path().toString());
Review comment:
Yeah, this will be helpful for future use cases. Let me spend some time
on this one now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]