aokolnychyi commented on a change in pull request #2584:
URL: https://github.com/apache/iceberg/pull/2584#discussion_r630659943
##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DynamicFileFilterExec.scala
##########
@@ -48,8 +48,32 @@ abstract class DynamicFileFilterExecBase(
override def outputOrdering: Seq[SortOrder] = scanExec.outputOrdering
override def supportsColumnar: Boolean = scanExec.supportsColumnar
- override protected def doExecute(): RDD[InternalRow] = scanExec.execute()
- override protected def doExecuteColumnar(): RDD[ColumnarBatch] = scanExec.executeColumnar()
+ /*
+ If both target and source have the same partitioning we can have a problem here if our filter exec actually
+ changes the partition. Currently this can only occur in the SinglePartition distribution is in use which only
Review comment:
`partition` -> `the output partitioning of the node`.
##########
File path: spark3-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DynamicFileFilterExec.scala
##########
@@ -48,8 +48,32 @@ abstract class DynamicFileFilterExecBase(
override def outputOrdering: Seq[SortOrder] = scanExec.outputOrdering
override def supportsColumnar: Boolean = scanExec.supportsColumnar
- override protected def doExecute(): RDD[InternalRow] = scanExec.execute()
- override protected def doExecuteColumnar(): RDD[ColumnarBatch] = scanExec.executeColumnar()
+ /*
+ If both target and source have the same partitioning we can have a problem here if our filter exec actually
+ changes the partition. Currently this can only occur in the SinglePartition distribution is in use which only
+ happens if both the target and source have a single partition, but if it does we have the potential of eliminating
+ the only partition in the target. If there are no partitions in the target then we will throw an exception because
+ the partitioning was assumed to be the same 1 partition in source and target. We fix this by making sure that
+ we always return at least 1 empty partition, in the future we may need to handle more complicated partitioner
+ scenarios.
+ */
+
+ override protected def doExecute(): RDD[InternalRow] = {
+ val result = scanExec.execute()
+ if (result.partitions.length == 0) {
Review comment:
Will this break the case where we originally planned 0 tasks? Then the correct distribution is UnspecifiedDistribution(0), but we will report an RDD with one partition?

I thought about adding a check like `outputPartitioning == SinglePartition`, but that is not going to work because the partitioning will be the current one (i.e. whatever the scan node reports now, not what it reported originally).

Question: can we approach this problem from a different angle? Say we make our `SparkMergeScan` report a single empty task after dynamic filtering if we originally had only 1 partition.

I think it should be sufficient to modify `filterFiles` in the following way:
```
@Override
public void filterFiles(Set<String> locations) {
  singlePartitionScan = tasks().size() == 1;
  ...
}
```
And then `tasks`:
```
if (singlePartitionScan && tasks.isEmpty()) {
  tasks = Lists.newArrayList(new BaseCombinedScanTask(Lists.newArrayList()));
}
```
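As a rough, self-contained sketch of that idea (the class name, the list-of-locations task model, and the filtering logic below are illustrative stand-ins, not the actual `SparkMergeScan` code), the two pieces could fit together like this:
```
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Simplified stand-in: each combined scan task is modeled as a list of file
// locations so the example compiles on its own, without Iceberg or Spark classes.
class SingleTaskPreservingScan {
  private final List<List<String>> tasks;
  private boolean singlePartitionScan = false;

  SingleTaskPreservingScan(List<List<String>> plannedTasks) {
    this.tasks = new ArrayList<>(plannedTasks);
  }

  // Remember whether the pre-filter scan had exactly one task before dropping
  // anything, mirroring the filterFiles suggestion above.
  void filterFiles(Set<String> locations) {
    singlePartitionScan = tasks.size() == 1;
    tasks.removeIf(task -> task.stream().noneMatch(locations::contains));
  }

  // If dynamic filtering removed every task but the scan originally had a single
  // one, hand back one empty task so the reported single-partition distribution
  // still matches the RDD that actually gets executed.
  List<List<String>> tasks() {
    if (singlePartitionScan && tasks.isEmpty()) {
      List<List<String>> singleEmptyTask = new ArrayList<>();
      singleEmptyTask.add(new ArrayList<>());
      return singleEmptyTask;
    }
    return tasks;
  }
}
```
The key point is that the single-partition decision is captured before any files are dropped, so a fully filtered scan still reports exactly one (empty) task and the assumed single-partition distribution stays valid.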