This is an automated email from the ASF dual-hosted git repository.

vterentev pushed a commit to branch oss-image-classification
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/oss-image-classification by 
this push:
     new 780bbe0503a Refactoring
780bbe0503a is described below

commit 780bbe0503a0207b5672832683e4df7dd813815c
Author: Vitaly Terentyev <[email protected]>
AuthorDate: Mon Nov 10 11:58:44 2025 +0400

    Refactoring
---
 .../examples/inference/pytorch_imagenet_rightfit.py           | 11 +++--------
 1 file changed, 3 insertions(+), 8 deletions(-)

diff --git 
a/sdks/python/apache_beam/examples/inference/pytorch_imagenet_rightfit.py 
b/sdks/python/apache_beam/examples/inference/pytorch_imagenet_rightfit.py
index 185727883e8..51da89bfedc 100644
--- a/sdks/python/apache_beam/examples/inference/pytorch_imagenet_rightfit.py
+++ b/sdks/python/apache_beam/examples/inference/pytorch_imagenet_rightfit.py
@@ -292,7 +292,7 @@ def pick_batch_size(arg: str) -> Optional[int]:
     return None
 
 
-# ============ Load pipeline (optional) ============
+# ============ Load pipeline ============
 
 def run_load_pipeline(known_args, pipeline_args):
   """Reads GCS file with URIs and publishes them to Pub/Sub (for streaming 
mode)."""
@@ -331,7 +331,7 @@ def run(argv=None, save_main_session=True, 
test_pipeline=None) -> PipelineResult
   # If streaming -> start feeder thread that reads URIs from GCS and fills 
Pub/Sub.
   if known_args.mode == 'streaming':
     threading.Thread(
-      target=lambda: (time.sleep(60), run_load_pipeline(known_args, 
pipeline_args)),
+      target=lambda: (time.sleep(300), run_load_pipeline(known_args, 
pipeline_args)),
       daemon=True).start()
 
   # StandardOptions
@@ -372,7 +372,6 @@ def run(argv=None, save_main_session=True, 
test_pipeline=None) -> PipelineResult
       continue
 
   if bs_ok is None:
-    # Fallback minimal
     logging.warning("Falling back to batch_size=8 due to previous errors: %s", 
last_err)
     bs_ok = 8
     model_handler = PytorchModelHandlerTensor(
@@ -383,18 +382,15 @@ def run(argv=None, save_main_session=True, 
test_pipeline=None) -> PipelineResult
       inference_batch_size=bs_ok,
     )
 
-  tokenizer = None  # not used, kept for parity with your style
+  tokenizer = None
 
   pipeline = test_pipeline or beam.Pipeline(options=pipeline_options)
 
-  # Input PCollection
   if known_args.mode == 'batch':
-    # Batch: read URIs from a file and process
     pcoll = (pipeline
              | 'ReadURIsBatch' >> 
beam.Create(list(read_gcs_file_lines(known_args.input)))
              | 'FilterEmptyBatch' >> beam.Filter(lambda s: s.strip()))
   else:
-    # Streaming: subscription → decode → windowing
     pcoll = (pipeline
              | 'ReadFromPubSub' >> 
beam.io.ReadFromPubSub(subscription=known_args.pubsub_subscription)
              | 'DecodeUTF8' >> beam.Map(lambda x: x.decode('utf-8'))
@@ -415,7 +411,6 @@ def run(argv=None, save_main_session=True, 
test_pipeline=None) -> PipelineResult
         input_mode=known_args.input_mode,
         image_size=known_args.image_size)))
 
-  # Map to (key, tensor) for KeyedModelHandler
   to_infer = (preprocessed
               | 'ToKeyedTensor' >> beam.Map(lambda kv: (kv[0], 
kv[1]["tensor"])))
 

Reply via email to