Hi Stephan,
sorry for late response.
We indeed use timers inside a KeyedProcessFunction but the triggers of the
timers are kinda evenly distributed, so not causing a firing storm.
We have a custom ttl logic which is used by the deletion timer to decide
whether delete a record from inmemory state or not.
Can you maybe give some links to those changes in the RocksDB?

Thanks in advance,
Bekir Oguz

On Fri, 30 Aug 2019 at 14:56, Stephan Ewen <se...@apache.org> wrote:

> Hi all!
>
> A thought would be that this has something to do with timers. Does the
> task with that behavior use timers (windows, or process function)?
>
> If that is the case, some theories to check:
>   - Could it be a timer firing storm coinciding with a checkpoint?
> Currently, that storm synchronously fires, checkpoints cannot preempt that,
> which should change in 1.10 with the new mailbox model.
>   - Could the timer-async checkpointing changes have something to do with
> that? Does some of the usually small "preparation work" (happening
> synchronously) lead to an unwanted effect?
>   - Are you using TTL for state in that operator?
>   - There were some changes made to support timers in RocksDB recently.
> Could that have something to do with it?
>
> Best,
> Stephan
>
>
> On Fri, Aug 30, 2019 at 2:45 PM Congxian Qiu <qcx978132...@gmail.com>
> wrote:
>
>> CC flink dev mail list
>> Update for those who may be interested in this issue, we'are still
>> diagnosing this problem currently.
>>
>> Best,
>> Congxian
>>
>>
>> Congxian Qiu <qcx978132...@gmail.com> 于2019年8月29日周四 下午8:58写道:
>>
>> > Hi Bekir
>> >
>> > Currently, from what we have diagnosed, there is some task complete its
>> > checkpoint too late (maybe 15 mins), but we checked the kafka broker log
>> > and did not find any interesting things there. could we run another job,
>> > that did not commit offset to kafka, this wants to check if it is the
>> > "commit offset to kafka" step consumes too much time.
>> >
>> > Best,
>> > Congxian
>> >
>> >
>> > Bekir Oguz <bekir.o...@persgroep.net> 于2019年8月28日周三 下午4:19写道:
>> >
>> >> Hi Congxian,
>> >> sorry for the late reply, but no progress on this issue yet. I checked
>> >> also the kafka broker logs, found nothing interesting there.
>> >> And we still have 15 min duration checkpoints quite often. Any more
>> ideas
>> >> on where to look at?
>> >>
>> >> Regards,
>> >> Bekir
>> >>
>> >> On Fri, 23 Aug 2019 at 12:12, Congxian Qiu <qcx978132...@gmail.com>
>> >> wrote:
>> >>
>> >>> Hi Bekir
>> >>>
>> >>> Do you come back to work now, does there any more findings of this
>> >>> problem?
>> >>>
>> >>> Best,
>> >>> Congxian
>> >>>
>> >>>
>> >>> Bekir Oguz <bekir.o...@persgroep.net> 于2019年8月13日周二 下午4:39写道:
>> >>>
>> >>>> Hi Congxian,
>> >>>> Thanks for following up this issue. It is still unresolved and I am
>> on
>> >>>> vacation at the moment.  Hopefully my collegues Niels and Vlad can
>> spare
>> >>>> some time to look into this.
>> >>>>
>> >>>> @Niels, @Vlad: do you guys also think that this issue might be Kafka
>> >>>> related? We could also check kafka broker logs at the time of long
>> >>>> checkpointing.
>> >>>>
>> >>>> Thanks,
>> >>>> Bekir
>> >>>>
>> >>>> Verstuurd vanaf mijn iPhone
>> >>>>
>> >>>> Op 12 aug. 2019 om 15:18 heeft Congxian Qiu <qcx978132...@gmail.com>
>> >>>> het volgende geschreven:
>> >>>>
>> >>>> Hi Bekir
>> >>>>
>> >>>> Is there any progress about this problem?
>> >>>>
>> >>>> Best,
>> >>>> Congxian
>> >>>>
>> >>>>
>> >>>> Congxian Qiu <qcx978132...@gmail.com> 于2019年8月8日周四 下午11:17写道:
>> >>>>
>> >>>>> hi Bekir
>> >>>>> Thanks for the information.
>> >>>>>
>> >>>>> - Source's checkpoint was triggered by RPC calls, so it has the
>> >>>>> "Trigger checkpoint xxx" log,
>> >>>>> - other task's checkpoint was triggered after received all the
>> barrier
>> >>>>> of upstreams, it didn't log the "Trigger checkpoint XXX" :(
>> >>>>>
>> >>>>> Your diagnose makes sense to me, we need to check the Kafka log.
>> >>>>> I also find out that we always have a log like
>> >>>>> "org.apache.kafka.clients.consumer.internals.AbstractCoordinator
>> Marking
>> >>>>> the coordinator 172.19.200.73:9092 (id: 2147483646 rack: null) dead
>> >>>>> for group userprofileaggregator
>> >>>>> 2019-08-06 13:58:49,872 DEBUG
>> >>>>> org.apache.flink.streaming.runtime.tasks.StreamTask           -
>> Notifica",
>> >>>>>
>> >>>>> I checked the doc of kafka[1], only find that the default of `
>> >>>>> transaction.max.timeout.ms` is 15 min
>> >>>>>
>> >>>>> Please let me know there you have any finds. thanks
>> >>>>>
>> >>>>> PS: maybe you can also checkpoint the log for task
>> >>>>> `d0aa98767c852c97ae8faf70a54241e3`, JM received its ack message
>> late also.
>> >>>>>
>> >>>>> [1] https://kafka.apache.org/documentation/
>> >>>>> Best,
>> >>>>> Congxian
>> >>>>>
>> >>>>>
>> >>>>> Bekir Oguz <bekir.o...@persgroep.net> 于2019年8月7日周三 下午6:48写道:
>> >>>>>
>> >>>>>> Hi Congxian,
>> >>>>>> Thanks for checking the logs. What I see from the logs is:
>> >>>>>>
>> >>>>>> - For the tasks like "Source:
>> >>>>>> profileservice-snowplow-clean-events_kafka_source -> Filter” {17,
>> 27, 31,
>> >>>>>> 33, 34} / 70 : We have the ’Triggering checkpoint’ and also
>> ‘Confirm
>> >>>>>> checkpoint’ log lines, with 15 mins delay in between.
>> >>>>>> - For the tasks like “KeyedProcess -> (Sink:
>> >>>>>> profileservice-userprofiles_kafka_sink, Sink:
>> >>>>>> profileservice-userprofiles_kafka_deletion_marker, Sink:
>> >>>>>> profileservice-profiledeletion_kafka_sink” {1,2,3,4,5}/70 : We DO
>> NOT have
>> >>>>>> the “Triggering checkpoint” log, but only the ‘Confirm checkpoint’
>> lines.
>> >>>>>>
>> >>>>>> And as a final point, we ALWAYS have Kafka AbstractCoordinator logs
>> >>>>>> about lost connection to Kafka at the same time we have the
>> checkpoints
>> >>>>>> confirmed. This 15 minutes delay might be because of some timeout
>> at the
>> >>>>>> Kafka client (maybe 15 mins timeout), and then marking kafka
>> coordinator
>> >>>>>> dead, and then discovering kafka coordinator again.
>> >>>>>>
>> >>>>>> If the kafka connection is IDLE during 15 mins, Flink cannot
>> confirm
>> >>>>>> the checkpoints, cannot send the async offset commit request to
>> Kafka. This
>> >>>>>> could be the root cause of the problem. Please see the attached
>> logs
>> >>>>>> filtered on the Kafka AbstractCoordinator. Every time we have a 15
>> minutes
>> >>>>>> checkpoint, we have this kafka issue. (Happened today at 9:14 and
>> 9:52)
>> >>>>>>
>> >>>>>>
>> >>>>>> I will enable Kafka DEBUG logging to see more and let you know
>> about
>> >>>>>> the findings.
>> >>>>>>
>> >>>>>> Thanks a lot for your support,
>> >>>>>> Bekir Oguz
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>> Op 7 aug. 2019, om 12:06 heeft Congxian Qiu <
>> qcx978132...@gmail.com>
>> >>>>>> het volgende geschreven:
>> >>>>>>
>> >>>>>> Hi
>> >>>>>>
>> >>>>>> Received all the files, as a first glance, the program uses at
>> least
>> >>>>>> once checkpoint mode, from the tm log, maybe we need to check
>> checkpoint of
>> >>>>>> this operator "Invoking async call Checkpoint Confirmation for
>> KeyedProcess
>> >>>>>> -> (Sink: profileservice-userprofiles_kafka_sink, Sink:
>> >>>>>> profileservice-userprofiles_kafka_deletion_marker, Sink:
>> >>>>>> profileservice-profiledeletion_kafka_sink) (5/70) on task
>> KeyedProcess ->
>> >>>>>> (Sink: profileservice-userprofiles_kafka_sink, Sink:
>> >>>>>> profileservice-userprofiles_kafka_deletion_marker, Sink:
>> >>>>>> profileservice-profiledeletion_kafka_sink) (5/70)",
>> >>>>>>
>> >>>>>> Seems it took too long to complete the checkpoint (maybe something
>> >>>>>> about itself, or maybe something of Kafka). I'll go through the
>> logs
>> >>>>>> carefully today or tomorrow again.
>> >>>>>>
>> >>>>>> Best,
>> >>>>>> Congxian
>> >>>>>>
>> >>>>>>
>> >>>>>> Bekir Oguz <bekir.o...@persgroep.net> 于2019年8月6日周二 下午10:38写道:
>> >>>>>>
>> >>>>>>> Ok, I am removing apache dev group from CC.
>> >>>>>>> Only sending to you and my colleagues.
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> Thanks,
>> >>>>>>> Bekir
>> >>>>>>>
>> >>>>>>> Op 6 aug. 2019, om 17:33 heeft Bekir Oguz <
>> bekir.o...@persgroep.net>
>> >>>>>>> het volgende geschreven:
>> >>>>>>>
>> >>>>>>> Hi Congxian,
>> >>>>>>> Previous email didn’t work out due to size limits.
>> >>>>>>> I am sending you only job manager log zipped, and will send other
>> >>>>>>> info in separate email.
>> >>>>>>> <jobmanager_sb77v.log.zip>
>> >>>>>>> Regards,
>> >>>>>>> Bekir
>> >>>>>>>
>> >>>>>>> Op 2 aug. 2019, om 16:37 heeft Congxian Qiu <
>> qcx978132...@gmail.com>
>> >>>>>>> het volgende geschreven:
>> >>>>>>>
>> >>>>>>> Hi Bekir
>> >>>>>>>
>> >>>>>>> Cloud you please also share the below information:
>> >>>>>>> - jobmanager.log
>> >>>>>>> - taskmanager.log(with debug info enabled) for the problematic
>> >>>>>>> subtask.
>> >>>>>>> - the DAG of your program (if can provide the skeleton program is
>> >>>>>>> better -- can send to me privately)
>> >>>>>>>
>> >>>>>>> For the subIndex, maybe we can use the deploy log message in
>> >>>>>>> jobmanager log to identify which subtask we want. For example in
>> JM log,
>> >>>>>>> we'll have something like "2019-08-02 11:38:47,291 INFO
>> >>>>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph -
>> Deploying Source:
>> >>>>>>> Custom Source (2/2) (attempt #0) to
>> >>>>>>> container_e62_1551952890130_2071_01_000002 @ aa.bb.cc.dd.ee
>> >>>>>>> (dataPort=39488)" then we know "Custum Source (2/2)" was deplyed
>> to "
>> >>>>>>> aa.bb.cc.dd.ee" with port 39488. Sadly, there maybe still more
>> than
>> >>>>>>> one subtasks in one contain :(
>> >>>>>>>
>> >>>>>>> Best,
>> >>>>>>> Congxian
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> Bekir Oguz <bekir.o...@persgroep.net> 于2019年8月2日周五 下午4:22写道:
>> >>>>>>>
>> >>>>>>>> Forgot to add the checkpoint details after it was complete. This
>> is
>> >>>>>>>> for that long running checkpoint with id 95632.
>> >>>>>>>>
>> >>>>>>>> <PastedGraphic-5.png>
>> >>>>>>>>
>> >>>>>>>> Op 2 aug. 2019, om 11:18 heeft Bekir Oguz <
>> bekir.o...@persgroep.net>
>> >>>>>>>> het volgende geschreven:
>> >>>>>>>>
>> >>>>>>>> Hi Congxian,
>> >>>>>>>> I was able to fetch the logs of the task manager (attached) and
>> the
>> >>>>>>>> screenshots of the latest long checkpoint. I will get the logs
>> of the job
>> >>>>>>>> manager for the next long running checkpoint. And also I will
>> try to get a
>> >>>>>>>> jstack during the long running checkpoint.
>> >>>>>>>>
>> >>>>>>>> Note: Since at the Subtasks tab we do not have the subtask
>> numbers,
>> >>>>>>>> and at the Details tab of the checkpoint, we have the subtask
>> numbers but
>> >>>>>>>> not the task manager hosts, it is difficult to match those.
>> We’re assuming
>> >>>>>>>> they have the same order, so seeing that 3rd subtask is failing,
>> I am
>> >>>>>>>> getting the 3rd line at the Subtasks tab which leads to the task
>> manager
>> >>>>>>>> host flink-taskmanager-84ccd5bddf-2cbxn. ***It would be a great
>> feature if
>> >>>>>>>> you guys also include the subtask-id’s to the Subtasks view.***
>> >>>>>>>>
>> >>>>>>>> Note: timestamps in the task manager log are in UTC and I am at
>> the
>> >>>>>>>> moment at zone UTC+3, so the time 10:30 at the screenshot
>> matches the time
>> >>>>>>>> 7:30 in the log.
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> Kind regards,
>> >>>>>>>> Bekir
>> >>>>>>>>
>> >>>>>>>> <task_manager.log>
>> >>>>>>>>
>> >>>>>>>> <PastedGraphic-4.png>
>> >>>>>>>> <PastedGraphic-3.png>
>> >>>>>>>> <PastedGraphic-2.png>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> Op 2 aug. 2019, om 07:23 heeft Congxian Qiu <
>> qcx978132...@gmail.com>
>> >>>>>>>> het volgende geschreven:
>> >>>>>>>>
>> >>>>>>>> Hi Bekir
>> >>>>>>>> I’ll first summary the problem here(please correct me if I’m
>> wrong)
>> >>>>>>>> 1. The same program runs on 1.6 never encounter such problems
>> >>>>>>>> 2. Some checkpoints completed too long (15+ min), but other
>> normal
>> >>>>>>>> checkpoints complete less than 1 min
>> >>>>>>>> 3. Some  bad checkpoint will have a large sync time, async time
>> >>>>>>>> seems ok
>> >>>>>>>> 4. Some bad checkpoint, the e2e duration will much bigger than
>> >>>>>>>> (sync_time + async_time)
>> >>>>>>>> First, answer the last question, the e2e duration is ack_time -
>> >>>>>>>> trigger_time, so it always bigger than (sync_time + async_time),
>> but we
>> >>>>>>>> have a big gap here, this may be problematic.
>> >>>>>>>> According to all the information, maybe the problem is some task
>> >>>>>>>> start to do checkpoint too late and the sync checkpoint part
>> took some time
>> >>>>>>>> too long, Could you please share some more information such
>> below:
>> >>>>>>>> - A Screenshot of summary for one bad checkpoint(we call it A
>> here)
>> >>>>>>>> - The detailed information of checkpoint A(includes all the
>> >>>>>>>> problematic subtasks)
>> >>>>>>>> - Jobmanager.log and the taskmanager.log for the problematic task
>> >>>>>>>> and a health task
>> >>>>>>>> - Share the screenshot of subtasks for the problematic
>> >>>>>>>> task(includes the `Bytes received`, `Records received`, `Bytes
>> sent`,
>> >>>>>>>> `Records sent` column), here wants to compare the problematic
>> parallelism
>> >>>>>>>> and good parallelism’s information, please also share the
>> information is
>> >>>>>>>> there has a data skew among the parallelisms,
>> >>>>>>>> - could you please share some jstacks of the problematic
>> >>>>>>>> parallelism — here wants to check whether the task is too busy
>> to handle
>> >>>>>>>> the barrier. (flame graph or other things is always welcome here)
>> >>>>>>>>
>> >>>>>>>> Best,
>> >>>>>>>> Congxian
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> Congxian Qiu <qcx978132...@gmail.com> 于2019年8月1日周四 下午8:26写道:
>> >>>>>>>>
>> >>>>>>>>> Hi Bekir
>> >>>>>>>>>
>> >>>>>>>>> I'll first comb through all the information here, and try to
>> find
>> >>>>>>>>> out the reason with you, maybe need you to share some more
>> information :)
>> >>>>>>>>>
>> >>>>>>>>> Best,
>> >>>>>>>>> Congxian
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> Bekir Oguz <bekir.o...@persgroep.net> 于2019年8月1日周四 下午5:00写道:
>> >>>>>>>>>
>> >>>>>>>>>> Hi Fabian,
>> >>>>>>>>>> Thanks for sharing this with us, but we’re already on version
>> >>>>>>>>>> 1.8.1.
>> >>>>>>>>>>
>> >>>>>>>>>> What I don’t understand is which mechanism in Flink adds 15
>> >>>>>>>>>> minutes to the checkpoint duration occasionally. Can you maybe
>> give us some
>> >>>>>>>>>> hints on where to look at? Is there a default timeout of 15
>> minutes defined
>> >>>>>>>>>> somewhere in Flink? I couldn’t find one.
>> >>>>>>>>>>
>> >>>>>>>>>> In our pipeline, most of the checkpoints complete in less than
>> a
>> >>>>>>>>>> minute and some of them completed in 15 minutes+(less than a
>> minute).
>> >>>>>>>>>> There’s definitely something which adds 15 minutes. This is
>> >>>>>>>>>> happening in one or more subtasks during checkpointing.
>> >>>>>>>>>>
>> >>>>>>>>>> Please see the screenshot below:
>> >>>>>>>>>>
>> >>>>>>>>>> Regards,
>> >>>>>>>>>> Bekir
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>> Op 23 jul. 2019, om 16:37 heeft Fabian Hueske <
>> fhue...@gmail.com>
>> >>>>>>>>>> het volgende geschreven:
>> >>>>>>>>>>
>> >>>>>>>>>> Hi Bekir,
>> >>>>>>>>>>
>> >>>>>>>>>> Another user reported checkpointing issues with Flink 1.8.0
>> [1].
>> >>>>>>>>>> These seem to be resolved with Flink 1.8.1.
>> >>>>>>>>>>
>> >>>>>>>>>> Hope this helps,
>> >>>>>>>>>> Fabian
>> >>>>>>>>>>
>> >>>>>>>>>> [1]
>> >>>>>>>>>>
>> >>>>>>>>>>
>> https://lists.apache.org/thread.html/991fe3b09fd6a052ff52e5f7d9cdd9418545e68b02e23493097d9bc4@%3Cuser.flink.apache.org%3E
>> >>>>>>>>>>
>> >>>>>>>>>> Am Mi., 17. Juli 2019 um 09:16 Uhr schrieb Congxian Qiu <
>> >>>>>>>>>> qcx978132...@gmail.com>:
>> >>>>>>>>>>
>> >>>>>>>>>> Hi Bekir
>> >>>>>>>>>>
>> >>>>>>>>>> First of all, I think there is something wrong.  the state size
>> >>>>>>>>>> is almost
>> >>>>>>>>>> the same,  but the duration is different so much.
>> >>>>>>>>>>
>> >>>>>>>>>> The checkpoint for RocksDBStatebackend is dump sst files, then
>> >>>>>>>>>> copy the
>> >>>>>>>>>> needed sst files(if you enable incremental checkpoint, the sst
>> >>>>>>>>>> files
>> >>>>>>>>>> already on remote will not upload), then complete checkpoint.
>> Can
>> >>>>>>>>>> you check
>> >>>>>>>>>> the network bandwidth usage during checkpoint?
>> >>>>>>>>>>
>> >>>>>>>>>> Best,
>> >>>>>>>>>> Congxian
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>> Bekir Oguz <bekir.o...@persgroep.net> 于2019年7月16日周二 下午10:45写道:
>> >>>>>>>>>>
>> >>>>>>>>>> Hi all,
>> >>>>>>>>>> We have a flink job with user state, checkpointing to
>> >>>>>>>>>> RocksDBBackend
>> >>>>>>>>>> which is externally stored in AWS S3.
>> >>>>>>>>>> After we have migrated our cluster from 1.6 to 1.8, we see
>> >>>>>>>>>> occasionally
>> >>>>>>>>>> that some slots do to acknowledge the checkpoints quick enough.
>> >>>>>>>>>> As an
>> >>>>>>>>>> example: All slots acknowledge between 30-50 seconds except
>> only
>> >>>>>>>>>> one slot
>> >>>>>>>>>> acknowledges in 15 mins. Checkpoint sizes are similar to each
>> >>>>>>>>>> other, like
>> >>>>>>>>>> 200-400 MB.
>> >>>>>>>>>>
>> >>>>>>>>>> We did not experience this weird behaviour in Flink 1.6. We
>> have
>> >>>>>>>>>> 5 min
>> >>>>>>>>>> checkpoint interval and this happens sometimes once in an hour
>> >>>>>>>>>> sometimes
>> >>>>>>>>>> more but not in all the checkpoint requests. Please see the
>> >>>>>>>>>> screenshot
>> >>>>>>>>>> below.
>> >>>>>>>>>>
>> >>>>>>>>>> Also another point: For the faulty slots, the duration is
>> >>>>>>>>>> consistently 15
>> >>>>>>>>>> mins and some seconds, we couldn’t find out where this 15 mins
>> >>>>>>>>>> response
>> >>>>>>>>>> time comes from. And each time it is a different task manager,
>> >>>>>>>>>> not always
>> >>>>>>>>>> the same one.
>> >>>>>>>>>>
>> >>>>>>>>>> Do you guys aware of any other users having similar issues with
>> >>>>>>>>>> the new
>> >>>>>>>>>> version and also a suggested bug fix or solution?
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>> Thanks in advance,
>> >>>>>>>>>> Bekir Oguz
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>
>> >>>>>>>
>> >>>>>>
>> >>
>> >> --
>> >> -- Bekir Oguz
>> >>
>> >
>>
>

-- 
-- Bekir Oguz

Reply via email to