advancedxy commented on code in PR #495:
URL: https://github.com/apache/incubator-uniffle/pull/495#discussion_r1094399086


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java:
##########
@@ -131,15 +119,39 @@ public boolean hasNext() {
         // finish reading records, check data consistent
         shuffleReadClient.checkProcessedBlockIds();
         shuffleReadClient.logStatics();
-        LOG.info("Fetch " + compressedBytesLength + " bytes cost " + readTime 
+ " ms and "
-            + serializeTime + " ms to serialize, " + decompressTime + " ms to 
decompress with unCompressionLength["
-            + unCompressedBytesLength + "]");
+        LOG.info("Fetch " + totalRawBytesLength + " bytes cost " + readTime + 
" ms and "
+            + serializeTime + " ms to serialize" + (codec == null ? "." : (", 
" + decompressTime
+            + " ms to decompress with unCompressionLength["

Review Comment:
   this is a bit hard to read. 
   Could we extract this message and use a format method



##########
client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java:
##########
@@ -66,12 +67,15 @@ public RssShuffleDataIterator(
     this.serializerInstance = serializer.newInstance();
     this.shuffleReadClient = shuffleReadClient;
     this.shuffleReadMetrics = shuffleReadMetrics;
-    this.codec = Codec.newInstance(rssConf);
+    boolean compress = 
rssConf.getBoolean(RssSparkConfig.SPARK_SHUFFLE_COMPRESS.key()
+            .substring(RssSparkConfig.SPARK_RSS_CONFIG_PREFIX.length()),
+        RssSparkConfig.SPARK_SHUFFLE_COMPRESS.defaultValue().get());
+    this.codec = compress ? Codec.newInstance(rssConf) : null;
   }
 
   public Iterator<Tuple2<Object, Object>> createKVIterator(ByteBuffer data, 
int size) {
     clearDeserializationStream();
-    byteBufInputStream = new 
ByteBufInputStream(Unpooled.wrappedBuffer(data.array(), 0, size), true);
+    byteBufInputStream = new 
ByteBufInputStream(Unpooled.wrappedBuffer(data.array(), data.position(), size), 
true);

Review Comment:
   does this mean the old code path have a bug here?



##########
client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferManagerTest.java:
##########
@@ -41,14 +40,20 @@
 
 public class WriteBufferManagerTest {
 
+  static {
+    // trigger spark config package initialization before RssSparkConfig to 
avoid key, spark.shuffle.compress,
+    // duplication in spark3
+    org.apache.spark.internal.config.package$.MODULE$.EXECUTOR_MEMORY();

Review Comment:
   This is weird. And I believe it's caused by the following lines:
   
https://github.com/apache/incubator-uniffle/pull/495/files#diff-c3afd1d81d99843fe961baab101d1f7df3016993a0adf2b5263812b2a496780fR275-R289
   
   Since we cannot control when spark will load spark.shuffle.compress in the 
test, it could also happened in a real spark job. 
   
   I suggest just declare a `SPARK_SHUFFLE_COMPRESS` string field in 
RssSparkConfig.java just like you did it in the previous `RssClientConfig`. It 
can make things a lot easier.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to