This is an automated email from the ASF dual-hosted git repository.

vterentev pushed a commit to branch fix-flink-cogbk
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/fix-flink-cogbk by this push:
     new 2cedc3ad192 Fix Python CoGBK Flink Batch config
     new 2908f98c39e Merge remote-tracking branch 'origin/fix-flink-cogbk' into fix-flink-cogbk
2cedc3ad192 is described below

commit 2cedc3ad1923245332f56adfc3fe5be5a9259f97
Author: Vitaly Terentyev <[email protected]>
AuthorDate: Thu Oct 9 19:33:35 2025 +0400

    Fix Python CoGBK Flink Batch config
---
 .../beam_LoadTests_Python_CoGBK_Flink_Batch.yml         | 17 ++++++++++-------
 .github/workflows/beam_Publish_Docker_Snapshots.yml     |  2 +-
 ... => python_CoGBK_Flink_Batch_100b_Multiple_Keys.txt} | 10 +++++-----
 ...txt => python_CoGBK_Flink_Batch_100b_Single_Key.txt} | 10 +++++-----
 ...Batch_10kB.txt => python_CoGBK_Flink_Batch_10kB.txt} | 10 +++++-----
 .../apache_beam/runners/portability/prism_runner.py     | 14 +++++++++++++-
 6 files changed, 39 insertions(+), 24 deletions(-)

diff --git a/.github/workflows/beam_LoadTests_Python_CoGBK_Flink_Batch.yml 
b/.github/workflows/beam_LoadTests_Python_CoGBK_Flink_Batch.yml
index 2c0c61007cd..c40dd567826 100644
--- a/.github/workflows/beam_LoadTests_Python_CoGBK_Flink_Batch.yml
+++ b/.github/workflows/beam_LoadTests_Python_CoGBK_Flink_Batch.yml
@@ -89,9 +89,9 @@ jobs:
           test-type: load
           test-language: python
           argument-file-paths: |
-            ${{ github.workspace 
}}/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Single_Key.txt
-            ${{ github.workspace 
}}/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Multiple_Keys.txt
-            ${{ github.workspace 
}}/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_10kB.txt
+            ${{ github.workspace 
}}/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Single_Key.txt
+            ${{ github.workspace 
}}/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Multiple_Keys.txt
+            ${{ github.workspace 
}}/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_10kB.txt
       - name: Start Flink with parallelism 5
         env:
           FLINK_NUM_WORKERS: 5
@@ -108,8 +108,9 @@ jobs:
           gradle-command: :sdks:python:apache_beam:testing:load_tests:run
           arguments: |
             --info \
+            -PpythonVersion=3.9 \
             
-PloadTest.mainClass=apache_beam.testing.load_tests.co_group_by_key_test \
-            -Prunner=FlinkRunner \
+            -Prunner=PortableRunner \
             '-PloadTest.args=${{ 
env.beam_LoadTests_Python_CoGBK_Flink_Batch_test_arguments_1 }} 
--job_name=load-tests-python-flink-batch-cogbk-1-${{ 
steps.datetime.outputs.datetime }}' \
       - name: run CoGBK 2GB of 100B records with multiple keys
         uses: ./.github/actions/gradle-command-self-hosted-action
@@ -117,8 +118,9 @@ jobs:
           gradle-command: :sdks:python:apache_beam:testing:load_tests:run
           arguments: |
             --info \
+            -PpythonVersion=3.9 \
             
-PloadTest.mainClass=apache_beam.testing.load_tests.co_group_by_key_test \
-            -Prunner=FlinkRunner \
+            -Prunner=PortableRunner \
             '-PloadTest.args=${{ 
env.beam_LoadTests_Python_CoGBK_Flink_Batch_test_arguments_2 }} 
--job_name=load-tests-python-flink-batch-cogbk-2-${{ 
steps.datetime.outputs.datetime }}' \
       - name: run CoGBK reiterate 4 times 10kB values
         uses: ./.github/actions/gradle-command-self-hosted-action
@@ -126,10 +128,11 @@ jobs:
           gradle-command: :sdks:python:apache_beam:testing:load_tests:run
           arguments: |
             --info \
+            -PpythonVersion=3.9 \
             
-PloadTest.mainClass=apache_beam.testing.load_tests.co_group_by_key_test \
-            -Prunner=FlinkRunner \
+            -Prunner=PortableRunner \
             '-PloadTest.args=${{ 
env.beam_LoadTests_Python_CoGBK_Flink_Batch_test_arguments_2 }} 
--job_name=load-tests-python-flink-batch-cogbk-3-${{ 
steps.datetime.outputs.datetime }}' \
       - name: Teardown Flink
         if: always()
         run: |
-          ${{ github.workspace }}/.test-infra/dataproc/flink_cluster.sh delete
\ No newline at end of file
+          ${{ github.workspace }}/.test-infra/dataproc/flink_cluster.sh delete
diff --git a/.github/workflows/beam_Publish_Docker_Snapshots.yml 
b/.github/workflows/beam_Publish_Docker_Snapshots.yml
index ad3f0da2296..098e06e447c 100644
--- a/.github/workflows/beam_Publish_Docker_Snapshots.yml
+++ b/.github/workflows/beam_Publish_Docker_Snapshots.yml
@@ -83,7 +83,7 @@ jobs:
           arguments: |
             
-Pdocker-repository-root=gcr.io/apache-beam-testing/beam_portability \
             -Pdocker-tag-list=${{ github.sha }}${LATEST_TAG}
-      - name: run Publish Docker Snapshots script for Flink
+      - name: run Publish Docker Snapshots script for Flink 1.17
         uses: ./.github/actions/gradle-command-self-hosted-action
         with:
           gradle-command: :runners:flink:1.17:job-server-container:dockerPush
diff --git 
a/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Multiple_Keys.txt
 
b/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Multiple_Keys.txt
similarity index 74%
rename from 
.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Multiple_Keys.txt
rename to 
.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Multiple_Keys.txt
index 4b8a2f72010..6e26ee72a77 100644
--- 
a/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Multiple_Keys.txt
+++ 
b/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Multiple_Keys.txt
@@ -14,15 +14,15 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 
---temp_location=gs://temp-storage-for-perf-tests/loadtests
 --publish_to_big_query=true
 --metrics_dataset=load_test
 --metrics_table=python_flink_batch_cogbk_2
 --influx_measurement=python_batch_cogbk_2
---input_options=''{\\"num_records\\":20000000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":5,\\"hot_key_fraction\\":1}''
---co_input_options=''{\\"num_records\\":2000000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":5,\\"hot_key_fraction\\":1}''
+--input_options=''{\\"num_records\\":200000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":5,\\"hot_key_fraction\\":1}''
+--co_input_options=''{\\"num_records\\":20000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":5,\\"hot_key_fraction\\":1}''
 --iterations=1
 --parallelism=5
---endpoint=localhost:8099
+--runner=PortableRunner
+--job_endpoint=localhost:8099
 --environment_type=DOCKER
---environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_go_sdk:latest
\ No newline at end of file
+--environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_python3.9_sdk:latest
diff --git 
a/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Single_Key.txt
 
b/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Single_Key.txt
similarity index 69%
rename from 
.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Single_Key.txt
rename to 
.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Single_Key.txt
index 3aeb927f04e..e1df7e3fd5f 100644
--- 
a/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Single_Key.txt
+++ 
b/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Single_Key.txt
@@ -14,15 +14,15 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 
---temp_location=gs://temp-storage-for-perf-tests/loadtests
 --publish_to_big_query=true
 --metrics_dataset=load_test
 --metrics_table=python_flink_batch_cogbk_1
 --influx_measurement=python_batch_cogbk_1
---input_options=''{\\"num_records\\":20000000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":1,\\"hot_key_fraction\\":1}''
---co_input_options=''{\\"num_records\\":2000000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":1000,\\"hot_key_fraction\\":1}''
+--input_options=''{\\"num_records\\":200000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":1,\\"hot_key_fraction\\":1}''
+--co_input_options=''{\\"num_records\\":20000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":100,\\"hot_key_fraction\\":1}''
 --iterations=1
 --parallelism=5
---endpoint=localhost:8099
+--runner=PortableRunner
+--job_endpoint=localhost:8099
 --environment_type=DOCKER
---environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_go_sdk:latest
\ No newline at end of file
+--environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_python3.9_sdk:latest
diff --git 
a/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_10kB.txt
 
b/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_10kB.txt
similarity index 69%
rename from 
.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_10kB.txt
rename to 
.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_10kB.txt
index e350e2d2994..b1f95027c9d 100644
--- 
a/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_10kB.txt
+++ 
b/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_10kB.txt
@@ -14,15 +14,15 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 
---temp_location=gs://temp-storage-for-perf-tests/loadtests
 --publish_to_big_query=true
 --metrics_dataset=load_test
 --metrics_table=python_flink_batch_cogbk_3
 --influx_measurement=python_batch_cogbk_3
---input_options=''{\\"num_records\\":20000000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":200000,\\"hot_key_fraction\\":1}''
---co_input_options=''{\\"num_records\\":2000000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":1000,\\"hot_key_fraction\\":1}''
+--input_options=''{\\"num_records\\":200000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":200000,\\"hot_key_fraction\\":1}''
+--co_input_options=''{\\"num_records\\":20000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":1000,\\"hot_key_fraction\\":1}''
 --iterations=4
 --parallelism=5
---endpoint=localhost:8099
+--runner=PortableRunner
+--job_endpoint=localhost:8099
 --environment_type=DOCKER
---environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_go_sdk:latest
\ No newline at end of file
+--environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_python3.9_sdk:latest
diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py 
b/sdks/python/apache_beam/runners/portability/prism_runner.py
index bc5d8c2a613..1c60fa3ee01 100644
--- a/sdks/python/apache_beam/runners/portability/prism_runner.py
+++ b/sdks/python/apache_beam/runners/portability/prism_runner.py
@@ -28,6 +28,7 @@ import json
 import logging
 import os
 import platform
+import re
 import shutil
 import stat
 import subprocess
@@ -121,7 +122,18 @@ class PrismRunnerLogFilter(logging.Filter):
       try:
         message = record.getMessage()
         json_record = json.loads(message)
-        record.levelno = getattr(logging, json_record["level"])
+        level_str = json_record["level"]
+        # Example level with offset: 'ERROR+2'
+        if "+" in level_str or "-" in level_str:
+          match = re.match(r"([A-Z]+)([+-]\d+)", level_str)
+          if match:
+            base, offset = match.groups()
+            base_level = getattr(logging, base, logging.INFO)
+            record.levelno = base_level + int(offset)
+          else:
+            record.levelno = getattr(logging, level_str, logging.INFO)
+        else:
+          record.levelno = getattr(logging, level_str, logging.INFO)
         record.levelname = logging.getLevelName(record.levelno)
         if "source" in json_record:
           record.funcName = json_record["source"]["function"]

Reply via email to the mailing list.