This is an automated email from the ASF dual-hosted git repository.

vterentev pushed a commit to branch oss-image-cpu
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 6c9c5729fb7dda6df0b759e8c5917456812dd5eb
Author: Vitaly Terentyev <[email protected]>
AuthorDate: Mon Oct 27 18:33:03 2025 +0400

    Add PyTorch Image Classification EfficientNet-B0 Streaming (Right-fitting) 
ML pipeline
---
 .../beam_Inference_Python_Benchmarks_Dataflow.yml  |  16 +-
 ...aflow_Pytorch_Image_Classification_Rightfit.txt |  46 ++
 .test-infra/tools/refresh_looker_metrics.py        |   1 +
 .../inference/pytorch_imagenet_rightfit.py         | 525 +++++++++++++++++++++
 .../ml/inference/pytorch_rightfit_requirements.txt |  26 +
 .../pytorch_imagenet_rightfit_benchmarks.py        |  42 ++
 website/www/site/content/en/performance/_index.md  |   1 +
 .../performance/pytorchimagenetrightfit/_index.md  |  44 ++
 website/www/site/data/performance.yaml             |  16 +
 9 files changed, 715 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml 
b/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml
index 1c2b3fd23bb..edd92bec660 100644
--- a/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml
+++ b/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml
@@ -92,6 +92,7 @@ jobs:
             ${{ github.workspace 
}}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Streaming_DistilBert_Base_Uncased.txt
             ${{ github.workspace 
}}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Batch_DistilBert_Base_Uncased.txt
             ${{ github.workspace 
}}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_VLLM_Gemma_Batch.txt
+            ${{ github.workspace 
}}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Classification_Rightfit.txt
             ${{ github.workspace 
}}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Object_Detection.txt
             ${{ github.workspace 
}}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Captioning.txt
       # The env variables are created and populated in the 
test-arguments-action as 
"<github.job>_test_arguments_<argument_file_paths_index>"
@@ -192,6 +193,17 @@ jobs:
             -PpythonVersion=3.10 \
             
-PloadTest.requirementsTxtFile=apache_beam/ml/inference/torch_tests_requirements.txt
 \
             '-PloadTest.args=${{ 
env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_5 }} 
--job_name=benchmark-tests-pytorch-imagenet-python-gpu-${{env.NOW_UTC}} 
--output=gs://temp-storage-for-end-to-end-tests/torch/result_resnet152_gpu-${{env.NOW_UTC}}.txt'
 \
+      - name: run PyTorch Image Classification EfficientNet-B0 Streaming 
(Right-fitting)
+        uses: ./.github/actions/gradle-command-self-hosted-action
+        timeout-minutes: 180
+        with:
+          gradle-command: :sdks:python:apache_beam:testing:load_tests:run
+          arguments: |
+            
-PloadTest.mainClass=apache_beam.testing.benchmarks.inference.pytorch_imagenet_rightfit_benchmarks
 \
+            -Prunner=DataflowRunner \
+            -PpythonVersion=3.10 \
+            
-PloadTest.requirementsTxtFile=apache_beam/ml/inference/pytorch_rightfit_requirements.txt
 \
+            '-PloadTest.args=${{ 
env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_9 }} 
--mode=streaming 
--job_name=benchmark-tests-pytorch-imagenet-rightfit-streaming-${{env.NOW_UTC}} 
--output_table=apache-beam-testing.beam_run_inference.result_torch_inference_imagenet_stream_rightfit'
 \
       - name: run PyTorch Image Object Detection Faster R-CNN ResNet-50 Batch
         uses: ./.github/actions/gradle-command-self-hosted-action
         timeout-minutes: 180
@@ -202,7 +214,7 @@ jobs:
             -Prunner=DataflowRunner \
             -PpythonVersion=3.10 \
             
-PloadTest.requirementsTxtFile=apache_beam/ml/inference/pytorch_image_object_detection_requirements.txt
 \
-            '-PloadTest.args=${{ 
env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_9 }} --mode=batch 
--job_name=benchmark-tests-pytorch-image-object_detection-batch-${{env.NOW_UTC}}
 
--output_table=apache-beam-testing.beam_run_inference.result_torch_inference_image_object_detection_batch'
 \
+            '-PloadTest.args=${{ 
env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_10 }} --mode=batch 
--job_name=benchmark-tests-pytorch-image-object_detection-batch-${{env.NOW_UTC}}
 
--output_table=apache-beam-testing.beam_run_inference.result_torch_inference_image_object_detection_batch'
 \
       - name: run PyTorch Image Captioning BLIP + CLIP Batch
         uses: ./.github/actions/gradle-command-self-hosted-action
         timeout-minutes: 180
@@ -213,4 +225,4 @@ jobs:
             -Prunner=DataflowRunner \
             -PpythonVersion=3.10 \
             
-PloadTest.requirementsTxtFile=apache_beam/ml/inference/pytorch_image_captioning_requirements.txt
 \
-            '-PloadTest.args=${{ 
env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_10 }} --mode=batch 
--job_name=benchmark-tests-pytorch-image-captioning-batch-${{env.NOW_UTC}} 
--output_table=apache-beam-testing.beam_run_inference.result_torch_inference_image_captioning_batch'
+            '-PloadTest.args=${{ 
env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_11 }} --mode=batch 
--job_name=benchmark-tests-pytorch-image-captioning-batch-${{env.NOW_UTC}} 
--output_table=apache-beam-testing.beam_run_inference.result_torch_inference_image_captioning_batch'
diff --git 
a/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Classification_Rightfit.txt
 
b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Classification_Rightfit.txt
new file mode 100644
index 00000000000..0e19440503c
--- /dev/null
+++ 
b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Classification_Rightfit.txt
@@ -0,0 +1,46 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+--region=us-central1
+--machine_type=n1-standard-4
+--num_workers=50
+--disk_size_gb=50
+--autoscaling_algorithm=NONE
+--staging_location=gs://temp-storage-for-perf-tests/loadtests
+--temp_location=gs://temp-storage-for-perf-tests/loadtests
+--requirements_file=apache_beam/ml/inference/pytorch_rightfit_requirements.txt
+--publish_to_big_query=true
+--metrics_dataset=beam_run_inference
+--metrics_table=torch_inference_imagenet_results_stream_rightfit
+--influx_measurement=torch_inference_imagenet_stream_rightfit
+--pretrained_model_name=efficientnet_b0
+--device=GPU
+--input_file=gs://apache-beam-ml/testing/inputs/openimage_50k_benchmark.txt
+--runner=DataflowRunner
+--mode=streaming
+--input_mode=gcs_uris
+--input_options={}
+--pubsub_topic=projects/apache-beam-testing/topics/images_topic
+--pubsub_subscription=projects/apache-beam-testing/subscriptions/images_subscription
+--model_state_dict_path=gs://apache-beam-ml/models/efficientnet_b0_state_dict.pth
+--rate_limit=250
+--image_size=224
+--top_k=5
+--inference_batch_size=auto
+--window_sec=60
+--trigger_proc_time_sec=30
+--enable_dedup=false
+--experiments=worker_accelerator=type:nvidia-tesla-t4;count:1;install-nvidia-driver:5xx
diff --git a/.test-infra/tools/refresh_looker_metrics.py 
b/.test-infra/tools/refresh_looker_metrics.py
index 5daac3aaf31..87564d5d65e 100644
--- a/.test-infra/tools/refresh_looker_metrics.py
+++ b/.test-infra/tools/refresh_looker_metrics.py
@@ -43,6 +43,7 @@ LOOKS_TO_DOWNLOAD = [
     ("82", ["263", "264", "265", "266", "267"]),  # PyTorch Sentiment 
Streaming DistilBERT base uncased
     ("85", ["268", "269", "270", "271", "272"]),  # PyTorch Sentiment Batch 
DistilBERT base uncased
     ("86", ["284", "285", "286", "287", "288"]),  # VLLM Batch Gemma
+    ("92", ["289", "290", "291", "292", "293"]),  # PyTorch Image 
Classification EfficientNet-B0 Streaming (Right-fitting)
     #TODO: PyTorch Image Object Detection Faster R-CNN ResNet-50 Batch
     #TODO: PyTorch Image Captioning BLIP + CLIP Batch
 ]
diff --git 
a/sdks/python/apache_beam/examples/inference/pytorch_imagenet_rightfit.py 
b/sdks/python/apache_beam/examples/inference/pytorch_imagenet_rightfit.py
new file mode 100644
index 00000000000..0d16f786581
--- /dev/null
+++ b/sdks/python/apache_beam/examples/inference/pytorch_imagenet_rightfit.py
@@ -0,0 +1,525 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""This streaming pipeline performs image classification using an open-source
+PyTorch EfficientNet-B0 model optimized for T4 GPUs.
+It reads image URIs from Pub/Sub, decodes and preprocesses them in parallel,
+and runs inference with adaptive batch sizing for optimal GPU utilization.
+The pipeline ensures exactly-once semantics via stateful deduplication and
+idempotent BigQuery writes, allowing stable and reproducible performance
+measurements under continuous load.
+Pub/Sub resources (topic, subscription) are cleaned up programmatically.
+"""
+
+import argparse
+import io
+import json
+import logging
+import threading
+import time
+from typing import Iterable
+from typing import Optional
+from typing import Tuple
+
+import torch
+import torch.nn.functional as F
+
+import apache_beam as beam
+from apache_beam.coders import BytesCoder
+from apache_beam.io.filesystems import FileSystems
+from apache_beam.ml.inference.base import KeyedModelHandler
+from apache_beam.ml.inference.base import PredictionResult
+from apache_beam.ml.inference.base import RunInference
+from apache_beam.ml.inference.pytorch_inference import 
PytorchModelHandlerTensor
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import StandardOptions
+from apache_beam.runners.runner import PipelineResult
+from apache_beam.transforms import userstate
+from apache_beam.transforms import window
+
+from google.cloud import pubsub_v1
+import PIL.Image as PILImage
+
+# ============ Utility & Preprocessing ============
+
+IMAGENET_MEAN = [0.485, 0.456, 0.406]
+IMAGENET_STD = [0.229, 0.224, 0.225]
+
+
+def now_millis() -> int:
+  return int(time.time() * 1000)
+
+
+def read_gcs_file_lines(gcs_path: str) -> Iterable[str]:
+  """Reads text lines from a GCS file."""
+  with FileSystems.open(gcs_path) as f:
+    for line in f.read().decode("utf-8").splitlines():
+      yield line.strip()
+
+
+def load_image_from_uri(uri: str) -> bytes:
+  with FileSystems.open(uri) as f:
+    return f.read()
+
+
+def decode_and_preprocess(image_bytes: bytes, size: int = 224) -> torch.Tensor:
+  """Decode bytes->RGB PIL->resize/crop->tensor->normalize."""
+  with PILImage.open(io.BytesIO(image_bytes)) as img:
+    img = img.convert("RGB")
+    img.thumbnail((256, 256))
+    w, h = img.size
+    left = (w - size) // 2
+    top = (h - size) // 2
+    img = img.crop(
+        (max(0, left), max(0, top), min(w, left + size), min(h, top + size)))
+
+    # To tensor [0..1]
+    import numpy as np
+    arr = np.asarray(img).astype("float32") / 255.0  # H,W,3
+    # Normalize
+    arr = (arr - IMAGENET_MEAN) / IMAGENET_STD
+    # HWC -> CHW
+    arr = np.transpose(arr, (2, 0, 1))
+    return torch.from_numpy(arr)  # float32, shape (3,224,224)
+
+
+class RateLimitDoFn(beam.DoFn):
+  def __init__(self, rate_per_sec: float):
+    self.delay = 1.0 / rate_per_sec
+
+  def process(self, element):
+    time.sleep(self.delay)
+    yield element
+
+
+class MakeKeyDoFn(beam.DoFn):
+  """Produce (image_id, payload) stable for dedup & BQ insertId."""
+  def __init__(self, input_mode: str):
+    self.input_mode = input_mode
+
+  def process(self, element: str | bytes):
+    # Input can be raw bytes from Pub/Sub or a GCS URI string, depends on mode
+    if self.input_mode == "bytes":
+      # element is bytes message, assume it includes
+      # {"image_id": "...", "bytes": base64?} or just raw bytes.
+      import hashlib
+      b = element if isinstance(element, (bytes, bytearray)) else 
bytes(element)
+      image_id = hashlib.sha1(b).hexdigest()
+      yield image_id, b
+    else:
+      # gcs_uris: element is uri string; image_id = sha1(uri)
+      import hashlib
+      uri = element.decode("utf-8") if isinstance(
+          element, (bytes, bytearray)) else str(element)
+      image_id = hashlib.sha1(uri.encode("utf-8")).hexdigest()
+      yield image_id, uri
+
+
+class DedupDoFn(beam.DoFn):
+  seen = userstate.ReadModifyWriteStateSpec('seen', BytesCoder())
+
+  def process(self, element, seen=beam.DoFn.StateParam(seen)):
+    if seen.read() == b'1':
+      return
+    seen.write(b'1')
+    yield element
+
+
+class DecodePreprocessDoFn(beam.DoFn):
+  """Turn (image_id, bytes|uri) -> (image_id, torch.Tensor)"""
+  def __init__(
+      self, input_mode: str, image_size: int = 224, decode_threads: int = 4):
+    self.input_mode = input_mode
+    self.image_size = image_size
+    self.decode_threads = decode_threads
+
+  def process(self, kv: Tuple[str, object]):
+    image_id, payload = kv
+    start = now_millis()
+
+    try:
+      if self.input_mode == "bytes":
+        b = payload if isinstance(payload,
+                                  (bytes, bytearray)) else bytes(payload)
+      else:
+        uri = payload if isinstance(payload, str) else payload.decode("utf-8")
+        b = load_image_from_uri(uri)
+
+      tensor = decode_and_preprocess(b, self.image_size)
+      preprocess_ms = now_millis() - start
+      yield image_id, {"tensor": tensor, "preprocess_ms": preprocess_ms}
+    except Exception as e:
+      logging.warning("Decode failed for %s: %s", image_id, e)
+      return
+
+
+class PostProcessDoFn(beam.DoFn):
+  """PredictionResult -> dict row for BQ."""
+  def __init__(self, top_k: int, model_name: str):
+    self.top_k = top_k
+    self.model_name = model_name
+
+  def process(self, kv: Tuple[str, PredictionResult]):
+    image_id, pred = kv
+    logits = pred.inference[
+        "logits"]  # torch.Tensor [B, num_classes] or [num_classes]
+    if isinstance(logits, torch.Tensor) and logits.ndim == 1:
+      logits = logits.unsqueeze(0)
+
+    probs = F.softmax(logits, dim=-1)  # [B, C]
+    values, indices = torch.topk(
+        probs, k=min(self.top_k, probs.shape[-1]), dim=-1
+    )
+
+    topk = [{
+        "class_id": int(idx.item()), "score": float(val.item())
+    } for idx, val in zip(indices[0], values[0])]
+
+    yield {
+        "image_id": image_id,
+        "model_name": self.model_name,
+        "topk": json.dumps(topk),
+        "infer_ts_ms": now_millis(),
+    }
+
+
+# ============ Args & Helpers ============
+
+
+def parse_known_args(argv):
+  parser = argparse.ArgumentParser()
+  # I/O & runtime
+  parser.add_argument(
+      '--mode', default='streaming', choices=['streaming', 'batch'])
+  parser.add_argument(
+      '--output_table',
+      required=True,
+      help='BigQuery output table: dataset.table')
+  parser.add_argument(
+      '--publish_to_big_query', default='true', choices=['true', 'false'])
+  parser.add_argument(
+      '--input_mode', default='gcs_uris', choices=['gcs_uris', 'bytes'])
+  parser.add_argument(
+      '--input',
+      required=True,
+      help='GCS path to file with URIs (for load) OR unused for bytes')
+  parser.add_argument(
+      '--pubsub_topic',
+      default='projects/apache-beam-testing/topics/images_topic')
+  parser.add_argument(
+      '--pubsub_subscription',
+      default='projects/apache-beam-testing/subscriptions/images_subscription')
+  parser.add_argument(
+      '--rate_limit',
+      type=float,
+      default=None,
+      help='Elements per second for load pipeline')
+
+  # Model & inference
+  parser.add_argument(
+      '--pretrained_model_name',
+      default='efficientnet_b0',
+      help='OSS model name (e.g., efficientnet_b0|mobilenetv3_large_100)')
+  parser.add_argument(
+      '--model_state_dict_path',
+      default=None,
+      help='Optional state_dict to load')
+  parser.add_argument('--device', default='GPU', choices=['CPU', 'GPU'])
+  parser.add_argument('--image_size', type=int, default=224)
+  parser.add_argument('--top_k', type=int, default=5)
+  parser.add_argument(
+      '--inference_batch_size',
+      default='auto',
+      help='int or "auto"; auto tries 64→32→16')
+
+  # Windows
+  parser.add_argument('--window_sec', type=int, default=60)
+  parser.add_argument('--trigger_proc_time_sec', type=int, default=30)
+
+  # Dedup
+  parser.add_argument(
+      '--enable_dedup', default='false', choices=['true', 'false'])
+
+  known_args, pipeline_args = parser.parse_known_args(argv)
+  return known_args, pipeline_args
+
+
+def ensure_pubsub_resources(
+    project: str, topic_path: str, subscription_path: str):
+  publisher = pubsub_v1.PublisherClient()
+  subscriber = pubsub_v1.SubscriberClient()
+
+  topic_name = topic_path.split("/")[-1]
+  subscription_name = subscription_path.split("/")[-1]
+
+  full_topic_path = publisher.topic_path(project, topic_name)
+  full_subscription_path = subscriber.subscription_path(
+      project, subscription_name)
+
+  try:
+    publisher.get_topic(request={"topic": full_topic_path})
+  except Exception:
+    publisher.create_topic(name=full_topic_path)
+
+  try:
+    subscriber.get_subscription(
+        request={"subscription": full_subscription_path})
+  except Exception:
+    subscriber.create_subscription(
+        name=full_subscription_path, topic=full_topic_path)
+
+
+def cleanup_pubsub_resources(
+    project: str, topic_path: str, subscription_path: str):
+  publisher = pubsub_v1.PublisherClient()
+  subscriber = pubsub_v1.SubscriberClient()
+
+  topic_name = topic_path.split("/")[-1]
+  subscription_name = subscription_path.split("/")[-1]
+
+  full_topic_path = publisher.topic_path(project, topic_name)
+  full_subscription_path = subscriber.subscription_path(
+      project, subscription_name)
+
+  try:
+    subscriber.delete_subscription(
+        request={"subscription": full_subscription_path})
+    print(f"Deleted subscription: {subscription_name}")
+  except Exception as e:
+    print(f"Failed to delete subscription: {e}")
+
+  try:
+    publisher.delete_topic(request={"topic": full_topic_path})
+    print(f"Deleted topic: {topic_name}")
+  except Exception as e:
+    print(f"Failed to delete topic: {e}")
+
+
+def override_or_add(args, flag, value):
+  if flag in args:
+    idx = args.index(flag)
+    args[idx + 1] = str(value)
+  else:
+    args.extend([flag, str(value)])
+
+
+# ============ Model factory (timm) ============
+
+
+def create_timm_model(model_name: str, num_classes: int = 1000):
+  import timm
+  model = timm.create_model(
+      model_name, pretrained=True, num_classes=num_classes)
+  model.eval()
+  return model
+
+
+def pick_batch_size(arg: str) -> Optional[int]:
+  if isinstance(arg, str) and arg.lower() == 'auto':
+    return None
+  try:
+    return int(arg)
+  except Exception:
+    return None
+
+
+# ============ Load pipeline ============
+
+
+def run_load_pipeline(known_args, pipeline_args):
+  """Reads GCS file with URIs and publishes them to Pub/Sub (for streaming)."""
+  # enforce smaller/CPU-only defaults for feeder
+  override_or_add(pipeline_args, '--device', 'CPU')
+  override_or_add(pipeline_args, '--num_workers', '5')
+  override_or_add(pipeline_args, '--max_num_workers', '10')
+  override_or_add(
+      pipeline_args, '--job_name', f"images-load-pubsub-{int(time.time())}")
+  override_or_add(pipeline_args, '--project', known_args.project)
+  pipeline_args = [
+      arg for arg in pipeline_args if not arg.startswith("--experiments")
+  ]
+
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline = beam.Pipeline(options=pipeline_options)
+
+  lines = (
+      pipeline
+      |
+      'ReadGCSFile' >> beam.Create(list(read_gcs_file_lines(known_args.input)))
+      | 'FilterEmpty' >> beam.Filter(lambda line: line.strip()))
+  if known_args.rate_limit:
+    lines = lines | 'RateLimit' >> beam.ParDo(
+        RateLimitDoFn(rate_per_sec=known_args.rate_limit))
+
+  _ = (
+      lines
+      | 'ToBytes' >> beam.Map(lambda line: line.encode('utf-8'))
+      |
+      'WriteToPubSub' >> beam.io.WriteToPubSub(topic=known_args.pubsub_topic))
+  return pipeline.run()
+
+
+# ============ Main pipeline ============
+
+
+def run(
+    argv=None, save_main_session=True, test_pipeline=None) -> PipelineResult:
+  known_args, pipeline_args = parse_known_args(argv)
+
+  ensure_pubsub_resources(
+      project=known_args.project,
+      topic_path=known_args.pubsub_topic,
+      subscription_path=known_args.pubsub_subscription)
+
+  if known_args.mode == 'streaming':
+    # Start feeder thread that reads URIs from GCS and fills Pub/Sub.
+    threading.Thread(
+        target=lambda:
+        (time.sleep(900), run_load_pipeline(known_args, pipeline_args)),
+        daemon=True).start()
+
+  # StandardOptions
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
+  pipeline_options.view_as(StandardOptions).streaming = (
+      known_args.mode == 'streaming')
+
+  # Build model handler with right-fitting batch size
+  desired_batch = pick_batch_size(known_args.inference_batch_size)
+  tried = [64, 32, 16] if desired_batch is None else [desired_batch]
+
+  # Device
+  device = 'GPU' if known_args.device.upper() == 'GPU' else 'CPU'
+
+  bs_ok = None
+  last_err = None
+  for bs in tried:
+    try:
+      model_handler = PytorchModelHandlerTensor(
+          model_class=lambda: 
create_timm_model(known_args.pretrained_model_name),
+          model_params={},
+          state_dict_path=known_args.model_state_dict_path,
+          device=device,
+          inference_batch_size=bs
+          if bs is not None else 64,  # start guess for warmup
+      )
+      # quick warmup to validate memory (single dummy tensor)
+      dummy = torch.zeros((3, known_args.image_size, known_args.image_size),
+                          dtype=torch.float32)
+      _ = model_handler.load_model()  # ensures weights on device
+      with torch.no_grad():
+        mdl = model_handler._model
+        mdl(torch.unsqueeze(dummy, 0))
+      bs_ok = bs if bs is not None else 64
+      break
+    except Exception as e:
+      last_err = e
+      logging.warning("Batch size %s failed during warmup: %s", bs, e)
+      continue
+
+  if bs_ok is None:
+    logging.warning(
+        "Falling back to batch_size=8 due to previous errors: %s", last_err)
+    bs_ok = 8
+    model_handler = PytorchModelHandlerTensor(
+        model_class=lambda: create_timm_model(
+            known_args.pretrained_model_name
+        ),
+        model_params={},
+        state_dict_path=known_args.model_state_dict_path,
+        device=device,
+        inference_batch_size=bs_ok,
+    )
+
+  pipeline = test_pipeline or beam.Pipeline(options=pipeline_options)
+
+  if known_args.mode == 'batch':
+    pcoll = (
+        pipeline
+        | 'ReadURIsBatch' >> beam.Create(
+            list(read_gcs_file_lines(known_args.input)))
+        | 'FilterEmptyBatch' >> beam.Filter(lambda s: s.strip()))
+  else:
+    pcoll = (
+        pipeline
+        | 'ReadFromPubSub' >>
+        beam.io.ReadFromPubSub(subscription=known_args.pubsub_subscription)
+        | 'DecodeUTF8' >> beam.Map(lambda x: x.decode('utf-8'))
+        | 'Window' >> beam.WindowInto(
+            window.FixedWindows(known_args.window_sec),
+            trigger=beam.trigger.AfterProcessingTime(
+                known_args.trigger_proc_time_sec),
+            accumulation_mode=beam.trigger.AccumulationMode.DISCARDING,
+            allowed_lateness=0))
+
+  keyed = (
+      pcoll
+      | 'MakeKey' >> beam.ParDo(MakeKeyDoFn(input_mode=known_args.input_mode)))
+
+  if known_args.enable_dedup == 'true':
+    keyed = keyed | 'Dedup' >> beam.ParDo(DedupDoFn())
+
+  preprocessed = (
+      keyed
+      | 'DecodePreprocess' >> beam.ParDo(
+          DecodePreprocessDoFn(
+              input_mode=known_args.input_mode,
+              image_size=known_args.image_size)))
+
+  to_infer = (
+      preprocessed
+      | 'ToKeyedTensor' >> beam.Map(lambda kv: (kv[0], kv[1]["tensor"])))
+
+  predictions = (
+      to_infer
+      | 'RunInference' >> RunInference(KeyedModelHandler(model_handler)))
+
+  results = (
+      predictions
+      | 'PostProcess' >> beam.ParDo(
+          PostProcessDoFn(
+              top_k=known_args.top_k,
+              model_name=known_args.pretrained_model_name)))
+
+  if known_args.publish_to_big_query == 'true':
+    _ = (
+        results
+        | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
+            known_args.output_table,
+            schema=
+            'image_id:STRING, model_name:STRING, topk:STRING, 
infer_ts_ms:INT64',
+            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
+            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+            method=beam.io.WriteToBigQuery.Method.STREAMING_INSERTS))
+
+  result = pipeline.run()
+  result.wait_until_finish(duration=1800000)  # 30 min
+  try:
+    result.cancel()
+  except Exception:
+    pass
+
+  cleanup_pubsub_resources(
+      project=known_args.project,
+      topic_path=known_args.pubsub_topic,
+      subscription_path=known_args.pubsub_subscription)
+
+  return result
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  run()
diff --git 
a/sdks/python/apache_beam/ml/inference/pytorch_rightfit_requirements.txt 
b/sdks/python/apache_beam/ml/inference/pytorch_rightfit_requirements.txt
new file mode 100644
index 00000000000..2b2916c577e
--- /dev/null
+++ b/sdks/python/apache_beam/ml/inference/pytorch_rightfit_requirements.txt
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+torch>=2.2.0,<2.8.0
+torchvision>=0.17.0,<0.21.0
+timm>=1.0.7
+Pillow>=10.0.0
+numpy>=1.25.0
+google-cloud-pubsub>=2.15.0
+google-cloud-monitoring>=2.27.0
+protobuf>=4.25.1
+requests>=2.31.0
diff --git 
a/sdks/python/apache_beam/testing/benchmarks/inference/pytorch_imagenet_rightfit_benchmarks.py
 
b/sdks/python/apache_beam/testing/benchmarks/inference/pytorch_imagenet_rightfit_benchmarks.py
new file mode 100644
index 00000000000..1528711a801
--- /dev/null
+++ 
b/sdks/python/apache_beam/testing/benchmarks/inference/pytorch_imagenet_rightfit_benchmarks.py
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# pytype: skip-file
+
+import logging
+
+from apache_beam.examples.inference import pytorch_imagenet_rightfit
+from apache_beam.testing.load_tests.dataflow_cost_benchmark import 
DataflowCostBenchmark
+
+
+class PytorchImagenetRightfitBenchmarkTest(DataflowCostBenchmark):
+  def __init__(self):
+    self.metrics_namespace = 'BeamML_PyTorch'
+    super().__init__(
+        metrics_namespace=self.metrics_namespace,
+        pcollection='PostProcess.out0')
+
+  def test(self):
+    extra_opts = {}
+    extra_opts['input'] = self.pipeline.get_option('input_file')
+    self.result = pytorch_imagenet_rightfit.run(
+        self.pipeline.get_full_options_as_args(**extra_opts),
+        test_pipeline=self.pipeline)
+
+
+if __name__ == '__main__':
+  logging.basicConfig(level=logging.INFO)
+  PytorchImagenetRightfitBenchmarkTest().run()
diff --git a/website/www/site/content/en/performance/_index.md 
b/website/www/site/content/en/performance/_index.md
index 138b98391bc..914842e5ab1 100644
--- a/website/www/site/content/en/performance/_index.md
+++ b/website/www/site/content/en/performance/_index.md
@@ -46,6 +46,7 @@ See the following pages for performance measures recorded 
when running various B
 ## Streaming
 
 - [PyTorch Sentiment Analysis Streaming DistilBERT 
base](/performance/pytorchbertsentimentstreaming)
+- [PyTorch Image Classification EfficientNet-B0 Streaming 
(Right-fitting)](/performance/pytorchimagenetrightfit)
 
 ## Batch
 
diff --git 
a/website/www/site/content/en/performance/pytorchimagenetrightfit/_index.md 
b/website/www/site/content/en/performance/pytorchimagenetrightfit/_index.md
new file mode 100644
index 00000000000..8ed16690a6a
--- /dev/null
+++ b/website/www/site/content/en/performance/pytorchimagenetrightfit/_index.md
@@ -0,0 +1,44 @@
+---
+title: "PyTorch Image Classification EfficientNet-B0 Streaming (Right-fitting) Performance"
+---
+
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# PyTorch Image Classification EfficientNet-B0 Streaming (Right-fitting)
+
+**Model**: PyTorch Image Classification — EfficientNet-B0 (pretrained on 
ImageNet)
+**Accelerator**: Tesla T4 GPU (right-fitted batch size 64 → 32 → 16 → 8)
+**Host**: 50 × n1-standard-4 (4 vCPUs, 15 GB RAM)
+
+This streaming pipeline performs image classification using an open-source 
PyTorch EfficientNet-B0 model optimized for T4 GPUs.
+It reads image URIs from Pub/Sub, decodes and preprocesses them in parallel, 
and runs inference with adaptive batch sizing for optimal GPU utilization.
+The pipeline ensures exactly-once semantics via stateful deduplication and 
idempotent BigQuery writes, allowing stable and reproducible performance 
measurements under continuous load.
+
+The following graphs show various metrics when running the PyTorch Image 
Classification EfficientNet-B0 Streaming (Right-fitting) pipeline.
+See the [glossary](/performance/glossary) for definitions.
+
+Full pipeline implementation is available 
[here](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/inference/pytorch_imagenet_rightfit.py).
+
+## What is the estimated cost to run the pipeline?
+
+{{< performance_looks io="pytorchimagenetrightfit" read_or_write="write" 
section="cost" >}}
+
+## How have various metrics changed when running the pipeline for different 
Beam SDK versions?
+
+{{< performance_looks io="pytorchimagenetrightfit" read_or_write="write" 
section="version" >}}
+
+## How have various metrics changed over time when running the pipeline?
+
+{{< performance_looks io="pytorchimagenetrightfit" read_or_write="write" 
section="date" >}}
diff --git a/website/www/site/data/performance.yaml 
b/website/www/site/data/performance.yaml
index 8f561fda49e..2ba5c2efc62 100644
--- a/website/www/site/data/performance.yaml
+++ b/website/www/site/data/performance.yaml
@@ -250,6 +250,22 @@ looks:
           title: AvgThroughputBytesPerSec by Version
         - id: dKyJy5ZKhkBdSTXRY3wZR6fXzptSs2qm
           title: AvgThroughputElementsPerSec by Version
+  pytorchimagenetrightfit:
+    write:
+      folder: 92
+      cost:
+        - id: zJhxrMmxJ3zVHH5WZnDQqcBHdFDrBhxK
+          title: RunTime and EstimatedCost
+      date:
+        - id: RybzxZdkXJg6PzQZkBcfxTJkByTf3ZV5
+          title: AvgThroughputBytesPerSec by Date
+        - id: xTVYPytQVH7zXYz7SvphnRV4nQcxCddp
+          title: AvgThroughputElementsPerSec by Date
+      version:
+        - id: dGN6Zr6rh7DfnRtTDCN6GHcNfhSkrbCq
+          title: AvgThroughputBytesPerSec by Version
+        - id: VJMWrZh3jXk2mqCZk4NQn3tBrHgGWqnC
+          title: AvgThroughputElementsPerSec by Version
   pytorchimageobjectdetection:
     write:
       folder: #TODO

Reply via email to