agvdndor commented on code in PR #23094:
URL: https://github.com/apache/beam/pull/23094#discussion_r970833961


##########
sdks/python/apache_beam/examples/ml-orchestration/kfp/components/preprocessing/src/preprocess.py:
##########
@@ -0,0 +1,160 @@
+"""Dummy ingestion function that fetches data from one file and simply copies 
it to another."""
+import re
+import json
+import io
+import argparse
+import time
+from pathlib import Path
+
+import requests
+from PIL import Image, UnidentifiedImageError
+import numpy as np
+import torch
+import torchvision.transforms as T
+import torchvision.transforms.functional as TF
+import apache_beam as beam
+from apache_beam.options.pipeline_options import PipelineOptions
+
+
+PROJECT_ID = "<project-id>"
+LOCATION = "<project-location>"
+STAGING_DIR = "<uri-to-data-flow-staging-dir>"
+BEAM_RUNNER = "<beam-runner>"
+
+# [START preprocess_component_argparse]
+def parse_args():
+  """Parse preprocessing arguments."""
+  parser = argparse.ArgumentParser()
+  parser.add_argument(
+    "--ingested-dataset-path", type=str,
+    help="Path to the ingested dataset")
+  parser.add_argument(
+    "--preprocessed-dataset-path", type=str,
+    help="The target directory for the ingested dataset.")
+  parser.add_argument(
+    "--base-artifact-path", type=str,
+    help="Base path to store pipeline artifacts.")
+  return parser.parse_args()
+# [END preprocess_component_argparse]
+
+
+def preprocess_dataset(
+  ingested_dataset_path: str,
+  preprocessed_dataset_path: str,
+  base_artifact_path: str):
+  """Preprocess the ingested raw dataset and write the result to avro format.
+
+  Args:
+    ingested_dataset_path (str): Path to the ingested dataset
+    preprocessed_dataset_path (str): Path to where the preprocessed dataset 
will be saved
+    base_artifact_path (str): path to the base directory of where artifacts 
can be stored for
+      this component.
+  """
+  # [START kfp_component_input_output]
+  timestamp = time.time()
+  target_path = 
f"{base_artifact_path}/preprocessing/preprocessed_dataset_{timestamp}"
+
+  # the directory where the output file is created may or may not exists
+  # so we have to create it.
+  Path(preprocessed_dataset_path).parent.mkdir(parents=True, exist_ok=True)
+  with open(preprocessed_dataset_path, 'w') as f:
+    f.write(target_path)
+  # [END kfp_component_input_output]
+
+
+  # [START deploy_preprocessing_beam_pipeline]
+  # We use the save_main_session option because one or more DoFn's in this
+  # workflow rely on global context (e.g., a module imported at module level).
+  pipeline_options = PipelineOptions(
+    runner=BEAM_RUNNER,
+    project=PROJECT_ID,
+    job_name=f'preprocessing-{int(time.time())}',
+    temp_location=STAGING_DIR,
+    region=LOCATION,
+    requirements_file="/requirements.txt",
+    save_main_session = True,
+  )
+
+  with beam.Pipeline(options=pipeline_options) as pipeline:
+    (
+      pipeline
+      | "Read input jsonl file" >> beam.io.ReadFromText(ingested_dataset_path)

Review Comment:
   This was actually not a typo 🙂. We are reading [json 
lines](https://jsonlines.org/) as input, which by convention has a `.jsonl` 
file extension.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to