fpkgithub opened a new issue, #257:
URL: https://github.com/apache/incubator-uniffle/issues/257

   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   
   
   ### Search before asking
   
   - [X] I have searched in the [issues](https://github.com/apache/incubator-uniffle/issues?q=is%3Aissue) and found no similar issues.
   
   
   ### Describe the bug
   
   When I run a Hive SQL query through Uniffle and the job has more than 1024 reduce tasks, the reduce tasks with ID 1024 or higher read no data from the ShuffleServer, yet the job still finishes with state SUCCEEDED. As a result, the row count returned by the query differs from the count returned by the original (non-Uniffle) shuffle. The relevant logs and code are below.
   
   ## Hive Sql
   
   
   
![image](https://user-images.githubusercontent.com/22984063/194813378-acaeb7da-9500-4676-8159-36b1666e1702.png)
   
   ## Reduce Task
   Reduce task ID < 1024:
   ```
   2022-10-09 21:22:25,121 INFO [main] org.apache.hadoop.mapred.ReduceTask: 
Using ShuffleConsumerPlugin: 
org.apache.hadoop.mapreduce.task.reduce.RssShuffle@533377b
   2022-10-09 21:22:28,447 INFO [main] 
org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient: 
GetInMemoryShuffleData from 10.156.174.33:19999 for 
appId[appattempt_1665305651155_5705_000001], shuffleId[0], partitionId[19] cost 
329 ms
   ...
   2022-10-09 21:22:28,876 INFO [main] 
org.apache.uniffle.client.impl.ShuffleReadClientImpl: blockIdBitmap.Size=2293, 
pendingBlockIds.Size=0, bufferSegmentQueue.Size=0
   2022-10-09 21:22:28,878 INFO [main] 
org.apache.uniffle.client.impl.ShuffleReadClientImpl: Metrics for shuffleId[0], 
partitionId[19], read data cost 380 ms, copy data cost 0 ms, crc check cost 7 ms
   2022-10-09 21:22:28,878 INFO [main] 
org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler: Client read 
2293 blocks [ hot:2293 warm:0 cold:0 frozen:0 ]
   2022-10-09 21:22:28,878 INFO [main] 
org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler: Client read 
971918 bytes [ hot:971918 warm:0 cold:0 frozen:0 ]
   2022-10-09 21:22:28,878 INFO [main] 
org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler: Client read 
1372670 uncompressed bytes [ hot:1372670 warm:0 cold:0 frozen:0 ]
   
   ```
   
   Reduce task ID >= 1024:
   ```
   2022-09-28 11:46:04,379 INFO [main] org.apache.hadoop.mapred.ReduceTask: 
Using ShuffleConsumerPlugin: 
org.apache.hadoop.mapreduce.task.reduce.RssShuffle@10fde30a
   2022-09-28 11:46:05,060 INFO [main] 
org.apache.uniffle.client.impl.ShuffleReadClientImpl: Metrics for shuffleId[0], 
partitionId[2003], read data cost 0 ms, copy data cost 0 ms, crc check cost 0 ms
   2022-09-28 11:46:05,061 INFO [main] 
org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler: Client read 
0 blocks [ hot:0 warm:0 cold:0 frozen:0 ]
   2022-09-28 11:46:05,061 INFO [main] 
org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler: Client read 
0 bytes [ hot:0 warm:0 cold:0 frozen:0 ]
   2022-09-28 11:46:05,061 INFO [main] 
org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler: Client read 
0 uncompressed bytes [ hot:0 warm:0 cold:0 frozen:0 ]
   ```
   
   ## Client Code
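   In `ShuffleReadClientImpl#readShuffleBlockData`, `blockIdBitmap` is empty when the reduce task ID is 1024 or higher, so the method returns null immediately (the `LOG.info` calls below are ones I added for debugging).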
   
   ### org.apache.uniffle.client.impl.ShuffleReadClientImpl#readShuffleBlockData
   ```java
     @Override
     public CompressedShuffleBlock readShuffleBlockData() {
   
       LOG.info("blockIdBitmap.Size=" + blockIdBitmap.getLongCardinality()
               + ", pendingBlockIds.Size=" + pendingBlockIds.getLongCardinality()
               + ", bufferSegmentQueue.Size=" + bufferSegmentQueue.size());
   
       // empty data expected, just return null
       if (blockIdBitmap.isEmpty()) {
         LOG.info("blockIdBitmap empty , so return null ...");
         return null;
       }
   
       // All blocks are processed, so just return
       if (pendingBlockIds.isEmpty()) {
         return null;
       }
   
       // if need request new data from shuffle server
       if (bufferSegmentQueue.isEmpty()) {
         if (read() <= 0) {
           return null;
         }
       }
   
       // get next buffer segment
       BufferSegment bs = null;
   
       // blocks in bufferSegmentQueue may be from different partition in range partition mode,
       // or may be from speculation task, filter them and just read the necessary block
      
       ...
   
       // current segment hasn't data, try next segment
       return readShuffleBlockData();
     }
   ```
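
The early return above also explains why the job still reports SUCCEEDED: an empty bitmap is treated as "no data expected", not as an error. The following is a minimal sketch of that behaviour, not the Uniffle implementation. `Reader`, `drain`, and the use of a plain `Set<Long>` in place of `Roaring64NavigableMap` are all hypothetical stand-ins chosen to keep the example self-contained.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

class EmptyBitmapReadSketch {

  // Hypothetical stand-in for the read client: it only models the
  // "empty expected set means nothing to read" early return.
  static final class Reader {
    private final Set<Long> expectedBlockIds;
    private final Deque<byte[]> fetchedBlocks;

    Reader(Set<Long> expectedBlockIds, Deque<byte[]> fetchedBlocks) {
      this.expectedBlockIds = expectedBlockIds;
      this.fetchedBlocks = fetchedBlocks;
    }

    byte[] readShuffleBlockData() {
      // Empty data expected, so return null without contacting any server.
      if (expectedBlockIds.isEmpty()) {
        return null;
      }
      // Returns null once every fetched block has been consumed.
      return fetchedBlocks.poll();
    }
  }

  // Hypothetical consumer loop: null simply ends the read, no exception is raised.
  static int drain(Reader reader) {
    int blocks = 0;
    while (reader.readShuffleBlockData() != null) {
      blocks++;
    }
    return blocks;
  }

  public static void main(String[] args) {
    Set<Long> expected = new HashSet<>();
    expected.add(1L);
    expected.add(2L);
    Deque<byte[]> fetched = new ArrayDeque<>();
    fetched.add(new byte[1]);
    fetched.add(new byte[1]);
    System.out.println(drain(new Reader(expected, fetched)));                  // prints 2

    // An empty expected set ends the read at once, with no error raised.
    System.out.println(drain(new Reader(new HashSet<>(), new ArrayDeque<>()))); // prints 0
  }
}
```

In this sketch the consumer cannot tell "the partition is legitimately empty" from "the server failed to find the partition's blocks", which matches the symptom in the logs: 0 blocks read and no failure.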
   
   ### log
   ```shell
   2022-10-09 21:23:21,530 INFO 
[org.apache.hadoop.mapreduce.TaskPauseMonitor$Monitor@376a312c] 
org.apache.hadoop.mapreduce.TaskPauseMonitor: Starting JVM pause monitor
   2022-10-09 21:23:21,539 INFO [main] org.apache.hadoop.mapred.ReduceTask: 
Using ShuffleConsumerPlugin: 
org.apache.hadoop.mapreduce.task.reduce.RssShuffle@3383649e
   2022-10-09 21:23:22,065 INFO [main] 
org.apache.uniffle.client.impl.ShuffleWriteClientImpl: 
getShuffleResult#appId=appattempt_1665305651155_5705_000001, shuffleId=0, 
partitionId=2003, shuffleServerInfoSet.Size=1
   2022-10-09 21:23:22,065 INFO [main] 
org.apache.uniffle.client.impl.ShuffleWriteClientImpl: getShuffleResult#star ss 
getBlockIdBitmap=10.156.175.32
   2022-10-09 21:23:23,799 INFO [main] 
org.apache.uniffle.client.impl.ShuffleWriteClientImpl: 
getShuffleResult#10.156.175.32:ss get blockIdBitmapOfServer.Size=0
   2022-10-09 21:23:23,799 INFO [main] 
org.apache.uniffle.client.impl.ShuffleWriteClientImpl: getShuffleResult# break, 
ss=10.156.175.32
   2022-10-09 21:23:23,799 INFO [main] 
org.apache.uniffle.client.impl.ShuffleWriteClientImpl: 
getShuffleResult#blockIdBitmap.Size=0
   2022-10-09 21:23:23,915 INFO [main] 
org.apache.uniffle.client.impl.ShuffleReadClientImpl: blockIdBitmap.Size=0, 
pendingBlockIds.Size=0, bufferSegmentQueue.Size=0
   2022-10-09 21:23:23,916 INFO [main] 
org.apache.uniffle.client.impl.ShuffleReadClientImpl: blockIdBitmap empty , so 
return null ...
   2022-10-09 21:23:23,916 INFO [main] 
org.apache.uniffle.client.impl.ShuffleReadClientImpl: Metrics for shuffleId[0], 
partitionId[2003], read data cost 0 ms, copy data cost 0 ms, crc check cost 0 ms
   2022-10-09 21:23:23,916 INFO [main] 
org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler: Client read 
0 blocks [ hot:0 warm:0 cold:0 frozen:0 ]
   2022-10-09 21:23:23,916 INFO [main] 
org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler: Client read 
0 bytes [ hot:0 warm:0 cold:0 frozen:0 ]
   2022-10-09 21:23:23,916 INFO [main] 
org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler: Client read 
0 uncompressed bytes [ hot:0 warm:0 cold:0 frozen:0 ]
   ```
   
   
   
   ## ShuffleServer 
   I added logging to `ShuffleTaskManager#getBlockIdsByPartitionId` and traced the code:
   - When the reduce task ID (partition) is below 1024, block IDs matching `requestPartitions` are found in the bitmap.
   - When the reduce task ID (partition) is 1024 or higher, no block IDs matching `requestPartitions` are found.
   
   ### org.apache.uniffle.server.ShuffleTaskManager#getBlockIdsByPartitionId
   ```java
     // filter the specific partition blockId in the bitmap to the resultBitmap
     protected Roaring64NavigableMap getBlockIdsByPartitionId(
         Set<Integer> requestPartitions,
         Roaring64NavigableMap bitmap,
         Roaring64NavigableMap resultBitmap) {
       // debug logging added for this investigation
       LOG.info("getBlockIdsByPartitionId# start requestPartitions=" + requestPartitions.toString()
               + ", bitmap.size=" + bitmap.getLongCardinality());
       int findPartitionIdOKCount = 0;
       int findPartitionIdFailCount = 0;
       LongIterator iter = bitmap.getLongIterator();
       long mask = (1L << Constants.PARTITION_ID_MAX_LENGTH) - 1;
       while (iter.hasNext()) {
         long blockId = iter.next();
         // the partition ID sits directly above the task attempt ID bits
         int partitionId = Math.toIntExact(
             (blockId >> Constants.TASK_ATTEMPT_ID_MAX_LENGTH) & mask);
         if (requestPartitions.contains(partitionId)) {
           resultBitmap.addLong(blockId);
           findPartitionIdOKCount++;
         } else {
           findPartitionIdFailCount++;
         }
       }
       LOG.info("getBlockIdsByPartitionId# end requestPartitions=" + requestPartitions.toString()
               + ", resultBitmap.size=" + resultBitmap.getLongCardinality()
               + ", findPartitionIdOKCount=" + findPartitionIdOKCount
               + ", findPartitionIdFailCount=" + findPartitionIdFailCount);
       return resultBitmap;
     }
   
   ```
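
To make the filter above easier to reason about, here is a minimal sketch of the shift-and-mask extraction in isolation. The field widths (24 bits for the partition ID, 20 bits for the task attempt ID) and the `pack` helper are assumptions for illustration only. The real widths are defined in Uniffle's `Constants` class and the real block IDs are generated by the client. This issue does not establish why the lookup fails, so the sketch only shows that the extraction returns the right partition when the writer and the server agree on the layout, and a wrong one when a field overflows its width.

```java
class BlockIdLayoutSketch {
  // Assumed widths, for illustration only (see Constants in Uniffle for the real ones).
  static final int PARTITION_ID_MAX_LENGTH = 24;
  static final int TASK_ATTEMPT_ID_MAX_LENGTH = 20;

  // Hypothetical packer: [ sequenceNo | partitionId | taskAttemptId ].
  // It does no range checking, so an oversized field bleeds into its neighbour.
  static long pack(long sequenceNo, long partitionId, long taskAttemptId) {
    return (sequenceNo << (PARTITION_ID_MAX_LENGTH + TASK_ATTEMPT_ID_MAX_LENGTH))
        | (partitionId << TASK_ATTEMPT_ID_MAX_LENGTH)
        | taskAttemptId;
  }

  // Same shift-and-mask as in getBlockIdsByPartitionId.
  static int extractPartitionId(long blockId) {
    long mask = (1L << PARTITION_ID_MAX_LENGTH) - 1;
    return Math.toIntExact((blockId >> TASK_ATTEMPT_ID_MAX_LENGTH) & mask);
  }

  public static void main(String[] args) {
    // Writer and reader agree on the layout: the partition ID round-trips.
    System.out.println(extractPartitionId(pack(1, 74, 5)));   // prints 74
    System.out.println(extractPartitionId(pack(1, 2003, 5))); // prints 2003

    // A task attempt ID that needs 21 bits spills into the partition field,
    // so partition 2002 is read back as 2003.
    long oversizedTaskAttemptId = 1L << TASK_ATTEMPT_ID_MAX_LENGTH;
    System.out.println(extractPartitionId(pack(1, 2002, oversizedTaskAttemptId))); // prints 2003
  }
}
```

If the client packed block IDs with a layout that differed from the one the server unpacks, `requestPartitions.contains(partitionId)` would miss every block of the affected partitions, which is the pattern the server logs below show for partition 2003.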
   
   ### log
   
   - requestPartitions=[74]: matching block IDs are found
   ```
   ## getShuffleResult
   [INFO] 2022-10-09 21:22:26,035 Grpc-512 ShuffleTaskManager 
getBlockIdsByPartitionId - getBlockIdsByPartitionId# start 
requestPartitions=[74], bitmap.size=1546249
   [INFO] 2022-10-09 21:22:26,063 Grpc-512 ShuffleTaskManager 
getBlockIdsByPartitionId - getBlockIdsByPartitionId# end 
requestPartitions=[74], resultBitmap.size=2292, findPartitionIdOKCount=2292, 
findPartitionIdFailCount=1543957
   
   ## getMemoryShuffleData
   [INFO] 2022-10-09 21:22:26,465 Grpc-464 ShuffleServerGrpcService 
getMemoryShuffleData - Successfully getInMemoryShuffleData cost 1 ms with 
962828 bytes shuffle data for appId[appattempt_1665305651155_5705_000001], 
shuffleId[0], partitionId[74]
   ```
   
   - requestPartitions=[2003]: no matching block IDs are found
   ``` 
   [INFO] 2022-10-09 21:23:23,728 Grpc-1478 ShuffleTaskManager 
getBlockIdsByPartitionId - getBlockIdsByPartitionId# start 
requestPartitions=[2003], bitmap.size=1546249
   [INFO] 2022-10-09 21:23:23,785 Grpc-1478 ShuffleTaskManager 
getBlockIdsByPartitionId - getBlockIdsByPartitionId# end 
requestPartitions=[2003], resultBitmap.size=0, findPartitionIdOKCount=0, 
findPartitionIdFailCount=1546249
   ```
   
   
   
   
   
   ### Affects Version(s)
   
   0.7.0-snapshot
   
   ### Uniffle Server Log Output
   
   _No response_
   
   ### Uniffle Engine Log Output
   
   _No response_
   
   ### Uniffle Server Configurations
   
   ```yaml
   ## shuffle server config
   rss.rpc.server.port 19999
   rss.jetty.http.port 19998
   rss.rpc.executor.size 2000
   # it should be configed the same as in coordinator
   rss.storage.type MEMORY_LOCALFILE_HDFS
    
   rss.coordinator.quorum coordinator-01:19999,coordinator-02:19999,coordinator-03:19999
    
   # local storage path for shuffle server
   rss.storage.basePath /mnt/storage00/hadoop/uniffle-ss/rss_data,/mnt/storage01/hadoop/uniffle-ss/rss_data,/mnt/storage02/hadoop/uniffle-ss/rss_data,/mnt/storage03/hadoop/uniffle-ss/rss_data
   # it's better to config thread num according to local disk num
   rss.server.flush.thread.alive 20 
   rss.server.flush.threadPool.size 20
   rss.server.buffer.capacity 40gb
   rss.server.read.buffer.capacity 20gb
   rss.server.heartbeat.timeout 60000
   rss.server.heartbeat.interval 10000
   rss.rpc.message.max.size 1073741824
   rss.server.preAllocation.expired 120000
   rss.server.commit.timeout 600000
   rss.server.app.expired.withoutHeartbeat 120000
   # note: the default value of rss.server.flush.cold.storage.threshold.size is 64m
   # there will be no data written to DFS if set it as 100g even rss.storage.type=MEMORY_LOCALFILE_HDFS
   # please set proper value if DFS is used, eg, 64m, 128m.
   rss.server.flush.cold.storage.threshold.size 64m
    
   rss.server.health.check.enable true
   rss.server.disk.capacity 400gb
   ```
   
   
   ### Uniffle Engine Configurations
   
   _No response_
   
   ### Additional context
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!

