[https://issues.apache.org/jira/browse/FLINK-18433?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17146650#comment-17146650]
Arvid Heise edited comment on FLINK-18433 at 6/27/20, 9:12 AM:
---------------------------------------------------------------
Hi [~liyu], thanks for the update - I feared as much.
Without further information, I ran a particular comparison that may or may not
help.
I picked TwoInputs_KeyBy_Eager_AtLeastOnce_100_heap as it had the biggest
regression (-17.31%) and ran some tests on a single m5ad.2xlarge (with SSD, but
the state backend is heap). I built a flink-dist from release-1.10 and release-1.11.
Since there are no built-in evaluation metrics, I just used {{time}}. To reduce
the impact of cluster setup and to really see whether it's related to the heap
state backend or the network stack, I simply executed on a local executor which
took the full 8 cores, and I gave it 5 GB of RAM (the job doesn't need much, and
I wanted to avoid too much allocation overhead).
Full commands for reference:
{noformat}
time java -Xmx5g -Dlog.file=flink_10.log \
  -Dlog4j.configuration=file:///`pwd`/flink-1.10/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/conf/log4j.properties \
  -cp flink-basic-operations_2.11-1.10-SNAPSHOT.jar:"${flink_10[*]}" \
  org.apache.flink.basic.operations.PerformanceTestJob \
  --topologyName TwoInputs --LogicalAttributesofEdges KeyBy \
  --ScheduleMode LazyFromSource --CheckpointMode AtLeastOnce \
  --recordSize 100 --stateBackend heap --maxCount 1000000 \
  --checkpointInterval 100 --checkpointPath file:///home/ec2-user/spda/checkpoints

time java -Xmx5g -Dlog.file=`pwd`/flink_11.log \
  -Dlog4j.configurationFile=file:///`pwd`/flink/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT/conf/log4j.properties \
  -cp flink-basic-operations_2.11-1.11-SNAPSHOT.jar:"${flink_11[*]}" \
  org.apache.flink.basic.operations.PerformanceTestJob \
  --topologyName TwoInputs --LogicalAttributesofEdges KeyBy \
  --ScheduleMode LazyFromSource --CheckpointMode AtLeastOnce \
  --recordSize 100 --stateBackend heap --maxCount 1000000 \
  --checkpointInterval 100 --checkpointPath file:///home/ec2-user/spda/checkpoints \
  --execution.checkpointing.tolerable-failed-checkpoints 1000
{noformat}
I modified the test job so that it compiles and creates a local executor,
forwarding the parameters into the configuration (more on that later); see the
sketch below.
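For illustration, the local-executor part of that modification looks roughly
like the following. This is a minimal sketch, assuming the job parses its
arguments with {{ParameterTool}}; the names are illustrative, not the exact
code of my branch:
{code:java}
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Parse the CLI arguments (--checkpointInterval, --stateBackend, ...) and
// forward them into the Flink configuration instead of flink-conf.yaml.
ParameterTool params = ParameterTool.fromArgs(args);
Configuration conf = params.getConfiguration();

// Local executor with DOP=8, i.e. all 8 cores of the m5ad.2xlarge.
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironment(8, conf);
{code}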
I ran these commands interleaved for a few hours and got [this
sheet|https://docs.google.com/spreadsheets/d/1NPoDQakQu1apdzWZfxD2IoRgo28MpBfD9s4K9Aq9nA4/edit?usp=sharing].
On average, we have:
|Flink 1.10|01m59s|
|Flink 1.11|01m50s|
Note that less is better in this case, as we measure the time needed to process
1m elements.
So, TL;DR: in this particular benchmark setup, it rather looks like performance
actually improved (01m59s down to 01m50s is roughly an 8% speedup). Note that
DOP=8 is higher than what [~Aihua] used. Assuming that both benchmarks are okay,
I see three options to explain them:
-# We may have a regression on local input channels, but an improvement for
remote input channels. Since remote input channels are usually the bottleneck,
I'd say this is rather good, but ideally we can still remove the regression
while keeping the improvement.- (just realized that we probably don't have a
single LocalInputChannel with this configuration)
# Memory management in 1.11 works differently/incorrectly. My test excludes the
memory management on the TM/JM level, so that may be the root cause of the
original regression.
# I experienced restarts due to failed checkpoints towards the end. My first
impression is that a job that is about to finish may cause some in-progress
checkpoints to be canceled; the cancellation is propagated to the checkpoint
coordinator, which ultimately restarts the job, because by default no checkpoint
is allowed to fail. In my final setup, I ignored these errors (see the snippet
after this list), but it is obvious that any restart would impact the
performance tremendously. In my setup, I even ran into some kind of live lock
for 1m records (oddly, 100k records didn't suffer from these issues).
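For completeness, the workaround I used for the last point can also be set
programmatically instead of via the CLI flag; a minimal sketch, reusing {{env}}
from the snippet above:
{code:java}
// Equivalent of --execution.checkpointing.tolerable-failed-checkpoints 1000:
// tolerate up to 1000 failed checkpoints before the CheckpointFailureManager
// fails the job (by default, a single failed checkpoint restarts the job).
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1000);
{code}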
I'm attaching a log that shows this live lock. [~roman_khachatryan]
investigated but couldn't find anything suspicious.
The key errors are:
{noformat}
2020-06-26 14:53:09,662 INFO  org.apache.flink.runtime.jobmaster.JobMaster [] - Trying to recover from a global failure.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
    at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:66) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1626) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1603) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:90) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:1736) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_252]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_252]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_252]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[?:1.8.0_252]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_252]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_252]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]
2020-06-26 14:53:09,682 DEBUG org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl [] - Notification of aborted checkpoint for task Flat Map (1/1)
2020-06-26 14:53:09,682 DEBUG org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - Cleanup AsyncCheckpointRunnable for checkpoint 1 of Flat Map (1/1).
2020-06-26 14:53:09,683 DEBUG org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - Flat Map (1/1) - asynchronous part of checkpoint 1 could not be completed.
java.util.concurrent.CancellationException: null
    at java.util.concurrent.FutureTask.report(FutureTask.java:121) ~[?:1.8.0_252]
    at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_252]
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:479) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:50) ~[flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:102) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_252]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_252]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_252]
{noformat}
Maybe [~pnowojski] or [~zjwang] have more ideas. [^flink_11.log.gz]
> From the end-to-end performance test results, 1.11 has a regression
> -------------------------------------------------------------------
>
> Key: FLINK-18433
> URL: https://issues.apache.org/jira/browse/FLINK-18433
> Project: Flink
> Issue Type: Bug
> Components: API / Core, API / DataStream
> Affects Versions: 1.11.0
> Environment: 3 machines
> [https://github.com/Li-Aihua/flink/blob/test_suite_for_basic_operations_1.11/flink-end-to-end-perf-tests/flink-basic-operations/src/main/java/org/apache/flink/basic/operations/PerformanceTestJob.java]
> Reporter: Aihua Li
> Priority: Major
> Attachments: flink_11.log.gz
>
>
>
> I ran end-to-end performance tests comparing release-1.10 and release-1.11.
> The results were as follows:
> |scenarioName|release-1.10|release-1.11|change|
> |OneInput_Broadcast_LazyFromSource_ExactlyOnce_10_rocksdb|46.175|43.81333333|-5.11%|
> |OneInput_Rescale_LazyFromSource_ExactlyOnce_100_heap|211.835|200.355|-5.42%|
> |OneInput_Rebalance_LazyFromSource_ExactlyOnce_1024_rocksdb|1721.041667|1618.323333|-5.97%|
> |OneInput_KeyBy_LazyFromSource_ExactlyOnce_10_heap|46|43.615|-5.18%|
> |OneInput_Broadcast_Eager_ExactlyOnce_100_rocksdb|212.105|199.6883333|-5.85%|
> |OneInput_Rescale_Eager_ExactlyOnce_1024_heap|1754.64|1600.123333|-8.81%|
> |OneInput_Rebalance_Eager_ExactlyOnce_10_rocksdb|45.91666667|43.09833333|-6.14%|
> |OneInput_KeyBy_Eager_ExactlyOnce_100_heap|212.0816667|200.7266667|-5.35%|
> |OneInput_Broadcast_LazyFromSource_AtLeastOnce_1024_rocksdb|1718.245|1614.381667|-6.04%|
> |OneInput_Rescale_LazyFromSource_AtLeastOnce_10_heap|46.12|43.55166667|-5.57%|
> |OneInput_Rebalance_LazyFromSource_AtLeastOnce_100_rocksdb|212.0383333|200.3883333|-5.49%|
> |OneInput_KeyBy_LazyFromSource_AtLeastOnce_1024_heap|1762.048333|1606.408333|-8.83%|
> |OneInput_Broadcast_Eager_AtLeastOnce_10_rocksdb|46.05833333|43.49666667|-5.56%|
> |OneInput_Rescale_Eager_AtLeastOnce_100_heap|212.2333333|201.1883333|-5.20%|
> |OneInput_Rebalance_Eager_AtLeastOnce_1024_rocksdb|1720.663333|1616.85|-6.03%|
> |OneInput_KeyBy_Eager_AtLeastOnce_10_heap|46.14|43.62333333|-5.45%|
> |TwoInputs_Broadcast_LazyFromSource_ExactlyOnce_100_rocksdb|156.9183333|152.9566667|-2.52%|
> |TwoInputs_Rescale_LazyFromSource_ExactlyOnce_1024_heap|1415.511667|1300.1|-8.15%|
> |TwoInputs_Rebalance_LazyFromSource_ExactlyOnce_10_rocksdb|34.29666667|34.16666667|-0.38%|
> |TwoInputs_KeyBy_LazyFromSource_ExactlyOnce_100_heap|158.3533333|151.8483333|-4.11%|
> |TwoInputs_Broadcast_Eager_ExactlyOnce_1024_rocksdb|1373.406667|1300.056667|-5.34%|
> |TwoInputs_Rescale_Eager_ExactlyOnce_10_heap|34.57166667|32.09666667|-7.16%|
> |TwoInputs_Rebalance_Eager_ExactlyOnce_100_rocksdb|158.655|147.44|-7.07%|
> |TwoInputs_KeyBy_Eager_ExactlyOnce_1024_heap|1356.611667|1292.386667|-4.73%|
> |TwoInputs_Broadcast_LazyFromSource_AtLeastOnce_10_rocksdb|34.01|33.205|-2.37%|
> |TwoInputs_Rescale_LazyFromSource_AtLeastOnce_100_heap|149.5883333|145.9966667|-2.40%|
> |TwoInputs_Rebalance_LazyFromSource_AtLeastOnce_1024_rocksdb|1359.74|1299.156667|-4.46%|
> |TwoInputs_KeyBy_LazyFromSource_AtLeastOnce_10_heap|34.025|29.68333333|-12.76%|
> |TwoInputs_Broadcast_Eager_AtLeastOnce_100_rocksdb|157.3033333|151.4616667|-3.71%|
> |TwoInputs_Rescale_Eager_AtLeastOnce_1024_heap|1368.74|1293.238333|-5.52%|
> |TwoInputs_Rebalance_Eager_AtLeastOnce_10_rocksdb|34.325|33.285|-3.03%|
> |TwoInputs_KeyBy_Eager_AtLeastOnce_100_heap|162.5116667|134.375|-17.31%|
> It can be seen that 1.11 has a performance regression, mostly around 5%,
> with a maximum regression of 17%. This needs to be checked.
> the test code:
> flink-1.10.0:
> [https://github.com/Li-Aihua/flink/blob/test_suite_for_basic_operations/flink-end-to-end-perf-tests/flink-basic-operations/src/main/java/org/apache/flink/basic/operations/PerformanceTestJob.java]
> flink-1.11.0:
> [https://github.com/Li-Aihua/flink/blob/test_suite_for_basic_operations_1.11/flink-end-to-end-perf-tests/flink-basic-operations/src/main/java/org/apache/flink/basic/operations/PerformanceTestJob.java]
> submit cmd like this:
> bin/flink run -d -m 192.168.39.246:8081 -c
> org.apache.flink.basic.operations.PerformanceTestJob
> /home/admin/flink-basic-operations_2.11-1.10-SNAPSHOT.jar --topologyName
> OneInput --LogicalAttributesofEdges Broadcast --ScheduleMode LazyFromSource
> --CheckpointMode ExactlyOnce --recordSize 10 --stateBackend rocksdb
>