loukey-lj commented on code in PR #950:
URL: https://github.com/apache/incubator-uniffle/pull/950#discussion_r1229302396
##########
client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java:
##########
@@ -151,25 +154,20 @@ public WriteBufferManager(
     this.memorySpillTimeoutSec = rssConf.get(RssSparkConfig.RSS_MEMORY_SPILL_TIMEOUT);
   }
-  public List<ShuffleBlockInfo> addRecord(int partitionId, Object key, Object value) {
-    final long start = System.currentTimeMillis();
-    arrayOutputStream.reset();
-    if (key != null) {
-      serializeStream.writeKey(key, ClassTag$.MODULE$.apply(key.getClass()));
-    } else {
-      serializeStream.writeKey(null, ManifestFactory$.MODULE$.Null());
-    }
-    if (value != null) {
-      serializeStream.writeValue(value, ClassTag$.MODULE$.apply(value.getClass()));
-    } else {
-      serializeStream.writeValue(null, ManifestFactory$.MODULE$.Null());
+  /**
+   * add serialized columnar data directly when integrate with gluten
+   */
+  public List<ShuffleBlockInfo> addPartitionData(int partitionId, byte[] serializedData) {
+    return addPartitionData(partitionId, serializedData, 0, 0L);
Review Comment:
Is it possible to replace the hard-coded 0 with serializedData.length?
Review Comment:
Is it possible to replace 0 with serializedData.length
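A minimal sketch of what the suggested change could look like, assuming the four-argument overload takes the serialized payload length as its third parameter (that overload's exact signature is not shown in this hunk):

    /**
     * add serialized columnar data directly when integrate with gluten
     */
    public List<ShuffleBlockInfo> addPartitionData(int partitionId, byte[] serializedData) {
      // pass the actual payload length instead of the hard-coded 0, as suggested above
      return addPartitionData(partitionId, serializedData, serializedData.length, 0L);
    }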
##########
client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java:
##########
@@ -151,25 +154,20 @@ public WriteBufferManager(
     this.memorySpillTimeoutSec = rssConf.get(RssSparkConfig.RSS_MEMORY_SPILL_TIMEOUT);
   }
-  public List<ShuffleBlockInfo> addRecord(int partitionId, Object key, Object value) {
-    final long start = System.currentTimeMillis();
-    arrayOutputStream.reset();
-    if (key != null) {
-      serializeStream.writeKey(key, ClassTag$.MODULE$.apply(key.getClass()));
-    } else {
-      serializeStream.writeKey(null, ManifestFactory$.MODULE$.Null());
-    }
-    if (value != null) {
-      serializeStream.writeValue(value, ClassTag$.MODULE$.apply(value.getClass()));
-    } else {
-      serializeStream.writeValue(null, ManifestFactory$.MODULE$.Null());
+  /**
+   * add serialized columnar data directly when integrate with gluten
+   */
+  public List<ShuffleBlockInfo> addPartitionData(int partitionId, byte[] serializedData) {
Review Comment:
The logic of org.apache.spark.shuffle.writer.RssShuffleWriter#writeImpl can be
divided into three parts: loop start, loop execution, and loop end. The
addPartitionData method is reserved for Gluten to use, and Gluten's shuffle
write logic will have the same three parts: loop start, loop execution (calling
addPartitionData), and loop end. My question is: the loop start and loop end
also need to execute Uniffle logic, so where is that part of the code?
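To illustrate the three phases the comment refers to, here is a rough, hypothetical outline of a shuffle write path (method names other than addRecord/addPartitionData are placeholders, not the actual RssShuffleWriter code):

    // Hypothetical outline, for discussion only.
    private void writeImpl(Iterator<Product2<Object, Object>> records) {
      // "loop start": Uniffle-side setup (buffers, timers, block accounting, ...)
      while (records.hasNext()) {
        Product2<Object, Object> record = records.next();
        int partition = partitioner.getPartition(record._1());
        // row-based path; a Gluten columnar writer would instead call
        // bufferManager.addPartitionData(partition, serializedColumnarBytes) here
        List<ShuffleBlockInfo> blocks = bufferManager.addRecord(partition, record._1(), record._2());
        if (blocks != null && !blocks.isEmpty()) {
          processShuffleBlockInfos(blocks);
        }
      }
      // "loop end": flush remaining buffers, send the last blocks, wait for acks
      processShuffleBlockInfos(bufferManager.clear());
      // the question above: where do Gluten's loop start and loop end run this Uniffle logic?
    }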
##########
client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java:
##########
@@ -128,7 +128,11 @@ public WriteBufferManager(
     super(taskMemoryManager, taskMemoryManager.pageSizeBytes(), MemoryMode.ON_HEAP);
     this.bufferSize = bufferManagerOptions.getBufferSize();
     this.spillSize = bufferManagerOptions.getBufferSpillThreshold();
-    this.instance = serializer.newInstance();
+    // columnar shuffle reader use the serialized data directly
+    if (serializer != null) {
Review Comment:
serializer = rssHandle.getDependency().serializer(). Even with Gluten, it is
not null here. In my Gluten 0.5 practice, the error was reported here because
the serializeStream method is not implemented.
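A small sketch of the failure mode described above (the Spark serializer types are from org.apache.spark.serializer; the Gluten-specific behavior is as observed by the reviewer, not verified here): the serializer itself is non-null, so the new null check does not skip this branch, and the problem only surfaces when serializeStream is invoked.

    Serializer serializer = rssHandle.getDependency().serializer(); // non-null even with Gluten
    if (serializer != null) {
      SerializerInstance instance = serializer.newInstance();
      // with a Gluten columnar serializer, this is reportedly where the failure appears,
      // because serializeStream is not implemented on that serializer
      SerializationStream serializeStream = instance.serializeStream(arrayOutputStream);
    }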
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]