Deprecate <pipeline>.options usage
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/444da273 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/444da273 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/444da273 Branch: refs/heads/gearpump-runner Commit: 444da273ac8d01f2343eef76d8d4de5b0b78b409 Parents: 3bb0f8e Author: Maria Garcia Herrero <[email protected]> Authored: Sat Apr 29 09:43:55 2017 -0700 Committer: Robert Bradshaw <[email protected]> Committed: Tue May 2 17:36:56 2017 -0700 ---------------------------------------------------------------------- .../examples/snippets/snippets_test.py | 11 +- sdks/python/apache_beam/pipeline.py | 25 +++-- sdks/python/apache_beam/pipeline_test.py | 2 +- .../runners/dataflow/dataflow_runner.py | 7 +- .../runners/dataflow/dataflow_runner_test.py | 4 +- .../runners/dataflow/template_runner_test.py | 2 +- .../runners/dataflow/test_dataflow_runner.py | 4 +- .../apache_beam/runners/direct/direct_runner.py | 4 +- sdks/python/apache_beam/transforms/core.py | 2 +- .../apache_beam/transforms/ptransform_test.py | 102 +++++++++---------- 10 files changed, 84 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/444da273/sdks/python/apache_beam/examples/snippets/snippets_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index afd7918..370b436 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -31,7 +31,7 @@ from apache_beam import pvalue from apache_beam import typehints from apache_beam.transforms.util import assert_that from apache_beam.transforms.util import equal_to -from apache_beam.utils.pipeline_options import TypeOptions +from apache_beam.utils.pipeline_options import PipelineOptions from apache_beam.examples.snippets import snippets # pylint: disable=expression-not-assigned @@ -245,10 +245,9 @@ class ParDoTest(unittest.TestCase): class TypeHintsTest(unittest.TestCase): def test_bad_types(self): - p = TestPipeline() - evens = None # pylint: disable=unused-variable - # [START type_hints_missing_define_numbers] + p = TestPipeline(options=PipelineOptions(pipeline_type_check=True)) + numbers = p | beam.Create(['1', '2', '3']) # [END type_hints_missing_define_numbers] @@ -269,7 +268,6 @@ class TypeHintsTest(unittest.TestCase): # To catch this early, we can assert what types we expect. with self.assertRaises(typehints.TypeCheckError): # [START type_hints_takes] - p.options.view_as(TypeOptions).pipeline_type_check = True evens = numbers | beam.Filter(lambda x: x % 2 == 0).with_input_types(int) # [END type_hints_takes] @@ -315,10 +313,9 @@ class TypeHintsTest(unittest.TestCase): def test_runtime_checks_on(self): # pylint: disable=expression-not-assigned - p = TestPipeline() + p = TestPipeline(options=PipelineOptions(runtime_type_check=True)) with self.assertRaises(typehints.TypeCheckError): # [START type_hints_runtime_on] - p.options.view_as(TypeOptions).runtime_type_check = True p | beam.Create(['a']) | beam.Map(lambda x: 3).with_output_types(str) p.run() # [END type_hints_runtime_on] http://git-wip-us.apache.org/repos/asf/beam/blob/444da273/sdks/python/apache_beam/pipeline.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 100c50a..9200363 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -64,6 +64,7 @@ from apache_beam.utils.pipeline_options import SetupOptions from apache_beam.utils.pipeline_options import StandardOptions from apache_beam.utils.pipeline_options import TypeOptions from apache_beam.utils.pipeline_options_validator import PipelineOptionsValidator +from apache_beam.utils.annotations import deprecated class Pipeline(object): @@ -94,25 +95,24 @@ class Pipeline(object): ValueError: if either the runner or options argument is not of the expected type. """ - if options is not None: if isinstance(options, PipelineOptions): - self.options = options + self._options = options else: raise ValueError( 'Parameter options, if specified, must be of type PipelineOptions. ' 'Received : %r', options) elif argv is not None: if isinstance(argv, list): - self.options = PipelineOptions(argv) + self._options = PipelineOptions(argv) else: raise ValueError( 'Parameter argv, if specified, must be a list. Received : %r', argv) else: - self.options = PipelineOptions([]) + self._options = PipelineOptions([]) if runner is None: - runner = self.options.view_as(StandardOptions).runner + runner = self._options.view_as(StandardOptions).runner if runner is None: runner = StandardOptions.DEFAULT_RUNNER logging.info(('Missing pipeline option (runner). Executing pipeline ' @@ -125,7 +125,7 @@ class Pipeline(object): 'name of a registered runner.') # Validate pipeline options - errors = PipelineOptionsValidator(self.options, runner).validate() + errors = PipelineOptionsValidator(self._options, runner).validate() if errors: raise ValueError( 'Pipeline has validations errors: \n' + '\n'.join(errors)) @@ -140,6 +140,13 @@ class Pipeline(object): # then the transform will have to be cloned with a new label. self.applied_labels = set() + @property + @deprecated(since='First stable release', + extra_message='References to <pipeline>.options' + ' will not be supported') + def options(self): + return self._options + def _current_transform(self): """Returns the transform currently on the top of the stack.""" return self.transforms_stack[-1] @@ -154,9 +161,9 @@ class Pipeline(object): # When possible, invoke a round trip through the runner API. if test_runner_api and self._verify_runner_api_compatible(): return Pipeline.from_runner_api( - self.to_runner_api(), self.runner, self.options).run(False) + self.to_runner_api(), self.runner, self._options).run(False) - if self.options.view_as(SetupOptions).save_main_session: + if self._options.view_as(SetupOptions).save_main_session: # If this option is chosen, verify we can pickle the main session early. tmpdir = tempfile.mkdtemp() try: @@ -246,7 +253,7 @@ class Pipeline(object): self._current_transform().add_part(current) self.transforms_stack.append(current) - type_options = self.options.view_as(TypeOptions) + type_options = self._options.view_as(TypeOptions) if type_options.pipeline_type_check: transform.type_check_inputs(pvalueish) http://git-wip-us.apache.org/repos/asf/beam/blob/444da273/sdks/python/apache_beam/pipeline_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index 12348dc..ebcc43b 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -442,7 +442,7 @@ class RunnerApiTest(unittest.TestCase): p | beam.Create([None]) | beam.Map(lambda x: x) # pylint: disable=expression-not-assigned proto = p.to_runner_api() - p2 = Pipeline.from_runner_api(proto, p.runner, p.options) + p2 = Pipeline.from_runner_api(proto, p.runner, p._options) p2.run() http://git-wip-us.apache.org/repos/asf/beam/blob/444da273/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 05f6833..3332033 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -210,6 +210,7 @@ class DataflowRunner(PipelineRunner): return FlattenInputVisitor() + # TODO(mariagh): Make this method take pipepline_options def run(self, pipeline): """Remotely executes entire pipeline or parts reachable from node.""" # Import here to avoid adding the dependency for local running scenarios. @@ -220,7 +221,7 @@ class DataflowRunner(PipelineRunner): raise ImportError( 'Google Cloud Dataflow runner not available, ' 'please install apache_beam[gcp]') - self.job = apiclient.Job(pipeline.options) + self.job = apiclient.Job(pipeline._options) # Dataflow runner requires a KV type for GBK inputs, hence we enforce that # here. @@ -233,7 +234,7 @@ class DataflowRunner(PipelineRunner): # The superclass's run will trigger a traversal of all reachable nodes. super(DataflowRunner, self).run(pipeline) - standard_options = pipeline.options.view_as(StandardOptions) + standard_options = pipeline._options.view_as(StandardOptions) if standard_options.streaming: job_version = DataflowRunner.STREAMING_ENVIRONMENT_MAJOR_VERSION else: @@ -241,7 +242,7 @@ class DataflowRunner(PipelineRunner): # Get a Dataflow API client and set its options self.dataflow_client = apiclient.DataflowApplicationClient( - pipeline.options, job_version) + pipeline._options, job_version) # Create the job result = DataflowPipelineResult( http://git-wip-us.apache.org/repos/asf/beam/blob/444da273/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py index f342be5..872dfcd 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py @@ -105,7 +105,7 @@ class DataflowRunnerTest(unittest.TestCase): (p | ptransform.Create([1, 2, 3]) # pylint: disable=expression-not-assigned | 'Do' >> ptransform.FlatMap(lambda x: [(x, x)]) | ptransform.GroupByKey()) - remote_runner.job = apiclient.Job(p.options) + remote_runner.job = apiclient.Job(p._options) super(DataflowRunner, remote_runner).run(p) def test_remote_runner_display_data(self): @@ -139,7 +139,7 @@ class DataflowRunnerTest(unittest.TestCase): (p | ptransform.Create([1, 2, 3, 4, 5]) | 'Do' >> SpecialParDo(SpecialDoFn(), now)) - remote_runner.job = apiclient.Job(p.options) + remote_runner.job = apiclient.Job(p._options) super(DataflowRunner, remote_runner).run(p) job_dict = json.loads(str(remote_runner.job)) steps = [step http://git-wip-us.apache.org/repos/asf/beam/blob/444da273/sdks/python/apache_beam/runners/dataflow/template_runner_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/template_runner_test.py b/sdks/python/apache_beam/runners/dataflow/template_runner_test.py index ee495f9..5eb0f23 100644 --- a/sdks/python/apache_beam/runners/dataflow/template_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/template_runner_test.py @@ -87,7 +87,7 @@ class TemplatingDataflowRunnerTest(unittest.TestCase): '--temp_location=/dev/null', '--template_location=/bad/path', '--no_auth=True'])) - remote_runner.job = apiclient.Job(pipeline.options) + remote_runner.job = apiclient.Job(pipeline._options) with self.assertRaises(IOError): pipeline.run().wait_until_finish() http://git-wip-us.apache.org/repos/asf/beam/blob/444da273/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py index 4cf4131..290c7ad 100644 --- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py @@ -25,7 +25,7 @@ from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner class TestDataflowRunner(DataflowRunner): def run(self, pipeline): """Execute test pipeline and verify test matcher""" - options = pipeline.options.view_as(TestOptions) + options = pipeline._options.view_as(TestOptions) on_success_matcher = options.on_success_matcher # [BEAM-1889] Do not send this to remote workers also, there is no need to @@ -34,7 +34,7 @@ class TestDataflowRunner(DataflowRunner): self.result = super(TestDataflowRunner, self).run(pipeline) if self.result.has_job: - project = pipeline.options.view_as(GoogleCloudOptions).project + project = pipeline._options.view_as(GoogleCloudOptions).project job_id = self.result.job_id() # TODO(markflyhigh)(BEAM-1890): Use print since Nose dosen't show logs # in some cases. http://git-wip-us.apache.org/repos/asf/beam/blob/444da273/sdks/python/apache_beam/runners/direct/direct_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py index cd0447f..d5aba5a 100644 --- a/sdks/python/apache_beam/runners/direct/direct_runner.py +++ b/sdks/python/apache_beam/runners/direct/direct_runner.py @@ -72,8 +72,8 @@ class DirectRunner(PipelineRunner): pipeline.visit(self.consumer_tracking_visitor) evaluation_context = EvaluationContext( - pipeline.options, - BundleFactory(stacked=pipeline.options.view_as(DirectOptions) + pipeline._options, + BundleFactory(stacked=pipeline._options.view_as(DirectOptions) .direct_runner_use_stacked_bundle), self.consumer_tracking_visitor.root_transforms, self.consumer_tracking_visitor.value_to_consumers, http://git-wip-us.apache.org/repos/asf/beam/blob/444da273/sdks/python/apache_beam/transforms/core.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 8e3c9a2..62a9b97 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -957,7 +957,7 @@ class CombineValues(PTransformWithSideInputs): key_type, _ = input_type.tuple_types runtime_type_check = ( - pcoll.pipeline.options.view_as(TypeOptions).runtime_type_check) + pcoll.pipeline._options.view_as(TypeOptions).runtime_type_check) return pcoll | ParDo( CombineValuesDoFn(key_type, self.fn, runtime_type_check), *args, **kwargs) http://git-wip-us.apache.org/repos/asf/beam/blob/444da273/sdks/python/apache_beam/transforms/ptransform_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index 46c340c..ab9417e 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -882,7 +882,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): e.exception.message) def test_do_fn_pipeline_runtime_type_check_satisfied(self): - self.p.options.view_as(TypeOptions).runtime_type_check = True + self.p._options.view_as(TypeOptions).runtime_type_check = True @with_input_types(int, int) @with_output_types(int) @@ -898,7 +898,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.p.run() def test_do_fn_pipeline_runtime_type_check_violated(self): - self.p.options.view_as(TypeOptions).runtime_type_check = True + self.p._options.view_as(TypeOptions).runtime_type_check = True @with_input_types(int, int) @with_output_types(typehints.List[int]) @@ -1132,7 +1132,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): e.exception.message) def test_pipeline_checking_pardo_insufficient_type_information(self): - self.p.options.view_as(TypeOptions).type_check_strictness = 'ALL_REQUIRED' + self.p._options.view_as(TypeOptions).type_check_strictness = 'ALL_REQUIRED' # Type checking is enabled, but 'Create' doesn't pass on any relevant type # information to the ParDo. @@ -1146,7 +1146,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): e.exception.message) def test_pipeline_checking_gbk_insufficient_type_information(self): - self.p.options.view_as(TypeOptions).type_check_strictness = 'ALL_REQUIRED' + self.p._options.view_as(TypeOptions).type_check_strictness = 'ALL_REQUIRED' # Type checking is enabled, but 'Map' doesn't pass on any relevant type # information to GBK-only. with self.assertRaises(typehints.TypeCheckError) as e: @@ -1161,7 +1161,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): e.exception.message) def test_disable_pipeline_type_check(self): - self.p.options.view_as(TypeOptions).pipeline_type_check = False + self.p._options.view_as(TypeOptions).pipeline_type_check = False # The pipeline below should raise a TypeError, however pipeline type # checking was disabled above. @@ -1171,8 +1171,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): .with_input_types(str).with_output_types(str)) def test_run_time_type_checking_enabled_type_violation(self): - self.p.options.view_as(TypeOptions).pipeline_type_check = False - self.p.options.view_as(TypeOptions).runtime_type_check = True + self.p._options.view_as(TypeOptions).pipeline_type_check = False + self.p._options.view_as(TypeOptions).runtime_type_check = True @with_output_types(str) @with_input_types(x=int) @@ -1195,8 +1195,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): "instead found some_string, an instance of <type 'str'>.") def test_run_time_type_checking_enabled_types_satisfied(self): - self.p.options.view_as(TypeOptions).pipeline_type_check = False - self.p.options.view_as(TypeOptions).runtime_type_check = True + self.p._options.view_as(TypeOptions).pipeline_type_check = False + self.p._options.view_as(TypeOptions).runtime_type_check = True @with_output_types(typehints.KV[int, str]) @with_input_types(x=str) @@ -1217,8 +1217,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.p.run() def test_pipeline_checking_satisfied_but_run_time_types_violate(self): - self.p.options.view_as(TypeOptions).pipeline_type_check = False - self.p.options.view_as(TypeOptions).runtime_type_check = True + self.p._options.view_as(TypeOptions).pipeline_type_check = False + self.p._options.view_as(TypeOptions).runtime_type_check = True @with_output_types(typehints.KV[bool, int]) @with_input_types(a=int) @@ -1247,7 +1247,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): "instead received an instance of type int.") def test_pipeline_checking_satisfied_run_time_checking_satisfied(self): - self.p.options.view_as(TypeOptions).pipeline_type_check = False + self.p._options.view_as(TypeOptions).pipeline_type_check = False @with_output_types(typehints.KV[bool, int]) @with_input_types(a=int) @@ -1265,8 +1265,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.p.run() def test_pipeline_runtime_checking_violation_simple_type_input(self): - self.p.options.view_as(TypeOptions).runtime_type_check = True - self.p.options.view_as(TypeOptions).pipeline_type_check = False + self.p._options.view_as(TypeOptions).runtime_type_check = True + self.p._options.view_as(TypeOptions).pipeline_type_check = False # The type-hinted applied via the 'with_input_types()' method indicates the # ParDo should receive an instance of type 'str', however an 'int' will be @@ -1286,8 +1286,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): "instead found 1, an instance of <type 'int'>.") def test_pipeline_runtime_checking_violation_composite_type_input(self): - self.p.options.view_as(TypeOptions).runtime_type_check = True - self.p.options.view_as(TypeOptions).pipeline_type_check = False + self.p._options.view_as(TypeOptions).runtime_type_check = True + self.p._options.view_as(TypeOptions).pipeline_type_check = False with self.assertRaises(typehints.TypeCheckError) as e: (self.p @@ -1305,8 +1305,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): "instead found 3.0, an instance of <type 'float'>.") def test_pipeline_runtime_checking_violation_simple_type_output(self): - self.p.options.view_as(TypeOptions).runtime_type_check = True - self.p.options.view_as(TypeOptions).pipeline_type_check = False + self.p._options.view_as(TypeOptions).runtime_type_check = True + self.p._options.view_as(TypeOptions).pipeline_type_check = False # The type-hinted applied via the 'returns()' method indicates the ParDo # should output an instance of type 'int', however a 'float' will be @@ -1331,8 +1331,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): "an instance of type <type 'float'>.") def test_pipeline_runtime_checking_violation_composite_type_output(self): - self.p.options.view_as(TypeOptions).runtime_type_check = True - self.p.options.view_as(TypeOptions).pipeline_type_check = False + self.p._options.view_as(TypeOptions).runtime_type_check = True + self.p._options.view_as(TypeOptions).pipeline_type_check = False # The type-hinted applied via the 'returns()' method indicates the ParDo # should return an instance of type: Tuple[float, int]. However, an instance @@ -1354,8 +1354,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): "an instance of 'float' was received.") def test_pipline_runtime_checking_violation_with_side_inputs_decorator(self): - self.p.options.view_as(TypeOptions).pipeline_type_check = False - self.p.options.view_as(TypeOptions).runtime_type_check = True + self.p._options.view_as(TypeOptions).pipeline_type_check = False + self.p._options.view_as(TypeOptions).runtime_type_check = True @with_output_types(int) @with_input_types(a=int, b=int) @@ -1374,8 +1374,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): "instead found 1.0, an instance of <type 'float'>.") def test_pipline_runtime_checking_violation_with_side_inputs_via_method(self): - self.p.options.view_as(TypeOptions).runtime_type_check = True - self.p.options.view_as(TypeOptions).pipeline_type_check = False + self.p._options.view_as(TypeOptions).runtime_type_check = True + self.p._options.view_as(TypeOptions).pipeline_type_check = False with self.assertRaises(typehints.TypeCheckError) as e: (self.p @@ -1444,7 +1444,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.p.run() def test_combine_runtime_type_check_satisfied_using_decorators(self): - self.p.options.view_as(TypeOptions).pipeline_type_check = False + self.p._options.view_as(TypeOptions).pipeline_type_check = False @with_output_types(int) @with_input_types(ints=typehints.Iterable[int]) @@ -1459,8 +1459,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.p.run() def test_combine_runtime_type_check_violation_using_decorators(self): - self.p.options.view_as(TypeOptions).pipeline_type_check = False - self.p.options.view_as(TypeOptions).runtime_type_check = True + self.p._options.view_as(TypeOptions).pipeline_type_check = False + self.p._options.view_as(TypeOptions).runtime_type_check = True # Combine fn is returning the incorrect type @with_output_types(int) @@ -1497,8 +1497,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.p.run() def test_combine_runtime_type_check_using_methods(self): - self.p.options.view_as(TypeOptions).pipeline_type_check = False - self.p.options.view_as(TypeOptions).runtime_type_check = True + self.p._options.view_as(TypeOptions).pipeline_type_check = False + self.p._options.view_as(TypeOptions).runtime_type_check = True d = (self.p | beam.Create(range(5)).with_output_types(int) @@ -1520,8 +1520,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): e.exception.message) def test_combine_runtime_type_check_violation_using_methods(self): - self.p.options.view_as(TypeOptions).pipeline_type_check = False - self.p.options.view_as(TypeOptions).runtime_type_check = True + self.p._options.view_as(TypeOptions).pipeline_type_check = False + self.p._options.view_as(TypeOptions).runtime_type_check = True with self.assertRaises(typehints.TypeCheckError) as e: (self.p @@ -1539,7 +1539,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): "instead found 0, an instance of <type 'int'>.") def test_combine_insufficient_type_hint_information(self): - self.p.options.view_as(TypeOptions).type_check_strictness = 'ALL_REQUIRED' + self.p._options.view_as(TypeOptions).type_check_strictness = 'ALL_REQUIRED' with self.assertRaises(typehints.TypeCheckError) as e: (self.p @@ -1576,7 +1576,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): e.exception.message) def test_mean_globally_runtime_checking_satisfied(self): - self.p.options.view_as(TypeOptions).runtime_type_check = True + self.p._options.view_as(TypeOptions).runtime_type_check = True d = (self.p | 'C' >> beam.Create(range(5)).with_output_types(int) @@ -1587,8 +1587,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.p.run() def test_mean_globally_runtime_checking_violated(self): - self.p.options.view_as(TypeOptions).pipeline_type_check = False - self.p.options.view_as(TypeOptions).runtime_type_check = True + self.p._options.view_as(TypeOptions).pipeline_type_check = False + self.p._options.view_as(TypeOptions).runtime_type_check = True with self.assertRaises(typehints.TypeCheckError) as e: (self.p @@ -1633,7 +1633,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): e.exception.message) def test_mean_per_key_runtime_checking_satisfied(self): - self.p.options.view_as(TypeOptions).runtime_type_check = True + self.p._options.view_as(TypeOptions).runtime_type_check = True d = (self.p | beam.Create(range(5)).with_output_types(int) @@ -1646,8 +1646,8 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.p.run() def test_mean_per_key_runtime_checking_violated(self): - self.p.options.view_as(TypeOptions).pipeline_type_check = False - self.p.options.view_as(TypeOptions).runtime_type_check = True + self.p._options.view_as(TypeOptions).pipeline_type_check = False + self.p._options.view_as(TypeOptions).runtime_type_check = True with self.assertRaises(typehints.TypeCheckError) as e: (self.p @@ -1680,7 +1680,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.p.run() def test_count_globally_runtime_type_checking_satisfied(self): - self.p.options.view_as(TypeOptions).runtime_type_check = True + self.p._options.view_as(TypeOptions).runtime_type_check = True d = (self.p | 'P' >> beam.Create(range(5)).with_output_types(int) @@ -1714,7 +1714,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): e.exception.message) def test_count_perkey_runtime_type_checking_satisfied(self): - self.p.options.view_as(TypeOptions).runtime_type_check = True + self.p._options.view_as(TypeOptions).runtime_type_check = True d = (self.p | beam.Create(['t', 'e', 's', 't']).with_output_types(str) @@ -1736,7 +1736,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.p.run() def test_count_perelement_pipeline_type_checking_violated(self): - self.p.options.view_as(TypeOptions).type_check_strictness = 'ALL_REQUIRED' + self.p._options.view_as(TypeOptions).type_check_strictness = 'ALL_REQUIRED' with self.assertRaises(typehints.TypeCheckError) as e: (self.p @@ -1749,7 +1749,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): e.exception.message) def test_count_perelement_runtime_type_checking_satisfied(self): - self.p.options.view_as(TypeOptions).runtime_type_check = True + self.p._options.view_as(TypeOptions).runtime_type_check = True d = (self.p | beam.Create([True, True, False, True, True]) @@ -1771,7 +1771,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.p.run() def test_top_of_runtime_checking_satisfied(self): - self.p.options.view_as(TypeOptions).runtime_type_check = True + self.p._options.view_as(TypeOptions).runtime_type_check = True d = (self.p | beam.Create(list('testing')).with_output_types(str) @@ -1807,7 +1807,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.p.run() def test_per_key_runtime_checking_satisfied(self): - self.p.options.view_as(TypeOptions).runtime_type_check = True + self.p._options.view_as(TypeOptions).runtime_type_check = True d = (self.p | beam.Create(range(21)) @@ -1835,7 +1835,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.p.run() def test_sample_globally_runtime_satisfied(self): - self.p.options.view_as(TypeOptions).runtime_type_check = True + self.p._options.view_as(TypeOptions).runtime_type_check = True d = (self.p | beam.Create([2, 2, 3, 3]).with_output_types(int) @@ -1868,7 +1868,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.p.run() def test_sample_per_key_runtime_satisfied(self): - self.p.options.view_as(TypeOptions).runtime_type_check = True + self.p._options.view_as(TypeOptions).runtime_type_check = True d = (self.p | (beam.Create([(1, 2), (1, 2), (2, 3), (2, 3)]) @@ -1901,7 +1901,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.p.run() def test_to_list_runtime_check_satisfied(self): - self.p.options.view_as(TypeOptions).runtime_type_check = True + self.p._options.view_as(TypeOptions).runtime_type_check = True d = (self.p | beam.Create(list('test')).with_output_types(str) @@ -1940,7 +1940,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.p.run() def test_to_dict_runtime_check_satisfied(self): - self.p.options.view_as(TypeOptions).runtime_type_check = True + self.p._options.view_as(TypeOptions).runtime_type_check = True d = (self.p | (beam.Create([('1', 2), ('3', 4)]) @@ -1952,7 +1952,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): self.p.run() def test_runtime_type_check_python_type_error(self): - self.p.options.view_as(TypeOptions).runtime_type_check = True + self.p._options.view_as(TypeOptions).runtime_type_check = True with self.assertRaises(TypeError) as e: (self.p @@ -2000,11 +2000,11 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): e.exception.message) def test_type_inference_command_line_flag_toggle(self): - self.p.options.view_as(TypeOptions).pipeline_type_check = False + self.p._options.view_as(TypeOptions).pipeline_type_check = False x = self.p | 'C1' >> beam.Create([1, 2, 3, 4]) self.assertIsNone(x.element_type) - self.p.options.view_as(TypeOptions).pipeline_type_check = True + self.p._options.view_as(TypeOptions).pipeline_type_check = True x = self.p | 'C2' >> beam.Create([1, 2, 3, 4]) self.assertEqual(int, x.element_type)
