[ https://issues.apache.org/jira/browse/FLINK-25967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yun Gao closed FLINK-25967.
---------------------------
    Resolution: Duplicate

> StreamingModeDataStreamTests.test_keyed_process_function_with_state failed on azure
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-25967
>                 URL: https://issues.apache.org/jira/browse/FLINK-25967
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.15.0
>            Reporter: Yun Gao
>            Priority: Major
>
> {code:java}
> 2022-02-07T03:46:14.7065997Z Feb 07 03:46:14 =================================== FAILURES ===================================
> 2022-02-07T03:46:14.7077277Z Feb 07 03:46:14 _____ StreamingModeDataStreamTests.test_keyed_process_function_with_state ______
> 2022-02-07T03:46:14.7077773Z Feb 07 03:46:14 
> 2022-02-07T03:46:14.7078301Z Feb 07 03:46:14 self = <pyflink.datastream.tests.test_data_stream.StreamingModeDataStreamTests testMethod=test_keyed_process_function_with_state>
> 2022-02-07T03:46:14.7078856Z Feb 07 03:46:14 
> 2022-02-07T03:46:14.7079241Z Feb 07 03:46:14     def test_keyed_process_function_with_state(self):
> 2022-02-07T03:46:14.7079730Z Feb 07 03:46:14         self.env.get_config().set_auto_watermark_interval(2000)
> 2022-02-07T03:46:14.7080460Z Feb 07 03:46:14         self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> 2022-02-07T03:46:14.7081786Z Feb 07 03:46:14         data_stream = self.env.from_collection([(1, 'hi', '1603708211000'),
> 2022-02-07T03:46:14.7082489Z Feb 07 03:46:14                                                 (2, 'hello', '1603708224000'),
> 2022-02-07T03:46:14.7083172Z Feb 07 03:46:14                                                 (3, 'hi', '1603708226000'),
> 2022-02-07T03:46:14.7083808Z Feb 07 03:46:14                                                 (4, 'hello', '1603708289000'),
> 2022-02-07T03:46:14.7084439Z Feb 07 03:46:14                                                 (5, 'hi', '1603708291000'),
> 2022-02-07T03:46:14.7085060Z Feb 07 03:46:14                                                 (6, 'hello', '1603708293000')],
> 2022-02-07T03:46:14.7085549Z Feb 07 03:46:14                                                 type_info=Types.ROW([Types.INT(), Types.STRING(),
> 2022-02-07T03:46:14.7086019Z Feb 07 03:46:14                                                                      Types.STRING()]))
> 2022-02-07T03:46:14.7086374Z Feb 07 03:46:14 
> 2022-02-07T03:46:14.7086749Z Feb 07 03:46:14         class MyTimestampAssigner(TimestampAssigner):
> 2022-02-07T03:46:14.7087121Z Feb 07 03:46:14 
> 2022-02-07T03:46:14.7087695Z Feb 07 03:46:14             def extract_timestamp(self, value, record_timestamp) -> int:
> 2022-02-07T03:46:14.7088140Z Feb 07 03:46:14                 return int(value[2])
> 2022-02-07T03:46:14.7088469Z Feb 07 03:46:14 
> 2022-02-07T03:46:14.7088848Z Feb 07 03:46:14         class MyProcessFunction(KeyedProcessFunction):
> 2022-02-07T03:46:14.7089220Z Feb 07 03:46:14 
> 2022-02-07T03:46:14.7089537Z Feb 07 03:46:14             def __init__(self):
> 2022-02-07T03:46:14.7089916Z Feb 07 03:46:14                 self.value_state = None
> 2022-02-07T03:46:14.7090294Z Feb 07 03:46:14                 self.list_state = None
> 2022-02-07T03:46:14.7090676Z Feb 07 03:46:14                 self.map_state = None
> 2022-02-07T03:46:14.7091117Z Feb 07 03:46:14 
> 2022-02-07T03:46:14.7091482Z Feb 07 03:46:14             def open(self, runtime_context: RuntimeContext):
> 2022-02-07T03:46:14.7092223Z Feb 07 03:46:14                 value_state_descriptor = ValueStateDescriptor('value_state', Types.INT())
> 2022-02-07T03:46:14.7092790Z Feb 07 03:46:14                 self.value_state = runtime_context.get_state(value_state_descriptor)
> 2022-02-07T03:46:14.7093544Z Feb 07 03:46:14                 list_state_descriptor = ListStateDescriptor('list_state', Types.INT())
> 2022-02-07T03:46:14.7094102Z Feb 07 03:46:14                 self.list_state = runtime_context.get_list_state(list_state_descriptor)
> 2022-02-07T03:46:14.7094883Z Feb 07 03:46:14                 map_state_descriptor = MapStateDescriptor('map_state', Types.INT(), Types.STRING())
> 2022-02-07T03:46:14.7095550Z Feb 07 03:46:14                 state_ttl_config = StateTtlConfig \
> 2022-02-07T03:46:14.7095960Z Feb 07 03:46:14                     .new_builder(Time.seconds(1)) \
> 2022-02-07T03:46:14.7096643Z Feb 07 03:46:14                     .set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite) \
> 2022-02-07T03:46:14.7097104Z Feb 07 03:46:14                     .set_state_visibility(
> 2022-02-07T03:46:14.7097563Z Feb 07 03:46:14                         StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp) \
> 2022-02-07T03:46:14.7098044Z Feb 07 03:46:14                     .disable_cleanup_in_background() \
> 2022-02-07T03:46:14.7098425Z Feb 07 03:46:14                     .build()
> 2022-02-07T03:46:14.7098835Z Feb 07 03:46:14                 map_state_descriptor.enable_time_to_live(state_ttl_config)
> 2022-02-07T03:46:14.7099362Z Feb 07 03:46:14                 self.map_state = runtime_context.get_map_state(map_state_descriptor)
> 2022-02-07T03:46:14.7099787Z Feb 07 03:46:14 
> 2022-02-07T03:46:14.7100147Z Feb 07 03:46:14             def process_element(self, value, ctx):
> 2022-02-07T03:46:14.7100532Z Feb 07 03:46:14                 import time
> 2022-02-07T03:46:14.7100974Z Feb 07 03:46:14                 time.sleep(1)
> 2022-02-07T03:46:14.7101368Z Feb 07 03:46:14                 current_value = self.value_state.value()
> 2022-02-07T03:46:14.7101799Z Feb 07 03:46:14                 self.value_state.update(value[0])
> 2022-02-07T03:46:14.7102241Z Feb 07 03:46:14                 current_list = [_ for _ in self.list_state.get()]
> 2022-02-07T03:46:14.7102676Z Feb 07 03:46:14                 self.list_state.add(value[0])
> 2022-02-07T03:46:14.7103119Z Feb 07 03:46:14                 map_entries = {k: v for k, v in self.map_state.items()}
> 2022-02-07T03:46:14.7103552Z Feb 07 03:46:14                 keys = sorted(map_entries.keys())
> 2022-02-07T03:46:14.7104263Z Feb 07 03:46:14                 map_entries_string = [str(k) + ': ' + str(map_entries[k]) for k in keys]
> 2022-02-07T03:46:14.7104977Z Feb 07 03:46:14                 map_entries_string = '{' + ', '.join(map_entries_string) + '}'
> 2022-02-07T03:46:14.7105453Z Feb 07 03:46:14                 self.map_state.put(value[0], value[1])
> 2022-02-07T03:46:14.7105882Z Feb 07 03:46:14                 current_key = ctx.get_current_key()
> 2022-02-07T03:46:14.7106361Z Feb 07 03:46:14                 yield "current key: {}, current value state: {}, current list state: {}, " \
> 2022-02-07T03:46:14.7106871Z Feb 07 03:46:14                       "current map state: {}, current value: {}".format(str(current_key),
> 2022-02-07T03:46:14.7107344Z Feb 07 03:46:14                                                                         str(current_value),
> 2022-02-07T03:46:14.7107765Z Feb 07 03:46:14                                                                         str(current_list),
> 2022-02-07T03:46:14.7108185Z Feb 07 03:46:14                                                                         map_entries_string,
> 2022-02-07T03:46:14.7108598Z Feb 07 03:46:14                                                                         str(value))
> 2022-02-07T03:46:14.7108938Z Feb 07 03:46:14 
> 2022-02-07T03:46:14.7109285Z Feb 07 03:46:14             def on_timer(self, timestamp, ctx):
> 2022-02-07T03:46:14.7109656Z Feb 07 03:46:14                 pass
> 2022-02-07T03:46:14.7109958Z Feb 07 03:46:14 
> 2022-02-07T03:46:14.7110370Z Feb 07 03:46:14         watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
> 2022-02-07T03:46:14.7110865Z Feb 07 03:46:14             .with_timestamp_assigner(MyTimestampAssigner())
> 2022-02-07T03:46:14.7111365Z Feb 07 03:46:14         data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
> 2022-02-07T03:46:14.7111844Z Feb 07 03:46:14             .key_by(lambda x: x[1], key_type=Types.STRING()) \
> 2022-02-07T03:46:14.7112334Z Feb 07 03:46:14             .process(MyProcessFunction(), output_type=Types.STRING()) \
> 2022-02-07T03:46:14.7112781Z Feb 07 03:46:14             .add_sink(self.test_sink)
> 2022-02-07T03:46:14.7113561Z Feb 07 03:46:14         self.env.execute('test time stamp assigner with keyed process function')
> 2022-02-07T03:46:14.7114051Z Feb 07 03:46:14         results = self.test_sink.get_results()
> 2022-02-07T03:46:14.7114548Z Feb 07 03:46:14         expected = ["current key: hi, current value state: None, current list state: [], "
> 2022-02-07T03:46:14.7115251Z Feb 07 03:46:14                     "current map state: {}, current value: Row(f0=1, f1='hi', "
> 2022-02-07T03:46:14.7115862Z Feb 07 03:46:14                     "f2='1603708211000')",
> 2022-02-07T03:46:14.7116293Z Feb 07 03:46:14                     "current key: hello, current value state: None, "
> 2022-02-07T03:46:14.7116795Z Feb 07 03:46:14                     "current list state: [], current map state: {}, current value: Row(f0=2,"
> 2022-02-07T03:46:14.7117458Z Feb 07 03:46:14                     " f1='hello', f2='1603708224000')",
> 2022-02-07T03:46:14.7117937Z Feb 07 03:46:14                     "current key: hi, current value state: 1, current list state: [1], "
> 2022-02-07T03:46:14.7118640Z Feb 07 03:46:14                     "current map state: {1: hi}, current value: Row(f0=3, f1='hi', "
> 2022-02-07T03:46:14.7119348Z Feb 07 03:46:14                     "f2='1603708226000')",
> 2022-02-07T03:46:14.7119814Z Feb 07 03:46:14                     "current key: hello, current value state: 2, current list state: [2], "
> 2022-02-07T03:46:14.7120541Z Feb 07 03:46:14                     "current map state: {2: hello}, current value: Row(f0=4, f1='hello', "
> 2022-02-07T03:46:14.7121161Z Feb 07 03:46:14                     "f2='1603708289000')",
> 2022-02-07T03:46:14.7121720Z Feb 07 03:46:14                     "current key: hi, current value state: 3, current list state: [1, 3], "
> 2022-02-07T03:46:14.7122444Z Feb 07 03:46:14                     "current map state: {1: hi, 3: hi}, current value: Row(f0=5, f1='hi', "
> 2022-02-07T03:46:14.7123070Z Feb 07 03:46:14                     "f2='1603708291000')",
> 2022-02-07T03:46:14.7123540Z Feb 07 03:46:14                     "current key: hello, current value state: 4, current list state: [2, 4],"
> 2022-02-07T03:46:14.7124088Z Feb 07 03:46:14                     " current map state: {2: hello, 4: hello}, current value: Row(f0=6, "
> 2022-02-07T03:46:14.7124739Z Feb 07 03:46:14                     "f1='hello', f2='1603708293000')"]
> 2022-02-07T03:46:14.7125184Z Feb 07 03:46:14 >       self.assert_equals_sorted(expected, results)
> 2022-02-07T03:46:14.7125537Z Feb 07 03:46:14 
> 2022-02-07T03:46:14.7125922Z Feb 07 03:46:14 pyflink/datastream/tests/test_data_stream.py:683: 
> 2022-02-07T03:46:14.7126404Z Feb 07 03:46:14 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
> 2022-02-07T03:46:14.7126918Z Feb 07 03:46:14 pyflink/datastream/tests/test_data_stream.py:62: in assert_equals_sorted
> 2022-02-07T03:46:14.7127403Z Feb 07 03:46:14     self.assertEqual(expected, actual)
> 2022-02-07T03:46:14.7128357Z Feb 07 03:46:14 E   AssertionError: Lists differ: ["cur[719 chars]te: {1: hi, 3: hi}, current value: Row(f0=5, f[172 chars]0')"] != ["cur[719 chars]te: {3: hi}, current value: Row(f0=5, f1='hi',[165 chars]0')"]
> 2022-02-07T03:46:14.7128969Z Feb 07 03:46:14 E   
> 2022-02-07T03:46:14.7129299Z Feb 07 03:46:14 E   First differing element 4:
> 2022-02-07T03:46:14.7168430Z Feb 07 03:46:14 E   "curr[80 chars]te: {1: hi, 3: hi}, current value: Row(f0=5, f[23 chars]00')"
> 2022-02-07T03:46:14.7169423Z Feb 07 03:46:14 E   "curr[80 chars]te: {3: hi}, current value: Row(f0=5, f1='hi',[16 chars]00')"
> 2022-02-07T03:46:14.7169864Z Feb 07 03:46:14 E   
> 2022-02-07T03:46:14.7170276Z Feb 07 03:46:14 E   Diff is 1211 characters long. Set self.maxDiff to None to see it.
> 2022-02-07T03:46:14.7170964Z Feb 07 03:46:14 =============================== warnings summary ===============================
> 2022-02-07T03:46:14.7171618Z Feb 07 03:46:14 pyflink/datastream/tests/test_stream_execution_environment.py::StreamExecutionEnvironmentTests::test_add_classpaths
> 2022-02-07T03:46:14.7172883Z Feb 07 03:46:14   /__w/1/s/flink-python/.tox/py36-cython/lib/python3.6/site-packages/future/standard_library/__init__.py:65: DeprecationWarning: the imp module is deprecated in favour of importlib; see the module's documentation for alternative uses
> 2022-02-07T03:46:14.7173901Z Feb 07 03:46:14     import imp
> 2022-02-07T03:46:14.7174207Z Feb 07 03:46:14 
> 2022-02-07T03:46:14.7174734Z Feb 07 03:46:14 pyflink/datastream/tests/test_stream_execution_environment.py::StreamExecutionEnvironmentTests::test_add_python_file
> 2022-02-07T03:46:14.7175827Z Feb 07 03:46:14   /__w/1/s/flink-python/pyflink/table/table_environment.py:1999: DeprecationWarning: Deprecated in 1.12. Use from_data_stream(DataStream, *Expression) instead.
> 2022-02-07T03:46:14.7212771Z Feb 07 03:46:14     DeprecationWarning)
> 2022-02-07T03:46:14.7213185Z Feb 07 03:46:14 
> 2022-02-07T03:46:14.7213709Z Feb 07 03:46:14 pyflink/datastream/tests/test_stream_execution_environment.py::StreamExecutionEnvironmentTests::test_execute
> 2022-02-07T03:46:14.7214927Z Feb 07 03:46:14   /__w/1/s/flink-python/pyflink/table/table_environment.py:538: DeprecationWarning: Deprecated in 1.10. Use create_table instead.
> 2022-02-07T03:46:14.7215607Z Feb 07 03:46:14     warnings.warn("Deprecated in 1.10. Use create_table instead.", DeprecationWarning)
> 2022-02-07T03:46:14.7216284Z Feb 07 03:46:14 
> 2022-02-07T03:46:14.7217229Z Feb 07 03:46:14 -- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
> 2022-02-07T03:46:14.7217782Z Feb 07 03:46:14 ============================= slowest 20 durations =============================
> 2022-02-07T03:46:14.7218383Z Feb 07 03:46:14 10.67s call     pyflink/datastream/tests/test_connectors.py::ConnectorTests::test_stream_file_sink
> 2022-02-07T03:46:14.7219092Z Feb 07 03:46:14 10.18s call     pyflink/datastream/tests/test_data_stream.py::BatchModeDataStreamTests::test_keyed_process_function_with_state
> 2022-02-07T03:46:14.7219864Z Feb 07 03:46:14 9.85s call     pyflink/datastream/tests/test_stream_execution_environment.py::StreamExecutionEnvironmentTests::test_add_python_file
> 2022-02-07T03:46:14.7230675Z Feb 07 03:46:14 8.86s call     pyflink/datastream/tests/test_data_stream.py::StreamingModeDataStreamTests::test_keyed_process_function_with_state
> 2022-02-07T03:46:14.7231521Z Feb 07 03:46:14 7.62s call     pyflink/datastream/tests/test_data_stream.py::StreamingModeDataStreamTests::test_aggregating_state
> 2022-02-07T03:46:14.7232254Z Feb 07 03:46:14 6.19s call     pyflink/datastream/tests/test_data_stream.py::StreamingModeDataStreamTests::test_execute_and_collect
> 2022-02-07T03:46:14.7232975Z Feb 07 03:46:14 5.55s call     pyflink/datastream/tests/test_data_stream.py::StreamingModeDataStreamTests::test_partition_custom
> 2022-02-07T03:46:14.7233690Z Feb 07 03:46:14 5.37s call     pyflink/datastream/tests/test_data_stream.py::BatchModeDataStreamTests::test_execute_and_collect
> 2022-02-07T03:46:14.7234452Z Feb 07 03:46:14 5.01s call     pyflink/datastream/tests/test_stream_execution_environment.py::StreamExecutionEnvironmentTests::test_add_python_file_2
> 2022-02-07T03:46:14.7235303Z Feb 07 03:46:14 5.00s call     pyflink/datastream/tests/test_stream_execution_environment.py::StreamExecutionEnvironmentTests::test_set_requirements_without_cached_directory
> 2022-02-07T03:46:14.7236194Z Feb 07 03:46:14 4.98s call     pyflink/datastream/tests/test_data_stream.py::BatchModeDataStreamTests::test_basic_co_operations
> 2022-02-07T03:46:14.7236909Z Feb 07 03:46:14 4.97s call     pyflink/datastream/tests/test_data_stream.py::BatchModeDataStreamTests::test_keyed_co_process
> 2022-02-07T03:46:14.7237661Z Feb 07 03:46:14 4.90s call     pyflink/datastream/tests/test_stream_execution_environment.py::StreamExecutionEnvironmentTests::test_set_stream_env
> 2022-02-07T03:46:14.7238446Z Feb 07 03:46:14 4.80s call     pyflink/datastream/tests/test_data_stream.py::BatchModeDataStreamTests::test_basic_co_operations_with_output_type
> 2022-02-07T03:46:14.7239268Z Feb 07 03:46:14 4.23s call     pyflink/datastream/tests/test_stream_execution_environment.py::StreamExecutionEnvironmentTests::test_set_requirements_with_cached_directory
> 2022-02-07T03:46:14.7240269Z Feb 07 03:46:14 4.07s call     pyflink/datastream/tests/test_data_stream.py::BatchModeDataStreamTests::test_reduce_with_state
> 2022-02-07T03:46:14.7240956Z Feb 07 03:46:14 4.05s call     pyflink/datastream/tests/test_data_stream.py::BatchModeDataStreamTests::test_keyed_co_map
> 2022-02-07T03:46:14.7241649Z Feb 07 03:46:14 3.96s call     pyflink/datastream/tests/test_data_stream.py::BatchModeDataStreamTests::test_keyed_co_flat_map
> 2022-02-07T03:46:14.7242335Z Feb 07 03:46:14 3.90s call     pyflink/datastream/tests/test_data_stream.py::BatchModeDataStreamTests::test_keyed_map
> 2022-02-07T03:46:14.7242990Z Feb 07 03:46:14 3.82s call     pyflink/datastream/tests/test_data_stream.py::BatchModeDataStreamTests::test_keyed_filter
> 2022-02-07T03:46:14.7243598Z Feb 07 03:46:14 =========================== short test summary info ============================
> 2022-02-07T03:46:14.7244251Z Feb 07 03:46:14 FAILED pyflink/datastream/tests/test_data_stream.py::StreamingModeDataStreamTests::test_keyed_process_function_with_state
> {code}
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=30817&view=logs&j=821b528f-1eed-5598-a3b4-7f748b13f261&t=6bb545dd-772d-5d8c-f258-f5085fba3295&l=23725
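
The assertion diff above shows the expected map state "{1: hi, 3: hi}" arriving as "{3: hi}", i.e. one entry of the TTL-enabled map_state is already gone when the fifth element is processed. The test gives map_state a 1-second TTL with StateVisibility.ReturnExpiredIfNotCleanedUp and sleeps 1 second per element, so whether an expired entry is still visible at read time depends on timing on the CI machine. For reference, a minimal standalone sketch of that TTL setup, lifted from the quoted test (the import paths are assumptions for the PyFlink 1.15 module layout; the builder calls appear verbatim in the log):

{code:python}
# Sketch of the map-state TTL configuration used by the failing test
# (assumed PyFlink 1.15 module layout for the imports).
from pyflink.common.time import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream.state import MapStateDescriptor, StateTtlConfig

state_ttl_config = StateTtlConfig \
    .new_builder(Time.seconds(1)) \
    .set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite) \
    .set_state_visibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp) \
    .disable_cleanup_in_background() \
    .build()

# Enabling a 1 s TTL on 'map_state' makes the asserted map contents
# time-sensitive: with ReturnExpiredIfNotCleanedUp, an entry that has
# expired but not yet been cleaned up may or may not still be returned.
map_state_descriptor = MapStateDescriptor('map_state', Types.INT(), Types.STRING())
map_state_descriptor.enable_time_to_live(state_ttl_config)
{code}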



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
