[ 
https://issues.apache.org/jira/browse/FLINK-28357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17564346#comment-17564346
 ] 

James commented on FLINK-28357:
-------------------------------

Hello - sorry, I've been busy and only just got round to rechecking my test program. 

If I comment out the line "env.disableOperatorChaining();" to allow operator 
chaining, the issue still occurs. The screenshot below shows the dashboard 
after a recovery, with the watermark not working properly:

!image-2022-07-08-17-06-01-256.png!

As you can see, input2 of the join reports Long.MIN_VALUE, meaning the 
framework didn't transmit the watermark on behalf of my finished source (I 
didn't see the console message as you describe, because the source is finished).
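The Long.MIN_VALUE on input2 follows from how a two-input operator such as the join combines event time: it remembers the latest watermark per input (Long.MIN_VALUE until one arrives) and only advances to the minimum of the two. Below is a minimal plain-Java sketch of that min-of-inputs rule; `TwoInputWatermarkModel` and its methods are hypothetical names for a simplified model, not Flink's operator code, and the timestamps are borrowed from the log in the issue description.

```java
/**
 * Simplified, hypothetical model of the rule a two-input operator (such as the
 * KeyedCoProcessFunction join) uses for event time: remember the latest watermark
 * per input and only advance to the minimum of the two.
 */
final class TwoInputWatermarkModel {
    // Before any watermark arrives, an input is at Long.MIN_VALUE.
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long combinedWatermark = Long.MIN_VALUE;

    void processWatermark1(long timestamp) {
        input1Watermark = timestamp;
        advance();
    }

    void processWatermark2(long timestamp) {
        input2Watermark = timestamp;
        advance();
    }

    // The combined watermark is the minimum over both inputs and never moves backwards.
    private void advance() {
        long min = Math.min(input1Watermark, input2Watermark);
        if (min > combinedWatermark) {
            combinedWatermark = min;
        }
    }

    long input2Watermark() { return input2Watermark; }

    long currentWatermark() { return combinedWatermark; }

    public static void main(String[] args) {
        TwoInputWatermarkModel join = new TwoInputWatermarkModel();
        // Input 1: the Long Running Source keeps emitting wall-clock watermarks.
        join.processWatermark1(1656503703444L);
        join.processWatermark1(1656503713445L);
        // Input 2 never got Long.MAX_VALUE, so the join's event time is stuck.
        System.out.println("stuck: " + (join.currentWatermark() == Long.MIN_VALUE));
        // Once Long.MAX_VALUE arrives on input 2, input 1 alone drives event time.
        join.processWatermark2(Long.MAX_VALUE);
        System.out.println("combined: " + join.currentWatermark());
    }
}
```

In this model the finished side only stops holding the join back once Long.MAX_VALUE reaches input 2, which is why the finished source (or the framework on its behalf) has to deliver that final watermark after every recovery.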

If you're having trouble reproducing the issue on 1.15.0 before your fix, try 
changing the checkpointing interval to 5 * 1000 instead of 70, as per my code 
comments. For me that causes the issue immediately after the first recovery.

 

BTW, I can't re-open this issue as you describe - not sure if I have perms etc.

> Watermark issue when recovering Finished sources
> ------------------------------------------------
>
>                 Key: FLINK-28357
>                 URL: https://issues.apache.org/jira/browse/FLINK-28357
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.15.0
>         Environment: This can be reproduced in an IDE with the attached 
> sample program.
>            Reporter: James
>            Assignee: Piotr Nowojski
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.16.0, 1.15.2, 1.14.6
>
>         Attachments: WatermarkDemoMain.java, 
> image-2022-07-01-16-18-14-768.png, image-2022-07-08-17-06-01-256.png, 
> longExample.txt
>
>
> Copied mostly from the email thread on the Flink user mailing list:
> I've done a lot of experimentation and I’m convinced there is a problem with 
> how Flink handles FINISHED sources and recovery. 
> The program consists of:
>  * Two sources:
>  ** One “Long Running Source” – stays alive and emits a watermark of 
> DateTime.now() every 10 seconds.
>  *** Prints a message to the console saying the watermark has been emitted.
>  *** *Throws an exception every 5 or 10 iterations to force a recovery.*
>  ** One “Short Lived Source” – emits a Long.MAX_VALUE watermark, prints a 
> message to the console and returns.
>  * The “Short Lived Source” feeds into a map(), which is then joined with the 
> “Long Running Source” in a KeyedCoProcessFunction. The “Short Lived Source” 
> is then moved to the “FINISHED” state by Flink.
> The problem here is that the “Join” receives no Long.MAX_VALUE watermark from 
> the map() in some situations after a recovery. The dashboard goes from 
> showing this:
> [Screenshot of the dashboard before the recovery; originally an inline email attachment]
> To the following after a recovery (the currentInput1Watermark and 
> currentInput2Watermark metrics show that input 2 has not received a watermark 
> from the map(), reporting Long.MIN_VALUE):
> !image-2022-07-01-16-18-14-768.png!
> The program is currently set to checkpoint every 5 seconds. By experimenting 
> with 70 seconds, it seems that if only one checkpoint has been taken with the 
> “Short Lived Source” in a FINISHED state since the last recovery, then 
> everything works fine: the restarted “Short Lived Source” emits its 
> watermark and I see the “ShortLivedEmptySource emitting Long.MAX_VALUE 
> watermark” message on the console, meaning its run() definitely executed. 
> However, I found that if 2 or more checkpoints are taken since the last 
> recovery with the source in a FINISHED state, then the console message does 
> not appear and the watermark is not emitted.
> To repeat – the Join does not get a Long.MAX_VALUE watermark from my source 
> or from Flink if I see two or more checkpoints logged between recoveries. If 
> zero or one checkpoint is made, everything is fine – the join gets the 
> watermark and I see my console message. You can play with the checkpointing 
> frequency as per the code comments:
>         // Useful checkpoint interval options:
>         //    5 - see the problem after the first recovery
>         //   70 - useful to see bad behaviour kick in after a recovery or two
>         //  120 - won't see the problem as we don't have 2 checkpoints within a single recovery session
> If I merge the Triggering/Completed checkpoint messages in the log with my 
> console output I see something like this clearly showing the “Short Lived 
> Source” run() method is not executed after 2 checkpoints with the operators 
> marked as FINISHED:
>  
> 2022-06-29T11:52:31.268Z: *ShortLivedEmptySource* emitting Long.MAX_VALUE watermark.
> 2022-06-29T11:52:31.293Z: LongRunningSource emitting initial watermark=1656503551268
> 2022-06-29T11:52:41.302Z: LongRunningSource emitting loop watermark=1656503561302
> 2022-06-29T11:52:51.302Z: LongRunningSource emitting loop watermark=1656503571302
> 2022-06-29T11:53:01.303Z: LongRunningSource emitting loop watermark=1656503581303
> 2022-06-29 11:53:02.772 INFO  [Checkpoint Timer] o.a.f.r.c.CheckpointCoordinator           Triggering checkpoint 1 (type=CheckpointType\{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD})
> 2022-06-29 11:53:02.870 INFO  [jobmanager-io-thread-10] o.a.f.r.c.CheckpointCoordinator    Completed checkpoint 1 for job 877656d7752bc1304c2cb92790e6aefb
> 2022-06-29T11:53:11.303Z: LongRunningSource emitting loop watermark=1656503591303
> 2022-06-29T11:53:21.304Z: LongRunningSource emitting loop watermark=1656503601304
> 2022-06-29T11:53:21.304Z: ------------------ Recovery ------------------
> 2022-06-29T11:53:22.405Z: LongRunningSource emitting initial watermark=1656503602405
> 2022-06-29T11:53:22.408Z: *ShortLivedEmptySource* emitting Long.MAX_VALUE watermark.
> 2022-06-29T11:53:32.406Z: LongRunningSource emitting loop watermark=1656503612406
> 2022-06-29T11:53:42.406Z: LongRunningSource emitting loop watermark=1656503622406
> 2022-06-29 11:53:51.048 INFO  [Checkpoint Timer] o.a.f.r.c.CheckpointCoordinator           Triggering checkpoint 2 (type=CheckpointType\{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD})
> 2022-06-29 11:53:51.067 INFO  [jobmanager-io-thread-4] o.a.f.r.c.CheckpointCoordinator     Completed checkpoint 2 for job 877656d7752bc1304c2cb92790e6aefb
> 2022-06-29T11:53:52.407Z: LongRunningSource emitting loop watermark=1656503632407
> 2022-06-29T11:54:02.407Z: LongRunningSource emitting loop watermark=1656503642407
> 2022-06-29T11:54:12.408Z: LongRunningSource emitting loop watermark=1656503652408
> 2022-06-29T11:54:22.408Z: LongRunningSource emitting loop watermark=1656503662408
> 2022-06-29T11:54:32.409Z: LongRunningSource emitting loop watermark=1656503672409
> 2022-06-29T11:54:42.409Z: LongRunningSource emitting loop watermark=1656503682409
> 2022-06-29T11:54:52.410Z: LongRunningSource emitting loop watermark=1656503692410
> 2022-06-29 11:55:01.048 INFO  [Checkpoint Timer] o.a.f.r.c.CheckpointCoordinator           Triggering checkpoint 3 (type=CheckpointType\{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD})
> 2022-06-29 11:55:01.057 INFO  [jobmanager-io-thread-10] o.a.f.r.c.CheckpointCoordinator    Completed checkpoint 3 for job 877656d7752bc1304c2cb92790e6aefb
> 2022-06-29T11:55:02.410Z: LongRunningSource emitting loop watermark=1656503702410
> 2022-06-29T11:55:02.411Z: ------------------ Recovery ------------------
> 2022-06-29T11:55:03.445Z: LongRunningSource emitting initial watermark=1656503703444       <<<<< NO “ShortLivedEmptySource” message after recovery
> 2022-06-29T11:55:13.446Z: LongRunningSource emitting loop watermark=1656503713445
> 2022-06-29T11:55:23.446Z: LongRunningSource emitting loop watermark=1656503723446
> 2022-06-29T11:55:33.446Z: LongRunningSource emitting loop watermark=1656503733446
>  
> I have also attached a longer example which shows everything working fine 
> after 5 recoveries and then breaking after the 6th.
> My guess is that it has something to do with the checkpointing and recovery 
> of a FINISHED source.
> Finally, here are some changes that allow the code to work:
>  * Change the code so the “Short Lived Source” doesn’t return from run() and 
> stays RUNNING (uncomment the Thread.sleep)
>  * As I mentioned before, if I remove the map() operator the problem in the 
> join also goes away. (I don’t see the console output but the join is happy)
>  * Use a long enough checkpoint interval (e.g. 120 seconds) so we don’t have 
> two checkpoints with FINISHED state per recovery.
> The fact that these changes prevent the issue makes me think there’s a 
> bug or inconsistency here – if somebody could explain it, I would really 
> appreciate it.
>  
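The pattern in the merged log quoted above (checkpoints completed per recovery session versus whether the ShortLivedEmptySource message appears in the next session) can be checked mechanically on longer logs such as longExample.txt. Below is a minimal plain-Java sketch, assuming the merged log is available as a list of lines and that recoveries are marked by the “Recovery” separator line the reporter printed; `RecoverySessionScanner` and `Session` are hypothetical names, not part of Flink or of WatermarkDemoMain.java.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper (not part of Flink or of WatermarkDemoMain.java) that splits a
 * merged console + JobManager log into recovery sessions. For each session it counts
 * "Completed checkpoint" lines and records whether the ShortLivedEmptySource printed
 * its message, i.e. whether its run() method executed.
 */
final class RecoverySessionScanner {

    /** Summary of one run of the job between two recoveries. */
    record Session(int completedCheckpoints, boolean shortLivedSourceRan) {}

    static List<Session> scan(List<String> logLines) {
        List<Session> sessions = new ArrayList<>();
        int checkpoints = 0;
        boolean sourceRan = false;
        for (String line : logLines) {
            if (line.contains("- Recovery -")) {
                // Close the current session; the next lines belong to the restarted job.
                sessions.add(new Session(checkpoints, sourceRan));
                checkpoints = 0;
                sourceRan = false;
            } else if (line.contains("Completed checkpoint")) {
                checkpoints++;
            } else if (line.contains("ShortLivedEmptySource")) {
                sourceRan = true;
            }
        }
        // The session after the last recovery is still open when the log ends.
        sessions.add(new Session(checkpoints, sourceRan));
        return sessions;
    }

    public static void main(String[] args) {
        // Abbreviated lines from the merged log in the issue description.
        List<String> log = List.of(
                "11:52:31 *ShortLivedEmptySource* emitting Long.MAX_VALUE watermark.",
                "11:53:02 Completed checkpoint 1 for job 877656d7752bc1304c2cb92790e6aefb",
                "11:53:21 ------------------ Recovery ------------------",
                "11:53:22 *ShortLivedEmptySource* emitting Long.MAX_VALUE watermark.",
                "11:53:51 Completed checkpoint 2 for job 877656d7752bc1304c2cb92790e6aefb",
                "11:55:01 Completed checkpoint 3 for job 877656d7752bc1304c2cb92790e6aefb",
                "11:55:02 ------------------ Recovery ------------------",
                "11:55:03 LongRunningSource emitting initial watermark=1656503703444");
        List<Session> sessions = scan(log);
        for (int i = 0; i < sessions.size(); i++) {
            System.out.println("session " + i + ": " + sessions.get(i));
        }
    }
}
```

On this excerpt it reports one completed checkpoint in session 0, two in session 1, and no ShortLivedEmptySource line in session 2, matching the description. It only correlates log lines; it says nothing about why the source is not re-run.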



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
