This is an automated email from the ASF dual-hosted git repository.
arnabp20 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git
The following commit(s) were added to refs/heads/main by this push:
new 998dd43b1e [MINOR] Add coalesce after matrix indexing in Spark
998dd43b1e is described below
commit 998dd43b1ea32cda7d35f03edf7f4731a2d1fc06
Author: Arnab Phani <[email protected]>
AuthorDate: Thu Apr 27 23:22:47 2023 +0200
[MINOR] Add coalesce after matrix indexing in Spark
Closes #1816
---
.../runtime/instructions/spark/MatrixIndexingSPInstruction.java | 5 +++++
1 file changed, 5 insertions(+)
diff --git a/src/main/java/org/apache/sysds/runtime/instructions/spark/MatrixIndexingSPInstruction.java b/src/main/java/org/apache/sysds/runtime/instructions/spark/MatrixIndexingSPInstruction.java
index 5d0711b283..3c8583d34c 100644
--- a/src/main/java/org/apache/sysds/runtime/instructions/spark/MatrixIndexingSPInstruction.java
+++ b/src/main/java/org/apache/sysds/runtime/instructions/spark/MatrixIndexingSPInstruction.java
@@ -244,6 +244,11 @@ public class MatrixIndexingSPInstruction extends IndexingSPInstruction {
|| OptimizerUtils.isIndexingRangeBlockAligned(ixrange,
mcIn) ) {
out = in1.filter(new IsBlockInRange(ixrange.rowStart,
ixrange.rowEnd, ixrange.colStart, ixrange.colEnd, mcOut))
.mapToPair(new SliceSingleBlock(ixrange, mcOut));
+ int prefNoutPart =
SparkUtils.getNumPreferredPartitions(mcOut);
+ //determine the need for coalesce
+ boolean coalesce = 1.4*prefNoutPart <
in1.getNumPartitions() && !SparkUtils.isHashPartitioned(in1);
+ if (coalesce) //merge partitions without shuffle
+ out = out.coalesce(prefNoutPart);
}
else {
out = in1.filter(new IsBlockInRange(ixrange.rowStart,
ixrange.rowEnd, ixrange.colStart, ixrange.colEnd, mcOut))