zhengchenyu commented on PR #2104:
URL: https://github.com/apache/incubator-uniffle/pull/2104#issuecomment-2339800109
In fact, I wrote a unit test, but the probability of reproducing the issue with it is low. I made some changes to `ByteBufUtils::readBytes` to raise the probability of hitting the problem, but I don't know how to apply the changed `ByteBufUtils::readBytes` from within the unit test, so I have not submitted the test. The changes to `ByteBufUtils::readBytes` are below: I split the read into two parts and use `Thread.sleep` to simulate a slow operation, which ensures that `getInMemoryShuffleData` and the block flush happen at the same time.
```
public static byte[] readBytes(ByteBuf buf) {
  byte[] bytes = new byte[buf.readableBytes()];
  // Read the first 100 bytes, then stall before copying the rest, so that a
  // concurrent flush of the block can overlap with this in-memory read.
  buf.readBytes(bytes, 0, 100);
  try {
    Thread.sleep(10000);
  } catch (InterruptedException e) {
    throw new RuntimeException(e);
  }
  buf.readBytes(bytes, 100, bytes.length - 100);
  buf.resetReaderIndex();
  return bytes;
}
```
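One way to apply the modified `readBytes` from a test without changing production behavior (a minimal sketch, not existing Uniffle API: `slowReadHook` is a hypothetical name) is to guard the slow path behind a test-only hook that stays `null` outside of tests:

```
import io.netty.buffer.ByteBuf;

public class ByteBufUtils {
  // Hypothetical test-only hook: when set, readBytes splits the copy in two
  // and runs the hook between the halves to widen the race window.
  static volatile Runnable slowReadHook = null;

  public static byte[] readBytes(ByteBuf buf) {
    byte[] bytes = new byte[buf.readableBytes()];
    Runnable hook = slowReadHook;
    // Guard on length so small buffers never hit the split path.
    if (hook != null && bytes.length > 100) {
      buf.readBytes(bytes, 0, 100);
      hook.run(); // e.g. a sleep, so getInMemoryShuffleData overlaps the flush
      buf.readBytes(bytes, 100, bytes.length - 100);
    } else {
      buf.readBytes(bytes);
    }
    buf.resetReaderIndex();
    return bytes;
  }
}
```

With the hook unset, the method behaves exactly like the current implementation, so the change could even live in the main source tree.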
And the unit test is:
```
package org.apache.uniffle.test;

import static org.apache.uniffle.common.config.RssClientConf.RSS_STORAGE_TYPE;
import static org.apache.uniffle.server.ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE;
import static org.apache.uniffle.server.ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.uniffle.client.api.ShuffleReadClient;
import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
import org.apache.uniffle.client.response.CompressedShuffleBlock;
import org.apache.uniffle.client.util.DefaultIdHelper;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.util.BlockIdLayout;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleServer;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.server.buffer.ShuffleBufferType;
import org.apache.uniffle.storage.util.StorageType;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

public class FetchIntegralityTest extends ShuffleReadWriteBase {

  private static final AtomicInteger ATOMIC_INT_SORTED = new AtomicInteger(0);
  private static final String APP_ID = "testApp";
  private static final int SHUFFLE_ID = 0;
  private static final int PARTITION_ID = 0;

  private static ShuffleServer shuffleServer;
  private static ShuffleServerInfo shuffleServerInfo;

  @BeforeAll
  public static void setupServers(@TempDir File tmpDir) throws Exception {
    CoordinatorConf coordinatorConf = getCoordinatorConf();
    createCoordinatorServer(coordinatorConf);
    ShuffleServerConf shuffleServerConf = getShuffleServerConf(ServerType.GRPC_NETTY);
    shuffleServerConf.set(ShuffleServerConf.SERVER_SHUFFLE_BUFFER_TYPE, ShuffleBufferType.SKIP_LIST);
    // With both watermarks at 0.0, every piece of shuffle data is flushed
    // immediately, so the in-memory read always races with an in-flight flush.
    shuffleServerConf.set(SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 0.0);
    shuffleServerConf.set(SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 0.0);
    shuffleServerConf.setLong("rss.server.app.expired.withoutHeartbeat", 10000000);
    File dataDir1 = new File(tmpDir, "data1");
    File dataDir2 = new File(tmpDir, "data2");
    String basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath();
    shuffleServerConf.setString("rss.storage.type", StorageType.LOCALFILE.name());
    shuffleServerConf.setString("rss.storage.basePath", basePath);
    createShuffleServer(shuffleServerConf);
    startServers();
    shuffleServer = nettyShuffleServers.get(0);
    shuffleServerInfo =
        new ShuffleServerInfo(
            "127.0.0.1-20001",
            shuffleServer.getIp(),
            shuffleServer.getGrpcPort(),
            shuffleServer.getNettyPort());
  }

  @ParameterizedTest
  @ValueSource(strings = {"GRPC", "GRPC_NETTY"})
  public void fetchIntegralityTest(String clientType, @TempDir File tmpDir) throws Exception {
    // 1 set basic parameters
    int size = 100 * 1024 * 1024;
    RssConf rssConf = new RssConf();
    rssConf.set(RSS_STORAGE_TYPE, "MEMORY_LOCALFILE");
    BlockIdLayout layout = BlockIdLayout.from(rssConf);

    // 2 register shuffle
    ShuffleWriteClient shuffleWriteClient =
        new ShuffleWriteClientImpl(
            ShuffleClientFactory.newWriteBuilder()
                .clientType(clientType)
                .retryMax(3)
                .retryIntervalMax(1000)
                .heartBeatThreadNum(1)
                .replica(1)
                .replicaWrite(1)
                .replicaRead(1)
                .replicaSkipEnabled(true)
                .dataTransferPoolSize(1)
                .dataCommitPoolSize(1)
                .unregisterThreadPoolSize(10)
                .unregisterRequestTimeSec(10));
    shuffleWriteClient.registerShuffle(
        shuffleServerInfo,
        APP_ID,
        SHUFFLE_ID,
        Lists.newArrayList(new PartitionRange(0, 0)),
        new RemoteStorageInfo(""),
        ShuffleDataDistributionType.NORMAL,
        -1);

    // 3 send shuffle data
    List<ShuffleBlockInfo> blocks1 = new ArrayList<>();
    blocks1.add(
        createShuffleBlock(layout, 0, PARTITION_ID, Lists.newArrayList(shuffleServerInfo), size));
    shuffleWriteClient.sendShuffleData(APP_ID, blocks1, () -> false);

    // 4 report shuffle result
    Map<Integer, Set<Long>> ptb = ImmutableMap.of(PARTITION_ID, new HashSet<>());
    ptb.get(PARTITION_ID)
        .addAll(blocks1.stream().map(ShuffleBlockInfo::getBlockId).collect(Collectors.toList()));
    Map<ShuffleServerInfo, Map<Integer, Set<Long>>> serverToPartitionToBlockIds = new HashMap<>();
    serverToPartitionToBlockIds.put(shuffleServerInfo, ptb);
    shuffleWriteClient.reportShuffleResult(serverToPartitionToBlockIds, APP_ID, SHUFFLE_ID, 0, 1);

    // 5 get shuffle data
    Roaring64NavigableMap blockIdBitmap = new Roaring64NavigableMap();
    blockIdBitmap.add(blocks1.get(0).getBlockId());
    Roaring64NavigableMap taskIdBitmap = new Roaring64NavigableMap();
    taskIdBitmap.add(0);
    ShuffleReadClient shuffleReadClient =
        ShuffleClientFactory.getInstance()
            .createShuffleReadClient(
                ShuffleClientFactory.newReadBuilder()
                    .clientType(ClientType.valueOf(clientType))
                    .appId(APP_ID)
                    .shuffleId(SHUFFLE_ID)
                    .partitionId(PARTITION_ID)
                    .basePath(null)
                    .partitionNumPerRange(1)
                    .partitionNum(1)
                    .blockIdBitmap(blockIdBitmap)
                    .taskIdBitmap(taskIdBitmap)
                    .shuffleServerInfoList(Lists.newArrayList(shuffleServerInfo))
                    .hadoopConf(null)
                    .idHelper(new DefaultIdHelper(layout))
                    .expectedTaskIdsBitmapFilterEnable(false)
                    .rssConf(rssConf));
    // wait for the local file writer to finish flushing
    Thread.sleep(5000);
    CompressedShuffleBlock block = shuffleReadClient.readShuffleBlockData();
    assertEquals(size, block.getUncompressLength());

    // 6 cleanup
    shuffleWriteClient.unregisterShuffle(APP_ID);
    shuffleWriteClient.close();
    shuffleReadClient.close();
  }

  public static ShuffleBlockInfo createShuffleBlock(
      BlockIdLayout blockIdLayout,
      int taskAttemptId,
      int partitionId,
      List<ShuffleServerInfo> shuffleServerInfoList,
      int size) {
    long blockId =
        blockIdLayout.getBlockId(ATOMIC_INT_SORTED.getAndIncrement(), partitionId, taskAttemptId);
    // Fill the block with a deterministic byte pattern so corruption is detectable.
    byte[] bytes = new byte[size];
    for (int i = 0; i < size; i++) {
      bytes[i] = (byte) (i & 0xFF);
    }
    return new ShuffleBlockInfo(
        SHUFFLE_ID,
        partitionId,
        blockId,
        size,
        ChecksumUtils.getCrc32(bytes),
        bytes,
        shuffleServerInfoList,
        size,
        0,
        taskAttemptId);
  }
}
```
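If the hook sketched above were used, the test could install the delay right before fetching and restore the default afterwards (again hypothetical, paired with the earlier sketch):

```
// Before shuffleReadClient.readShuffleBlockData(): stall the in-memory read
// mid-copy while the flush to the local file proceeds concurrently.
ByteBufUtils.slowReadHook =
    () -> {
      try {
        Thread.sleep(10000);
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
    };

// ... and in the cleanup step, restore the fast path for other tests:
ByteBufUtils.slowReadHook = null;
```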