Xintong Song created FLINK-23745:
------------------------------------
Summary: Python test_keyed_co_process fails on azure
Key: FLINK-23745
URL: https://issues.apache.org/jira/browse/FLINK-23745
Project: Flink
Issue Type: Bug
Components: API / Python
Affects Versions: 1.14.0
Reporter: Xintong Song
Fix For: 1.14.0
https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=22020&view=logs&j=9cada3cb-c1d3-5621-16da-0f718fb86602&t=c67e71ed-6451-5d26-8920-5a8cf9651901&l=21602
{code}
Aug 12 22:44:38 =================================== FAILURES ===================================
Aug 12 22:44:38 ______________ StreamingModeDataStreamTests.test_keyed_co_process ______________
Aug 12 22:44:38
Aug 12 22:44:38 self = <pyflink.datastream.tests.test_data_stream.StreamingModeDataStreamTests testMethod=test_keyed_co_process>
Aug 12 22:44:38
Aug 12 22:44:38 def test_keyed_co_process(self):
Aug 12 22:44:38 ds1 = self.env.from_collection([("a", 1), ("b", 2),
("c", 3)],
Aug 12 22:44:38
type_info=Types.ROW([Types.STRING(), Types.INT()]))
Aug 12 22:44:38 ds2 = self.env.from_collection([("b", 2), ("c", 3),
("d", 4)],
Aug 12 22:44:38
type_info=Types.ROW([Types.STRING(), Types.INT()]))
Aug 12 22:44:38 ds1 = ds1.assign_timestamps_and_watermarks(
Aug 12 22:44:38
WatermarkStrategy.for_monotonous_timestamps().with_timestamp_assigner(
Aug 12 22:44:38 SecondColumnTimestampAssigner()))
Aug 12 22:44:38 ds2 = ds2.assign_timestamps_and_watermarks(
Aug 12 22:44:38
WatermarkStrategy.for_monotonous_timestamps().with_timestamp_assigner(
Aug 12 22:44:38 SecondColumnTimestampAssigner()))
Aug 12 22:44:38 ds1.connect(ds2) \
Aug 12 22:44:38 .key_by(lambda x: x[0], lambda x: x[0]) \
Aug 12 22:44:38 .process(MyKeyedCoProcessFunction()) \
Aug 12 22:44:38 .map(lambda x: Row(x[0], x[1] + 1)) \
Aug 12 22:44:38 .add_sink(self.test_sink)
Aug 12 22:44:38 self.env.execute('test_keyed_co_process_function')
Aug 12 22:44:38 results = self.test_sink.get_results(True)
Aug 12 22:44:38 expected = ["<Row('a', 2)>",
Aug 12 22:44:38 "<Row('b', 2)>",
Aug 12 22:44:38 "<Row('b', 3)>",
Aug 12 22:44:38 "<Row('c', 2)>",
Aug 12 22:44:38 "<Row('c', 3)>",
Aug 12 22:44:38 "<Row('d', 2)>",
Aug 12 22:44:38 "<Row('on_timer', 4)>"]
Aug 12 22:44:38 > self.assert_equals_sorted(expected, results)
Aug 12 22:44:38
Aug 12 22:44:38 pyflink/datastream/tests/test_data_stream.py:211:
Aug 12 22:44:38 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_ _ _ _ _ _ _ _
Aug 12 22:44:38 pyflink/datastream/tests/test_data_stream.py:61: in assert_equals_sorted
Aug 12 22:44:38 self.assertEqual(expected, actual)
Aug 12 22:44:38 E AssertionError: Lists differ: ["<Ro[82 chars]<Row('d', 2)>", "<Row('on_timer', 4)>"] != ["<Ro[82 chars]<Row('d', 2)>", "<Row('on_timer', 4)>", "<Row('on_timer', 4)>"]
Aug 12 22:44:38 E
Aug 12 22:44:38 E Second list contains 1 additional elements.
Aug 12 22:44:38 E First extra element 7:
Aug 12 22:44:38 E "<Row('on_timer', 4)>"
Aug 12 22:44:38 E
Aug 12 22:44:38 E ["<Row('a', 2)>",
Aug 12 22:44:38 E "<Row('b', 2)>",
Aug 12 22:44:38 E "<Row('b', 3)>",
Aug 12 22:44:38 E "<Row('c', 2)>",
Aug 12 22:44:38 E "<Row('c', 3)>",
Aug 12 22:44:38 E "<Row('d', 2)>",
Aug 12 22:44:38 E + "<Row('on_timer', 4)>",
Aug 12 22:44:38 E "<Row('on_timer', 4)>"]
Aug 12 22:44:38 ________________ BatchModeDataStreamTests.test_keyed_co_process ________________
Aug 12 22:44:38
Aug 12 22:44:38 self = <pyflink.datastream.tests.test_data_stream.BatchModeDataStreamTests testMethod=test_keyed_co_process>
Aug 12 22:44:38
Aug 12 22:44:38 def test_keyed_co_process(self):
Aug 12 22:44:38 ds1 = self.env.from_collection([("a", 1), ("b", 2),
("c", 3)],
Aug 12 22:44:38
type_info=Types.ROW([Types.STRING(), Types.INT()]))
Aug 12 22:44:38 ds2 = self.env.from_collection([("b", 2), ("c", 3),
("d", 4)],
Aug 12 22:44:38
type_info=Types.ROW([Types.STRING(), Types.INT()]))
Aug 12 22:44:38 ds1 = ds1.assign_timestamps_and_watermarks(
Aug 12 22:44:38
WatermarkStrategy.for_monotonous_timestamps().with_timestamp_assigner(
Aug 12 22:44:38 SecondColumnTimestampAssigner()))
Aug 12 22:44:38 ds2 = ds2.assign_timestamps_and_watermarks(
Aug 12 22:44:38
WatermarkStrategy.for_monotonous_timestamps().with_timestamp_assigner(
Aug 12 22:44:38 SecondColumnTimestampAssigner()))
Aug 12 22:44:38 ds1.connect(ds2) \
Aug 12 22:44:38 .key_by(lambda x: x[0], lambda x: x[0]) \
Aug 12 22:44:38 .process(MyKeyedCoProcessFunction()) \
Aug 12 22:44:38 .map(lambda x: Row(x[0], x[1] + 1)) \
Aug 12 22:44:38 .add_sink(self.test_sink)
Aug 12 22:44:38 self.env.execute('test_keyed_co_process_function')
Aug 12 22:44:38 results = self.test_sink.get_results(True)
Aug 12 22:44:38 expected = ["<Row('a', 2)>",
Aug 12 22:44:38 "<Row('b', 2)>",
Aug 12 22:44:38 "<Row('b', 3)>",
Aug 12 22:44:38 "<Row('c', 2)>",
Aug 12 22:44:38 "<Row('c', 3)>",
Aug 12 22:44:38 "<Row('d', 2)>",
Aug 12 22:44:38 "<Row('on_timer', 4)>"]
Aug 12 22:44:38 > self.assert_equals_sorted(expected, results)
Aug 12 22:44:38
Aug 12 22:44:38 pyflink/datastream/tests/test_data_stream.py:211:
Aug 12 22:44:38 _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_ _ _ _ _ _ _ _
Aug 12 22:44:38 pyflink/datastream/tests/test_data_stream.py:61: in assert_equals_sorted
Aug 12 22:44:38 self.assertEqual(expected, actual)
Aug 12 22:44:38 E AssertionError: Lists differ: ["<Ro[82 chars]<Row('d', 2)>", "<Row('on_timer', 4)>"] != ["<Ro[82 chars]<Row('d', 2)>", "<Row('on_timer', 4)>", "<Row('on_timer', 4)>"]
Aug 12 22:44:38 E
Aug 12 22:44:38 E Second list contains 1 additional elements.
Aug 12 22:44:38 E First extra element 7:
Aug 12 22:44:38 E "<Row('on_timer', 4)>"
Aug 12 22:44:38 E
Aug 12 22:44:38 E ["<Row('a', 2)>",
Aug 12 22:44:38 E "<Row('b', 2)>",
Aug 12 22:44:38 E "<Row('b', 3)>",
Aug 12 22:44:38 E "<Row('c', 2)>",
Aug 12 22:44:38 E "<Row('c', 3)>",
Aug 12 22:44:38 E "<Row('d', 2)>",
Aug 12 22:44:38 E + "<Row('on_timer', 4)>",
Aug 12 22:44:38 E "<Row('on_timer', 4)>"]
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)