This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 56490d4c4 [#1358] fix(spark): pre-check bytebuffer whether is direct
before uncompress (#1360)
56490d4c4 is described below
commit 56490d4c454a30ca89cf1e407993b0e577ba82fc
Author: Junfan Zhang <[email protected]>
AuthorDate: Tue Dec 19 14:16:50 2023 +0800
[#1358] fix(spark): pre-check bytebuffer whether is direct before
uncompress (#1360)
### What changes were proposed in this pull request?
precheck bytebuffer whether is direct before uncompress, and rebuild the
temp bytebuffer.
### Why are the changes needed?
For #1358 . But this PR is not the fundamental solution
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
---
.../spark/shuffle/reader/RssShuffleDataIterator.java | 19 ++++++++++++++++++-
1 file changed, 18 insertions(+), 1 deletion(-)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
index 38ff12a2f..e1b97896f 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
@@ -150,6 +150,10 @@ public class RssShuffleDataIterator<K, C> extends
AbstractIterator<Product2<K, C
return recordsIterator.hasNext();
}
+ private boolean isSameMemoryType(ByteBuffer left, ByteBuffer right) {
+ return left.isDirect() == right.isDirect();
+ }
+
private int uncompress(CompressedShuffleBlock rawBlock, ByteBuffer rawData) {
long rawDataLength = rawData.limit() - rawData.position();
totalRawBytesLength += rawDataLength;
@@ -157,7 +161,20 @@ public class RssShuffleDataIterator<K, C> extends
AbstractIterator<Product2<K, C
int uncompressedLen = rawBlock.getUncompressLength();
if (codec != null) {
- if (uncompressedData == null || uncompressedData.capacity() <
uncompressedLen) {
+ if (uncompressedData == null
+ || uncompressedData.capacity() < uncompressedLen
+ || !isSameMemoryType(uncompressedData, rawData)) {
+
+ if (LOG.isDebugEnabled()) {
+ if (!isSameMemoryType(uncompressedData, rawData)) {
+ LOG.debug(
+ "This should not happen that the temporary uncompressed data's
memory type(isDirect:{}) "
+ + "is not same with fetched data buffer(isDirect:{})",
+ uncompressedData.isDirect(),
+ rawData.isDirect());
+ }
+ }
+
if (uncompressedData != null) {
RssUtils.releaseByteBuffer(uncompressedData);
}