pierrejeambrun commented on a change in pull request #20386:
URL: https://github.com/apache/airflow/pull/20386#discussion_r777552948



##########
File path: airflow/providers/apache/beam/operators/beam.py
##########
@@ -185,56 +289,30 @@ def __init__(
         dataflow_config: Optional[Union[DataflowConfiguration, dict]] = None,
         **kwargs,
     ) -> None:
-        super().__init__(**kwargs)
+        super().__init__(
+            runner=runner,
+            default_pipeline_options=default_pipeline_options,
+            pipeline_options=pipeline_options,
+            gcp_conn_id=gcp_conn_id,
+            delegate_to=delegate_to,
+            dataflow_config=dataflow_config,
+            **kwargs,
+        )
 
         self.py_file = py_file
-        self.runner = runner
         self.py_options = py_options or []
-        self.default_pipeline_options = default_pipeline_options or {}
-        self.pipeline_options = pipeline_options or {}
-        self.pipeline_options.setdefault("labels", {}).update(
-            {"airflow-version": "v" + version.replace(".", "-").replace("+", 
"-")}
-        )
         self.py_interpreter = py_interpreter
         self.py_requirements = py_requirements
         self.py_system_site_packages = py_system_site_packages
-        self.gcp_conn_id = gcp_conn_id
-        self.delegate_to = delegate_to
-        self.beam_hook: Optional[BeamHook] = None
-        self.dataflow_hook: Optional[DataflowHook] = None
-        self.dataflow_job_id: Optional[str] = None
-
-        if dataflow_config is None:
-            self.dataflow_config = DataflowConfiguration()
-        elif isinstance(dataflow_config, dict):
-            self.dataflow_config = DataflowConfiguration(**dataflow_config)
-        else:
-            self.dataflow_config = dataflow_config
-
-        if self.dataflow_config and self.runner.lower() != 
BeamRunnerType.DataflowRunner.lower():
-            self.log.warning(
-                "dataflow_config is defined but runner is different than 
DataflowRunner (%s)", self.runner
-            )
 
     def execute(self, context):
         """Execute the Apache Beam Pipeline."""
-        self.beam_hook = BeamHook(runner=self.runner)
-        pipeline_options = self.default_pipeline_options.copy()
-        process_line_callback: Optional[Callable] = None
-        is_dataflow = self.runner.lower() == 
BeamRunnerType.DataflowRunner.lower()
-        dataflow_job_name: Optional[str] = None
-
-        if is_dataflow:
-            dataflow_job_name, pipeline_options, process_line_callback = 
self._set_dataflow(
-                pipeline_options=pipeline_options, 
job_name_variable_key="job_name"
-            )
-
-        pipeline_options.update(self.pipeline_options)
-
-        # Convert argument names from lowerCamelCase to snake case.
-        formatted_pipeline_options = {
-            convert_camel_to_snake(key): pipeline_options[key] for key in 
pipeline_options
-        }
+        (
+            is_dataflow,
+            dataflow_job_name,
+            formatted_pipeline_options,
+            process_line_callback,
+        ) = self._init_pipeline_options(format_pipeline_options=True, 
job_name_variable_key="job_name")

Review comment:
      This is existing logic that allows an alternate key in the
pipeline options to be used for the job name (only for Dataflow). It sets the key
named by `job_name_variable_key` to the actual `job_name` value.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to