summaryzb commented on code in PR #950:
URL: https://github.com/apache/incubator-uniffle/pull/950#discussion_r1230351921


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java:
##########
@@ -151,25 +154,20 @@ public WriteBufferManager(
     this.memorySpillTimeoutSec = rssConf.get(RssSparkConfig.RSS_MEMORY_SPILL_TIMEOUT);
   }
 
-  public List<ShuffleBlockInfo> addRecord(int partitionId, Object key, Object value) {
-    final long start = System.currentTimeMillis();
-    arrayOutputStream.reset();
-    if (key != null) {
-      serializeStream.writeKey(key, ClassTag$.MODULE$.apply(key.getClass()));
-    } else {
-      serializeStream.writeKey(null, ManifestFactory$.MODULE$.Null());
-    }
-    if (value != null) {
-      serializeStream.writeValue(value, ClassTag$.MODULE$.apply(value.getClass()));
-    } else {
-      serializeStream.writeValue(null, ManifestFactory$.MODULE$.Null());
+  /**
+   * add serialized columnar data directly when integrate with gluten
+   */
+  public List<ShuffleBlockInfo> addPartitionData(int partitionId, byte[] serializedData) {
+    return addPartitionData(partitionId, serializedData, 0, 0L);
+  }
+
+  public List<ShuffleBlockInfo> addPartitionData(
+          int partitionId, byte[] serializedData, int serializedDataLength, long start) {
+    if (start == 0L) {
+      start = System.currentTimeMillis();

Review Comment:
   agree, and the same applies to `serializedDataLength`
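
A minimal sketch of the suggestion, assuming `serializedDataLength` is defaulted inside the full overload the same way `start` is (the class name and the `effectiveLength` helper are illustrative, not uniffle's actual code):

```java
import java.util.Collections;
import java.util.List;

public class PartitionDataDefaults {

  // Stand-in for uniffle's ShuffleBlockInfo; used here only as a type.
  static class ShuffleBlockInfo {}

  // Treat 0 as "not provided" and fall back to the whole array, mirroring
  // how a start of 0L falls back to System.currentTimeMillis().
  static int effectiveLength(byte[] serializedData, int serializedDataLength) {
    return serializedDataLength == 0 ? serializedData.length : serializedDataLength;
  }

  public List<ShuffleBlockInfo> addPartitionData(int partitionId, byte[] serializedData) {
    // The short overload delegates with sentinel values for length and start.
    return addPartitionData(partitionId, serializedData, 0, 0L);
  }

  public List<ShuffleBlockInfo> addPartitionData(
      int partitionId, byte[] serializedData, int serializedDataLength, long start) {
    if (start == 0L) {
      start = System.currentTimeMillis();
    }
    serializedDataLength = effectiveLength(serializedData, serializedDataLength);
    // ... buffering of serializedData[0..serializedDataLength) elided in this sketch ...
    return Collections.emptyList();
  }

  public static void main(String[] args) {
    PartitionDataDefaults mgr = new PartitionDataDefaults();
    mgr.addPartitionData(7, new byte[] {1, 2, 3});
  }
}
```

This keeps callers of the two-argument overload from having to know what a "default" length is; the sentinel is resolved in one place.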



##########
client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java:
##########
@@ -100,6 +101,7 @@ public RssShuffleReader(
     this.mapStartIndex = mapStartIndex;
     this.mapEndIndex = mapEndIndex;
     this.context = context;
+    this.rssShuffleHandle = rssShuffleHandle;

Review Comment:
   yes, it's used in `rssShuffleHandle.getNumMaps()`



##########
client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java:
##########
@@ -141,7 +140,11 @@ public WriteBufferManager(
     this.requireMemoryInterval = bufferManagerOptions.getRequireMemoryInterval();
     this.requireMemoryRetryMax = bufferManagerOptions.getRequireMemoryRetryMax();
     this.arrayOutputStream = new WrappedByteArrayOutputStream(serializerBufferSize);
-    this.serializeStream = instance.serializeStream(arrayOutputStream);
+    // columnar shuffle use the serialized data directly
+    if (serializer != null) {

Review Comment:
   When `addPartitionData` is used to add bytes that are already serialized, the serializer is never used. What's more, using ColumnarSerializer here in `instance.serializeStream(arrayOutputStream)` causes an exception.
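
The null guard the diff introduces can be sketched in isolation. The interfaces below are simplified stand-ins for Spark's serializer machinery, not the real classes; the point is only that the stream is opened conditionally:

```java
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;

public class SerializerGuard {
  // Minimal stand-ins for Spark's SerializationStream / SerializerInstance.
  interface SerializationStream {}

  interface SerializerInstance {
    SerializationStream serializeStream(OutputStream out);
  }

  private final SerializationStream serializeStream;

  // When the writer is fed pre-serialized columnar bytes (the gluten path),
  // no row serializer is configured, so no serialize stream is opened; per
  // the review, a columnar serializer would throw from serializeStream(...).
  SerializerGuard(SerializerInstance instance, ByteArrayOutputStream out) {
    this.serializeStream = (instance != null) ? instance.serializeStream(out) : null;
  }

  boolean rowPathEnabled() {
    return serializeStream != null;
  }

  public static void main(String[] args) {
    SerializerGuard columnar = new SerializerGuard(null, new ByteArrayOutputStream());
    SerializerGuard rowBased =
        new SerializerGuard(out -> new SerializationStream() {}, new ByteArrayOutputStream());
    System.out.println("columnar row path: " + columnar.rowPathEnabled());
    System.out.println("row-based row path: " + rowBased.rowPathEnabled());
  }
}
```

With this shape, `addRecord` (the row path) can assert `serializeStream != null`, while `addPartitionData` never touches it.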



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
