This is an automated email from the ASF dual-hosted git repository.
vterentev pushed a commit to branch oss-image-classification
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/oss-image-classification by
this push:
new 780bbe0503a Refactoring
780bbe0503a is described below
commit 780bbe0503a0207b5672832683e4df7dd813815c
Author: Vitaly Terentyev <[email protected]>
AuthorDate: Mon Nov 10 11:58:44 2025 +0400
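Refactoring: remove redundant comments and increase the streaming feeder thread's start delay from 60 s to 300 s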
---
.../examples/inference/pytorch_imagenet_rightfit.py | 11 +++--------
1 file changed, 3 insertions(+), 8 deletions(-)
diff --git
a/sdks/python/apache_beam/examples/inference/pytorch_imagenet_rightfit.py
b/sdks/python/apache_beam/examples/inference/pytorch_imagenet_rightfit.py
index 185727883e8..51da89bfedc 100644
--- a/sdks/python/apache_beam/examples/inference/pytorch_imagenet_rightfit.py
+++ b/sdks/python/apache_beam/examples/inference/pytorch_imagenet_rightfit.py
@@ -292,7 +292,7 @@ def pick_batch_size(arg: str) -> Optional[int]:
return None
-# ============ Load pipeline (optional) ============
+# ============ Load pipeline ============
def run_load_pipeline(known_args, pipeline_args):
"""Reads GCS file with URIs and publishes them to Pub/Sub (for streaming
mode)."""
@@ -331,7 +331,7 @@ def run(argv=None, save_main_session=True,
test_pipeline=None) -> PipelineResult
# If streaming -> start feeder thread that reads URIs from GCS and fills
Pub/Sub.
if known_args.mode == 'streaming':
threading.Thread(
- target=lambda: (time.sleep(60), run_load_pipeline(known_args,
pipeline_args)),
+ target=lambda: (time.sleep(300), run_load_pipeline(known_args,
pipeline_args)),
daemon=True).start()
# StandardOptions
@@ -372,7 +372,6 @@ def run(argv=None, save_main_session=True,
test_pipeline=None) -> PipelineResult
continue
if bs_ok is None:
- # Fallback minimal
logging.warning("Falling back to batch_size=8 due to previous errors: %s",
last_err)
bs_ok = 8
model_handler = PytorchModelHandlerTensor(
@@ -383,18 +382,15 @@ def run(argv=None, save_main_session=True,
test_pipeline=None) -> PipelineResult
inference_batch_size=bs_ok,
)
- tokenizer = None # not used, kept for parity with your style
+ tokenizer = None
pipeline = test_pipeline or beam.Pipeline(options=pipeline_options)
- # Input PCollection
if known_args.mode == 'batch':
- # Batch: read URIs from a file and process
pcoll = (pipeline
| 'ReadURIsBatch' >>
beam.Create(list(read_gcs_file_lines(known_args.input)))
| 'FilterEmptyBatch' >> beam.Filter(lambda s: s.strip()))
else:
- # Streaming: subscription → decode → windowing
pcoll = (pipeline
| 'ReadFromPubSub' >>
beam.io.ReadFromPubSub(subscription=known_args.pubsub_subscription)
| 'DecodeUTF8' >> beam.Map(lambda x: x.decode('utf-8'))
@@ -415,7 +411,6 @@ def run(argv=None, save_main_session=True,
test_pipeline=None) -> PipelineResult
input_mode=known_args.input_mode,
image_size=known_args.image_size)))
- # Map to (key, tensor) for KeyedModelHandler
to_infer = (preprocessed
| 'ToKeyedTensor' >> beam.Map(lambda kv: (kv[0],
kv[1]["tensor"])))