This is an automated email from the ASF dual-hosted git repository.

vterentev pushed a commit to branch fix-postcommit-python
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 24df28ae72d50cf04e3079bd728601c42f1d9ad3
Author: Vitaly Terentyev <[email protected]>
AuthorDate: Thu Jul 24 16:33:38 2025 +0400

    Fix debezium port type, fix gemini post processor fn
---
 .../examples/inference/gemini_text_classification.py       | 14 ++++++++++++--
 sdks/python/apache_beam/io/debezium.py                     |  2 +-
 2 files changed, 13 insertions(+), 3 deletions(-)

diff --git 
a/sdks/python/apache_beam/examples/inference/gemini_text_classification.py 
b/sdks/python/apache_beam/examples/inference/gemini_text_classification.py
index e82f407374a..b3ae712f9ab 100644
--- a/sdks/python/apache_beam/examples/inference/gemini_text_classification.py
+++ b/sdks/python/apache_beam/examples/inference/gemini_text_classification.py
@@ -67,8 +67,18 @@ def parse_known_args(argv):
 
 class PostProcessor(beam.DoFn):
   def process(self, element: PredictionResult) -> Iterable[str]:
-    yield "Input: " + str(element.example) + " Output: " + str(
-        element.inference[1][0].content.parts[0].text)
+
+    inference = getattr(element, "inference", None)
+
+    if hasattr(inference[1], "content"):
+      yield inference[1].content.parts[0].text
+      return
+
+    if isinstance(inference[1], (tuple, list)) and len(inference) > 1:
+      yield "Input: " + str(element.example) + " Output: " + str(
+          inference[1][0].content.parts[0].text)
+    else:
+      yield "Can't decode inference for element: " + str(element.example)
 
 
 def run(
diff --git a/sdks/python/apache_beam/io/debezium.py 
b/sdks/python/apache_beam/io/debezium.py
index 9e93801852c..26516fa4e4b 100644
--- a/sdks/python/apache_beam/io/debezium.py
+++ b/sdks/python/apache_beam/io/debezium.py
@@ -155,7 +155,7 @@ class ReadFromDebezium(PTransform):
         username=username,
         password=password,
         host=host,
-        port=port,
+        port=str(port),
         max_number_of_records=max_number_of_records,
         connection_properties=connection_properties)
     self.expansion_service = expansion_service or 
default_io_expansion_service()

Reply via email to