This is an automated email from the ASF dual-hosted git repository. vterentev pushed a commit to branch fix-flink-cogbk in repository https://gitbox.apache.org/repos/asf/beam.git
commit ac53131212720ffe02399c5bcd98eb994c8ed858 Author: Vitaly Terentyev <[email protected]> AuthorDate: Thu Oct 9 19:33:35 2025 +0400 Fix Python CoGBK Flink Batch config --- .../workflows/beam_LoadTests_Python_CoGBK_Flink_Batch.yml | 11 +++++++---- ...txt => python_CoGBK_Flink_Batch_100b_Multiple_Keys.txt} | 3 ++- ...ey.txt => python_CoGBK_Flink_Batch_100b_Single_Key.txt} | 3 ++- ...nk_Batch_10kB.txt => python_CoGBK_Flink_Batch_10kB.txt} | 3 ++- .../python/apache_beam/runners/portability/prism_runner.py | 14 +++++++++++++- 5 files changed, 26 insertions(+), 8 deletions(-) diff --git a/.github/workflows/beam_LoadTests_Python_CoGBK_Flink_Batch.yml b/.github/workflows/beam_LoadTests_Python_CoGBK_Flink_Batch.yml index 2c0c61007cd..898f4ec73e4 100644 --- a/.github/workflows/beam_LoadTests_Python_CoGBK_Flink_Batch.yml +++ b/.github/workflows/beam_LoadTests_Python_CoGBK_Flink_Batch.yml @@ -89,9 +89,9 @@ jobs: test-type: load test-language: python argument-file-paths: | - ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Single_Key.txt - ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Multiple_Keys.txt - ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_10kB.txt + ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Single_Key.txt + ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Multiple_Keys.txt + ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_10kB.txt - name: Start Flink with parallelism 5 env: FLINK_NUM_WORKERS: 5 @@ -108,6 +108,7 @@ jobs: gradle-command: :sdks:python:apache_beam:testing:load_tests:run arguments: | --info \ + -PpythonVersion=3.9 \ -PloadTest.mainClass=apache_beam.testing.load_tests.co_group_by_key_test \ -Prunner=FlinkRunner \ 
'-PloadTest.args=${{ env.beam_LoadTests_Python_CoGBK_Flink_Batch_test_arguments_1 }} --job_name=load-tests-python-flink-batch-cogbk-1-${{ steps.datetime.outputs.datetime }}' \ @@ -117,6 +118,7 @@ jobs: gradle-command: :sdks:python:apache_beam:testing:load_tests:run arguments: | --info \ + -PpythonVersion=3.9 \ -PloadTest.mainClass=apache_beam.testing.load_tests.co_group_by_key_test \ -Prunner=FlinkRunner \ '-PloadTest.args=${{ env.beam_LoadTests_Python_CoGBK_Flink_Batch_test_arguments_2 }} --job_name=load-tests-python-flink-batch-cogbk-2-${{ steps.datetime.outputs.datetime }}' \ @@ -126,10 +128,11 @@ jobs: gradle-command: :sdks:python:apache_beam:testing:load_tests:run arguments: | --info \ + -PpythonVersion=3.9 \ -PloadTest.mainClass=apache_beam.testing.load_tests.co_group_by_key_test \ -Prunner=FlinkRunner \ '-PloadTest.args=${{ env.beam_LoadTests_Python_CoGBK_Flink_Batch_test_arguments_3 }} --job_name=load-tests-python-flink-batch-cogbk-3-${{ steps.datetime.outputs.datetime }}' \ - name: Teardown Flink if: always() run: | - ${{ github.workspace }}/.test-infra/dataproc/flink_cluster.sh delete \ No newline at end of file + ${{ github.workspace }}/.test-infra/dataproc/flink_cluster.sh delete diff --git a/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Multiple_Keys.txt b/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Multiple_Keys.txt similarity index 92% rename from .github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Multiple_Keys.txt rename to .github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Multiple_Keys.txt index 4b8a2f72010..8c05f5c215d 100644 --- a/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Multiple_Keys.txt +++ b/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Multiple_Keys.txt @@ -23,6 +23,7 @@
--co_input_options=''{\\"num_records\\":2000000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":5,\\"hot_key_fraction\\":1}'' --iterations=1 --parallelism=5 +--runner=FlinkRunner --endpoint=localhost:8099 --environment_type=DOCKER ---environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_go_sdk:latest \ No newline at end of file +--environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_python3.9_sdk:latest diff --git a/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Single_Key.txt b/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Single_Key.txt similarity index 92% rename from .github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Single_Key.txt rename to .github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Single_Key.txt index 3aeb927f04e..92db5e55a6a 100644 --- a/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Single_Key.txt +++ b/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Single_Key.txt @@ -23,6 +23,7 @@ --co_input_options=''{\\"num_records\\":2000000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":1000,\\"hot_key_fraction\\":1}'' --iterations=1 --parallelism=5 +--runner=FlinkRunner --endpoint=localhost:8099 --environment_type=DOCKER ---environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_go_sdk:latest \ No newline at end of file +--environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_python3.9_sdk:latest diff --git a/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_10kB.txt b/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_10kB.txt similarity index 92% rename from .github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_10kB.txt rename to .github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_10kB.txt index e350e2d2994..328e8f2b016 100644 --- 
a/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_10kB.txt +++ b/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_10kB.txt @@ -23,6 +23,7 @@ --co_input_options=''{\\"num_records\\":2000000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":1000,\\"hot_key_fraction\\":1}'' --iterations=4 --parallelism=5 +--runner=FlinkRunner --endpoint=localhost:8099 --environment_type=DOCKER ---environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_go_sdk:latest \ No newline at end of file +--environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_python3.9_sdk:latest diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py b/sdks/python/apache_beam/runners/portability/prism_runner.py index bc5d8c2a613..1c60fa3ee01 100644 --- a/sdks/python/apache_beam/runners/portability/prism_runner.py +++ b/sdks/python/apache_beam/runners/portability/prism_runner.py @@ -28,6 +28,7 @@ import json import logging import os import platform +import re import shutil import stat import subprocess @@ -121,7 +122,18 @@ class PrismRunnerLogFilter(logging.Filter): try: message = record.getMessage() json_record = json.loads(message) - record.levelno = getattr(logging, json_record["level"]) + level_str = json_record["level"] + # Example level with offset: 'ERROR+2' + if "+" in level_str or "-" in level_str: + match = re.match(r"([A-Z]+)([+-]\d+)", level_str) + if match: + base, offset = match.groups() + base_level = getattr(logging, base, logging.INFO) + record.levelno = base_level + int(offset) + else: + record.levelno = getattr(logging, level_str, logging.INFO) + else: + record.levelno = getattr(logging, level_str, logging.INFO) record.levelname = logging.getLevelName(record.levelno) if "source" in json_record: record.funcName = json_record["source"]["function"]
