mik-laj commented on pull request #10373: URL: https://github.com/apache/airflow/pull/10373#issuecomment-676290518
@Craig-Chatfield I prepared a minor change to improve the support for virtual environments with system packages.

```diff
From 7ac7b7a0d68abd7f5b5e06d28b4850252f81c679 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Kamil=20Bregu=C5=82a?= <[email protected]>
Date: Wed, 19 Aug 2020 14:30:29 +0200
Subject: Improve the support of virtual environments with system packages

---
 .../providers/google/cloud/hooks/dataflow.py  | 18 +++++++-
 .../google/cloud/operators/dataflow.py        |  4 +-
 .../google/cloud/hooks/test_dataflow.py       | 41 +++++++++++--------
 3 files changed, 41 insertions(+), 22 deletions(-)

diff --git a/airflow/providers/google/cloud/hooks/dataflow.py b/airflow/providers/google/cloud/hooks/dataflow.py
index 01996bc12..44dc87727 100644
--- a/airflow/providers/google/cloud/hooks/dataflow.py
+++ b/airflow/providers/google/cloud/hooks/dataflow.py
@@ -24,6 +24,7 @@ import re
 import select
 import shlex
 import subprocess
+import textwrap
 import time
 import uuid
 import warnings
@@ -633,7 +634,7 @@ class DataflowHook(GoogleBaseHook):
         :param py_system_site_packages: Whether to include system_site_packages in your
             virtualenv. See virtualenv documentation for more information.
 
-            This option is only relevant if the ``py_requirements`` parameter is passed.
+            This option is only relevant if the ``py_requirements`` parameter is not None.
         :type py_interpreter: str
         :param append_job_name: True if unique suffix has to be appended to job name.
         :type append_job_name: bool
@@ -652,7 +653,20 @@ class DataflowHook(GoogleBaseHook):
             return ['--labels={}={}'.format(key, value) for key, value in labels_dict.items()]
 
-        if py_requirements:
+        if py_requirements is not None:
+            if not py_requirements and not py_system_site_packages:
+                warning_invalid_environment = textwrap.dedent(
+                    """\
+                    Invalid method invocation. You have disabled inclusion of system packages and empty list
+                    required for installation, so it is not possible to create a valid virtual environment.
+                    In the virtual environment, apache-beam package must be installed for your job to be \
+                    executed. To fix this problem:
+                    * install apache-beam on the system, then set parameter py_system_site_packages to True,
+                    * add apache-beam to the list of required packages in parameter py_requirements.
+                    """
+                )
+                raise AirflowException(warning_invalid_environment)
+
             with TemporaryDirectory(prefix='dataflow-venv') as tmp_dir:
                 py_interpreter = prepare_virtualenv(
                     venv_directory=tmp_dir,
diff --git a/airflow/providers/google/cloud/operators/dataflow.py b/airflow/providers/google/cloud/operators/dataflow.py
index 85471e3c7..33adbacf4 100644
--- a/airflow/providers/google/cloud/operators/dataflow.py
+++ b/airflow/providers/google/cloud/operators/dataflow.py
@@ -470,7 +470,7 @@ class DataflowCreatePythonJobOperator(BaseOperator):
     :param py_system_site_packages: Whether to include system_site_packages in your
         virtualenv. See virtualenv documentation for more information.
 
-        This option is only relevant if the ``py_requirements`` parameter is passed.
+        This option is only relevant if the ``py_requirements`` parameter is not None.
     :param gcp_conn_id: The connection ID to use connecting to Google Cloud Platform.
     :type gcp_conn_id: str
     :param project_id: Optional, the GCP project ID in which to start a job.
@@ -517,7 +517,7 @@ class DataflowCreatePythonJobOperator(BaseOperator):
         self.options.setdefault('labels', {}).update(
             {'airflow-version': 'v' + version.replace('.', '-').replace('+', '-')})
         self.py_interpreter = py_interpreter
-        self.py_requirements = py_requirements or []
+        self.py_requirements = py_requirements
         self.py_system_site_packages = py_system_site_packages
         self.project_id = project_id
         self.location = location
diff --git a/tests/providers/google/cloud/hooks/test_dataflow.py b/tests/providers/google/cloud/hooks/test_dataflow.py
index 48b2191c1..50b323ae4 100644
--- a/tests/providers/google/cloud/hooks/test_dataflow.py
+++ b/tests/providers/google/cloud/hooks/test_dataflow.py
@@ -314,13 +314,25 @@ class TestDataflowHook(unittest.TestCase):
         self.assertListEqual(sorted(mock_dataflow.call_args[1]["cmd"]), sorted(expected_cmd))
 
+    @parameterized.expand([
+        (['foo-bar'], False),
+        (['foo-bar'], True),
+        ([], True),
+    ])
     @mock.patch(DATAFLOW_STRING.format('prepare_virtualenv'))
     @mock.patch(DATAFLOW_STRING.format('uuid.uuid4'))
     @mock.patch(DATAFLOW_STRING.format('_DataflowJobsController'))
     @mock.patch(DATAFLOW_STRING.format('_DataflowRunner'))
     @mock.patch(DATAFLOW_STRING.format('DataflowHook.get_conn'))
-    def test_start_python_dataflow_with_non_empty_py_requirements(
-        self, mock_conn, mock_dataflow, mock_dataflowjob, mock_uuid, mock_virtualenv
+    def test_start_python_dataflow_with_non_empty_py_requirements_and_without_system_packages(
+        self,
+        current_py_requirements,
+        current_py_system_site_packages,
+        mock_conn,
+        mock_dataflow,
+        mock_dataflowjob,
+        mock_uuid,
+        mock_virtualenv,
     ):
         mock_uuid.return_value = MOCK_UUID
         mock_conn.return_value = None
@@ -332,7 +344,8 @@ class TestDataflowHook(unittest.TestCase):
         self.dataflow_hook.start_python_dataflow(  # pylint: disable=no-value-for-parameter
             job_name=JOB_NAME, variables=DATAFLOW_VARIABLES_PY,
             dataflow=PY_FILE, py_options=PY_OPTIONS,
-            py_requirements=['foo-bar']
+            py_requirements=current_py_requirements,
+            py_system_site_packages=current_py_system_site_packages
         )
         expected_cmd = ['/dummy_dir/bin/python', '-m', PY_FILE,
                        '--region=us-central1',
@@ -347,7 +360,7 @@ class TestDataflowHook(unittest.TestCase):
     @mock.patch(DATAFLOW_STRING.format('_DataflowJobsController'))
     @mock.patch(DATAFLOW_STRING.format('_DataflowRunner'))
     @mock.patch(DATAFLOW_STRING.format('DataflowHook.get_conn'))
-    def test_start_python_dataflow_with_empty_py_requirements(
+    def test_start_python_dataflow_with_empty_py_requirements_and_without_system_packages(
         self, mock_conn, mock_dataflow, mock_dataflowjob, mock_uuid
     ):
         mock_uuid.return_value = MOCK_UUID
@@ -356,20 +369,12 @@ class TestDataflowHook(unittest.TestCase):
         dataflow_instance.wait_for_done.return_value = None
         dataflowjob_instance = mock_dataflowjob.return_value
         dataflowjob_instance.wait_for_done.return_value = None
-        self.dataflow_hook.start_python_dataflow(  # pylint: disable=no-value-for-parameter
-            job_name=JOB_NAME, variables=DATAFLOW_VARIABLES_PY,
-            dataflow=PY_FILE, py_options=PY_OPTIONS,
-            py_requirements=[]
-        )
-        expected_cmd = ['python3', '-m', PY_FILE,
-                       '--region=us-central1',
-                       '--runner=DataflowRunner',
-                       '--project=test',
-                       '--labels=foo=bar',
-                       '--staging_location=gs://test/staging',
-                       '--job_name={}-{}'.format(JOB_NAME, MOCK_UUID)]
-        self.assertListEqual(sorted(mock_dataflow.call_args[1]["cmd"]),
-                             sorted(expected_cmd))
+        with self.assertRaisesRegex(AirflowException, "Invalid method invocation."):
+            self.dataflow_hook.start_python_dataflow(  # pylint: disable=no-value-for-parameter
+                job_name=JOB_NAME, variables=DATAFLOW_VARIABLES_PY,
+                dataflow=PY_FILE, py_options=PY_OPTIONS,
+                py_requirements=[]
+            )
 
     @mock.patch(DATAFLOW_STRING.format('uuid.uuid4'))
     @mock.patch(DATAFLOW_STRING.format('_DataflowJobsController'))
-- 
2.28.0
```
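To make the new behavior easier to review, here is a minimal, self-contained sketch of the guard this patch adds to `DataflowHook.start_python_dataflow` (the function name is invented for illustration, and `ValueError` stands in for `AirflowException` so the snippet runs without Airflow installed):

```python
import textwrap
from typing import List, Optional


def needs_virtualenv(py_requirements: Optional[List[str]], py_system_site_packages: bool) -> bool:
    """Return True if a temporary virtualenv should be prepared for the job."""
    if py_requirements is None:
        # No requirements passed at all: run with the system interpreter.
        return False
    if not py_requirements and not py_system_site_packages:
        # An empty requirements list with system packages excluded can never
        # produce a virtualenv that contains apache-beam, so fail fast.
        raise ValueError(textwrap.dedent("""\
            Invalid method invocation. ...
            """))
    return True


# The three valid combinations covered by the parameterized test above:
assert needs_virtualenv(['foo-bar'], False) is True
assert needs_virtualenv(['foo-bar'], True) is True
assert needs_virtualenv([], True) is True
# needs_virtualenv([], False) raises, which the new assertRaisesRegex test checks.
```

The only rejected combination is `py_requirements=[]` together with `py_system_site_packages=False`; every other input keeps its current meaning.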
You can apply this change with the command below.

```
curl https://termbin.com/75yd | git am
```

What do you think about it?

P.S. Today I received the same request from a Google engineer, so I would like to resolve this ticket quickly.
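For reference, after applying the patch a DAG opts in to one of the two valid configurations like this. This is only a rough sketch: the `task_id` values and the `gs://` path are illustrative, and I am assuming the operator's usual `py_file` argument:

```python
from airflow.providers.google.cloud.operators.dataflow import DataflowCreatePythonJobOperator

# Option 1: install apache-beam into the temporary virtualenv.
start_python_job_venv = DataflowCreatePythonJobOperator(
    task_id="start-python-job-venv",          # illustrative
    py_file="gs://my-bucket/my_pipeline.py",  # illustrative
    py_requirements=["apache-beam[gcp]"],
)

# Option 2: reuse an apache-beam that is already installed system-wide.
start_python_job_system = DataflowCreatePythonJobOperator(
    task_id="start-python-job-system",        # illustrative
    py_file="gs://my-bucket/my_pipeline.py",  # illustrative
    py_requirements=[],
    py_system_site_packages=True,
)

# Passing py_requirements=[] with the default py_system_site_packages=False
# now raises AirflowException up front instead of silently falling back to
# the system interpreter.
```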
