This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 883271c [BEAM-9601] Skip the streaming wordcount test because it uses a Python3.5.3+ feature new 53800e8 Merge pull request #11227 from [BEAM-9601] Skip the streaming wordcount test because it uses a Python3.5.3+ feature 883271c is described below commit 883271c636f5a9381371c429ed32a50446e7528b Author: Sam Rohde <rohde.sam...@gmail.com> AuthorDate: Wed Mar 25 10:44:18 2020 -0700 [BEAM-9601] Skip the streaming wordcount test because it uses a Python3.5.3+ feature Change-Id: I9caaf395fd0fc58565e54a8458e8289af761815f --- .../runners/interactive/interactive_runner_test.py | 64 +++++++++++----------- 1 file changed, 33 insertions(+), 31 deletions(-) diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py index 329e0f0..45ef110 100644 --- a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py +++ b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py @@ -26,6 +26,7 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function +import sys import unittest from datetime import timedelta @@ -40,6 +41,7 @@ from apache_beam.runners.interactive import interactive_runner from apache_beam.runners.interactive.testing.mock_ipython import mock_get_ipython from apache_beam.testing.test_stream import TestStream from apache_beam.transforms.window import GlobalWindow +from apache_beam.transforms.window import IntervalWindow from apache_beam.utils.timestamp import Timestamp from apache_beam.utils.windowed_value import PaneInfo from apache_beam.utils.windowed_value import PaneInfoTiming @@ -150,9 +152,10 @@ class InteractiveRunnerTest(unittest.TestCase): ] self.assertEqual(actual_reified, expected_reified) + @unittest.skipIf( + sys.version_info < (3, 5, 3), + 'The tests require at least Python 3.6 to work.') def 
test_streaming_wordcount(self): - self.skipTest('[BEAM-9601] Test is breaking PreCommits') - class WordExtractingDoFn(beam.DoFn): def process(self, element): text_line = element.strip() @@ -196,20 +199,17 @@ class InteractiveRunnerTest(unittest.TestCase): # This tests that the data was correctly cached. pane_info = PaneInfo(True, True, PaneInfoTiming.UNKNOWN, 0, 0) expected_data_df = pd.DataFrame([ - ('to', 0, [beam.window.IntervalWindow(0, 10)], pane_info), - ('be', 0, [beam.window.IntervalWindow(0, 10)], pane_info), - ('or', 0, [beam.window.IntervalWindow(0, 10)], pane_info), - ('not', 0, [beam.window.IntervalWindow(0, 10)], pane_info), - ('to', 0, [beam.window.IntervalWindow(0, 10)], pane_info), - ('be', 0, [beam.window.IntervalWindow(0, 10)], pane_info), - ('that', 20000000, [beam.window.IntervalWindow(20, 30)], pane_info), - ('is', 20000000, [beam.window.IntervalWindow(20, 30)], pane_info), - ('the', 20000000, [beam.window.IntervalWindow(20, 30)], pane_info), - ('question', 20000000, [beam.window.IntervalWindow(20, 30)], pane_info) - ], - columns=[ - 0, 'event_time', 'windows', 'pane_info' - ]) + ('to', 0, [IntervalWindow(0, 10)], pane_info), + ('be', 0, [IntervalWindow(0, 10)], pane_info), + ('or', 0, [IntervalWindow(0, 10)], pane_info), + ('not', 0, [IntervalWindow(0, 10)], pane_info), + ('to', 0, [IntervalWindow(0, 10)], pane_info), + ('be', 0, [IntervalWindow(0, 10)], pane_info), + ('that', 20000000, [IntervalWindow(20, 30)], pane_info), + ('is', 20000000, [IntervalWindow(20, 30)], pane_info), + ('the', 20000000, [IntervalWindow(20, 30)], pane_info), + ('question', 20000000, [IntervalWindow(20, 30)], pane_info) + ], columns=[0, 'event_time', 'windows', 'pane_info']) # yapf: disable data_df = ib.collect(data, include_window_info=True) pd.testing.assert_frame_equal(expected_data_df, data_df) @@ -217,23 +217,25 @@ class InteractiveRunnerTest(unittest.TestCase): # This tests that the windowing was passed correctly so that all the data # is aggregated also 
correctly. pane_info = PaneInfo(True, False, PaneInfoTiming.ON_TIME, 0, 0) - expected_counts_df = pd.DataFrame( - [('to', 2, 9999999, [beam.window.IntervalWindow(0, 10)], pane_info), - ('be', 2, 9999999, [beam.window.IntervalWindow(0, 10)], pane_info), - ('or', 1, 9999999, [beam.window.IntervalWindow(0, 10)], pane_info), - ('not', 1, 9999999, [beam.window.IntervalWindow(0, 10)], pane_info), - ('that', 1, 29999999, [beam.window.IntervalWindow(20, 30)], pane_info), - ('is', 1, 29999999, [beam.window.IntervalWindow(20, 30)], pane_info), - ('the', 1, 29999999, [beam.window.IntervalWindow(20, 30)], pane_info), - ( - 'question', - 1, - 29999999, [beam.window.IntervalWindow(20, 30)], - pane_info)], - columns=[0, 1, 'event_time', 'windows', 'pane_info']) + expected_counts_df = pd.DataFrame([ + ('be', 2, 9999999, [IntervalWindow(0, 10)], pane_info), + ('not', 1, 9999999, [IntervalWindow(0, 10)], pane_info), + ('or', 1, 9999999, [IntervalWindow(0, 10)], pane_info), + ('to', 2, 9999999, [IntervalWindow(0, 10)], pane_info), + ('is', 1, 29999999, [IntervalWindow(20, 30)], pane_info), + ('question', 1, 29999999, [IntervalWindow(20, 30)], pane_info), + ('that', 1, 29999999, [IntervalWindow(20, 30)], pane_info), + ('the', 1, 29999999, [IntervalWindow(20, 30)], pane_info), + ], columns=[0, 1, 'event_time', 'windows', 'pane_info']) # yapf: disable counts_df = ib.collect(counts, include_window_info=True) - pd.testing.assert_frame_equal(expected_counts_df, counts_df) + + # The group by key has no guarantee of order. So we post-process the DF by + # sorting so we can test equality. + sorted_counts_df = (counts_df + .sort_values(['event_time', 0], ascending=True) + .reset_index(drop=True)) # yapf: disable + pd.testing.assert_frame_equal(expected_counts_df, sorted_counts_df) def test_session(self): class MockPipelineRunner(object):