huyuanfeng2018 commented on issue #7393:
URL: https://github.com/apache/iceberg/issues/7393#issuecomment-1521084658

   1. Regarding the 4x slowdown: this is indeed a four-fold decrease in 
throughput. As a user, I care most about throughput under the same 
resources, so it does not mean that CPU consumption increased 4x.
   
   2. Regarding the custom range partitioner: I implemented one myself. As 
described in my previous reply, before the task starts we specify each 
enumeration partition's share of the total data volume 
(distribution-balance-column-ratio), and the custom range partitioner uses 
those shares to calculate which streamWrite subtasks the data of each 
enumeration value is distributed to. Following #6382 and #7269, I then made 
these shares (distribution-balance-column-ratio) adjust dynamically based on 
the statistics from each checkpoint, which achieves dynamic allocation. 
Judging from the metrics in the Flink UI, my data shuffling is very uniform, 
and from monitoring, the CPU usage of all my TMs is at 100%. The following 
two pictures show some of my monitoring data at this time:
   <img width="623" alt="image" 
src="https://user-images.githubusercontent.com/40817998/234160124-aeb88a2d-953c-45be-a299-fc5fb4189c78.png">
   
   <img width="1093" alt="image" 
src="https://user-images.githubusercontent.com/40817998/234160387-baeaddcd-1697-4b5d-9d0c-22ba8a915257.png">
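
To make the idea in point 2 concrete, here is a minimal sketch of a ratio-based assignment. It is not the actual partitioner from my job: the class `RatioRangeAssigner`, its nested `SubtaskRange`, and the methods `ratiosFromCounts`, `assign`, and `selectSubtask` are all hypothetical names, and it has no Flink or Iceberg dependency. It assumes each enumeration value gets a contiguous range of writer subtasks proportional to its share, and that the shares can be recomputed from per-value record counts gathered at a checkpoint.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical sketch (not the partitioner from this thread): gives each
 * enumeration value a contiguous range of writer subtasks sized by its share.
 */
final class RatioRangeAssigner {

  /** Inclusive range [first, last] of writer subtask indexes. */
  static final class SubtaskRange {
    final int first;
    final int last;

    SubtaskRange(int first, int last) {
      this.first = first;
      this.last = last;
    }

    int size() {
      return last - first + 1;
    }
  }

  /** Converts per-value record counts (e.g. from one checkpoint) into shares that sum to 1. */
  static Map<String, Double> ratiosFromCounts(Map<String, Long> counts) {
    long total = 0L;
    for (long c : counts.values()) {
      total += c;
    }
    if (total <= 0L) {
      throw new IllegalArgumentException("counts must sum to a positive number");
    }
    Map<String, Double> ratios = new LinkedHashMap<>();
    for (Map.Entry<String, Long> e : counts.entrySet()) {
      ratios.put(e.getKey(), e.getValue() / (double) total);
    }
    return ratios;
  }

  /**
   * Walks the values in map order and cuts [0, parallelism) proportionally.
   * A subtask that straddles a boundary is shared by the two adjacent values.
   */
  static Map<String, SubtaskRange> assign(Map<String, Double> ratios, int parallelism) {
    double total = 0.0;
    for (double r : ratios.values()) {
      total += r;
    }
    if (parallelism <= 0 || total <= 0.0) {
      throw new IllegalArgumentException("parallelism and total ratio must be positive");
    }
    Map<String, SubtaskRange> ranges = new LinkedHashMap<>();
    double cumulative = 0.0;
    for (Map.Entry<String, Double> e : ratios.entrySet()) {
      double start = cumulative / total * parallelism;
      cumulative += e.getValue();
      double end = cumulative / total * parallelism;
      int first = Math.min((int) Math.floor(start), parallelism - 1);
      int last = Math.min((int) Math.ceil(end) - 1, parallelism - 1);
      // Every value keeps at least one subtask, even with a tiny share.
      ranges.put(e.getKey(), new SubtaskRange(first, Math.max(first, last)));
    }
    return ranges;
  }

  /** Round-robins records of one value across the subtasks in its range. */
  static int selectSubtask(SubtaskRange range, long recordCounter) {
    return range.first + (int) Math.floorMod(recordCounter, (long) range.size());
  }
}
```

In a real job, `selectSubtask` would be called from a custom Flink partitioner, and `assign` would be re-run whenever new checkpoint statistics change the shares. How those statistics reach the partitioner is left out of this sketch.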
   3. In fact, when I doubled the parallelism, my processing throughput also 
roughly doubled, as shown below, so the bottleneck is very likely the CPU 
rather than network I/O:
   <img width="1364" alt="image" 
src="https://user-images.githubusercontent.com/40817998/234161033-47b26d7d-81a0-4205-948c-f0544d32c203.png">
   
   4. There is one thing I am unsure about: why doesn't the 
DataStatisticsOrRecord object directly implement Flink's RowData? The 
generic type here is already fixed to RowData when writing with Flink:
   ```
   public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData>
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

