pankajastro commented on code in PR #31471:
URL: https://github.com/apache/airflow/pull/31471#discussion_r1260780302


##########
airflow/providers/apache/beam/hooks/beam.py:
##########
@@ -381,3 +383,185 @@ def start_go_pipeline_with_binary(
             command_prefix=command_prefix,
             process_line_callback=process_line_callback,
         )
+
+
+class BeamAsyncHook(BeamHook):
+    """
+    Asynchronous hook for Apache Beam.
+    :param runner: Runner type.
+    """
+
+    def __init__(
+        self,
+        runner: str,
+    ) -> None:
+        self.runner = runner
+        super().__init__(runner=self.runner)
+
+    @staticmethod
+    async def _create_tmp_dir(prefix: str) -> str:
+        """Helper method to create temporary directory."""
+        # Creating separate thread to create temporary directory
+        loop = asyncio.get_running_loop()
+        partial_func = functools.partial(tempfile.mkdtemp, prefix=prefix)
+        tmp_dir = await loop.run_in_executor(None, partial_func)
+        return tmp_dir
+
+    @staticmethod
+    async def _cleanup_tmp_dir(tmp_dir: str) -> None:
+        """
+        Helper method to delete temporary directory after finishing work with 
it.
+        It uses `rmtree` method to recursively remove the temporary directory.
+        """
+        shutil.rmtree(tmp_dir)
+
+    async def start_python_pipeline_async(
+        self,
+        variables: dict,
+        py_file: str,
+        py_options: list[str] | None = None,
+        py_interpreter: str = "python3",
+        py_requirements: list[str] | None = None,
+        py_system_site_packages: bool = False,
+    ):
+        """
+        Starts Apache Beam python pipeline.
+
+        :param variables: Variables passed to the pipeline.
+        :param py_file: Path to the python file to execute.
+        :param py_options: Additional options.
+        :param py_interpreter: Python version of the Apache Beam pipeline.
+            If None, this defaults to the python3.
+            To track python versions supported by beam and related
+            issues check: https://issues.apache.org/jira/browse/BEAM-1251
+        :param py_requirements: Additional python package(s) to install.
+            If a value is passed to this parameter, a new virtual environment 
has been created with
+            additional packages installed.
+            You could also install the apache-beam package if it is not 
installed on your system, or you want
+            to use a different version.
+        :param py_system_site_packages: Whether to include 
system_site_packages in your virtualenv.
+            See virtualenv documentation for more information.
+            This option is only relevant if the ``py_requirements`` parameter 
is not None.
+        """
+        if "labels" in variables:
+            variables["labels"] = [f"{key}={value}" for key, value in 
variables["labels"].items()]
+
+        # Creating temporary directory
+        tmp_dir = await self._create_tmp_dir(prefix="apache-beam-venv")
+
+        async with contextlib.AsyncExitStack() as exit_stack:
+            if py_requirements is not None:
+                if not py_requirements and not py_system_site_packages:
+                    warning_invalid_environment = textwrap.dedent(
+                        """\
+                        Invalid method invocation. You have disabled inclusion 
of system packages and empty
+                        list required for installation, so it is not possible 
to create a valid virtual
+                        environment. In the virtual environment, apache-beam 
package must be installed for
+                        your job to be executed.
+
+                        To fix this problem:
+                        * install apache-beam on the system, then set 
parameter py_system_site_packages
+                          to True,
+                        * add apache-beam to the list of required packages in 
parameter py_requirements.
+                        """
+                    )
+                    raise AirflowException(warning_invalid_environment)
+
+                # Pushing asynchronous callback to ensure the cleanup of the 
temporary
+                # directory when the asynchronous context is exited
+                exit_stack.push_async_callback(self._cleanup_tmp_dir, tmp_dir)
+
+                py_interpreter = prepare_virtualenv(
+                    venv_directory=tmp_dir,
+                    python_bin=py_interpreter,
+                    system_site_packages=py_system_site_packages,
+                    requirements=py_requirements,
+                )
+            command_prefix: list[str] = [py_interpreter] + (py_options or []) 
+ [py_file]
+            beam_version = (
+                subprocess.check_output(
+                    [py_interpreter, "-c", "import apache_beam; 
print(apache_beam.__version__)"]
+                )
+                .decode()
+                .strip()
+            )
+            self.log.info("Beam version: %s", beam_version)
+            impersonate_service_account = 
variables.get("impersonate_service_account")
+            if impersonate_service_account:
+                if Version(beam_version) < Version("2.39.0"):
+                    raise AirflowException(
+                        "The impersonateServiceAccount option requires Apache 
Beam 2.39.0 or newer."
+                    )
+            return_code = await self.start_pipeline_async(
+                variables=variables,
+                command_prefix=command_prefix,
+            )
+            return return_code
+
+    async def start_pipeline_async(
+        self,
+        variables: dict,
+        command_prefix: list[str],
+        working_directory: str | None = None,
+    ) -> int:
+        cmd = command_prefix + [
+            f"--runner={self.runner}",
+        ]
+        if variables:
+            cmd.extend(beam_options_to_args(variables))
+        return await self.run_beam_command_async(
+            cmd=cmd,
+            working_directory=working_directory,
+            log=self.log,
+        )
+
+    async def run_beam_command_async(
+        self,
+        cmd: list[str],
+        log: logging.Logger,
+        working_directory: str | None = None,
+    ) -> int:
+        """
+        Function responsible for running pipeline command in subprocess.
+
+        :param cmd: Parts of the command to be run in subprocess
+        :param working_directory: Working directory
+        :param log: logger.

Review Comment:
   nit
   ```suggestion
           :param log: logger.
           :param working_directory: Working directory
   ```



##########
airflow/providers/apache/beam/hooks/beam.py:
##########
@@ -381,3 +383,185 @@ def start_go_pipeline_with_binary(
             command_prefix=command_prefix,
             process_line_callback=process_line_callback,
         )
+
+
+class BeamAsyncHook(BeamHook):
+    """
+    Asynchronous hook for Apache Beam.
+    :param runner: Runner type.

Review Comment:
   nit
   ```suggestion
   
       :param runner: Runner type.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to