This is an automated email from the ASF dual-hosted git repository.

vterentev pushed a commit to branch fix-flink-cogbk
in repository https://gitbox.apache.org/repos/asf/beam.git

commit ac53131212720ffe02399c5bcd98eb994c8ed858
Author: Vitaly Terentyev <[email protected]>
AuthorDate: Thu Oct 9 19:33:35 2025 +0400

    Fix Python CoGBK Flink Batch config
---
 .../workflows/beam_LoadTests_Python_CoGBK_Flink_Batch.yml  | 11 +++++++----
 ...txt => python_CoGBK_Flink_Batch_100b_Multiple_Keys.txt} |  3 ++-
 ...ey.txt => python_CoGBK_Flink_Batch_100b_Single_Key.txt} |  3 ++-
 ...nk_Batch_10kB.txt => python_CoGBK_Flink_Batch_10kB.txt} |  3 ++-
 .../python/apache_beam/runners/portability/prism_runner.py | 14 +++++++++++++-
 5 files changed, 26 insertions(+), 8 deletions(-)

diff --git a/.github/workflows/beam_LoadTests_Python_CoGBK_Flink_Batch.yml b/.github/workflows/beam_LoadTests_Python_CoGBK_Flink_Batch.yml
index 2c0c61007cd..898f4ec73e4 100644
--- a/.github/workflows/beam_LoadTests_Python_CoGBK_Flink_Batch.yml
+++ b/.github/workflows/beam_LoadTests_Python_CoGBK_Flink_Batch.yml
@@ -89,9 +89,9 @@ jobs:
           test-type: load
           test-language: python
           argument-file-paths: |
-            ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Single_Key.txt
-            ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Multiple_Keys.txt
-            ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_10kB.txt
+            ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Single_Key.txt
+            ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Multiple_Keys.txt
+            ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_10kB.txt
       - name: Start Flink with parallelism 5
         env:
           FLINK_NUM_WORKERS: 5
@@ -108,6 +108,7 @@ jobs:
           gradle-command: :sdks:python:apache_beam:testing:load_tests:run
           arguments: |
            --info \
+            -PpythonVersion=3.9 \
            -PloadTest.mainClass=apache_beam.testing.load_tests.co_group_by_key_test \
            -Prunner=FlinkRunner \
            '-PloadTest.args=${{ env.beam_LoadTests_Python_CoGBK_Flink_Batch_test_arguments_1 }} --job_name=load-tests-python-flink-batch-cogbk-1-${{ steps.datetime.outputs.datetime }}' \
@@ -117,6 +118,7 @@ jobs:
           gradle-command: :sdks:python:apache_beam:testing:load_tests:run
           arguments: |
            --info \
+            -PpythonVersion=3.9 \
            -PloadTest.mainClass=apache_beam.testing.load_tests.co_group_by_key_test \
            -Prunner=FlinkRunner \
            '-PloadTest.args=${{ env.beam_LoadTests_Python_CoGBK_Flink_Batch_test_arguments_2 }} --job_name=load-tests-python-flink-batch-cogbk-2-${{ steps.datetime.outputs.datetime }}' \
@@ -126,10 +128,11 @@ jobs:
           gradle-command: :sdks:python:apache_beam:testing:load_tests:run
           arguments: |
            --info \
+            -PpythonVersion=3.9 \
            -PloadTest.mainClass=apache_beam.testing.load_tests.co_group_by_key_test \
            -Prunner=FlinkRunner \
            '-PloadTest.args=${{ env.beam_LoadTests_Python_CoGBK_Flink_Batch_test_arguments_2 }} --job_name=load-tests-python-flink-batch-cogbk-3-${{ steps.datetime.outputs.datetime }}' \
       - name: Teardown Flink
         if: always()
         run: |
-          ${{ github.workspace }}/.test-infra/dataproc/flink_cluster.sh delete
\ No newline at end of file
+          ${{ github.workspace }}/.test-infra/dataproc/flink_cluster.sh delete
diff --git a/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Multiple_Keys.txt b/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Multiple_Keys.txt
similarity index 92%
rename from .github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Multiple_Keys.txt
rename to .github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Multiple_Keys.txt
index 4b8a2f72010..8c05f5c215d 100644
--- a/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Multiple_Keys.txt
+++ b/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Multiple_Keys.txt
@@ -23,6 +23,7 @@
 --co_input_options=''{\\"num_records\\":2000000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":5,\\"hot_key_fraction\\":1}''
 --iterations=1
 --parallelism=5
+--runner=FlinkRunner
 --endpoint=localhost:8099
 --environment_type=DOCKER
---environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_go_sdk:latest
\ No newline at end of file
+--environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_python3.9_sdk:latest
diff --git a/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Single_Key.txt b/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Single_Key.txt
similarity index 92%
rename from .github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Single_Key.txt
rename to .github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Single_Key.txt
index 3aeb927f04e..92db5e55a6a 100644
--- a/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Single_Key.txt
+++ b/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Single_Key.txt
@@ -23,6 +23,7 @@
 --co_input_options=''{\\"num_records\\":2000000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":1000,\\"hot_key_fraction\\":1}''
 --iterations=1
 --parallelism=5
+--runner=FlinkRunner
 --endpoint=localhost:8099
 --environment_type=DOCKER
---environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_go_sdk:latest
\ No newline at end of file
+--environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_python3.9_sdk:latest
diff --git a/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_10kB.txt b/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_10kB.txt
similarity index 92%
rename from .github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_10kB.txt
rename to .github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_10kB.txt
index e350e2d2994..328e8f2b016 100644
--- a/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_10kB.txt
+++ b/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_10kB.txt
@@ -23,6 +23,7 @@
 --co_input_options=''{\\"num_records\\":2000000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":1000,\\"hot_key_fraction\\":1}''
 --iterations=4
 --parallelism=5
+--runner=FlinkRunner
 --endpoint=localhost:8099
 --environment_type=DOCKER
---environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_go_sdk:latest
\ No newline at end of file
+--environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_python3.9_sdk:latest
diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py b/sdks/python/apache_beam/runners/portability/prism_runner.py
index bc5d8c2a613..1c60fa3ee01 100644
--- a/sdks/python/apache_beam/runners/portability/prism_runner.py
+++ b/sdks/python/apache_beam/runners/portability/prism_runner.py
@@ -28,6 +28,7 @@ import json
 import logging
 import os
 import platform
+import re
 import shutil
 import stat
 import subprocess
@@ -121,7 +122,18 @@ class PrismRunnerLogFilter(logging.Filter):
       try:
         message = record.getMessage()
         json_record = json.loads(message)
-        record.levelno = getattr(logging, json_record["level"])
+        level_str = json_record["level"]
+        # Example level with offset: 'ERROR+2'
+        if "+" in level_str or "-" in level_str:
+          match = re.match(r"([A-Z]+)([+-]\d+)", level_str)
+          if match:
+            base, offset = match.groups()
+            base_level = getattr(logging, base, logging.INFO)
+            record.levelno = base_level + int(offset)
+          else:
+            record.levelno = getattr(logging, level_str, logging.INFO)
+        else:
+          record.levelno = getattr(logging, level_str, logging.INFO)
         record.levelname = logging.getLevelName(record.levelno)
         if "source" in json_record:
           record.funcName = json_record["source"]["function"]

Reply via email to