zhengchenyu commented on PR #2104:
URL: 
https://github.com/apache/incubator-uniffle/pull/2104#issuecomment-2339800109

   In fact I wrote a unit test, but the probability of reproducing the issue is 
low. I made some changes to ByteBufUtils::readBytes to improve the probability of 
reproducing the problem. However, I don't know how to apply the changed 
ByteBufUtils::readBytes to the unit test, so I did not submit the unit test.
   
   The changes to ByteBufUtils::readBytes are below; I use sleep to simulate a 
slow operation to ensure that getInMemoryShuffleData and the block flush occur at 
the same time.
   
   ```
     public static final byte[] readBytes(ByteBuf buf) {
       byte[] bytes = new byte[buf.readableBytes()];
       buf.readBytes(bytes, 0, 100);
       try {
         Thread.sleep(10000);
       } catch (InterruptedException e) {
         throw new RuntimeException(e);
       }
       buf.readBytes(bytes, 100, bytes.length - 100);
       buf.resetReaderIndex();
       return bytes;
     }
   ```
   
   And the unit test is:
   
   ```
   package org.apache.uniffle.test;
   
    import static org.apache.uniffle.common.config.RssClientConf.RSS_STORAGE_TYPE;
    import static org.apache.uniffle.server.ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE;
    import static org.apache.uniffle.server.ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE;
    import static org.junit.jupiter.api.Assertions.assertEquals;
    import static org.junit.jupiter.api.Assertions.assertNotNull;

    import com.google.common.collect.ImmutableMap;
    import com.google.common.collect.Lists;
    import java.io.File;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.stream.Collectors;
    import org.apache.uniffle.client.api.ShuffleReadClient;
    import org.apache.uniffle.client.api.ShuffleWriteClient;
    import org.apache.uniffle.client.factory.ShuffleClientFactory;
    import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
    import org.apache.uniffle.client.response.CompressedShuffleBlock;
    import org.apache.uniffle.client.util.DefaultIdHelper;
    import org.apache.uniffle.common.ClientType;
    import org.apache.uniffle.common.PartitionRange;
    import org.apache.uniffle.common.RemoteStorageInfo;
    import org.apache.uniffle.common.ShuffleBlockInfo;
    import org.apache.uniffle.common.ShuffleDataDistributionType;
    import org.apache.uniffle.common.ShuffleServerInfo;
    import org.apache.uniffle.common.config.RssConf;
    import org.apache.uniffle.common.rpc.ServerType;
    import org.apache.uniffle.common.util.BlockIdLayout;
    import org.apache.uniffle.common.util.ChecksumUtils;
    import org.apache.uniffle.coordinator.CoordinatorConf;
    import org.apache.uniffle.server.ShuffleServer;
    import org.apache.uniffle.server.ShuffleServerConf;
    import org.apache.uniffle.server.buffer.ShuffleBufferType;
    import org.apache.uniffle.storage.util.StorageType;
    import org.junit.jupiter.api.BeforeAll;
    import org.junit.jupiter.api.io.TempDir;
    import org.junit.jupiter.params.ParameterizedTest;
    import org.junit.jupiter.params.provider.ValueSource;
    import org.roaringbitmap.longlong.Roaring64NavigableMap;
   
   public class FetchIntegralityTest extends ShuffleReadWriteBase {
   
     private static final AtomicInteger ATOMIC_INT_SORTED = new 
AtomicInteger(0);
     private static final String APP_ID = "testApp";
     private static final int SHUFFLE_ID = 0;
     private static final int PARTITION_ID = 0;
   
     private static ShuffleServer shuffleServer;
     private static ShuffleServerInfo shuffleServerInfo;
   
     @BeforeAll
     public static void setupServers( @TempDir File tmpDir) throws Exception {
       CoordinatorConf coordinatorConf = getCoordinatorConf();
       createCoordinatorServer(coordinatorConf);
       ShuffleServerConf shuffleServerConf = 
getShuffleServerConf(ServerType.GRPC_NETTY);
       shuffleServerConf.set(ShuffleServerConf.SERVER_SHUFFLE_BUFFER_TYPE, 
ShuffleBufferType.SKIP_LIST);
       // Each shuffle data will be flushed!
       shuffleServerConf.set(SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 
0.0);
       shuffleServerConf.set(SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 
0.0);
       shuffleServerConf.setLong("rss.server.app.expired.withoutHeartbeat", 
10000000);
       File dataDir1 = new File(tmpDir, "data1");
       File dataDir2 = new File(tmpDir, "data2");
       String basePath = dataDir1.getAbsolutePath() + "," + 
dataDir2.getAbsolutePath();
       shuffleServerConf.setString("rss.storage.type", 
StorageType.LOCALFILE.name());
       shuffleServerConf.setString("rss.storage.basePath", basePath);
       createShuffleServer(shuffleServerConf);
   
       startServers();
       shuffleServer = nettyShuffleServers.get(0);
       shuffleServerInfo = new ShuffleServerInfo("127.0.0.1-20001", 
shuffleServer.getIp(), shuffleServer.getGrpcPort(),
           shuffleServer.getNettyPort());
     }
   
     @ParameterizedTest
     @ValueSource(strings = {"GRPC", "GRPC_NETTY"})
     public void fetchIntegralityTest(String clientType, @TempDir File tmpDir) 
throws Exception {
       // 1 set basic parameter
       int size = 100 * 1024 * 1024;
       RssConf rssConf = new RssConf();
       rssConf.set(RSS_STORAGE_TYPE, "MEMORY_LOCALFILE");
       BlockIdLayout layout = BlockIdLayout.from(rssConf);
   
       // 2 register shuffle
       ShuffleWriteClient shuffleWriteClient = new ShuffleWriteClientImpl(
           
ShuffleClientFactory.newWriteBuilder().clientType(clientType).retryMax(3).retryIntervalMax(1000)
               
.heartBeatThreadNum(1).replica(1).replicaWrite(1).replicaRead(1).replicaSkipEnabled(true)
               
.dataTransferPoolSize(1).dataCommitPoolSize(1).unregisterThreadPoolSize(10).unregisterRequestTimeSec(10));
       shuffleWriteClient.registerShuffle(
           shuffleServerInfo,
           APP_ID,
           SHUFFLE_ID,
           Lists.newArrayList(new PartitionRange(0, 0)),
           new RemoteStorageInfo(""),
           ShuffleDataDistributionType.NORMAL,
           -1);
   
       // 3 send shuffle data
       List<ShuffleBlockInfo> blocks1 = new ArrayList<>();
       blocks1.add(createShuffleBlock(layout, 0, PARTITION_ID, 
Lists.newArrayList(shuffleServerInfo), size));
       shuffleWriteClient.sendShuffleData(APP_ID, blocks1, () -> false);
   
       // 4 report shuffle result
       Map<Integer, Set<Long>> ptb = ImmutableMap.of(PARTITION_ID, new 
HashSet());
       ptb.get(PARTITION_ID).addAll(blocks1.stream().map(s -> 
s.getBlockId()).collect(Collectors.toList()));
       Map<ShuffleServerInfo, Map<Integer, Set<Long>>> 
serverToPartitionToBlockIds = new HashMap();
       serverToPartitionToBlockIds.put(shuffleServerInfo, ptb);
       shuffleWriteClient.reportShuffleResult(serverToPartitionToBlockIds, 
APP_ID, SHUFFLE_ID, 0, 1);
   
       // 5 get shuffle data
       Roaring64NavigableMap blockIdBitmap = new Roaring64NavigableMap();
       blockIdBitmap.add(blocks1.get(0).getBlockId());
       Roaring64NavigableMap taskIdBitmap = new Roaring64NavigableMap();
       taskIdBitmap.add(0);
       ShuffleReadClient shuffleReadClient =
           ShuffleClientFactory.getInstance()
               .createShuffleReadClient(
                   ShuffleClientFactory.newReadBuilder()
                       .clientType(ClientType.valueOf(clientType))
                       .appId(APP_ID)
                       .shuffleId(SHUFFLE_ID)
                       .partitionId(PARTITION_ID)
                       .basePath(null)
                       .partitionNumPerRange(1)
                       .partitionNum(1)
                       .blockIdBitmap(blockIdBitmap)
                       .taskIdBitmap(taskIdBitmap)
                       
.shuffleServerInfoList(Lists.newArrayList(shuffleServerInfo))
                       .hadoopConf(null)
                       .idHelper(new DefaultIdHelper(layout))
                       .expectedTaskIdsBitmapFilterEnable(false)
                       .rssConf(rssConf));
       // wait local file writer
       Thread.sleep(5000);
       CompressedShuffleBlock block = shuffleReadClient.readShuffleBlockData();
       assertEquals(size, block.getUncompressLength());
   
       // 6 cleanup
       shuffleWriteClient.unregisterShuffle(APP_ID);
       shuffleWriteClient.close();
       shuffleReadClient.close();
     }
   
     public static ShuffleBlockInfo createShuffleBlock(
         BlockIdLayout blockIdLayout,
         int taskAttemptId,
         int partitionId,
         List<ShuffleServerInfo> shuffleServerInfoList,
         int size) {
       long blockId = 
blockIdLayout.getBlockId(ATOMIC_INT_SORTED.getAndIncrement(), PARTITION_ID, 
taskAttemptId);
       byte[] bytes = new byte[size];
       for (int i = 0; i < size; i++) {
         bytes[i] = (byte) (i & 0xFF);
       }
       return new ShuffleBlockInfo(
           SHUFFLE_ID,
           partitionId,
           blockId,
           size,
           ChecksumUtils.getCrc32(bytes),
           bytes,
           shuffleServerInfoList,
           size,
           0,
           taskAttemptId);
     }
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to