This is an automated email from the ASF dual-hosted git repository.
xianjingfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 051b07a74 [MINOR] refactor: Avoid unnecessary recursion (#1441)
051b07a74 is described below
commit 051b07a74807c21cb898a7e5cfc0cee9d12c452a
Author: Enrico Minack <[email protected]>
AuthorDate: Fri Jan 12 02:52:00 2024 +0100
[MINOR] refactor: Avoid unnecessary recursion (#1441)
### What changes were proposed in this pull request?
Replace the tail recursion in ShuffleReadClientImpl.readShuffleBlockData with
an outer while(true) loop. The diff is best viewed with whitespace changes ignored.
### Why are the changes needed?
The recursion in ShuffleReadClientImpl.readShuffleBlockData is not needed
because the existing return statements also terminate the newly introduced
outer loop. The recursion depth is bounded only by the number of consecutive
empty segments, whereas the loop needs no additional stack frames.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Not tested.
---
.../uniffle/client/impl/ShuffleReadClientImpl.java | 140 +++++++++++----------
1 file changed, 71 insertions(+), 69 deletions(-)
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index e94f5aa6d..2c2fc25ce 100644
---
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -169,90 +169,92 @@ public class ShuffleReadClientImpl implements
ShuffleReadClient {
@Override
public CompressedShuffleBlock readShuffleBlockData() {
- // empty data expected, just return null
- if (blockIdBitmap.isEmpty()) {
- return null;
- }
-
- // All blocks are processed, so just return
- if (pendingBlockIds.isEmpty()) {
- return null;
- }
-
- // if client need request new data from shuffle server
- if (bufferSegmentQueue.isEmpty()) {
- if (read() <= 0) {
+ while (true) {
+ // empty data expected, just return null
+ if (blockIdBitmap.isEmpty()) {
return null;
}
- }
- // get next buffer segment
- BufferSegment bs = null;
+ // All blocks are processed, so just return
+ if (pendingBlockIds.isEmpty()) {
+ return null;
+ }
- // blocks in bufferSegmentQueue may be from different partition in range
partition mode,
- // or may be from speculation task, filter them and just read the
necessary block
- while (true) {
- bs = bufferSegmentQueue.poll();
- if (bs == null) {
- break;
+ // if client need request new data from shuffle server
+ if (bufferSegmentQueue.isEmpty()) {
+ if (read() <= 0) {
+ return null;
+ }
}
- // check 1: if blockId is processed
- // check 2: if blockId is required for current partition
- // check 3: if blockId is generated by required task
- if (!processedBlockIds.contains(bs.getBlockId())
- && blockIdBitmap.contains(bs.getBlockId())
- && taskIdBitmap.contains(bs.getTaskAttemptId())) {
- long expectedCrc = -1;
- long actualCrc = -1;
- try {
- long start = System.currentTimeMillis();
- expectedCrc = bs.getCrc();
- actualCrc = ChecksumUtils.getCrc32(readBuffer, bs.getOffset(),
bs.getLength());
- crcCheckTime.addAndGet(System.currentTimeMillis() - start);
- } catch (Exception e) {
- LOG.warn("Can't read data for blockId[" + bs.getBlockId() + "]", e);
+
+ // get next buffer segment
+ BufferSegment bs = null;
+
+ // blocks in bufferSegmentQueue may be from different partition in range
partition mode,
+ // or may be from speculation task, filter them and just read the
necessary block
+ while (true) {
+ bs = bufferSegmentQueue.poll();
+ if (bs == null) {
+ break;
}
+ // check 1: if blockId is processed
+ // check 2: if blockId is required for current partition
+ // check 3: if blockId is generated by required task
+ if (!processedBlockIds.contains(bs.getBlockId())
+ && blockIdBitmap.contains(bs.getBlockId())
+ && taskIdBitmap.contains(bs.getTaskAttemptId())) {
+ long expectedCrc = -1;
+ long actualCrc = -1;
+ try {
+ long start = System.currentTimeMillis();
+ expectedCrc = bs.getCrc();
+ actualCrc = ChecksumUtils.getCrc32(readBuffer, bs.getOffset(),
bs.getLength());
+ crcCheckTime.addAndGet(System.currentTimeMillis() - start);
+ } catch (Exception e) {
+ LOG.warn("Can't read data for blockId[" + bs.getBlockId() + "]",
e);
+ }
- if (expectedCrc != actualCrc) {
- String errMsg =
- "Unexpected crc value for blockId["
- + bs.getBlockId()
- + "], expected:"
- + expectedCrc
- + ", actual:"
- + actualCrc;
- // If some blocks of one replica are corrupted,but maybe other
replicas are not corrupted,
- // so exception should not be thrown here if blocks have multiple
replicas
- if (shuffleServerInfoList.size() > 1) {
- LOG.warn(errMsg);
- clientReadHandler.updateConsumedBlockInfo(bs, true);
- continue;
- } else {
- throw new RssFetchFailedException(errMsg);
+ if (expectedCrc != actualCrc) {
+ String errMsg =
+ "Unexpected crc value for blockId["
+ + bs.getBlockId()
+ + "], expected:"
+ + expectedCrc
+ + ", actual:"
+ + actualCrc;
+ // If some blocks of one replica are corrupted,but maybe other
replicas are not
+ // corrupted,
+ // so exception should not be thrown here if blocks have multiple
replicas
+ if (shuffleServerInfoList.size() > 1) {
+ LOG.warn(errMsg);
+ clientReadHandler.updateConsumedBlockInfo(bs, true);
+ continue;
+ } else {
+ throw new RssFetchFailedException(errMsg);
+ }
}
- }
+ // mark block as processed
+ processedBlockIds.addLong(bs.getBlockId());
+ pendingBlockIds.removeLong(bs.getBlockId());
+ // only update the statistics of necessary blocks
+ clientReadHandler.updateConsumedBlockInfo(bs, false);
+ break;
+ }
+ clientReadHandler.updateConsumedBlockInfo(bs, true);
// mark block as processed
processedBlockIds.addLong(bs.getBlockId());
pendingBlockIds.removeLong(bs.getBlockId());
- // only update the statistics of necessary blocks
- clientReadHandler.updateConsumedBlockInfo(bs, false);
- break;
}
- clientReadHandler.updateConsumedBlockInfo(bs, true);
- // mark block as processed
- processedBlockIds.addLong(bs.getBlockId());
- pendingBlockIds.removeLong(bs.getBlockId());
- }
- if (bs != null) {
- ByteBuffer compressedBuffer = readBuffer.duplicate();
- compressedBuffer.position(bs.getOffset());
- compressedBuffer.limit(bs.getOffset() + bs.getLength());
- return new CompressedShuffleBlock(compressedBuffer,
bs.getUncompressLength());
+ if (bs != null) {
+ ByteBuffer compressedBuffer = readBuffer.duplicate();
+ compressedBuffer.position(bs.getOffset());
+ compressedBuffer.limit(bs.getOffset() + bs.getLength());
+ return new CompressedShuffleBlock(compressedBuffer,
bs.getUncompressLength());
+ }
+ // current segment hasn't data, try next segment
}
- // current segment hasn't data, try next segment
- return readShuffleBlockData();
}
@VisibleForTesting