fpkgithub opened a new issue, #257: URL: https://github.com/apache/incubator-uniffle/issues/257
### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)

### Search before asking

- [X] I have searched in the [issues](https://github.com/apache/incubator-uniffle/issues?q=is%3Aissue) and found no similar issues.

### Describe the bug

When I run some Hive SQL queries with more than 1024 reduce tasks, the reduce tasks whose ID is 1024 or greater read no data from the ShuffleServer, yet the SQL result state is SUCCEEDED. In addition, the SQL result count with the original shuffle differs from the count with Uniffle. The relevant logs and code are as follows.

## Hive SQL

![image](https://user-images.githubusercontent.com/22984063/194813378-acaeb7da-9500-4676-8159-36b1666e1702.png)

## Reduce Task

reduce task ID < 1024

```
2022-10-09 21:22:25,121 INFO [main] org.apache.hadoop.mapred.ReduceTask: Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.RssShuffle@533377b
2022-10-09 21:22:28,447 INFO [main] org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient: GetInMemoryShuffleData from 10.156.174.33:19999 for appId[appattempt_1665305651155_5705_000001], shuffleId[0], partitionId[19] cost 329 ms
...
2022-10-09 21:22:28,876 INFO [main] org.apache.uniffle.client.impl.ShuffleReadClientImpl: blockIdBitmap.Size=2293, pendingBlockIds.Size=0, bufferSegmentQueue.Size=0
2022-10-09 21:22:28,878 INFO [main] org.apache.uniffle.client.impl.ShuffleReadClientImpl: Metrics for shuffleId[0], partitionId[19], read data cost 380 ms, copy data cost 0 ms, crc check cost 7 ms
2022-10-09 21:22:28,878 INFO [main] org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler: Client read 2293 blocks [ hot:2293 warm:0 cold:0 frozen:0 ]
2022-10-09 21:22:28,878 INFO [main] org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler: Client read 971918 bytes [ hot:971918 warm:0 cold:0 frozen:0 ]
2022-10-09 21:22:28,878 INFO [main] org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler: Client read 1372670 uncompressed bytes [ hot:1372670 warm:0 cold:0 frozen:0 ]
```

reduce task ID >= 1024

```
2022-09-28 11:46:04,379 INFO [main] org.apache.hadoop.mapred.ReduceTask: Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.RssShuffle@10fde30a
2022-09-28 11:46:05,060 INFO [main] org.apache.uniffle.client.impl.ShuffleReadClientImpl: Metrics for shuffleId[0], partitionId[2003], read data cost 0 ms, copy data cost 0 ms, crc check cost 0 ms
2022-09-28 11:46:05,061 INFO [main] org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler: Client read 0 blocks [ hot:0 warm:0 cold:0 frozen:0 ]
2022-09-28 11:46:05,061 INFO [main] org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler: Client read 0 bytes [ hot:0 warm:0 cold:0 frozen:0 ]
2022-09-28 11:46:05,061 INFO [main] org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler: Client read 0 uncompressed bytes [ hot:0 warm:0 cold:0 frozen:0 ]
```

## Client Code

In `ShuffleReadClientImpl#readShuffleBlockData`, `blockIdBitmap` is empty when the reduce task ID is 1024 or greater.

### org.apache.uniffle.client.impl.ShuffleReadClientImpl#readShuffleBlockData

```java
@Override
public CompressedShuffleBlock readShuffleBlockData() {
  LOG.info("blockIdBitmap.Size=" + blockIdBitmap.getLongCardinality()
      + ", pendingBlockIds.Size=" + pendingBlockIds.getLongCardinality()
      + ", bufferSegmentQueue.Size=" + bufferSegmentQueue.size());
  // empty data expected, just return null
  if (blockIdBitmap.isEmpty()) {
    LOG.info("blockIdBitmap empty , so return null ...");
    return null;
  }

  // All blocks are processed, so just return
  if (pendingBlockIds.isEmpty()) {
    return null;
  }

  // if need request new data from shuffle server
  if (bufferSegmentQueue.isEmpty()) {
    if (read() <= 0) {
      return null;
    }
  }
  // get next buffer segment
  BufferSegment bs = null;

  // blocks in bufferSegmentQueue may be from different partition in range partition mode,
  // or may be from speculation task, filter them and just read the necessary block
  ...
  // current segment hasn't data, try next segment
  return readShuffleBlockData();
}
```

### log

```shell
2022-10-09 21:23:21,530 INFO [org.apache.hadoop.mapreduce.TaskPauseMonitor$Monitor@376a312c] org.apache.hadoop.mapreduce.TaskPauseMonitor: Starting JVM pause monitor
2022-10-09 21:23:21,539 INFO [main] org.apache.hadoop.mapred.ReduceTask: Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.RssShuffle@3383649e
2022-10-09 21:23:22,065 INFO [main] org.apache.uniffle.client.impl.ShuffleWriteClientImpl: getShuffleResult#appId=appattempt_1665305651155_5705_000001, shuffleId=0, partitionId=2003, shuffleServerInfoSet.Size=1
2022-10-09 21:23:22,065 INFO [main] org.apache.uniffle.client.impl.ShuffleWriteClientImpl: getShuffleResult#star ss getBlockIdBitmap=10.156.175.32
2022-10-09 21:23:23,799 INFO [main] org.apache.uniffle.client.impl.ShuffleWriteClientImpl: getShuffleResult#10.156.175.32:ss get blockIdBitmapOfServer.Size=0
2022-10-09 21:23:23,799 INFO [main] org.apache.uniffle.client.impl.ShuffleWriteClientImpl: getShuffleResult# break, ss=10.156.175.32
2022-10-09 21:23:23,799 INFO [main] org.apache.uniffle.client.impl.ShuffleWriteClientImpl: getShuffleResult#blockIdBitmap.Size=0
2022-10-09 21:23:23,915 INFO [main] org.apache.uniffle.client.impl.ShuffleReadClientImpl: blockIdBitmap.Size=0, pendingBlockIds.Size=0, bufferSegmentQueue.Size=0
2022-10-09 21:23:23,916 INFO [main] org.apache.uniffle.client.impl.ShuffleReadClientImpl: blockIdBitmap empty , so return null ...
2022-10-09 21:23:23,916 INFO [main] org.apache.uniffle.client.impl.ShuffleReadClientImpl: Metrics for shuffleId[0], partitionId[2003], read data cost 0 ms, copy data cost 0 ms, crc check cost 0 ms
2022-10-09 21:23:23,916 INFO [main] org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler: Client read 0 blocks [ hot:0 warm:0 cold:0 frozen:0 ]
2022-10-09 21:23:23,916 INFO [main] org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler: Client read 0 bytes [ hot:0 warm:0 cold:0 frozen:0 ]
2022-10-09 21:23:23,916 INFO [main] org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler: Client read 0 uncompressed bytes [ hot:0 warm:0 cold:0 frozen:0 ]
```

## ShuffleServer

Adding logging to `ShuffleTaskManager#getBlockIdsByPartitionId` and analyzing its code shows that:

- when the reduce task ID (partition) is less than 1024, the blockIds belonging to `requestPartitions` are found;
- when the reduce task ID (partition) is 1024 or greater, no blockId belonging to `requestPartitions` is found.

### org.apache.uniffle.server.ShuffleTaskManager#getBlockIdsByPartitionId

```java
// filter the specific partition blockId in the bitmap to the resultBitmap
protected Roaring64NavigableMap getBlockIdsByPartitionId(Set<Integer> requestPartitions,
    Roaring64NavigableMap bitmap, Roaring64NavigableMap resultBitmap) {
  LOG.info("getBlockIdsByPartitionId# start requestPartitions=" + requestPartitions.toString()
      + ", bitmap.size=" + bitmap.getLongCardinality());
  int findPartitionIdOKCount = 0;
  int findPartitionIdFailCount = 0;
  // debugging aid: only parses when exactly one partition is requested; the value is not used below
  int partitionNum = Integer.parseInt(requestPartitions.toString().replace("[", "").replace("]", ""));
  LongIterator iter = bitmap.getLongIterator();
  long mask = (1L << Constants.PARTITION_ID_MAX_LENGTH) - 1;
  while (iter.hasNext()) {
    long blockId = iter.next();
    int partitionId = Math.toIntExact((blockId >> Constants.TASK_ATTEMPT_ID_MAX_LENGTH) & mask);
    if (requestPartitions.contains(partitionId)) {
      resultBitmap.addLong(blockId);
      findPartitionIdOKCount++;
    } else {
      findPartitionIdFailCount++;
    }
  }
  LOG.info("getBlockIdsByPartitionId# end requestPartitions=" + requestPartitions.toString()
      + ", resultBitmap.size=" + resultBitmap.getLongCardinality()
      + ", findPartitionIdOKCount=" + findPartitionIdOKCount
      + ", findPartitionIdFailCount=" + findPartitionIdFailCount);
  return resultBitmap;
}
```

### log

- `requestPartitions=[74]` returns blocks successfully

```
## getShuffleResult
[INFO] 2022-10-09 21:22:26,035 Grpc-512 ShuffleTaskManager getBlockIdsByPartitionId - getBlockIdsByPartitionId# start requestPartitions=[74], bitmap.size=1546249
[INFO] 2022-10-09 21:22:26,063 Grpc-512 ShuffleTaskManager getBlockIdsByPartitionId - getBlockIdsByPartitionId# end requestPartitions=[74], resultBitmap.size=2292, findPartitionIdOKCount=2292, findPartitionIdFailCount=1543957

## getMemoryShuffleData
[INFO] 2022-10-09 21:22:26,465 Grpc-464 ShuffleServerGrpcService getMemoryShuffleData - Successfully getInMemoryShuffleData cost 1 ms with 962828 bytes shuffle data for appId[appattempt_1665305651155_5705_000001], shuffleId[0], partitionId[74]
```

- `requestPartitions=[2003]` returns no blocks

```
[INFO] 2022-10-09 21:23:23,728 Grpc-1478 ShuffleTaskManager getBlockIdsByPartitionId - getBlockIdsByPartitionId# start requestPartitions=[2003], bitmap.size=1546249
[INFO] 2022-10-09 21:23:23,785 Grpc-1478 ShuffleTaskManager getBlockIdsByPartitionId - getBlockIdsByPartitionId# end requestPartitions=[2003], resultBitmap.size=0, findPartitionIdOKCount=0, findPartitionIdFailCount=1546249
```

### Affects Version(s)

0.7.0-snapshot

### Uniffle Server Log Output

_No response_

### Uniffle Engine Log Output

_No response_

### Uniffle Server Configurations

```yaml
## shuffle server config
rss.rpc.server.port 19999
rss.jetty.http.port 19998
rss.rpc.executor.size 2000
# it should be configured the same as in the coordinator
rss.storage.type MEMORY_LOCALFILE_HDFS
rss.coordinator.quorum coordinator-01:19999,coordinator-02:19999,coordinator-03:19999
# local storage path for shuffle server
rss.storage.basePath /mnt/storage00/hadoop/uniffle-ss/rss_data,/mnt/storage01/hadoop/uniffle-ss/rss_data,/mnt/storage02/hadoop/uniffle-ss/rss_data,/mnt/storage03/hadoop/uniffle-ss/rss_data
# it's better to configure the thread num according to the local disk num
rss.server.flush.thread.alive 20
rss.server.flush.threadPool.size 20
rss.server.buffer.capacity 40gb
rss.server.read.buffer.capacity 20gb
rss.server.heartbeat.timeout 60000
rss.server.heartbeat.interval 10000
rss.rpc.message.max.size 1073741824
rss.server.preAllocation.expired 120000
rss.server.commit.timeout 600000
rss.server.app.expired.withoutHeartbeat 120000
# note: the default value of rss.server.flush.cold.storage.threshold.size is 64m
# no data will be written to DFS if it is set to 100g, even with rss.storage.type=MEMORY_LOCALFILE_HDFS
# please set a proper value if DFS is used, e.g., 64m, 128m.
rss.server.flush.cold.storage.threshold.size 64m
rss.server.health.check.enable true
rss.server.disk.capacity 400gb
```

### Uniffle Engine Configurations

_No response_

### Additional context

_No response_

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!
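To make the server-side filter in `getBlockIdsByPartitionId` easier to reason about, here is a minimal, self-contained Java sketch of the block ID layout it decodes: the partition ID is recovered by shifting out `TASK_ATTEMPT_ID_MAX_LENGTH` bits and masking with `(1L << PARTITION_ID_MAX_LENGTH) - 1`. The field widths (18/24/21 bits), the `encode` helper, and the `partitionHistogram` diagnostic are assumptions for illustration, not Uniffle's API; check the widths against `Constants` in the build being used. Plain `java.util` collections stand in for `Roaring64NavigableMap`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical sketch: block ID = sequence | partitionId | taskAttemptId (assumed widths). */
public final class BlockIdLayout {
  // ASSUMED field widths; 18 + 24 + 21 = 63 bits, so every block ID stays non-negative.
  static final int ATOMIC_INT_MAX_LENGTH = 18;
  static final int PARTITION_ID_MAX_LENGTH = 24;
  static final int TASK_ATTEMPT_ID_MAX_LENGTH = 21;

  private BlockIdLayout() {}

  /** Hypothetical helper: packs the three fields into one long, rejecting values that overflow. */
  static long encode(long sequence, int partitionId, long taskAttemptId) {
    checkFits("sequence", sequence, ATOMIC_INT_MAX_LENGTH);
    checkFits("partitionId", partitionId, PARTITION_ID_MAX_LENGTH);
    checkFits("taskAttemptId", taskAttemptId, TASK_ATTEMPT_ID_MAX_LENGTH);
    return (sequence << (PARTITION_ID_MAX_LENGTH + TASK_ATTEMPT_ID_MAX_LENGTH))
        | ((long) partitionId << TASK_ATTEMPT_ID_MAX_LENGTH)
        | taskAttemptId;
  }

  /** Same extraction as getBlockIdsByPartitionId: drop the task attempt bits, then mask. */
  static int decodePartitionId(long blockId) {
    long mask = (1L << PARTITION_ID_MAX_LENGTH) - 1;
    return Math.toIntExact((blockId >> TASK_ATTEMPT_ID_MAX_LENGTH) & mask);
  }

  /** Keeps only the block IDs whose decoded partition is in requestPartitions. */
  static List<Long> filterByPartition(Set<Integer> requestPartitions, long[] blockIds) {
    List<Long> result = new ArrayList<>();
    for (long blockId : blockIds) {
      if (requestPartitions.contains(decodePartitionId(blockId))) {
        result.add(blockId);
      }
    }
    return result;
  }

  /** Hypothetical diagnostic: number of blocks per decoded partition ID, sorted by partition. */
  static Map<Integer, Integer> partitionHistogram(long[] blockIds) {
    Map<Integer, Integer> counts = new TreeMap<>();
    for (long blockId : blockIds) {
      counts.merge(decodePartitionId(blockId), 1, Integer::sum);
    }
    return counts;
  }

  private static void checkFits(String name, long value, int bits) {
    if (value < 0 || value >= (1L << bits)) {
      throw new IllegalArgumentException(name + "=" + value + " does not fit in " + bits + " bits");
    }
  }

  public static void main(String[] args) {
    long[] blockIds = {
      encode(0, 74, 3), encode(1, 74, 3), encode(0, 2003, 9), encode(0, 1023, 1)
    };
    System.out.println(filterByPartition(Set.of(2003), blockIds).size()); // prints 1
    System.out.println(partitionHistogram(blockIds)); // prints {74=2, 1023=1, 2003=1}
  }
}
```

Under these assumed widths, partition 2003 round-trips through encode/decode and is matched by the filter. Dumping a histogram like `partitionHistogram` over the bitmap held for this shuffle could help show which partition IDs the blocks written for reduce tasks with ID 1024 or greater actually decode to.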