mik-laj commented on pull request #10373:
URL: https://github.com/apache/airflow/pull/10373#issuecomment-676290518


   @Craig-Chatfield I prepared a minor change to improve support for virtual environments with system packages.
   ```diff
   From 7ac7b7a0d68abd7f5b5e06d28b4850252f81c679 Mon Sep 17 00:00:00 2001
   From: Kamil Breguła <[email protected]>
   Date: Wed, 19 Aug 2020 14:30:29 +0200
   Subject: Improve the support of virtual environments with system packages
   
   ---
    .../providers/google/cloud/hooks/dataflow.py  | 18 +++++++-
    .../google/cloud/operators/dataflow.py        |  4 +-
    .../google/cloud/hooks/test_dataflow.py       | 41 +++++++++++--------
    3 files changed, 41 insertions(+), 22 deletions(-)
   
   diff --git a/airflow/providers/google/cloud/hooks/dataflow.py b/airflow/providers/google/cloud/hooks/dataflow.py
   index 01996bc12..44dc87727 100644
   --- a/airflow/providers/google/cloud/hooks/dataflow.py
   +++ b/airflow/providers/google/cloud/hooks/dataflow.py
   @@ -24,6 +24,7 @@ import re
    import select
    import shlex
    import subprocess
   +import textwrap
    import time
    import uuid
    import warnings
   @@ -633,7 +634,7 @@ class DataflowHook(GoogleBaseHook):
            :param py_system_site_packages: Whether to include system_site_packages in your virtualenv.
                See virtualenv documentation for more information.
    
   -            This option is only relevant if the ``py_requirements`` parameter is passed.
   +            This option is only relevant if the ``py_requirements`` parameter is not None.
            :type py_interpreter: str
            :param append_job_name: True if unique suffix has to be appended to job name.
            :type append_job_name: bool
   @@ -652,7 +653,20 @@ class DataflowHook(GoogleBaseHook):
                return ['--labels={}={}'.format(key, value)
                        for key, value in labels_dict.items()]
    
   -        if py_requirements:
   +        if py_requirements is not None:
   +            if not py_requirements and not py_system_site_packages:
   +                warning_invalid_environment = textwrap.dedent(
   +                    """\
   +                    Invalid method invocation. You have disabled the inclusion of system packages and the
   +                    list of packages required for installation is empty, so it is not possible to create a
   +                    valid virtual environment. The apache-beam package must be installed in the virtual
   +                    environment for your job to be executed. To fix this problem:
   +                    * install apache-beam on the system, then set parameter py_system_site_packages to True,
   +                    * add apache-beam to the list of required packages in parameter py_requirements.
   +                    """
   +                )
   +                raise AirflowException(warning_invalid_environment)
   +
                with TemporaryDirectory(prefix='dataflow-venv') as tmp_dir:
                    py_interpreter = prepare_virtualenv(
                        venv_directory=tmp_dir,
   diff --git a/airflow/providers/google/cloud/operators/dataflow.py b/airflow/providers/google/cloud/operators/dataflow.py
   index 85471e3c7..33adbacf4 100644
   --- a/airflow/providers/google/cloud/operators/dataflow.py
   +++ b/airflow/providers/google/cloud/operators/dataflow.py
   @@ -470,7 +470,7 @@ class DataflowCreatePythonJobOperator(BaseOperator):
        :param py_system_site_packages: Whether to include system_site_packages in your virtualenv.
            See virtualenv documentation for more information.
    
   -        This option is only relevant if the ``py_requirements`` parameter is passed.
   +        This option is only relevant if the ``py_requirements`` parameter is not None.
        :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
        :type gcp_conn_id: str
        :param project_id: Optional, the GCP project ID in which to start a job.
   @@ -517,7 +517,7 @@ class DataflowCreatePythonJobOperator(BaseOperator):
            self.options.setdefault('labels', {}).update(
                {'airflow-version': 'v' + version.replace('.', '-').replace('+', '-')})
            self.py_interpreter = py_interpreter
   -        self.py_requirements = py_requirements or []
   +        self.py_requirements = py_requirements
            self.py_system_site_packages = py_system_site_packages
            self.project_id = project_id
            self.location = location
   diff --git a/tests/providers/google/cloud/hooks/test_dataflow.py b/tests/providers/google/cloud/hooks/test_dataflow.py
   index 48b2191c1..50b323ae4 100644
   --- a/tests/providers/google/cloud/hooks/test_dataflow.py
   +++ b/tests/providers/google/cloud/hooks/test_dataflow.py
   @@ -314,13 +314,25 @@ class TestDataflowHook(unittest.TestCase):
            self.assertListEqual(sorted(mock_dataflow.call_args[1]["cmd"]),
                                 sorted(expected_cmd))
    
   +    @parameterized.expand([
   +        (['foo-bar'], False),
   +        (['foo-bar'], True),
   +        ([], True),
   +    ])
        @mock.patch(DATAFLOW_STRING.format('prepare_virtualenv'))
        @mock.patch(DATAFLOW_STRING.format('uuid.uuid4'))
        @mock.patch(DATAFLOW_STRING.format('_DataflowJobsController'))
        @mock.patch(DATAFLOW_STRING.format('_DataflowRunner'))
        @mock.patch(DATAFLOW_STRING.format('DataflowHook.get_conn'))
   -    def test_start_python_dataflow_with_non_empty_py_requirements(
   -        self, mock_conn, mock_dataflow, mock_dataflowjob, mock_uuid, mock_virtualenv
   +    def test_start_python_dataflow_with_non_empty_py_requirements_and_without_system_packages(
   +        self,
   +        current_py_requirements,
   +        current_py_system_site_packages,
   +        mock_conn,
   +        mock_dataflow,
   +        mock_dataflowjob,
   +        mock_uuid,
   +        mock_virtualenv,
        ):
            mock_uuid.return_value = MOCK_UUID
            mock_conn.return_value = None
   @@ -332,7 +344,8 @@ class TestDataflowHook(unittest.TestCase):
            self.dataflow_hook.start_python_dataflow(  # pylint: disable=no-value-for-parameter
                job_name=JOB_NAME, variables=DATAFLOW_VARIABLES_PY,
                dataflow=PY_FILE, py_options=PY_OPTIONS,
   -            py_requirements=['foo-bar']
   +            py_requirements=current_py_requirements,
   +            py_system_site_packages=current_py_system_site_packages
            )
            expected_cmd = ['/dummy_dir/bin/python', '-m', PY_FILE,
                            '--region=us-central1',
   @@ -347,7 +360,7 @@ class TestDataflowHook(unittest.TestCase):
        @mock.patch(DATAFLOW_STRING.format('_DataflowJobsController'))
        @mock.patch(DATAFLOW_STRING.format('_DataflowRunner'))
        @mock.patch(DATAFLOW_STRING.format('DataflowHook.get_conn'))
   -    def test_start_python_dataflow_with_empty_py_requirements(
   +    def test_start_python_dataflow_with_empty_py_requirements_and_without_system_packages(
            self, mock_conn, mock_dataflow, mock_dataflowjob, mock_uuid
        ):
            mock_uuid.return_value = MOCK_UUID
   @@ -356,20 +369,12 @@ class TestDataflowHook(unittest.TestCase):
            dataflow_instance.wait_for_done.return_value = None
            dataflowjob_instance = mock_dataflowjob.return_value
            dataflowjob_instance.wait_for_done.return_value = None
   -        self.dataflow_hook.start_python_dataflow(  # pylint: disable=no-value-for-parameter
   -            job_name=JOB_NAME, variables=DATAFLOW_VARIABLES_PY,
   -            dataflow=PY_FILE, py_options=PY_OPTIONS,
   -            py_requirements=[]
   -        )
   -        expected_cmd = ['python3', '-m', PY_FILE,
   -                        '--region=us-central1',
   -                        '--runner=DataflowRunner',
   -                        '--project=test',
   -                        '--labels=foo=bar',
   -                        '--staging_location=gs://test/staging',
   -                        '--job_name={}-{}'.format(JOB_NAME, MOCK_UUID)]
   -        self.assertListEqual(sorted(mock_dataflow.call_args[1]["cmd"]),
   -                             sorted(expected_cmd))
   +        with self.assertRaisesRegex(AirflowException, "Invalid method invocation."):
   +            self.dataflow_hook.start_python_dataflow(  # pylint: disable=no-value-for-parameter
   +                job_name=JOB_NAME, variables=DATAFLOW_VARIABLES_PY,
   +                dataflow=PY_FILE, py_options=PY_OPTIONS,
   +                py_requirements=[]
   +            )
    
        @mock.patch(DATAFLOW_STRING.format('uuid.uuid4'))
        @mock.patch(DATAFLOW_STRING.format('_DataflowJobsController'))
   -- 
   2.28.0
   
   ```
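   To make the resulting behavior concrete, here is a minimal sketch of the parameter combinations after this patch. The task IDs and the `py_file` path are illustrative, not part of the change:
   ```python
   from airflow.providers.google.cloud.operators.dataflow import (
       DataflowCreatePythonJobOperator,
   )

   # Isolated virtualenv: the listed packages (and nothing else) are installed.
   start_isolated_job = DataflowCreatePythonJobOperator(
       task_id="start-python-job-isolated",      # illustrative task_id
       py_file="gs://my-bucket/my-pipeline.py",  # illustrative pipeline path
       py_requirements=["apache-beam[gcp]"],
       py_system_site_packages=False,
   )

   # Virtualenv with system packages: reuses a system-wide apache-beam install.
   start_system_packages_job = DataflowCreatePythonJobOperator(
       task_id="start-python-job-system",        # illustrative task_id
       py_file="gs://my-bucket/my-pipeline.py",  # illustrative pipeline path
       py_requirements=[],
       py_system_site_packages=True,
   )

   # py_requirements=None (the default) skips virtualenv creation and runs the
   # job with the system interpreter, while py_requirements=[] combined with
   # py_system_site_packages=False now raises AirflowException, because such a
   # virtualenv could never contain apache-beam.
   ```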
   You can apply this change with the command below.
   ```
   curl https://termbin.com/75yd | git am
   ```
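   If you would rather review the patch before applying it, something along these lines should also work (`venv-fix.patch` is just an example filename):
   ```
   curl https://termbin.com/75yd -o venv-fix.patch
   git apply --stat venv-fix.patch
   git am venv-fix.patch
   ```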
   What do you think about it?
   
   P.S. Today I received the same request from a Google engineer, so I would like to resolve this ticket quickly.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

