davidcavazos commented on code in PR #23094: URL: https://github.com/apache/beam/pull/23094#discussion_r992688421
########## sdks/python/apache_beam/examples/ml-orchestration/kfp/components/ingestion/Dockerfile: ########## @@ -0,0 +1,28 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +FROM python:3.8-slim Review Comment: Nit: can we update to `python:3.9-slim`? ########## sdks/python/apache_beam/examples/ml-orchestration/kfp/components/ingestion/src/ingest.py: ########## @@ -0,0 +1,74 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""Dummy ingestion function that fetches data from one file and simply copies it to another.""" Review Comment: The word "dummy" can be disrespectful or offensive to some readers. As per the [style guide](https://developers.google.com/style/word-list#dummy-variable), let's change it to something else. In this context, I think it's fine to simply drop it: "Ingestion function that ...". The rest of the explanation already states that it simply copies a file. Alternatively, it could be "Simple ingestion function...", or it could state that it's a "small data sample". ########## sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/component.yaml: ########## @@ -0,0 +1,44 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# [START preprocessing_component_definition] +name: preprocessing +description: Component that mimicks scraping data from the web and outputs it to a jsonlines format file +inputs: + - name: ingested_dataset_path + description: source uri of the data to scrape + type: String + - name: base_artifact_path + description: base path to store data + type: String +outputs: + - name: preprocessed_dataset_path + description: target uri for the ingested dataset + type: String +implementation: + container: + image: <your-docker-registry/preprocessing-image-name:latest> Review Comment: It would be great if this could be replaced with a command line argument or environment variable, but we can leave it as is if it's not possible. ########## sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/src/preprocess.py: ########## @@ -0,0 +1,191 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Dummy ingestion function that fetches data from one file and simply copies it to another.""" Review Comment: Please either drop or replace the word "dummy". 
########## sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/src/preprocess.py: ########## @@ -0,0 +1,191 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Dummy ingestion function that fetches data from one file and simply copies it to another.""" + +import re +import json +import io +import argparse +import time +from pathlib import Path + +import requests +from PIL import Image, UnidentifiedImageError +import numpy as np +import torch +import torchvision.transforms as T +import torchvision.transforms.functional as TF +import apache_beam as beam +from apache_beam.options.pipeline_options import PipelineOptions + +PROJECT_ID = "<project-id>" +LOCATION = "<project-location>" +STAGING_DIR = "<uri-to-data-flow-staging-dir>" +BEAM_RUNNER = "<beam-runner>" + + +# [START preprocess_component_argparse] +def parse_args(): + """Parse preprocessing arguments.""" + parser = argparse.ArgumentParser() + parser.add_argument( + "--ingested-dataset-path", + type=str, + help="Path to the ingested dataset", + required=True) + parser.add_argument( + "--preprocessed-dataset-path", + type=str, + help="The target directory for the ingested dataset.", + required=True) + parser.add_argument( + "--base-artifact-path", + 
type=str, + help="Base path to store pipeline artifacts.", + required=True) + return parser.parse_args() + + +# [END preprocess_component_argparse] + + +def preprocess_dataset( + ingested_dataset_path: str, + preprocessed_dataset_path: str, + base_artifact_path: str): + """Preprocess the ingested raw dataset and write the result to avro format. + + Args: + ingested_dataset_path (str): Path to the ingested dataset + preprocessed_dataset_path (str): Path to where the preprocessed dataset will be saved + base_artifact_path (str): path to the base directory of where artifacts can be stored for + this component. + """ + # [START kfp_component_input_output] + timestamp = time.time() + target_path = f"{base_artifact_path}/preprocessing/preprocessed_dataset_{timestamp}" + + # the directory where the output file is created may or may not exists + # so we have to create it. + Path(preprocessed_dataset_path).parent.mkdir(parents=True, exist_ok=True) + with open(preprocessed_dataset_path, 'w') as f: + f.write(target_path) + # [END kfp_component_input_output] + + # [START deploy_preprocessing_beam_pipeline] + # We use the save_main_session option because one or more DoFn's in this + # workflow rely on global context (e.g., a module imported at module level). + pipeline_options = PipelineOptions( + runner=BEAM_RUNNER, + project=PROJECT_ID, + job_name=f'preprocessing-{int(time.time())}', + temp_location=STAGING_DIR, + region=LOCATION, + requirements_file="/requirements.txt", + save_main_session=True, + ) + + with beam.Pipeline(options=pipeline_options) as pipeline: + ( + pipeline + | + "Read input jsonl file" >> beam.io.ReadFromText(ingested_dataset_path) Review Comment: Typo: `jsonl` -> `json` (?) ########## sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/src/preprocess.py: ########## @@ -0,0 +1,191 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Dummy ingestion function that fetches data from one file and simply copies it to another.""" + +import re +import json +import io +import argparse +import time +from pathlib import Path + +import requests +from PIL import Image, UnidentifiedImageError +import numpy as np +import torch +import torchvision.transforms as T +import torchvision.transforms.functional as TF +import apache_beam as beam +from apache_beam.options.pipeline_options import PipelineOptions + +PROJECT_ID = "<project-id>" +LOCATION = "<project-location>" +STAGING_DIR = "<uri-to-data-flow-staging-dir>" +BEAM_RUNNER = "<beam-runner>" + + +# [START preprocess_component_argparse] +def parse_args(): + """Parse preprocessing arguments.""" + parser = argparse.ArgumentParser() + parser.add_argument( + "--ingested-dataset-path", + type=str, + help="Path to the ingested dataset", + required=True) + parser.add_argument( + "--preprocessed-dataset-path", + type=str, + help="The target directory for the ingested dataset.", + required=True) + parser.add_argument( + "--base-artifact-path", + type=str, + help="Base path to store pipeline artifacts.", + required=True) + return parser.parse_args() + + +# [END preprocess_component_argparse] + + +def preprocess_dataset( + ingested_dataset_path: str, + preprocessed_dataset_path: str, + 
base_artifact_path: str): + """Preprocess the ingested raw dataset and write the result to avro format. + + Args: + ingested_dataset_path (str): Path to the ingested dataset + preprocessed_dataset_path (str): Path to where the preprocessed dataset will be saved + base_artifact_path (str): path to the base directory of where artifacts can be stored for + this component. + """ + # [START kfp_component_input_output] + timestamp = time.time() + target_path = f"{base_artifact_path}/preprocessing/preprocessed_dataset_{timestamp}" + + # the directory where the output file is created may or may not exists + # so we have to create it. + Path(preprocessed_dataset_path).parent.mkdir(parents=True, exist_ok=True) + with open(preprocessed_dataset_path, 'w') as f: + f.write(target_path) + # [END kfp_component_input_output] + + # [START deploy_preprocessing_beam_pipeline] + # We use the save_main_session option because one or more DoFn's in this + # workflow rely on global context (e.g., a module imported at module level). + pipeline_options = PipelineOptions( + runner=BEAM_RUNNER, + project=PROJECT_ID, + job_name=f'preprocessing-{int(time.time())}', + temp_location=STAGING_DIR, + region=LOCATION, + requirements_file="/requirements.txt", + save_main_session=True, + ) + + with beam.Pipeline(options=pipeline_options) as pipeline: + ( + pipeline + | + "Read input jsonl file" >> beam.io.ReadFromText(ingested_dataset_path) + | "Load json" >> beam.Map(json.loads) + | "Filter licenses" >> beam.Filter(valid_license) + | "Download image from URL" >> beam.ParDo(DownloadImageFromURL()) + | "Filter on valid images" >> + beam.Filter(lambda el: el['image'] is not None) + | "Resize image" >> beam.ParDo(ResizeImage(size=(224, 224))) Review Comment: Can we make the `224` into a constant at the top of the file? Maybe something like `IMAGE_SIZE = 224` or `IMAGE_SIZE = (224, 224)`, whichever you prefer. 
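For illustration, here is a minimal sketch of the constant, assuming `ResizeImage` keeps the `size` keyword shown in the diff. The class below is a hypothetical stand-in rather than the PR's DoFn, so the snippet runs without Beam or torchvision installed; the `target_size` key is also made up for the example.

```python
# Hypothetical module-level constant, named as suggested in the comment above.
IMAGE_SIZE = (224, 224)


class ResizeImage:
    """Stand-in for the DoFn in the diff; it only records the target size."""

    def __init__(self, size=IMAGE_SIZE):
        self.size = size

    def process(self, element):
        # Yield a new dict so the input element is left unmodified.
        yield {**element, "target_size": self.size}
```

The pipeline step would then read `beam.ParDo(ResizeImage(size=IMAGE_SIZE))`, so the literal appears only once, at the top of the file.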
########## sdks/python/apache_beam/examples/ml-orchestration/kfp/components/train/Dockerfile: ########## @@ -0,0 +1,26 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +FROM python:3.8-slim Review Comment: Nit: can we update to `python:3.9-slim`? ########## sdks/python/apache_beam/examples/ml-orchestration/kfp/components/train/src/train.py: ########## @@ -0,0 +1,83 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""Dummy training function that loads a pretrained model from the torch hub and saves it.""" + +import argparse +import time +from pathlib import Path + +import torch + + +def parse_args(): + """Parse ingestion arguments.""" + parser = argparse.ArgumentParser() + parser.add_argument( + "--preprocessed-dataset-path", + type=str, + help="Path to the preprocessed dataset.", + required=True) + parser.add_argument( + "--trained-model-path", + type=str, + help="Output path to the trained model.", + required=True) + parser.add_argument( + "--base-artifact-path", + type=str, + help="Base path to store pipeline artifacts.", + required=True) + return parser.parse_args() + + +def train_model( + preprocessed_dataset_path: str, + trained_model_path: str, + base_artifact_path: str): + """Dummy to load a model from the torch hub and save it. Review Comment: Please either drop or replace the word "dummy". ########## sdks/python/apache_beam/examples/ml-orchestration/kfp/components/ingestion/component.yaml: ########## @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +name: Ingestion +description: Component that mimicks scraping data from the web and outputs it to a jsonlines format file +inputs: + - name: base_artifact_path + description: base path to store data + type: String +outputs: + - name: ingested_dataset_path + description: target uri for the ingested dataset + type: String +implementation: + container: + image: <your-docker-registry/ingestion-image-name:latest> Review Comment: I'm assuming this requires users to modify this file. I'm not too familiar with KFP, but is there any way we can pass this via a command line argument, an environment variable, or some other mechanism? It would be nice if users didn't have to modify the source code to run something, but I'm not sure if that's an existing tooling limitation. ########## sdks/python/apache_beam/examples/ml-orchestration/tfx/coco_captions_local.py: ########## @@ -0,0 +1,80 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Preprocessing example with TFX with the LocalDagRunner and +either the beam DirectRunner or DataflowRunner""" +import os + +from tfx import v1 as tfx + +PROJECT_ID = "<project id>" Review Comment: Can we make all these into command line arguments? 
Thanks ########## sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/Dockerfile: ########## @@ -0,0 +1,28 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START component_dockerfile] +FROM python:3.8-slim Review Comment: Nit: can we update to `python:3.9-slim`? ########## sdks/python/apache_beam/examples/ml-orchestration/kfp/components/ingestion/src/ingest.py: ########## @@ -0,0 +1,74 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""Dummy ingestion function that fetches data from one file and simply copies it to another.""" + +import argparse +import time +from pathlib import Path + + +def parse_args(): + """Parse ingestion arguments.""" + parser = argparse.ArgumentParser() + parser.add_argument( + "--ingested-dataset-path", + type=str, + help="Path to save the ingested dataset to.", + required=True) + parser.add_argument( + "--base-artifact-path", + type=str, + help="Base path to store pipeline artifacts.", + required=True) + return parser.parse_args() + + +def dummy_ingest_data(ingested_dataset_path: str, base_artifact_path: str): + """Dummy data ingestion step that returns an uri Review Comment: """Data ingestion step that... ########## sdks/python/apache_beam/examples/ml-orchestration/kfp/components/ingestion/src/ingest.py: ########## @@ -0,0 +1,74 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""Dummy ingestion function that fetches data from one file and simply copies it to another.""" + +import argparse +import time +from pathlib import Path + + +def parse_args(): + """Parse ingestion arguments.""" + parser = argparse.ArgumentParser() + parser.add_argument( + "--ingested-dataset-path", + type=str, + help="Path to save the ingested dataset to.", + required=True) + parser.add_argument( + "--base-artifact-path", + type=str, + help="Base path to store pipeline artifacts.", + required=True) + return parser.parse_args() + + +def dummy_ingest_data(ingested_dataset_path: str, base_artifact_path: str): Review Comment: How about we rename this to `ingest_data`? ########## sdks/python/apache_beam/examples/ml-orchestration/kfp/components/ingestion/src/ingest.py: ########## @@ -0,0 +1,74 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""Dummy ingestion function that fetches data from one file and simply copies it to another.""" + +import argparse +import time +from pathlib import Path + + +def parse_args(): + """Parse ingestion arguments.""" + parser = argparse.ArgumentParser() + parser.add_argument( + "--ingested-dataset-path", + type=str, + help="Path to save the ingested dataset to.", + required=True) + parser.add_argument( + "--base-artifact-path", + type=str, + help="Base path to store pipeline artifacts.", + required=True) + return parser.parse_args() + + +def dummy_ingest_data(ingested_dataset_path: str, base_artifact_path: str): + """Dummy data ingestion step that returns an uri + to the data it has 'ingested' as jsonlines. + + Args: + data_ingestion_target (str): uri to the data that was scraped and + ingested by the component""" + # timestamp as unique id for the component execution + timestamp = int(time.time()) + + # create directory to store the actual data + target_path = f"{base_artifact_path}/ingestion/ingested_dataset_{timestamp}.jsonl" + # if the target path is a google cloud storage path convert the path to the gcsfuse path + target_path_gcsfuse = target_path.replace("gs://", "/gcs/") + Path(target_path_gcsfuse).parent.mkdir(parents=True, exist_ok=True) + + with open(target_path_gcsfuse, 'w') as f: + f.writelines([ + """{"image_id": 318556, "id": 255, "caption": "An angled view of a beautifully decorated bathroom.", "image_url": "http://farm4.staticflickr.com/3133/3378902101_3c9fa16b84_z.jpg", "image_name": "COCO_train2014_000000318556.jpg", "image_license": "Attribution-NonCommercial-ShareAlike License"}\n""", Review Comment: Nit: would it make sense to use `json.dumps` for each line? It might help at least validating we're writing correct data, but since it's pretty static I don't have a strong opinion about it, so I'll leave this up to you. 
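A minimal sketch of the `json.dumps` variant, in case it helps. The `SAMPLES` list and the `to_jsonlines` helper are hypothetical names, and the single record only mirrors the first line written in the diff.

```python
import json

# Hypothetical sample list; the fields mirror the first record in the diff.
SAMPLES = [
    {
        "image_id": 318556,
        "id": 255,
        "caption": "An angled view of a beautifully decorated bathroom.",
        "image_url": "http://farm4.staticflickr.com/3133/3378902101_3c9fa16b84_z.jpg",
        "image_name": "COCO_train2014_000000318556.jpg",
        "image_license": "Attribution-NonCommercial-ShareAlike License",
    },
]


def to_jsonlines(records):
    """Serialize each record as one JSON document per line."""
    return [json.dumps(record) + "\n" for record in records]
```

The write would then become `f.writelines(to_jsonlines(SAMPLES))`. Since `json.dumps` produces each line from a Python dict, a malformed record surfaces as a Python error in the source instead of as bad data in the output file.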
########## sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/src/preprocess.py: ########## @@ -0,0 +1,191 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Dummy ingestion function that fetches data from one file and simply copies it to another.""" + +import re +import json +import io +import argparse +import time +from pathlib import Path + +import requests +from PIL import Image, UnidentifiedImageError +import numpy as np +import torch +import torchvision.transforms as T +import torchvision.transforms.functional as TF +import apache_beam as beam +from apache_beam.options.pipeline_options import PipelineOptions + +PROJECT_ID = "<project-id>" Review Comment: Can we make all these constants into command line arguments? It would be great to be able to run the sample without modifying source code files. The workflow looks roughly like this.

```py
parser = argparse.ArgumentParser()
# ...
args, beam_args = parser.parse_known_args()
```

```py
pipeline_options = PipelineOptions(
    beam_args,
    job_name=f'preprocessing-{int(time.time())}',
    requirements_file="/requirements.txt",
    save_main_session=True,
)
```

Then pass the CLI arguments when running the pipeline.
```sh
python preprocess.py \
  --runner=DataflowRunner \
  --project=$PROJECT \
  --region=us-central1 \
  --temp_location=gs://$BUCKET/temp \
  ...
```

Here's a recent sample I made where I follow a similar pattern. ########## sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/src/preprocess.py: ########## @@ -0,0 +1,191 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""Dummy ingestion function that fetches data from one file and simply copies it to another.""" + +import re +import json +import io +import argparse +import time +from pathlib import Path + +import requests +from PIL import Image, UnidentifiedImageError +import numpy as np +import torch +import torchvision.transforms as T +import torchvision.transforms.functional as TF +import apache_beam as beam +from apache_beam.options.pipeline_options import PipelineOptions + +PROJECT_ID = "<project-id>" +LOCATION = "<project-location>" +STAGING_DIR = "<uri-to-data-flow-staging-dir>" +BEAM_RUNNER = "<beam-runner>" + + +# [START preprocess_component_argparse] +def parse_args(): + """Parse preprocessing arguments.""" + parser = argparse.ArgumentParser() + parser.add_argument( + "--ingested-dataset-path", + type=str, + help="Path to the ingested dataset", + required=True) + parser.add_argument( + "--preprocessed-dataset-path", + type=str, + help="The target directory for the ingested dataset.", + required=True) + parser.add_argument( + "--base-artifact-path", + type=str, + help="Base path to store pipeline artifacts.", + required=True) + return parser.parse_args() + + +# [END preprocess_component_argparse] + + +def preprocess_dataset( + ingested_dataset_path: str, + preprocessed_dataset_path: str, + base_artifact_path: str): + """Preprocess the ingested raw dataset and write the result to avro format. + + Args: + ingested_dataset_path (str): Path to the ingested dataset + preprocessed_dataset_path (str): Path to where the preprocessed dataset will be saved + base_artifact_path (str): path to the base directory of where artifacts can be stored for + this component. + """ + # [START kfp_component_input_output] + timestamp = time.time() + target_path = f"{base_artifact_path}/preprocessing/preprocessed_dataset_{timestamp}" + + # the directory where the output file is created may or may not exists + # so we have to create it. 
+ Path(preprocessed_dataset_path).parent.mkdir(parents=True, exist_ok=True) + with open(preprocessed_dataset_path, 'w') as f: + f.write(target_path) + # [END kfp_component_input_output] + + # [START deploy_preprocessing_beam_pipeline] + # We use the save_main_session option because one or more DoFn's in this + # workflow rely on global context (e.g., a module imported at module level). + pipeline_options = PipelineOptions( + runner=BEAM_RUNNER, + project=PROJECT_ID, + job_name=f'preprocessing-{int(time.time())}', + temp_location=STAGING_DIR, + region=LOCATION, + requirements_file="/requirements.txt", + save_main_session=True, + ) + + with beam.Pipeline(options=pipeline_options) as pipeline: + ( + pipeline + | + "Read input jsonl file" >> beam.io.ReadFromText(ingested_dataset_path) + | "Load json" >> beam.Map(json.loads) + | "Filter licenses" >> beam.Filter(valid_license) + | "Download image from URL" >> beam.ParDo(DownloadImageFromURL()) + | "Filter on valid images" >> + beam.Filter(lambda el: el['image'] is not None) + | "Resize image" >> beam.ParDo(ResizeImage(size=(224, 224))) + | "Clean Text" >> beam.ParDo(CleanText()) + | "Serialize Example" >> beam.ParDo(SerializeExample()) + | "Write to Avro files" >> beam.io.WriteToAvro( + file_path_prefix=target_path, + schema={ + "namespace": "preprocessing.example", + "type": "record", + "name": "Sample", + "fields": [{ + "name": "id", "type": "int" + }, { + "name": "caption", "type": "string" + }, { + "name": "image", "type": "bytes" + }] + }, + file_name_suffix=".avro")) + # [END deploy_preprocessing_beam_pipeline] + + +class DownloadImageFromURL(beam.DoFn): Review Comment: It doesn't look like this needs to be a full blown custom `DoFn` class, it could simply be a function that either `yield`s a valid value or nothing. Also, I would try to make all functions _pure_. I've encountered very hard to debug problems when there are side effects that mutate elements in a PCollection. 
If the PCollection is shared, one transform can corrupt the values seen by another transform, giving weird and unpredictable results. The behavior is runner-specific, so it differs from one runner to the next, much like "undefined behavior".

Here I would _avoid mutating_ dictionaries and instead create new ones based on the previous element. This makes sure the previous elements are not modified.

```py
import logging
from collections.abc import Iterable


def download_image_from_url(element: dict) -> Iterable[dict]:
  response = ...
  try:
    image = ...
    # Build a new dict rather than mutating the input element.
    yield {**element, 'image': image}
  except UnidentifiedImageError as e:
    # Nothing is yielded for this element, so it is dropped from the output.
    logging.exception(e)
```

It could then be used with `beam.FlatMap(download_image_from_url)` instead of `ParDo` + `Filter`.

########## sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/src/preprocess.py: ##########
@@ -0,0 +1,191 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+ +"""Dummy ingestion function that fetches data from one file and simply copies it to another.""" + +import re +import json +import io +import argparse +import time +from pathlib import Path + +import requests +from PIL import Image, UnidentifiedImageError +import numpy as np +import torch +import torchvision.transforms as T +import torchvision.transforms.functional as TF +import apache_beam as beam +from apache_beam.options.pipeline_options import PipelineOptions + +PROJECT_ID = "<project-id>" +LOCATION = "<project-location>" +STAGING_DIR = "<uri-to-data-flow-staging-dir>" +BEAM_RUNNER = "<beam-runner>" + + +# [START preprocess_component_argparse] +def parse_args(): + """Parse preprocessing arguments.""" + parser = argparse.ArgumentParser() + parser.add_argument( + "--ingested-dataset-path", + type=str, + help="Path to the ingested dataset", + required=True) + parser.add_argument( + "--preprocessed-dataset-path", + type=str, + help="The target directory for the ingested dataset.", + required=True) + parser.add_argument( + "--base-artifact-path", + type=str, + help="Base path to store pipeline artifacts.", + required=True) + return parser.parse_args() + + +# [END preprocess_component_argparse] + + +def preprocess_dataset( + ingested_dataset_path: str, + preprocessed_dataset_path: str, + base_artifact_path: str): + """Preprocess the ingested raw dataset and write the result to avro format. + + Args: + ingested_dataset_path (str): Path to the ingested dataset + preprocessed_dataset_path (str): Path to where the preprocessed dataset will be saved + base_artifact_path (str): path to the base directory of where artifacts can be stored for + this component. + """ + # [START kfp_component_input_output] + timestamp = time.time() + target_path = f"{base_artifact_path}/preprocessing/preprocessed_dataset_{timestamp}" + + # the directory where the output file is created may or may not exists + # so we have to create it. 
+ Path(preprocessed_dataset_path).parent.mkdir(parents=True, exist_ok=True) + with open(preprocessed_dataset_path, 'w') as f: + f.write(target_path) + # [END kfp_component_input_output] + + # [START deploy_preprocessing_beam_pipeline] + # We use the save_main_session option because one or more DoFn's in this + # workflow rely on global context (e.g., a module imported at module level). + pipeline_options = PipelineOptions( + runner=BEAM_RUNNER, + project=PROJECT_ID, + job_name=f'preprocessing-{int(time.time())}', + temp_location=STAGING_DIR, + region=LOCATION, + requirements_file="/requirements.txt", + save_main_session=True, + ) + + with beam.Pipeline(options=pipeline_options) as pipeline: + ( + pipeline + | + "Read input jsonl file" >> beam.io.ReadFromText(ingested_dataset_path) + | "Load json" >> beam.Map(json.loads) + | "Filter licenses" >> beam.Filter(valid_license) + | "Download image from URL" >> beam.ParDo(DownloadImageFromURL()) + | "Filter on valid images" >> + beam.Filter(lambda el: el['image'] is not None) + | "Resize image" >> beam.ParDo(ResizeImage(size=(224, 224))) + | "Clean Text" >> beam.ParDo(CleanText()) + | "Serialize Example" >> beam.ParDo(SerializeExample()) + | "Write to Avro files" >> beam.io.WriteToAvro( + file_path_prefix=target_path, + schema={ + "namespace": "preprocessing.example", + "type": "record", + "name": "Sample", + "fields": [{ + "name": "id", "type": "int" + }, { + "name": "caption", "type": "string" + }, { + "name": "image", "type": "bytes" + }] + }, + file_name_suffix=".avro")) + # [END deploy_preprocessing_beam_pipeline] + + +class DownloadImageFromURL(beam.DoFn): + """DoFn to download the images from their uri.""" + def process(self, element): + response = requests.get(element['image_url']) + try: + image = Image.open(io.BytesIO(response.content)) + image = T.ToTensor()(image) + element['image'] = image + except UnidentifiedImageError: + element['image'] = None + return [element] + + +class ResizeImage(beam.DoFn): + "DoFn 
to resize the elememt's PIL image to the target resolution."
+
+  def process(self, element, size=(256, 256)):
+    element['image'] = TF.resize(element['image'], size)
+    return [element]
+
+
+class CleanText(beam.DoFn):

Review Comment: This can also be a simple function.

```py
def clean_text(element: dict):
  text = ...
  # Return a new dict instead of mutating the input element.
  return {**element, "preprocessed_caption": text}
```

And call it with `beam.Map(clean_text)`.

########## sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/src/preprocess.py: ##########
@@ -0,0 +1,191 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+ +"""Dummy ingestion function that fetches data from one file and simply copies it to another.""" + +import re +import json +import io +import argparse +import time +from pathlib import Path + +import requests +from PIL import Image, UnidentifiedImageError +import numpy as np +import torch +import torchvision.transforms as T +import torchvision.transforms.functional as TF +import apache_beam as beam +from apache_beam.options.pipeline_options import PipelineOptions + +PROJECT_ID = "<project-id>" +LOCATION = "<project-location>" +STAGING_DIR = "<uri-to-data-flow-staging-dir>" +BEAM_RUNNER = "<beam-runner>" + + +# [START preprocess_component_argparse] +def parse_args(): + """Parse preprocessing arguments.""" + parser = argparse.ArgumentParser() + parser.add_argument( + "--ingested-dataset-path", + type=str, + help="Path to the ingested dataset", + required=True) + parser.add_argument( + "--preprocessed-dataset-path", + type=str, + help="The target directory for the ingested dataset.", + required=True) + parser.add_argument( + "--base-artifact-path", + type=str, + help="Base path to store pipeline artifacts.", + required=True) + return parser.parse_args() + + +# [END preprocess_component_argparse] + + +def preprocess_dataset( + ingested_dataset_path: str, + preprocessed_dataset_path: str, + base_artifact_path: str): + """Preprocess the ingested raw dataset and write the result to avro format. + + Args: + ingested_dataset_path (str): Path to the ingested dataset + preprocessed_dataset_path (str): Path to where the preprocessed dataset will be saved + base_artifact_path (str): path to the base directory of where artifacts can be stored for + this component. + """ + # [START kfp_component_input_output] + timestamp = time.time() + target_path = f"{base_artifact_path}/preprocessing/preprocessed_dataset_{timestamp}" + + # the directory where the output file is created may or may not exists + # so we have to create it. 
+ Path(preprocessed_dataset_path).parent.mkdir(parents=True, exist_ok=True) + with open(preprocessed_dataset_path, 'w') as f: + f.write(target_path) + # [END kfp_component_input_output] + + # [START deploy_preprocessing_beam_pipeline] + # We use the save_main_session option because one or more DoFn's in this + # workflow rely on global context (e.g., a module imported at module level). + pipeline_options = PipelineOptions( + runner=BEAM_RUNNER, + project=PROJECT_ID, + job_name=f'preprocessing-{int(time.time())}', + temp_location=STAGING_DIR, + region=LOCATION, + requirements_file="/requirements.txt", + save_main_session=True, + ) + + with beam.Pipeline(options=pipeline_options) as pipeline: + ( + pipeline + | + "Read input jsonl file" >> beam.io.ReadFromText(ingested_dataset_path) + | "Load json" >> beam.Map(json.loads) + | "Filter licenses" >> beam.Filter(valid_license) + | "Download image from URL" >> beam.ParDo(DownloadImageFromURL()) + | "Filter on valid images" >> + beam.Filter(lambda el: el['image'] is not None) + | "Resize image" >> beam.ParDo(ResizeImage(size=(224, 224))) + | "Clean Text" >> beam.ParDo(CleanText()) + | "Serialize Example" >> beam.ParDo(SerializeExample()) + | "Write to Avro files" >> beam.io.WriteToAvro( + file_path_prefix=target_path, + schema={ + "namespace": "preprocessing.example", + "type": "record", + "name": "Sample", + "fields": [{ + "name": "id", "type": "int" + }, { + "name": "caption", "type": "string" + }, { + "name": "image", "type": "bytes" + }] + }, + file_name_suffix=".avro")) + # [END deploy_preprocessing_beam_pipeline] + + +class DownloadImageFromURL(beam.DoFn): + """DoFn to download the images from their uri.""" + def process(self, element): + response = requests.get(element['image_url']) + try: + image = Image.open(io.BytesIO(response.content)) + image = T.ToTensor()(image) + element['image'] = image + except UnidentifiedImageError: + element['image'] = None + return [element] + + +class ResizeImage(beam.DoFn): Review 
Comment: This can also be a simple function.

```py
def resize_image(element: dict, size=(256, 256)):
  image = TF.resize(element['image'], size)
  # Return a new dict instead of mutating the input element.
  return {**element, 'image': image}
```

And call it with `beam.Map(resize_image, size=IMAGE_SIZE)`.

########## sdks/python/apache_beam/examples/ml-orchestration/kfp/components/train/src/train.py: ##########
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Dummy training function that loads a pretrained model from the torch hub and saves it."""

Review Comment: Please either drop or replace the word "dummy".

########## sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/src/preprocess.py: ##########
@@ -0,0 +1,191 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Dummy ingestion function that fetches data from one file and simply copies it to another.""" + +import re +import json +import io +import argparse +import time +from pathlib import Path + +import requests +from PIL import Image, UnidentifiedImageError +import numpy as np +import torch +import torchvision.transforms as T +import torchvision.transforms.functional as TF +import apache_beam as beam +from apache_beam.options.pipeline_options import PipelineOptions + +PROJECT_ID = "<project-id>" +LOCATION = "<project-location>" +STAGING_DIR = "<uri-to-data-flow-staging-dir>" +BEAM_RUNNER = "<beam-runner>" + + +# [START preprocess_component_argparse] +def parse_args(): + """Parse preprocessing arguments.""" + parser = argparse.ArgumentParser() + parser.add_argument( + "--ingested-dataset-path", + type=str, + help="Path to the ingested dataset", + required=True) + parser.add_argument( + "--preprocessed-dataset-path", + type=str, + help="The target directory for the ingested dataset.", + required=True) + parser.add_argument( + "--base-artifact-path", + type=str, + help="Base path to store pipeline artifacts.", + required=True) + return parser.parse_args() + + +# [END preprocess_component_argparse] + + +def preprocess_dataset( + ingested_dataset_path: str, + preprocessed_dataset_path: str, + base_artifact_path: str): + """Preprocess the ingested raw dataset and write the result to avro format. 
+ + Args: + ingested_dataset_path (str): Path to the ingested dataset + preprocessed_dataset_path (str): Path to where the preprocessed dataset will be saved + base_artifact_path (str): path to the base directory of where artifacts can be stored for + this component. + """ + # [START kfp_component_input_output] + timestamp = time.time() + target_path = f"{base_artifact_path}/preprocessing/preprocessed_dataset_{timestamp}" + + # the directory where the output file is created may or may not exists + # so we have to create it. + Path(preprocessed_dataset_path).parent.mkdir(parents=True, exist_ok=True) + with open(preprocessed_dataset_path, 'w') as f: + f.write(target_path) + # [END kfp_component_input_output] + + # [START deploy_preprocessing_beam_pipeline] + # We use the save_main_session option because one or more DoFn's in this + # workflow rely on global context (e.g., a module imported at module level). + pipeline_options = PipelineOptions( + runner=BEAM_RUNNER, + project=PROJECT_ID, + job_name=f'preprocessing-{int(time.time())}', + temp_location=STAGING_DIR, + region=LOCATION, + requirements_file="/requirements.txt", + save_main_session=True, + ) + + with beam.Pipeline(options=pipeline_options) as pipeline: + ( + pipeline + | + "Read input jsonl file" >> beam.io.ReadFromText(ingested_dataset_path) + | "Load json" >> beam.Map(json.loads) + | "Filter licenses" >> beam.Filter(valid_license) + | "Download image from URL" >> beam.ParDo(DownloadImageFromURL()) + | "Filter on valid images" >> + beam.Filter(lambda el: el['image'] is not None) + | "Resize image" >> beam.ParDo(ResizeImage(size=(224, 224))) + | "Clean Text" >> beam.ParDo(CleanText()) + | "Serialize Example" >> beam.ParDo(SerializeExample()) + | "Write to Avro files" >> beam.io.WriteToAvro( + file_path_prefix=target_path, + schema={ + "namespace": "preprocessing.example", + "type": "record", + "name": "Sample", + "fields": [{ + "name": "id", "type": "int" + }, { + "name": "caption", "type": "string" + }, { 
+ "name": "image", "type": "bytes" + }] + }, + file_name_suffix=".avro")) + # [END deploy_preprocessing_beam_pipeline] + + +class DownloadImageFromURL(beam.DoFn): + """DoFn to download the images from their uri.""" + def process(self, element): + response = requests.get(element['image_url']) + try: + image = Image.open(io.BytesIO(response.content)) + image = T.ToTensor()(image) + element['image'] = image + except UnidentifiedImageError: + element['image'] = None + return [element] + + +class ResizeImage(beam.DoFn): + "DoFn to resize the elememt's PIL image to the target resolution." + + def process(self, element, size=(256, 256)): + element['image'] = TF.resize(element['image'], size) + return [element] + + +class CleanText(beam.DoFn): + """Dofn to perform a series of string cleaning operations.""" + def process(self, element): + text = element['caption'] + + text = text.lower() # lower case + text = re.sub(r"http\S+", "", text) # remove urls + text = re.sub("\s+", " ", text) # remove extra spaces (including \n and \t) + text = re.sub( + "[()[\].,|:;?!=+~\-\/{}]", ",", + text) # all puncutation are replace w commas + text = f" {text}" # always start with a space + text = text.strip(',') # remove commas at the start or end of the caption + text = text[:-1] if text and text[-1] == "," else text + text = text[1:] if text and text[0] == "," else text + + element["preprocessed_caption"] = text + return [element] + + +def valid_license(element): + """Checks whether an element's image has the correct license for our use case.""" + license = element['image_license'] + return license in ["Attribution License", "No known copyright restrictions"] + + +class SerializeExample(beam.DoFn): Review Comment: This can also be a simple function. ```py def serialize_example(element): image = ... 
return {**element, 'image': image} ```

########## sdks/python/apache_beam/examples/ml-orchestration/kfp/components/train/component.yaml: ##########
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: model training
+description: Train a pytorch model
+inputs:
+  - name: base_artifact_path
+    description: base path to store data
+    type: String
+  - name: preprocessed_dataset_path
+    description: path to the preprocessed dataset
+    type: String
+outputs:
+  - name: trained_model_path
+    description: trained model file
+    type: String
+implementation:
+  container:
+    image: <your-docker-registry/train-image-name:latest>

Review Comment: It would be nice to have this as a command-line argument or environment variable.

########## sdks/python/apache_beam/examples/ml-orchestration/kfp/pipeline.py: ##########
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.
You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kfp import components as comp
+from kfp.v2 import dsl
+from kfp.v2.compiler import Compiler
+
+PIPELINE_ROOT = "<pipeline-root-path>"

Review Comment: I'm not too familiar with KFP, but can we replace these with either command-line arguments or environment variables?
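
For the hard-coded placeholders such as `PIPELINE_ROOT`, a minimal sketch of one way to read the value from a command-line flag with an environment-variable fallback. The flag name `--pipeline-root`, the reuse of `PIPELINE_ROOT` as an environment variable, and the helper `parse_pipeline_config` are assumptions for illustration only and are not part of the PR:

```python
import argparse
import os


def parse_pipeline_config(argv=None):
  """Read the pipeline root from a flag, falling back to an env variable."""
  parser = argparse.ArgumentParser()
  parser.add_argument(
      "--pipeline-root",  # hypothetical flag name
      # Hypothetical env variable; read when the parser is built.
      default=os.environ.get("PIPELINE_ROOT"),
      help="Root path for pipeline artifacts (or set PIPELINE_ROOT).")
  args = parser.parse_args(argv)
  if not args.pipeline_root:
    # Fail early instead of running with an unfilled placeholder.
    parser.error("pass --pipeline-root or set PIPELINE_ROOT")
  return args


if __name__ == "__main__":
  print(parse_pipeline_config().pipeline_root)
```

An explicit flag takes precedence over the environment variable, which keeps the file free of values that have to be edited by hand before it can run.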
