This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch release-2.26.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.26.0 by this push:
     new 6f9422f  [BEAM-11033] Identify Dataflow metrics for portable job path based on step name. (#13298) (#13299)
6f9422f is described below

commit 6f9422f4c9a8486c58cf6c1d09b6289bc0123827
Author: Chamikara Jayalath <[email protected]>
AuthorDate: Thu Nov 12 07:30:08 2020 -0800

    [BEAM-11033] Identify Dataflow metrics for portable job path based on step name. (#13298) (#13299)
---
 .../python/apache_beam/runners/dataflow/dataflow_metrics.py | 13 ++++---------
 .../apache_beam/runners/dataflow/internal/apiclient.py      |  8 --------
 2 files changed, 4 insertions(+), 17 deletions(-)

diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
index 9bb1170..c292e24 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
@@ -101,18 +101,13 @@ class DataflowMetrics(MetricResults):
           'Could not translate the internal step name %r since job graph is '
           'not available.' % internal_name)
     user_step_name = None
-    # pylint: disable=wrong-import-order, wrong-import-position
-    from apache_beam.runners.dataflow.internal import apiclient
-    if apiclient._use_unified_worker_portable_job(self._job_graph.options):
+    if (self._job_graph and internal_name in
+        self._job_graph.proto_pipeline.components.transforms.keys()):
       # Dataflow Runner v2 with portable job submission uses proto transform map
       # IDs for step names. Also PTransform.unique_name maps to user step names.
       # Hence we lookup user step names based on the proto.
-      proto_pipeline = self._job_graph.proto_pipeline
-      for transform_id in proto_pipeline.components.transforms.keys():
-        if internal_name == transform_id:
-          user_step_name = proto_pipeline.components.transforms[
-              transform_id].unique_name
-          break
+      user_step_name = self._job_graph.proto_pipeline.components.transforms[
+          internal_name].unique_name
     else:
       try:
         step = _get_match(
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 1dadcb5..b9de09b 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -1048,14 +1048,6 @@ def _use_unified_worker(pipeline_options):
   return debug_options.lookup_experiment(use_unified_worker_flag)
 
 
-def _use_unified_worker_portable_job(pipeline_options):
-  portable_job_flag = 'use_portable_job_submission'
-  debug_options = pipeline_options.view_as(DebugOptions)
-  return (
-      _use_unified_worker(pipeline_options) and
-      debug_options.lookup_experiment(portable_job_flag))
-
-
 def _get_container_image_tag():
   base_version = pkg_resources.parse_version(
       beam_version.__version__).base_version

Reply via email to