This is an automated email from the ASF dual-hosted git repository.

kamilbregula pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 4a1503b Add recipe for BeamRunGoPipelineOperator (#22296) 4a1503b is described below commit 4a1503b39b0aaf50940c29ac886c6eeda35a79ff Author: pierrejeambrun <pierrejb...@gmail.com> AuthorDate: Thu Mar 17 04:57:22 2022 +0100 Add recipe for BeamRunGoPipelineOperator (#22296) --- airflow/providers/apache/beam/hooks/beam.py | 10 +++++- .../docker-images-recipes/go-beam.Dockerfile | 37 ++++++++++++++++++++++ docs/docker-stack/recipes.rst | 20 ++++++++++++ tests/providers/apache/beam/hooks/test_beam.py | 21 +++++++++++- 4 files changed, 86 insertions(+), 2 deletions(-) diff --git a/airflow/providers/apache/beam/hooks/beam.py b/airflow/providers/apache/beam/hooks/beam.py index 9be1a75..0644e02 100644 --- a/airflow/providers/apache/beam/hooks/beam.py +++ b/airflow/providers/apache/beam/hooks/beam.py @@ -20,12 +20,13 @@ import json import os import select import shlex +import shutil import subprocess import textwrap from tempfile import TemporaryDirectory from typing import Callable, List, Optional -from airflow.exceptions import AirflowException +from airflow.exceptions import AirflowConfigException, AirflowException from airflow.hooks.base import BaseHook from airflow.providers.google.go_module_utils import init_module, install_dependencies from airflow.utils.log.logging_mixin import LoggingMixin @@ -307,6 +308,13 @@ class BeamHook(BaseHook): source with GCSHook. :return: """ + if shutil.which("go") is None: + raise AirflowConfigException( + "You need to have Go installed to run beam go pipeline. See https://go.dev/doc/install " + "installation guide. If you are running airflow in Docker see more info at " + "'https://airflow.apache.org/docs/docker-stack/recipes.html'." 
+ ) + if "labels" in variables: variables["labels"] = json.dumps(variables["labels"], separators=(",", ":")) diff --git a/docs/docker-stack/docker-images-recipes/go-beam.Dockerfile b/docs/docker-stack/docker-images-recipes/go-beam.Dockerfile new file mode 100644 index 0000000..b224fe1 --- /dev/null +++ b/docs/docker-stack/docker-images-recipes/go-beam.Dockerfile @@ -0,0 +1,37 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ARG BASE_AIRFLOW_IMAGE +FROM ${BASE_AIRFLOW_IMAGE} + +SHELL ["/bin/bash", "-o", "pipefail", "-e", "-u", "-x", "-c"] + +USER 0 + +ARG GO_VERSION=1.16.4 +ENV GO_INSTALL_DIR=/usr/local/go + +# Install Go +RUN if [[ "$(uname -a)" = *"x86_64"* ]] ; then export ARCH=amd64 ; else export ARCH=arm64 ; fi \ + && DOWNLOAD_URL="https://dl.google.com/go/go${GO_VERSION}.linux-${ARCH}.tar.gz" \ + && TMP_DIR="$(mktemp -d)" \ + && curl -fL "${DOWNLOAD_URL}" --output "${TMP_DIR}/go.linux-${ARCH}.tar.gz" \ + && mkdir -p "${GO_INSTALL_DIR}" \ + && tar xzf "${TMP_DIR}/go.linux-${ARCH}.tar.gz" -C "${GO_INSTALL_DIR}" --strip-components=1 \ + && rm -rf "${TMP_DIR}" + +ENV GOROOT=/usr/local/go +ENV PATH="$GOROOT/bin:$PATH" + +USER ${AIRFLOW_UID} diff --git a/docs/docker-stack/recipes.rst b/docs/docker-stack/recipes.rst index a1c5777..1d258ab 100644 --- a/docs/docker-stack/recipes.rst +++ b/docs/docker-stack/recipes.rst @@ -70,3 +70,23 @@ Then build a new image. --pull \ --build-arg BASE_AIRFLOW_IMAGE="apache/airflow:2.0.2" \ --tag my-airflow-image:0.0.1 + +Apache Beam Go Stack installation +--------------------------------- + +To be able to run Beam Go Pipeline with the :class:`~airflow.providers.apache.beam.operators.beam.BeamRunGoPipelineOperator`, +you will need Go in your container. Install airflow with ``apache-airflow-providers-google>=6.5.0`` and ``apache-airflow-providers-apache-beam>=3.2.0`` + +Create a new Dockerfile like the one shown below. + +.. exampleinclude:: /docker-images-recipes/go-beam.Dockerfile + :language: dockerfile + +Then build a new image. + +.. code-block:: bash + + docker build . 
\ + --pull \ + --build-arg BASE_AIRFLOW_IMAGE="apache/airflow:2.0.2" \ + --tag my-airflow-image:0.0.1 diff --git a/tests/providers/apache/beam/hooks/test_beam.py b/tests/providers/apache/beam/hooks/test_beam.py index 859cf3b..a69560f 100644 --- a/tests/providers/apache/beam/hooks/test_beam.py +++ b/tests/providers/apache/beam/hooks/test_beam.py @@ -233,8 +233,10 @@ class TestBeamHook(unittest.TestCase): ) wait_for_done.assert_called_once_with() + @mock.patch(BEAM_STRING.format('shutil.which')) @mock.patch(BEAM_STRING.format('BeamCommandRunner')) - def test_start_go_pipeline(self, mock_runner): + def test_start_go_pipeline(self, mock_runner, mock_which): + mock_which.return_value = "/some_path/to/go" hook = BeamHook(runner=DEFAULT_RUNNER) wait_for_done = mock_runner.return_value.wait_for_done process_line_callback = MagicMock() @@ -260,6 +262,23 @@ class TestBeamHook(unittest.TestCase): ) wait_for_done.assert_called_once_with() + @mock.patch(BEAM_STRING.format('shutil.which')) + def test_start_go_pipeline_without_go_installed_raises(self, mock_which): + mock_which.return_value = None + hook = BeamHook(runner=DEFAULT_RUNNER) + + with self.assertRaises(AirflowException) as ex_ctx: + hook.start_go_pipeline( + go_file=GO_FILE, + variables=copy.deepcopy(BEAM_VARIABLES_GO), + ) + + assert ( + "You need to have Go installed to run beam go pipeline. See https://go.dev/doc/install " + "installation guide. If you are running airflow in Docker see more info at " + "'https://airflow.apache.org/docs/docker-stack/recipes.html'." == str(ex_ctx.exception) + ) + class TestBeamRunner(unittest.TestCase): @mock.patch('airflow.providers.apache.beam.hooks.beam.BeamCommandRunner.log')