robertwb commented on code in PR #31701:
URL: https://github.com/apache/beam/pull/31701#discussion_r1672788011


##########
examples/notebooks/blogposts/unittests_in_beam.ipynb:
##########
@@ -0,0 +1,259 @@
+{
+  "nbformat": 4,
+  "nbformat_minor": 0,
+  "metadata": {
+    "colab": {
+      "provenance": [],
+      "authorship_tag": "ABX9TyP+whTO0l5Xd2TU4xa2Z7KC",
+      "include_colab_link": true
+    },
+    "kernelspec": {
+      "name": "python3",
+      "display_name": "Python 3"
+    },
+    "language_info": {
+      "name": "python"
+    }
+  },
+  "cells": [
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "view-in-github",
+        "colab_type": "text"
+      },
+      "source": [
+        "<a 
href=\"https://colab.research.google.com/github/apache/beam/blob/testing_blog_post/examples/notebooks/blogposts/unittests_in_beam.ipynb\";
 target=\"_parent\"><img 
src=\"https://colab.research.google.com/assets/colab-badge.svg\"; alt=\"Open In 
Colab\"/></a>"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": 36,
+      "metadata": {
+        "id": "7DSE6TgWy7PP"
+      },
+      "outputs": [],
+      "source": [
+        "# @title ###### Licensed to the Apache Software Foundation (ASF), 
Version 2.0 (the \"License\")\n",
+        "\n",
+        "# Licensed to the Apache Software Foundation (ASF) under one\n",
+        "# or more contributor license agreements. See the NOTICE file\n",
+        "# distributed with this work for additional information\n",
+        "# regarding copyright ownership. The ASF licenses this file\n",
+        "# to you under the Apache License, Version 2.0 (the\n",
+        "# \"License\"); you may not use this file except in compliance\n",
+        "# with the License. You may obtain a copy of the License at\n",
+        "#\n",
+        "#   http://www.apache.org/licenses/LICENSE-2.0\n";,
+        "#\n",
+        "# Unless required by applicable law or agreed to in writing,\n",
+        "# software distributed under the License is distributed on an\n",
+        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+        "# KIND, either express or implied. See the License for the\n",
+        "# specific language governing permissions and limitations\n",
+        "# under the License"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# Install the Apache Beam library\n",
+        "\n",
+        "!pip install apache_beam[gcp] --quiet"
+      ],
+      "metadata": {
+        "id": "5W2nuV7uzlPg"
+      },
+      "execution_count": 37,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "#The following packages are used to run the example pipelines\n",
+        "\n",
+        "import apache_beam as beam\n",
+        "from apache_beam.io import ReadFromText, WriteToText\n",
+        "from apache_beam.options.pipeline_options import PipelineOptions\n",
+        "\n",
+        "class CustomClass(beam.DoFn):\n",
+        "  def custom_function(x):\n",
+        "          ...\n",
+        "          # returned_record = 
requests.get(\"http://my-api-call.com\";)\n",
+        "          ...\n",
+        "          # if len(returned_record)!=10:\n",
+        "          # raise ValueError(\"Length of record does not match 
expected length\")\n",
+        "          return x\n",
+        "\n",
+        "  with beam.Pipeline() as p:\n",
+        "    result = (\n",
+        "            p\n",
+        "            | ReadFromText(\"/content/sample_data/anscombe.json\")\n",
+        "            | beam.ParDo(lambda x: CustomClass.custom_function(x))\n",
+        "            | WriteToText(\"/content/\")\n",
+        "    )"
+      ],
+      "metadata": {
+        "id": "Ktk9EVIFzGfP"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "**Example Pipeline 1**\n"
+      ],
+      "metadata": {
+        "id": "IVjBkewt1sLA"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# This function is going to return the square the integer at the 
first index of our record.\n",
+        "def compute_square(element):\n",
+        "  return int(element[1])**2\n",
+        "\n",
+        "with beam.Pipeline() as p1:\n",
+        "    result = (\n",
+        "        p1\n",
+        "        | 
ReadFromText(\"/content/sample_data/california_housing_test.csv\",skip_header_lines=1)\n",
+        "        | beam.Map(compute_square)\n",
+        "        | WriteToText(\"/content/\")\n",
+        "    )"
+      ],
+      "metadata": {
+        "id": "oHbSvOUI1pOe"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "**Example Pipeline 2**"
+      ],
+      "metadata": {
+        "id": "Mh3nZZ1_12sX"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "with beam.Pipeline() as p2:\n",
+        "    result = (\n",
+        "        p2\n",
+        "        | ReadFromText(\"/content/sample_data/anscombe.json\")\n",
+        "        | beam.Map(str.strip)\n",
+        "        | WriteToText(\"/content/sample_data/\")\n",
+        "    )"
+      ],
+      "metadata": {
+        "id": "hmO1Chl51vPG"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "**Unit Tests for Pipelines**"
+      ],
+      "metadata": {
+        "id": "uoNJLQl_15gj"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# The following packages are imported for unit testing.\n",
+        "import unittest\n",
+        "import apache_beam as beam\n",
+        "from apache_beam.testing.test_pipeline import TestPipeline\n",
+        "from apache_beam.testing.util import assert_that, equal_to\n",
+        "try:\n",
+        "  from apitools.base.py.exceptions import HttpError\n",
+        "except ImportError:\n",
+        "  HttpError = None\n",
+        "\n",
+        "\n",
+        "@unittest.skipIf(HttpError is None, 'GCP dependencies are not 
installed')\n",

Review Comment:
   It would seem one of the main points of unit testing is to not have
heavyweight dependencies. I wouldn't say skipping like this is best practice
unless absolutely necessary.
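   For example, one way to keep the test lightweight is to inject the
HTTP-fetching function into the DoFn, so the unit test never needs the GCP
extras at all. A minimal sketch (the names `ParseRecord` and `fetch_record`
are hypothetical, not from the notebook):
   
   ```
   import apache_beam as beam
   from apache_beam.testing.test_pipeline import TestPipeline
   from apache_beam.testing.util import assert_that, equal_to
   
   class ParseRecord(beam.DoFn):
     def __init__(self, fetch_record):
       # The network call is injected, so tests can pass a stub instead.
       self._fetch_record = fetch_record
   
     def process(self, element):
       record = self._fetch_record(element)
       if len(record) != 10:
         raise ValueError("Length of record does not match expected length")
       yield record
   
   def test_parse_record():
     fake_fetch = lambda x: "0123456789"  # stub; no HTTP, no GCP deps
     with TestPipeline() as p:
       output = p | beam.Create(["key"]) | beam.ParDo(ParseRecord(fake_fetch))
       assert_that(output, equal_to(["0123456789"]))
   ```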



##########
examples/notebooks/blogposts/unittests_in_beam.ipynb:
##########
@@ -0,0 +1,259 @@
+{
+  "nbformat": 4,
+  "nbformat_minor": 0,
+  "metadata": {
+    "colab": {
+      "provenance": [],
+      "authorship_tag": "ABX9TyP+whTO0l5Xd2TU4xa2Z7KC",
+      "include_colab_link": true
+    },
+    "kernelspec": {
+      "name": "python3",
+      "display_name": "Python 3"
+    },
+    "language_info": {
+      "name": "python"
+    }
+  },
+  "cells": [
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "view-in-github",
+        "colab_type": "text"
+      },
+      "source": [
+        "<a 
href=\"https://colab.research.google.com/github/apache/beam/blob/testing_blog_post/examples/notebooks/blogposts/unittests_in_beam.ipynb\";
 target=\"_parent\"><img 
src=\"https://colab.research.google.com/assets/colab-badge.svg\"; alt=\"Open In 
Colab\"/></a>"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": 36,
+      "metadata": {
+        "id": "7DSE6TgWy7PP"
+      },
+      "outputs": [],
+      "source": [
+        "# @title ###### Licensed to the Apache Software Foundation (ASF), 
Version 2.0 (the \"License\")\n",
+        "\n",
+        "# Licensed to the Apache Software Foundation (ASF) under one\n",
+        "# or more contributor license agreements. See the NOTICE file\n",
+        "# distributed with this work for additional information\n",
+        "# regarding copyright ownership. The ASF licenses this file\n",
+        "# to you under the Apache License, Version 2.0 (the\n",
+        "# \"License\"); you may not use this file except in compliance\n",
+        "# with the License. You may obtain a copy of the License at\n",
+        "#\n",
+        "#   http://www.apache.org/licenses/LICENSE-2.0\n";,
+        "#\n",
+        "# Unless required by applicable law or agreed to in writing,\n",
+        "# software distributed under the License is distributed on an\n",
+        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+        "# KIND, either express or implied. See the License for the\n",
+        "# specific language governing permissions and limitations\n",
+        "# under the License"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# Install the Apache Beam library\n",
+        "\n",
+        "!pip install apache_beam[gcp] --quiet"
+      ],
+      "metadata": {
+        "id": "5W2nuV7uzlPg"
+      },
+      "execution_count": 37,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "#The following packages are used to run the example pipelines\n",
+        "\n",
+        "import apache_beam as beam\n",
+        "from apache_beam.io import ReadFromText, WriteToText\n",
+        "from apache_beam.options.pipeline_options import PipelineOptions\n",
+        "\n",
+        "class CustomClass(beam.DoFn):\n",
+        "  def custom_function(x):\n",
+        "          ...\n",
+        "          # returned_record = 
requests.get(\"http://my-api-call.com\";)\n",
+        "          ...\n",
+        "          # if len(returned_record)!=10:\n",
+        "          # raise ValueError(\"Length of record does not match 
expected length\")\n",
+        "          return x\n",
+        "\n",
+        "  with beam.Pipeline() as p:\n",
+        "    result = (\n",
+        "            p\n",
+        "            | ReadFromText(\"/content/sample_data/anscombe.json\")\n",
+        "            | beam.ParDo(lambda x: CustomClass.custom_function(x))\n",
+        "            | WriteToText(\"/content/\")\n",
+        "    )"
+      ],
+      "metadata": {
+        "id": "Ktk9EVIFzGfP"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "**Example Pipeline 1**\n"
+      ],
+      "metadata": {
+        "id": "IVjBkewt1sLA"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# This function is going to return the square the integer at the 
first index of our record.\n",
+        "def compute_square(element):\n",
+        "  return int(element[1])**2\n",
+        "\n",
+        "with beam.Pipeline() as p1:\n",
+        "    result = (\n",
+        "        p1\n",
+        "        | 
ReadFromText(\"/content/sample_data/california_housing_test.csv\",skip_header_lines=1)\n",
+        "        | beam.Map(compute_square)\n",
+        "        | WriteToText(\"/content/\")\n",
+        "    )"
+      ],
+      "metadata": {
+        "id": "oHbSvOUI1pOe"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "**Example Pipeline 2**"
+      ],
+      "metadata": {
+        "id": "Mh3nZZ1_12sX"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "with beam.Pipeline() as p2:\n",
+        "    result = (\n",
+        "        p2\n",
+        "        | ReadFromText(\"/content/sample_data/anscombe.json\")\n",
+        "        | beam.Map(str.strip)\n",
+        "        | WriteToText(\"/content/sample_data/\")\n",
+        "    )"
+      ],
+      "metadata": {
+        "id": "hmO1Chl51vPG"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "**Unit Tests for Pipelines**"
+      ],
+      "metadata": {
+        "id": "uoNJLQl_15gj"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# The following packages are imported for unit testing.\n",
+        "import unittest\n",
+        "import apache_beam as beam\n",
+        "from apache_beam.testing.test_pipeline import TestPipeline\n",
+        "from apache_beam.testing.util import assert_that, equal_to\n",
+        "try:\n",
+        "  from apitools.base.py.exceptions import HttpError\n",
+        "except ImportError:\n",
+        "  HttpError = None\n",
+        "\n",
+        "\n",
+        "@unittest.skipIf(HttpError is None, 'GCP dependencies are not 
installed')\n",
+        "class TestBeam(unittest.TestCase):\n",
+        "\n",
+        "# This test corresponds to pipeline p1, and is written to confirm the 
compute_square function works as intended.\n",
+        "  def test_compute_square(self):\n",
+        "    expected=[4]\n",
+        "    with TestPipeline() as p:\n",
+        "      output = p | beam.Create([\"1234\"]) \\\n",
+        "                 | beam.Map(compute_square)\n",
+        "      assert_that(output, equal_to(expected))"
+      ],
+      "metadata": {
+        "id": "3-twYhdLTan0"
+      },
+      "execution_count": 41,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# This test corresponds to pipeline p2, and is written to confirm the 
pipeline works as intended.\n",
+        "def test_strip_map(self):\n",
+        "  expected=['Strawberry', 'Carrot', 'Eggplant']\n",
+        "  strings = [' Strawberry   \\n', '   Carrot   \\n', '   Eggplant   
\\n']\n",
+        "  with TestPipeline() as p:\n",
+        "    output = p | beam.Create(strings) \\\n",
+        "               | beam.Map(str.strip)\n",
+        "    assert_that(output, equal_to(expected))"
+      ],
+      "metadata": {
+        "id": "BU9Eil-TrtpE"
+      },
+      "execution_count": 42,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "**Mocking Example**"
+      ],
+      "metadata": {
+        "id": "58GVMyMa2PwE"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "!pip install mock  # Install the 'mock' module"
+      ],
+      "metadata": {
+        "id": "ESclJ_G-6JcW"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# We import the mock package for mocking functionality.\n",
+        "import mock\n",

Review Comment:
   Mocking often interacts poorly with serialization; I would avoid this when
possible. (Also, are these examples automatically tested?)
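   If mocking is needed at all, one way to sidestep the serialization issue
is to exercise the `DoFn` directly rather than inside a pipeline, so the
patched object is never pickled. A rough sketch (assumes a hypothetical
`ParseRecord` DoFn whose `process` fetches data via a `self._get_record`
helper):
   
   ```
   import unittest
   from unittest import mock
   
   class ParseRecordTest(unittest.TestCase):
     def test_error_message_wrong_length(self):
       fn = ParseRecord()
       # Patch the fetch helper on the instance; nothing gets serialized
       # because we never hand the DoFn to a runner.
       with mock.patch.object(fn, '_get_record', return_value=["f1", "f2"]):
         with self.assertRaisesRegex(
             ValueError, "Length of record does not match expected length"):
           list(fn.process("any-element"))  # drain the generator
   ```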



##########
examples/notebooks/blogposts/unittests_in_beam.ipynb:
##########
@@ -0,0 +1,259 @@
+{
+  "nbformat": 4,

Review Comment:
   Are you referencing the notebook in your blog? Or is this an (older?) copy 
of your blog code?



##########
website/www/site/content/en/blog/unit-testing-blog.md:
##########
@@ -0,0 +1,149 @@
+---
+title:  "So You Want to Write Tests on Your Beam Pipeline?"
+date:   2024-07-08 00:00:01 -0800
+categories:
+  - blog
+authors:
+  - svetakvsundhar
+
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+## So You Want to Write Tests on your Beam pipeline?
+Testing remains one of the most fundamental components of software 
engineering. In this blog post, we shed light on some of the constructs that 
Apache Beam provides to allow for testing. We cover an opinionated set of best 
practices to write unit tests for your data pipeline in this post. Note that 
this post does not include integration tests, and those should be authored 
separately.
+
+Suppose we write a particular PTransform that reads data from a CSV file, gets 
passed through a custom function for parsing, and is written back to another 
Google Cloud Storage bucket (we need to do some custom data formatting to have 
data prepared for a downstream application).
+
+
+The pipeline is structured as follows:
+
+### Example pipeline 1
+
+    #The following packages are used to run the example pipelines
+    import apache_beam as beam
+    import apache_beam.io.textio.ReadFromText
+    import apache_beam.io.textio.WriteToText
+
+    with beam.Pipeline(argv=self.args) as p:
+    result = p | ReadFromText("gs://my-storage-bucket/csv_location.csv")
+               | beam.ParDo(lambda x: custom_function(x))
+               | WriteToText("gs://my-output-bucket-location/")
+
+We then add a custom function to our code:
+
+    def custom_function(x):
+         ...
+         returned_record = requests.get("http://my-api-call.com")
+         ...
+         if len(returned_record)!=10:
+            raise ValueError("Length of record does not match expected length")
+
+In this scenario, we recommend the following best practices:
+
+1. You don’t need to write any unit tests for the already supported connectors 
in the Beam Library, such as ReadFromBigQuery and WriteToGCS. These connectors 
are already tested in Beam’s test suite to ensure correct functionality.
+2. You should write unit tests for any custom operations introduced in the 
pipeline, such as Map, FlatMap, Filter, ParDo, and so on. We recommend adding 
tests for any lambda functions utilized within these Beam primitives. 
Additionally, even if you’re using a predefined function, treat the entire 
transform as a unit, and test it.
+
+Let’s use the following pipeline as an example. Because we have a function, we 
should write a unit test to ensure that our function works as intended.
+
+    def compute_square(element):
+        return element**2
+
+    with beam.Pipeline(argv=self.args) as p1:
+        result = p1 | ReadFromText("gs://my-storage-bucket/csv_location.csv")
+                    | beam.Map(compute_square)
+                    | WriteToText("gs://my-output-bucket-location/")
+
+
+Now let’s use the following pipeline as another example. Because we use a 
predefined function, we don’t need to unit test the function, as `str.strip`, 
is tested elsewhere. However, we do need to test the output of the `beam.Map` 
function.
+
+    with beam.Pipeline(argv=self.args) as p2:
+        result = p2 | ReadFromText("gs://my-storage-bucket/csv_location.csv")
+                    | beam.Map(str.strip)
+                    | WriteToText("gs://my-output-bucket-location/")
+
+
+Here are the corresponding tests for both pipelines:
+
+    # The following packages are imported for unit testing.
+    import unittest
+    import apache_beam as beam
+
+
+    @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
+    class TestBeam(unittest.TestCase):
+
+    # This test corresponds to pipeline p1, and is written to confirm the compute_square function works as intended.
+        def test_compute_square(self):
+            numbers=[1,2,3]
+
+
+        with TestPipeline() as p:
+            output = p | beam.Create([1,2,3])
+                       |beam.Map(compute_square)
+        assert_that(output, equal_to([1,4,9]))
+
+
+
+    # This test corresponds to pipeline p2, and is written to confirm that our 
map operation works as intended.
+       def test_strip_map(self):
+               strings= [' Strawberry   \n','   Carrot   \n','   Eggplant   \n']
+               with TestPipeline() as p:
+                       output = p | beam.Create(strings)
+                                      | beam.Map(str.strip)
+        assert_that(output,['Strawberry','Carrot','Eggplant'])
+
+
+
+The following cover other testing best practices:

Review Comment:
   Users should generally use `assert_that`, `equal_to`, and (for streaming)
`TestStream`.
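   A minimal sketch of the streaming case with `TestStream` (the element
values here are made up for illustration):
   
   ```
   import apache_beam as beam
   from apache_beam.options.pipeline_options import StandardOptions
   from apache_beam.testing.test_pipeline import TestPipeline
   from apache_beam.testing.test_stream import TestStream
   from apache_beam.testing.util import assert_that, equal_to
   
   def test_strip_streaming():
     events = (TestStream()
               .add_elements([' Strawberry \n'])
               .advance_watermark_to(10)
               .add_elements([' Carrot \n'])
               .advance_watermark_to_infinity())
     options = StandardOptions(streaming=True)
     with TestPipeline(options=options) as p:
       output = p | events | beam.Map(str.strip)
       assert_that(output, equal_to(['Strawberry', 'Carrot']))
   ```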



##########
website/www/site/content/en/blog/unit-testing-blog.md:
##########
@@ -0,0 +1,149 @@
+---
+title:  "So You Want to Write Tests on Your Beam Pipeline?"
+date:   2024-07-08 00:00:01 -0800
+categories:
+  - blog
+authors:
+  - svetakvsundhar
+
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+## So You Want to Write Tests on your Beam pipeline?
+Testing remains one of the most fundamental components of software 
engineering. In this blog post, we shed light on some of the constructs that 
Apache Beam provides to allow for testing. We cover an opinionated set of best 
practices to write unit tests for your data pipeline in this post. Note that 
this post does not include integration tests, and those should be authored 
separately. The examples used in this post are in Python, but the concepts 
translate broadly across SDKs.
+
+Suppose we write a particular PTransform that reads data from a CSV file, gets 
passed through a custom function for parsing, and is written back to another 
Google Cloud Storage bucket (we need to do some custom data formatting to have 
data prepared for a downstream application).
+
+
+The pipeline is structured as follows:
+
+    #The following packages are used to run the example pipelines
+    import apache_beam as beam
+    import apache_beam.io.textio.ReadFromText
+    import apache_beam.io.textio.WriteToText
+
+    with beam.Pipeline(argv=self.args) as p:
+        result = p | ReadFromText("gs://my-storage-bucket/csv_location.csv")
+                   | beam.ParDo(lambda x: custom_function(x))
+                   | WriteToText("gs://my-output-bucket-location/")
+
+We then add a custom function to our code:
+
+    def custom_function(x):
+         ...
+         returned_record = requests.get("http://my-api-call.com")
+         ...
+         if len(returned_record)!=10:
+            raise ValueError("Length of record does not match expected length")
+
+In this scenario, we recommend the following best practices:
+
+1. You don’t need to write any unit tests for the already supported connectors 
in the Beam Library, such as ReadFromBigQuery and WriteToGCS. These connectors 
are already tested in Beam’s test suite to ensure correct functionality.
+2. You should write unit tests for any custom operations introduced in the 
pipeline, such as Map, FlatMap, Filter, ParDo, and so on. We recommend adding 
tests for any lambda functions utilized within these Beam primitives. 
Additionally, even if you’re using a predefined function, treat the entire 
transform as a unit, and test it.
+
+### Example Pipeline 1
+Let’s use the following pipeline as an example. Because we have a function, we 
should write a unit test to ensure that our function works as intended.
+
+    def compute_square(element):
+        return element**2
+
+    with beam.Pipeline(argv=self.args) as p1:
+        result = p1 | ReadFromText("gs://my-storage-bucket/csv_location.csv")
+                    | beam.Map(compute_square)
+                    | WriteToText("gs://my-output-bucket-location/")
+
+### Example Pipeline 2
+
+Now let’s use the following pipeline as another example. Because we use a 
predefined function, we don’t need to unit test the function, as `str.strip`, 
is tested elsewhere. However, we do need to test the output of the `beam.Map` 
function.
+
+    with beam.Pipeline(argv=self.args) as p2:
+        result = p2 | ReadFromText("gs://my-storage-bucket/csv_location.csv")
+                    | beam.Map(str.strip)
+                    | WriteToText("gs://my-output-bucket-location/")
+
+
+Here are the corresponding tests for both pipelines:
+
+    # The following packages are imported for unit testing.
+    import unittest
+    import apache_beam as beam
+
+
+    @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')

Review Comment:
   My comment above also applies here.



##########
examples/notebooks/blogposts/unittests_in_beam.ipynb:
##########
@@ -0,0 +1,259 @@
+{
+  "nbformat": 4,
+  "nbformat_minor": 0,
+  "metadata": {
+    "colab": {
+      "provenance": [],
+      "authorship_tag": "ABX9TyP+whTO0l5Xd2TU4xa2Z7KC",
+      "include_colab_link": true
+    },
+    "kernelspec": {
+      "name": "python3",
+      "display_name": "Python 3"
+    },
+    "language_info": {
+      "name": "python"
+    }
+  },
+  "cells": [
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "view-in-github",
+        "colab_type": "text"
+      },
+      "source": [
+        "<a 
href=\"https://colab.research.google.com/github/apache/beam/blob/testing_blog_post/examples/notebooks/blogposts/unittests_in_beam.ipynb\";
 target=\"_parent\"><img 
src=\"https://colab.research.google.com/assets/colab-badge.svg\"; alt=\"Open In 
Colab\"/></a>"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": 36,
+      "metadata": {
+        "id": "7DSE6TgWy7PP"
+      },
+      "outputs": [],
+      "source": [
+        "# @title ###### Licensed to the Apache Software Foundation (ASF), 
Version 2.0 (the \"License\")\n",
+        "\n",
+        "# Licensed to the Apache Software Foundation (ASF) under one\n",
+        "# or more contributor license agreements. See the NOTICE file\n",
+        "# distributed with this work for additional information\n",
+        "# regarding copyright ownership. The ASF licenses this file\n",
+        "# to you under the Apache License, Version 2.0 (the\n",
+        "# \"License\"); you may not use this file except in compliance\n",
+        "# with the License. You may obtain a copy of the License at\n",
+        "#\n",
+        "#   http://www.apache.org/licenses/LICENSE-2.0\n";,
+        "#\n",
+        "# Unless required by applicable law or agreed to in writing,\n",
+        "# software distributed under the License is distributed on an\n",
+        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+        "# KIND, either express or implied. See the License for the\n",
+        "# specific language governing permissions and limitations\n",
+        "# under the License"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# Install the Apache Beam library\n",
+        "\n",
+        "!pip install apache_beam[gcp] --quiet"
+      ],
+      "metadata": {
+        "id": "5W2nuV7uzlPg"
+      },
+      "execution_count": 37,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "#The following packages are used to run the example pipelines\n",
+        "\n",
+        "import apache_beam as beam\n",
+        "from apache_beam.io import ReadFromText, WriteToText\n",
+        "from apache_beam.options.pipeline_options import PipelineOptions\n",
+        "\n",
+        "class CustomClass(beam.DoFn):\n",
+        "  def custom_function(x):\n",
+        "          ...\n",
+        "          # returned_record = 
requests.get(\"http://my-api-call.com\";)\n",
+        "          ...\n",
+        "          # if len(returned_record)!=10:\n",
+        "          # raise ValueError(\"Length of record does not match 
expected length\")\n",
+        "          return x\n",
+        "\n",
+        "  with beam.Pipeline() as p:\n",
+        "    result = (\n",
+        "            p\n",
+        "            | ReadFromText(\"/content/sample_data/anscombe.json\")\n",
+        "            | beam.ParDo(lambda x: CustomClass.custom_function(x))\n",
+        "            | WriteToText(\"/content/\")\n",
+        "    )"
+      ],
+      "metadata": {
+        "id": "Ktk9EVIFzGfP"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "**Example Pipeline 1**\n"
+      ],
+      "metadata": {
+        "id": "IVjBkewt1sLA"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# This function is going to return the square the integer at the 
first index of our record.\n",
+        "def compute_square(element):\n",
+        "  return int(element[1])**2\n",
+        "\n",
+        "with beam.Pipeline() as p1:\n",
+        "    result = (\n",
+        "        p1\n",
+        "        | 
ReadFromText(\"/content/sample_data/california_housing_test.csv\",skip_header_lines=1)\n",
+        "        | beam.Map(compute_square)\n",
+        "        | WriteToText(\"/content/\")\n",
+        "    )"
+      ],
+      "metadata": {
+        "id": "oHbSvOUI1pOe"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "**Example Pipeline 2**"
+      ],
+      "metadata": {
+        "id": "Mh3nZZ1_12sX"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "with beam.Pipeline() as p2:\n",
+        "    result = (\n",
+        "        p2\n",
+        "        | ReadFromText(\"/content/sample_data/anscombe.json\")\n",
+        "        | beam.Map(str.strip)\n",
+        "        | WriteToText(\"/content/sample_data/\")\n",
+        "    )"
+      ],
+      "metadata": {
+        "id": "hmO1Chl51vPG"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "**Unit Tests for Pipelines**"
+      ],
+      "metadata": {
+        "id": "uoNJLQl_15gj"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# The following packages are imported for unit testing.\n",
+        "import unittest\n",
+        "import apache_beam as beam\n",
+        "from apache_beam.testing.test_pipeline import TestPipeline\n",
+        "from apache_beam.testing.util import assert_that, equal_to\n",
+        "try:\n",
+        "  from apitools.base.py.exceptions import HttpError\n",
+        "except ImportError:\n",
+        "  HttpError = None\n",
+        "\n",
+        "\n",
+        "@unittest.skipIf(HttpError is None, 'GCP dependencies are not 
installed')\n",
+        "class TestBeam(unittest.TestCase):\n",
+        "\n",
+        "# This test corresponds to pipeline p1, and is written to confirm the 
compute_square function works as intended.\n",
+        "  def test_compute_square(self):\n",

Review Comment:
   If `compute_square` is an ordinary Python function, I would recommend 
writing "ordinary" unit tests for it rather than testing it as part of a 
pipeline. 
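   Something like a plain test, with no pipeline machinery, would do:
   
   ```
   import unittest
   
   def compute_square(element):
     return int(element[1]) ** 2
   
   class ComputeSquareTest(unittest.TestCase):
     def test_compute_square(self):
       # "1234"[1] is "2", so the expected square is 4.
       self.assertEqual(compute_square("1234"), 4)
   
   if __name__ == '__main__':
     unittest.main()
   ```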



##########
website/www/site/content/en/blog/unit-testing-blog.md:
##########
@@ -0,0 +1,149 @@
+---
+title:  "So You Want to Write Tests on Your Beam Pipeline?"
+date:   2024-07-08 00:00:01 -0800
+categories:
+  - blog
+authors:
+  - svetakvsundhar
+
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+## So You Want to Write Tests on your Beam pipeline?
+Testing remains one of the most fundamental components of software 
engineering. In this blog post, we shed light on some of the constructs that 
Apache Beam provides to allow for testing. We cover an opinionated set of best 
practices to write unit tests for your data pipeline in this post. Note that 
this post does not include integration tests, and those should be authored 
separately. The examples used in this post are in Python, but the concepts 
translate broadly across SDKs.
+
+Suppose we write a particular PTransform that reads data from a CSV file, gets 
passed through a custom function for parsing, and is written back to another 
Google Cloud Storage bucket (we need to do some custom data formatting to have 
data prepared for a downstream application).
+
+
+The pipeline is structured as follows:
+
+    #The following packages are used to run the example pipelines
+    import apache_beam as beam
+    import apache_beam.io.textio.ReadFromText
+    import apache_beam.io.textio.WriteToText
+
+    with beam.Pipeline(argv=self.args) as p:
+        result = p | ReadFromText("gs://my-storage-bucket/csv_location.csv")
+                   | beam.ParDo(lambda x: custom_function(x))

Review Comment:
   You can't `ParDo(lambda)`; `ParDo` expects a `DoFn`, so this step needs
`beam.Map(custom_function)` or a proper `DoFn`. I would look into how
snippets are used in the programming guide to ensure the code is (and
remains) correct.
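   For instance, a sketch of that step (assuming `custom_function` takes and
returns a single element):
   
   ```
   import apache_beam as beam
   
   with beam.Pipeline() as p:
       result = (p
                 | beam.io.ReadFromText("gs://my-storage-bucket/csv_location.csv")
                 | beam.Map(custom_function)  # Map, not ParDo, for a plain function
                 | beam.io.WriteToText("gs://my-output-bucket-location/"))
   ```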



##########
examples/notebooks/blogposts/unittests_in_beam.ipynb:
##########
@@ -0,0 +1,259 @@
+{
+  "nbformat": 4,
+  "nbformat_minor": 0,
+  "metadata": {
+    "colab": {
+      "provenance": [],
+      "authorship_tag": "ABX9TyP+whTO0l5Xd2TU4xa2Z7KC",
+      "include_colab_link": true
+    },
+    "kernelspec": {
+      "name": "python3",
+      "display_name": "Python 3"
+    },
+    "language_info": {
+      "name": "python"
+    }
+  },
+  "cells": [
+    {
+      "cell_type": "markdown",
+      "metadata": {
+        "id": "view-in-github",
+        "colab_type": "text"
+      },
+      "source": [
+        "<a 
href=\"https://colab.research.google.com/github/apache/beam/blob/testing_blog_post/examples/notebooks/blogposts/unittests_in_beam.ipynb\";
 target=\"_parent\"><img 
src=\"https://colab.research.google.com/assets/colab-badge.svg\"; alt=\"Open In 
Colab\"/></a>"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "execution_count": 36,
+      "metadata": {
+        "id": "7DSE6TgWy7PP"
+      },
+      "outputs": [],
+      "source": [
+        "# @title ###### Licensed to the Apache Software Foundation (ASF), 
Version 2.0 (the \"License\")\n",
+        "\n",
+        "# Licensed to the Apache Software Foundation (ASF) under one\n",
+        "# or more contributor license agreements. See the NOTICE file\n",
+        "# distributed with this work for additional information\n",
+        "# regarding copyright ownership. The ASF licenses this file\n",
+        "# to you under the Apache License, Version 2.0 (the\n",
+        "# \"License\"); you may not use this file except in compliance\n",
+        "# with the License. You may obtain a copy of the License at\n",
+        "#\n",
+        "#   http://www.apache.org/licenses/LICENSE-2.0\n";,
+        "#\n",
+        "# Unless required by applicable law or agreed to in writing,\n",
+        "# software distributed under the License is distributed on an\n",
+        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+        "# KIND, either express or implied. See the License for the\n",
+        "# specific language governing permissions and limitations\n",
+        "# under the License"
+      ]
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# Install the Apache Beam library\n",
+        "\n",
+        "!pip install apache_beam[gcp] --quiet"
+      ],
+      "metadata": {
+        "id": "5W2nuV7uzlPg"
+      },
+      "execution_count": 37,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "#The following packages are used to run the example pipelines\n",
+        "\n",
+        "import apache_beam as beam\n",
+        "from apache_beam.io import ReadFromText, WriteToText\n",
+        "from apache_beam.options.pipeline_options import PipelineOptions\n",
+        "\n",
+        "class CustomClass(beam.DoFn):\n",
+        "  def custom_function(x):\n",
+        "          ...\n",
+        "          # returned_record = 
requests.get(\"http://my-api-call.com\";)\n",
+        "          ...\n",
+        "          # if len(returned_record)!=10:\n",
+        "          # raise ValueError(\"Length of record does not match 
expected length\")\n",
+        "          return x\n",
+        "\n",
+        "  with beam.Pipeline() as p:\n",
+        "    result = (\n",
+        "            p\n",
+        "            | ReadFromText(\"/content/sample_data/anscombe.json\")\n",
+        "            | beam.ParDo(lambda x: CustomClass.custom_function(x))\n",
+        "            | WriteToText(\"/content/\")\n",
+        "    )"
+      ],
+      "metadata": {
+        "id": "Ktk9EVIFzGfP"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "**Example Pipeline 1**\n"
+      ],
+      "metadata": {
+        "id": "IVjBkewt1sLA"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# This function is going to return the square the integer at the 
first index of our record.\n",
+        "def compute_square(element):\n",
+        "  return int(element[1])**2\n",
+        "\n",
+        "with beam.Pipeline() as p1:\n",
+        "    result = (\n",
+        "        p1\n",
+        "        | 
ReadFromText(\"/content/sample_data/california_housing_test.csv\",skip_header_lines=1)\n",
+        "        | beam.Map(compute_square)\n",
+        "        | WriteToText(\"/content/\")\n",
+        "    )"
+      ],
+      "metadata": {
+        "id": "oHbSvOUI1pOe"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "**Example Pipeline 2**"
+      ],
+      "metadata": {
+        "id": "Mh3nZZ1_12sX"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "with beam.Pipeline() as p2:\n",
+        "    result = (\n",
+        "        p2\n",
+        "        | ReadFromText(\"/content/sample_data/anscombe.json\")\n",
+        "        | beam.Map(str.strip)\n",
+        "        | WriteToText(\"/content/sample_data/\")\n",
+        "    )"
+      ],
+      "metadata": {
+        "id": "hmO1Chl51vPG"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "**Unit Tests for Pipelines**"
+      ],
+      "metadata": {
+        "id": "uoNJLQl_15gj"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# The following packages are imported for unit testing.\n",
+        "import unittest\n",
+        "import apache_beam as beam\n",
+        "from apache_beam.testing.test_pipeline import TestPipeline\n",
+        "from apache_beam.testing.util import assert_that, equal_to\n",
+        "try:\n",
+        "  from apitools.base.py.exceptions import HttpError\n",
+        "except ImportError:\n",
+        "  HttpError = None\n",
+        "\n",
+        "\n",
+        "@unittest.skipIf(HttpError is None, 'GCP dependencies are not 
installed')\n",
+        "class TestBeam(unittest.TestCase):\n",
+        "\n",
+        "# This test corresponds to pipeline p1, and is written to confirm the 
compute_square function works as intended.\n",
+        "  def test_compute_square(self):\n",
+        "    expected=[4]\n",
+        "    with TestPipeline() as p:\n",
+        "      output = p | beam.Create([\"1234\"]) \\\n",
+        "                 | beam.Map(compute_square)\n",
+        "      assert_that(output, equal_to(expected))"
+      ],
+      "metadata": {
+        "id": "3-twYhdLTan0"
+      },
+      "execution_count": 41,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# This test corresponds to pipeline p2, and is written to confirm the 
pipeline works as intended.\n",
+        "def test_strip_map(self):\n",
+        "  expected=['Strawberry', 'Carrot', 'Eggplant']\n",
+        "  strings = [' Strawberry   \\n', '   Carrot   \\n', '   Eggplant   
\\n']\n",
+        "  with TestPipeline() as p:\n",
+        "    output = p | beam.Create(strings) \\\n",
+        "               | beam.Map(str.strip)\n",
+        "    assert_that(output, equal_to(expected))"
+      ],
+      "metadata": {
+        "id": "BU9Eil-TrtpE"
+      },
+      "execution_count": 42,
+      "outputs": []
+    },
+    {
+      "cell_type": "markdown",
+      "source": [
+        "**Mocking Example**"
+      ],
+      "metadata": {
+        "id": "58GVMyMa2PwE"
+      }
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "!pip install mock  # Install the 'mock' module"
+      ],
+      "metadata": {
+        "id": "ESclJ_G-6JcW"
+      },
+      "execution_count": null,
+      "outputs": []
+    },
+    {
+      "cell_type": "code",
+      "source": [
+        "# We import the mock package for mocking functionality.\n",
+        "import mock\n",
+        "\n",
+        "@mock.patch.object(CustomClass, 'custom_function')\n",
+        "def test_error_message_wrong_length(self, get_record):\n",
+        "  record = [\"field1\",\"field2\"]\n",
+        "  CustomClass.custom_function.return_value = record\n",
+        "  with self.assertRaisesRegex(ValueError,\n",
+        "                              \"Length of record does not match 
expected length'\"):\n",
+        "      p = beam.Pipeline()\n",
+        "      result = p | beam.ParDo(CustomClass.custom_function())\n",

Review Comment:
   `CustomClass.custom_function()` returns a `DoFn`? I'm a bit confused at what 
you're trying to test here. 
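   For reference, the shape I'd expect is a `DoFn` subclass with a `process`
method, applied to the PCollection as an instance; a rough sketch (the
`get_record` helper is hypothetical):
   
   ```
   import apache_beam as beam
   
   class ParseRecord(beam.DoFn):
     def process(self, element):
       record = get_record(element)  # hypothetical API-call helper
       if len(record) != 10:
         raise ValueError("Length of record does not match expected length")
       yield record
   
   # Applied as an instance, not called as a function:
   # parsed = lines | beam.ParDo(ParseRecord())
   ```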



##########
website/www/site/content/en/blog/unit-testing-blog.md:
##########
@@ -0,0 +1,149 @@
+---
+title:  "So You Want to Write Tests on Your Beam Pipeline?"
+date:   2024-07-08 00:00:01 -0800
+categories:
+  - blog
+authors:
+  - svetakvsundhar
+
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+## So You Want to Write Tests on your Beam pipeline?
+Testing remains one of the most fundamental components of software 
engineering. In this blog post, we shed light on some of the constructs that 
Apache Beam provides to allow for testing. We cover an opinionated set of best 
practices to write unit tests for your data pipeline in this post. Note that 
this post does not include integration tests, and those should be authored 
separately. The examples used in this post are in Python, but the concepts 
translate broadly across SDKs.
+
+Suppose we write a particular PTransform that reads data from a CSV file, gets 
passed through a custom function for parsing, and is written back to another 
Google Cloud Storage bucket (we need to do some custom data formatting to have 
data prepared for a downstream application).
+
+
+The pipeline is structured as follows:
+
+    #The following packages are used to run the example pipelines
+    import apache_beam as beam
+    import apache_beam.io.textio.ReadFromText
+    import apache_beam.io.textio.WriteToText
+
+    with beam.Pipeline(argv=self.args) as p:
+        result = p | ReadFromText("gs://my-storage-bucket/csv_location.csv")
+                   | beam.ParDo(lambda x: custom_function(x))
+                   | WriteToText("gs://my-output-bucket-location/")
+
+We then add a custom function to our code:
+
+    def custom_function(x):
+         ...
+         returned_record = requests.get("http://my-api-call.com")
+         ...
+         if len(returned_record)!=10:
+            raise ValueError("Length of record does not match expected length")
+
+In this scenario, we recommend the following best practices:
+
+1. You don’t need to write any unit tests for the already supported connectors 
in the Beam Library, such as ReadFromBigQuery and WriteToGCS. These connectors 
are already tested in Beam’s test suite to ensure correct functionality.
+2. You should write unit tests for any custom operations introduced in the 
pipeline, such as Map, FlatMap, Filter, ParDo, and so on. We recommend adding 
tests for any lambda functions utilized within these Beam primitives. 
Additionally, even if you’re using a predefined function, treat the entire 
transform as a unit, and test it.

Review Comment:
   I disagree with this recommendation. If one has `Map(some_function)`, then
we should (1) ensure that `some_function` itself is well tested and (2)
ensure that `Map(...)` is well tested in the Beam codebase. Users should not
have to validate that `Map(...)` works as intended.
   
   If you write a `DoFn`, testing it by placing it in a pipeline is what I 
would recommend. 
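   For instance, a sketch of testing a custom `DoFn` inside a pipeline (the
`SplitWords` DoFn here is illustrative, not from the post):
   
   ```
   import apache_beam as beam
   from apache_beam.testing.test_pipeline import TestPipeline
   from apache_beam.testing.util import assert_that, equal_to
   
   class SplitWords(beam.DoFn):
     def process(self, element):
       # Emit one output element per whitespace-separated word.
       yield from element.split()
   
   def test_split_words():
     with TestPipeline() as p:
       output = p | beam.Create(["a b", "c"]) | beam.ParDo(SplitWords())
       assert_that(output, equal_to(["a", "b", "c"]))
   ```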



##########
website/www/site/content/en/blog/unit-testing-blog.md:
##########
@@ -0,0 +1,149 @@
+---
+title:  "So You Want to Write Tests on Your Beam Pipeline?"
+date:   2024-07-08 00:00:01 -0800
+categories:
+  - blog
+authors:
+  - svetakvsundhar
+
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+## So You Want to Write Tests on your Beam pipeline?
+Testing remains one of the most fundamental components of software 
engineering. In this blog post, we shed light on some of the constructs that 
Apache Beam provides to allow for testing. We cover an opinionated set of best 
practices to write unit tests for your data pipeline in this post. Note that 
this post does not include integration tests, and those should be authored 
separately. The examples used in this post are in Python, but the concepts 
translate broadly across SDKs.
+
+Suppose we write a particular PTransform that reads data from a CSV file, gets 
passed through a custom function for parsing, and is written back to another 
Google Cloud Storage bucket (we need to do some custom data formatting to have 
data prepared for a downstream application).
+
+
+The pipeline is structured as follows:
+
+    #The following packages are used to run the example pipelines
+    import apache_beam as beam
+    import apache_beam.io.textio.ReadFromText
+    import apache_beam.io.textio.WriteToText
+
+    with beam.Pipeline(argv=self.args) as p:
+        result = p | ReadFromText("gs://my-storage-bucket/csv_location.csv")

Review Comment:
   So, the problem is one can't really test this pipeline without reproducing 
it in the test. I think there are a couple of ways of re-structuring the code 
to make it more testable (and realistic). 
   
   (1) For an end-to-end test, put your pipeline in a function that takes the
input and output paths as parameters. Your production code will call this
with, e.g., GCS paths, but your test could create temporary directories and
files and validate the output. This tests that your whole pipeline, including
IOs, is structured correctly.
   
   (2) Factor out the "processing" part of your pipeline into its own 
PTransform, or at least function. E.g. your pipeline would be
   
   ```
   with beam.Pipeline(argv=self.args) as p:
     _ = (p
          | beam.io.ReadFromText("gs://my-storage-bucket/csv_location.csv")
          | ProcessData(...)
        | beam.io.WriteToText("gs://my-output-bucket-location/"))
   ```
   
   and then your unit test would look like
   
   ```
   with beam.Pipeline(argv=self.args) as p:
     _ = (p
          | beam.Create(["some", "sample", "elements"])
          | ProcessData(...)
          | AssertEqualTo(["expected", "outputs"]))
   ```
   
   or (equivalently, but less parallel)
   
   ```
   with beam.Pipeline(argv=self.args) as p:
     output_pcoll = (p
          | beam.Create(["some", "sample", "elements"])
          | ProcessData(...))
     assert_that(output_pcoll, equal_to(...))
   ```
   
   If writing a custom ProcessData PTransform is too much work, one could at 
least have
   
   ```
   output_pcoll = process_data(input_pcoll)
   ```
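   
   To make option (1) concrete, a sketch of the temp-directory test
(`run_pipeline(input_path, output_path)` is an assumed wrapper around the
pipeline above):
   
   ```
   import glob
   import tempfile
   
   def test_pipeline_end_to_end():
     with tempfile.TemporaryDirectory() as tmp:
       input_path = tmp + "/input.csv"
       with open(input_path, "w") as f:
         f.write("header\n1,2\n")
       run_pipeline(input_path, tmp + "/out")  # assumed entry point
       lines = []
       for shard in glob.glob(tmp + "/out*"):
         with open(shard) as f:
           lines.extend(f.read().splitlines())
       assert lines  # then compare against the expected records
   ```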



##########
website/www/site/content/en/blog/unit-testing-blog.md:
##########
@@ -0,0 +1,149 @@
+---
+title:  "So You Want to Write Tests on Your Beam Pipeline?"
+date:   2024-07-08 00:00:01 -0800
+categories:
+  - blog
+authors:
+  - svetakvsundhar
+
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+## So You Want to Write Tests on your Beam pipeline?
+Testing remains one of the most fundamental components of software 
engineering. In this blog post, we shed light on some of the constructs that 
Apache Beam provides to allow for testing. We cover an opinionated set of best 
practices to write unit tests for your data pipeline in this post. Note that 
this post does not include integration tests, and those should be authored 
separately. The examples used in this post are in Python, but the concepts 
translate broadly across SDKs.
+
+Suppose we write a particular PTransform that reads data from a CSV file, gets 
passed through a custom function for parsing, and is written back to another 
Google Cloud Storage bucket (we need to do some custom data formatting to have 
data prepared for a downstream application).
+
+
+The pipeline is structured as follows:
+
+    #The following packages are used to run the example pipelines
+    import apache_beam as beam
+    import apache_beam.io.textio.ReadFromText
+    import apache_beam.io.textio.WriteToText
+
+    with beam.Pipeline(argv=self.args) as p:
+        result = p | ReadFromText("gs://my-storage-bucket/csv_location.csv")
+                   | beam.ParDo(lambda x: custom_function(x))
+                   | WriteToText("gs://my-output-bucket-location/")
+
+We then add a custom function to our code:
+
+    def custom_function(x):
+         ...
+         returned_record = requests.get("http://my-api-call.com")
+         ...
+         if len(returned_record)!=10:
+            raise ValueError("Length of record does not match expected length")
+
+In this scenario, we recommend the following best practices:
+
+1. You don’t need to write any unit tests for the already supported connectors 
in the Beam Library, such as ReadFromBigQuery and WriteToGCS. These connectors 
are already tested in Beam’s test suite to ensure correct functionality.
+2. You should write unit tests for any custom operations introduced in the 
pipeline, such as Map, FlatMap, Filter, ParDo, and so on. We recommend adding 
tests for any lambda functions utilized within these Beam primitives. 
Additionally, even if you’re using a predefined function, treat the entire 
transform as a unit, and test it.
+
+### Example Pipeline 1
+Let’s use the following pipeline as an example. Because we have a function, we 
should write a unit test to ensure that our function works as intended.
+
+    def compute_square(element):
+        return element**2
+
+    with beam.Pipeline(argv=self.args) as p1:
+        result = p1 | ReadFromText("gs://my-storage-bucket/csv_location.csv")
+                    | beam.Map(compute_square)
+                    | WriteToText("gs://my-output-bucket-location/")
+
+### Example Pipeline 2
+
+Now let’s use the following pipeline as another example. Because we use a 
predefined function, we don’t need to unit test the function, as `str.strip`, 
is tested elsewhere. However, we do need to test the output of the `beam.Map` 
function.
+
+    with beam.Pipeline(argv=self.args) as p2:
+        result = p2 | ReadFromText("gs://my-storage-bucket/csv_location.csv")
+                    | beam.Map(str.strip)
+                    | WriteToText("gs://my-output-bucket-location/")
+
+
+Here are the corresponding tests for both pipelines:
+
+    # The following packages are imported for unit testing.
+    import unittest
+    import apache_beam as beam
+
+
+    @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
+    class TestBeam(unittest.TestCase):
+
+    # This test corresponds to pipeline p1, and is written to confirm the compute_square function works as intended.
+        def test_compute_square(self):
+            numbers=[1,2,3]
+
+
+        with TestPipeline() as p:
+            output = p | beam.Create([1,2,3])
+                       | beam.Map(compute_square)
+        assert_that(output, equal_to([1,4,9]))
+
+
+
+    # This test corresponds to pipeline p2, and is written to confirm that our 
map operation works as intended.
+       def test_strip_map(self):
+               strings= [' Strawberry   \n','   Carrot   \n','   Eggplant   \n']
+               with TestPipeline() as p:
+                       output = p | beam.Create(strings)
+                               | beam.Map(str.strip)
+        assert_that(output,['Strawberry','Carrot','Eggplant'])
+
+
+
+The following cover other testing best practices:
+
+1. Test all error messages you raise.
+2. Cover any edge cases that might be present in your data.
+3. Notice that in pipeline 1, we could have written the `beam.Map` step as  
`beam.Map(lambda x: x**2)`, instead of `beam.Map(compute_square)`. The latter 
(separating the lambda into a helper function) is the recommended approach for 
more testable code, as changes to the function would be modularized.
+4. Use the `assert_that` statement to ensure that PCollection values match up 
correctly, such as the following example:
+
+
+      class TestBeam(unittest.TestCase):
+          def test_custom_function(self):
+              with TestPipeline() as p:
+                input = p | beam.ParDo(custom_function(("1","2","3"))

Review Comment:
   Nit: If we want ints, let's use ints; if we want strings, let's use strings
(like `"a", "b", "c"` or fruit names or whatever, not numeric strings).



##########
website/www/site/content/en/blog/unit-testing-blog.md:
##########
@@ -0,0 +1,149 @@
+---
+title:  "So You Want to Write Tests on Your Beam Pipeline?"
+date:   2024-07-08 00:00:01 -0800
+categories:
+  - blog
+authors:
+  - svetakvsundhar
+
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+## So You Want to Write Tests on your Beam pipeline?
+Testing remains one of the most fundamental components of software 
engineering. In this blog post, we shed light on some of the constructs that 
Apache Beam provides to allow for testing. We cover an opinionated set of best 
practices to write unit tests for your data pipeline in this post. Note that 
this post does not include integration tests, and those should be authored 
separately. The examples used in this post are in Python, but the concepts 
translate broadly across SDKs.
+
+Suppose we write a particular PTransform that reads data from a CSV file, gets 
passed through a custom function for parsing, and is written back to another 
Google Cloud Storage bucket (we need to do some custom data formatting to have 
data prepared for a downstream application).
+
+
+The pipeline is structured as follows:
+
+    #The following packages are used to run the example pipelines
+    import apache_beam as beam
+    import apache_beam.io.textio.ReadFromText

Review Comment:
   Java-style imports?
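   In Python these would be, e.g.:
   
   ```
   from apache_beam.io.textio import ReadFromText, WriteToText
   ```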



##########
website/www/site/content/en/blog/unit-testing-blog.md:
##########
@@ -0,0 +1,149 @@
+---
+title:  "So You Want to Write Tests on Your Beam Pipeline?"
+date:   2024-07-08 00:00:01 -0800
+categories:
+  - blog
+authors:
+  - svetakvsundhar
+
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+## So You Want to Write Tests on Your Beam Pipeline?
+Testing remains one of the most fundamental components of software engineering. In this blog post, we shed light on some of the constructs that Apache Beam provides for testing, and we cover an opinionated set of best practices for writing unit tests for your data pipeline. Note that this post does not cover integration tests, which should be authored separately. The examples in this post are in Python, but the concepts translate broadly across SDKs.
+
+Suppose we write a pipeline that reads data from a CSV file, passes each record through a custom parsing function, and writes the result to a Google Cloud Storage bucket (we need some custom data formatting to prepare the data for a downstream application).
+
+
+The pipeline is structured as follows:
+
+    # The following packages are used to run the example pipelines
+    import apache_beam as beam
+    import apache_beam.io.textio.ReadFromText
+    import apache_beam.io.textio.WriteToText
+
+    with beam.Pipeline(argv=self.args) as p:
+        result = (p | ReadFromText("gs://my-storage-bucket/csv_location.csv")
+                    | beam.ParDo(lambda x: custom_function(x))
+                    | WriteToText("gs://my-output-bucket-location/"))
+
+We then add a custom function to our code:
+
+    def custom_function(x):
+        ...
+        returned_record = requests.get("http://my-api-call.com")
+        ...
+        if len(returned_record) != 10:
+            raise ValueError("Length of record does not match expected length")
+
+In this scenario, we recommend the following best practices:
+
+1. You don’t need to write any unit tests for the already-supported connectors in the Beam library, such as `ReadFromBigQuery` and `WriteToGCS`. These connectors are already tested in Beam’s test suite to ensure correct functionality.
+2. You should write unit tests for any custom operations introduced in the 
pipeline, such as Map, FlatMap, Filter, ParDo, and so on. We recommend adding 
tests for any lambda functions utilized within these Beam primitives. 
Additionally, even if you’re using a predefined function, treat the entire 
transform as a unit, and test it.
+
+### Example Pipeline 1
+Let’s use the following pipeline as an example. Because it applies a custom function (`compute_square`), we should write a unit test to ensure that the function works as intended.
+
+    def compute_square(element):
+        return element**2
+
+    with beam.Pipeline(argv=self.args) as p1:
+        result = (p1 | ReadFromText("gs://my-storage-bucket/csv_location.csv")
+                     | beam.Map(compute_square)
+                     | WriteToText("gs://my-output-bucket-location/"))
+
+### Example Pipeline 2
+
+Now let’s use the following pipeline as another example. Because we use a predefined function, `str.strip`, we don’t need to unit test the function itself; it is tested elsewhere. However, we do need to test the output of the `beam.Map` step.
+
+    with beam.Pipeline(argv=self.args) as p2:
+        result = (p2 | ReadFromText("gs://my-storage-bucket/csv_location.csv")
+                     | beam.Map(str.strip)
+                     | WriteToText("gs://my-output-bucket-location/"))
+
+
+Here are the corresponding tests for both pipelines:
+
+    # The following packages are imported for unit testing.
+    import unittest
+    import apache_beam as beam
+    from apache_beam.testing.test_pipeline import TestPipeline
+    from apache_beam.testing.util import assert_that, equal_to
+
+    # HttpError is only importable when the GCP dependencies are installed.
+    try:
+        from apitools.base.py.exceptions import HttpError
+    except ImportError:
+        HttpError = None
+
+    @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
+    class TestBeam(unittest.TestCase):
+
+        # This test corresponds to pipeline p1, and is written to confirm the compute_square function works as intended.
+        def test_compute_square(self):
+            numbers = [1, 2, 3]
+            with TestPipeline() as p:
+                output = (p | beam.Create(numbers)
+                            | beam.Map(compute_square))
+                assert_that(output, equal_to([1, 4, 9]))
+
+
+
+        # This test corresponds to pipeline p2, and is written to confirm that our map operation works as intended.
+        def test_strip_map(self):
+            strings = [' Strawberry   \n', '   Carrot   \n', '   Eggplant   \n']
+            with TestPipeline() as p:
+                output = (p | beam.Create(strings)
+                            | beam.Map(str.strip))
+                assert_that(output, equal_to(['Strawberry', 'Carrot', 'Eggplant']))
+
+
+
+The following are some other testing best practices:
+
+1. Test all error messages you raise.
+2. Cover any edge cases that might be present in your data.
+3. Notice that in pipeline 1, we could have written the `beam.Map` step as `beam.Map(lambda x: x**2)` instead of `beam.Map(compute_square)`. The latter (extracting the logic into a named helper function) is the recommended approach for more testable code, because changes to the function are modularized.
+4. Use `assert_that` to verify that the values of a PCollection match the expected output, as in the following example:
+
+
+      class TestBeam(unittest.TestCase):
+          def test_custom_function(self):
+              with TestPipeline() as p:
+                  input = p | beam.ParDo(custom_function(("1","2","3")))
+                  assert_that(input, equal_to(["1","2","3"]))
+
+
+5. If needed, use mocking to mock any API calls that might be present in your ParDo function. Mocking lets you test your functionality extensively by substituting a controlled response for the API call, without depending on the live service.
+
+The following snippet is based on the custom_function pipeline from the top of this blog post.
+
+      # We import the mock package for mocking functionality
+      import mock
+
+
+      @mock.patch.object(CustomFunction, 'get_record')
+      def test_error_message_wrong_length(self, get_record):
+        record = ["field1","field2",...]
+        get_record.return_value = record

Review Comment:
   I am not following this at all. Where is get_record being called? Why is it 
being mocked? 
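   
   For the patch to do anything, `custom_function` would need to call `get_record` on a `CustomFunction` class internally. A sketch of how the pieces might line up (the `CustomFunction` class and its use below are hypothetical, invented only so the mock has something to replace):
   
       import unittest
       import mock
       import requests
   
       class CustomFunction:
           def get_record(self):
               # The real implementation would fetch a record from the external
               # API (parsing elided, as in the blog's stub).
               return requests.get("http://my-api-call.com")
   
       def custom_function(x):
           returned_record = CustomFunction().get_record()
           if len(returned_record) != 10:
               raise ValueError("Length of record does not match expected length")
           return x
   
       class TestBeam(unittest.TestCase):
           @mock.patch.object(CustomFunction, 'get_record')
           def test_error_message_wrong_length(self, get_record):
               # The patched method returns a deliberately wrong-length record,
               # so no network call is made and the error path is exercised.
               get_record.return_value = ["field1", "field2"]
               with self.assertRaises(ValueError):
                   custom_function("any input")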



##########
website/www/site/content/en/blog/unit-testing-blog.md:
##########
@@ -0,0 +1,149 @@
+---
+title:  "So You Want to Write Tests on Your Beam Pipeline?"
+date:   2024-07-08 00:00:01 -0800
+categories:
+  - blog
+authors:
+  - svetakvsundhar
+
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+## So You Want to Write Tests on Your Beam Pipeline?
+Testing remains one of the most fundamental components of software engineering. In this blog post, we shed light on some of the constructs that Apache Beam provides for testing, and we cover an opinionated set of best practices for writing unit tests for your data pipeline. Note that this post does not cover integration tests, which should be authored separately. The examples in this post are in Python, but the concepts translate broadly across SDKs.
+
+Suppose we write a pipeline that reads data from a CSV file, passes each record through a custom parsing function, and writes the result to a Google Cloud Storage bucket (we need some custom data formatting to prepare the data for a downstream application).
+
+
+The pipeline is structured as follows:
+
+    # The following packages are used to run the example pipelines
+    import apache_beam as beam
+    import apache_beam.io.textio.ReadFromText
+    import apache_beam.io.textio.WriteToText
+
+    with beam.Pipeline(argv=self.args) as p:
+        result = (p | ReadFromText("gs://my-storage-bucket/csv_location.csv")
+                    | beam.ParDo(lambda x: custom_function(x))
+                    | WriteToText("gs://my-output-bucket-location/"))
+
+We then add a custom function to our code:
+
+    def custom_function(x):
+        ...
+        returned_record = requests.get("http://my-api-call.com")
+        ...
+        if len(returned_record) != 10:
+            raise ValueError("Length of record does not match expected length")
+
+In this scenario, we recommend the following best practices:
+
+1. You don’t need to write any unit tests for the already-supported connectors in the Beam library, such as `ReadFromBigQuery` and `WriteToGCS`. These connectors are already tested in Beam’s test suite to ensure correct functionality.
+2. You should write unit tests for any custom operations introduced in the 
pipeline, such as Map, FlatMap, Filter, ParDo, and so on. We recommend adding 
tests for any lambda functions utilized within these Beam primitives. 
Additionally, even if you’re using a predefined function, treat the entire 
transform as a unit, and test it.
+
+### Example Pipeline 1
+Let’s use the following pipeline as an example. Because it applies a custom function (`compute_square`), we should write a unit test to ensure that the function works as intended.
+
+    def compute_square(element):
+        return element**2
+
+    with beam.Pipeline(argv=self.args) as p1:
+        result = (p1 | ReadFromText("gs://my-storage-bucket/csv_location.csv")
+                     | beam.Map(compute_square)
+                     | WriteToText("gs://my-output-bucket-location/"))
+
+### Example Pipeline 2
+
+Now let’s use the following pipeline as another example. Because we use a predefined function, `str.strip`, we don’t need to unit test the function itself; it is tested elsewhere. However, we do need to test the output of the `beam.Map` step.
+
+    with beam.Pipeline(argv=self.args) as p2:
+        result = (p2 | ReadFromText("gs://my-storage-bucket/csv_location.csv")
+                     | beam.Map(str.strip)
+                     | WriteToText("gs://my-output-bucket-location/"))
+
+
+Here are the corresponding tests for both pipelines:
+
+    # The following packages are imported for unit testing.
+    import unittest
+    import apache_beam as beam
+    from apache_beam.testing.test_pipeline import TestPipeline
+    from apache_beam.testing.util import assert_that, equal_to
+
+    # HttpError is only importable when the GCP dependencies are installed.
+    try:
+        from apitools.base.py.exceptions import HttpError
+    except ImportError:
+        HttpError = None
+
+    @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
+    class TestBeam(unittest.TestCase):
+
+        # This test corresponds to pipeline p1, and is written to confirm the compute_square function works as intended.
+        def test_compute_square(self):
+            numbers = [1, 2, 3]
+            with TestPipeline() as p:
+                output = (p | beam.Create(numbers)
+                            | beam.Map(compute_square))
+                assert_that(output, equal_to([1, 4, 9]))
+
+
+
+        # This test corresponds to pipeline p2, and is written to confirm that our map operation works as intended.
+        def test_strip_map(self):
+            strings = [' Strawberry   \n', '   Carrot   \n', '   Eggplant   \n']
+            with TestPipeline() as p:
+                output = (p | beam.Create(strings)
+                            | beam.Map(str.strip))
+                assert_that(output, equal_to(['Strawberry', 'Carrot', 'Eggplant']))
+
+
+
+The following are some other testing best practices:
+
+1. Test all error messages you raise.
+2. Cover any edge cases that might be present in your data.
+3. Notice that in pipeline 1, we could have written the `beam.Map` step as `beam.Map(lambda x: x**2)` instead of `beam.Map(compute_square)`. The latter (extracting the logic into a named helper function) is the recommended approach for more testable code, because changes to the function are modularized.
+4. Use `assert_that` to verify that the values of a PCollection match the expected output, as in the following example:
+
+
+      class TestBeam(unittest.TestCase):
+          def test_custom_function(self):
+              with TestPipeline() as p:
+                  input = p | beam.ParDo(custom_function(("1","2","3")))
+                  assert_that(input, equal_to(["1","2","3"]))

Review Comment:
   I don't think this works, but I'm also trying to understand what it's even 
trying to do. You can't apply a ParDo to a raw pipeline object. ParDo accepts a 
DoFn, is `custom_function(some_tuple)` returning a DoFn that yields the tuple? 
   
   Again, if we actually ran this code we'd uncover these issues. 
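   
   A version that would actually run might create the inputs with `beam.Create` and apply the function with `beam.Map` (a sketch, assuming `custom_function` returns its input unchanged):
   
       class TestBeam(unittest.TestCase):
           def test_custom_function(self):
               with TestPipeline() as p:
                   output = (p | beam.Create(["1", "2", "3"])
                               | beam.Map(custom_function))
                   assert_that(output, equal_to(["1", "2", "3"]))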



##########
website/www/site/content/en/blog/unit-testing-blog.md:
##########
@@ -0,0 +1,149 @@
+---
+title:  "So You Want to Write Tests on Your Beam Pipeline?"
+date:   2024-07-08 00:00:01 -0800
+categories:
+  - blog
+authors:
+  - svetakvsundhar
+
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+## So You Want to Write Tests on Your Beam Pipeline?
+Testing remains one of the most fundamental components of software engineering. In this blog post, we shed light on some of the constructs that Apache Beam provides for testing, and we cover an opinionated set of best practices for writing unit tests for your data pipeline. Note that this post does not cover integration tests, which should be authored separately. The examples in this post are in Python, but the concepts translate broadly across SDKs.
+
+Suppose we write a pipeline that reads data from a CSV file, passes each record through a custom parsing function, and writes the result to a Google Cloud Storage bucket (we need some custom data formatting to prepare the data for a downstream application).
+
+
+The pipeline is structured as follows:
+
+    # The following packages are used to run the example pipelines
+    import apache_beam as beam
+    import apache_beam.io.textio.ReadFromText
+    import apache_beam.io.textio.WriteToText
+
+    with beam.Pipeline(argv=self.args) as p:
+        result = (p | ReadFromText("gs://my-storage-bucket/csv_location.csv")
+                    | beam.ParDo(lambda x: custom_function(x))
+                    | WriteToText("gs://my-output-bucket-location/"))
+
+We then add a custom function to our code:
+
+    def custom_function(x):
+        ...
+        returned_record = requests.get("http://my-api-call.com")
+        ...
+        if len(returned_record) != 10:
+            raise ValueError("Length of record does not match expected length")
+
+In this scenario, we recommend the following best practices:
+
+1. You don’t need to write any unit tests for the already-supported connectors in the Beam library, such as `ReadFromBigQuery` and `WriteToGCS`. These connectors are already tested in Beam’s test suite to ensure correct functionality.
+2. You should write unit tests for any custom operations introduced in the 
pipeline, such as Map, FlatMap, Filter, ParDo, and so on. We recommend adding 
tests for any lambda functions utilized within these Beam primitives. 
Additionally, even if you’re using a predefined function, treat the entire 
transform as a unit, and test it.
+
+### Example Pipeline 1
+Let’s use the following pipeline as an example. Because it applies a custom function (`compute_square`), we should write a unit test to ensure that the function works as intended.
+
+    def compute_square(element):
+        return element**2
+
+    with beam.Pipeline(argv=self.args) as p1:
+        result = (p1 | ReadFromText("gs://my-storage-bucket/csv_location.csv")
+                     | beam.Map(compute_square)
+                     | WriteToText("gs://my-output-bucket-location/"))
+
+### Example Pipeline 2
+
+Now let’s use the following pipeline as another example. Because we use a predefined function, `str.strip`, we don’t need to unit test the function itself; it is tested elsewhere. However, we do need to test the output of the `beam.Map` step.
+
+    with beam.Pipeline(argv=self.args) as p2:
+        result = (p2 | ReadFromText("gs://my-storage-bucket/csv_location.csv")
+                     | beam.Map(str.strip)
+                     | WriteToText("gs://my-output-bucket-location/"))
+
+
+Here are the corresponding tests for both pipelines:
+
+    # The following packages are imported for unit testing.
+    import unittest
+    import apache_beam as beam
+    from apache_beam.testing.test_pipeline import TestPipeline
+    from apache_beam.testing.util import assert_that, equal_to
+
+    # HttpError is only importable when the GCP dependencies are installed.
+    try:
+        from apitools.base.py.exceptions import HttpError
+    except ImportError:
+        HttpError = None
+
+    @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
+    class TestBeam(unittest.TestCase):
+
+        # This test corresponds to pipeline p1, and is written to confirm the compute_square function works as intended.
+        def test_compute_square(self):
+            numbers = [1, 2, 3]
+            with TestPipeline() as p:
+                output = (p | beam.Create(numbers)
+                            | beam.Map(compute_square))
+                assert_that(output, equal_to([1, 4, 9]))
+
+
+
+        # This test corresponds to pipeline p2, and is written to confirm that our map operation works as intended.
+        def test_strip_map(self):
+            strings = [' Strawberry   \n', '   Carrot   \n', '   Eggplant   \n']
+            with TestPipeline() as p:
+                output = (p | beam.Create(strings)
+                            | beam.Map(str.strip))
+                assert_that(output, equal_to(['Strawberry', 'Carrot', 'Eggplant']))

Review Comment:
   I commented on this above, but let's ensure they're continuously run. 
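   
   One low-effort way to keep these running continuously is to give the test module a main entry point (a sketch, assuming the tests live in a standalone module):
   
       if __name__ == '__main__':
           unittest.main()
   
   and then invoke it (e.g. `python -m unittest` or `pytest`) as part of the regular build.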



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
