[ https://issues.apache.org/jira/browse/FLINK-29461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-29461:
-----------------------------------
    Labels: pull-request-available test-stability  (was: test-stability)

> ProcessDataStreamStreamingTests.test_process_function unstable
> --------------------------------------------------------------
>
>                 Key: FLINK-29461
>                 URL: https://issues.apache.org/jira/browse/FLINK-29461
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.16.0, 1.17.0
>            Reporter: Huang Xingbo
>            Assignee: Huang Xingbo
>            Priority: Critical
>              Labels: pull-request-available, test-stability
>
> {code:java}
> 2022-09-29T02:10:45.3571648Z Sep 29 02:10:45 self = 
> <pyflink.datastream.tests.test_data_stream.ProcessDataStreamStreamingTests 
> testMethod=test_process_function>
> 2022-09-29T02:10:45.3572279Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3572810Z Sep 29 02:10:45     def 
> test_process_function(self):
> 2022-09-29T02:10:45.3573495Z Sep 29 02:10:45         
> self.env.set_parallelism(1)
> 2022-09-29T02:10:45.3574148Z Sep 29 02:10:45         
> self.env.get_config().set_auto_watermark_interval(2000)
> 2022-09-29T02:10:45.3580634Z Sep 29 02:10:45         
> self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> 2022-09-29T02:10:45.3583194Z Sep 29 02:10:45         data_stream = 
> self.env.from_collection([(1, '1603708211000'),
> 2022-09-29T02:10:45.3584515Z Sep 29 02:10:45                                  
>                (2, '1603708224000'),
> 2022-09-29T02:10:45.3585957Z Sep 29 02:10:45                                  
>                (3, '1603708226000'),
> 2022-09-29T02:10:45.3587132Z Sep 29 02:10:45                                  
>                (4, '1603708289000')],
> 2022-09-29T02:10:45.3588094Z Sep 29 02:10:45                                  
>               type_info=Types.ROW([Types.INT(), Types.STRING()]))
> 2022-09-29T02:10:45.3589090Z Sep 29 02:10:45     
> 2022-09-29T02:10:45.3589949Z Sep 29 02:10:45         class 
> MyProcessFunction(ProcessFunction):
> 2022-09-29T02:10:45.3590710Z Sep 29 02:10:45     
> 2022-09-29T02:10:45.3591856Z Sep 29 02:10:45             def 
> process_element(self, value, ctx):
> 2022-09-29T02:10:45.3592873Z Sep 29 02:10:45                 
> current_timestamp = ctx.timestamp()
> 2022-09-29T02:10:45.3593862Z Sep 29 02:10:45                 
> current_watermark = ctx.timer_service().current_watermark()
> 2022-09-29T02:10:45.3594915Z Sep 29 02:10:45                 yield "current 
> timestamp: {}, current watermark: {}, current_value: {}"\
> 2022-09-29T02:10:45.3596201Z Sep 29 02:10:45                     
> .format(str(current_timestamp), str(current_watermark), str(value))
> 2022-09-29T02:10:45.3597089Z Sep 29 02:10:45     
> 2022-09-29T02:10:45.3597942Z Sep 29 02:10:45         watermark_strategy = 
> WatermarkStrategy.for_monotonous_timestamps()\
> 2022-09-29T02:10:45.3599260Z Sep 29 02:10:45             
> .with_timestamp_assigner(SecondColumnTimestampAssigner())
> 2022-09-29T02:10:45.3600611Z Sep 29 02:10:45         
> data_stream.assign_timestamps_and_watermarks(watermark_strategy)\
> 2022-09-29T02:10:45.3601877Z Sep 29 02:10:45             
> .process(MyProcessFunction(), 
> output_type=Types.STRING()).add_sink(self.test_sink)
> 2022-09-29T02:10:45.3603527Z Sep 29 02:10:45         self.env.execute('test 
> process function')
> 2022-09-29T02:10:45.3604445Z Sep 29 02:10:45         results = 
> self.test_sink.get_results()
> 2022-09-29T02:10:45.3605684Z Sep 29 02:10:45         expected = ["current 
> timestamp: 1603708211000, current watermark: "
> 2022-09-29T02:10:45.3607157Z Sep 29 02:10:45                     
> "-9223372036854775808, current_value: Row(f0=1, f1='1603708211000')",
> 2022-09-29T02:10:45.3608256Z Sep 29 02:10:45                     "current 
> timestamp: 1603708224000, current watermark: "
> 2022-09-29T02:10:45.3609650Z Sep 29 02:10:45                     
> "-9223372036854775808, current_value: Row(f0=2, f1='1603708224000')",
> 2022-09-29T02:10:45.3610854Z Sep 29 02:10:45                     "current 
> timestamp: 1603708226000, current watermark: "
> 2022-09-29T02:10:45.3612279Z Sep 29 02:10:45                     
> "-9223372036854775808, current_value: Row(f0=3, f1='1603708226000')",
> 2022-09-29T02:10:45.3613382Z Sep 29 02:10:45                     "current 
> timestamp: 1603708289000, current watermark: "
> 2022-09-29T02:10:45.3615683Z Sep 29 02:10:45                     
> "-9223372036854775808, current_value: Row(f0=4, f1='1603708289000')"]
> 2022-09-29T02:10:45.3617687Z Sep 29 02:10:45 >       
> self.assert_equals_sorted(expected, results)
> 2022-09-29T02:10:45.3618620Z Sep 29 02:10:45 
> 2022-09-29T02:10:45.3619425Z Sep 29 02:10:45 
> pyflink/datastream/tests/test_data_stream.py:986: 
> 2022-09-29T02:10:45.3620424Z Sep 29 02:10:45 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> 2022-09-29T02:10:45.3621886Z Sep 29 02:10:45 
> pyflink/datastream/tests/test_data_stream.py:66: in assert_equals_sorted
> 2022-09-29T02:10:45.3622847Z Sep 29 02:10:45     self.assertEqual(expected, 
> actual)
> 2022-09-29T02:10:45.3624658Z Sep 29 02:10:45 E   AssertionError: Lists differ: ["cur[414 chars]ark: -9223372036854775808, current_value: Row([22 chars]0')"] != ["cur[414 chars]ark: 1603708225999, current_value: Row(f0=4, f[15 chars]0')"]
> 2022-09-29T02:10:45.3625881Z Sep 29 02:10:45 E   
> 2022-09-29T02:10:45.3626591Z Sep 29 02:10:45 E   First differing element 3:
> 2022-09-29T02:10:45.3627726Z Sep 29 02:10:45 E   "curr[44 chars]ark: -9223372036854775808, current_value: Row([21 chars]00')"
> 2022-09-29T02:10:45.3628758Z Sep 29 02:10:45 E   "curr[44 chars]ark: 1603708225999, current_value: Row(f0=4, f[14 chars]00')"
> 2022-09-29T02:10:45.3629276Z Sep 29 02:10:45 E   
> 2022-09-29T02:10:45.3629842Z Sep 29 02:10:45 E   Diff is 753 characters long. 
> Set self.maxDiff to None to see it.
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=41436&view=logs&j=9cada3cb-c1d3-5621-16da-0f718fb86602&t=c67e71ed-6451-5d26-8920-5a8cf9651901



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to