rajarshisarkar commented on a change in pull request #4377:
URL: https://github.com/apache/iceberg/pull/4377#discussion_r834944232
##########
File path:
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
##########
@@ -339,19 +342,25 @@ private Result doExecuteWithPartialProgress(RewriteExecutionContext ctx, Stream<
   private Stream<RewriteFileGroup> toGroupStream(RewriteExecutionContext ctx,
       Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition) {
+    PriorityQueue<RewriteFileGroup> rewriteFileGroups = new PriorityQueue<>(comparator());
Review comment:
Thanks, I have made the changes.
##########
File path:
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
##########
@@ -339,19 +342,25 @@ private Result doExecuteWithPartialProgress(RewriteExecutionContext ctx, Stream<
   private Stream<RewriteFileGroup> toGroupStream(RewriteExecutionContext ctx,
       Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition) {
+    PriorityQueue<RewriteFileGroup> rewriteFileGroups = new PriorityQueue<>(comparator());
-    // Todo Add intelligence to the order in which we do rewrites instead of just using partition order
-    return fileGroupsByPartition.entrySet().stream()
-        .flatMap(e -> {
-          StructLike partition = e.getKey();
-          List<List<FileScanTask>> fileGroups = e.getValue();
-          return fileGroups.stream().map(tasks -> {
-            int globalIndex = ctx.currentGlobalIndex();
-            int partitionIndex = ctx.currentPartitionIndex(partition);
-            FileGroupInfo info = new BaseRewriteDataFilesFileGroupInfo(globalIndex, partitionIndex, partition);
-            return new RewriteFileGroup(info, tasks);
-          });
-        });
+    fileGroupsByPartition.forEach((partition, fileGroups) -> fileGroups.forEach(tasks -> {
+      int globalIndex = ctx.currentGlobalIndex();
+      int partitionIndex = ctx.currentPartitionIndex(partition);
+      FileGroupInfo info = new BaseRewriteDataFilesFileGroupInfo(globalIndex, partitionIndex, partition);
+      rewriteFileGroups.offer(new RewriteFileGroup(info, tasks));
+    }));
+
+    return rewriteFileGroups.stream();
+  }
+
+  private Comparator<RewriteFileGroup> comparator() {
Review comment:
Thanks, I have renamed it to `rewriteGroupComparator()`.
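
One detail worth keeping in mind for the `return rewriteFileGroups.stream();` line in the hunk above: `java.util.PriorityQueue` documents that its iterator does not traverse elements in any particular order, and `stream()` is built on that iterator. If the stream is meant to yield groups in comparator order, one option is to drain the queue with `poll()` first. The following is only a minimal sketch of that idea, not code from this PR; the class name `OrderedGroups` and the helper `drainInPriorityOrder` are hypothetical, and plain integers stand in for `RewriteFileGroup`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class OrderedGroups {

  // Hypothetical helper, not part of the PR: empties the queue via poll(),
  // which always returns the current head according to the queue's comparator.
  // Note that this consumes the queue.
  static <T> List<T> drainInPriorityOrder(PriorityQueue<T> queue) {
    List<T> ordered = new ArrayList<>(queue.size());
    while (!queue.isEmpty()) {
      ordered.add(queue.poll());
    }
    return ordered;
  }

  public static void main(String[] args) {
    // Integers stand in for file groups; reverseOrder() puts the largest first.
    PriorityQueue<Integer> sizes = new PriorityQueue<>(Comparator.reverseOrder());
    sizes.addAll(List.of(3, 1, 4, 1, 5));

    // sizes.stream() would follow iterator order, which is unspecified;
    // draining with poll() yields the elements in comparator order.
    List<Integer> ordered = drainInPriorityOrder(sizes);
    System.out.println(ordered);
  }
}
```

Calling `drainInPriorityOrder(queue).stream()` would then give a stream in priority order. Collecting the groups into a list and calling `sort(comparator)` is an equivalent alternative that avoids the queue altogether.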
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]