This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 046b2c8  Move test_sdf_synthetic_source to FnApiRunnerTest
     new 44e6fec  Merge pull request #8517 from boyuanzz/patch_synthetic
046b2c8 is described below

commit 046b2c884895184eb83a2e4d72ea82c57a28cb9b
Author: Boyuan Zhang <[email protected]>
AuthorDate: Tue May 7 09:18:55 2019 -0700

    Move test_sdf_synthetic_source to FnApiRunnerTest
---
 .../runners/portability/fn_api_runner_test.py      | 56 +++++++++++-----------
 1 file changed, 28 insertions(+), 28 deletions(-)

diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index 28e3d1e..c584ef1 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -674,6 +674,34 @@ class FnApiRunnerTest(unittest.TestCase):
     event_recorder.cleanup()
     self.assertEqual(results, sorted(elements_list))
 
+  def test_sdf_synthetic_source(self):
+    common_attrs = {
+        'key_size': 1,
+        'value_size': 1,
+        'initial_splitting_num_bundles': 2,
+        'initial_splitting_desired_bundle_size': 2,
+        'sleep_per_input_record_sec': 0,
+        'initial_splitting': 'const'
+    }
+    num_source_description = 5
+    min_num_record = 10
+    max_num_record = 20
+
+    # pylint: disable=unused-variable
+    source_descriptions = ([dict(
+        {'num_records': random.randint(min_num_record, max_num_record)},
+        **common_attrs) for i in range(0, num_source_description)])
+    total_num_records = 0
+    for source in source_descriptions:
+      total_num_records += source['num_records']
+
+    with self.create_pipeline() as p:
+      res = (p
+             | beam.Create(source_descriptions)
+             | beam.ParDo(SyntheticSDFAsSource())
+             | beam.combiners.Count.Globally())
+      assert_that(res, equal_to([total_num_records]))
+
 
 # These tests are kept in a separate group so that they are
 # not ran in he FnApiRunnerTestWithBundleRepeat which repeats
@@ -1051,34 +1079,6 @@ class FnApiRunnerMetricsTest(unittest.TestCase):
       print(res._monitoring_infos_by_stage)
       raise
 
-  def test_sdf_synthetic_source(self):
-    common_attrs = {
-        'key_size': 1,
-        'value_size': 1,
-        'initial_splitting_num_bundles': 2,
-        'initial_splitting_desired_bundle_size': 2,
-        'sleep_per_input_record_sec': 0,
-        'initial_splitting': 'const'
-    }
-    num_source_description = 5
-    min_num_record = 10
-    max_num_record = 20
-
-    # pylint: disable=unused-variable
-    source_descriptions = ([dict(
-        {'num_records': random.randint(min_num_record, max_num_record)},
-        **common_attrs) for i in range(0, num_source_description)])
-    total_num_records = 0
-    for source in source_descriptions:
-      total_num_records += source['num_records']
-
-    with self.create_pipeline() as p:
-      res = (p
-             | beam.Create(source_descriptions)
-             | beam.ParDo(SyntheticSDFAsSource())
-             | beam.combiners.Count.Globally())
-      assert_that(res, equal_to([total_num_records]))
-
 
 class FnApiRunnerTestWithGrpc(FnApiRunnerTest):
 

Reply via email to