[ https://issues.apache.org/jira/browse/BEAM-5462?focusedWorklogId=172021&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172021 ]

ASF GitHub Bot logged work on BEAM-5462:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 04/Dec/18 19:00
            Start Date: 04/Dec/18 19:00
    Worklog Time Spent: 10m 
      Work Description: chamikaramj closed pull request #6930: [BEAM-5462] get rid of <pipeline>.options deprecation warnings in tests
URL: https://github.com/apache/beam/pull/6930
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

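For readers skimming the patch, the test-side change is mechanical: instead of
constructing a TestPipeline and then mutating the deprecated p.options property,
each test now builds a PipelineOptions object first and passes it to the
pipeline, reusing that same object wherever options are needed. A minimal
sketch of the before/after (TestPipeline lives in
apache_beam.testing.test_pipeline; the rest mirrors the hunks below):

    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import StandardOptions
    from apache_beam.testing.test_pipeline import TestPipeline

    # Old pattern: every read of p.options emits a DeprecationWarning.
    #   p = TestPipeline()
    #   p.options.view_as(StandardOptions).streaming = True

    # New pattern: build the options up front and hand them to the pipeline.
    options = PipelineOptions([])
    options.view_as(StandardOptions).streaming = True
    p = TestPipeline(options=options)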

diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py
index a95ffc6e5423..04f8802287c4 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py
@@ -35,6 +35,7 @@
 from apache_beam.io.gcp.pubsub import WriteToPubSub
 from apache_beam.io.gcp.pubsub import _PubSubSink
 from apache_beam.io.gcp.pubsub import _PubSubSource
+from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.runners.direct import transform_evaluator
 from apache_beam.runners.direct.direct_runner import _DirectReadFromPubSub
@@ -109,8 +110,9 @@ def test_repr(self):
 class TestReadFromPubSubOverride(unittest.TestCase):
 
   def test_expand_with_topic(self):
-    p = TestPipeline()
-    p.options.view_as(StandardOptions).streaming = True
+    options = PipelineOptions([])
+    options.view_as(StandardOptions).streaming = True
+    p = TestPipeline(options=options)
     pcoll = (p
              | ReadFromPubSub('projects/fakeprj/topics/a_topic',
                               None, 'a_label', with_attributes=False,
@@ -119,7 +121,7 @@ def test_expand_with_topic(self):
     self.assertEqual(bytes, pcoll.element_type)
 
     # Apply the necessary PTransformOverrides.
-    overrides = _get_transform_overrides(p.options)
+    overrides = _get_transform_overrides(options)
     p.replace_all(overrides)
 
     # Note that the direct output of ReadFromPubSub will be replaced
@@ -132,8 +134,9 @@ def test_expand_with_topic(self):
     self.assertEqual('a_label', source.id_label)
 
   def test_expand_with_subscription(self):
-    p = TestPipeline()
-    p.options.view_as(StandardOptions).streaming = True
+    options = PipelineOptions([])
+    options.view_as(StandardOptions).streaming = True
+    p = TestPipeline(options=options)
     pcoll = (p
              | ReadFromPubSub(
                  None, 'projects/fakeprj/subscriptions/a_subscription',
@@ -142,7 +145,7 @@ def test_expand_with_subscription(self):
     self.assertEqual(bytes, pcoll.element_type)
 
     # Apply the necessary PTransformOverrides.
-    overrides = _get_transform_overrides(p.options)
+    overrides = _get_transform_overrides(options)
     p.replace_all(overrides)
 
     # Note that the direct output of ReadFromPubSub will be replaced
@@ -167,8 +170,9 @@ def test_expand_with_both_topic_and_subscription(self):
                      with_attributes=False, timestamp_attribute=None)
 
   def test_expand_with_other_options(self):
-    p = TestPipeline()
-    p.options.view_as(StandardOptions).streaming = True
+    options = PipelineOptions([])
+    options.view_as(StandardOptions).streaming = True
+    p = TestPipeline(options=options)
     pcoll = (p
              | ReadFromPubSub('projects/fakeprj/topics/a_topic',
                               None, 'a_label', with_attributes=True,
@@ -177,7 +181,7 @@ def test_expand_with_other_options(self):
     self.assertEqual(PubsubMessage, pcoll.element_type)
 
     # Apply the necessary PTransformOverrides.
-    overrides = _get_transform_overrides(p.options)
+    overrides = _get_transform_overrides(options)
     p.replace_all(overrides)
 
     # Note that the direct output of ReadFromPubSub will be replaced
@@ -193,15 +197,16 @@ def test_expand_with_other_options(self):
 @unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
 class TestWriteStringsToPubSubOverride(unittest.TestCase):
   def test_expand_deprecated(self):
-    p = TestPipeline()
-    p.options.view_as(StandardOptions).streaming = True
+    options = PipelineOptions([])
+    options.view_as(StandardOptions).streaming = True
+    p = TestPipeline(options=options)
     pcoll = (p
              | ReadFromPubSub('projects/fakeprj/topics/baz')
              | WriteStringsToPubSub('projects/fakeprj/topics/a_topic')
              | beam.Map(lambda x: x))
 
     # Apply the necessary PTransformOverrides.
-    overrides = _get_transform_overrides(p.options)
+    overrides = _get_transform_overrides(options)
     p.replace_all(overrides)
 
     # Note that the direct output of ReadFromPubSub will be replaced
@@ -212,8 +217,9 @@ def test_expand_deprecated(self):
     self.assertEqual('a_topic', write_transform.dofn.short_topic_name)
 
   def test_expand(self):
-    p = TestPipeline()
-    p.options.view_as(StandardOptions).streaming = True
+    options = PipelineOptions([])
+    options.view_as(StandardOptions).streaming = True
+    p = TestPipeline(options=options)
     pcoll = (p
              | ReadFromPubSub('projects/fakeprj/topics/baz')
              | WriteToPubSub('projects/fakeprj/topics/a_topic',
@@ -221,7 +227,7 @@ def test_expand(self):
              | beam.Map(lambda x: x))
 
     # Apply the necessary PTransformOverrides.
-    overrides = _get_transform_overrides(p.options)
+    overrides = _get_transform_overrides(options)
     p.replace_all(overrides)
 
     # Note that the direct output of ReadFromPubSub will be replaced
@@ -342,8 +348,9 @@ def test_read_messages_success(self, mock_pubsub):
                           [window.GlobalWindow()])]
     mock_pubsub.return_value.pull.return_value = pull_response
 
-    p = TestPipeline()
-    p.options.view_as(StandardOptions).streaming = True
+    options = PipelineOptions([])
+    options.view_as(StandardOptions).streaming = True
+    p = TestPipeline(options=options)
     pcoll = (p
              | ReadFromPubSub('projects/fakeprj/topics/a_topic',
                               None, None, with_attributes=True))
@@ -362,8 +369,9 @@ def test_read_strings_success(self, mock_pubsub):
     expected_elements = [data]
     mock_pubsub.return_value.pull.return_value = pull_response
 
-    p = TestPipeline()
-    p.options.view_as(StandardOptions).streaming = True
+    options = PipelineOptions([])
+    options.view_as(StandardOptions).streaming = True
+    p = TestPipeline(options=options)
     pcoll = (p
              | ReadStringsFromPubSub('projects/fakeprj/topics/a_topic',
                                      None, None))
@@ -380,8 +388,9 @@ def test_read_data_success(self, mock_pubsub):
     expected_elements = [data_encoded]
     mock_pubsub.return_value.pull.return_value = pull_response
 
-    p = TestPipeline()
-    p.options.view_as(StandardOptions).streaming = True
+    options = PipelineOptions([])
+    options.view_as(StandardOptions).streaming = True
+    p = TestPipeline(options=options)
     pcoll = (p
              | ReadFromPubSub('projects/fakeprj/topics/a_topic', None, None))
     assert_that(pcoll, equal_to(expected_elements))
@@ -407,8 +416,9 @@ def test_read_messages_timestamp_attribute_milli_success(self, mock_pubsub):
     ]
     mock_pubsub.return_value.pull.return_value = pull_response
 
-    p = TestPipeline()
-    p.options.view_as(StandardOptions).streaming = True
+    options = PipelineOptions([])
+    options.view_as(StandardOptions).streaming = True
+    p = TestPipeline(options=options)
     pcoll = (p
              | ReadFromPubSub(
                  'projects/fakeprj/topics/a_topic', None, None,
@@ -436,8 +446,9 @@ def test_read_messages_timestamp_attribute_rfc3339_success(self, mock_pubsub):
     ]
     mock_pubsub.return_value.pull.return_value = pull_response
 
-    p = TestPipeline()
-    p.options.view_as(StandardOptions).streaming = True
+    options = PipelineOptions([])
+    options.view_as(StandardOptions).streaming = True
+    p = TestPipeline(options=options)
     pcoll = (p
              | ReadFromPubSub(
                  'projects/fakeprj/topics/a_topic', None, None,
@@ -466,8 +477,9 @@ def test_read_messages_timestamp_attribute_missing(self, mock_pubsub):
     ]
     mock_pubsub.return_value.pull.return_value = pull_response
 
-    p = TestPipeline()
-    p.options.view_as(StandardOptions).streaming = True
+    options = PipelineOptions([])
+    options.view_as(StandardOptions).streaming = True
+    p = TestPipeline(options=options)
     pcoll = (p
              | ReadFromPubSub(
                  'projects/fakeprj/topics/a_topic', None, None,
@@ -489,8 +501,9 @@ def test_read_messages_timestamp_attribute_fail_parse(self, mock_pubsub):
     ])
     mock_pubsub.return_value.pull.return_value = pull_response
 
-    p = TestPipeline()
-    p.options.view_as(StandardOptions).streaming = True
+    options = PipelineOptions([])
+    options.view_as(StandardOptions).streaming = True
+    p = TestPipeline(options=options)
     _ = (p
          | ReadFromPubSub(
              'projects/fakeprj/topics/a_topic', None, None,
@@ -501,8 +514,9 @@ def test_read_messages_timestamp_attribute_fail_parse(self, mock_pubsub):
 
   def test_read_message_id_label_unsupported(self, unused_mock_pubsub):
     # id_label is unsupported in DirectRunner.
-    p = TestPipeline()
-    p.options.view_as(StandardOptions).streaming = True
+    options = PipelineOptions([])
+    options.view_as(StandardOptions).streaming = True
+    p = TestPipeline(options=options)
     _ = (p | ReadFromPubSub('projects/fakeprj/topics/a_topic', None, 'a_label'))
     with self.assertRaisesRegexp(NotImplementedError,
                                  r'id_label is not supported'):
@@ -517,8 +531,9 @@ def test_write_messages_success(self, mock_pubsub):
     data = 'data'
     payloads = [data]
 
-    p = TestPipeline()
-    p.options.view_as(StandardOptions).streaming = True
+    options = PipelineOptions([])
+    options.view_as(StandardOptions).streaming = True
+    p = TestPipeline(options=options)
     _ = (p
          | Create(payloads)
          | WriteToPubSub('projects/fakeprj/topics/a_topic',
@@ -531,8 +546,9 @@ def test_write_messages_deprecated(self, mock_pubsub):
     data = 'data'
     payloads = [data]
 
-    p = TestPipeline()
-    p.options.view_as(StandardOptions).streaming = True
+    options = PipelineOptions([])
+    options.view_as(StandardOptions).streaming = True
+    p = TestPipeline(options=options)
     _ = (p
          | Create(payloads)
          | WriteStringsToPubSub('projects/fakeprj/topics/a_topic'))
@@ -545,8 +561,9 @@ def test_write_messages_with_attributes_success(self, mock_pubsub):
     attributes = {'key': 'value'}
     payloads = [PubsubMessage(data, attributes)]
 
-    p = TestPipeline()
-    p.options.view_as(StandardOptions).streaming = True
+    options = PipelineOptions([])
+    options.view_as(StandardOptions).streaming = True
+    p = TestPipeline(options=options)
     _ = (p
          | Create(payloads)
          | WriteToPubSub('projects/fakeprj/topics/a_topic',
@@ -560,8 +577,9 @@ def test_write_messages_with_attributes_error(self, mock_pubsub):
     # Sending raw data when WriteToPubSub expects a PubsubMessage object.
     payloads = [data]
 
-    p = TestPipeline()
-    p.options.view_as(StandardOptions).streaming = True
+    options = PipelineOptions([])
+    options.view_as(StandardOptions).streaming = True
+    p = TestPipeline(options=options)
     _ = (p
          | Create(payloads)
          | WriteToPubSub('projects/fakeprj/topics/a_topic',
@@ -575,8 +593,9 @@ def test_write_messages_unsupported_features(self, mock_pubsub):
     attributes = {'key': 'value'}
     payloads = [PubsubMessage(data, attributes)]
 
-    p = TestPipeline()
-    p.options.view_as(StandardOptions).streaming = True
+    options = PipelineOptions([])
+    options.view_as(StandardOptions).streaming = True
+    p = TestPipeline(options=options)
     _ = (p
          | Create(payloads)
          | WriteToPubSub('projects/fakeprj/topics/a_topic',
@@ -584,8 +603,9 @@ def test_write_messages_unsupported_features(self, mock_pubsub):
     with self.assertRaisesRegexp(NotImplementedError,
                                  r'id_label is not supported'):
       p.run()
-    p = TestPipeline()
-    p.options.view_as(StandardOptions).streaming = True
+    options = PipelineOptions([])
+    options.view_as(StandardOptions).streaming = True
+    p = TestPipeline(options=options)
     _ = (p
          | Create(payloads)
          | WriteToPubSub('projects/fakeprj/topics/a_topic',
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 10f4a02c065d..4618c2c6cbd9 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -415,7 +415,7 @@ def run(self, test_runner_api=True):
         pickler.dump_session(os.path.join(tmpdir, 'main_session.pickle'))
       finally:
         shutil.rmtree(tmpdir)
-    return self.runner.run_pipeline(self)
+    return self.runner.run_pipeline(self, self._options)
 
   def __enter__(self):
     return self
@@ -512,7 +512,7 @@ def apply(self, transform, pvalueish=None, label=None):
     if type_options.pipeline_type_check:
       transform.type_check_inputs(pvalueish)
 
-    pvalueish_result = self.runner.apply(transform, pvalueish)
+    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
 
     if type_options is not None and type_options.pipeline_type_check:
       transform.type_check_outputs(pvalueish_result)
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index a135251fa8e2..88b03d47a304 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -311,7 +311,7 @@ def visit_transform(self, transform_node):
 
     return FlattenInputVisitor()
 
-  def run_pipeline(self, pipeline):
+  def run_pipeline(self, pipeline, options):
     """Remotely executes entire pipeline or parts reachable from node."""
     # Import here to avoid adding the dependency for local running scenarios.
     try:
@@ -323,7 +323,7 @@ def run_pipeline(self, pipeline):
           'please install apache_beam[gcp]')
 
     # Convert all side inputs into a form acceptable to Dataflow.
-    if apiclient._use_fnapi(pipeline._options):
+    if apiclient._use_fnapi(options):
       pipeline.visit(self.side_input_visitor())
 
     # Performing configured PTransform overrides.  Note that this is currently
@@ -336,7 +336,7 @@ def run_pipeline(self, pipeline):
         return_context=True)
 
     # Add setup_options for all the BeamPlugin imports
-    setup_options = pipeline._options.view_as(SetupOptions)
+    setup_options = options.view_as(SetupOptions)
     plugins = BeamPlugin.get_all_plugin_paths()
     if setup_options.beam_plugins is not None:
       plugins = list(set(plugins + setup_options.beam_plugins))
@@ -344,15 +344,15 @@ def run_pipeline(self, pipeline):
 
     # Elevate "min_cpu_platform" to pipeline option, but using the existing
     # experiment.
-    debug_options = pipeline._options.view_as(DebugOptions)
-    worker_options = pipeline._options.view_as(WorkerOptions)
+    debug_options = options.view_as(DebugOptions)
+    worker_options = options.view_as(WorkerOptions)
     if worker_options.min_cpu_platform:
       experiments = ["min_cpu_platform=%s" % worker_options.min_cpu_platform]
       if debug_options.experiments is not None:
         experiments = list(set(experiments + debug_options.experiments))
       debug_options.experiments = experiments
 
-    self.job = apiclient.Job(pipeline._options, self.proto_pipeline)
+    self.job = apiclient.Job(options, self.proto_pipeline)
 
     # Dataflow runner requires a KV type for GBK inputs, hence we enforce that
     # here.
@@ -363,20 +363,19 @@ def run_pipeline(self, pipeline):
     pipeline.visit(self.flatten_input_visitor())
 
     # The superclass's run will trigger a traversal of all reachable nodes.
-    super(DataflowRunner, self).run_pipeline(pipeline)
+    super(DataflowRunner, self).run_pipeline(pipeline, options)
 
-    test_options = pipeline._options.view_as(TestOptions)
+    test_options = options.view_as(TestOptions)
     # If it is a dry run, return without submitting the job.
     if test_options.dry_run:
       return None
 
     # Get a Dataflow API client and set its options
-    self.dataflow_client = apiclient.DataflowApplicationClient(
-        pipeline._options)
+    self.dataflow_client = apiclient.DataflowApplicationClient(options)
 
     dataflow_worker_jar = getattr(worker_options, 'dataflow_worker_jar', None)
     if dataflow_worker_jar is not None:
-      if not apiclient._use_fnapi(pipeline._options):
+      if not apiclient._use_fnapi(options):
         logging.fatal(
             'Typical end users should not use this worker jar feature. '
             'It can only be used when fnapi is enabled.')
@@ -509,9 +508,8 @@ def _add_singleton_step(
         self.serialize_windowing_strategy(windowing_strategy))
     return step
 
-  def run_Impulse(self, transform_node):
-    standard_options = (
-        transform_node.outputs[None].pipeline._options.view_as(StandardOptions))
+  def run_Impulse(self, transform_node, options):
+    standard_options = options.view_as(StandardOptions)
     step = self._add_step(
         TransformNames.READ, transform_node.full_label, transform_node)
     if standard_options.streaming:
@@ -535,7 +533,7 @@ def run_Impulse(self, transform_node):
           PropertyNames.ENCODING: step.encoding,
           PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
 
-  def run_Flatten(self, transform_node):
+  def run_Flatten(self, transform_node, options):
     step = self._add_step(TransformNames.FLATTEN,
                           transform_node.full_label, transform_node)
     inputs = []
@@ -554,16 +552,16 @@ def run_Flatten(self, transform_node):
           PropertyNames.ENCODING: step.encoding,
           PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
 
-  def apply_WriteToBigQuery(self, transform, pcoll):
+  def apply_WriteToBigQuery(self, transform, pcoll, options):
     # Make sure this is the WriteToBigQuery class that we expected
     if not isinstance(transform, beam.io.WriteToBigQuery):
-      return self.apply_PTransform(transform, pcoll)
-    standard_options = pcoll.pipeline._options.view_as(StandardOptions)
+      return self.apply_PTransform(transform, pcoll, options)
+    standard_options = options.view_as(StandardOptions)
     if standard_options.streaming:
       if (transform.write_disposition ==
           beam.io.BigQueryDisposition.WRITE_TRUNCATE):
         raise RuntimeError('Can not use write truncation mode in streaming')
-      return self.apply_PTransform(transform, pcoll)
+      return self.apply_PTransform(transform, pcoll, options)
     else:
       return pcoll  | 'WriteToBigQuery' >> beam.io.Write(
           beam.io.BigQuerySink(
@@ -574,7 +572,7 @@ def apply_WriteToBigQuery(self, transform, pcoll):
               transform.create_disposition,
               transform.write_disposition))
 
-  def apply_GroupByKey(self, transform, pcoll):
+  def apply_GroupByKey(self, transform, pcoll, options):
     # Infer coder of parent.
     #
     # TODO(ccy): make Coder inference and checking less specialized and more
@@ -594,7 +592,7 @@ def apply_GroupByKey(self, transform, pcoll):
 
     return pvalue.PCollection(pcoll.pipeline)
 
-  def run_GroupByKey(self, transform_node):
+  def run_GroupByKey(self, transform_node, options):
     input_tag = transform_node.inputs[0].tag
     input_step = self._cache.get_pvalue(transform_node.inputs[0])
     step = self._add_step(
@@ -617,7 +615,7 @@ def run_GroupByKey(self, transform_node):
         PropertyNames.SERIALIZED_FN,
         self.serialize_windowing_strategy(windowing))
 
-  def run_ParDo(self, transform_node):
+  def run_ParDo(self, transform_node, options):
     transform = transform_node.transform
     input_tag = transform_node.inputs[0].tag
     input_step = self._cache.get_pvalue(transform_node.inputs[0])
@@ -672,7 +670,7 @@ def run_ParDo(self, transform_node):
     from apache_beam.runners.dataflow.internal import apiclient
     transform_proto = self.proto_context.transforms.get_proto(transform_node)
     transform_id = self.proto_context.transforms.get_id(transform_node)
-    if (apiclient._use_fnapi(transform_node.inputs[0].pipeline._options)
+    if (apiclient._use_fnapi(options)
         and transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn):
       # Patch side input ids to be unique across a given pipeline.
       if (label_renames and
@@ -742,10 +740,10 @@ def _pardo_fn_data(transform_node, get_label):
     return (transform.fn, transform.args, transform.kwargs, si_tags_and_types,
             transform_node.inputs[0].windowing)
 
-  def apply_CombineValues(self, transform, pcoll):
+  def apply_CombineValues(self, transform, pcoll, options):
     return pvalue.PCollection(pcoll.pipeline)
 
-  def run_CombineValues(self, transform_node):
+  def run_CombineValues(self, transform_node, options):
     transform = transform_node.transform
     input_tag = transform_node.inputs[0].tag
     input_step = self._cache.get_pvalue(transform_node.inputs[0])
@@ -755,7 +753,7 @@ def run_CombineValues(self, transform_node):
     # The data transmitted in SERIALIZED_FN is different depending on whether
     # this is a fnapi pipeline or not.
     from apache_beam.runners.dataflow.internal import apiclient
-    if apiclient._use_fnapi(transform_node.inputs[0].pipeline._options):
+    if apiclient._use_fnapi(options):
       # Fnapi pipelines send the transform ID of the CombineValues transform's
       # parent composite because Dataflow expects the ID of a CombinePerKey
       # transform.
@@ -792,28 +790,30 @@ def run_CombineValues(self, transform_node):
          PropertyNames.OUTPUT_NAME: PropertyNames.OUT})
     step.add_property(PropertyNames.OUTPUT_INFO, outputs)
 
-  def apply_Read(self, transform, pbegin):
+  def apply_Read(self, transform, pbegin, options):
     if hasattr(transform.source, 'format'):
       # Consider native Read to be a primitive for dataflow.
       return beam.pvalue.PCollection(pbegin.pipeline)
     else:
-      options = pbegin.pipeline.options.view_as(DebugOptions)
-      if options.experiments and 'beam_fn_api' in options.experiments:
+      debug_options = options.view_as(DebugOptions)
+      if (
+          debug_options.experiments and
+          'beam_fn_api' in debug_options.experiments
+      ):
         # Expand according to FnAPI primitives.
-        return self.apply_PTransform(transform, pbegin)
+        return self.apply_PTransform(transform, pbegin, options)
       else:
         # Custom Read is also a primitive for non-FnAPI on dataflow.
         return beam.pvalue.PCollection(pbegin.pipeline)
 
-  def run_Read(self, transform_node):
+  def run_Read(self, transform_node, options):
     transform = transform_node.transform
     step = self._add_step(
         TransformNames.READ, transform_node.full_label, transform_node)
     # TODO(mairbek): refactor if-else tree to use registerable functions.
     # Initialize the source specific properties.
 
-    standard_options = transform_node.inputs[0].pipeline.options.view_as(
-        StandardOptions)
+    standard_options = options.view_as(StandardOptions)
     if not hasattr(transform.source, 'format'):
       # If a format is not set, we assume the source to be a custom source.
       source_dict = {}
@@ -919,7 +919,7 @@ def run_Read(self, transform_node):
           PropertyNames.ENCODING: step.encoding,
           PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
 
-  def run__NativeWrite(self, transform_node):
+  def run__NativeWrite(self, transform_node, options):
     transform = transform_node.transform
     input_tag = transform_node.inputs[0].tag
     input_step = self._cache.get_pvalue(transform_node.inputs[0])
@@ -965,8 +965,7 @@ def run__NativeWrite(self, transform_node):
         step.add_property(
             PropertyNames.BIGQUERY_SCHEMA, transform.sink.schema_as_json())
     elif transform.sink.format == 'pubsub':
-      standard_options = (
-          transform_node.inputs[0].pipeline.options.view_as(StandardOptions))
+      standard_options = options.view_as(StandardOptions)
       if not standard_options.streaming:
         raise ValueError('Cloud Pub/Sub is currently available for use '
                          'only in streaming pipelines.')
diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
index b1a845a2784f..219f34b879e3 100644
--- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
@@ -38,18 +38,19 @@
 
 
 class TestDataflowRunner(DataflowRunner):
-  def run_pipeline(self, pipeline):
+  def run_pipeline(self, pipeline, options):
     """Execute test pipeline and verify test matcher"""
-    options = pipeline._options.view_as(TestOptions)
-    on_success_matcher = options.on_success_matcher
-    wait_duration = options.wait_until_finish_duration
+    test_options = options.view_as(TestOptions)
+    on_success_matcher = test_options.on_success_matcher
+    wait_duration = test_options.wait_until_finish_duration
     is_streaming = options.view_as(StandardOptions).streaming
 
     # [BEAM-1889] Do not send this to remote workers also, there is no need to
     # send this option to remote executors.
-    options.on_success_matcher = None
+    test_options.on_success_matcher = None
 
-    self.result = super(TestDataflowRunner, self).run_pipeline(pipeline)
+    self.result = super(TestDataflowRunner, self).run_pipeline(
+        pipeline, options)
     if self.result.has_job:
       # TODO(markflyhigh)(BEAM-1890): Use print since Nose dosen't show logs
       # in some cases.
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index d410992ab1d9..b882eb385ea7 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -68,11 +68,11 @@ class SwitchingDirectRunner(PipelineRunner):
   implemented in the FnApiRunner.
   """
 
-  def run_pipeline(self, pipeline):
+  def run_pipeline(self, pipeline, options):
     use_fnapi_runner = True
 
     # Streaming mode is not yet supported on the FnApiRunner.
-    if pipeline._options.view_as(StandardOptions).streaming:
+    if options.view_as(StandardOptions).streaming:
       use_fnapi_runner = False
 
     from apache_beam.pipeline import PipelineVisitor
@@ -136,7 +136,7 @@ def visit_transform(self, applied_ptransform):
     else:
       runner = BundleBasedDirectRunner()
 
-    return runner.run_pipeline(pipeline)
+    return runner.run_pipeline(pipeline, options)
 
 
 # Type variables.
@@ -346,7 +346,7 @@ def get_replacement_transform(self, transform):
 class BundleBasedDirectRunner(PipelineRunner):
   """Executes a single pipeline on the local machine."""
 
-  def run_pipeline(self, pipeline):
+  def run_pipeline(self, pipeline, options):
     """Execute the entire pipeline and returns an DirectPipelineResult."""
 
     # TODO: Move imports to top. Pipeline <-> Runner dependency cause problems
@@ -362,7 +362,7 @@ def run_pipeline(self, pipeline):
     from apache_beam.testing.test_stream import TestStream
 
     # Performing configured PTransform overrides.
-    pipeline.replace_all(_get_transform_overrides(pipeline.options))
+    pipeline.replace_all(_get_transform_overrides(options))
 
     # If the TestStream I/O is used, use a mock test clock.
     class _TestStreamUsageVisitor(PipelineVisitor):
@@ -387,8 +387,8 @@ def visit_transform(self, applied_ptransform):
     pipeline.visit(self.consumer_tracking_visitor)
 
     evaluation_context = EvaluationContext(
-        pipeline._options,
-        BundleFactory(stacked=pipeline._options.view_as(DirectOptions)
+        options,
+        BundleFactory(stacked=options.view_as(DirectOptions)
                       .direct_runner_use_stacked_bundle),
         self.consumer_tracking_visitor.root_transforms,
         self.consumer_tracking_visitor.value_to_consumers,
diff --git a/sdks/python/apache_beam/runners/direct/test_direct_runner.py b/sdks/python/apache_beam/runners/direct/test_direct_runner.py
index 23dfeabc2abe..04dbe5093870 100644
--- a/sdks/python/apache_beam/runners/direct/test_direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/test_direct_runner.py
@@ -30,17 +30,17 @@
 
 
 class TestDirectRunner(DirectRunner):
-  def run_pipeline(self, pipeline):
+  def run_pipeline(self, pipeline, options):
     """Execute test pipeline and verify test matcher"""
-    options = pipeline._options.view_as(TestOptions)
-    on_success_matcher = options.on_success_matcher
+    test_options = options.view_as(TestOptions)
+    on_success_matcher = test_options.on_success_matcher
     is_streaming = options.view_as(StandardOptions).streaming
 
     # [BEAM-1889] Do not send this to remote workers also, there is no need to
     # send this option to remote executors.
-    options.on_success_matcher = None
+    test_options.on_success_matcher = None
 
-    self.result = super(TestDirectRunner, self).run_pipeline(pipeline)
+    self.result = super(TestDirectRunner, self).run_pipeline(pipeline, options)
 
     try:
       if not is_streaming:
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner.py b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
index 45bc246d4acb..391f3f0451f3 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_runner.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_runner.py
@@ -98,11 +98,11 @@ def end_session(self):
   def cleanup(self):
     self._cache_manager.cleanup()
 
-  def apply(self, transform, pvalueish):
+  def apply(self, transform, pvalueish, options):
     # TODO(qinyeli, BEAM-646): Remove runner interception of apply.
-    return self._underlying_runner.apply(transform, pvalueish)
+    return self._underlying_runner.apply(transform, pvalueish, options)
 
-  def run_pipeline(self, pipeline):
+  def run_pipeline(self, pipeline, options):
     if not hasattr(self, '_desired_cache_labels'):
       self._desired_cache_labels = set()
 
@@ -111,7 +111,7 @@ def run_pipeline(self, pipeline):
     pipeline = beam.pipeline.Pipeline.from_runner_api(
         pipeline.to_runner_api(use_fake_coders=True),
         pipeline.runner,
-        pipeline._options)
+        options)
 
     # Snapshot the pipeline in a portable proto before mutating it.
     pipeline_proto, original_context = pipeline.to_runner_api(
@@ -121,7 +121,7 @@ def run_pipeline(self, pipeline):
     analyzer = pipeline_analyzer.PipelineAnalyzer(self._cache_manager,
                                                   pipeline_proto,
                                                   self._underlying_runner,
-                                                  pipeline._options,
+                                                  options,
                                                   self._desired_cache_labels)
     # Should be only accessed for debugging purpose.
     self._analyzer = analyzer
@@ -129,7 +129,7 @@ def run_pipeline(self, pipeline):
     pipeline_to_execute = beam.pipeline.Pipeline.from_runner_api(
         analyzer.pipeline_proto_to_execute(),
         self._underlying_runner,
-        pipeline._options)
+        options)
 
     display = display_manager.DisplayManager(
         pipeline_proto=pipeline_proto,
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index e8e1f9063a59..76155492fe57 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -234,7 +234,7 @@ def _next_uid(self):
     self._last_uid += 1
     return str(self._last_uid)
 
-  def run_pipeline(self, pipeline):
+  def run_pipeline(self, pipeline, options):
     MetricsEnvironment.set_metrics_supported(False)
     RuntimeValueProvider.set_runtime_options({})
     # This is sometimes needed if type checking is disabled
@@ -242,10 +242,10 @@ def run_pipeline(self, pipeline):
     # are known to be KVs.
     from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner
     pipeline.visit(DataflowRunner.group_by_key_input_visitor())
-    self._bundle_repeat = self._bundle_repeat or pipeline._options.view_as(
+    self._bundle_repeat = self._bundle_repeat or options.view_as(
         pipeline_options.DirectOptions).direct_runner_bundle_repeat
     self._profiler_factory = profiler.Profile.factory_from_options(
-        pipeline._options.view_as(pipeline_options.ProfilingOptions))
+        options.view_as(pipeline_options.ProfilingOptions))
     return self.run_via_runner_api(pipeline.to_runner_api())
 
   def run_via_runner_api(self, pipeline_proto):
diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py
index 0b539af4535a..025cb1066780 100644
--- a/sdks/python/apache_beam/runners/portability/portable_runner.py
+++ b/sdks/python/apache_beam/runners/portability/portable_runner.py
@@ -105,14 +105,14 @@ def _create_environment(options):
               env=(config.get('env') or '')
           ).SerializeToString())
 
-  def run_pipeline(self, pipeline):
-    portable_options = pipeline.options.view_as(PortableOptions)
+  def run_pipeline(self, pipeline, options):
+    portable_options = options.view_as(PortableOptions)
     job_endpoint = portable_options.job_endpoint
 
     # TODO: https://issues.apache.org/jira/browse/BEAM-5525
     # portable runner specific default
-    if pipeline.options.view_as(SetupOptions).sdk_location == 'default':
-      pipeline.options.view_as(SetupOptions).sdk_location = 'container'
+    if options.view_as(SetupOptions).sdk_location == 'default':
+      options.view_as(SetupOptions).sdk_location = 'container'
 
     if not job_endpoint:
       docker = DockerizedJobServer()
@@ -134,9 +134,9 @@ def run_pipeline(self, pipeline):
 
     # TODO: Define URNs for options.
     # convert int values: https://issues.apache.org/jira/browse/BEAM-5509
-    options = {'beam:option:' + k + ':v1': (str(v) if type(v) == int else v)
-               for k, v in pipeline._options.get_all_options().items()
-               if v is not None}
+    p_options = {'beam:option:' + k + ':v1': (str(v) if type(v) == int else v)
+                 for k, v in options.get_all_options().items()
+                 if v is not None}
 
     channel = grpc.insecure_channel(job_endpoint)
     grpc.channel_ready_future(channel).result()
@@ -153,7 +153,7 @@ def send_prepare_request(max_retries=5):
           return job_service.Prepare(
               beam_job_api_pb2.PrepareJobRequest(
                   job_name='job', pipeline=proto_pipeline,
-                  pipeline_options=job_utils.dict_to_struct(options)))
+                  pipeline_options=job_utils.dict_to_struct(p_options)))
         except grpc._channel._Rendezvous as e:
           num_retries += 1
           if num_retries > max_retries:
@@ -165,7 +165,7 @@ def send_prepare_request(max_retries=5):
           grpc.insecure_channel(prepare_response.artifact_staging_endpoint.url),
           prepare_response.staging_session_token)
       retrieval_token, _ = stager.stage_job_resources(
-          pipeline._options,
+          options,
           staging_location='')
     else:
       retrieval_token = None
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index b0bafa55d68c..71845620d81d 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -151,7 +151,7 @@ def run_async(self, transform, options=None):
       transform(PBegin(p))
     return p.run()
 
-  def run_pipeline(self, pipeline):
+  def run_pipeline(self, pipeline, options):
     """Execute the entire pipeline or the sub-DAG reachable from a node.
 
     Runners should override this method.
@@ -168,14 +168,14 @@ def __init__(self, runner):
 
       def visit_transform(self, transform_node):
         try:
-          self.runner.run_transform(transform_node)
+          self.runner.run_transform(transform_node, options)
         except:
           logging.error('Error while visiting %s', transform_node.full_label)
           raise
 
     pipeline.visit(RunVisitor(self))
 
-  def apply(self, transform, input):
+  def apply(self, transform, input, options):
     """Runner callback for a pipeline.apply call.
 
     Args:
@@ -190,15 +190,15 @@ def apply(self, transform, input):
     for cls in transform.__class__.mro():
       m = getattr(self, 'apply_%s' % cls.__name__, None)
       if m:
-        return m(transform, input)
+        return m(transform, input, options)
     raise NotImplementedError(
         'Execution of [%s] not implemented in runner %s.' % (transform, self))
 
-  def apply_PTransform(self, transform, input):
+  def apply_PTransform(self, transform, input, options):
     # The base case of apply is to call the transform's expand.
     return transform.expand(input)
 
-  def run_transform(self, transform_node):
+  def run_transform(self, transform_node, options):
     """Runner callback for a pipeline.run call.
 
     Args:
@@ -211,7 +211,7 @@ def run_transform(self, transform_node):
     for cls in transform_node.transform.__class__.mro():
       m = getattr(self, 'run_%s' % cls.__name__, None)
       if m:
-        return m(transform_node)
+        return m(transform_node, options)
     raise NotImplementedError(
         'Execution of [%s] not implemented in runner %s.' % (
             transform_node.transform, self))


 

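Beyond the tests, the patch threads the options object explicitly through the
runner API: run_pipeline(), apply(), run_transform() and the per-transform
run_*/apply_* hooks now receive an options argument instead of reaching into
pipeline._options or the deprecated pipeline.options property. A rough sketch
of what a runner subclass looks like against the new signatures (the
NoOpRunner name and the logging call are illustrative only, not part of the
patch):

    import logging

    from apache_beam.options.pipeline_options import StandardOptions
    from apache_beam.runners.runner import PipelineRunner

    class NoOpRunner(PipelineRunner):
      """Hypothetical runner sketch using the post-patch signature."""

      def run_pipeline(self, pipeline, options):
        # Options arrive as an explicit argument, so nothing here touches
        # the deprecated pipeline.options accessor.
        if options.view_as(StandardOptions).streaming:
          logging.info('streaming execution requested')
        # A real runner would translate and execute the pipeline graph here.
        return None
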
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 172021)
    Time Spent: 7h 10m  (was: 7h)

> get rid of <pipeline>.options deprecation warnings in tests
> -----------------------------------------------------------
>
>                 Key: BEAM-5462
>                 URL: https://issues.apache.org/jira/browse/BEAM-5462
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Udi Meiri
>            Assignee: Heejong Lee
>            Priority: Minor
>          Time Spent: 7h 10m
>  Remaining Estimate: 0h
>
> Messages look like:
> {{/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/direct/direct_runner.py:360:
>  DeprecationWarning: options is deprecated since First stable release. 
> References to <pipeline>.options will not be supported}}
> {{pipeline.replace_all(_get_transform_overrides(pipeline.options))}}
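
For context, the warning fires on attribute access: Pipeline.options is a
property flagged as deprecated, so a test only has to read it (for example to
flip streaming mode) to produce the message above. A rough, self-contained
sketch of the mechanism, where FakePipeline is purely illustrative and not
Beam's actual implementation:

    import warnings

    class FakePipeline(object):
      """Illustrative stand-in for apache_beam.Pipeline."""

      def __init__(self, options=None):
        self._options = options

      @property
      def options(self):
        warnings.warn(
            'options is deprecated since First stable release. References to '
            '<pipeline>.options will not be supported',
            DeprecationWarning, stacklevel=2)
        return self._options

    p = FakePipeline(options={'streaming': True})
    _ = p.options  # emits the DeprecationWarning quoted above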



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
