This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new b9c9c0069 [CELEBORN-683][SPARK][PERF] Avoid calling `CelebornConf.get` multi-time when columnar shuffle write is enabled
b9c9c0069 is described below
commit b9c9c00697d6f50ccb5231a5b81e3024fd0a99bc
Author: Fu Chen <[email protected]>
AuthorDate: Thu Jun 15 17:52:23 2023 +0800
[CELEBORN-683][SPARK][PERF] Avoid calling `CelebornConf.get` multi-time when columnar shuffle write is enabled.
### What changes were proposed in this pull request?
As described in the title: cache the columnar shuffle configuration values in fields instead of re-reading them from `CelebornConf` on every record write.
### Why are the changes needed?
flame graph and stage duration before:


flame graph and stage duration after:


### Does this PR introduce _any_ user-facing change?
No, only perf improvement.
### How was this patch tested?
tested locally.
Closes #1595 from cfmcgrady/columnar-conf.
Authored-by: Fu Chen <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../shuffle/celeborn/HashBasedShuffleWriter.java | 31 +++++++++++++++++-----
1 file changed, 24 insertions(+), 7 deletions(-)
diff --git
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
index 496c91ee8..a3a6c02aa 100644
---
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
+++
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
@@ -102,9 +102,22 @@ public class HashBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
private StructType schema;
- private boolean isColumnarShuffle = false;
private final boolean unsafeRowFastWrite;
+ // //////////////////////////////////////////////////////
+ // Columnar Relate Conf //
+ // //////////////////////////////////////////////////////
+
+ private boolean isColumnarShuffle = false;
+
+ private int columnarShuffleBatchSize;
+
+ private boolean columnarShuffleCodeGenEnabled;
+
+ private boolean columnarShuffleDictionaryEnabled;
+
+ private double columnarShuffleDictionaryMaxFactor;
+
// In order to facilitate the writing of unit test code, ShuffleClient needs
to be passed in as
// parameters. By the way, simplify the passed parameters.
public HashBasedShuffleWriter(
@@ -164,6 +177,10 @@ public class HashBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
}
if (conf.columnarShuffleEnabled()) {
+ columnarShuffleBatchSize = conf.columnarShuffleBatchSize();
+ columnarShuffleCodeGenEnabled = conf.columnarShuffleCodeGenEnabled();
+ columnarShuffleDictionaryEnabled =
conf.columnarShuffleDictionaryEnabled();
+ columnarShuffleDictionaryMaxFactor =
conf.columnarShuffleDictionaryMaxFactor();
this.schema = SparkUtils.getSchema(dep);
this.rssBatchBuilders = new RssBatchBuilder[numPartitions];
this.isColumnarShuffle = RssBatchBuilder.supportsColumnarType(schema);
@@ -216,22 +233,22 @@ public class HashBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
if (rssBatchBuilders[partitionId] == null) {
RssBatchBuilder columnBuilders;
- if (conf.columnarShuffleCodeGenEnabled() &&
!conf.columnarShuffleDictionaryEnabled()) {
+ if (columnarShuffleCodeGenEnabled &&
!columnarShuffleDictionaryEnabled) {
columnBuilders =
- new RssColumnarBatchCodeGenBuild().create(schema,
conf.columnarShuffleBatchSize());
+ new RssColumnarBatchCodeGenBuild().create(schema,
columnarShuffleBatchSize);
} else {
columnBuilders =
new RssColumnarBatchBuilder(
schema,
- conf.columnarShuffleBatchSize(),
- conf.columnarShuffleDictionaryMaxFactor(),
- conf.columnarShuffleDictionaryEnabled());
+ columnarShuffleBatchSize,
+ columnarShuffleDictionaryMaxFactor,
+ columnarShuffleDictionaryEnabled);
}
columnBuilders.newBuilders();
rssBatchBuilders[partitionId] = columnBuilders;
}
rssBatchBuilders[partitionId].writeRow(row);
- if (rssBatchBuilders[partitionId].getRowCnt() >=
conf.columnarShuffleBatchSize()) {
+ if (rssBatchBuilders[partitionId].getRowCnt() >=
columnarShuffleBatchSize) {
byte[] arr = rssBatchBuilders[partitionId].buildColumnBytes();
pushGiantRecord(partitionId, arr, arr.length);
if (dataSize != null) {