yeandy commented on code in PR #23094:
URL: https://github.com/apache/beam/pull/23094#discussion_r967110450
##########
sdks/python/apache_beam/examples/ml-orchestration/kfp/components/ingestion/src/ingest.py:
##########
@@ -0,0 +1,57 @@
+"""Dummy ingestion function that fetches data from one file and simply copies
it to another."""
+
+import argparse
+from pathlib import Path
+import time
+
+
+def parse_args():
+ """Parse ingestion arguments."""
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ "--ingested-dataset-path", type=str,
+ help="Path to save the ingested dataset to.")
+ parser.add_argument(
+ "--base-artifact-path", type=str,
+ help="Base path to store pipeline artifacts.")
+ return parser.parse_args()
+
+
+def dummy_ingest_data(
+ ingested_dataset_path: str,
+ base_artifact_path: str):
+ """Dummy data ingestion step that returns an uri
+ to the data it has 'ingested' as jsonlines.
+
+ Args:
+ data_ingestion_target (str): uri to the data that was scraped and
+ ingested by the component"""
+ # timestamp as unique id for the component execution
+ timestamp = int(time.time())
+
+ # create directory to store the actual data
+ target_path =
f"{base_artifact_path}/ingestion/ingested_dataset_{timestamp}.jsonl"
+ # if the target path is a google cloud storage path convert the path to the
gcsfuse path
+ target_path_gcsfuse = target_path.replace("gs://", "/gcs/")
+ Path(target_path_gcsfuse).parent.mkdir(parents=True, exist_ok=True)
+
+ with open(target_path_gcsfuse, 'w') as f:
+ f.writelines([
+ """{"image_id": 318556, "id": 255, "caption": "An angled view of a
beautifully decorated bathroom.", "image_url":
"http://farm4.staticflickr.com/3133/3378902101_3c9fa16b84_z.jpg", "image_name":
"COCO_train2014_000000318556.jpg", "image_license":
"Attribution-NonCommercial-ShareAlike License"}\n""",
+ """{"image_id": 476220, "id": 314, "caption": "An empty kitchen with
white and black appliances.", "image_url":
"http://farm7.staticflickr.com/6173/6207941582_b69380c020_z.jpg", "image_name":
"COCO_train2014_000000476220.jpg", "image_license": "Attribution-NonCommercial
License"}\n""",
+ """{"image_id": 134754, "id": 425, "caption": "Two people carrying surf
boards on a beach.", "image_url":
"http://farm9.staticflickr.com/8500/8398513396_b6a1f11a4b_z.jpg", "image_name":
"COCO_train2014_000000134754.jpg", "image_license":
"Attribution-NonCommercial-NoDerivs License"}"""
+ ])
+
+ # the directory where the output file is created may or may not exists
+ # so we have to create it.
+ # KFP v1 components can only write output to files. The outpout of this
Review Comment:
```suggestion
# KFP v1 components can only write output to files. The output of this
```
##########
website/www/site/content/en/documentation/ml/orchestration.md:
##########
@@ -0,0 +1,227 @@
+---
+title: "Orchestration"
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Workflow orchestration
+
+## Understanding the Beam DAG
+
+
+Apache Beam is an open source, unified model for defining both batch and
streaming data-parallel processing pipelines. One of the central concepts to
the Beam programming model is the DAG (= Directed Acyclic Graph). Each Beam
pipeline is a DAG that can be constructed through the Beam SDK in your
programming language of choice (from the set of supported beam SDKs). Each node
of this DAG represents a processing step (PTransform) that accepts a collection
of data as input (PCollection) and outputs a transformed collection of data
(PCollection). The edges define how data flows through the pipeline from one
processing step to another. The image below shows an example of such a
pipeline.
+
+
+
+Note that simply defining a pipeline and the corresponding DAG does not mean
that data will start flowing through the pipeline. To actually execute the
pipeline, it has to be deployed to one of the [supported Beam
runners](https://beam.apache.org/documentation/runners/capability-matrix/).
These distributed processing back-ends include Apache Flink, Apache Spark and
Google Cloud Dataflow. A [Direct
Runner](https://beam.apache.org/documentation/runners/direct/) is also provided
to execute the pipeline locally on your machine for development and debugging
purposes. Make sure to check out the [runner capability
matrix](https://beam.apache.org/documentation/runners/capability-matrix/) to
guarantee that the chosen runner supports the data processing steps defined in
your pipeline, especially when using the Direct Runner.
+
+## Orchestrating frameworks
+
+Successfully delivering machine learning projects is about a lot more than
training a model and calling it a day. In addition, a full ML workflow will
often contain a range of other steps including data ingestion, data validation,
data preprocessing, model evaluation, model deployment, data drift detection…
On top of that, it’s essential to keep track of metadata and artifacts from
your experiments to answer important questions like: What data was this model
trained on and with which training parameters? When was this model deployed and
which accuracy did it get on a test dataset? Without this knowledge at your
disposal, it will become increasingly difficult to troubleshoot, monitor and
improve your ML solutions as they grow in size.
+
+The solution: MLOps. MLOps is an umbrella term used to describe best practices
and guiding principles that aim to make the development and maintenance of
machine learning systems seamless and efficient. Simply put, MLOps is most
often about automating machine learning workflows throughout the model and data
lifecycle. Popular frameworks to create these workflow DAGs are [Kubeflow
Pipelines](https://www.kubeflow.org/docs/components/pipelines/introduction/),
[Apache
Airflow](https://airflow.apache.org/docs/apache-airflow/stable/index.html) and
[TFX](https://www.tensorflow.org/tfx/guide).
+
+So what does all of this have to do with Beam? Well, since we established that
Beam is a great tool for a range of ML tasks, a beam pipeline can either be
used as a standalone data processing job or can be part of a larger sequence of
steps in such a workflow. In the latter case, the beam DAG is just one node in
the overarching DAG composed by the workflow orchestrator. This results in a
DAG in a DAG, as illustrated by the example below.
+
+
+
+It is important to understand the key difference between the Beam DAG and the
orchestrating DAG. The Beam DAG processes data and passes that data between the
nodes of its DAG. The focus of Beam is on parallelization and enabling both
batch and streaming jobs. In contrast, the orchestration DAG schedules and
monitors steps in the workflow and passed between the nodes of the DAG are
execution parameters, metadata and artifacts. An example of such an artifact
could be a trained model or a dataset. Such artifacts are often passed by a
reference URI and not by value.
+
+Note: TFX creates a workflow DAG, which needs an orchestrator of its own to be
executed. [Natively supported orchestrators for
TFX](https://www.tensorflow.org/tfx/guide/custom_orchestrator) are Airflow,
Kubeflow Pipelines and, here’s the kicker, Beam itself! As mentioned by the
[TFX docs](https://www.tensorflow.org/tfx/guide/beam_orchestrator):
+> "Several TFX components rely on Beam for distributed data processing. In
addition, TFX can use Apache Beam to orchestrate and execute the pipeline DAG.
Beam orchestrator uses a different BeamRunner than the one which is used for
component data processing."
+
+Caveat: The Beam orchestrator is not meant to be a TFX orchestrator to be used
in production environments. It simply enables to debug the TFX pipeline locally
on Beam’s DirectRunner without the need for the extra setup that is needed for
Airflow or Kubeflow.
+
+## Preprocessing example
+
+Let’s get practical and take a look at two such orchestrated ML workflows, one
with Kubeflow Pipelines (KFP) and one with Tensorflow Extended (TFX). These two
frameworks achieve the same goal of creating workflows, but have their own
distinct advantages and disadvantages: KFP requires you to create your workflow
components from scratch and requires a user to explicitly indicate which
artifacts should be passed between components and in what way. In contrast, TFX
offers a number of prebuilt components and takes care of the artifact passing
more implicitly. Clearly, there is a trade-off to be considered between
flexibility and programming overhead when choosing between the two frameworks.
We will start by looking at an example with KFP and then transition to TFX to
show TFX takes care of a lot of functionality that we had to define by hand in
the KFP example.
+
+To not overcomplicate things, the workflows are limited to three components:
data ingestion, data preprocessing and model training. Depending on the
scenario, a range of extra components could be added such as model evaluation,
model deployment… We will focus our attention on the preprocessing component,
since it showcases how to use Apache beam in an ML workflow for efficient and
parallel processing of your ML data.
+
+The dataset we will use consists image-caption pairs, i.e. images paired with
a textual caption describing the content of the image. These pairs are taken
from captions subset of the [MSCOCO 2014
dataset](https://cocodataset.org/#home). This multi-modal data (image + text)
gives us the opportunity to experiment with preprocessing operations for both
modalities.
Review Comment:
```suggestion
The dataset we will use consists of image-caption pairs, i.e. images paired
with a textual caption describing the content of the image. These pairs are
taken from captions subset of the [MSCOCO 2014
dataset](https://cocodataset.org/#home). This multi-modal data (image + text)
gives us the opportunity to experiment with preprocessing operations for both
modalities.
```
##########
sdks/python/apache_beam/examples/ml-orchestration/kfp/components/train/src/train.py:
##########
@@ -0,0 +1,62 @@
+"""Dummy training function that loads a pretrained model from the torch hub
and saves it."""
+
+import argparse
+from pathlib import Path
+import time
+
+import torch
+
+
+def parse_args():
+ """Parse ingestion arguments."""
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ "--preprocessed-dataset-path", type=str,
+ help="Path to the preprocessed dataset.")
+ parser.add_argument(
+ "--trained-model-path", type=str,
+ help="Output path to the trained model.")
+ parser.add_argument(
+ "--base-artifact-path", type=str,
+ help="Base path to store pipeline artifacts.")
+ return parser.parse_args()
+
+
+def train_model(
+ preprocessed_dataset_path: str,
+ trained_model_path: str,
+ base_artifact_path: str):
+ """Dummy to load a model from the torch hub and save it.
+
+ Args:
+ preprocessed_dataset_path (str): Path to the preprocessed dataset
+ trained_model_path (str): Output path for the trained model
+ base_artifact_path (str): path to the base directory of where artifacts
can be stored for
+ this component
+ """
+ # timestamp for the component execution
+ timestamp = time.time()
+
+ # create model or load a pretrained one
+ model = torch.hub.load('pytorch/vision:v0.10.0', 'vgg16', pretrained=True)
+
+ # TODO: train on preprocessed dataset
Review Comment:
Is this a TODO for demonstration purposes (i.e. the user needs to train the
model here), or is it a TODO for this PR?
##########
website/www/site/content/en/documentation/ml/orchestration.md:
##########
@@ -0,0 +1,227 @@
+---
+title: "Orchestration"
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Workflow orchestration
+
+## Understanding the Beam DAG
+
+
+Apache Beam is an open source, unified model for defining both batch and
streaming data-parallel processing pipelines. One of the central concepts to
the Beam programming model is the DAG (= Directed Acyclic Graph). Each Beam
pipeline is a DAG that can be constructed through the Beam SDK in your
programming language of choice (from the set of supported beam SDKs). Each node
of this DAG represents a processing step (PTransform) that accepts a collection
of data as input (PCollection) and outputs a transformed collection of data
(PCollection). The edges define how data flows through the pipeline from one
processing step to another. The image below shows an example of such a
pipeline.
+
+
+
+Note that simply defining a pipeline and the corresponding DAG does not mean
that data will start flowing through the pipeline. To actually execute the
pipeline, it has to be deployed to one of the [supported Beam
runners](https://beam.apache.org/documentation/runners/capability-matrix/).
These distributed processing back-ends include Apache Flink, Apache Spark and
Google Cloud Dataflow. A [Direct
Runner](https://beam.apache.org/documentation/runners/direct/) is also provided
to execute the pipeline locally on your machine for development and debugging
purposes. Make sure to check out the [runner capability
matrix](https://beam.apache.org/documentation/runners/capability-matrix/) to
guarantee that the chosen runner supports the data processing steps defined in
your pipeline, especially when using the Direct Runner.
+
+## Orchestrating frameworks
+
+Successfully delivering machine learning projects is about a lot more than
training a model and calling it a day. In addition, a full ML workflow will
often contain a range of other steps including data ingestion, data validation,
data preprocessing, model evaluation, model deployment, data drift detection…
On top of that, it’s essential to keep track of metadata and artifacts from
your experiments to answer important questions like: What data was this model
trained on and with which training parameters? When was this model deployed and
which accuracy did it get on a test dataset? Without this knowledge at your
disposal, it will become increasingly difficult to troubleshoot, monitor and
improve your ML solutions as they grow in size.
+
+The solution: MLOps. MLOps is an umbrella term used to describe best practices
and guiding principles that aim to make the development and maintenance of
machine learning systems seamless and efficient. Simply put, MLOps is most
often about automating machine learning workflows throughout the model and data
lifecycle. Popular frameworks to create these workflow DAGs are [Kubeflow
Pipelines](https://www.kubeflow.org/docs/components/pipelines/introduction/),
[Apache
Airflow](https://airflow.apache.org/docs/apache-airflow/stable/index.html) and
[TFX](https://www.tensorflow.org/tfx/guide).
+
+So what does all of this have to do with Beam? Well, since we established that
Beam is a great tool for a range of ML tasks, a beam pipeline can either be
used as a standalone data processing job or can be part of a larger sequence of
steps in such a workflow. In the latter case, the beam DAG is just one node in
the overarching DAG composed by the workflow orchestrator. This results in a
DAG in a DAG, as illustrated by the example below.
+
+
+
+It is important to understand the key difference between the Beam DAG and the
orchestrating DAG. The Beam DAG processes data and passes that data between the
nodes of its DAG. The focus of Beam is on parallelization and enabling both
batch and streaming jobs. In contrast, the orchestration DAG schedules and
monitors steps in the workflow and passed between the nodes of the DAG are
execution parameters, metadata and artifacts. An example of such an artifact
could be a trained model or a dataset. Such artifacts are often passed by a
reference URI and not by value.
+
+Note: TFX creates a workflow DAG, which needs an orchestrator of its own to be
executed. [Natively supported orchestrators for
TFX](https://www.tensorflow.org/tfx/guide/custom_orchestrator) are Airflow,
Kubeflow Pipelines and, here’s the kicker, Beam itself! As mentioned by the
[TFX docs](https://www.tensorflow.org/tfx/guide/beam_orchestrator):
+> "Several TFX components rely on Beam for distributed data processing. In
addition, TFX can use Apache Beam to orchestrate and execute the pipeline DAG.
Beam orchestrator uses a different BeamRunner than the one which is used for
component data processing."
+
+Caveat: The Beam orchestrator is not meant to be a TFX orchestrator to be used
in production environments. It simply enables to debug the TFX pipeline locally
on Beam’s DirectRunner without the need for the extra setup that is needed for
Airflow or Kubeflow.
+
+## Preprocessing example
+
+Let’s get practical and take a look at two such orchestrated ML workflows, one
with Kubeflow Pipelines (KFP) and one with Tensorflow Extended (TFX). These two
frameworks achieve the same goal of creating workflows, but have their own
distinct advantages and disadvantages: KFP requires you to create your workflow
components from scratch and requires a user to explicitly indicate which
artifacts should be passed between components and in what way. In contrast, TFX
offers a number of prebuilt components and takes care of the artifact passing
more implicitly. Clearly, there is a trade-off to be considered between
flexibility and programming overhead when choosing between the two frameworks.
We will start by looking at an example with KFP and then transition to TFX to
show TFX takes care of a lot of functionality that we had to define by hand in
the KFP example.
+
+To not overcomplicate things, the workflows are limited to three components:
data ingestion, data preprocessing and model training. Depending on the
scenario, a range of extra components could be added such as model evaluation,
model deployment… We will focus our attention on the preprocessing component,
since it showcases how to use Apache beam in an ML workflow for efficient and
parallel processing of your ML data.
+
+The dataset we will use consists image-caption pairs, i.e. images paired with
a textual caption describing the content of the image. These pairs are taken
from captions subset of the [MSCOCO 2014
dataset](https://cocodataset.org/#home). This multi-modal data (image + text)
gives us the opportunity to experiment with preprocessing operations for both
modalities.
+
+### Kubeflow pipelines (KFP)
+
+In order to execute our ML workflow with KFP we must perform three steps:
+
+1. Create the KFP components by specifying the interface to the components and
by writing and containerizing the implementation of the component logic
+2. Create the KFP pipeline by connecting the created components and specifying
how inputs and outputs should be passed from between components and compiling
the pipeline definition to a full pipeline definition.
+3. Execute the KFP pipeline by submitting it to a KFP client endpoint.
+
+The full example code can be found
[here](sdks/python/apache_beam/examples/ml-orchestration/kfp/)
+
+#### Create the KFP components
+
+This is our target file structure:
+
+ kfp
+ ├── pipeline.py
+ ├── components
+ │ ├── ingestion
+ │ │ ├── Dockerfile
+ │ │ ├── component.yaml
+ │ │ ├── requirements.txt
+ │ │ └── src
+ │ │ └── ingest.py
+ │ ├── preprocessing
+ │ │ ├── Dockerfile
+ │ │ ├── component.yaml
+ │ │ ├── requirements.txt
+ │ │ └── src
+ │ │ └── preprocess.py
+ │ └── train
+ │ ├── Dockerfile
+ │ ├── component.yaml
+ │ ├── requirements.txt
+ │ └── src
+ │ └── train.py
+ └── requirements.txt
+
+Let’s start with the component specifications. The full preprocessing
component specification is illustrated below. The inputs are the path where the
ingested dataset was saved by the ingest component and a path to a directory
where the component can store artifacts. The specifications for the ingestion
and train component are similar and can be found here and here, respectively.
+
+>Note: we are using the KFP v1 SDK, because v2 is still in
[beta](https://www.kubeflow.org/docs/started/support/#application-status). The
v2 SDK introduces some new options for specifying the component interface with
more native support for input and output artifacts. To see how to migrate
components from v1 to v2, consult the [KFP
docs](https://www.kubeflow.org/docs/components/pipelines/sdk-v2/v2-component-io/).
+
+{{< highlight
file="sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/component.yaml"
>}}
+{{< code_sample
"sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/component.yaml"
preprocessing_component_definition >}}
+{{< /highlight >}}
+
+In this case, each component shares an identical Dockerfile but extra
component-specific dependencies could be added where necessary.
+
+{{< highlight language="Dockerfile"
file="sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/Dockerfile"
>}}
+{{< code_sample
"sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/Dockerfile"
component_dockerfile >}}
+{{< /highlight >}}
+
+With the component specification and containerization out of the way we can
look at the actual implementation of the preprocessing component.
+
+Since KFP provides the input and output arguments as command-line arguments,
an argumentparser is needed.
+
+{{< highlight
file="sdks/python/apache_beam/examples/ml-orchestration/kf/components/preprocessing/src/preprocess.py"
>}}
+{{< code_sample
"sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/src/preprocess.py"
preprocess_component_argparse >}}
+{{< /highlight >}}
+
+The implementation of the `preprocess_dataset` function contains the Beam
pipeline code and the Beam pipeline options to select the desired runner. The
executed preprocessing involves downloading the image bytes from their url,
converting them to a Torch Tensor and resizing to the desired size. The caption
undergoes a series of string manipulations to ensure that our model receives
clean uniform image descriptions (Tokenization is not yet done here, but could
be included here as well if the vocabulary is known). Finally each element is
serialized and written to [Avro](https://avro.apache.org/docs/1.2.0/) files
(Alternative files formats could be used as well, e.g. TFRecords).
+
+
+{{< highlight
file="sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/src/preprocess.py"
>}}
+{{< code_sample
"sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/src/preprocess.py"
deploy_preprocessing_beam_pipeline >}}
+{{< /highlight >}}
+
+It also contains the necessary code to perform the component IO. First, a
target path is constructed to store the preprocessed dataset based on the
component input parameter `base_artifact_path` and a timestamp. Output values
from components can only be returned as files so we write the value of the
constructed target path to an output file that was provided by KFP to our
component.
+
+{{< highlight
file="sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/src/preprocess.py"
>}}
+{{< code_sample
"sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/src/preprocess.py"
kfp_component_input_output >}}
+{{< /highlight >}}
+
+Since we are mainly interested in the preprocessing component to show how a
Beam pipeline can be integrated into a larger ML workflow, we will not cover
the implementation of the ingestion and train component in depth.
Implementations of dummy components that mock their behavior are provided in
the full example code.
+
+#### Create the pipeline definition
+
+`pipeline.py` first loads the created components from their specification
`.yaml` file.
+
+{{< highlight
file="sdks/python/apache_beam/examples/ml-orchestration/kfp/pipeline.py" >}}
+{{< code_sample
"sdks/python/apache_beam/examples/ml-orchestration/kfp/pipeline.py"
load_kfp_components >}}
+{{< /highlight >}}
+
+After that, the pipeline is created and the required components inputs and
outputs are specified manually.
+
+{{< highlight
file="sdks/python/apache_beam/examples/ml-orchestration/kfp/pipeline.py" >}}
+{{< code_sample
"sdks/python/apache_beam/examples/ml-orchestration/kfp/pipeline.py"
define_kfp_pipeline >}}
+{{< /highlight >}}
+
+Finally, the defined pipeline is compiled and a `pipeline.json` specification
file is generated.
+
+{{< highlight
file="sdks/python/apache_beam/examples/ml-orchestration/kfp/pipeline.py" >}}
+{{< code_sample
"sdks/python/apache_beam/examples/ml-orchestration/kfp/pipeline.py"
compile_kfp_pipeline >}}
+{{< /highlight >}}
+
+
+#### Execute the KFP pipeline
+
+{{< highlight
file="sdks/python/apache_beam/examples/ml-orchestration/kfp/pipeline.py" >}}
+client = kfp.Client()
+try:
+ experiment = client.get_experiment(experiment_name=EXPERIMENT_NAME)
+except:
+ experiment = client.create_experiment(EXPERIMENT_NAME)
+arguments = {}
+
+run_result = client.run_pipeline(experiment.id,
+ RUN_NAME,
+ PIPELINE_FILENAME,
+ arguments)
+{{< /highlight >}}
+
+
+### Tensorflow Extended (TFX)
+
+The way of working for TFX is similar to the approach for KFP as illustrated
above: Define the individual workflow components, connect them in a pipeline
object and run the pipeline in the target environment. However, what makes TFX
different is that it has already built a set of Python packages that are
libraries to create workflow components. So unlike the KFP example, we do not
need to start from scratch by writing and containerizing our code. What is left
for the users to do is pick which of those TFX components are relevant to their
specific workflow and adapt their functionality to the specific use case using
the library. The image below shows the available components and their
corresponding libraries. The link with Apache Beam is that TFX relies heavily
on it to implement data-parallel pipelines in these libraries. This means that
components created with these libraries will need to be run on one of the
support Beam runners. The full example code can again be found [here](sdk
s/python/apache_beam/examples/ml-orchestration/tfx/)
Review Comment:
```suggestion
The way of working for TFX is similar to the approach for KFP as illustrated
above: Define the individual workflow components, connect them in a pipeline
object, and run the pipeline in the target environment. However, what makes TFX
different is that it has already built a set of Python packages that are
libraries to create workflow components. So unlike the KFP example, we do not
need to start from scratch by writing and containerizing our code. What is left
for the users to do is pick which of those TFX components are relevant to their
specific workflow and adapt their functionality to the specific use case using
the library. The image below shows the available components and their
corresponding libraries. The link with Apache Beam is that TFX relies heavily
on it to implement data-parallel pipelines in these libraries. This means that
components created with these libraries will need to be run on one of the
support Beam runners. The full example code can again be found [here](
sdks/python/apache_beam/examples/ml-orchestration/tfx/)
```
##########
sdks/python/apache_beam/examples/ml-orchestration/kfp/requirements.txt:
##########
@@ -0,0 +1,2 @@
+kfp==1.8.13
+google-cloud-aiplatform==1.15
Review Comment:
I may have missed this, but is it explained how this file is used?
##########
website/www/site/content/en/documentation/ml/orchestration.md:
##########
@@ -0,0 +1,227 @@
+---
+title: "Orchestration"
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Workflow orchestration
+
+## Understanding the Beam DAG
+
+
+Apache Beam is an open source, unified model for defining both batch and
streaming data-parallel processing pipelines. One of the central concepts to
the Beam programming model is the DAG (= Directed Acyclic Graph). Each Beam
pipeline is a DAG that can be constructed through the Beam SDK in your
programming language of choice (from the set of supported beam SDKs). Each node
of this DAG represents a processing step (PTransform) that accepts a collection
of data as input (PCollection) and outputs a transformed collection of data
(PCollection). The edges define how data flows through the pipeline from one
processing step to another. The image below shows an example of such a
pipeline.
+
+
+
+Note that simply defining a pipeline and the corresponding DAG does not mean
that data will start flowing through the pipeline. To actually execute the
pipeline, it has to be deployed to one of the [supported Beam
runners](https://beam.apache.org/documentation/runners/capability-matrix/).
These distributed processing back-ends include Apache Flink, Apache Spark and
Google Cloud Dataflow. A [Direct
Runner](https://beam.apache.org/documentation/runners/direct/) is also provided
to execute the pipeline locally on your machine for development and debugging
purposes. Make sure to check out the [runner capability
matrix](https://beam.apache.org/documentation/runners/capability-matrix/) to
guarantee that the chosen runner supports the data processing steps defined in
your pipeline, especially when using the Direct Runner.
+
+## Orchestrating frameworks
+
+Successfully delivering machine learning projects is about a lot more than
training a model and calling it a day. In addition, a full ML workflow will
often contain a range of other steps including data ingestion, data validation,
data preprocessing, model evaluation, model deployment, data drift detection…
On top of that, it’s essential to keep track of metadata and artifacts from
your experiments to answer important questions like: What data was this model
trained on and with which training parameters? When was this model deployed and
which accuracy did it get on a test dataset? Without this knowledge at your
disposal, it will become increasingly difficult to troubleshoot, monitor and
improve your ML solutions as they grow in size.
Review Comment:
```suggestion
Successfully delivering machine learning projects is about a lot more than
training a model and calling it a day. A full ML workflow will often contain a
range of other steps including data ingestion, data validation, data
preprocessing, model evaluation, model deployment, data drift detection, etc.
Furthermore, it’s essential to keep track of metadata and artifacts from your
experiments to answer important questions like:
- What data was this model trained on and with which training parameters?
- When was this model deployed and what accuracy did it get on a test
dataset?
Without this knowledge at your disposal, it will become increasingly
difficult to troubleshoot, monitor and improve your ML solutions as they grow
in size.
```
##########
website/www/site/content/en/documentation/ml/orchestration.md:
##########
@@ -0,0 +1,227 @@
+---
+title: "Orchestration"
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Workflow orchestration
+
+## Understanding the Beam DAG
+
+
+Apache Beam is an open source, unified model for defining both batch and
streaming data-parallel processing pipelines. One of the central concepts to
the Beam programming model is the DAG (= Directed Acyclic Graph). Each Beam
pipeline is a DAG that can be constructed through the Beam SDK in your
programming language of choice (from the set of supported beam SDKs). Each node
of this DAG represents a processing step (PTransform) that accepts a collection
of data as input (PCollection) and outputs a transformed collection of data
(PCollection). The edges define how data flows through the pipeline from one
processing step to another. The image below shows an example of such a
pipeline.
+
+
+
+Note that simply defining a pipeline and the corresponding DAG does not mean
that data will start flowing through the pipeline. To actually execute the
pipeline, it has to be deployed to one of the [supported Beam
runners](https://beam.apache.org/documentation/runners/capability-matrix/).
These distributed processing back-ends include Apache Flink, Apache Spark and
Google Cloud Dataflow. A [Direct
Runner](https://beam.apache.org/documentation/runners/direct/) is also provided
to execute the pipeline locally on your machine for development and debugging
purposes. Make sure to check out the [runner capability
matrix](https://beam.apache.org/documentation/runners/capability-matrix/) to
guarantee that the chosen runner supports the data processing steps defined in
your pipeline, especially when using the Direct Runner.
+
+## Orchestrating frameworks
+
+Successfully delivering machine learning projects is about a lot more than
training a model and calling it a day. In addition, a full ML workflow will
often contain a range of other steps including data ingestion, data validation,
data preprocessing, model evaluation, model deployment, data drift detection…
On top of that, it’s essential to keep track of metadata and artifacts from
your experiments to answer important questions like: What data was this model
trained on and with which training parameters? When was this model deployed and
which accuracy did it get on a test dataset? Without this knowledge at your
disposal, it will become increasingly difficult to troubleshoot, monitor and
improve your ML solutions as they grow in size.
+
+The solution: MLOps. MLOps is an umbrella term used to describe best practices
and guiding principles that aim to make the development and maintenance of
machine learning systems seamless and efficient. Simply put, MLOps is most
often about automating machine learning workflows throughout the model and data
lifecycle. Popular frameworks to create these workflow DAGs are [Kubeflow
Pipelines](https://www.kubeflow.org/docs/components/pipelines/introduction/),
[Apache
Airflow](https://airflow.apache.org/docs/apache-airflow/stable/index.html) and
[TFX](https://www.tensorflow.org/tfx/guide).
+
+So what does all of this have to do with Beam? Well, since we established that
Beam is a great tool for a range of ML tasks, a beam pipeline can either be
used as a standalone data processing job or can be part of a larger sequence of
steps in such a workflow. In the latter case, the beam DAG is just one node in
the overarching DAG composed by the workflow orchestrator. This results in a
DAG in a DAG, as illustrated by the example below.
+
+
+
+It is important to understand the key difference between the Beam DAG and the
orchestrating DAG. The Beam DAG processes data and passes that data between the
nodes of its DAG. The focus of Beam is on parallelization and enabling both
batch and streaming jobs. In contrast, the orchestration DAG schedules and
monitors steps in the workflow and passed between the nodes of the DAG are
execution parameters, metadata and artifacts. An example of such an artifact
could be a trained model or a dataset. Such artifacts are often passed by a
reference URI and not by value.
+
+Note: TFX creates a workflow DAG, which needs an orchestrator of its own to be
executed. [Natively supported orchestrators for
TFX](https://www.tensorflow.org/tfx/guide/custom_orchestrator) are Airflow,
Kubeflow Pipelines and, here’s the kicker, Beam itself! As mentioned by the
[TFX docs](https://www.tensorflow.org/tfx/guide/beam_orchestrator):
+> "Several TFX components rely on Beam for distributed data processing. In
addition, TFX can use Apache Beam to orchestrate and execute the pipeline DAG.
Beam orchestrator uses a different BeamRunner than the one which is used for
component data processing."
+
+Caveat: The Beam orchestrator is not meant to be a TFX orchestrator to be used
in production environments. It simply enables to debug the TFX pipeline locally
on Beam’s DirectRunner without the need for the extra setup that is needed for
Airflow or Kubeflow.
+
+## Preprocessing example
+
+Let’s get practical and take a look at two such orchestrated ML workflows, one
with Kubeflow Pipelines (KFP) and one with Tensorflow Extended (TFX). These two
frameworks achieve the same goal of creating workflows, but have their own
distinct advantages and disadvantages: KFP requires you to create your workflow
components from scratch and requires a user to explicitly indicate which
artifacts should be passed between components and in what way. In contrast, TFX
offers a number of prebuilt components and takes care of the artifact passing
more implicitly. Clearly, there is a trade-off to be considered between
flexibility and programming overhead when choosing between the two frameworks.
We will start by looking at an example with KFP and then transition to TFX to
show TFX takes care of a lot of functionality that we had to define by hand in
the KFP example.
+
+To not overcomplicate things, the workflows are limited to three components:
data ingestion, data preprocessing and model training. Depending on the
scenario, a range of extra components could be added such as model evaluation,
model deployment… We will focus our attention on the preprocessing component,
since it showcases how to use Apache beam in an ML workflow for efficient and
parallel processing of your ML data.
+
+The dataset we will use consists image-caption pairs, i.e. images paired with
a textual caption describing the content of the image. These pairs are taken
from captions subset of the [MSCOCO 2014
dataset](https://cocodataset.org/#home). This multi-modal data (image + text)
gives us the opportunity to experiment with preprocessing operations for both
modalities.
+
+### Kubeflow pipelines (KFP)
+
+In order to execute our ML workflow with KFP we must perform three steps:
+
+1. Create the KFP components by specifying the interface to the components and
by writing and containerizing the implementation of the component logic
+2. Create the KFP pipeline by connecting the created components and specifying
how inputs and outputs should be passed from between components and compiling
the pipeline definition to a full pipeline definition.
+3. Execute the KFP pipeline by submitting it to a KFP client endpoint.
+
+The full example code can be found
[here](sdks/python/apache_beam/examples/ml-orchestration/kfp/)
Review Comment:
I assume this link works? Does a relative reference work? I can't test since
those files are also added to this PR.
##########
website/www/site/content/en/documentation/ml/orchestration.md:
##########
@@ -0,0 +1,227 @@
+---
+title: "Orchestration"
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Workflow orchestration
+
+## Understanding the Beam DAG
+
+
+Apache Beam is an open source, unified model for defining both batch and
streaming data-parallel processing pipelines. One of the central concepts to
the Beam programming model is the DAG (= Directed Acyclic Graph). Each Beam
pipeline is a DAG that can be constructed through the Beam SDK in your
programming language of choice (from the set of supported beam SDKs). Each node
of this DAG represents a processing step (PTransform) that accepts a collection
of data as input (PCollection) and outputs a transformed collection of data
(PCollection). The edges define how data flows through the pipeline from one
processing step to another. The image below shows an example of such a
pipeline.
+
+
+
+Note that simply defining a pipeline and the corresponding DAG does not mean
that data will start flowing through the pipeline. To actually execute the
pipeline, it has to be deployed to one of the [supported Beam
runners](https://beam.apache.org/documentation/runners/capability-matrix/).
These distributed processing back-ends include Apache Flink, Apache Spark and
Google Cloud Dataflow. A [Direct
Runner](https://beam.apache.org/documentation/runners/direct/) is also provided
to execute the pipeline locally on your machine for development and debugging
purposes. Make sure to check out the [runner capability
matrix](https://beam.apache.org/documentation/runners/capability-matrix/) to
guarantee that the chosen runner supports the data processing steps defined in
your pipeline, especially when using the Direct Runner.
+
+## Orchestrating frameworks
+
+Successfully delivering machine learning projects is about a lot more than
training a model and calling it a day. In addition, a full ML workflow will
often contain a range of other steps including data ingestion, data validation,
data preprocessing, model evaluation, model deployment, data drift detection…
On top of that, it’s essential to keep track of metadata and artifacts from
your experiments to answer important questions like: What data was this model
trained on and with which training parameters? When was this model deployed and
which accuracy did it get on a test dataset? Without this knowledge at your
disposal, it will become increasingly difficult to troubleshoot, monitor and
improve your ML solutions as they grow in size.
+
+The solution: MLOps. MLOps is an umbrella term used to describe best practices
and guiding principles that aim to make the development and maintenance of
machine learning systems seamless and efficient. Simply put, MLOps is most
often about automating machine learning workflows throughout the model and data
lifecycle. Popular frameworks to create these workflow DAGs are [Kubeflow
Pipelines](https://www.kubeflow.org/docs/components/pipelines/introduction/),
[Apache
Airflow](https://airflow.apache.org/docs/apache-airflow/stable/index.html) and
[TFX](https://www.tensorflow.org/tfx/guide).
+
+So what does all of this have to do with Beam? Well, since we established that
Beam is a great tool for a range of ML tasks, a beam pipeline can either be
used as a standalone data processing job or can be part of a larger sequence of
steps in such a workflow. In the latter case, the beam DAG is just one node in
the overarching DAG composed by the workflow orchestrator. This results in a
DAG in a DAG, as illustrated by the example below.
+
+
+
+It is important to understand the key difference between the Beam DAG and the
orchestrating DAG. The Beam DAG processes data and passes that data between the
nodes of its DAG. The focus of Beam is on parallelization and enabling both
batch and streaming jobs. In contrast, the orchestration DAG schedules and
monitors steps in the workflow and passed between the nodes of the DAG are
execution parameters, metadata and artifacts. An example of such an artifact
could be a trained model or a dataset. Such artifacts are often passed by a
reference URI and not by value.
+
+Note: TFX creates a workflow DAG, which needs an orchestrator of its own to be
executed. [Natively supported orchestrators for
TFX](https://www.tensorflow.org/tfx/guide/custom_orchestrator) are Airflow,
Kubeflow Pipelines and, here’s the kicker, Beam itself! As mentioned by the
[TFX docs](https://www.tensorflow.org/tfx/guide/beam_orchestrator):
+> "Several TFX components rely on Beam for distributed data processing. In
addition, TFX can use Apache Beam to orchestrate and execute the pipeline DAG.
Beam orchestrator uses a different BeamRunner than the one which is used for
component data processing."
+
+Caveat: The Beam orchestrator is not meant to be a TFX orchestrator to be used
in production environments. It simply enables to debug the TFX pipeline locally
on Beam’s DirectRunner without the need for the extra setup that is needed for
Airflow or Kubeflow.
+
+## Preprocessing example
+
+Let’s get practical and take a look at two such orchestrated ML workflows, one
with Kubeflow Pipelines (KFP) and one with Tensorflow Extended (TFX). These two
frameworks achieve the same goal of creating workflows, but have their own
distinct advantages and disadvantages: KFP requires you to create your workflow
components from scratch and requires a user to explicitly indicate which
artifacts should be passed between components and in what way. In contrast, TFX
offers a number of prebuilt components and takes care of the artifact passing
more implicitly. Clearly, there is a trade-off to be considered between
flexibility and programming overhead when choosing between the two frameworks.
We will start by looking at an example with KFP and then transition to TFX to
show TFX takes care of a lot of functionality that we had to define by hand in
the KFP example.
+
+To not overcomplicate things, the workflows are limited to three components:
data ingestion, data preprocessing and model training. Depending on the
scenario, a range of extra components could be added such as model evaluation,
model deployment… We will focus our attention on the preprocessing component,
since it showcases how to use Apache beam in an ML workflow for efficient and
parallel processing of your ML data.
Review Comment:
```suggestion
For simplicity, we will showcase workflows with only three components: data
ingestion, data preprocessing and model training. Depending on the scenario, a
range of extra components could be added such as model evaluation, model
deployment, etc. We will focus our attention on the preprocessing component,
since it showcases how to use Apache beam in an ML workflow for efficient and
parallel processing of your ML data.
```
##########
sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/src/preprocess.py:
##########
@@ -0,0 +1,160 @@
+"""Dummy ingestion function that fetches data from one file and simply copies
it to another."""
+import re
+import json
+import io
+import argparse
+import time
+from pathlib import Path
+
+import requests
+from PIL import Image, UnidentifiedImageError
+import numpy as np
+import torch
+import torchvision.transforms as T
+import torchvision.transforms.functional as TF
+import apache_beam as beam
+from apache_beam.options.pipeline_options import PipelineOptions
+
+
+PROJECT_ID = "<project-id>"
+LOCATION = "<project-location>"
+STAGING_DIR = "<uri-to-data-flow-staging-dir>"
+BEAM_RUNNER = "<beam-runner>"
+
+# [START preprocess_component_argparse]
+def parse_args():
+ """Parse preprocessing arguments."""
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ "--ingested-dataset-path", type=str,
+ help="Path to the ingested dataset")
+ parser.add_argument(
+ "--preprocessed-dataset-path", type=str,
+ help="The target directory for the ingested dataset.")
+ parser.add_argument(
+ "--base-artifact-path", type=str,
+ help="Base path to store pipeline artifacts.")
+ return parser.parse_args()
Review Comment:
Do we want to make these required?
##########
sdks/python/apache_beam/examples/ml-orchestration/kfp/components/train/src/train.py:
##########
@@ -0,0 +1,62 @@
+"""Dummy training function that loads a pretrained model from the torch hub
and saves it."""
+
+import argparse
+from pathlib import Path
+import time
+
+import torch
+
+
+def parse_args():
+ """Parse ingestion arguments."""
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ "--preprocessed-dataset-path", type=str,
+ help="Path to the preprocessed dataset.")
+ parser.add_argument(
+ "--trained-model-path", type=str,
+ help="Output path to the trained model.")
+ parser.add_argument(
+ "--base-artifact-path", type=str,
+ help="Base path to store pipeline artifacts.")
+ return parser.parse_args()
+
+
+def train_model(
Review Comment:
Can we add something like # [START train_component] and highlight this
section in the orchestration.md docs?
##########
sdks/python/apache_beam/examples/ml-orchestration/kfp/components/ingestion/src/ingest.py:
##########
@@ -0,0 +1,57 @@
+"""Dummy ingestion function that fetches data from one file and simply copies
it to another."""
+
+import argparse
+from pathlib import Path
+import time
+
+
+def parse_args():
+ """Parse ingestion arguments."""
+ parser = argparse.ArgumentParser()
+ parser.add_argument(
+ "--ingested-dataset-path", type=str,
+ help="Path to save the ingested dataset to.")
+ parser.add_argument(
+ "--base-artifact-path", type=str,
+ help="Base path to store pipeline artifacts.")
+ return parser.parse_args()
+
+
+def dummy_ingest_data(
Review Comment:
Can we add something like `# [START ingest_component]` and highlight this
section in the `orchestration.md` docs?
##########
website/www/site/content/en/documentation/ml/orchestration.md:
##########
@@ -0,0 +1,227 @@
+---
+title: "Orchestration"
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Workflow orchestration
+
+## Understanding the Beam DAG
+
+
+Apache Beam is an open source, unified model for defining both batch and
streaming data-parallel processing pipelines. One of the central concepts to
the Beam programming model is the DAG (= Directed Acyclic Graph). Each Beam
pipeline is a DAG that can be constructed through the Beam SDK in your
programming language of choice (from the set of supported beam SDKs). Each node
of this DAG represents a processing step (PTransform) that accepts a collection
of data as input (PCollection) and outputs a transformed collection of data
(PCollection). The edges define how data flows through the pipeline from one
processing step to another. The image below shows an example of such a
pipeline.
+
+
+
+Note that simply defining a pipeline and the corresponding DAG does not mean
that data will start flowing through the pipeline. To actually execute the
pipeline, it has to be deployed to one of the [supported Beam
runners](https://beam.apache.org/documentation/runners/capability-matrix/).
These distributed processing back-ends include Apache Flink, Apache Spark and
Google Cloud Dataflow. A [Direct
Runner](https://beam.apache.org/documentation/runners/direct/) is also provided
to execute the pipeline locally on your machine for development and debugging
purposes. Make sure to check out the [runner capability
matrix](https://beam.apache.org/documentation/runners/capability-matrix/) to
guarantee that the chosen runner supports the data processing steps defined in
your pipeline, especially when using the Direct Runner.
+
+## Orchestrating frameworks
+
+Successfully delivering machine learning projects is about a lot more than
training a model and calling it a day. In addition, a full ML workflow will
often contain a range of other steps including data ingestion, data validation,
data preprocessing, model evaluation, model deployment, data drift detection…
On top of that, it’s essential to keep track of metadata and artifacts from
your experiments to answer important questions like: What data was this model
trained on and with which training parameters? When was this model deployed and
which accuracy did it get on a test dataset? Without this knowledge at your
disposal, it will become increasingly difficult to troubleshoot, monitor and
improve your ML solutions as they grow in size.
+
+The solution: MLOps. MLOps is an umbrella term used to describe best practices
and guiding principles that aim to make the development and maintenance of
machine learning systems seamless and efficient. Simply put, MLOps is most
often about automating machine learning workflows throughout the model and data
lifecycle. Popular frameworks to create these workflow DAGs are [Kubeflow
Pipelines](https://www.kubeflow.org/docs/components/pipelines/introduction/),
[Apache
Airflow](https://airflow.apache.org/docs/apache-airflow/stable/index.html) and
[TFX](https://www.tensorflow.org/tfx/guide).
+
+So what does all of this have to do with Beam? Well, since we established that
Beam is a great tool for a range of ML tasks, a beam pipeline can either be
used as a standalone data processing job or can be part of a larger sequence of
steps in such a workflow. In the latter case, the beam DAG is just one node in
the overarching DAG composed by the workflow orchestrator. This results in a
DAG in a DAG, as illustrated by the example below.
+
+
+
+It is important to understand the key difference between the Beam DAG and the
orchestrating DAG. The Beam DAG processes data and passes that data between the
nodes of its DAG. The focus of Beam is on parallelization and enabling both
batch and streaming jobs. In contrast, the orchestration DAG schedules and
monitors steps in the workflow and passed between the nodes of the DAG are
execution parameters, metadata and artifacts. An example of such an artifact
could be a trained model or a dataset. Such artifacts are often passed by a
reference URI and not by value.
+
+Note: TFX creates a workflow DAG, which needs an orchestrator of its own to be
executed. [Natively supported orchestrators for
TFX](https://www.tensorflow.org/tfx/guide/custom_orchestrator) are Airflow,
Kubeflow Pipelines and, here’s the kicker, Beam itself! As mentioned by the
[TFX docs](https://www.tensorflow.org/tfx/guide/beam_orchestrator):
+> "Several TFX components rely on Beam for distributed data processing. In
addition, TFX can use Apache Beam to orchestrate and execute the pipeline DAG.
Beam orchestrator uses a different BeamRunner than the one which is used for
component data processing."
+
+Caveat: The Beam orchestrator is not meant to be a TFX orchestrator to be used
in production environments. It simply enables to debug the TFX pipeline locally
on Beam’s DirectRunner without the need for the extra setup that is needed for
Airflow or Kubeflow.
+
+## Preprocessing example
+
+Let’s get practical and take a look at two such orchestrated ML workflows, one
with Kubeflow Pipelines (KFP) and one with Tensorflow Extended (TFX). These two
frameworks achieve the same goal of creating workflows, but have their own
distinct advantages and disadvantages: KFP requires you to create your workflow
components from scratch and requires a user to explicitly indicate which
artifacts should be passed between components and in what way. In contrast, TFX
offers a number of prebuilt components and takes care of the artifact passing
more implicitly. Clearly, there is a trade-off to be considered between
flexibility and programming overhead when choosing between the two frameworks.
We will start by looking at an example with KFP and then transition to TFX to
show TFX takes care of a lot of functionality that we had to define by hand in
the KFP example.
+
+To not overcomplicate things, the workflows are limited to three components:
data ingestion, data preprocessing and model training. Depending on the
scenario, a range of extra components could be added such as model evaluation,
model deployment… We will focus our attention on the preprocessing component,
since it showcases how to use Apache beam in an ML workflow for efficient and
parallel processing of your ML data.
+
+The dataset we will use consists image-caption pairs, i.e. images paired with
a textual caption describing the content of the image. These pairs are taken
from captions subset of the [MSCOCO 2014
dataset](https://cocodataset.org/#home). This multi-modal data (image + text)
gives us the opportunity to experiment with preprocessing operations for both
modalities.
+
+### Kubeflow pipelines (KFP)
+
+In order to execute our ML workflow with KFP we must perform three steps:
+
+1. Create the KFP components by specifying the interface to the components and
by writing and containerizing the implementation of the component logic
+2. Create the KFP pipeline by connecting the created components and specifying
how inputs and outputs should be passed from between components and compiling
the pipeline definition to a full pipeline definition.
+3. Execute the KFP pipeline by submitting it to a KFP client endpoint.
+
+The full example code can be found
[here](sdks/python/apache_beam/examples/ml-orchestration/kfp/)
+
+#### Create the KFP components
+
+This is our target file structure:
+
+ kfp
+ ├── pipeline.py
+ ├── components
+ │ ├── ingestion
+ │ │ ├── Dockerfile
+ │ │ ├── component.yaml
+ │ │ ├── requirements.txt
+ │ │ └── src
+ │ │ └── ingest.py
+ │ ├── preprocessing
+ │ │ ├── Dockerfile
+ │ │ ├── component.yaml
+ │ │ ├── requirements.txt
+ │ │ └── src
+ │ │ └── preprocess.py
+ │ └── train
+ │ ├── Dockerfile
+ │ ├── component.yaml
+ │ ├── requirements.txt
+ │ └── src
+ │ └── train.py
+ └── requirements.txt
+
+Let’s start with the component specifications. The full preprocessing
component specification is illustrated below. The inputs are the path where the
ingested dataset was saved by the ingest component and a path to a directory
where the component can store artifacts. The specifications for the ingestion
and train component are similar and can be found here and here, respectively.
+
+>Note: we are using the KFP v1 SDK, because v2 is still in
[beta](https://www.kubeflow.org/docs/started/support/#application-status). The
v2 SDK introduces some new options for specifying the component interface with
more native support for input and output artifacts. To see how to migrate
components from v1 to v2, consult the [KFP
docs](https://www.kubeflow.org/docs/components/pipelines/sdk-v2/v2-component-io/).
+
+{{< highlight
file="sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/component.yaml"
>}}
+{{< code_sample
"sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/component.yaml"
preprocessing_component_definition >}}
+{{< /highlight >}}
+
+In this case, each component shares an identical Dockerfile but extra
component-specific dependencies could be added where necessary.
+
+{{< highlight language="Dockerfile"
file="sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/Dockerfile"
>}}
+{{< code_sample
"sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/Dockerfile"
component_dockerfile >}}
+{{< /highlight >}}
+
+With the component specification and containerization out of the way we can
look at the actual implementation of the preprocessing component.
+
+Since KFP provides the input and output arguments as command-line arguments,
an argumentparser is needed.
Review Comment:
```suggestion
Since KFP provides the input and output arguments as command-line arguments,
an `argumentparser` is needed.
```
##########
website/www/site/content/en/documentation/ml/orchestration.md:
##########
@@ -0,0 +1,227 @@
+---
+title: "Orchestration"
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Workflow orchestration
+
+## Understanding the Beam DAG
+
+
+Apache Beam is an open source, unified model for defining both batch and
streaming data-parallel processing pipelines. One of the central concepts to
the Beam programming model is the DAG (= Directed Acyclic Graph). Each Beam
pipeline is a DAG that can be constructed through the Beam SDK in your
programming language of choice (from the set of supported beam SDKs). Each node
of this DAG represents a processing step (PTransform) that accepts a collection
of data as input (PCollection) and outputs a transformed collection of data
(PCollection). The edges define how data flows through the pipeline from one
processing step to another. The image below shows an example of such a
pipeline.
+
+
+
+Note that simply defining a pipeline and the corresponding DAG does not mean
that data will start flowing through the pipeline. To actually execute the
pipeline, it has to be deployed to one of the [supported Beam
runners](https://beam.apache.org/documentation/runners/capability-matrix/).
These distributed processing back-ends include Apache Flink, Apache Spark and
Google Cloud Dataflow. A [Direct
Runner](https://beam.apache.org/documentation/runners/direct/) is also provided
to execute the pipeline locally on your machine for development and debugging
purposes. Make sure to check out the [runner capability
matrix](https://beam.apache.org/documentation/runners/capability-matrix/) to
guarantee that the chosen runner supports the data processing steps defined in
your pipeline, especially when using the Direct Runner.
+
+## Orchestrating frameworks
+
+Successfully delivering machine learning projects is about a lot more than
training a model and calling it a day. In addition, a full ML workflow will
often contain a range of other steps including data ingestion, data validation,
data preprocessing, model evaluation, model deployment, data drift detection…
On top of that, it’s essential to keep track of metadata and artifacts from
your experiments to answer important questions like: What data was this model
trained on and with which training parameters? When was this model deployed and
which accuracy did it get on a test dataset? Without this knowledge at your
disposal, it will become increasingly difficult to troubleshoot, monitor and
improve your ML solutions as they grow in size.
+
+The solution: MLOps. MLOps is an umbrella term used to describe best practices
and guiding principles that aim to make the development and maintenance of
machine learning systems seamless and efficient. Simply put, MLOps is most
often about automating machine learning workflows throughout the model and data
lifecycle. Popular frameworks to create these workflow DAGs are [Kubeflow
Pipelines](https://www.kubeflow.org/docs/components/pipelines/introduction/),
[Apache
Airflow](https://airflow.apache.org/docs/apache-airflow/stable/index.html) and
[TFX](https://www.tensorflow.org/tfx/guide).
+
+So what does all of this have to do with Beam? Well, since we established that
Beam is a great tool for a range of ML tasks, a beam pipeline can either be
used as a standalone data processing job or can be part of a larger sequence of
steps in such a workflow. In the latter case, the beam DAG is just one node in
the overarching DAG composed by the workflow orchestrator. This results in a
DAG in a DAG, as illustrated by the example below.
+
+
+
+It is important to understand the key difference between the Beam DAG and the
orchestrating DAG. The Beam DAG processes data and passes that data between the
nodes of its DAG. The focus of Beam is on parallelization and enabling both
batch and streaming jobs. In contrast, the orchestration DAG schedules and
monitors steps in the workflow and passed between the nodes of the DAG are
execution parameters, metadata and artifacts. An example of such an artifact
could be a trained model or a dataset. Such artifacts are often passed by a
reference URI and not by value.
+
+Note: TFX creates a workflow DAG, which needs an orchestrator of its own to be
executed. [Natively supported orchestrators for
TFX](https://www.tensorflow.org/tfx/guide/custom_orchestrator) are Airflow,
Kubeflow Pipelines and, here’s the kicker, Beam itself! As mentioned by the
[TFX docs](https://www.tensorflow.org/tfx/guide/beam_orchestrator):
+> "Several TFX components rely on Beam for distributed data processing. In
addition, TFX can use Apache Beam to orchestrate and execute the pipeline DAG.
Beam orchestrator uses a different BeamRunner than the one which is used for
component data processing."
+
+Caveat: The Beam orchestrator is not meant to be a TFX orchestrator to be used
in production environments. It simply enables to debug the TFX pipeline locally
on Beam’s DirectRunner without the need for the extra setup that is needed for
Airflow or Kubeflow.
+
+## Preprocessing example
+
+Let’s get practical and take a look at two such orchestrated ML workflows, one
with Kubeflow Pipelines (KFP) and one with Tensorflow Extended (TFX). These two
frameworks achieve the same goal of creating workflows, but have their own
distinct advantages and disadvantages: KFP requires you to create your workflow
components from scratch and requires a user to explicitly indicate which
artifacts should be passed between components and in what way. In contrast, TFX
offers a number of prebuilt components and takes care of the artifact passing
more implicitly. Clearly, there is a trade-off to be considered between
flexibility and programming overhead when choosing between the two frameworks.
We will start by looking at an example with KFP and then transition to TFX to
show TFX takes care of a lot of functionality that we had to define by hand in
the KFP example.
+
+To not overcomplicate things, the workflows are limited to three components:
data ingestion, data preprocessing and model training. Depending on the
scenario, a range of extra components could be added such as model evaluation,
model deployment… We will focus our attention on the preprocessing component,
since it showcases how to use Apache beam in an ML workflow for efficient and
parallel processing of your ML data.
+
+The dataset we will use consists image-caption pairs, i.e. images paired with
a textual caption describing the content of the image. These pairs are taken
from captions subset of the [MSCOCO 2014
dataset](https://cocodataset.org/#home). This multi-modal data (image + text)
gives us the opportunity to experiment with preprocessing operations for both
modalities.
+
+### Kubeflow pipelines (KFP)
+
+In order to execute our ML workflow with KFP we must perform three steps:
+
+1. Create the KFP components by specifying the interface to the components and
by writing and containerizing the implementation of the component logic
+2. Create the KFP pipeline by connecting the created components and specifying
how inputs and outputs should be passed from between components and compiling
the pipeline definition to a full pipeline definition.
+3. Execute the KFP pipeline by submitting it to a KFP client endpoint.
+
+The full example code can be found
[here](sdks/python/apache_beam/examples/ml-orchestration/kfp/)
+
+#### Create the KFP components
+
+This is our target file structure:
+
+ kfp
+ ├── pipeline.py
+ ├── components
+ │ ├── ingestion
+ │ │ ├── Dockerfile
+ │ │ ├── component.yaml
+ │ │ ├── requirements.txt
+ │ │ └── src
+ │ │ └── ingest.py
+ │ ├── preprocessing
+ │ │ ├── Dockerfile
+ │ │ ├── component.yaml
+ │ │ ├── requirements.txt
+ │ │ └── src
+ │ │ └── preprocess.py
+ │ └── train
+ │ ├── Dockerfile
+ │ ├── component.yaml
+ │ ├── requirements.txt
+ │ └── src
+ │ └── train.py
+ └── requirements.txt
+
+Let’s start with the component specifications. The full preprocessing
component specification is illustrated below. The inputs are the path where the
ingested dataset was saved by the ingest component and a path to a directory
where the component can store artifacts. The specifications for the ingestion
and train component are similar and can be found here and here, respectively.
+
+>Note: we are using the KFP v1 SDK, because v2 is still in
[beta](https://www.kubeflow.org/docs/started/support/#application-status). The
v2 SDK introduces some new options for specifying the component interface with
more native support for input and output artifacts. To see how to migrate
components from v1 to v2, consult the [KFP
docs](https://www.kubeflow.org/docs/components/pipelines/sdk-v2/v2-component-io/).
+
+{{< highlight
file="sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/component.yaml"
>}}
+{{< code_sample
"sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/component.yaml"
preprocessing_component_definition >}}
+{{< /highlight >}}
+
+In this case, each component shares an identical Dockerfile but extra
component-specific dependencies could be added where necessary.
+
+{{< highlight language="Dockerfile"
file="sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/Dockerfile"
>}}
+{{< code_sample
"sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/Dockerfile"
component_dockerfile >}}
+{{< /highlight >}}
+
+With the component specification and containerization out of the way we can
look at the actual implementation of the preprocessing component.
+
+Since KFP provides the input and output arguments as command-line arguments,
an argumentparser is needed.
+
+{{< highlight
file="sdks/python/apache_beam/examples/ml-orchestration/kf/components/preprocessing/src/preprocess.py"
>}}
+{{< code_sample
"sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/src/preprocess.py"
preprocess_component_argparse >}}
+{{< /highlight >}}
+
+The implementation of the `preprocess_dataset` function contains the Beam
pipeline code and the Beam pipeline options to select the desired runner. The
executed preprocessing involves downloading the image bytes from their url,
converting them to a Torch Tensor and resizing to the desired size. The caption
undergoes a series of string manipulations to ensure that our model receives
clean uniform image descriptions (Tokenization is not yet done here, but could
be included here as well if the vocabulary is known). Finally each element is
serialized and written to [Avro](https://avro.apache.org/docs/1.2.0/) files
(Alternative files formats could be used as well, e.g. TFRecords).
+
+
+{{< highlight
file="sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/src/preprocess.py"
>}}
+{{< code_sample
"sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/src/preprocess.py"
deploy_preprocessing_beam_pipeline >}}
+{{< /highlight >}}
+
+It also contains the necessary code to perform the component IO. First, a
target path is constructed to store the preprocessed dataset based on the
component input parameter `base_artifact_path` and a timestamp. Output values
from components can only be returned as files so we write the value of the
constructed target path to an output file that was provided by KFP to our
component.
+
+{{< highlight
file="sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/src/preprocess.py"
>}}
+{{< code_sample
"sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/src/preprocess.py"
kfp_component_input_output >}}
+{{< /highlight >}}
+
+Since we are mainly interested in the preprocessing component to show how a
Beam pipeline can be integrated into a larger ML workflow, we will not cover
the implementation of the ingestion and train component in depth.
Implementations of dummy components that mock their behavior are provided in
the full example code.
+
+#### Create the pipeline definition
+
+`pipeline.py` first loads the created components from their specification
`.yaml` file.
+
+{{< highlight
file="sdks/python/apache_beam/examples/ml-orchestration/kfp/pipeline.py" >}}
+{{< code_sample
"sdks/python/apache_beam/examples/ml-orchestration/kfp/pipeline.py"
load_kfp_components >}}
+{{< /highlight >}}
+
+After that, the pipeline is created and the required components inputs and
outputs are specified manually.
+
+{{< highlight
file="sdks/python/apache_beam/examples/ml-orchestration/kfp/pipeline.py" >}}
+{{< code_sample
"sdks/python/apache_beam/examples/ml-orchestration/kfp/pipeline.py"
define_kfp_pipeline >}}
+{{< /highlight >}}
+
+Finally, the defined pipeline is compiled and a `pipeline.json` specification
file is generated.
+
+{{< highlight
file="sdks/python/apache_beam/examples/ml-orchestration/kfp/pipeline.py" >}}
+{{< code_sample
"sdks/python/apache_beam/examples/ml-orchestration/kfp/pipeline.py"
compile_kfp_pipeline >}}
+{{< /highlight >}}
+
+
+#### Execute the KFP pipeline
+
+{{< highlight
file="sdks/python/apache_beam/examples/ml-orchestration/kfp/pipeline.py" >}}
+client = kfp.Client()
+try:
+ experiment = client.get_experiment(experiment_name=EXPERIMENT_NAME)
+except:
+ experiment = client.create_experiment(EXPERIMENT_NAME)
+arguments = {}
+
+run_result = client.run_pipeline(experiment.id,
+ RUN_NAME,
+ PIPELINE_FILENAME,
+ arguments)
+{{< /highlight >}}
Review Comment:
Is this code supposed to be in pipeline.py? Can you clarify where this is
supposed to be run?
##########
website/www/site/content/en/documentation/ml/orchestration.md:
##########
@@ -0,0 +1,227 @@
+---
+title: "Orchestration"
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Workflow orchestration
+
+## Understanding the Beam DAG
+
+
+Apache Beam is an open source, unified model for defining both batch and
streaming data-parallel processing pipelines. One of the central concepts to
the Beam programming model is the DAG (= Directed Acyclic Graph). Each Beam
pipeline is a DAG that can be constructed through the Beam SDK in your
programming language of choice (from the set of supported beam SDKs). Each node
of this DAG represents a processing step (PTransform) that accepts a collection
of data as input (PCollection) and outputs a transformed collection of data
(PCollection). The edges define how data flows through the pipeline from one
processing step to another. The image below shows an example of such a
pipeline.
+
+
+
+Note that simply defining a pipeline and the corresponding DAG does not mean
that data will start flowing through the pipeline. To actually execute the
pipeline, it has to be deployed to one of the [supported Beam
runners](https://beam.apache.org/documentation/runners/capability-matrix/).
These distributed processing back-ends include Apache Flink, Apache Spark and
Google Cloud Dataflow. A [Direct
Runner](https://beam.apache.org/documentation/runners/direct/) is also provided
to execute the pipeline locally on your machine for development and debugging
purposes. Make sure to check out the [runner capability
matrix](https://beam.apache.org/documentation/runners/capability-matrix/) to
guarantee that the chosen runner supports the data processing steps defined in
your pipeline, especially when using the Direct Runner.
+
+## Orchestrating frameworks
+
+Successfully delivering machine learning projects is about a lot more than
training a model and calling it a day. In addition, a full ML workflow will
often contain a range of other steps including data ingestion, data validation,
data preprocessing, model evaluation, model deployment, data drift detection…
On top of that, it’s essential to keep track of metadata and artifacts from
your experiments to answer important questions like: What data was this model
trained on and with which training parameters? When was this model deployed and
which accuracy did it get on a test dataset? Without this knowledge at your
disposal, it will become increasingly difficult to troubleshoot, monitor and
improve your ML solutions as they grow in size.
+
+The solution: MLOps. MLOps is an umbrella term used to describe best practices
and guiding principles that aim to make the development and maintenance of
machine learning systems seamless and efficient. Simply put, MLOps is most
often about automating machine learning workflows throughout the model and data
lifecycle. Popular frameworks to create these workflow DAGs are [Kubeflow
Pipelines](https://www.kubeflow.org/docs/components/pipelines/introduction/),
[Apache
Airflow](https://airflow.apache.org/docs/apache-airflow/stable/index.html) and
[TFX](https://www.tensorflow.org/tfx/guide).
+
+So what does all of this have to do with Beam? Well, since we established that
Beam is a great tool for a range of ML tasks, a beam pipeline can either be
used as a standalone data processing job or can be part of a larger sequence of
steps in such a workflow. In the latter case, the beam DAG is just one node in
the overarching DAG composed by the workflow orchestrator. This results in a
DAG in a DAG, as illustrated by the example below.
+
+
+
+It is important to understand the key difference between the Beam DAG and the
orchestrating DAG. The Beam DAG processes data and passes that data between the
nodes of its DAG. The focus of Beam is on parallelization and enabling both
batch and streaming jobs. In contrast, the orchestration DAG schedules and
monitors steps in the workflow and passed between the nodes of the DAG are
execution parameters, metadata and artifacts. An example of such an artifact
could be a trained model or a dataset. Such artifacts are often passed by a
reference URI and not by value.
+
+Note: TFX creates a workflow DAG, which needs an orchestrator of its own to be
executed. [Natively supported orchestrators for
TFX](https://www.tensorflow.org/tfx/guide/custom_orchestrator) are Airflow,
Kubeflow Pipelines and, here’s the kicker, Beam itself! As mentioned by the
[TFX docs](https://www.tensorflow.org/tfx/guide/beam_orchestrator):
+> "Several TFX components rely on Beam for distributed data processing. In
addition, TFX can use Apache Beam to orchestrate and execute the pipeline DAG.
Beam orchestrator uses a different BeamRunner than the one which is used for
component data processing."
+
+Caveat: The Beam orchestrator is not meant to be a TFX orchestrator to be used
in production environments. It simply enables to debug the TFX pipeline locally
on Beam’s DirectRunner without the need for the extra setup that is needed for
Airflow or Kubeflow.
+
+## Preprocessing example
+
+Let’s get practical and take a look at two such orchestrated ML workflows, one
with Kubeflow Pipelines (KFP) and one with Tensorflow Extended (TFX). These two
frameworks achieve the same goal of creating workflows, but have their own
distinct advantages and disadvantages: KFP requires you to create your workflow
components from scratch and requires a user to explicitly indicate which
artifacts should be passed between components and in what way. In contrast, TFX
offers a number of prebuilt components and takes care of the artifact passing
more implicitly. Clearly, there is a trade-off to be considered between
flexibility and programming overhead when choosing between the two frameworks.
We will start by looking at an example with KFP and then transition to TFX to
show TFX takes care of a lot of functionality that we had to define by hand in
the KFP example.
+
+To not overcomplicate things, the workflows are limited to three components:
data ingestion, data preprocessing and model training. Depending on the
scenario, a range of extra components could be added such as model evaluation,
model deployment… We will focus our attention on the preprocessing component,
since it showcases how to use Apache beam in an ML workflow for efficient and
parallel processing of your ML data.
+
+The dataset we will use consists image-caption pairs, i.e. images paired with
a textual caption describing the content of the image. These pairs are taken
from captions subset of the [MSCOCO 2014
dataset](https://cocodataset.org/#home). This multi-modal data (image + text)
gives us the opportunity to experiment with preprocessing operations for both
modalities.
+
+### Kubeflow pipelines (KFP)
+
+In order to execute our ML workflow with KFP we must perform three steps:
+
+1. Create the KFP components by specifying the interface to the components and
by writing and containerizing the implementation of the component logic
+2. Create the KFP pipeline by connecting the created components and specifying
how inputs and outputs should be passed from between components and compiling
the pipeline definition to a full pipeline definition.
+3. Execute the KFP pipeline by submitting it to a KFP client endpoint.
+
+The full example code can be found
[here](sdks/python/apache_beam/examples/ml-orchestration/kfp/)
+
+#### Create the KFP components
+
+This is our target file structure:
+
+ kfp
+ ├── pipeline.py
+ ├── components
+ │ ├── ingestion
+ │ │ ├── Dockerfile
+ │ │ ├── component.yaml
+ │ │ ├── requirements.txt
+ │ │ └── src
+ │ │ └── ingest.py
+ │ ├── preprocessing
+ │ │ ├── Dockerfile
+ │ │ ├── component.yaml
+ │ │ ├── requirements.txt
+ │ │ └── src
+ │ │ └── preprocess.py
+ │ └── train
+ │ ├── Dockerfile
+ │ ├── component.yaml
+ │ ├── requirements.txt
+ │ └── src
+ │ └── train.py
+ └── requirements.txt
+
+Let’s start with the component specifications. The full preprocessing
component specification is illustrated below. The inputs are the path where the
ingested dataset was saved by the ingest component and a path to a directory
where the component can store artifacts. The specifications for the ingestion
and train component are similar and can be found here and here, respectively.
+
+>Note: we are using the KFP v1 SDK, because v2 is still in
[beta](https://www.kubeflow.org/docs/started/support/#application-status). The
v2 SDK introduces some new options for specifying the component interface with
more native support for input and output artifacts. To see how to migrate
components from v1 to v2, consult the [KFP
docs](https://www.kubeflow.org/docs/components/pipelines/sdk-v2/v2-component-io/).
+
+{{< highlight
file="sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/component.yaml"
>}}
+{{< code_sample
"sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/component.yaml"
preprocessing_component_definition >}}
+{{< /highlight >}}
+
+In this case, each component shares an identical Dockerfile but extra
component-specific dependencies could be added where necessary.
+
+{{< highlight language="Dockerfile"
file="sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/Dockerfile"
>}}
+{{< code_sample
"sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/Dockerfile"
component_dockerfile >}}
+{{< /highlight >}}
+
+With the component specification and containerization out of the way we can
look at the actual implementation of the preprocessing component.
+
+Since KFP provides the input and output arguments as command-line arguments,
an argumentparser is needed.
+
+{{< highlight
file="sdks/python/apache_beam/examples/ml-orchestration/kf/components/preprocessing/src/preprocess.py"
>}}
+{{< code_sample
"sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/src/preprocess.py"
preprocess_component_argparse >}}
+{{< /highlight >}}
+
+The implementation of the `preprocess_dataset` function contains the Beam
pipeline code and the Beam pipeline options to select the desired runner. The
executed preprocessing involves downloading the image bytes from their url,
converting them to a Torch Tensor and resizing to the desired size. The caption
undergoes a series of string manipulations to ensure that our model receives
clean uniform image descriptions (Tokenization is not yet done here, but could
be included here as well if the vocabulary is known). Finally each element is
serialized and written to [Avro](https://avro.apache.org/docs/1.2.0/) files
(Alternative files formats could be used as well, e.g. TFRecords).
+
+
+{{< highlight
file="sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/src/preprocess.py"
>}}
+{{< code_sample
"sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/src/preprocess.py"
deploy_preprocessing_beam_pipeline >}}
+{{< /highlight >}}
+
+It also contains the necessary code to perform the component IO. First, a
target path is constructed to store the preprocessed dataset based on the
component input parameter `base_artifact_path` and a timestamp. Output values
from components can only be returned as files so we write the value of the
constructed target path to an output file that was provided by KFP to our
component.
+
+{{< highlight
file="sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/src/preprocess.py"
>}}
+{{< code_sample
"sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/src/preprocess.py"
kfp_component_input_output >}}
+{{< /highlight >}}
+
+Since we are mainly interested in the preprocessing component to show how a
Beam pipeline can be integrated into a larger ML workflow, we will not cover
the implementation of the ingestion and train component in depth.
Implementations of dummy components that mock their behavior are provided in
the full example code.
+
+#### Create the pipeline definition
+
+`pipeline.py` first loads the created components from their specification
`.yaml` file.
+
+{{< highlight
file="sdks/python/apache_beam/examples/ml-orchestration/kfp/pipeline.py" >}}
+{{< code_sample
"sdks/python/apache_beam/examples/ml-orchestration/kfp/pipeline.py"
load_kfp_components >}}
+{{< /highlight >}}
+
+After that, the pipeline is created and the required components inputs and
outputs are specified manually.
+
+{{< highlight
file="sdks/python/apache_beam/examples/ml-orchestration/kfp/pipeline.py" >}}
+{{< code_sample
"sdks/python/apache_beam/examples/ml-orchestration/kfp/pipeline.py"
define_kfp_pipeline >}}
+{{< /highlight >}}
+
+Finally, the defined pipeline is compiled and a `pipeline.json` specification
file is generated.
+
+{{< highlight
file="sdks/python/apache_beam/examples/ml-orchestration/kfp/pipeline.py" >}}
+{{< code_sample
"sdks/python/apache_beam/examples/ml-orchestration/kfp/pipeline.py"
compile_kfp_pipeline >}}
+{{< /highlight >}}
+
+
+#### Execute the KFP pipeline
+
+{{< highlight
file="sdks/python/apache_beam/examples/ml-orchestration/kfp/pipeline.py" >}}
+client = kfp.Client()
+try:
+ experiment = client.get_experiment(experiment_name=EXPERIMENT_NAME)
+except:
+ experiment = client.create_experiment(EXPERIMENT_NAME)
+arguments = {}
+
+run_result = client.run_pipeline(experiment.id,
+ RUN_NAME,
+ PIPELINE_FILENAME,
+ arguments)
+{{< /highlight >}}
+
+
+### Tensorflow Extended (TFX)
+
+The way of working for TFX is similar to the approach for KFP as illustrated
above: Define the individual workflow components, connect them in a pipeline
object and run the pipeline in the target environment. However, what makes TFX
different is that it has already built a set of Python packages that are
libraries to create workflow components. So unlike the KFP example, we do not
need to start from scratch by writing and containerizing our code. What is left
for the users to do is pick which of those TFX components are relevant to their
specific workflow and adapt their functionality to the specific use case using
the library. The image below shows the available components and their
corresponding libraries. The link with Apache Beam is that TFX relies heavily
on it to implement data-parallel pipelines in these libraries. This means that
components created with these libraries will need to be run on one of the
support Beam runners. The full example code can again be found [here](sdk
s/python/apache_beam/examples/ml-orchestration/tfx/)
Review Comment:
similar question about this relative link
`sdks/python/apache_beam/examples/ml-orchestration/tfx/`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]