rdblue commented on a change in pull request #4047:
URL: https://github.com/apache/iceberg/pull/4047#discussion_r805426755



##########
File path: 
spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
##########
@@ -226,6 +231,82 @@ object RewriteMergeIntoTable extends 
RewriteRowLevelCommand {
     ReplaceData(writeRelation, mergeRows, relation)
   }
 
+  // build a rewrite plan for sources that support row deltas
+  private def buildWriteDeltaPlan(
+      relation: DataSourceV2Relation,
+      operationTable: RowLevelOperationTable,
+      source: LogicalPlan,
+      cond: Expression,
+      matchedActions: Seq[MergeAction],
+      notMatchedActions: Seq[MergeAction]): WriteDelta = {
+
+    // resolve all needed attrs (e.g. row ID and any required metadata attrs)
+    val rowAttrs = relation.output
+    val rowIdAttrs = resolveRowIdAttrs(relation, operationTable.operation)
+    val metadataAttrs = resolveRequiredMetadataAttrs(relation, 
operationTable.operation)
+
+    // construct a scan relation and include all required metadata columns
+    val readRelation = buildReadRelation(relation, operationTable, 
metadataAttrs, rowIdAttrs)
+    val readAttrs = readRelation.output
+
+    // project an extra column to check if a target row exists after the join
+    val targetTableProjExprs = readAttrs :+ Alias(TrueLiteral, 
ROW_FROM_TARGET)()
+    val targetTableProj = Project(targetTableProjExprs, readRelation)
+
+    // project an extra column to check if a source row exists after the join
+    val sourceTableProjExprs = source.output :+ Alias(TrueLiteral, 
ROW_FROM_SOURCE)()
+    val sourceTableProj = Project(sourceTableProjExprs, source)
+
+    // use inner join if there is no NOT MATCHED action, unmatched source rows 
can be discarded
+    // use right outer join in all other cases, unmatched source rows may be 
needed
+    // also disable broadcasts for the target table to perform the cardinality 
check later
+    val joinType = if (notMatchedActions.isEmpty) Inner else RightOuter
+    val joinHint = JoinHint(leftHint = 
Some(HintInfo(Some(NO_BROADCAST_HASH))), rightHint = None)
+    val joinPlan = Join(NoStatsUnaryNode(targetTableProj), sourceTableProj, 
joinType, Some(cond), joinHint)
+
+    val deleteRowValues = buildDeltaDeleteRowValues(rowAttrs, rowIdAttrs)
+    val metadataReadAttrs = readAttrs.filterNot(relation.outputSet.contains)
+
+    val matchedConditions = matchedActions.map(actionCondition)
+    val matchedOutputs = matchedActions.map { action =>

Review comment:
       Nit: in the replace code, this is a one-liner. You could do the same 
thing here:
   
   ```scala
       val matchedOutputs = matchedActions.map(deltaActionOutput(_, 
deleteRowValues, metadataReadAttrs))
   ```
   
   Not a big deal, I'm just going through both and looking for changes so this 
looked a little odd. Same thing with `notMatchedOutputs`.

##########
File path: 
spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
##########
@@ -226,6 +231,82 @@ object RewriteMergeIntoTable extends 
RewriteRowLevelCommand {
     ReplaceData(writeRelation, mergeRows, relation)
   }
 
+  // build a rewrite plan for sources that support row deltas
+  private def buildWriteDeltaPlan(
+      relation: DataSourceV2Relation,
+      operationTable: RowLevelOperationTable,
+      source: LogicalPlan,
+      cond: Expression,
+      matchedActions: Seq[MergeAction],
+      notMatchedActions: Seq[MergeAction]): WriteDelta = {
+
+    // resolve all needed attrs (e.g. row ID and any required metadata attrs)
+    val rowAttrs = relation.output
+    val rowIdAttrs = resolveRowIdAttrs(relation, operationTable.operation)
+    val metadataAttrs = resolveRequiredMetadataAttrs(relation, 
operationTable.operation)
+
+    // construct a scan relation and include all required metadata columns
+    val readRelation = buildReadRelation(relation, operationTable, 
metadataAttrs, rowIdAttrs)
+    val readAttrs = readRelation.output
+
+    // project an extra column to check if a target row exists after the join
+    val targetTableProjExprs = readAttrs :+ Alias(TrueLiteral, 
ROW_FROM_TARGET)()
+    val targetTableProj = Project(targetTableProjExprs, readRelation)
+
+    // project an extra column to check if a source row exists after the join
+    val sourceTableProjExprs = source.output :+ Alias(TrueLiteral, 
ROW_FROM_SOURCE)()
+    val sourceTableProj = Project(sourceTableProjExprs, source)
+
+    // use inner join if there is no NOT MATCHED action, unmatched source rows 
can be discarded
+    // use right outer join in all other cases, unmatched source rows may be 
needed
+    // also disable broadcasts for the target table to perform the cardinality 
check later
+    val joinType = if (notMatchedActions.isEmpty) Inner else RightOuter
+    val joinHint = JoinHint(leftHint = 
Some(HintInfo(Some(NO_BROADCAST_HASH))), rightHint = None)
+    val joinPlan = Join(NoStatsUnaryNode(targetTableProj), sourceTableProj, 
joinType, Some(cond), joinHint)
+
+    val deleteRowValues = buildDeltaDeleteRowValues(rowAttrs, rowIdAttrs)
+    val metadataReadAttrs = readAttrs.filterNot(relation.outputSet.contains)
+
+    val matchedConditions = matchedActions.map(actionCondition)
+    val matchedOutputs = matchedActions.map { action =>
+      deltaActionOutput(action, deleteRowValues, metadataReadAttrs)
+    }
+
+    val notMatchedConditions = notMatchedActions.map(actionCondition)
+    val notMatchedOutputs = notMatchedActions.map { action =>
+      deltaActionOutput(action, deleteRowValues, metadataReadAttrs)
+    }
+
+    val operationTypeAttr = AttributeReference(OPERATION_COLUMN, IntegerType, 
nullable = false)()
+    val rowFromSourceAttr = resolveAttrRef(ROW_FROM_SOURCE_REF, joinPlan)
+    val rowFromTargetAttr = resolveAttrRef(ROW_FROM_TARGET_REF, joinPlan)
+
+    // merged rows must contain values for the operation type and all read 
attrs
+    val mergeRowsOutput = buildMergeRowsOutput(
+      matchedOutputs,
+      notMatchedOutputs,
+      attrs = operationTypeAttr +: readAttrs)

Review comment:
       This also fits inline if you want to match the replace code. (minor)

##########
File path: 
spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
##########
@@ -226,6 +231,82 @@ object RewriteMergeIntoTable extends 
RewriteRowLevelCommand {
     ReplaceData(writeRelation, mergeRows, relation)
   }
 
+  // build a rewrite plan for sources that support row deltas
+  private def buildWriteDeltaPlan(
+      relation: DataSourceV2Relation,
+      operationTable: RowLevelOperationTable,
+      source: LogicalPlan,
+      cond: Expression,
+      matchedActions: Seq[MergeAction],
+      notMatchedActions: Seq[MergeAction]): WriteDelta = {
+
+    // resolve all needed attrs (e.g. row ID and any required metadata attrs)
+    val rowAttrs = relation.output
+    val rowIdAttrs = resolveRowIdAttrs(relation, operationTable.operation)
+    val metadataAttrs = resolveRequiredMetadataAttrs(relation, 
operationTable.operation)
+
+    // construct a scan relation and include all required metadata columns
+    val readRelation = buildReadRelation(relation, operationTable, 
metadataAttrs, rowIdAttrs)
+    val readAttrs = readRelation.output
+
+    // project an extra column to check if a target row exists after the join
+    val targetTableProjExprs = readAttrs :+ Alias(TrueLiteral, 
ROW_FROM_TARGET)()
+    val targetTableProj = Project(targetTableProjExprs, readRelation)
+
+    // project an extra column to check if a source row exists after the join
+    val sourceTableProjExprs = source.output :+ Alias(TrueLiteral, 
ROW_FROM_SOURCE)()
+    val sourceTableProj = Project(sourceTableProjExprs, source)
+
+    // use inner join if there is no NOT MATCHED action, unmatched source rows 
can be discarded
+    // use right outer join in all other cases, unmatched source rows may be 
needed
+    // also disable broadcasts for the target table to perform the cardinality 
check later
+    val joinType = if (notMatchedActions.isEmpty) Inner else RightOuter
+    val joinHint = JoinHint(leftHint = 
Some(HintInfo(Some(NO_BROADCAST_HASH))), rightHint = None)
+    val joinPlan = Join(NoStatsUnaryNode(targetTableProj), sourceTableProj, 
joinType, Some(cond), joinHint)
+
+    val deleteRowValues = buildDeltaDeleteRowValues(rowAttrs, rowIdAttrs)
+    val metadataReadAttrs = readAttrs.filterNot(relation.outputSet.contains)
+
+    val matchedConditions = matchedActions.map(actionCondition)
+    val matchedOutputs = matchedActions.map { action =>
+      deltaActionOutput(action, deleteRowValues, metadataReadAttrs)
+    }
+
+    val notMatchedConditions = notMatchedActions.map(actionCondition)
+    val notMatchedOutputs = notMatchedActions.map { action =>
+      deltaActionOutput(action, deleteRowValues, metadataReadAttrs)
+    }
+
+    val operationTypeAttr = AttributeReference(OPERATION_COLUMN, IntegerType, 
nullable = false)()
+    val rowFromSourceAttr = resolveAttrRef(ROW_FROM_SOURCE_REF, joinPlan)
+    val rowFromTargetAttr = resolveAttrRef(ROW_FROM_TARGET_REF, joinPlan)
+
+    // merged rows must contain values for the operation type and all read 
attrs
+    val mergeRowsOutput = buildMergeRowsOutput(
+      matchedOutputs,
+      notMatchedOutputs,
+      attrs = operationTypeAttr +: readAttrs)
+
+    val mergeRows = MergeRows(
+      isSourceRowPresent = IsNotNull(rowFromSourceAttr),
+      isTargetRowPresent = if (notMatchedActions.isEmpty) TrueLiteral else 
IsNotNull(rowFromTargetAttr),
+      matchedConditions = matchedConditions,
+      matchedOutputs = matchedOutputs,
+      notMatchedConditions = notMatchedConditions,
+      notMatchedOutputs = notMatchedOutputs,
+      targetOutput = Nil,

Review comment:
       Would be nice to have a comment to explain this: "only needed if 
emitting unmatched target rows"

##########
File path: 
spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
##########
@@ -84,17 +86,22 @@ public void testMergeWithStaticPredicatePushDown() {
 
     sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName);
 
-    append(tableName,
-        "{ \"id\": 1, \"dep\": \"software\" }\n" +
-        "{ \"id\": 11, \"dep\": \"software\" }\n" +
-        "{ \"id\": 1, \"dep\": \"hr\" }");
+    // add a data file to the 'software' partition
+    append(tableName, "{ \"id\": 1, \"dep\": \"software\" }");
+
+    // add a data file to the 'hr' partition
+    append(tableName, "{ \"id\": 1, \"dep\": \"hr\" }");
 
     Table table = validationCatalog.loadTable(tableIdent);
 
     Snapshot snapshot = table.currentSnapshot();
     String dataFilesCount = 
snapshot.summary().get(SnapshotSummary.TOTAL_DATA_FILES_PROP);
     Assert.assertEquals("Must have 2 files before MERGE", "2", dataFilesCount);
 
+    // remove the data file from the 'hr' partition to ensure it is not scanned
+    DataFile dataFile = Iterables.getOnlyElement(snapshot.addedFiles());
+    table.io().deleteFile(dataFile.path().toString());

Review comment:
       Could you move it and move it back so that you don't remove the 
validation at the end of the test? Something like...
   
   ```java
     
withFilesUnavailable(Iterables.getOnlyElement(snapshot.addedFiles()).path().toString())
 {
       createOrReplaceView(...);
       withSQLConf(...) { sql("MERGE INTO ...") };
     }
   ```
   
   You could move the files to a temp location, then copy them back at the end.

##########
File path: 
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
##########
@@ -1567,6 +1567,370 @@ public void 
testRangePositionDeltaUpdatePartitionedTable() {
         table, UPDATE, expectedDistribution, 
SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
   }
 
+  // 
==================================================================================
+  // Distribution and ordering for merge-on-read MERGE operations with 
position deletes
+  // 
==================================================================================
+  //
+  // UNPARTITIONED UNORDERED
+  // -------------------------------------------------------------------------
+  // merge mode is NOT SET -> rely on write distribution and ordering as a 
basis

Review comment:
       I think by this you mean that we will check the write distribution mode 
and then use one of the following cases, right? So we can expect this to be 
`unspecified distribution + LOCALLY ORDER BY _spec_id, _partition, _file, _pos` 
because all 3 modes (whether merge or write mode) have that behavior?

##########
File path: 
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
##########
@@ -1567,6 +1567,370 @@ public void 
testRangePositionDeltaUpdatePartitionedTable() {
         table, UPDATE, expectedDistribution, 
SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
   }
 
+  // 
==================================================================================
+  // Distribution and ordering for merge-on-read MERGE operations with 
position deletes
+  // 
==================================================================================
+  //
+  // UNPARTITIONED UNORDERED
+  // -------------------------------------------------------------------------
+  // merge mode is NOT SET -> rely on write distribution and ordering as a 
basis
+  // merge mode is NONE -> unspecified distribution + LOCALLY ORDER BY 
_spec_id, _partition, _file, _pos
+  // merge mode is HASH -> unspecified distribution + LOCALLY ORDER BY 
_spec_id, _partition, _file, _pos
+  // merge mode is RANGE -> unspecified distribution + LOCALLY ORDER BY 
_spec_id, _partition, _file, _pos
+  //
+  // UNPARTITIONED ORDERED BY id, data
+  // -------------------------------------------------------------------------
+  // merge mode is NOT SET -> rely on write distribution and ordering as a 
basis
+  // merge mode is NONE -> unspecified distribution +
+  //                       LOCALLY ORDER BY _spec_id, _partition, _file, _pos, 
id, data

Review comment:
       I think it may be worth an additional comment at the top of this section 
that says: `IMPORTANT: metadata columns like _spec_id and _partition are null 
for new rows`.
   
   I think what this means is:
   * Deletes and updates will be clustered by original partition and _file, and 
additionally sorted by _pos
   * Inserts will effectively be distributed by id, data and sorted by id, data
   
   This holds roughly true for all of the cases.

##########
File path: 
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
##########
@@ -1567,6 +1567,370 @@ public void 
testRangePositionDeltaUpdatePartitionedTable() {
         table, UPDATE, expectedDistribution, 
SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
   }
 
+  // 
==================================================================================
+  // Distribution and ordering for merge-on-read MERGE operations with 
position deletes
+  // 
==================================================================================
+  //
+  // UNPARTITIONED UNORDERED
+  // -------------------------------------------------------------------------
+  // merge mode is NOT SET -> rely on write distribution and ordering as a 
basis
+  // merge mode is NONE -> unspecified distribution + LOCALLY ORDER BY 
_spec_id, _partition, _file, _pos
+  // merge mode is HASH -> unspecified distribution + LOCALLY ORDER BY 
_spec_id, _partition, _file, _pos
+  // merge mode is RANGE -> unspecified distribution + LOCALLY ORDER BY 
_spec_id, _partition, _file, _pos
+  //
+  // UNPARTITIONED ORDERED BY id, data
+  // -------------------------------------------------------------------------
+  // merge mode is NOT SET -> rely on write distribution and ordering as a 
basis
+  // merge mode is NONE -> unspecified distribution +
+  //                       LOCALLY ORDER BY _spec_id, _partition, _file, _pos, 
id, data

Review comment:
       I think it may be worth an additional comment at the top of this section 
that says: `IMPORTANT: metadata columns like _spec_id and _partition are null 
for new rows`.
   
   I think what this means is:
   * Deletes and updates will be clustered by original partition and _file, and 
additionally sorted by _pos
   * Inserts will effectively be distributed by id, data and sorted by id, data
   
   This holds roughly true for all of the partitioning and sort order cases.

##########
File path: 
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
##########
@@ -1567,6 +1567,370 @@ public void 
testRangePositionDeltaUpdatePartitionedTable() {
         table, UPDATE, expectedDistribution, 
SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
   }
 
+  // 
==================================================================================
+  // Distribution and ordering for merge-on-read MERGE operations with 
position deletes
+  // 
==================================================================================
+  //
+  // UNPARTITIONED UNORDERED
+  // -------------------------------------------------------------------------
+  // merge mode is NOT SET -> rely on write distribution and ordering as a 
basis
+  // merge mode is NONE -> unspecified distribution + LOCALLY ORDER BY 
_spec_id, _partition, _file, _pos
+  // merge mode is HASH -> unspecified distribution + LOCALLY ORDER BY 
_spec_id, _partition, _file, _pos
+  // merge mode is RANGE -> unspecified distribution + LOCALLY ORDER BY 
_spec_id, _partition, _file, _pos
+  //
+  // UNPARTITIONED ORDERED BY id, data
+  // -------------------------------------------------------------------------
+  // merge mode is NOT SET -> rely on write distribution and ordering as a 
basis
+  // merge mode is NONE -> unspecified distribution +
+  //                       LOCALLY ORDER BY _spec_id, _partition, _file, _pos, 
id, data
+  // merge mode is HASH -> unspecified distribution +

Review comment:
       Why is the distribution unspecified when mode is hash here? Shouldn't 
this hash distribute by original data file (or maybe original partition), then 
by the new partition? Or is the fear that this will create too many small tasks 
by dividing original data files by the new partitioning?
   
   If that's the case, then it seems like this is optimizing for the case where 
you're running a MERGE with data from an old partition spec. I'd rather 
optimize for the case where the partition spec matches.

##########
File path: 
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
##########
@@ -1567,6 +1567,370 @@ public void 
testRangePositionDeltaUpdatePartitionedTable() {
         table, UPDATE, expectedDistribution, 
SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
   }
 
+  // 
==================================================================================
+  // Distribution and ordering for merge-on-read MERGE operations with 
position deletes
+  // 
==================================================================================
+  //
+  // UNPARTITIONED UNORDERED
+  // -------------------------------------------------------------------------
+  // merge mode is NOT SET -> rely on write distribution and ordering as a 
basis
+  // merge mode is NONE -> unspecified distribution + LOCALLY ORDER BY 
_spec_id, _partition, _file, _pos
+  // merge mode is HASH -> unspecified distribution + LOCALLY ORDER BY 
_spec_id, _partition, _file, _pos
+  // merge mode is RANGE -> unspecified distribution + LOCALLY ORDER BY 
_spec_id, _partition, _file, _pos
+  //
+  // UNPARTITIONED ORDERED BY id, data
+  // -------------------------------------------------------------------------
+  // merge mode is NOT SET -> rely on write distribution and ordering as a 
basis
+  // merge mode is NONE -> unspecified distribution +
+  //                       LOCALLY ORDER BY _spec_id, _partition, _file, _pos, 
id, data
+  // merge mode is HASH -> unspecified distribution +
+  //                       LOCALLY ORDER BY _spec_id, _partition, _file, _pos, 
id, data
+  // merge mode is RANGE -> RANGE DISTRIBUTE BY _spec_id, _partition, _file, 
id, data +
+  //                        LOCALLY ORDER BY _spec_id, _partition, _file, 
_pos, id, data
+  //
+  // PARTITIONED BY date, days(ts) UNORDERED
+  // -------------------------------------------------------------------------
+  // merge mode is NOT SET -> rely on write distribution and ordering as a 
basis
+  // merge mode is NONE -> unspecified distribution +
+  //                       LOCALLY ORDERED BY _spec_id, _partition, _file, 
_pos, date, days(ts)
+  // merge mode is HASH -> CLUSTER BY _spec_id, _partition, date, days(ts) +

Review comment:
       I just thought about this for the case above. I think it's okay.

##########
File path: 
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
##########
@@ -1567,6 +1567,370 @@ public void 
testRangePositionDeltaUpdatePartitionedTable() {
         table, UPDATE, expectedDistribution, 
SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
   }
 
+  // 
==================================================================================
+  // Distribution and ordering for merge-on-read MERGE operations with 
position deletes
+  // 
==================================================================================
+  //
+  // UNPARTITIONED UNORDERED
+  // -------------------------------------------------------------------------
+  // merge mode is NOT SET -> rely on write distribution and ordering as a 
basis
+  // merge mode is NONE -> unspecified distribution + LOCALLY ORDER BY 
_spec_id, _partition, _file, _pos
+  // merge mode is HASH -> unspecified distribution + LOCALLY ORDER BY 
_spec_id, _partition, _file, _pos
+  // merge mode is RANGE -> unspecified distribution + LOCALLY ORDER BY 
_spec_id, _partition, _file, _pos
+  //
+  // UNPARTITIONED ORDERED BY id, data
+  // -------------------------------------------------------------------------
+  // merge mode is NOT SET -> rely on write distribution and ordering as a 
basis
+  // merge mode is NONE -> unspecified distribution +
+  //                       LOCALLY ORDER BY _spec_id, _partition, _file, _pos, 
id, data
+  // merge mode is HASH -> unspecified distribution +

Review comment:
       Oh, I think I see. I was thinking about the `PARTITIONED BY, UNORDERED` 
case that is actually below. I concluded what you did for that case, so that's 
good validation!
   
   Here, it still seems bad to me not to distribute. That's going to result in 
a lot of small delete files, which is really expensive and possibly worse than 
having a single writer for all the inserted data. It would be nice to be able 
to round-robin the new data... what about using something like `HASH DISTRIBUTE 
BY _spec, _partition, bucket(id, data, numShufflePartitions)`?

##########
File path: 
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
##########
@@ -1567,6 +1567,370 @@ public void 
testRangePositionDeltaUpdatePartitionedTable() {
         table, UPDATE, expectedDistribution, 
SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
   }
 
+  // 
==================================================================================
+  // Distribution and ordering for merge-on-read MERGE operations with 
position deletes
+  // 
==================================================================================
+  //
+  // UNPARTITIONED UNORDERED
+  // -------------------------------------------------------------------------
+  // merge mode is NOT SET -> rely on write distribution and ordering as a 
basis
+  // merge mode is NONE -> unspecified distribution + LOCALLY ORDER BY 
_spec_id, _partition, _file, _pos
+  // merge mode is HASH -> unspecified distribution + LOCALLY ORDER BY 
_spec_id, _partition, _file, _pos
+  // merge mode is RANGE -> unspecified distribution + LOCALLY ORDER BY 
_spec_id, _partition, _file, _pos
+  //
+  // UNPARTITIONED ORDERED BY id, data
+  // -------------------------------------------------------------------------
+  // merge mode is NOT SET -> rely on write distribution and ordering as a 
basis
+  // merge mode is NONE -> unspecified distribution +
+  //                       LOCALLY ORDER BY _spec_id, _partition, _file, _pos, 
id, data
+  // merge mode is HASH -> unspecified distribution +

Review comment:
       ~~Why is the distribution unspecified when mode is hash here? Shouldn't 
this hash distribute by original data file (or maybe original partition), then 
by the new partition? Or is the fear that this will create too many small tasks 
by dividing original data files by the new partitioning?~~
   
   ~~If that's the case, then it seems like this is optimizing for the case 
where you're running a MERGE with data from an old partition spec. I'd rather 
optimize for the case where the partition spec matches.~~




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to