robertwb commented on code in PR #31701:
URL: https://github.com/apache/beam/pull/31701#discussion_r1672788011
##########
examples/notebooks/blogposts/unittests_in_beam.ipynb:
##########
@@ -0,0 +1,259 @@
+{
+ "nbformat": 4,
+ "nbformat_minor": 0,
+ "metadata": {
+ "colab": {
+ "provenance": [],
+ "authorship_tag": "ABX9TyP+whTO0l5Xd2TU4xa2Z7KC",
+ "include_colab_link": true
+ },
+ "kernelspec": {
+ "name": "python3",
+ "display_name": "Python 3"
+ },
+ "language_info": {
+ "name": "python"
+ }
+ },
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "view-in-github",
+ "colab_type": "text"
+ },
+ "source": [
+ "<a
href=\"https://colab.research.google.com/github/apache/beam/blob/testing_blog_post/examples/notebooks/blogposts/unittests_in_beam.ipynb\"
target=\"_parent\"><img
src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In
Colab\"/></a>"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 36,
+ "metadata": {
+ "id": "7DSE6TgWy7PP"
+ },
+ "outputs": [],
+ "source": [
+ "# @title ###### Licensed to the Apache Software Foundation (ASF),
Version 2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Install the Apache Beam library\n",
+ "\n",
+ "!pip install apache_beam[gcp] --quiet"
+ ],
+ "metadata": {
+ "id": "5W2nuV7uzlPg"
+ },
+ "execution_count": 37,
+ "outputs": []
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "#The following packages are used to run the example pipelines\n",
+ "\n",
+ "import apache_beam as beam\n",
+ "from apache_beam.io import ReadFromText, WriteToText\n",
+ "from apache_beam.options.pipeline_options import PipelineOptions\n",
+ "\n",
+ "class CustomClass(beam.DoFn):\n",
+ " def custom_function(x):\n",
+ " ...\n",
+ " # returned_record =
requests.get(\"http://my-api-call.com\")\n",
+ " ...\n",
+ " # if len(returned_record)!=10:\n",
+ " # raise ValueError(\"Length of record does not match
expected length\")\n",
+ " return x\n",
+ "\n",
+ " with beam.Pipeline() as p:\n",
+ " result = (\n",
+ " p\n",
+ " | ReadFromText(\"/content/sample_data/anscombe.json\")\n",
+ " | beam.ParDo(lambda x: CustomClass.custom_function(x))\n",
+ " | WriteToText(\"/content/\")\n",
+ " )"
+ ],
+ "metadata": {
+ "id": "Ktk9EVIFzGfP"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "**Example Pipeline 1**\n"
+ ],
+ "metadata": {
+ "id": "IVjBkewt1sLA"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# This function is going to return the square the integer at the
first index of our record.\n",
+ "def compute_square(element):\n",
+ " return int(element[1])**2\n",
+ "\n",
+ "with beam.Pipeline() as p1:\n",
+ " result = (\n",
+ " p1\n",
+ " |
ReadFromText(\"/content/sample_data/california_housing_test.csv\",skip_header_lines=1)\n",
+ " | beam.Map(compute_square)\n",
+ " | WriteToText(\"/content/\")\n",
+ " )"
+ ],
+ "metadata": {
+ "id": "oHbSvOUI1pOe"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "**Example Pipeline 2**"
+ ],
+ "metadata": {
+ "id": "Mh3nZZ1_12sX"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "with beam.Pipeline() as p2:\n",
+ " result = (\n",
+ " p2\n",
+ " | ReadFromText(\"/content/sample_data/anscombe.json\")\n",
+ " | beam.Map(str.strip)\n",
+ " | WriteToText(\"/content/sample_data/\")\n",
+ " )"
+ ],
+ "metadata": {
+ "id": "hmO1Chl51vPG"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "**Unit Tests for Pipelines**"
+ ],
+ "metadata": {
+ "id": "uoNJLQl_15gj"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# The following packages are imported for unit testing.\n",
+ "import unittest\n",
+ "import apache_beam as beam\n",
+ "from apache_beam.testing.test_pipeline import TestPipeline\n",
+ "from apache_beam.testing.util import assert_that, equal_to\n",
+ "try:\n",
+ " from apitools.base.py.exceptions import HttpError\n",
+ "except ImportError:\n",
+ " HttpError = None\n",
+ "\n",
+ "\n",
+ "@unittest.skipIf(HttpError is None, 'GCP dependencies are not
installed')\n",
Review Comment:
It would seem one of the main points of unit testing is to not have
heavyweight dependencies. I wouldn't say skipping like this is best practice
unless absolutely necessary.
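For illustration, a rough sketch of the same check written without the GCP-dependency skip (the `compute_square` body mirrors the notebook's function; the test class name is arbitrary):
```
import unittest

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


def compute_square(element):
    # Mirrors the notebook's function: square the value at index 1.
    return int(element[1]) ** 2


class TestBeamWithoutGcpDeps(unittest.TestCase):
    # No skipIf: nothing here needs apitools or any other GCP extra.
    def test_compute_square(self):
        with TestPipeline() as p:
            output = (p
                      | beam.Create(["1234"])
                      | beam.Map(compute_square))
            assert_that(output, equal_to([4]))
```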
##########
examples/notebooks/blogposts/unittests_in_beam.ipynb:
##########
@@ -0,0 +1,259 @@
+{
+ "nbformat": 4,
+ "nbformat_minor": 0,
+ "metadata": {
+ "colab": {
+ "provenance": [],
+ "authorship_tag": "ABX9TyP+whTO0l5Xd2TU4xa2Z7KC",
+ "include_colab_link": true
+ },
+ "kernelspec": {
+ "name": "python3",
+ "display_name": "Python 3"
+ },
+ "language_info": {
+ "name": "python"
+ }
+ },
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "view-in-github",
+ "colab_type": "text"
+ },
+ "source": [
+ "<a
href=\"https://colab.research.google.com/github/apache/beam/blob/testing_blog_post/examples/notebooks/blogposts/unittests_in_beam.ipynb\"
target=\"_parent\"><img
src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In
Colab\"/></a>"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 36,
+ "metadata": {
+ "id": "7DSE6TgWy7PP"
+ },
+ "outputs": [],
+ "source": [
+ "# @title ###### Licensed to the Apache Software Foundation (ASF),
Version 2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Install the Apache Beam library\n",
+ "\n",
+ "!pip install apache_beam[gcp] --quiet"
+ ],
+ "metadata": {
+ "id": "5W2nuV7uzlPg"
+ },
+ "execution_count": 37,
+ "outputs": []
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "#The following packages are used to run the example pipelines\n",
+ "\n",
+ "import apache_beam as beam\n",
+ "from apache_beam.io import ReadFromText, WriteToText\n",
+ "from apache_beam.options.pipeline_options import PipelineOptions\n",
+ "\n",
+ "class CustomClass(beam.DoFn):\n",
+ " def custom_function(x):\n",
+ " ...\n",
+ " # returned_record =
requests.get(\"http://my-api-call.com\")\n",
+ " ...\n",
+ " # if len(returned_record)!=10:\n",
+ " # raise ValueError(\"Length of record does not match
expected length\")\n",
+ " return x\n",
+ "\n",
+ " with beam.Pipeline() as p:\n",
+ " result = (\n",
+ " p\n",
+ " | ReadFromText(\"/content/sample_data/anscombe.json\")\n",
+ " | beam.ParDo(lambda x: CustomClass.custom_function(x))\n",
+ " | WriteToText(\"/content/\")\n",
+ " )"
+ ],
+ "metadata": {
+ "id": "Ktk9EVIFzGfP"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "**Example Pipeline 1**\n"
+ ],
+ "metadata": {
+ "id": "IVjBkewt1sLA"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# This function is going to return the square the integer at the
first index of our record.\n",
+ "def compute_square(element):\n",
+ " return int(element[1])**2\n",
+ "\n",
+ "with beam.Pipeline() as p1:\n",
+ " result = (\n",
+ " p1\n",
+ " |
ReadFromText(\"/content/sample_data/california_housing_test.csv\",skip_header_lines=1)\n",
+ " | beam.Map(compute_square)\n",
+ " | WriteToText(\"/content/\")\n",
+ " )"
+ ],
+ "metadata": {
+ "id": "oHbSvOUI1pOe"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "**Example Pipeline 2**"
+ ],
+ "metadata": {
+ "id": "Mh3nZZ1_12sX"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "with beam.Pipeline() as p2:\n",
+ " result = (\n",
+ " p2\n",
+ " | ReadFromText(\"/content/sample_data/anscombe.json\")\n",
+ " | beam.Map(str.strip)\n",
+ " | WriteToText(\"/content/sample_data/\")\n",
+ " )"
+ ],
+ "metadata": {
+ "id": "hmO1Chl51vPG"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "**Unit Tests for Pipelines**"
+ ],
+ "metadata": {
+ "id": "uoNJLQl_15gj"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# The following packages are imported for unit testing.\n",
+ "import unittest\n",
+ "import apache_beam as beam\n",
+ "from apache_beam.testing.test_pipeline import TestPipeline\n",
+ "from apache_beam.testing.util import assert_that, equal_to\n",
+ "try:\n",
+ " from apitools.base.py.exceptions import HttpError\n",
+ "except ImportError:\n",
+ " HttpError = None\n",
+ "\n",
+ "\n",
+ "@unittest.skipIf(HttpError is None, 'GCP dependencies are not
installed')\n",
+ "class TestBeam(unittest.TestCase):\n",
+ "\n",
+ "# This test corresponds to pipeline p1, and is written to confirm the
compute_square function works as intended.\n",
+ " def test_compute_square(self):\n",
+ " expected=[4]\n",
+ " with TestPipeline() as p:\n",
+ " output = p | beam.Create([\"1234\"]) \\\n",
+ " | beam.Map(compute_square)\n",
+ " assert_that(output, equal_to(expected))"
+ ],
+ "metadata": {
+ "id": "3-twYhdLTan0"
+ },
+ "execution_count": 41,
+ "outputs": []
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# This test corresponds to pipeline p2, and is written to confirm the
pipeline works as intended.\n",
+ "def test_strip_map(self):\n",
+ " expected=['Strawberry', 'Carrot', 'Eggplant']\n",
+ " strings = [' Strawberry \\n', ' Carrot \\n', ' Eggplant
\\n']\n",
+ " with TestPipeline() as p:\n",
+ " output = p | beam.Create(strings) \\\n",
+ " | beam.Map(str.strip)\n",
+ " assert_that(output, equal_to(expected))"
+ ],
+ "metadata": {
+ "id": "BU9Eil-TrtpE"
+ },
+ "execution_count": 42,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "**Mocking Example**"
+ ],
+ "metadata": {
+ "id": "58GVMyMa2PwE"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "!pip install mock # Install the 'mock' module"
+ ],
+ "metadata": {
+ "id": "ESclJ_G-6JcW"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# We import the mock package for mocking functionality.\n",
+ "import mock\n",
Review Comment:
Mocking often interacts poorly with serialization; I would avoid this when
possible. (Also, are these examples automatically tested?)
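One serialization-friendly alternative, sketched only as an illustration (the `EnrichRecord` DoFn and `fake_fetch` helper are hypothetical, not code from this PR): make the external call injectable, so the test passes a plain callable instead of a mock that has to be pickled along with the DoFn.
```
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class EnrichRecord(beam.DoFn):
    def __init__(self, fetch_record):
        # In production this would be the real HTTP call; tests inject a fake.
        self._fetch_record = fetch_record

    def process(self, element):
        record = self._fetch_record(element)
        if len(record) != 10:
            raise ValueError("Length of record does not match expected length")
        yield record


def fake_fetch(element):
    # Deterministic stand-in for the API response.
    return list(range(10))


def test_enrich_record():
    with TestPipeline() as p:
        output = (p
                  | beam.Create(["some-id"])
                  | beam.ParDo(EnrichRecord(fetch_record=fake_fetch)))
        assert_that(output, equal_to([list(range(10))]))
```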
##########
examples/notebooks/blogposts/unittests_in_beam.ipynb:
##########
@@ -0,0 +1,259 @@
+{
+ "nbformat": 4,
Review Comment:
Are you referencing the notebook in your blog? Or is this an (older?) copy
of your blog code?
##########
website/www/site/content/en/blog/unit-testing-blog.md:
##########
@@ -0,0 +1,149 @@
+---
+title: "So You Want to Write Tests on Your Beam Pipeline?"
+date: 2024-07-08 00:00:01 -0800
+categories:
+ - blog
+authors:
+ - svetakvsundhar
+
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+## So You Want to Write Tests on your Beam pipeline?
+Testing remains one of the most fundamental components of software
engineering. In this blog post, we shed light on some of the constructs that
Apache Beam provides to allow for testing. We cover an opinionated set of best
practices to write unit tests for your data pipeline in this post. Note that
this post does not include integration tests, and those should be authored
separately.
+
+Suppose we write a particular PTransform that reads data from a CSV file, gets
passed through a custom function for parsing, and is written back to another
Google Cloud Storage bucket (we need to do some custom data formatting to have
data prepared for a downstream application).
+
+
+The pipeline is structured as follows:
+
+### Example pipeline 1
+
+ #The following packages are used to run the example pipelines
+ import apache_beam as beam
+ import apache_beam.io.textio.ReadFromText
+ import apache_beam.io.textio.WriteToText
+
+ with beam.Pipeline(argv=self.args) as p:
+ result = p | ReadFromText("gs://my-storage-bucket/csv_location.csv")
+ | beam.ParDo(lambda x: custom_function(x))
+ | WriteToText("gs://my-output-bucket-location/")
+
+We then add a custom function to our code:
+
+ def custom_function(x):
+ ...
+ returned_record = requests.get("http://my-api-call.com")
+ ...
+ if len(returned_record)!=10:
+ raise ValueError("Length of record does not match expected length")
+
+In this scenario, we recommend the following best practices:
+
+1. You don’t need to write any unit tests for the already supported connectors
in the Beam Library, such as ReadFromBigQuery and WriteToGCS. These connectors
are already tested in Beam’s test suite to ensure correct functionality.
+2. You should write unit tests for any custom operations introduced in the
pipeline, such as Map, FlatMap, Filter, ParDo, and so on. We recommend adding
tests for any lambda functions utilized within these Beam primitives.
Additionally, even if you’re using a predefined function, treat the entire
transform as a unit, and test it.
+
+Let’s use the following pipeline as an example. Because we have a function, we
should write a unit test to ensure that our function works as intended.
+
+ def compute_square(element):
+ return element**2
+
+ with beam.Pipeline(argv=self.args) as p1:
+ result = p1 | ReadFromText("gs://my-storage-bucket/csv_location.csv")
+ | beam.Map(compute_square)
+ | WriteToText("gs://my-output-bucket-location/")
+
+
+Now let’s use the following pipeline as another example. Because we use a
predefined function, we don’t need to unit test the function, as `str.strip`,
is tested elsewhere. However, we do need to test the output of the `beam.Map`
function.
+
+ with beam.Pipeline(argv=self.args) as p2:
+ result = p2 | ReadFromText("gs://my-storage-bucket/csv_location.csv")
+ | beam.Map(str.strip)
+ | WriteToText("gs://my-output-bucket-location/")
+
+
+Here are the corresponding tests for both pipelines:
+
+ # The following packages are imported for unit testing.
+ import unittest
+ import apache_beam as beam
+
+
+ @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
+ class TestBeam(unittest.TestCase):
+
+ # This test corresponds to pipeline p1, and is written to confirm the
compute_square function works as intended.
+ def test_compute_square(self):
+ numbers=[1,2,3]
+
+
+ with TestPipeline() as p:
+ output = p | beam.Create([1,2,3])
+ |beam.Map(compute_square)
+ assert_that(output, equal_to([1,4,9]))
+
+
+
+ # This test corresponds to pipeline p2, and is written to confirm that our
map operation works as intended.
+ def test_strip_map(self):
+ strings= [' Strawberry \n',' Carrot \n',' Eggplant
\n']
+ with TestPipeline() as p:
+ output = p | beam.Create(strings)
+ | beam.Map(str.strip)
+ assert_that(output,['Strawberry','Carrot','Eggplant'])
+
+
+
+The following cover other testing best practices:
Review Comment:
Users should generally use `assert_that`, `equal_to`, and (for streaming)
`TestStream`.
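For reference, a small sketch of what that can look like for a streaming input (the elements and the `str.upper` step are arbitrary placeholders):
```
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.test_stream import TestStream
from apache_beam.testing.util import assert_that, equal_to

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True

with TestPipeline(options=options) as p:
    events = (TestStream()
              .add_elements(['carrot', 'eggplant'])
              .advance_watermark_to_infinity())
    output = (p
              | events
              | beam.Map(str.upper))
    assert_that(output, equal_to(['CARROT', 'EGGPLANT']))
```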
##########
website/www/site/content/en/blog/unit-testing-blog.md:
##########
@@ -0,0 +1,149 @@
+---
+title: "So You Want to Write Tests on Your Beam Pipeline?"
+date: 2024-07-08 00:00:01 -0800
+categories:
+ - blog
+authors:
+ - svetakvsundhar
+
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+## So You Want to Write Tests on your Beam pipeline?
+Testing remains one of the most fundamental components of software
engineering. In this blog post, we shed light on some of the constructs that
Apache Beam provides to allow for testing. We cover an opinionated set of best
practices to write unit tests for your data pipeline in this post. Note that
this post does not include integration tests, and those should be authored
separately. The examples used in this post are in Python, but the concepts
translate broadly across SDKs.
+
+Suppose we write a particular PTransform that reads data from a CSV file, gets
passed through a custom function for parsing, and is written back to another
Google Cloud Storage bucket (we need to do some custom data formatting to have
data prepared for a downstream application).
+
+
+The pipeline is structured as follows:
+
+ #The following packages are used to run the example pipelines
+ import apache_beam as beam
+ import apache_beam.io.textio.ReadFromText
+ import apache_beam.io.textio.WriteToText
+
+ with beam.Pipeline(argv=self.args) as p:
+ result = p | ReadFromText("gs://my-storage-bucket/csv_location.csv")
+ | beam.ParDo(lambda x: custom_function(x))
+ | WriteToText("gs://my-output-bucket-location/")
+
+We then add a custom function to our code:
+
+ def custom_function(x):
+ ...
+ returned_record = requests.get("http://my-api-call.com")
+ ...
+ if len(returned_record)!=10:
+ raise ValueError("Length of record does not match expected length")
+
+In this scenario, we recommend the following best practices:
+
+1. You don’t need to write any unit tests for the already supported connectors
in the Beam Library, such as ReadFromBigQuery and WriteToGCS. These connectors
are already tested in Beam’s test suite to ensure correct functionality.
+2. You should write unit tests for any custom operations introduced in the
pipeline, such as Map, FlatMap, Filter, ParDo, and so on. We recommend adding
tests for any lambda functions utilized within these Beam primitives.
Additionally, even if you’re using a predefined function, treat the entire
transform as a unit, and test it.
+
+### Example Pipeline 1
+Let’s use the following pipeline as an example. Because we have a function, we
should write a unit test to ensure that our function works as intended.
+
+ def compute_square(element):
+ return element**2
+
+ with beam.Pipeline(argv=self.args) as p1:
+ result = p1 | ReadFromText("gs://my-storage-bucket/csv_location.csv")
+ | beam.Map(compute_square)
+ | WriteToText("gs://my-output-bucket-location/")
+
+### Example Pipeline 2
+
+Now let’s use the following pipeline as another example. Because we use a
predefined function, we don’t need to unit test the function, as `str.strip`,
is tested elsewhere. However, we do need to test the output of the `beam.Map`
function.
+
+ with beam.Pipeline(argv=self.args) as p2:
+ result = p2 | ReadFromText("gs://my-storage-bucket/csv_location.csv")
+ | beam.Map(str.strip)
+ | WriteToText("gs://my-output-bucket-location/")
+
+
+Here are the corresponding tests for both pipelines:
+
+ # The following packages are imported for unit testing.
+ import unittest
+ import apache_beam as beam
+
+
+ @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
Review Comment:
My comment above also applies here.
##########
examples/notebooks/blogposts/unittests_in_beam.ipynb:
##########
@@ -0,0 +1,259 @@
+{
+ "nbformat": 4,
+ "nbformat_minor": 0,
+ "metadata": {
+ "colab": {
+ "provenance": [],
+ "authorship_tag": "ABX9TyP+whTO0l5Xd2TU4xa2Z7KC",
+ "include_colab_link": true
+ },
+ "kernelspec": {
+ "name": "python3",
+ "display_name": "Python 3"
+ },
+ "language_info": {
+ "name": "python"
+ }
+ },
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "view-in-github",
+ "colab_type": "text"
+ },
+ "source": [
+ "<a
href=\"https://colab.research.google.com/github/apache/beam/blob/testing_blog_post/examples/notebooks/blogposts/unittests_in_beam.ipynb\"
target=\"_parent\"><img
src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In
Colab\"/></a>"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 36,
+ "metadata": {
+ "id": "7DSE6TgWy7PP"
+ },
+ "outputs": [],
+ "source": [
+ "# @title ###### Licensed to the Apache Software Foundation (ASF),
Version 2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Install the Apache Beam library\n",
+ "\n",
+ "!pip install apache_beam[gcp] --quiet"
+ ],
+ "metadata": {
+ "id": "5W2nuV7uzlPg"
+ },
+ "execution_count": 37,
+ "outputs": []
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "#The following packages are used to run the example pipelines\n",
+ "\n",
+ "import apache_beam as beam\n",
+ "from apache_beam.io import ReadFromText, WriteToText\n",
+ "from apache_beam.options.pipeline_options import PipelineOptions\n",
+ "\n",
+ "class CustomClass(beam.DoFn):\n",
+ " def custom_function(x):\n",
+ " ...\n",
+ " # returned_record =
requests.get(\"http://my-api-call.com\")\n",
+ " ...\n",
+ " # if len(returned_record)!=10:\n",
+ " # raise ValueError(\"Length of record does not match
expected length\")\n",
+ " return x\n",
+ "\n",
+ " with beam.Pipeline() as p:\n",
+ " result = (\n",
+ " p\n",
+ " | ReadFromText(\"/content/sample_data/anscombe.json\")\n",
+ " | beam.ParDo(lambda x: CustomClass.custom_function(x))\n",
+ " | WriteToText(\"/content/\")\n",
+ " )"
+ ],
+ "metadata": {
+ "id": "Ktk9EVIFzGfP"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "**Example Pipeline 1**\n"
+ ],
+ "metadata": {
+ "id": "IVjBkewt1sLA"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# This function is going to return the square the integer at the
first index of our record.\n",
+ "def compute_square(element):\n",
+ " return int(element[1])**2\n",
+ "\n",
+ "with beam.Pipeline() as p1:\n",
+ " result = (\n",
+ " p1\n",
+ " |
ReadFromText(\"/content/sample_data/california_housing_test.csv\",skip_header_lines=1)\n",
+ " | beam.Map(compute_square)\n",
+ " | WriteToText(\"/content/\")\n",
+ " )"
+ ],
+ "metadata": {
+ "id": "oHbSvOUI1pOe"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "**Example Pipeline 2**"
+ ],
+ "metadata": {
+ "id": "Mh3nZZ1_12sX"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "with beam.Pipeline() as p2:\n",
+ " result = (\n",
+ " p2\n",
+ " | ReadFromText(\"/content/sample_data/anscombe.json\")\n",
+ " | beam.Map(str.strip)\n",
+ " | WriteToText(\"/content/sample_data/\")\n",
+ " )"
+ ],
+ "metadata": {
+ "id": "hmO1Chl51vPG"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "**Unit Tests for Pipelines**"
+ ],
+ "metadata": {
+ "id": "uoNJLQl_15gj"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# The following packages are imported for unit testing.\n",
+ "import unittest\n",
+ "import apache_beam as beam\n",
+ "from apache_beam.testing.test_pipeline import TestPipeline\n",
+ "from apache_beam.testing.util import assert_that, equal_to\n",
+ "try:\n",
+ " from apitools.base.py.exceptions import HttpError\n",
+ "except ImportError:\n",
+ " HttpError = None\n",
+ "\n",
+ "\n",
+ "@unittest.skipIf(HttpError is None, 'GCP dependencies are not
installed')\n",
+ "class TestBeam(unittest.TestCase):\n",
+ "\n",
+ "# This test corresponds to pipeline p1, and is written to confirm the
compute_square function works as intended.\n",
+ " def test_compute_square(self):\n",
Review Comment:
If `compute_square` is an ordinary Python function, I would recommend
writing "ordinary" unit tests for it rather than testing it as part of a
pipeline.
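That is, something as plain as the following sketch (the function body mirrors the notebook's; the test class name is arbitrary):
```
import unittest


def compute_square(element):
    # Mirrors the notebook's function: square the value at index 1.
    return int(element[1]) ** 2


class ComputeSquareTest(unittest.TestCase):
    def test_compute_square(self):
        # No pipeline needed: call the function directly on sample records.
        self.assertEqual(compute_square("1234"), 4)
        self.assertEqual(compute_square(["ignored", "3"]), 9)
```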
##########
website/www/site/content/en/blog/unit-testing-blog.md:
##########
@@ -0,0 +1,149 @@
+---
+title: "So You Want to Write Tests on Your Beam Pipeline?"
+date: 2024-07-08 00:00:01 -0800
+categories:
+ - blog
+authors:
+ - svetakvsundhar
+
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+## So You Want to Write Tests on your Beam pipeline?
+Testing remains one of the most fundamental components of software
engineering. In this blog post, we shed light on some of the constructs that
Apache Beam provides to allow for testing. We cover an opinionated set of best
practices to write unit tests for your data pipeline in this post. Note that
this post does not include integration tests, and those should be authored
separately. The examples used in this post are in Python, but the concepts
translate broadly across SDKs.
+
+Suppose we write a particular PTransform that reads data from a CSV file, gets
passed through a custom function for parsing, and is written back to another
Google Cloud Storage bucket (we need to do some custom data formatting to have
data prepared for a downstream application).
+
+
+The pipeline is structured as follows:
+
+ #The following packages are used to run the example pipelines
+ import apache_beam as beam
+ import apache_beam.io.textio.ReadFromText
+ import apache_beam.io.textio.WriteToText
+
+ with beam.Pipeline(argv=self.args) as p:
+ result = p | ReadFromText("gs://my-storage-bucket/csv_location.csv")
+ | beam.ParDo(lambda x: custom_function(x))
Review Comment:
You can't `ParDo(lambda)`. I would look into how snippets are used in the
programming guide to ensure the code is (and remains) correct.
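A short sketch of two forms that do work (the `custom_function` body here is a placeholder, not the blog's actual parsing logic):
```
import apache_beam as beam


def custom_function(x):
    # Placeholder for the blog's parsing logic.
    return x.strip()


class ParseRecord(beam.DoFn):
    def process(self, element):
        # ParDo expects a DoFn; yield produces zero or more outputs.
        yield custom_function(element)


with beam.Pipeline() as p:
    via_map = (p
               | 'CreateA' >> beam.Create([' a ', ' b '])
               # One-to-one callables go through Map...
               | 'ViaMap' >> beam.Map(custom_function))
    via_pardo = (p
                 | 'CreateB' >> beam.Create([' c ', ' d '])
                 # ...while ParDo takes a DoFn instance.
                 | 'ViaParDo' >> beam.ParDo(ParseRecord()))
```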
##########
examples/notebooks/blogposts/unittests_in_beam.ipynb:
##########
@@ -0,0 +1,259 @@
+{
+ "nbformat": 4,
+ "nbformat_minor": 0,
+ "metadata": {
+ "colab": {
+ "provenance": [],
+ "authorship_tag": "ABX9TyP+whTO0l5Xd2TU4xa2Z7KC",
+ "include_colab_link": true
+ },
+ "kernelspec": {
+ "name": "python3",
+ "display_name": "Python 3"
+ },
+ "language_info": {
+ "name": "python"
+ }
+ },
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "view-in-github",
+ "colab_type": "text"
+ },
+ "source": [
+ "<a
href=\"https://colab.research.google.com/github/apache/beam/blob/testing_blog_post/examples/notebooks/blogposts/unittests_in_beam.ipynb\"
target=\"_parent\"><img
src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open In
Colab\"/></a>"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 36,
+ "metadata": {
+ "id": "7DSE6TgWy7PP"
+ },
+ "outputs": [],
+ "source": [
+ "# @title ###### Licensed to the Apache Software Foundation (ASF),
Version 2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# Install the Apache Beam library\n",
+ "\n",
+ "!pip install apache_beam[gcp] --quiet"
+ ],
+ "metadata": {
+ "id": "5W2nuV7uzlPg"
+ },
+ "execution_count": 37,
+ "outputs": []
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "#The following packages are used to run the example pipelines\n",
+ "\n",
+ "import apache_beam as beam\n",
+ "from apache_beam.io import ReadFromText, WriteToText\n",
+ "from apache_beam.options.pipeline_options import PipelineOptions\n",
+ "\n",
+ "class CustomClass(beam.DoFn):\n",
+ " def custom_function(x):\n",
+ " ...\n",
+ " # returned_record =
requests.get(\"http://my-api-call.com\")\n",
+ " ...\n",
+ " # if len(returned_record)!=10:\n",
+ " # raise ValueError(\"Length of record does not match
expected length\")\n",
+ " return x\n",
+ "\n",
+ " with beam.Pipeline() as p:\n",
+ " result = (\n",
+ " p\n",
+ " | ReadFromText(\"/content/sample_data/anscombe.json\")\n",
+ " | beam.ParDo(lambda x: CustomClass.custom_function(x))\n",
+ " | WriteToText(\"/content/\")\n",
+ " )"
+ ],
+ "metadata": {
+ "id": "Ktk9EVIFzGfP"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "**Example Pipeline 1**\n"
+ ],
+ "metadata": {
+ "id": "IVjBkewt1sLA"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# This function is going to return the square the integer at the
first index of our record.\n",
+ "def compute_square(element):\n",
+ " return int(element[1])**2\n",
+ "\n",
+ "with beam.Pipeline() as p1:\n",
+ " result = (\n",
+ " p1\n",
+ " |
ReadFromText(\"/content/sample_data/california_housing_test.csv\",skip_header_lines=1)\n",
+ " | beam.Map(compute_square)\n",
+ " | WriteToText(\"/content/\")\n",
+ " )"
+ ],
+ "metadata": {
+ "id": "oHbSvOUI1pOe"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "**Example Pipeline 2**"
+ ],
+ "metadata": {
+ "id": "Mh3nZZ1_12sX"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "with beam.Pipeline() as p2:\n",
+ " result = (\n",
+ " p2\n",
+ " | ReadFromText(\"/content/sample_data/anscombe.json\")\n",
+ " | beam.Map(str.strip)\n",
+ " | WriteToText(\"/content/sample_data/\")\n",
+ " )"
+ ],
+ "metadata": {
+ "id": "hmO1Chl51vPG"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "**Unit Tests for Pipelines**"
+ ],
+ "metadata": {
+ "id": "uoNJLQl_15gj"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# The following packages are imported for unit testing.\n",
+ "import unittest\n",
+ "import apache_beam as beam\n",
+ "from apache_beam.testing.test_pipeline import TestPipeline\n",
+ "from apache_beam.testing.util import assert_that, equal_to\n",
+ "try:\n",
+ " from apitools.base.py.exceptions import HttpError\n",
+ "except ImportError:\n",
+ " HttpError = None\n",
+ "\n",
+ "\n",
+ "@unittest.skipIf(HttpError is None, 'GCP dependencies are not
installed')\n",
+ "class TestBeam(unittest.TestCase):\n",
+ "\n",
+ "# This test corresponds to pipeline p1, and is written to confirm the
compute_square function works as intended.\n",
+ " def test_compute_square(self):\n",
+ " expected=[4]\n",
+ " with TestPipeline() as p:\n",
+ " output = p | beam.Create([\"1234\"]) \\\n",
+ " | beam.Map(compute_square)\n",
+ " assert_that(output, equal_to(expected))"
+ ],
+ "metadata": {
+ "id": "3-twYhdLTan0"
+ },
+ "execution_count": 41,
+ "outputs": []
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# This test corresponds to pipeline p2, and is written to confirm the
pipeline works as intended.\n",
+ "def test_strip_map(self):\n",
+ " expected=['Strawberry', 'Carrot', 'Eggplant']\n",
+ " strings = [' Strawberry \\n', ' Carrot \\n', ' Eggplant
\\n']\n",
+ " with TestPipeline() as p:\n",
+ " output = p | beam.Create(strings) \\\n",
+ " | beam.Map(str.strip)\n",
+ " assert_that(output, equal_to(expected))"
+ ],
+ "metadata": {
+ "id": "BU9Eil-TrtpE"
+ },
+ "execution_count": 42,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "source": [
+ "**Mocking Example**"
+ ],
+ "metadata": {
+ "id": "58GVMyMa2PwE"
+ }
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "!pip install mock # Install the 'mock' module"
+ ],
+ "metadata": {
+ "id": "ESclJ_G-6JcW"
+ },
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "code",
+ "source": [
+ "# We import the mock package for mocking functionality.\n",
+ "import mock\n",
+ "\n",
+ "@mock.patch.object(CustomClass, 'custom_function')\n",
+ "def test_error_message_wrong_length(self, get_record):\n",
+ " record = [\"field1\",\"field2\"]\n",
+ " CustomClass.custom_function.return_value = record\n",
+ " with self.assertRaisesRegex(ValueError,\n",
+ " \"Length of record does not match
expected length'\"):\n",
+ " p = beam.Pipeline()\n",
+ " result = p | beam.ParDo(CustomClass.custom_function())\n",
Review Comment:
`CustomClass.custom_function()` returns a `DoFn`? I'm a bit confused about what
you're trying to test here.
##########
website/www/site/content/en/blog/unit-testing-blog.md:
##########
@@ -0,0 +1,149 @@
+---
+title: "So You Want to Write Tests on Your Beam Pipeline?"
+date: 2024-07-08 00:00:01 -0800
+categories:
+ - blog
+authors:
+ - svetakvsundhar
+
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+## So You Want to Write Tests on your Beam pipeline?
+Testing remains one of the most fundamental components of software
engineering. In this blog post, we shed light on some of the constructs that
Apache Beam provides to allow for testing. We cover an opinionated set of best
practices to write unit tests for your data pipeline in this post. Note that
this post does not include integration tests, and those should be authored
separately. The examples used in this post are in Python, but the concepts
translate broadly across SDKs.
+
+Suppose we write a particular PTransform that reads data from a CSV file, gets
passed through a custom function for parsing, and is written back to another
Google Cloud Storage bucket (we need to do some custom data formatting to have
data prepared for a downstream application).
+
+
+The pipeline is structured as follows:
+
+ #The following packages are used to run the example pipelines
+ import apache_beam as beam
+ import apache_beam.io.textio.ReadFromText
+ import apache_beam.io.textio.WriteToText
+
+ with beam.Pipeline(argv=self.args) as p:
+ result = p | ReadFromText("gs://my-storage-bucket/csv_location.csv")
+ | beam.ParDo(lambda x: custom_function(x))
+ | WriteToText("gs://my-output-bucket-location/")
+
+We then add a custom function to our code:
+
+ def custom_function(x):
+ ...
+ returned_record = requests.get("http://my-api-call.com")
+ ...
+ if len(returned_record)!=10:
+ raise ValueError("Length of record does not match expected length")
+
+In this scenario, we recommend the following best practices:
+
+1. You don’t need to write any unit tests for the already supported connectors
in the Beam Library, such as ReadFromBigQuery and WriteToGCS. These connectors
are already tested in Beam’s test suite to ensure correct functionality.
+2. You should write unit tests for any custom operations introduced in the
pipeline, such as Map, FlatMap, Filter, ParDo, and so on. We recommend adding
tests for any lambda functions utilized within these Beam primitives.
Additionally, even if you’re using a predefined function, treat the entire
transform as a unit, and test it.
Review Comment:
I disagree with this recommendation. If one has `Map(some_function)`, then we
should (1) ensure that `some_function` itself is well tested and (2) ensure
that `Map(...)` is well tested in the Beam codebase. Users should not have to
validate that `Map(...)` works as intended.
If you write a `DoFn`, testing it by placing it in a pipeline is what I
would recommend.
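In other words, roughly the following (both snippets are illustrative sketches, with `some_function` and `MyDoFn` as stand-ins):
```
import unittest

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


def some_function(x):
    return x * 2


class MyDoFn(beam.DoFn):
    def process(self, element):
        yield element * 2


class ExampleTests(unittest.TestCase):
    def test_some_function(self):
        # (1) Test the plain function directly; no pipeline required.
        self.assertEqual(some_function(3), 6)

    def test_my_dofn(self):
        # A DoFn is best exercised inside a (test) pipeline.
        with TestPipeline() as p:
            output = (p
                      | beam.Create([1, 2])
                      | beam.ParDo(MyDoFn()))
            assert_that(output, equal_to([2, 4]))
```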
##########
website/www/site/content/en/blog/unit-testing-blog.md:
##########
@@ -0,0 +1,149 @@
+---
+title: "So You Want to Write Tests on Your Beam Pipeline?"
+date: 2024-07-08 00:00:01 -0800
+categories:
+ - blog
+authors:
+ - svetakvsundhar
+
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+## So You Want to Write Tests on your Beam pipeline?
+Testing remains one of the most fundamental components of software
engineering. In this blog post, we shed light on some of the constructs that
Apache Beam provides to allow for testing. We cover an opinionated set of best
practices to write unit tests for your data pipeline in this post. Note that
this post does not include integration tests, and those should be authored
separately. The examples used in this post are in Python, but the concepts
translate broadly across SDKs.
+
+Suppose we write a particular PTransform that reads data from a CSV file, gets
passed through a custom function for parsing, and is written back to another
Google Cloud Storage bucket (we need to do some custom data formatting to have
data prepared for a downstream application).
+
+
+The pipeline is structured as follows:
+
+ #The following packages are used to run the example pipelines
+ import apache_beam as beam
+ import apache_beam.io.textio.ReadFromText
+ import apache_beam.io.textio.WriteToText
+
+ with beam.Pipeline(argv=self.args) as p:
+ result = p | ReadFromText("gs://my-storage-bucket/csv_location.csv")
Review Comment:
So, the problem is one can't really test this pipeline without reproducing
it in the test. I think there are a couple of ways of re-structuring the code
to make it more testable (and realistic).
(1) For an end-to-end test, put your pipeline in a function that takes the
input and output paths as parameters. Your production code will call this with,
e.g., GCS paths, but your test could create temporary directories and files and
validate that. This tests that your whole pipeline, including IOs, is structured
correctly.
(2) Factor out the "processing" part of your pipeline into its own
PTransform, or at least function. E.g. your pipeline would be
```
with beam.Pipeline(argv=self.args) as p:
  _ = (p
       | beam.io.ReadFromText("gs://my-storage-bucket/csv_location.csv")
       | ProcessData(...)
       | beam.io.WriteToText())
```
and then your unit test would look like
```
with beam.Pipeline(argv=self.args) as p:
  _ = (p
       | beam.Create(["some", "sample", "elements"])
       | ProcessData(...)
       | AssertEqualTo(["expected", "outputs"]))
```
or (equivalently, but less parallel)
```
with beam.Pipeline(argv=self.args) as p:
  output_pcoll = (p
      | beam.Create(["some", "sample", "elements"])
      | ProcessData(...))
  assert_that(output_pcoll, equal_to(...))
```
If writing a custom ProcessData PTransform is too much work, one could at
least have
```
output_pcoll = process_data(input_pcoll)
```
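And a sketch of what (1) might look like against temporary files (the `run_pipeline` function and its `str.strip` step are placeholders for the real pipeline, not code from this PR):
```
import glob
import tempfile
import unittest

import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText


def run_pipeline(input_path, output_path):
    # Placeholder pipeline: the production version would take GCS paths.
    with beam.Pipeline() as p:
        _ = (p
             | ReadFromText(input_path)
             | beam.Map(str.strip)
             | WriteToText(output_path))


class EndToEndTest(unittest.TestCase):
    def test_run_pipeline(self):
        with tempfile.TemporaryDirectory() as tmp:
            input_path = f'{tmp}/input.txt'
            with open(input_path, 'w') as f:
                f.write(' Strawberry \n Carrot \n')
            output_prefix = f'{tmp}/output'
            run_pipeline(input_path, output_prefix)
            # WriteToText shards its output; collect every shard it produced.
            lines = []
            for shard in glob.glob(output_prefix + '-*'):
                with open(shard) as f:
                    lines.extend(f.read().splitlines())
            self.assertEqual(sorted(lines), ['Carrot', 'Strawberry'])
```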
##########
website/www/site/content/en/blog/unit-testing-blog.md:
##########
@@ -0,0 +1,149 @@
+---
+title: "So You Want to Write Tests on Your Beam Pipeline?"
+date: 2024-07-08 00:00:01 -0800
+categories:
+ - blog
+authors:
+ - svetakvsundhar
+
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+## So You Want to Write Tests on your Beam pipeline?
+Testing remains one of the most fundamental components of software
engineering. In this blog post, we shed light on some of the constructs that
Apache Beam provides to allow for testing. We cover an opinionated set of best
practices to write unit tests for your data pipeline in this post. Note that
this post does not include integration tests, and those should be authored
separately. The examples used in this post are in Python, but the concepts
translate broadly across SDKs.
+
+Suppose we write a particular PTransform that reads data from a CSV file, gets
passed through a custom function for parsing, and is written back to another
Google Cloud Storage bucket (we need to do some custom data formatting to have
data prepared for a downstream application).
+
+
+The pipeline is structured as follows:
+
+ #The following packages are used to run the example pipelines
+ import apache_beam as beam
+ import apache_beam.io.textio.ReadFromText
+ import apache_beam.io.textio.WriteToText
+
+ with beam.Pipeline(argv=self.args) as p:
+ result = p | ReadFromText("gs://my-storage-bucket/csv_location.csv")
+ | beam.ParDo(lambda x: custom_function(x))
+ | WriteToText("gs://my-output-bucket-location/")
+
+We then add a custom function to our code:
+
+ def custom_function(x):
+ ...
+ returned_record = requests.get("http://my-api-call.com")
+ ...
+ if len(returned_record)!=10:
+ raise ValueError("Length of record does not match expected length")
+
+In this scenario, we recommend the following best practices:
+
+1. You don’t need to write any unit tests for the already supported connectors
in the Beam Library, such as ReadFromBigQuery and WriteToGCS. These connectors
are already tested in Beam’s test suite to ensure correct functionality.
+2. You should write unit tests for any custom operations introduced in the
pipeline, such as Map, FlatMap, Filter, ParDo, and so on. We recommend adding
tests for any lambda functions utilized within these Beam primitives.
Additionally, even if you’re using a predefined function, treat the entire
transform as a unit, and test it.
+
+### Example Pipeline 1
+Let’s use the following pipeline as an example. Because we have a function, we
should write a unit test to ensure that our function works as intended.
+
+ def compute_square(element):
+ return element**2
+
+ with beam.Pipeline(argv=self.args) as p1:
+ result = p1 | ReadFromText("gs://my-storage-bucket/csv_location.csv")
+ | beam.Map(compute_square)
+ | WriteToText("gs://my-output-bucket-location/")
+
+### Example Pipeline 2
+
+Now let’s use the following pipeline as another example. Because we use a
predefined function, we don’t need to unit test the function, as `str.strip`,
is tested elsewhere. However, we do need to test the output of the `beam.Map`
function.
+
+ with beam.Pipeline(argv=self.args) as p2:
+ result = p2 | ReadFromText("gs://my-storage-bucket/csv_location.csv")
+ | beam.Map(str.strip)
+ | WriteToText("gs://my-output-bucket-location/")
+
+
+Here are the corresponding tests for both pipelines:
+
+ # The following packages are imported for unit testing.
+ import unittest
+ import apache_beam as beam
+
+
+ @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
+ class TestBeam(unittest.TestCase):
+
+ # This test corresponds to pipeline p1, and is written to confirm the
compute_square function works as intended.
+ def test_compute_square(self):
+ numbers=[1,2,3]
+
+
+ with TestPipeline() as p:
+ output = p | beam.Create([1,2,3])
+ | beam.Map(compute_square)
+ assert_that(output, equal_to([1,4,9]))
+
+
+
+ # This test corresponds to pipeline p2, and is written to confirm that our
map operation works as intended.
+ def test_strip_map(self):
+ strings= [' Strawberry \n',' Carrot \n',' Eggplant
\n']
+ with TestPipeline() as p:
+ output = p | beam.Create(strings)
+ | beam.Map(str.strip)
+ assert_that(output,['Strawberry','Carrot','Eggplant'])
+
+
+
+The following cover other testing best practices:
+
+1. Test all error messages you raise.
+2. Cover any edge cases that might be present in your data.
+3. Notice that in pipeline 1, we could have written the `beam.Map` step as
`beam.Map(lambda x: x**2)`, instead of `beam.Map(compute_square)`. The latter
(separating the lambda into a helper function) is the recommended approach for
more testable code, as changes to the function would be modularized.
+4. Use the `assert_that` statement to ensure that PCollection values match up
correctly, such as the following example:
+
+
+ class TestBeam(unittest.TestCase):
+ def test_custom_function(self):
+ with TestPipeline() as p:
+ input = p | beam.ParDo(custom_function(("1","2","3"))
Review Comment:
Nit: If we want ints, let's use ints; if we want strings, let's use strings
(like `"a", "b", "c"` or fruit names or whatever, not numeric strings).
##########
website/www/site/content/en/blog/unit-testing-blog.md:
##########
@@ -0,0 +1,149 @@
+---
+title: "So You Want to Write Tests on Your Beam Pipeline?"
+date: 2024-07-08 00:00:01 -0800
+categories:
+ - blog
+authors:
+ - svetakvsundhar
+
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+## So You Want to Write Tests on your Beam pipeline?
+Testing remains one of the most fundamental components of software
engineering. In this blog post, we shed light on some of the constructs that
Apache Beam provides to allow for testing. We cover an opinionated set of best
practices to write unit tests for your data pipeline in this post. Note that
this post does not include integration tests, and those should be authored
separately. The examples used in this post are in Python, but the concepts
translate broadly across SDKs.
+
+Suppose we write a particular PTransform that reads data from a CSV file, gets
passed through a custom function for parsing, and is written back to another
Google Cloud Storage bucket (we need to do some custom data formatting to have
data prepared for a downstream application).
+
+
+The pipeline is structured as follows:
+
+ #The following packages are used to run the example pipelines
+ import apache_beam as beam
+ import apache_beam.io.textio.ReadFromText
Review Comment:
Java-style imports?
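For reference, the Pythonic spelling of those imports would presumably be something like:
```
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
```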
##########
website/www/site/content/en/blog/unit-testing-blog.md:
##########
@@ -0,0 +1,149 @@
+---
+title: "So You Want to Write Tests on Your Beam Pipeline?"
+date: 2024-07-08 00:00:01 -0800
+categories:
+ - blog
+authors:
+ - svetakvsundhar
+
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+## So You Want to Write Tests on your Beam pipeline?
+Testing remains one of the most fundamental components of software
engineering. In this blog post, we shed light on some of the constructs that
Apache Beam provides to allow for testing. We cover an opinionated set of best
practices to write unit tests for your data pipeline in this post. Note that
this post does not include integration tests, and those should be authored
separately. The examples used in this post are in Python, but the concepts
translate broadly across SDKs.
+
+Suppose we write a particular PTransform that reads data from a CSV file, gets
passed through a custom function for parsing, and is written back to another
Google Cloud Storage bucket (we need to do some custom data formatting to have
data prepared for a downstream application).
+
+
+The pipeline is structured as follows:
+
+ #The following packages are used to run the example pipelines
+ import apache_beam as beam
+ import apache_beam.io.textio.ReadFromText
+ import apache_beam.io.textio.WriteToText
+
+ with beam.Pipeline(argv=self.args) as p:
+ result = p | ReadFromText("gs://my-storage-bucket/csv_location.csv")
+ | beam.ParDo(lambda x: custom_function(x))
+ | WriteToText("gs://my-output-bucket-location/")
+
+We then add a custom function to our code:
+
+ def custom_function(x):
+ ...
+ returned_record = requests.get("http://my-api-call.com")
+ ...
+ if len(returned_record)!=10:
+ raise ValueError("Length of record does not match expected length")
+
+In this scenario, we recommend the following best practices:
+
+1. You don’t need to write any unit tests for the already supported connectors
in the Beam Library, such as ReadFromBigQuery and WriteToGCS. These connectors
are already tested in Beam’s test suite to ensure correct functionality.
+2. You should write unit tests for any custom operations introduced in the
pipeline, such as Map, FlatMap, Filter, ParDo, and so on. We recommend adding
tests for any lambda functions utilized within these Beam primitives.
Additionally, even if you’re using a predefined function, treat the entire
transform as a unit, and test it.
+
+### Example Pipeline 1
+Let’s use the following pipeline as an example. Because we have a function, we
should write a unit test to ensure that our function works as intended.
+
+ def compute_square(element):
+ return element**2
+
+ with beam.Pipeline(argv=self.args) as p1:
+ result = p1 | ReadFromText("gs://my-storage-bucket/csv_location.csv")
+ | beam.Map(compute_square)
+ | WriteToText("gs://my-output-bucket-location/")
+
+### Example Pipeline 2
+
+Now let’s use the following pipeline as another example. Because we use a
predefined function, we don’t need to unit test the function, as `str.strip`,
is tested elsewhere. However, we do need to test the output of the `beam.Map`
function.
+
+ with beam.Pipeline(argv=self.args) as p2:
+ result = p2 | ReadFromText("gs://my-storage-bucket/csv_location.csv")
+ | beam.Map(str.strip)
+ | WriteToText("gs://my-output-bucket-location/")
+
+
+Here are the corresponding tests for both pipelines:
+
+ # The following packages are imported for unit testing.
+ import unittest
+ import apache_beam as beam
+
+
+ @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
+ class TestBeam(unittest.TestCase):
+
+ # This test corresponds to pipeline p1, and is written to confirm the
compute_square function works as intended.
+ def test_compute_square(self):
+ numbers=[1,2,3]
+
+
+ with TestPipeline() as p:
+ output = p | beam.Create([1,2,3])
+ | beam.Map(compute_square)
+ assert_that(output, equal_to([1,4,9]))
+
+
+
+ # This test corresponds to pipeline p2, and is written to confirm that our
map operation works as intended.
+ def test_strip_map(self):
+ strings= [' Strawberry \n',' Carrot \n',' Eggplant
\n']
+ with TestPipeline() as p:
+ output = p | beam.Create(strings)
+ | beam.Map(str.strip)
+ assert_that(output,['Strawberry','Carrot','Eggplant'])
+
+
+
+The following cover other testing best practices:
+
+1. Test all error messages you raise.
+2. Cover any edge cases that might be present in your data.
+3. Notice that in pipeline 1, we could have written the `beam.Map` step as
`beam.Map(lambda x: x**2)`, instead of `beam.Map(compute_square)`. The latter
(separating the lambda into a helper function) is the recommended approach for
more testable code, as changes to the function would be modularized.
+4. Use the `assert_that` statement to ensure that PCollection values match up
correctly, such as the following example:
+
+
+ class TestBeam(unittest.TestCase):
+ def test_custom_function(self):
+ with TestPipeline() as p:
+ input = p | beam.ParDo(custom_function(("1","2","3"))
+ assert_that(input, equal_to(["1","2","3"]))
+
+
+5. If needed, use mocking to mock any API calls that might be present in your
ParDo function. The purpose of mocking is to test your functionality
extensively, even if this testing requires a specific response from an API call.
+
+The following snippet is based off of example pipeline 1, from the top of this
blog post.
+
+ # We import the mock package for mocking functionality
+ import mock
+
+
+ @mock.patch.object(CustomFunction, 'get_record')
+ def test_error_message_wrong_length(self, get_record):
+ record = ["field1","field2",...]
+ get_record.return_value = record
Review Comment:
I am not following this at all. Where is `get_record` being called? Why is it
being mocked?
##########
website/www/site/content/en/blog/unit-testing-blog.md:
##########
@@ -0,0 +1,149 @@
+---
+title: "So You Want to Write Tests on Your Beam Pipeline?"
+date: 2024-07-08 00:00:01 -0800
+categories:
+ - blog
+authors:
+ - svetakvsundhar
+
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+## So You Want to Write Tests on Your Beam Pipeline?
+Testing remains one of the most fundamental components of software engineering. In this blog post, we shed light on some of the constructs that Apache Beam provides for testing, and we cover an opinionated set of best practices for writing unit tests for your data pipeline. Note that this post does not cover integration tests; those should be authored separately. The examples used in this post are in Python, but the concepts translate broadly across SDKs.
+
+Suppose we write a PTransform that reads data from a CSV file, passes each record through a custom parsing function, and writes the result back to a Google Cloud Storage bucket (we need some custom data formatting to prepare the data for a downstream application).
+
+
+The pipeline is structured as follows:
+
+    # The following packages are used to run the example pipelines
+    import apache_beam as beam
+    from apache_beam.io.textio import ReadFromText, WriteToText
+
+    with beam.Pipeline(argv=self.args) as p:
+      result = (
+          p
+          | ReadFromText("gs://my-storage-bucket/csv_location.csv")
+          | beam.ParDo(lambda x: custom_function(x))
+          | WriteToText("gs://my-output-bucket-location/"))
+
+We then add a custom function to our code:
+
+ def custom_function(x):
+ ...
+ returned_record = requests.get("http://my-api-call.com")
+ ...
+ if len(returned_record)!=10:
+ raise ValueError("Length of record does not match expected length")
+
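+A minimal sketch of a test that exercises this function’s error path, without hitting the real endpoint, might look like the following. It assumes custom_function is importable into the test module and, purely for illustration, patches requests.get to return a deliberately short record; the test class name is hypothetical.
+
+    import unittest
+    from unittest import mock
+
+
+    class TestCustomFunctionErrors(unittest.TestCase):
+
+      # requests.get is patched so the test never calls the real API.
+      @mock.patch('requests.get')
+      def test_raises_on_unexpected_record_length(self, mock_get):
+        # Return a record shorter than the expected 10 fields.
+        mock_get.return_value = ['field1', 'field2']
+        with self.assertRaisesRegex(ValueError, 'does not match expected length'):
+          custom_function('any element')
+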
+In this scenario, we recommend the following best practices:
+
+1. You don’t need to write any unit tests for the already supported connectors in the Beam Library, such as ReadFromBigQuery and WriteToGCS. These connectors are already tested in Beam’s test suite to ensure correct functionality.
+2. You should write unit tests for any custom operations introduced in the pipeline, such as Map, FlatMap, Filter, ParDo, and so on. We recommend adding tests for any lambda functions utilized within these Beam primitives. Additionally, even if you’re using a predefined function, treat the entire transform as a unit, and test it.
+
+### Example Pipeline 1
+Let’s use the following pipeline as an example. Because it uses a custom function, compute_square, we should write a unit test to ensure that the function works as intended.
+
+    def compute_square(element):
+      return element**2
+
+    with beam.Pipeline(argv=self.args) as p1:
+      result = (
+          p1
+          | ReadFromText("gs://my-storage-bucket/csv_location.csv")
+          | beam.Map(compute_square)
+          | WriteToText("gs://my-output-bucket-location/"))
+
+### Example Pipeline 2
+
+Now let’s use the following pipeline as another example. Because we use a predefined function, `str.strip`, we don’t need to unit test the function itself; it is tested elsewhere. However, we do need to test the output of the `beam.Map` step.
+
+    with beam.Pipeline(argv=self.args) as p2:
+      result = (
+          p2
+          | ReadFromText("gs://my-storage-bucket/csv_location.csv")
+          | beam.Map(str.strip)
+          | WriteToText("gs://my-output-bucket-location/"))
+
+
+Here are the corresponding tests for both pipelines:
+
+    # The following packages are imported for unit testing.
+    import unittest
+    import apache_beam as beam
+    from apache_beam.testing.test_pipeline import TestPipeline
+    from apache_beam.testing.util import assert_that, equal_to
+
+    # HttpError is only available when the GCP extras are installed.
+    try:
+      from apitools.base.py.exceptions import HttpError
+    except ImportError:
+      HttpError = None
+
+
+    @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
+    class TestBeam(unittest.TestCase):
+
+      # This test corresponds to pipeline p1, and is written to confirm the compute_square function works as intended.
+      def test_compute_square(self):
+        numbers = [1, 2, 3]
+        with TestPipeline() as p:
+          output = (
+              p
+              | beam.Create(numbers)
+              | beam.Map(compute_square))
+          assert_that(output, equal_to([1, 4, 9]))
+
+      # This test corresponds to pipeline p2, and is written to confirm that our map operation works as intended.
+      def test_strip_map(self):
+        strings = ['  Strawberry   \n', '   Carrot   \n', '    Eggplant   \n']
+        with TestPipeline() as p:
+          output = (
+              p
+              | beam.Create(strings)
+              | beam.Map(str.strip))
+          assert_that(output, equal_to(['Strawberry', 'Carrot', 'Eggplant']))
+
+
+
+The following are some additional testing best practices:
+
+1. Test all error messages you raise.
+2. Cover any edge cases that might be present in your data.
+3. Notice that in pipeline 1, we could have written the `beam.Map` step as `beam.Map(lambda x: x**2)` instead of `beam.Map(compute_square)`. The latter (separating the lambda into a helper function) is the recommended approach for more testable code, because changes to the function stay isolated in one place.
+4. Use the `assert_that` function to check that the PCollection contains the values you expect, as in the following example:
+
+
+ class TestBeam(unittest.TestCase):
+ def test_custom_function(self):
+ with TestPipeline() as p:
+ input = p | beam.ParDo(custom_function(("1","2","3"))
+ assert_that(input, equal_to(["1","2","3"]))
Review Comment:
I don't think this works, but I'm also trying to understand what it's even
trying to do. You can't apply a ParDo to a raw pipeline object. ParDo accepts a
DoFn, is `custom_function(some_tuple)` returning a DoFn that yields the tuple?
Again, if we actually ran this code we'd uncover these issues.
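
On a related note, one practical payoff of best practice 3 above (separating the lambda into a named helper such as compute_square) is that the helper can also be tested directly, without constructing a pipeline at all. A minimal sketch, repeating the helper definition from Example Pipeline 1 so the snippet stands alone:

    import unittest


    # Same helper as in Example Pipeline 1.
    def compute_square(element):
      return element**2


    class TestComputeSquare(unittest.TestCase):

      # Function-level test: no TestPipeline is needed for the helper itself.
      def test_squares_positive_numbers(self):
        self.assertEqual(compute_square(3), 9)

      # Edge cases (best practice 2): zero and negative inputs.
      def test_squares_zero_and_negative_numbers(self):
        self.assertEqual(compute_square(0), 0)
        self.assertEqual(compute_square(-4), 16)
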
##########
website/www/site/content/en/blog/unit-testing-blog.md:
##########
@@ -0,0 +1,149 @@
+---
+title: "So You Want to Write Tests on Your Beam Pipeline?"
+date: 2024-07-08 00:00:01 -0800
+categories:
+ - blog
+authors:
+ - svetakvsundhar
+
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+## So You Want to Write Tests on Your Beam Pipeline?
+Testing remains one of the most fundamental components of software engineering. In this blog post, we shed light on some of the constructs that Apache Beam provides for testing, and we cover an opinionated set of best practices for writing unit tests for your data pipeline. Note that this post does not cover integration tests; those should be authored separately. The examples used in this post are in Python, but the concepts translate broadly across SDKs.
+
+Suppose we write a PTransform that reads data from a CSV file, passes each record through a custom parsing function, and writes the result back to a Google Cloud Storage bucket (we need some custom data formatting to prepare the data for a downstream application).
+
+
+The pipeline is structured as follows:
+
+    # The following packages are used to run the example pipelines
+    import apache_beam as beam
+    from apache_beam.io.textio import ReadFromText, WriteToText
+
+    with beam.Pipeline(argv=self.args) as p:
+      result = (
+          p
+          | ReadFromText("gs://my-storage-bucket/csv_location.csv")
+          | beam.ParDo(lambda x: custom_function(x))
+          | WriteToText("gs://my-output-bucket-location/"))
+
+We then add a custom function to our code:
+
+ def custom_function(x):
+ ...
+ returned_record = requests.get("http://my-api-call.com")
+ ...
+ if len(returned_record)!=10:
+ raise ValueError("Length of record does not match expected length")
+
+In this scenario, we recommend the following best practices:
+
+1. You don’t need to write any unit tests for the already supported connectors in the Beam Library, such as ReadFromBigQuery and WriteToGCS. These connectors are already tested in Beam’s test suite to ensure correct functionality.
+2. You should write unit tests for any custom operations introduced in the pipeline, such as Map, FlatMap, Filter, ParDo, and so on. We recommend adding tests for any lambda functions utilized within these Beam primitives. Additionally, even if you’re using a predefined function, treat the entire transform as a unit, and test it.
+
+### Example Pipeline 1
+Let’s use the following pipeline as an example. Because it uses a custom function, compute_square, we should write a unit test to ensure that the function works as intended.
+
+    def compute_square(element):
+      return element**2
+
+    with beam.Pipeline(argv=self.args) as p1:
+      result = (
+          p1
+          | ReadFromText("gs://my-storage-bucket/csv_location.csv")
+          | beam.Map(compute_square)
+          | WriteToText("gs://my-output-bucket-location/"))
+
+### Example Pipeline 2
+
+Now let’s use the following pipeline as another example. Because we use a predefined function, `str.strip`, we don’t need to unit test the function itself; it is tested elsewhere. However, we do need to test the output of the `beam.Map` step.
+
+    with beam.Pipeline(argv=self.args) as p2:
+      result = (
+          p2
+          | ReadFromText("gs://my-storage-bucket/csv_location.csv")
+          | beam.Map(str.strip)
+          | WriteToText("gs://my-output-bucket-location/"))
+
+
+Here are the corresponding tests for both pipelines:
+
+    # The following packages are imported for unit testing.
+    import unittest
+    import apache_beam as beam
+    from apache_beam.testing.test_pipeline import TestPipeline
+    from apache_beam.testing.util import assert_that, equal_to
+
+    # HttpError is only available when the GCP extras are installed.
+    try:
+      from apitools.base.py.exceptions import HttpError
+    except ImportError:
+      HttpError = None
+
+
+    @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
+    class TestBeam(unittest.TestCase):
+
+      # This test corresponds to pipeline p1, and is written to confirm the compute_square function works as intended.
+      def test_compute_square(self):
+        numbers = [1, 2, 3]
+        with TestPipeline() as p:
+          output = (
+              p
+              | beam.Create(numbers)
+              | beam.Map(compute_square))
+          assert_that(output, equal_to([1, 4, 9]))
+
+      # This test corresponds to pipeline p2, and is written to confirm that our map operation works as intended.
+      def test_strip_map(self):
+        strings = ['  Strawberry   \n', '   Carrot   \n', '    Eggplant   \n']
+        with TestPipeline() as p:
+          output = (
+              p
+              | beam.Create(strings)
+              | beam.Map(str.strip))
+          assert_that(output, equal_to(['Strawberry', 'Carrot', 'Eggplant']))
Review Comment:
I commented on this above, but let's ensure they're continuously run.
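
For reference, a self-contained version of these two tests, pulled together into one runnable module with the imports the snippets above rely on, might look like the following (the file name is illustrative):

    # test_beam_blog_examples.py
    import unittest

    import apache_beam as beam
    from apache_beam.testing.test_pipeline import TestPipeline
    from apache_beam.testing.util import assert_that, equal_to


    def compute_square(element):
      return element**2


    class TestBeam(unittest.TestCase):

      def test_compute_square(self):
        with TestPipeline() as p:
          output = (
              p
              | beam.Create([1, 2, 3])
              | beam.Map(compute_square))
          assert_that(output, equal_to([1, 4, 9]))

      def test_strip_map(self):
        strings = ['  Strawberry   \n', '   Carrot   \n', '    Eggplant   \n']
        with TestPipeline() as p:
          output = (
              p
              | beam.Create(strings)
              | beam.Map(str.strip))
          assert_that(output, equal_to(['Strawberry', 'Carrot', 'Eggplant']))


    if __name__ == '__main__':
      unittest.main()

A module like this can be run locally with `python -m unittest` or with `pytest`, and wiring that same command into the project’s continuous integration is one way to make sure the tests keep running on every change.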