advancedxy commented on code in PR #495:
URL: https://github.com/apache/incubator-uniffle/pull/495#discussion_r1094399086
##########
client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java:
##########
@@ -131,15 +119,39 @@ public boolean hasNext() {
// finish reading records, check data consistent
shuffleReadClient.checkProcessedBlockIds();
shuffleReadClient.logStatics();
- LOG.info("Fetch " + compressedBytesLength + " bytes cost " + readTime + " ms and "
- + serializeTime + " ms to serialize, " + decompressTime + " ms to decompress with unCompressionLength["
- + unCompressedBytesLength + "]");
+ LOG.info("Fetch " + totalRawBytesLength + " bytes cost " + readTime + " ms and "
+ + serializeTime + " ms to serialize" + (codec == null ? "." : (", " + decompressTime
+ + " ms to decompress with unCompressionLength["
Review Comment:
This is a bit hard to read.
Could we extract this message and use a format method?
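One possible shape for the extraction this comment asks for, as a standalone sketch. The class and method names here are hypothetical illustrations, not code from the PR:

```java
public class FetchLogFormatter {
    // Hypothetical helper: builds the fetch summary in one place instead of
    // chaining string concatenation inline with a ternary on the codec.
    static String fetchMessage(long rawBytes, long readMs, long serializeMs,
                               boolean compressed, long decompressMs, long uncompressedBytes) {
        String base = String.format("Fetch %d bytes cost %d ms and %d ms to serialize",
            rawBytes, readMs, serializeMs);
        if (!compressed) {
            return base + ".";
        }
        return base + String.format(", %d ms to decompress with unCompressionLength[%d]",
            decompressMs, uncompressedBytes);
    }

    public static void main(String[] args) {
        // Compressed and uncompressed variants of the log line.
        System.out.println(fetchMessage(1024, 5, 2, true, 3, 4096));
        System.out.println(fetchMessage(1024, 5, 2, false, 0, 0));
    }
}
```

The call site would then reduce to a single `LOG.info(fetchMessage(...))`.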
##########
client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java:
##########
@@ -66,12 +67,15 @@ public RssShuffleDataIterator(
this.serializerInstance = serializer.newInstance();
this.shuffleReadClient = shuffleReadClient;
this.shuffleReadMetrics = shuffleReadMetrics;
- this.codec = Codec.newInstance(rssConf);
+ boolean compress = rssConf.getBoolean(RssSparkConfig.SPARK_SHUFFLE_COMPRESS.key()
+ .substring(RssSparkConfig.SPARK_RSS_CONFIG_PREFIX.length()),
+ RssSparkConfig.SPARK_SHUFFLE_COMPRESS.defaultValue().get());
+ this.codec = compress ? Codec.newInstance(rssConf) : null;
}
public Iterator<Tuple2<Object, Object>> createKVIterator(ByteBuffer data, int size) {
clearDeserializationStream();
- byteBufInputStream = new ByteBufInputStream(Unpooled.wrappedBuffer(data.array(), 0, size), true);
+ byteBufInputStream = new ByteBufInputStream(Unpooled.wrappedBuffer(data.array(), data.position(), size), true);
Review Comment:
Does this mean the old code path had a bug here?
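For context on the question, a minimal sketch of why wrapping the backing array from offset 0 can differ from wrapping it from `data.position()`. The buffer contents and helper names below are hypothetical, purely to illustrate the offset difference:

```java
import java.nio.ByteBuffer;

public class BufferOffsetDemo {
    // Mimics the old behavior: read from the start of the backing array,
    // ignoring the buffer's current position.
    static byte byteAtOffsetZero(ByteBuffer data) {
        return data.array()[0];
    }

    // Mimics the new behavior: read from the buffer's current position.
    static byte byteAtPosition(ByteBuffer data) {
        return data.array()[data.position()];
    }

    public static void main(String[] args) {
        // Hypothetical buffer where a 2-byte header was already consumed,
        // so position() is 2 and the payload starts at array index 2.
        ByteBuffer data = ByteBuffer.wrap(new byte[]{9, 8, 7, 6});
        data.position(2);
        System.out.println(byteAtOffsetZero(data)); // 9: stale header byte
        System.out.println(byteAtPosition(data));   // 7: actual payload byte
    }
}
```

So the two forms only agree when the buffer's position happens to be 0; with a nonzero position, offset 0 re-reads already-consumed bytes.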
##########
client-spark/common/src/test/java/org/apache/spark/shuffle/writer/WriteBufferManagerTest.java:
##########
@@ -41,14 +40,20 @@
public class WriteBufferManagerTest {
+ static {
+ // trigger spark config package initialization before RssSparkConfig to avoid key, spark.shuffle.compress,
+ // duplication in spark3
+ org.apache.spark.internal.config.package$.MODULE$.EXECUTOR_MEMORY();
Review Comment:
This is weird, and I believe it's caused by the following lines:
https://github.com/apache/incubator-uniffle/pull/495/files#diff-c3afd1d81d99843fe961baab101d1f7df3016993a0adf2b5263812b2a496780fR275-R289
Since we cannot control when Spark loads spark.shuffle.compress in the test, this could also happen in a real Spark job.
I suggest declaring a `SPARK_SHUFFLE_COMPRESS` string field in RssSparkConfig.java, just like you did in the previous `RssClientConfig`. That would make things a lot easier.
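A sketch of that suggestion, with hypothetical field names (the real RssSparkConfig has other members and a different structure): holding the Spark key as a plain `String` constant means merely loading the class never registers a ConfigEntry with Spark, so it cannot collide with Spark 3's own registration of `spark.shuffle.compress`.

```java
public class RssSparkConfigSketch {
    // Plain string key mirroring Spark's own spark.shuffle.compress entry,
    // instead of constructing a Spark ConfigEntry (which registers the key
    // globally at class-load time and can duplicate Spark 3's registration).
    public static final String SPARK_SHUFFLE_COMPRESS_KEY = "spark.shuffle.compress";
    public static final boolean SPARK_SHUFFLE_COMPRESS_DEFAULT = true;

    public static void main(String[] args) {
        System.out.println(SPARK_SHUFFLE_COMPRESS_KEY + " default=" + SPARK_SHUFFLE_COMPRESS_DEFAULT);
    }
}
```

Callers would then read the value with `conf.getBoolean(SPARK_SHUFFLE_COMPRESS_KEY, SPARK_SHUFFLE_COMPRESS_DEFAULT)`, with no static-initialization ordering tricks needed in tests.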
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]