This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 43bfd2012 [#2679] fix(spark): Potential data mismatch on overlapping
decompression (#2680)
43bfd2012 is described below
commit 43bfd2012b624b3eb91f473582073784cdcf4072
Author: Junfan Zhang <[email protected]>
AuthorDate: Mon Nov 24 10:20:40 2025 +0800
[#2679] fix(spark): Potential data mismatch on overlapping decompression
(#2680)
### What changes were proposed in this pull request?
This PR is to fix the potential data mismatch when encountering the
duplicate blockIds when the overlapping decompression is enabled.
In the one batch remote fetching, if partial blocks should be filtered out
due to duplicating or unneed, we should skip it to inc the `segmentIndex`
### Why are the changes needed?
fix the data mismatch cases, that is found by the uniffle integrity
validation mechanism.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests.
---
.../uniffle/client/impl/ShuffleReadClientImpl.java | 2 +
.../client/impl/ShuffleReadClientImplTest.java | 239 ++++++++++++++-------
2 files changed, 159 insertions(+), 82 deletions(-)
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index c8dfa86ea..364e98526 100644
---
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -308,6 +308,8 @@ public class ShuffleReadClientImpl implements
ShuffleReadClient {
// mark block as processed
processedBlockIds.add(bs.getBlockId());
pendingBlockIds.removeLong(bs.getBlockId());
+ // update the segment index to skip the unnecessary block in
overlapping decompression mode
+ segmentIndex += 1;
}
if (bs != null) {
diff --git
a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
index c2df89558..18b73a478 100644
---
a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
+++
b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
@@ -22,12 +22,15 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.roaringbitmap.longlong.LongIterator;
@@ -39,6 +42,7 @@ import org.apache.uniffle.client.response.ShuffleBlock;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.compression.NoOpCodec;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.util.BlockId;
@@ -60,10 +64,12 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
private static final String EXPECTED_EXCEPTION_MESSAGE = "Exception should
be thrown";
private static AtomicInteger ATOMIC_INT = new AtomicInteger(0);
- private ShuffleServerInfo ssi1 = new ShuffleServerInfo("host1-0", "host1",
0);
- private ShuffleServerInfo ssi2 = new ShuffleServerInfo("host2-0", "host2",
0);
+ private static ShuffleServerInfo ssi1 = new ShuffleServerInfo("host1-0",
"host1", 0);
+ private static ShuffleServerInfo ssi2 = new ShuffleServerInfo("host2-0",
"host2", 0);
- private ShuffleClientFactory.ReadClientBuilder baseReadBuilder() {
+ private static AtomicInteger UNIQ_INT = new AtomicInteger(0);
+
+ private static ShuffleClientFactory.ReadClientBuilder baseReadBuilder() {
return ShuffleClientFactory.newReadBuilder()
.clientType(ClientType.GRPC)
.storageType(StorageType.HDFS.name())
@@ -77,9 +83,26 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
.shuffleServerInfoList(Lists.newArrayList(ssi1));
}
- @Test
- public void readTest1() throws Exception {
- String basePath = HDFS_URI + "clientReadTest1";
+ private static ShuffleClientFactory.ReadClientBuilder
overlappingDecompressionReadBuilder() {
+ return baseReadBuilder()
+ .codec(NoOpCodec.getInstance())
+ .overlappingDecompressionThreadNum(1)
+ .overlappingDecompressionEnabled(true);
+ }
+
+ private static Stream<Supplier<ShuffleClientFactory.ReadClientBuilder>>
clientBuilderProvider() {
+ return Stream.of(() -> baseReadBuilder(), () ->
overlappingDecompressionReadBuilder());
+ }
+
+ private String uniq(String path) {
+ return String.format("%s-%s", path, UNIQ_INT.getAndIncrement());
+ }
+
+ @ParameterizedTest
+ @MethodSource("clientBuilderProvider")
+ public void readTest1(Supplier<ShuffleClientFactory.ReadClientBuilder>
builderSupplier)
+ throws Exception {
+ String basePath = uniq(HDFS_URI + "clientReadTest1");
HadoopShuffleWriteHandler writeHandler =
new HadoopShuffleWriteHandler("appId", 0, 1, 1, basePath,
ssi1.getId(), conf);
@@ -88,7 +111,8 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
writeTestData(writeHandler, 2, 30, 1, 0, expectedData, blockIdBitmap);
ShuffleReadClientImpl readClient =
- baseReadBuilder()
+ builderSupplier
+ .get()
.partitionId(1)
.basePath(basePath)
.blockIdBitmap(blockIdBitmap)
@@ -102,7 +126,8 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
blockIdBitmap.addLong(layout.getBlockId(0, 0, layout.maxTaskAttemptId -
1));
taskIdBitmap.addLong(layout.maxTaskAttemptId - 1);
readClient =
- baseReadBuilder()
+ builderSupplier
+ .get()
.partitionId(1)
.basePath(basePath)
.blockIdBitmap(blockIdBitmap)
@@ -120,9 +145,11 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
}
}
- @Test
- public void readTest2() throws Exception {
- String basePath = HDFS_URI + "clientReadTest2";
+ @ParameterizedTest
+ @MethodSource("clientBuilderProvider")
+ public void readTest2(Supplier<ShuffleClientFactory.ReadClientBuilder>
builderSupplier)
+ throws Exception {
+ String basePath = uniq(HDFS_URI + "clientReadTest2");
HadoopShuffleWriteHandler writeHandler1 =
new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath,
ssi1.getId(), conf);
HadoopShuffleWriteHandler writeHandler2 =
@@ -135,7 +162,8 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
writeTestData(writeHandler2, 2, 30, 0, 0, expectedData, blockIdBitmap);
ShuffleReadClientImpl readClient =
- baseReadBuilder()
+ builderSupplier
+ .get()
.partitionId(0)
.partitionNumPerRange(2)
.basePath(basePath)
@@ -148,9 +176,11 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
readClient.close();
}
- @Test
- public void readTest3() throws Exception {
- String basePath = HDFS_URI + "clientReadTest3";
+ @ParameterizedTest
+ @MethodSource("clientBuilderProvider")
+ public void readTest3(Supplier<ShuffleClientFactory.ReadClientBuilder>
builderSupplier)
+ throws Exception {
+ String basePath = uniq(HDFS_URI + "clientReadTest3");
HadoopShuffleWriteHandler writeHandler1 =
new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath,
ssi1.getId(), conf);
HadoopShuffleWriteHandler writeHandler2 =
@@ -194,7 +224,8 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
conf);
ShuffleReadClientImpl readClient =
- baseReadBuilder()
+ builderSupplier
+ .get()
.partitionId(0)
.partitionNumPerRange(2)
.basePath(basePath)
@@ -207,9 +238,11 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
readClient.close();
}
- @Test
- public void readTest4() throws Exception {
- String basePath = HDFS_URI + "clientReadTest4";
+ @ParameterizedTest
+ @MethodSource("clientBuilderProvider")
+ public void readTest4(Supplier<ShuffleClientFactory.ReadClientBuilder>
builderSupplier)
+ throws Exception {
+ String basePath = uniq(HDFS_URI + "clientReadTest4");
HadoopShuffleWriteHandler writeHandler =
new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath,
ssi1.getId(), conf);
@@ -219,7 +252,8 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
writeTestData(writeHandler, 2, 30, 0, 0, expectedData, blockIdBitmap);
ShuffleReadClientImpl readClient =
- baseReadBuilder()
+ builderSupplier
+ .get()
.partitionId(0)
.partitionNumPerRange(2)
.basePath(basePath)
@@ -253,9 +287,11 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
readClient.close();
}
- @Test
- public void readTest5() throws Exception {
- String basePath = HDFS_URI + "clientReadTest5";
+ @ParameterizedTest
+ @MethodSource("clientBuilderProvider")
+ public void readTest5(Supplier<ShuffleClientFactory.ReadClientBuilder>
builderSupplier)
+ throws Exception {
+ String basePath = uniq(HDFS_URI + "clientReadTest5");
HadoopShuffleWriteHandler writeHandler =
new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath,
ssi1.getId(), conf);
@@ -264,7 +300,8 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
writeTestData(writeHandler, 2, 30, 0, 0, expectedData, blockIdBitmap);
ShuffleReadClientImpl readClient =
- baseReadBuilder()
+ builderSupplier
+ .get()
.partitionId(0)
.partitionNumPerRange(2)
.basePath(basePath)
@@ -280,9 +317,11 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
assertNull(readClient.readShuffleBlockData());
}
- @Test
- public void readTest7() throws Exception {
- String basePath = HDFS_URI + "clientReadTest7";
+ @ParameterizedTest
+ @MethodSource("clientBuilderProvider")
+ public void readTest7(Supplier<ShuffleClientFactory.ReadClientBuilder>
builderSupplier)
+ throws Exception {
+ String basePath = uniq(HDFS_URI + "clientReadTest7");
HadoopShuffleWriteHandler writeHandler =
new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath,
ssi1.getId(), conf);
@@ -297,7 +336,8 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
writeTestData(writeHandler, 10, 30, 0, 0, expectedData1, blockIdBitmap1);
ShuffleReadClientImpl readClient1 =
- baseReadBuilder()
+ builderSupplier
+ .get()
.partitionId(0)
.partitionNumPerRange(2)
.basePath(basePath)
@@ -306,7 +346,8 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
.build();
final ShuffleReadClientImpl readClient2 =
- baseReadBuilder()
+ builderSupplier
+ .get()
.partitionId(0)
.partitionNumPerRange(2)
.basePath(basePath)
@@ -322,9 +363,11 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
readClient2.close();
}
- @Test
- public void readTest8() throws Exception {
- String basePath = HDFS_URI + "clientReadTest8";
+ @ParameterizedTest
+ @MethodSource("clientBuilderProvider")
+ public void readTest8(Supplier<ShuffleClientFactory.ReadClientBuilder>
builderSupplier)
+ throws Exception {
+ String basePath = uniq(HDFS_URI + "clientReadTest8");
HadoopShuffleWriteHandler writeHandler =
new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath,
ssi1.getId(), conf);
@@ -333,7 +376,8 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
writeTestData(writeHandler, 2, 30, 0, 0, expectedData, blockIdBitmap);
ShuffleReadClientImpl readClient =
- baseReadBuilder()
+ builderSupplier
+ .get()
.partitionId(0)
.partitionNumPerRange(2)
.basePath(basePath)
@@ -341,7 +385,8 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
.taskIdBitmap(taskIdBitmap)
.build();
ShuffleReadClientImpl readClient2 =
- baseReadBuilder()
+ builderSupplier
+ .get()
.partitionId(0)
.partitionNumPerRange(2)
.basePath(basePath)
@@ -359,11 +404,7 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
}
fail(EXPECTED_EXCEPTION_MESSAGE);
} catch (Exception e) {
- assertTrue(
- e.getMessage()
- .startsWith(
- "Unexpected crc value for blockId[5800000000000 (seq: 44,
part: 0, task: 0)]"),
- e.getMessage());
+ assertTrue(e.getMessage().startsWith("Unexpected crc value"),
e.getMessage());
}
ShuffleBlock block = readClient2.readShuffleBlockData();
@@ -373,11 +414,14 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
readClient2.close();
}
- @Test
- public void readTest9() {
+ @ParameterizedTest
+ @MethodSource("clientBuilderProvider")
+ public void readTest9(Supplier<ShuffleClientFactory.ReadClientBuilder>
builderSupplier)
+ throws Exception {
// empty data
ShuffleReadClientImpl readClient =
- baseReadBuilder()
+ builderSupplier
+ .get()
.partitionId(0)
.partitionNumPerRange(2)
.basePath("basePath")
@@ -388,9 +432,11 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
readClient.checkProcessedBlockIds();
}
- @Test
- public void readTest10() throws Exception {
- String basePath = HDFS_URI + "clientReadTest10";
+ @ParameterizedTest
+ @MethodSource("clientBuilderProvider")
+ public void readTest10(Supplier<ShuffleClientFactory.ReadClientBuilder>
builderSupplier)
+ throws Exception {
+ String basePath = uniq(HDFS_URI + "clientReadTest10");
HadoopShuffleWriteHandler writeHandler =
new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath,
ssi1.getId(), conf);
@@ -408,7 +454,8 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
}
ShuffleReadClientImpl readClient =
- baseReadBuilder()
+ builderSupplier
+ .get()
.partitionId(0)
.partitionNumPerRange(2)
.basePath(basePath)
@@ -424,9 +471,11 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
}
}
- @Test
- public void readTest11() throws Exception {
- String basePath = HDFS_URI + "clientReadTest11";
+ @ParameterizedTest
+ @MethodSource("clientBuilderProvider")
+ public void readTest11(Supplier<ShuffleClientFactory.ReadClientBuilder>
builderSupplier)
+ throws Exception {
+ String basePath = uniq(HDFS_URI + "clientReadTest11");
HadoopShuffleWriteHandler writeHandler =
new HadoopShuffleWriteHandler("appId", 0, 1, 1, basePath,
ssi1.getId(), conf);
@@ -436,7 +485,8 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
writeTestData(writeHandler, 10, 30, 1, 0, expectedData, blockIdBitmap);
// test with different indexReadLimit to validate result
ShuffleReadClientImpl readClient =
- baseReadBuilder()
+ builderSupplier
+ .get()
.partitionId(1)
.indexReadLimit(1)
.basePath(basePath)
@@ -448,7 +498,8 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
readClient.close();
readClient =
- baseReadBuilder()
+ builderSupplier
+ .get()
.partitionId(1)
.indexReadLimit(2)
.basePath(basePath)
@@ -460,7 +511,8 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
readClient.close();
readClient =
- baseReadBuilder()
+ builderSupplier
+ .get()
.partitionId(1)
.indexReadLimit(3)
.basePath(basePath)
@@ -472,7 +524,8 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
readClient.close();
readClient =
- baseReadBuilder()
+ builderSupplier
+ .get()
.partitionId(1)
.indexReadLimit(10)
.basePath(basePath)
@@ -484,7 +537,8 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
readClient.close();
readClient =
- baseReadBuilder()
+ builderSupplier
+ .get()
.partitionId(1)
.indexReadLimit(11)
.basePath(basePath)
@@ -496,9 +550,11 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
readClient.close();
}
- @Test
- public void readTest12() throws Exception {
- String basePath = HDFS_URI + "clientReadTest12";
+ @ParameterizedTest
+ @MethodSource("clientBuilderProvider")
+ public void readTest12(Supplier<ShuffleClientFactory.ReadClientBuilder>
builderSupplier)
+ throws Exception {
+ String basePath = uniq(HDFS_URI + "clientReadTest12");
HadoopShuffleWriteHandler writeHandler =
new HadoopShuffleWriteHandler("appId", 0, 1, 1, basePath,
ssi1.getId(), conf);
@@ -511,7 +567,8 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
// unexpected taskAttemptId should be filtered
ShuffleReadClientImpl readClient =
- baseReadBuilder()
+ builderSupplier
+ .get()
.partitionId(1)
.basePath(basePath)
.blockIdBitmap(blockIdBitmap)
@@ -523,24 +580,30 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
readClient.close();
}
- @Test
- public void readTest13() throws Exception {
- doReadTest13(BlockIdLayout.DEFAULT);
+ @ParameterizedTest
+ @MethodSource("clientBuilderProvider")
+ public void readTest13(Supplier<ShuffleClientFactory.ReadClientBuilder>
builderSupplier)
+ throws Exception {
+ doReadTest13(BlockIdLayout.DEFAULT, builderSupplier);
}
- @Test
- public void readTest13b() throws Exception {
+ @ParameterizedTest
+ @MethodSource("clientBuilderProvider")
+ public void readTest13b(Supplier<ShuffleClientFactory.ReadClientBuilder>
builderSupplier)
+ throws Exception {
// This test is identical to readTest13, except that it does not use the
default BlockIdLayout
// the layout is only used by IdHelper that extracts the task attempt id
from the block id
// the partition id has to be larger than 0, so that it can leak into the
task attempt id
// if the default layout is being used
BlockIdLayout layout = BlockIdLayout.from(22, 21, 20);
assertNotEquals(layout, BlockIdLayout.DEFAULT);
- doReadTest13(layout);
+ doReadTest13(layout, builderSupplier);
}
- public void doReadTest13(BlockIdLayout layout) throws Exception {
- String basePath = HDFS_URI + "clientReadTest13-" + layout.hashCode();
+ public void doReadTest13(
+ BlockIdLayout layout, Supplier<ShuffleClientFactory.ReadClientBuilder>
builderSupplier)
+ throws Exception {
+ String basePath = uniq(HDFS_URI + "clientReadTest13-" + layout.hashCode());
HadoopShuffleWriteHandler writeHandler =
new HadoopShuffleWriteHandler("appId", 0, 1, 1, basePath,
ssi1.getId(), conf);
@@ -564,7 +627,8 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
// unexpected taskAttemptId should be filtered
assertEquals(15, blockIdBitmap.getIntCardinality());
ShuffleReadClientImpl readClient =
- baseReadBuilder()
+ builderSupplier
+ .get()
.partitionId(1)
.basePath(basePath)
.blockIdBitmap(blockIdBitmap)
@@ -585,7 +649,8 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
// the particular layout that created the block ids is incompatible with
default layout, so
// all block ids will be skipped
// note that skipped block ids in blockIdBitmap will be removed by
`build()`
- baseReadBuilder()
+ builderSupplier
+ .get()
.basePath(basePath)
.blockIdBitmap(blockIdBitmap)
.partitionId(1)
@@ -596,9 +661,11 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
}
}
- @Test
- public void readTest14() throws Exception {
- String basePath = HDFS_URI + "clientReadTest14";
+ @ParameterizedTest
+ @MethodSource("clientBuilderProvider")
+ public void readTest14(Supplier<ShuffleClientFactory.ReadClientBuilder>
builderSupplier)
+ throws Exception {
+ String basePath = uniq(HDFS_URI + "clientReadTest14");
HadoopShuffleWriteHandler writeHandler =
new HadoopShuffleWriteHandler("appId", 0, 1, 1, basePath,
ssi1.getId(), conf);
@@ -611,7 +678,8 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
// unexpected taskAttemptId should be filtered
ShuffleReadClientImpl readClient =
- baseReadBuilder()
+ builderSupplier
+ .get()
.partitionId(1)
.basePath(basePath)
.blockIdBitmap(blockIdBitmap)
@@ -623,9 +691,11 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
readClient.close();
}
- @Test
- public void readTest15() throws Exception {
- String basePath = HDFS_URI + "clientReadTest15";
+ @ParameterizedTest
+ @MethodSource("clientBuilderProvider")
+ public void readTest15(Supplier<ShuffleClientFactory.ReadClientBuilder>
builderSupplier)
+ throws Exception {
+ String basePath = uniq(HDFS_URI + "clientReadTest15");
HadoopShuffleWriteHandler writeHandler =
new HadoopShuffleWriteHandler("appId", 0, 1, 1, basePath,
ssi1.getId(), conf);
@@ -640,7 +710,8 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
writeTestData(writeHandler, 5, 30, 1, 0, Maps.newHashMap(),
Roaring64NavigableMap.bitmapOf());
// unexpected taskAttemptId should be filtered
ShuffleReadClientImpl readClient =
- baseReadBuilder()
+ builderSupplier
+ .get()
.partitionId(1)
.basePath(basePath)
.blockIdBitmap(blockIdBitmap)
@@ -652,9 +723,11 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
readClient.close();
}
- @Test
- public void readTest16() throws Exception {
- String basePath = HDFS_URI + "clientReadTest16";
+ @ParameterizedTest
+ @MethodSource("clientBuilderProvider")
+ public void readTest16(Supplier<ShuffleClientFactory.ReadClientBuilder>
builderSupplier)
+ throws Exception {
+ String basePath = uniq(HDFS_URI + "clientReadTest16");
HadoopShuffleWriteHandler writeHandler0 =
new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath,
ssi1.getId(), conf);
HadoopShuffleWriteHandler writeHandler1 =
@@ -668,7 +741,8 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
writeTestData(writeHandler1, 2, 30, 1, 1, expectedData1, blockIdBitmap1);
ShuffleReadClientImpl readClient =
- baseReadBuilder()
+ builderSupplier
+ .get()
.partitionId(0)
.partitionNumPerRange(2)
.basePath(basePath)
@@ -681,7 +755,8 @@ public class ShuffleReadClientImplTest extends
HadoopTestBase {
readClient.close();
readClient =
- baseReadBuilder()
+ builderSupplier
+ .get()
.partitionId(1)
.partitionNumPerRange(2)
.basePath(basePath)