This is an automated email from the ASF dual-hosted git repository.

vterentev pushed a commit to branch oss-image-cpu
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 1b96dad3e5fb84604a978baaa1f630e38c197ada
Author: Vitaly Terentyev <[email protected]>
AuthorDate: Fri Jan 2 13:22:19 2026 +0400

    Refactoring
---
 .../beam_Inference_Python_Benchmarks_Dataflow.yml  |  2 +-
 ...nchmarks_Dataflow_Pytorch_Image_Captioning.txt} |  0
 .../examples/inference/pytorch_image_captioning.py | 59 +++++++++++-----------
 .../inference/pytorch_image_object_detection.py    |  5 +-
 4 files changed, 32 insertions(+), 34 deletions(-)

diff --git a/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml b/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml
index c673feed720..1c2b3fd23bb 100644
--- a/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml
+++ b/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml
@@ -93,7 +93,7 @@ jobs:
             ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Batch_DistilBert_Base_Uncased.txt
             ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_VLLM_Gemma_Batch.txt
             ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Object_Detection.txt
-            ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Object_Captioning.txt
+            ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Captioning.txt
           # The env variables are created and populated in the test-arguments-action as "<github.job>_test_arguments_<argument_file_paths_index>"
       - name: get current time
        run: echo "NOW_UTC=$(date '+%m%d%H%M%S' --utc)" >> $GITHUB_ENV
diff --git a/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Object_Captioning.txt b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Captioning.txt
similarity index 100%
rename from .github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Object_Captioning.txt
rename to .github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Captioning.txt
diff --git a/sdks/python/apache_beam/examples/inference/pytorch_image_captioning.py b/sdks/python/apache_beam/examples/inference/pytorch_image_captioning.py
index 04089b8b1e8..914ac43fe99 100644
--- a/sdks/python/apache_beam/examples/inference/pytorch_image_captioning.py
+++ b/sdks/python/apache_beam/examples/inference/pytorch_image_captioning.py
@@ -51,7 +51,6 @@ from apache_beam.runners.runner import PipelineResult
 
 import torch
 import PIL.Image as PILImage
-
 
 # ============ Utility ============
 
@@ -143,13 +142,13 @@ class PostProcessDoFn(beam.DoFn):
 
 class BlipCaptionModelHandler(ModelHandler):
   def __init__(
-    self,
-    model_name: str,
-    device: str,
-    batch_size: int,
-    num_captions: int,
-    max_new_tokens: int,
-    num_beams: int):
+      self,
+      model_name: str,
+      device: str,
+      batch_size: int,
+      num_captions: int,
+      max_new_tokens: int,
+      num_beams: int):
     self.model_name = model_name
     self.device = device
     self.batch_size = batch_size
@@ -214,7 +213,7 @@ class BlipCaptionModelHandler(ModelHandler):
       candidates_per_image = []
       idx = 0
       for _ in range(len(batch)):
-        candidates_per_image.append(captions_all[idx: idx + self.num_captions])
+        candidates_per_image.append(captions_all[idx:idx + self.num_captions])
         idx += self.num_captions
       blip_ms = now_millis() - start
 
@@ -235,11 +234,11 @@ class BlipCaptionModelHandler(ModelHandler):
 
 class ClipRankModelHandler(ModelHandler):
   def __init__(
-    self,
-    model_name: str,
-    device: str,
-    batch_size: int,
-    score_normalize: bool):
+      self,
+      model_name: str,
+      device: str,
+      batch_size: int,
+      score_normalize: bool):
     self.model_name = model_name
     self.device = device
     self.batch_size = batch_size
@@ -363,7 +362,8 @@ def parse_known_args(argv):
   parser.add_argument('--num_beams', type=int, default=5)
 
   # CLIP
-  parser.add_argument('--clip_model_name', default='openai/clip-vit-base-patch32')
+  parser.add_argument(
+      '--clip_model_name', default='openai/clip-vit-base-patch32')
   parser.add_argument('--clip_batch_size', type=int, default=8)
   parser.add_argument(
       '--clip_score_normalize', default='false', choices=['true', 'false'])
@@ -415,9 +415,7 @@ def run(
       | 'MakeKey' >> beam.ParDo(MakeKeyDoFn())
       | 'DistinctByKey' >> beam.Distinct())
 
-  images = (
-      keyed
-      | 'ReadImageBytes' >> beam.ParDo(ReadImageBytesDoFn()))
+  images = (keyed | 'ReadImageBytes' >> beam.ParDo(ReadImageBytesDoFn()))
 
   # Stage 1: BLIP candidate generation
   blip_out = (
@@ -431,23 +429,24 @@ def run(
   results = (
       clip_out
-      | 'PostProcess' >> beam.ParDo(PostProcessDoFn(
-          blip_name=known_args.blip_model_name,
-          clip_name=known_args.clip_model_name)))
+      | 'PostProcess' >> beam.ParDo(
+          PostProcessDoFn(
+              blip_name=known_args.blip_model_name,
+              clip_name=known_args.clip_model_name)))
 
   if known_args.publish_to_big_query == 'true':
     _ = (
         results
         | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
-          known_args.output_table,
-          schema=(
-              'image_id:STRING, blip_model:STRING, clip_model:STRING, '
-              'best_caption:STRING, best_score:FLOAT, '
-              'candidates:STRING, scores:STRING, '
-              'blip_ms:INT64, clip_ms:INT64, total_ms:INT64, infer_ms:INT64'),
-          write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
-          create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
-          method=beam.io.WriteToBigQuery.Method.FILE_LOADS))
+            known_args.output_table,
+            schema=(
+                'image_id:STRING, blip_model:STRING, clip_model:STRING, '
+                'best_caption:STRING, best_score:FLOAT, '
+                'candidates:STRING, scores:STRING, '
+                'blip_ms:INT64, clip_ms:INT64, total_ms:INT64, infer_ms:INT64'),
+            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
+            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+            method=beam.io.WriteToBigQuery.Method.FILE_LOADS))
 
   result = pipeline.run()
   result.wait_until_finish(duration=1800000)  # 30 min
diff --git a/sdks/python/apache_beam/examples/inference/pytorch_image_object_detection.py b/sdks/python/apache_beam/examples/inference/pytorch_image_object_detection.py
index 6ca9cdb1f93..8b0f216e5fe 100644
--- a/sdks/python/apache_beam/examples/inference/pytorch_image_object_detection.py
+++ b/sdks/python/apache_beam/examples/inference/pytorch_image_object_detection.py
@@ -155,7 +155,7 @@ def _torchvision_detection_inference_fn(
 class PostProcessDoFn(beam.DoFn):
   """PredictionResult -> dict row for BQ."""
   def __init__(
-    self, model_name: str, score_threshold: float, max_detections: int):
+      self, model_name: str, score_threshold: float, max_detections: int):
     self.model_name = model_name
     self.score_threshold = score_threshold
     self.max_detections = max_detections
@@ -329,8 +329,7 @@ def run(
 
   model_handler = PytorchModelHandlerTensor(
       model_class=lambda: create_torchvision_detection_model(
-          known_args.pretrained_model_name
-      ),
+          known_args.pretrained_model_name),
      model_params={},
      state_dict_path=known_args.model_state_dict_path,
      device=device,
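
For reviewers unfamiliar with the RunInference API that both touched examples build on, here is a minimal sketch of how a custom ModelHandler such as BlipCaptionModelHandler is wired into a pipeline. It is not part of this commit; the handler name and the input paths are invented for illustration, and the toy handler just echoes its inputs instead of loading BLIP/CLIP weights.

    import apache_beam as beam
    from apache_beam.ml.inference.base import ModelHandler
    from apache_beam.ml.inference.base import PredictionResult
    from apache_beam.ml.inference.base import RunInference


    class EchoModelHandler(ModelHandler):
      """Toy handler: loads no real model and echoes each input back."""
      def load_model(self):
        # A real handler (e.g. BlipCaptionModelHandler) would load model
        # weights here and return the loaded model object.
        return None

      def run_inference(self, batch, model, inference_args=None):
        # Emit one PredictionResult per input element, mirroring how the
        # example's handlers pair each image with its inference output.
        return [PredictionResult(example, example) for example in batch]


    with beam.Pipeline() as p:
      _ = (
          p
          | beam.Create(['gs://my-bucket/img1.jpg', 'gs://my-bucket/img2.jpg'])
          | RunInference(EchoModelHandler())
          | beam.Map(print))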
