[ 
https://issues.apache.org/jira/browse/BEAM-11749?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marek Pikulski updated BEAM-11749:
----------------------------------
    Attachment: timer_test.py

> Portable Flink runner skips timers when dynamic timer tags are used
> -------------------------------------------------------------------
>
>                 Key: BEAM-11749
>                 URL: https://issues.apache.org/jira/browse/BEAM-11749
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 2.27.0
>            Reporter: Marek Pikulski
>            Priority: P2
>         Attachments: timer_test.py
>
>
> Timers in the portable Flink runner do not fire as expected when dynamic
> timer tags are used. See the example code below for details:
>  
> """Demonstrates Beam timer issue with portable Flink runner.
>
> Run (with 'apache_beam' 2.27.0, OpenJDK 8, Python 3.8.2):
>   python timer_test.py
>
> Typical program output:
>   ...
>   INFO:__main__:Setting timer to Timestamp(2), data: (None, [2])
>   INFO:__main__:Setting timer to Timestamp(1), data: (None, [1])
>   INFO:__main__:Setting timer to Timestamp(9223371950454.775000), data: (None, [])  # noqa: E501
>   INFO:__main__:Timer fired at Timestamp(9223371950454.775000)
>   ...
>   (program exits, or hangs with --shutdown_sources_after_idle_ms=9223372036854775807)
>
> Expected program output (based on 'Dynamic timer tags' in
> https://beam.apache.org/documentation/programming-guide/#timers):
>   ...
>   INFO:__main__:Setting timer to Timestamp(2), data: (None, [2])
>   INFO:__main__:Setting timer to Timestamp(1), data: (None, [1])
>   INFO:__main__:Setting timer to Timestamp(9223371950454.775000), data: (None, [])  # noqa: E501
>   INFO:__main__:Timer fired at Timestamp(1)
>   INFO:__main__:Timer fired at Timestamp(2)
>   INFO:__main__:Timer fired at Timestamp(9223371950454.775000)
>   ...
>   (program exit)
>
> Note that in this example, things might seem not too bad, because there
> is a final timestamp emitted by the `beam.Create` source. In a true streaming
> case, however, the timer would fire only at timestamp 1, so data at
> timestamp 2 would never be considered complete.
> """
> import sys
> import logging
>
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.transforms.timeutil import TimeDomain
> from apache_beam.transforms.userstate import TimerSpec, on_timer
> from apache_beam.transforms.window import TimestampCombiner
>
> LOGGER = logging.getLogger(__name__)
>
>
> class TimerTestFn(beam.DoFn):  # noqa: D101
>
>     TIMER_SPEC = TimerSpec('timestamp_expired', TimeDomain.WATERMARK)
>
>     def process(
>             self,
>             element,
>             t=beam.DoFn.TimestampParam,
>             timer=beam.DoFn.TimerParam(TIMER_SPEC)
>     ) -> None:  # noqa: D102
>         LOGGER.info(f"Setting timer to {t}, data: {element}")
>         timer.set(t, dynamic_timer_tag=str(t.micros))
>
>     @on_timer(TIMER_SPEC)
>     def timer_cb(
>             self,
>             t=beam.DoFn.TimestampParam,
>             tag=beam.DoFn.DynamicTimerTagParam
>     ) -> None:  # noqa: D102
>         LOGGER.info(f"Timer fired at {t}")
>
>
> def main():
>     """Do the work."""
>     logging.basicConfig(level=logging.INFO)
>     logging.getLogger(
>         "apache_beam.utils.subprocess_server").setLevel(logging.WARNING)
>
>     pipeline_opts = PipelineOptions(
>         sys.argv,
>         runner='FlinkRunner',
>         flink_master='[local]',
>         streaming=True)
>
>     with beam.Pipeline(options=pipeline_opts) as p:
>         (
>             p
>             | beam.Create([2, 1])
>             | 'AddTimestamps' >> beam.Map(
>                lambda element: beam.window.TimestampedValue(
>                    element, element))
>             | 'GlobalWindow' >> beam.WindowInto(
>                     beam.window.GlobalWindows(),
>                     trigger=beam.trigger.Repeatedly(
>                         beam.trigger.AfterCount(1)),
>                     accumulation_mode=beam.trigger.AccumulationMode.DISCARDING,
>                     timestamp_combiner=TimestampCombiner.OUTPUT_AT_LATEST
>                 )
>             | 'GroupBy' >> beam.GroupBy(lambda element: None)
>             | 'TestTimers' >> beam.ParDo(TimerTestFn())
>         )
>
>
> if __name__ == "__main__":
>     main()
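
The difference between the typical and the expected output in the report can be illustrated without Beam. The following is a minimal pure-Python sketch, assuming that timers with distinct dynamic tags are independent and that an event-time timer fires once the watermark reaches its timestamp. The names TaggedTimers, simulate and LAST_TIMESTAMP are hypothetical and not part of the Beam API. The untagged variant is included only because it reproduces the shape of the observed log; it is not a claim about what the Flink runner does internally.

```python
class TaggedTimers:
    """Toy model of event-time timers keyed by a tag (hypothetical, not Beam)."""

    def __init__(self):
        self._timers = {}  # tag -> timestamp at which the timer should fire

    def set(self, timestamp, tag):
        # Setting a timer replaces only an existing timer with the same tag.
        self._timers[tag] = timestamp

    def advance_watermark(self, watermark):
        """Fire, in timestamp order, every timer due at or before the watermark."""
        due = sorted(
            (ts, tag) for tag, ts in self._timers.items() if ts <= watermark)
        for _, tag in due:
            del self._timers[tag]
        return [ts for ts, _ in due]


def simulate(timestamps, use_dynamic_tags):
    """Set one timer per timestamp, then advance the watermark to infinity."""
    timers = TaggedTimers()
    for ts in timestamps:
        # With dynamic tags each timestamp gets its own timer; without them,
        # every call targets the same timer and overwrites the previous one.
        tag = str(ts) if use_dynamic_tags else ""
        timers.set(ts, tag)
    return timers.advance_watermark(float("inf"))


# The large final timestamp seen in the log output of the report.
LAST_TIMESTAMP = 9223371950454.775

expected = simulate([2, 1, LAST_TIMESTAMP], use_dynamic_tags=True)
collapsed = simulate([2, 1, LAST_TIMESTAMP], use_dynamic_tags=False)

print("with dynamic tags:   ", expected)
print("without dynamic tags:", collapsed)
```

With distinct tags the toy model fires three timers in timestamp order, which matches the expected output in the report. With a single shared tag only the last timer that was set survives, which matches the typical output.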



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
