This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 046b2c8 Move test_sdf_synthetic_source to FnApiRunnerTest
new 44e6fec Merge pull request #8517 from boyuanzz/patch_synthetic
046b2c8 is described below
commit 046b2c884895184eb83a2e4d72ea82c57a28cb9b
Author: Boyuan Zhang <[email protected]>
AuthorDate: Tue May 7 09:18:55 2019 -0700
Move test_sdf_synthetic_source to FnApiRunnerTest
---
.../runners/portability/fn_api_runner_test.py | 56 +++++++++++-----------
1 file changed, 28 insertions(+), 28 deletions(-)
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index 28e3d1e..c584ef1 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -674,6 +674,34 @@ class FnApiRunnerTest(unittest.TestCase):
event_recorder.cleanup()
self.assertEqual(results, sorted(elements_list))
+ def test_sdf_synthetic_source(self):
+ common_attrs = {
+ 'key_size': 1,
+ 'value_size': 1,
+ 'initial_splitting_num_bundles': 2,
+ 'initial_splitting_desired_bundle_size': 2,
+ 'sleep_per_input_record_sec': 0,
+ 'initial_splitting': 'const'
+ }
+ num_source_description = 5
+ min_num_record = 10
+ max_num_record = 20
+
+ # pylint: disable=unused-variable
+ source_descriptions = ([dict(
+ {'num_records': random.randint(min_num_record, max_num_record)},
+ **common_attrs) for i in range(0, num_source_description)])
+ total_num_records = 0
+ for source in source_descriptions:
+ total_num_records += source['num_records']
+
+ with self.create_pipeline() as p:
+ res = (p
+ | beam.Create(source_descriptions)
+ | beam.ParDo(SyntheticSDFAsSource())
+ | beam.combiners.Count.Globally())
+ assert_that(res, equal_to([total_num_records]))
+
# These tests are kept in a separate group so that they are
# not ran in he FnApiRunnerTestWithBundleRepeat which repeats
@@ -1051,34 +1079,6 @@ class FnApiRunnerMetricsTest(unittest.TestCase):
print(res._monitoring_infos_by_stage)
raise
- def test_sdf_synthetic_source(self):
- common_attrs = {
- 'key_size': 1,
- 'value_size': 1,
- 'initial_splitting_num_bundles': 2,
- 'initial_splitting_desired_bundle_size': 2,
- 'sleep_per_input_record_sec': 0,
- 'initial_splitting': 'const'
- }
- num_source_description = 5
- min_num_record = 10
- max_num_record = 20
-
- # pylint: disable=unused-variable
- source_descriptions = ([dict(
- {'num_records': random.randint(min_num_record, max_num_record)},
- **common_attrs) for i in range(0, num_source_description)])
- total_num_records = 0
- for source in source_descriptions:
- total_num_records += source['num_records']
-
- with self.create_pipeline() as p:
- res = (p
- | beam.Create(source_descriptions)
- | beam.ParDo(SyntheticSDFAsSource())
- | beam.combiners.Count.Globally())
- assert_that(res, equal_to([total_num_records]))
-
class FnApiRunnerTestWithGrpc(FnApiRunnerTest):