This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 56490d4c4 [#1358] fix(spark): pre-check bytebuffer whether is direct 
before uncompress (#1360)
56490d4c4 is described below

commit 56490d4c454a30ca89cf1e407993b0e577ba82fc
Author: Junfan Zhang <[email protected]>
AuthorDate: Tue Dec 19 14:16:50 2023 +0800

    [#1358] fix(spark): pre-check bytebuffer whether is direct before 
uncompress (#1360)
    
    ### What changes were proposed in this pull request?
    
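    Check whether the temporary uncompressed ByteBuffer has the same memory type
    (direct or heap) as the fetched data before uncompressing, and rebuild the
    temporary ByteBuffer when it does not.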
    
    ### Why are the changes needed?
    
    For #1358. Note that this PR is not the fundamental solution.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests.
---
 .../spark/shuffle/reader/RssShuffleDataIterator.java  | 19 ++++++++++++++++++-
 1 file changed, 18 insertions(+), 1 deletion(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
index 38ff12a2f..e1b97896f 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
@@ -150,6 +150,10 @@ public class RssShuffleDataIterator<K, C> extends 
AbstractIterator<Product2<K, C
     return recordsIterator.hasNext();
   }
 
+  private boolean isSameMemoryType(ByteBuffer left, ByteBuffer right) {
+    return left.isDirect() == right.isDirect();
+  }
+
   private int uncompress(CompressedShuffleBlock rawBlock, ByteBuffer rawData) {
     long rawDataLength = rawData.limit() - rawData.position();
     totalRawBytesLength += rawDataLength;
@@ -157,7 +161,20 @@ public class RssShuffleDataIterator<K, C> extends 
AbstractIterator<Product2<K, C
 
     int uncompressedLen = rawBlock.getUncompressLength();
     if (codec != null) {
-      if (uncompressedData == null || uncompressedData.capacity() < 
uncompressedLen) {
+      if (uncompressedData == null
+          || uncompressedData.capacity() < uncompressedLen
+          || !isSameMemoryType(uncompressedData, rawData)) {
+
+        if (LOG.isDebugEnabled()) {
+          if (!isSameMemoryType(uncompressedData, rawData)) {
+            LOG.debug(
+                "This should not happen that the temporary uncompressed data's 
memory type(isDirect:{}) "
+                    + "is not same with fetched data buffer(isDirect:{})",
+                uncompressedData.isDirect(),
+                rawData.isDirect());
+          }
+        }
+
         if (uncompressedData != null) {
           RssUtils.releaseByteBuffer(uncompressedData);
         }

Reply via email to