RussellSpitzer commented on a change in pull request #2829:
URL: https://github.com/apache/iceberg/pull/2829#discussion_r706502243
##########
File path: spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
##########
@@ -149,7 +168,13 @@ public RewriteDataFiles filter(Expression expression) {
     try {
       Map<StructLike, List<FileScanTask>> filesByPartition =
           Streams.stream(fileScanTasks)
-              .collect(Collectors.groupingBy(task -> task.file().partition()));
+              .collect(Collectors.groupingBy(task -> {
+                if (task.file().specId() == table.spec().specId()) {
+                  return task.file().partition();
+                } else {
+                  return EmptyStruct.get();
Review comment:
Trying something like
```
StructLikeMap<List<FileScanTask>> filesByPartition =
    StructLikeMap.create(table.spec().partitionType());
StructLike emptyStruct = GenericRecord.create(table.schema());

fileScanTasks.forEach(task -> {
  /*
   * If a task uses an incompatible partition spec, the data inside it could
   * contain values that belong to multiple partitions in the current spec.
   * Treating all such files as unpartitioned and grouping them together
   * helps minimize the number of new files produced.
   */
  StructLike taskPartition = task.file().specId() == table.spec().specId() ?
      task.file().partition() : emptyStruct;

  List<FileScanTask> files = filesByPartition.get(taskPartition);
  if (files == null) {
    files = Lists.newArrayList();
  }

  files.add(task);
  filesByPartition.put(taskPartition, files);
});
```
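
Just as a sketch (not part of the suggestion itself): the same grouping could also be written with `Map.computeIfAbsent`, which drops the explicit null check. It assumes the same context as above, i.e. `table` and `fileScanTasks` in scope in the surrounding method and `StructLikeMap`, `GenericRecord`, and `Lists` imported from the usual Iceberg util, data, and relocated Guava packages:
```
StructLikeMap<List<FileScanTask>> filesByPartition =
    StructLikeMap.create(table.spec().partitionType());
StructLike emptyStruct = GenericRecord.create(table.schema());

fileScanTasks.forEach(task -> {
  // Files written under an older spec may span several partitions of the
  // current spec, so they all share the single "unpartitioned" key.
  StructLike taskPartition = task.file().specId() == table.spec().specId() ?
      task.file().partition() : emptyStruct;

  filesByPartition.computeIfAbsent(taskPartition, ignored -> Lists.newArrayList()).add(task);
});
```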
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]