This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 43bfd2012 [#2679] fix(spark): Potential data mismatch on overlapping 
decompression (#2680)
43bfd2012 is described below

commit 43bfd2012b624b3eb91f473582073784cdcf4072
Author: Junfan Zhang <[email protected]>
AuthorDate: Mon Nov 24 10:20:40 2025 +0800

    [#2679] fix(spark): Potential data mismatch on overlapping decompression 
(#2680)
    
    ### What changes were proposed in this pull request?
    
    This PR fixes a potential data mismatch that can occur when duplicate 
blockIds are encountered while overlapping decompression is enabled.
    
    In a single batch of remote fetching, if some blocks should be filtered out 
because they are duplicated or unneeded, we must still increment the `segmentIndex` when skipping them.
    
    ### Why are the changes needed?
    
    Fix the data mismatch cases that were found by the Uniffle integrity 
validation mechanism.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Unit tests.
---
 .../uniffle/client/impl/ShuffleReadClientImpl.java |   2 +
 .../client/impl/ShuffleReadClientImplTest.java     | 239 ++++++++++++++-------
 2 files changed, 159 insertions(+), 82 deletions(-)

diff --git 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index c8dfa86ea..364e98526 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -308,6 +308,8 @@ public class ShuffleReadClientImpl implements 
ShuffleReadClient {
         // mark block as processed
         processedBlockIds.add(bs.getBlockId());
         pendingBlockIds.removeLong(bs.getBlockId());
+        // update the segment index to skip the unnecessary block in 
overlapping decompression mode
+        segmentIndex += 1;
       }
 
       if (bs != null) {
diff --git 
a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
 
b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
index c2df89558..18b73a478 100644
--- 
a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
+++ 
b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java
@@ -22,12 +22,15 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 import org.roaringbitmap.longlong.LongIterator;
@@ -39,6 +42,7 @@ import org.apache.uniffle.client.response.ShuffleBlock;
 import org.apache.uniffle.common.ClientType;
 import org.apache.uniffle.common.ShufflePartitionedBlock;
 import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.compression.NoOpCodec;
 import org.apache.uniffle.common.config.RssClientConf;
 import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.util.BlockId;
@@ -60,10 +64,12 @@ public class ShuffleReadClientImplTest extends 
HadoopTestBase {
   private static final String EXPECTED_EXCEPTION_MESSAGE = "Exception should 
be thrown";
   private static AtomicInteger ATOMIC_INT = new AtomicInteger(0);
 
-  private ShuffleServerInfo ssi1 = new ShuffleServerInfo("host1-0", "host1", 
0);
-  private ShuffleServerInfo ssi2 = new ShuffleServerInfo("host2-0", "host2", 
0);
+  private static ShuffleServerInfo ssi1 = new ShuffleServerInfo("host1-0", 
"host1", 0);
+  private static ShuffleServerInfo ssi2 = new ShuffleServerInfo("host2-0", 
"host2", 0);
 
-  private ShuffleClientFactory.ReadClientBuilder baseReadBuilder() {
+  private static AtomicInteger UNIQ_INT = new AtomicInteger(0);
+
+  private static ShuffleClientFactory.ReadClientBuilder baseReadBuilder() {
     return ShuffleClientFactory.newReadBuilder()
         .clientType(ClientType.GRPC)
         .storageType(StorageType.HDFS.name())
@@ -77,9 +83,26 @@ public class ShuffleReadClientImplTest extends 
HadoopTestBase {
         .shuffleServerInfoList(Lists.newArrayList(ssi1));
   }
 
-  @Test
-  public void readTest1() throws Exception {
-    String basePath = HDFS_URI + "clientReadTest1";
+  private static ShuffleClientFactory.ReadClientBuilder 
overlappingDecompressionReadBuilder() {
+    return baseReadBuilder()
+        .codec(NoOpCodec.getInstance())
+        .overlappingDecompressionThreadNum(1)
+        .overlappingDecompressionEnabled(true);
+  }
+
+  private static Stream<Supplier<ShuffleClientFactory.ReadClientBuilder>> 
clientBuilderProvider() {
+    return Stream.of(() -> baseReadBuilder(), () -> 
overlappingDecompressionReadBuilder());
+  }
+
+  private String uniq(String path) {
+    return String.format("%s-%s", path, UNIQ_INT.getAndIncrement());
+  }
+
+  @ParameterizedTest
+  @MethodSource("clientBuilderProvider")
+  public void readTest1(Supplier<ShuffleClientFactory.ReadClientBuilder> 
builderSupplier)
+      throws Exception {
+    String basePath = uniq(HDFS_URI + "clientReadTest1");
     HadoopShuffleWriteHandler writeHandler =
         new HadoopShuffleWriteHandler("appId", 0, 1, 1, basePath, 
ssi1.getId(), conf);
 
@@ -88,7 +111,8 @@ public class ShuffleReadClientImplTest extends 
HadoopTestBase {
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
     writeTestData(writeHandler, 2, 30, 1, 0, expectedData, blockIdBitmap);
     ShuffleReadClientImpl readClient =
-        baseReadBuilder()
+        builderSupplier
+            .get()
             .partitionId(1)
             .basePath(basePath)
             .blockIdBitmap(blockIdBitmap)
@@ -102,7 +126,8 @@ public class ShuffleReadClientImplTest extends 
HadoopTestBase {
     blockIdBitmap.addLong(layout.getBlockId(0, 0, layout.maxTaskAttemptId - 
1));
     taskIdBitmap.addLong(layout.maxTaskAttemptId - 1);
     readClient =
-        baseReadBuilder()
+        builderSupplier
+            .get()
             .partitionId(1)
             .basePath(basePath)
             .blockIdBitmap(blockIdBitmap)
@@ -120,9 +145,11 @@ public class ShuffleReadClientImplTest extends 
HadoopTestBase {
     }
   }
 
-  @Test
-  public void readTest2() throws Exception {
-    String basePath = HDFS_URI + "clientReadTest2";
+  @ParameterizedTest
+  @MethodSource("clientBuilderProvider")
+  public void readTest2(Supplier<ShuffleClientFactory.ReadClientBuilder> 
builderSupplier)
+      throws Exception {
+    String basePath = uniq(HDFS_URI + "clientReadTest2");
     HadoopShuffleWriteHandler writeHandler1 =
         new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, 
ssi1.getId(), conf);
     HadoopShuffleWriteHandler writeHandler2 =
@@ -135,7 +162,8 @@ public class ShuffleReadClientImplTest extends 
HadoopTestBase {
     writeTestData(writeHandler2, 2, 30, 0, 0, expectedData, blockIdBitmap);
 
     ShuffleReadClientImpl readClient =
-        baseReadBuilder()
+        builderSupplier
+            .get()
             .partitionId(0)
             .partitionNumPerRange(2)
             .basePath(basePath)
@@ -148,9 +176,11 @@ public class ShuffleReadClientImplTest extends 
HadoopTestBase {
     readClient.close();
   }
 
-  @Test
-  public void readTest3() throws Exception {
-    String basePath = HDFS_URI + "clientReadTest3";
+  @ParameterizedTest
+  @MethodSource("clientBuilderProvider")
+  public void readTest3(Supplier<ShuffleClientFactory.ReadClientBuilder> 
builderSupplier)
+      throws Exception {
+    String basePath = uniq(HDFS_URI + "clientReadTest3");
     HadoopShuffleWriteHandler writeHandler1 =
         new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, 
ssi1.getId(), conf);
     HadoopShuffleWriteHandler writeHandler2 =
@@ -194,7 +224,8 @@ public class ShuffleReadClientImplTest extends 
HadoopTestBase {
         conf);
 
     ShuffleReadClientImpl readClient =
-        baseReadBuilder()
+        builderSupplier
+            .get()
             .partitionId(0)
             .partitionNumPerRange(2)
             .basePath(basePath)
@@ -207,9 +238,11 @@ public class ShuffleReadClientImplTest extends 
HadoopTestBase {
     readClient.close();
   }
 
-  @Test
-  public void readTest4() throws Exception {
-    String basePath = HDFS_URI + "clientReadTest4";
+  @ParameterizedTest
+  @MethodSource("clientBuilderProvider")
+  public void readTest4(Supplier<ShuffleClientFactory.ReadClientBuilder> 
builderSupplier)
+      throws Exception {
+    String basePath = uniq(HDFS_URI + "clientReadTest4");
     HadoopShuffleWriteHandler writeHandler =
         new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, 
ssi1.getId(), conf);
 
@@ -219,7 +252,8 @@ public class ShuffleReadClientImplTest extends 
HadoopTestBase {
     writeTestData(writeHandler, 2, 30, 0, 0, expectedData, blockIdBitmap);
 
     ShuffleReadClientImpl readClient =
-        baseReadBuilder()
+        builderSupplier
+            .get()
             .partitionId(0)
             .partitionNumPerRange(2)
             .basePath(basePath)
@@ -253,9 +287,11 @@ public class ShuffleReadClientImplTest extends 
HadoopTestBase {
     readClient.close();
   }
 
-  @Test
-  public void readTest5() throws Exception {
-    String basePath = HDFS_URI + "clientReadTest5";
+  @ParameterizedTest
+  @MethodSource("clientBuilderProvider")
+  public void readTest5(Supplier<ShuffleClientFactory.ReadClientBuilder> 
builderSupplier)
+      throws Exception {
+    String basePath = uniq(HDFS_URI + "clientReadTest5");
     HadoopShuffleWriteHandler writeHandler =
         new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, 
ssi1.getId(), conf);
 
@@ -264,7 +300,8 @@ public class ShuffleReadClientImplTest extends 
HadoopTestBase {
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
     writeTestData(writeHandler, 2, 30, 0, 0, expectedData, blockIdBitmap);
     ShuffleReadClientImpl readClient =
-        baseReadBuilder()
+        builderSupplier
+            .get()
             .partitionId(0)
             .partitionNumPerRange(2)
             .basePath(basePath)
@@ -280,9 +317,11 @@ public class ShuffleReadClientImplTest extends 
HadoopTestBase {
     assertNull(readClient.readShuffleBlockData());
   }
 
-  @Test
-  public void readTest7() throws Exception {
-    String basePath = HDFS_URI + "clientReadTest7";
+  @ParameterizedTest
+  @MethodSource("clientBuilderProvider")
+  public void readTest7(Supplier<ShuffleClientFactory.ReadClientBuilder> 
builderSupplier)
+      throws Exception {
+    String basePath = uniq(HDFS_URI + "clientReadTest7");
     HadoopShuffleWriteHandler writeHandler =
         new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, 
ssi1.getId(), conf);
 
@@ -297,7 +336,8 @@ public class ShuffleReadClientImplTest extends 
HadoopTestBase {
 
     writeTestData(writeHandler, 10, 30, 0, 0, expectedData1, blockIdBitmap1);
     ShuffleReadClientImpl readClient1 =
-        baseReadBuilder()
+        builderSupplier
+            .get()
             .partitionId(0)
             .partitionNumPerRange(2)
             .basePath(basePath)
@@ -306,7 +346,8 @@ public class ShuffleReadClientImplTest extends 
HadoopTestBase {
             .build();
 
     final ShuffleReadClientImpl readClient2 =
-        baseReadBuilder()
+        builderSupplier
+            .get()
             .partitionId(0)
             .partitionNumPerRange(2)
             .basePath(basePath)
@@ -322,9 +363,11 @@ public class ShuffleReadClientImplTest extends 
HadoopTestBase {
     readClient2.close();
   }
 
-  @Test
-  public void readTest8() throws Exception {
-    String basePath = HDFS_URI + "clientReadTest8";
+  @ParameterizedTest
+  @MethodSource("clientBuilderProvider")
+  public void readTest8(Supplier<ShuffleClientFactory.ReadClientBuilder> 
builderSupplier)
+      throws Exception {
+    String basePath = uniq(HDFS_URI + "clientReadTest8");
     HadoopShuffleWriteHandler writeHandler =
         new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, 
ssi1.getId(), conf);
 
@@ -333,7 +376,8 @@ public class ShuffleReadClientImplTest extends 
HadoopTestBase {
     Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
     writeTestData(writeHandler, 2, 30, 0, 0, expectedData, blockIdBitmap);
     ShuffleReadClientImpl readClient =
-        baseReadBuilder()
+        builderSupplier
+            .get()
             .partitionId(0)
             .partitionNumPerRange(2)
             .basePath(basePath)
@@ -341,7 +385,8 @@ public class ShuffleReadClientImplTest extends 
HadoopTestBase {
             .taskIdBitmap(taskIdBitmap)
             .build();
     ShuffleReadClientImpl readClient2 =
-        baseReadBuilder()
+        builderSupplier
+            .get()
             .partitionId(0)
             .partitionNumPerRange(2)
             .basePath(basePath)
@@ -359,11 +404,7 @@ public class ShuffleReadClientImplTest extends 
HadoopTestBase {
         }
         fail(EXPECTED_EXCEPTION_MESSAGE);
       } catch (Exception e) {
-        assertTrue(
-            e.getMessage()
-                .startsWith(
-                    "Unexpected crc value for blockId[5800000000000 (seq: 44, 
part: 0, task: 0)]"),
-            e.getMessage());
+        assertTrue(e.getMessage().startsWith("Unexpected crc value"), 
e.getMessage());
       }
 
       ShuffleBlock block = readClient2.readShuffleBlockData();
@@ -373,11 +414,14 @@ public class ShuffleReadClientImplTest extends 
HadoopTestBase {
     readClient2.close();
   }
 
-  @Test
-  public void readTest9() {
+  @ParameterizedTest
+  @MethodSource("clientBuilderProvider")
+  public void readTest9(Supplier<ShuffleClientFactory.ReadClientBuilder> 
builderSupplier)
+      throws Exception {
     // empty data
     ShuffleReadClientImpl readClient =
-        baseReadBuilder()
+        builderSupplier
+            .get()
             .partitionId(0)
             .partitionNumPerRange(2)
             .basePath("basePath")
@@ -388,9 +432,11 @@ public class ShuffleReadClientImplTest extends 
HadoopTestBase {
     readClient.checkProcessedBlockIds();
   }
 
-  @Test
-  public void readTest10() throws Exception {
-    String basePath = HDFS_URI + "clientReadTest10";
+  @ParameterizedTest
+  @MethodSource("clientBuilderProvider")
+  public void readTest10(Supplier<ShuffleClientFactory.ReadClientBuilder> 
builderSupplier)
+      throws Exception {
+    String basePath = uniq(HDFS_URI + "clientReadTest10");
     HadoopShuffleWriteHandler writeHandler =
         new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, 
ssi1.getId(), conf);
 
@@ -408,7 +454,8 @@ public class ShuffleReadClientImplTest extends 
HadoopTestBase {
     }
 
     ShuffleReadClientImpl readClient =
-        baseReadBuilder()
+        builderSupplier
+            .get()
             .partitionId(0)
             .partitionNumPerRange(2)
             .basePath(basePath)
@@ -424,9 +471,11 @@ public class ShuffleReadClientImplTest extends 
HadoopTestBase {
     }
   }
 
-  @Test
-  public void readTest11() throws Exception {
-    String basePath = HDFS_URI + "clientReadTest11";
+  @ParameterizedTest
+  @MethodSource("clientBuilderProvider")
+  public void readTest11(Supplier<ShuffleClientFactory.ReadClientBuilder> 
builderSupplier)
+      throws Exception {
+    String basePath = uniq(HDFS_URI + "clientReadTest11");
     HadoopShuffleWriteHandler writeHandler =
         new HadoopShuffleWriteHandler("appId", 0, 1, 1, basePath, 
ssi1.getId(), conf);
 
@@ -436,7 +485,8 @@ public class ShuffleReadClientImplTest extends 
HadoopTestBase {
     writeTestData(writeHandler, 10, 30, 1, 0, expectedData, blockIdBitmap);
     // test with different indexReadLimit to validate result
     ShuffleReadClientImpl readClient =
-        baseReadBuilder()
+        builderSupplier
+            .get()
             .partitionId(1)
             .indexReadLimit(1)
             .basePath(basePath)
@@ -448,7 +498,8 @@ public class ShuffleReadClientImplTest extends 
HadoopTestBase {
     readClient.close();
 
     readClient =
-        baseReadBuilder()
+        builderSupplier
+            .get()
             .partitionId(1)
             .indexReadLimit(2)
             .basePath(basePath)
@@ -460,7 +511,8 @@ public class ShuffleReadClientImplTest extends 
HadoopTestBase {
     readClient.close();
 
     readClient =
-        baseReadBuilder()
+        builderSupplier
+            .get()
             .partitionId(1)
             .indexReadLimit(3)
             .basePath(basePath)
@@ -472,7 +524,8 @@ public class ShuffleReadClientImplTest extends 
HadoopTestBase {
     readClient.close();
 
     readClient =
-        baseReadBuilder()
+        builderSupplier
+            .get()
             .partitionId(1)
             .indexReadLimit(10)
             .basePath(basePath)
@@ -484,7 +537,8 @@ public class ShuffleReadClientImplTest extends 
HadoopTestBase {
     readClient.close();
 
     readClient =
-        baseReadBuilder()
+        builderSupplier
+            .get()
             .partitionId(1)
             .indexReadLimit(11)
             .basePath(basePath)
@@ -496,9 +550,11 @@ public class ShuffleReadClientImplTest extends 
HadoopTestBase {
     readClient.close();
   }
 
-  @Test
-  public void readTest12() throws Exception {
-    String basePath = HDFS_URI + "clientReadTest12";
+  @ParameterizedTest
+  @MethodSource("clientBuilderProvider")
+  public void readTest12(Supplier<ShuffleClientFactory.ReadClientBuilder> 
builderSupplier)
+      throws Exception {
+    String basePath = uniq(HDFS_URI + "clientReadTest12");
     HadoopShuffleWriteHandler writeHandler =
         new HadoopShuffleWriteHandler("appId", 0, 1, 1, basePath, 
ssi1.getId(), conf);
 
@@ -511,7 +567,8 @@ public class ShuffleReadClientImplTest extends 
HadoopTestBase {
 
     // unexpected taskAttemptId should be filtered
     ShuffleReadClientImpl readClient =
-        baseReadBuilder()
+        builderSupplier
+            .get()
             .partitionId(1)
             .basePath(basePath)
             .blockIdBitmap(blockIdBitmap)
@@ -523,24 +580,30 @@ public class ShuffleReadClientImplTest extends 
HadoopTestBase {
     readClient.close();
   }
 
-  @Test
-  public void readTest13() throws Exception {
-    doReadTest13(BlockIdLayout.DEFAULT);
+  @ParameterizedTest
+  @MethodSource("clientBuilderProvider")
+  public void readTest13(Supplier<ShuffleClientFactory.ReadClientBuilder> 
builderSupplier)
+      throws Exception {
+    doReadTest13(BlockIdLayout.DEFAULT, builderSupplier);
   }
 
-  @Test
-  public void readTest13b() throws Exception {
+  @ParameterizedTest
+  @MethodSource("clientBuilderProvider")
+  public void readTest13b(Supplier<ShuffleClientFactory.ReadClientBuilder> 
builderSupplier)
+      throws Exception {
     // This test is identical to readTest13, except that it does not use the 
default BlockIdLayout
     // the layout is only used by IdHelper that extracts the task attempt id 
from the block id
     // the partition id has to be larger than 0, so that it can leak into the 
task attempt id
     // if the default layout is being used
     BlockIdLayout layout = BlockIdLayout.from(22, 21, 20);
     assertNotEquals(layout, BlockIdLayout.DEFAULT);
-    doReadTest13(layout);
+    doReadTest13(layout, builderSupplier);
   }
 
-  public void doReadTest13(BlockIdLayout layout) throws Exception {
-    String basePath = HDFS_URI + "clientReadTest13-" + layout.hashCode();
+  public void doReadTest13(
+      BlockIdLayout layout, Supplier<ShuffleClientFactory.ReadClientBuilder> 
builderSupplier)
+      throws Exception {
+    String basePath = uniq(HDFS_URI + "clientReadTest13-" + layout.hashCode());
     HadoopShuffleWriteHandler writeHandler =
         new HadoopShuffleWriteHandler("appId", 0, 1, 1, basePath, 
ssi1.getId(), conf);
 
@@ -564,7 +627,8 @@ public class ShuffleReadClientImplTest extends 
HadoopTestBase {
     // unexpected taskAttemptId should be filtered
     assertEquals(15, blockIdBitmap.getIntCardinality());
     ShuffleReadClientImpl readClient =
-        baseReadBuilder()
+        builderSupplier
+            .get()
             .partitionId(1)
             .basePath(basePath)
             .blockIdBitmap(blockIdBitmap)
@@ -585,7 +649,8 @@ public class ShuffleReadClientImplTest extends 
HadoopTestBase {
       // the particular layout that created the block ids is incompatible with 
default layout, so
       // all block ids will be skipped
       // note that skipped block ids in blockIdBitmap will be removed by 
`build()`
-      baseReadBuilder()
+      builderSupplier
+          .get()
           .basePath(basePath)
           .blockIdBitmap(blockIdBitmap)
           .partitionId(1)
@@ -596,9 +661,11 @@ public class ShuffleReadClientImplTest extends 
HadoopTestBase {
     }
   }
 
-  @Test
-  public void readTest14() throws Exception {
-    String basePath = HDFS_URI + "clientReadTest14";
+  @ParameterizedTest
+  @MethodSource("clientBuilderProvider")
+  public void readTest14(Supplier<ShuffleClientFactory.ReadClientBuilder> 
builderSupplier)
+      throws Exception {
+    String basePath = uniq(HDFS_URI + "clientReadTest14");
     HadoopShuffleWriteHandler writeHandler =
         new HadoopShuffleWriteHandler("appId", 0, 1, 1, basePath, 
ssi1.getId(), conf);
 
@@ -611,7 +678,8 @@ public class ShuffleReadClientImplTest extends 
HadoopTestBase {
 
     // unexpected taskAttemptId should be filtered
     ShuffleReadClientImpl readClient =
-        baseReadBuilder()
+        builderSupplier
+            .get()
             .partitionId(1)
             .basePath(basePath)
             .blockIdBitmap(blockIdBitmap)
@@ -623,9 +691,11 @@ public class ShuffleReadClientImplTest extends 
HadoopTestBase {
     readClient.close();
   }
 
-  @Test
-  public void readTest15() throws Exception {
-    String basePath = HDFS_URI + "clientReadTest15";
+  @ParameterizedTest
+  @MethodSource("clientBuilderProvider")
+  public void readTest15(Supplier<ShuffleClientFactory.ReadClientBuilder> 
builderSupplier)
+      throws Exception {
+    String basePath = uniq(HDFS_URI + "clientReadTest15");
     HadoopShuffleWriteHandler writeHandler =
         new HadoopShuffleWriteHandler("appId", 0, 1, 1, basePath, 
ssi1.getId(), conf);
 
@@ -640,7 +710,8 @@ public class ShuffleReadClientImplTest extends 
HadoopTestBase {
     writeTestData(writeHandler, 5, 30, 1, 0, Maps.newHashMap(), 
Roaring64NavigableMap.bitmapOf());
     // unexpected taskAttemptId should be filtered
     ShuffleReadClientImpl readClient =
-        baseReadBuilder()
+        builderSupplier
+            .get()
             .partitionId(1)
             .basePath(basePath)
             .blockIdBitmap(blockIdBitmap)
@@ -652,9 +723,11 @@ public class ShuffleReadClientImplTest extends 
HadoopTestBase {
     readClient.close();
   }
 
-  @Test
-  public void readTest16() throws Exception {
-    String basePath = HDFS_URI + "clientReadTest16";
+  @ParameterizedTest
+  @MethodSource("clientBuilderProvider")
+  public void readTest16(Supplier<ShuffleClientFactory.ReadClientBuilder> 
builderSupplier)
+      throws Exception {
+    String basePath = uniq(HDFS_URI + "clientReadTest16");
     HadoopShuffleWriteHandler writeHandler0 =
         new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, 
ssi1.getId(), conf);
     HadoopShuffleWriteHandler writeHandler1 =
@@ -668,7 +741,8 @@ public class ShuffleReadClientImplTest extends 
HadoopTestBase {
     writeTestData(writeHandler1, 2, 30, 1, 1, expectedData1, blockIdBitmap1);
 
     ShuffleReadClientImpl readClient =
-        baseReadBuilder()
+        builderSupplier
+            .get()
             .partitionId(0)
             .partitionNumPerRange(2)
             .basePath(basePath)
@@ -681,7 +755,8 @@ public class ShuffleReadClientImplTest extends 
HadoopTestBase {
     readClient.close();
 
     readClient =
-        baseReadBuilder()
+        builderSupplier
+            .get()
             .partitionId(1)
             .partitionNumPerRange(2)
             .basePath(basePath)

Reply via email to