tsreaper edited a comment on pull request #17520:
URL: https://github.com/apache/flink/pull/17520#issuecomment-959002265


   @slinkydeveloper @fapaul @JingGe
   
   I've done some benchmarking on a testing yarn cluster.
   
   * Test data: The [Kaggle flight delay data](https://www.kaggle.com/usdot/flight-delays), a ~500MB CSV file. I've converted it into Avro files with xz compression and a 64kb or 2mb block size (a conversion sketch follows the configuration block below).
   * Number of task slots: 8
   * Number of task managers: 1
   * Configuration
   ```yaml
   # common JVM configurations used in a lot of our production jobs, also for producing the TPC-DS benchmark results in Flink 1.12
   env.java.opts.jobmanager: -XX:+UseParNewGC -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70 -verbose:gc -XX:+HeapDumpOnOutOfMemoryError -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSMaxAbortablePrecleanTime=1000 -XX:+CMSClassUnloadingEnabled -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:SurvivorRatio=5 -XX:ParallelGCThreads=4
   env.java.opts.taskmanager: -XX:+UseParNewGC -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70 -verbose:gc -XX:+HeapDumpOnOutOfMemoryError -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSMaxAbortablePrecleanTime=1000 -XX:+CMSClassUnloadingEnabled -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:SurvivorRatio=5 -XX:ParallelGCThreads=4
   
   # default memory configuration of Flink
   jobmanager.memory.process.size: 1600m
   taskmanager.memory.process.size: 1728m
   ```
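
   For reference, the conversion can be done with the plain Avro `DataFileWriter` API. The following is only a rough sketch under assumptions: the schema fields are illustrative placeholders rather than the real flight-delay columns, and the real conversion appends one record per CSV row.
   ```java
   import java.io.File;

   import org.apache.avro.Schema;
   import org.apache.avro.SchemaBuilder;
   import org.apache.avro.file.CodecFactory;
   import org.apache.avro.file.DataFileWriter;
   import org.apache.avro.generic.GenericData;
   import org.apache.avro.generic.GenericDatumWriter;
   import org.apache.avro.generic.GenericRecord;

   public class CsvToAvro {
       public static void main(String[] args) throws Exception {
           // Illustrative schema; the real flight delay data has many more columns.
           Schema schema = SchemaBuilder.record("Flight").fields()
               .requiredString("airline")
               .requiredInt("departureDelay")
               .endRecord();

           try (DataFileWriter<GenericRecord> writer =
                   new DataFileWriter<>(new GenericDatumWriter<>(schema))) {
               // xz compression (requires the org.tukaani:xz dependency on the classpath).
               writer.setCodec(CodecFactory.xzCodec(6));
               // Approximate block size in bytes: 64kb here, 2 * 1024 * 1024 for the 2mb run.
               writer.setSyncInterval(64 * 1024);
               writer.create(schema, new File("flights.avro"));

               // In the real conversion this part runs once per CSV row.
               GenericRecord record = new GenericData.Record(schema);
               record.put("airline", "AA");
               record.put("departureDelay", 5);
               writer.append(record);
           }
       }
   }
   ```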
   
   I've tested three implementations (a sketch contrasting the two bulk format strategies follows this list).
   * Bulk Format + Lazy Reading: This is the implementation of this PR.
   * Bulk Format + ArrayList: This implementation reads and deserializes all records of a whole block into an array list and sends the list to the reader thread. It does not have a blocking pool, as @JingGe suggested. See [here](https://github.com/tsreaper/flink/commit/3b86337cea499cd4245a34550a6b597239be3066) for the code.
   * Stream Format: This is the implementation based on Stephan's [draft](https://github.com/apache/flink/commit/11c606096f6beeac45c4f4dabe0fde93cc91923d#diff-edfd2d187d920f781382054f22fb4e6e5b5d9361b95a87ebeda68ba3a49d5a55R51). See [here](https://github.com/tsreaper/flink/commit/6b3a65fd099fcffb4d7a5b20c9bde9aeace18f69) for the code. I didn't implement projection pushdown for it, but that should be fine because the benchmark does not use projection pushdown.
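
   To make the difference between the two bulk format variants concrete, here is a minimal sketch. The interface and class names are hypothetical and do not match the actual code in this PR or the linked commits; it only illustrates why the eager variant keeps a whole block's records live at once.
   ```java
   import java.io.IOException;
   import java.util.ArrayList;
   import java.util.Iterator;
   import java.util.List;

   // Hypothetical per-block record reader; returns null at the end of the block.
   interface BlockReader<T> {
       T readRecord() throws IOException;
   }

   // "Lazy Reading": deserialize one record at a time as the reader thread pulls,
   // so at most one record of the block is deserialized and live at any moment.
   class LazyBlockIterator<T> implements Iterator<T> {
       private final BlockReader<T> reader;
       private T next;

       LazyBlockIterator(BlockReader<T> reader) throws IOException {
           this.reader = reader;
           this.next = reader.readRecord();
       }

       @Override
       public boolean hasNext() {
           return next != null;
       }

       @Override
       public T next() {
           T current = next;
           try {
               next = reader.readRecord();
           } catch (IOException e) {
               throw new RuntimeException(e);
           }
           return current;
       }
   }

   // "ArrayList": eagerly deserialize every record of the block into a list that
   // is handed to the reader thread, so all records of the block are live at once.
   // This is where the GC pressure in the results below comes from.
   class EagerBlockReader {
       static <T> List<T> readBlock(BlockReader<T> reader) throws IOException {
           List<T> records = new ArrayList<>();
           for (T r = reader.readRecord(); r != null; r = reader.readRecord()) {
               records.add(r);
           }
           return records;
       }
   }
   ```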
   
   Here are the test results. I've run a very simple job that reads from the Avro file and writes directly into a blackhole sink (a rough sketch of the job follows this paragraph). The time in the table is the execution time of the only task, excluding the scheduling time.
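
   This is a hedged sketch of the benchmark job using the `FileSource` API; the file path is made up, and the bulk format is passed in as a parameter instead of naming any concrete implementation.
   ```java
   import org.apache.avro.generic.GenericRecord;
   import org.apache.flink.api.common.eventtime.WatermarkStrategy;
   import org.apache.flink.connector.file.src.FileSource;
   import org.apache.flink.connector.file.src.FileSourceSplit;
   import org.apache.flink.connector.file.src.reader.BulkFormat;
   import org.apache.flink.core.fs.Path;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

   public class AvroReadBenchmark {

       // The format under test is passed in (e.g. the bulk format from this PR),
       // which keeps the sketch independent of any particular implementation.
       public static void run(BulkFormat<GenericRecord, FileSourceSplit> format) throws Exception {
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
           env.setParallelism(8); // matches the 8 task slots above

           FileSource<GenericRecord> source = FileSource
                   .forBulkFileFormat(format, new Path("hdfs:///benchmark/flights.avro")) // path is made up
                   .build();

           env.fromSource(source, WatermarkStrategy.noWatermarks(), "avro-source")
                   // Blackhole sink: every record is discarded, so the job measures reading only.
                   .addSink(new DiscardingSink<>());

           env.execute("avro-read-benchmark");
       }
   }
   ```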
   
   ||xz compression, 64kb block size|xz compression, 2mb block size|
   |---|---|---|
   |bulk format + lazy reading|14s|10s|
   |bulk format + array list|14s|30s, due to GC, sometimes out of memory|
   |stream format|2m24s, due to heavy GC|51s, due to GC, sometimes out of memory|
   
   It is obvious that any implementation that loads all records of a block into memory at once will suffer from GC to some extent. Also, for smaller block sizes the blocking pool has almost no impact on performance. So I would say the implementation in this PR is the best suited one so far.

