This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new b9c9c0069 [CELEBORN-683][SPARK][PERF] Avoid calling `CelebornConf.get` 
multiple times when columnar shuffle write is enabled
b9c9c0069 is described below

commit b9c9c00697d6f50ccb5231a5b81e3024fd0a99bc
Author: Fu Chen <[email protected]>
AuthorDate: Thu Jun 15 17:52:23 2023 +0800

    [CELEBORN-683][SPARK][PERF] Avoid calling `CelebornConf.get` multiple times 
    when columnar shuffle write is enabled.
    
    ### What changes were proposed in this pull request?
    
    As described in the title: cache the columnar-shuffle configuration values 
    once instead of re-reading them from `CelebornConf` on every record write.
    
    ### Why are the changes needed?
    
    flame graph and stage duration before:
    
    ![截屏2023-06-15 下午4 49 
04](https://github.com/apache/incubator-celeborn/assets/8537877/6fe7f7f6-fd36-42ec-a6a1-9a4943022dc8)
    
    ![截屏2023-06-15 下午4 57 
53](https://github.com/apache/incubator-celeborn/assets/8537877/077f6c22-4dc9-497a-affe-ddba9200fe28)
    
    flame graph and stage duration after:
    
    ![截屏2023-06-15 下午4 37 
45](https://github.com/apache/incubator-celeborn/assets/8537877/d6ae7aa6-95c7-490e-a0ae-c110e6a83e5a)
    
    ![截屏2023-06-15 下午4 58 
12](https://github.com/apache/incubator-celeborn/assets/8537877/e8dd5c3b-94d9-47d7-a644-4897acef43ad)
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, only perf improvement.
    
    ### How was this patch tested?
    
    Tested locally.
    
    Closes #1595 from cfmcgrady/columnar-conf.
    
    Authored-by: Fu Chen <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../shuffle/celeborn/HashBasedShuffleWriter.java   | 31 +++++++++++++++++-----
 1 file changed, 24 insertions(+), 7 deletions(-)

diff --git 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
index 496c91ee8..a3a6c02aa 100644
--- 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
+++ 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
@@ -102,9 +102,22 @@ public class HashBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
 
   private StructType schema;
 
-  private boolean isColumnarShuffle = false;
   private final boolean unsafeRowFastWrite;
 
+  // //////////////////////////////////////////////////////
+  //                 Columnar Relate Conf                //
+  // //////////////////////////////////////////////////////
+
+  private boolean isColumnarShuffle = false;
+
+  private int columnarShuffleBatchSize;
+
+  private boolean columnarShuffleCodeGenEnabled;
+
+  private boolean columnarShuffleDictionaryEnabled;
+
+  private double columnarShuffleDictionaryMaxFactor;
+
   // In order to facilitate the writing of unit test code, ShuffleClient needs 
to be passed in as
   // parameters. By the way, simplify the passed parameters.
   public HashBasedShuffleWriter(
@@ -164,6 +177,10 @@ public class HashBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
     }
 
     if (conf.columnarShuffleEnabled()) {
+      columnarShuffleBatchSize = conf.columnarShuffleBatchSize();
+      columnarShuffleCodeGenEnabled = conf.columnarShuffleCodeGenEnabled();
+      columnarShuffleDictionaryEnabled = 
conf.columnarShuffleDictionaryEnabled();
+      columnarShuffleDictionaryMaxFactor = 
conf.columnarShuffleDictionaryMaxFactor();
       this.schema = SparkUtils.getSchema(dep);
       this.rssBatchBuilders = new RssBatchBuilder[numPartitions];
       this.isColumnarShuffle = RssBatchBuilder.supportsColumnarType(schema);
@@ -216,22 +233,22 @@ public class HashBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
 
       if (rssBatchBuilders[partitionId] == null) {
         RssBatchBuilder columnBuilders;
-        if (conf.columnarShuffleCodeGenEnabled() && 
!conf.columnarShuffleDictionaryEnabled()) {
+        if (columnarShuffleCodeGenEnabled && 
!columnarShuffleDictionaryEnabled) {
           columnBuilders =
-              new RssColumnarBatchCodeGenBuild().create(schema, 
conf.columnarShuffleBatchSize());
+              new RssColumnarBatchCodeGenBuild().create(schema, 
columnarShuffleBatchSize);
         } else {
           columnBuilders =
               new RssColumnarBatchBuilder(
                   schema,
-                  conf.columnarShuffleBatchSize(),
-                  conf.columnarShuffleDictionaryMaxFactor(),
-                  conf.columnarShuffleDictionaryEnabled());
+                  columnarShuffleBatchSize,
+                  columnarShuffleDictionaryMaxFactor,
+                  columnarShuffleDictionaryEnabled);
         }
         columnBuilders.newBuilders();
         rssBatchBuilders[partitionId] = columnBuilders;
       }
       rssBatchBuilders[partitionId].writeRow(row);
-      if (rssBatchBuilders[partitionId].getRowCnt() >= 
conf.columnarShuffleBatchSize()) {
+      if (rssBatchBuilders[partitionId].getRowCnt() >= 
columnarShuffleBatchSize) {
         byte[] arr = rssBatchBuilders[partitionId].buildColumnBytes();
         pushGiantRecord(partitionId, arr, arr.length);
         if (dataSize != null) {

Reply via email to