advancedxy commented on code in PR #950:
URL: https://github.com/apache/incubator-uniffle/pull/950#discussion_r1229694028


##########
client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java:
##########
@@ -65,7 +65,8 @@
   private int startPartition;
   private int endPartition;
   private TaskContext context;
-  private ShuffleDependency<K, C, ?> shuffleDependency;
+  private ShuffleDependency<K, ?, C> shuffleDependency;

Review Comment:
   👍 



##########
client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java:
##########
@@ -141,7 +140,11 @@ public WriteBufferManager(
     this.requireMemoryInterval = bufferManagerOptions.getRequireMemoryInterval();
     this.requireMemoryRetryMax = bufferManagerOptions.getRequireMemoryRetryMax();
     this.arrayOutputStream = new WrappedByteArrayOutputStream(serializerBufferSize);
-    this.serializeStream = instance.serializeStream(arrayOutputStream);
+    // columnar shuffle use the serialized data directly
+    if (serializer != null) {

Review Comment:
   What if `serializer == null`? I didn't see any special handling for it in the `addPartitionData` part.



##########
client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java:
##########
@@ -100,6 +101,7 @@ public RssShuffleReader(
     this.mapStartIndex = mapStartIndex;
     this.mapEndIndex = mapEndIndex;
     this.context = context;
+    this.rssShuffleHandle = rssShuffleHandle;

Review Comment:
   Do we need to keep the shuffle handle as a field here?



##########
client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java:
##########
@@ -151,25 +154,20 @@ public WriteBufferManager(
     this.memorySpillTimeoutSec = rssConf.get(RssSparkConfig.RSS_MEMORY_SPILL_TIMEOUT);
   }
 
-  public List<ShuffleBlockInfo> addRecord(int partitionId, Object key, Object value) {
-    final long start = System.currentTimeMillis();
-    arrayOutputStream.reset();
-    if (key != null) {
-      serializeStream.writeKey(key, ClassTag$.MODULE$.apply(key.getClass()));
-    } else {
-      serializeStream.writeKey(null, ManifestFactory$.MODULE$.Null());
-    }
-    if (value != null) {
-      serializeStream.writeValue(value, ClassTag$.MODULE$.apply(value.getClass()));
-    } else {
-      serializeStream.writeValue(null, ManifestFactory$.MODULE$.Null());
+  /**
+   * add serialized columnar data directly when integrate with gluten
+   */
+  public List<ShuffleBlockInfo> addPartitionData(int partitionId, byte[] serializedData) {
+    return addPartitionData(partitionId, serializedData, 0, 0L);
+  }
+
+  public List<ShuffleBlockInfo> addPartitionData(
+          int partitionId, byte[] serializedData, int serializedDataLength, long start) {
+    if (start == 0L) {
+      start = System.currentTimeMillis();

Review Comment:
   +1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to