This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 7aa0117f [AQE][LocalOrder] Fix wrong parameter for expectedTaskIds in
LocalOrderSegmentSplit (#319)
7aa0117f is described below
commit 7aa0117f3d0115771eb41602833014e58c77e4fd
Author: Junfan Zhang <[email protected]>
AuthorDate: Mon Nov 14 10:41:26 2022 +0800
[AQE][LocalOrder] Fix wrong parameter for expectedTaskIds in
LocalOrderSegmentSplit (#319)
### What changes were proposed in this pull request?
1. Fix wrong param of expectedTaskIds in `LocalOrderSegmentSplitter`
2. Fix the `LOCAL_ORDER` type invalid when reading
### Why are the changes needed?
In current codebase, the reads of `LOCAL_ORDER` is invalid. This PR is to
fix it.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing UTs of `AQESkewedJoinWithLocalOrderTest` cover this case.
---
.../uniffle/client/impl/ShuffleReadClientImpl.java | 43 +++++++++++-----------
.../impl/LocalFileQuorumClientReadHandler.java | 2 +-
2 files changed, 22 insertions(+), 23 deletions(-)
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index 28e81f29..99387f59 100644
---
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -61,7 +61,6 @@ public class ShuffleReadClientImpl implements
ShuffleReadClient {
private AtomicLong crcCheckTime = new AtomicLong(0);
private ClientReadHandler clientReadHandler;
private final IdHelper idHelper;
- private ShuffleDataDistributionType dataDistributionType =
ShuffleDataDistributionType.NORMAL;
public ShuffleReadClientImpl(
String storageType,
@@ -79,27 +78,6 @@ public class ShuffleReadClientImpl implements
ShuffleReadClient {
Configuration hadoopConf,
IdHelper idHelper,
ShuffleDataDistributionType dataDistributionType) {
- this(storageType, appId, shuffleId, partitionId, indexReadLimit,
- partitionNumPerRange, partitionNum, readBufferSize, storageBasePath,
- blockIdBitmap, taskIdBitmap, shuffleServerInfoList, hadoopConf,
idHelper);
- this.dataDistributionType = dataDistributionType;
- }
-
- public ShuffleReadClientImpl(
- String storageType,
- String appId,
- int shuffleId,
- int partitionId,
- int indexReadLimit,
- int partitionNumPerRange,
- int partitionNum,
- int readBufferSize,
- String storageBasePath,
- Roaring64NavigableMap blockIdBitmap,
- Roaring64NavigableMap taskIdBitmap,
- List<ShuffleServerInfo> shuffleServerInfoList,
- Configuration hadoopConf,
- IdHelper idHelper) {
this.shuffleId = shuffleId;
this.partitionId = partitionId;
this.blockIdBitmap = blockIdBitmap;
@@ -140,6 +118,27 @@ public class ShuffleReadClientImpl implements
ShuffleReadClient {
clientReadHandler =
ShuffleHandlerFactory.getInstance().createShuffleReadHandler(request);
}
+ public ShuffleReadClientImpl(
+ String storageType,
+ String appId,
+ int shuffleId,
+ int partitionId,
+ int indexReadLimit,
+ int partitionNumPerRange,
+ int partitionNum,
+ int readBufferSize,
+ String storageBasePath,
+ Roaring64NavigableMap blockIdBitmap,
+ Roaring64NavigableMap taskIdBitmap,
+ List<ShuffleServerInfo> shuffleServerInfoList,
+ Configuration hadoopConf,
+ IdHelper idHelper) {
+ this(storageType, appId, shuffleId, partitionId, indexReadLimit,
+ partitionNumPerRange, partitionNum, readBufferSize, storageBasePath,
+ blockIdBitmap, taskIdBitmap, shuffleServerInfoList, hadoopConf,
+ idHelper, ShuffleDataDistributionType.NORMAL);
+ }
+
@Override
public CompressedShuffleBlock readShuffleBlockData() {
// empty data expected, just return null
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileQuorumClientReadHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileQuorumClientReadHandler.java
index d523cd63..2a6afdbe 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileQuorumClientReadHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileQuorumClientReadHandler.java
@@ -70,7 +70,7 @@ public class LocalFileQuorumClientReadHandler extends
AbstractClientReadHandler
processBlockIds,
client,
distributionType,
- expectBlockIds
+ expectTaskIds
));
}
}