This is an automated email from the ASF dual-hosted git repository.
vterentev pushed a commit to branch fix-flink-cogbk
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/fix-flink-cogbk by this push:
new 2cedc3ad192 Fix Python CoGBK Flink Batch config
new 2908f98c39e Merge remote-tracking branch 'origin/fix-flink-cogbk' into
fix-flink-cogbk
2cedc3ad192 is described below
commit 2cedc3ad1923245332f56adfc3fe5be5a9259f97
Author: Vitaly Terentyev <[email protected]>
AuthorDate: Thu Oct 9 19:33:35 2025 +0400
Fix Python CoGBK Flink Batch config
---
.../beam_LoadTests_Python_CoGBK_Flink_Batch.yml | 17 ++++++++++-------
.github/workflows/beam_Publish_Docker_Snapshots.yml | 2 +-
... => python_CoGBK_Flink_Batch_100b_Multiple_Keys.txt} | 10 +++++-----
...txt => python_CoGBK_Flink_Batch_100b_Single_Key.txt} | 10 +++++-----
...Batch_10kB.txt => python_CoGBK_Flink_Batch_10kB.txt} | 10 +++++-----
.../apache_beam/runners/portability/prism_runner.py | 14 +++++++++++++-
6 files changed, 39 insertions(+), 24 deletions(-)
diff --git a/.github/workflows/beam_LoadTests_Python_CoGBK_Flink_Batch.yml
b/.github/workflows/beam_LoadTests_Python_CoGBK_Flink_Batch.yml
index 2c0c61007cd..c40dd567826 100644
--- a/.github/workflows/beam_LoadTests_Python_CoGBK_Flink_Batch.yml
+++ b/.github/workflows/beam_LoadTests_Python_CoGBK_Flink_Batch.yml
@@ -89,9 +89,9 @@ jobs:
test-type: load
test-language: python
argument-file-paths: |
- ${{ github.workspace
}}/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Single_Key.txt
- ${{ github.workspace
}}/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Multiple_Keys.txt
- ${{ github.workspace
}}/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_10kB.txt
+ ${{ github.workspace
}}/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Single_Key.txt
+ ${{ github.workspace
}}/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Multiple_Keys.txt
+ ${{ github.workspace
}}/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_10kB.txt
- name: Start Flink with parallelism 5
env:
FLINK_NUM_WORKERS: 5
@@ -108,8 +108,9 @@ jobs:
gradle-command: :sdks:python:apache_beam:testing:load_tests:run
arguments: |
--info \
+ -PpythonVersion=3.9 \
-PloadTest.mainClass=apache_beam.testing.load_tests.co_group_by_key_test \
- -Prunner=FlinkRunner \
+ -Prunner=PortableRunner \
'-PloadTest.args=${{
env.beam_LoadTests_Python_CoGBK_Flink_Batch_test_arguments_1 }}
--job_name=load-tests-python-flink-batch-cogbk-1-${{
steps.datetime.outputs.datetime }}' \
- name: run CoGBK 2GB of 100B records with multiple keys
uses: ./.github/actions/gradle-command-self-hosted-action
@@ -117,8 +118,9 @@ jobs:
gradle-command: :sdks:python:apache_beam:testing:load_tests:run
arguments: |
--info \
+ -PpythonVersion=3.9 \
-PloadTest.mainClass=apache_beam.testing.load_tests.co_group_by_key_test \
- -Prunner=FlinkRunner \
+ -Prunner=PortableRunner \
'-PloadTest.args=${{
env.beam_LoadTests_Python_CoGBK_Flink_Batch_test_arguments_2 }}
--job_name=load-tests-python-flink-batch-cogbk-2-${{
steps.datetime.outputs.datetime }}' \
- name: run CoGBK reiterate 4 times 10kB values
uses: ./.github/actions/gradle-command-self-hosted-action
@@ -126,10 +128,11 @@ jobs:
gradle-command: :sdks:python:apache_beam:testing:load_tests:run
arguments: |
--info \
+ -PpythonVersion=3.9 \
-PloadTest.mainClass=apache_beam.testing.load_tests.co_group_by_key_test \
- -Prunner=FlinkRunner \
+ -Prunner=PortableRunner \
'-PloadTest.args=${{
env.beam_LoadTests_Python_CoGBK_Flink_Batch_test_arguments_3 }}
--job_name=load-tests-python-flink-batch-cogbk-3-${{
steps.datetime.outputs.datetime }}' \
- name: Teardown Flink
if: always()
run: |
- ${{ github.workspace }}/.test-infra/dataproc/flink_cluster.sh delete
\ No newline at end of file
+ ${{ github.workspace }}/.test-infra/dataproc/flink_cluster.sh delete
diff --git a/.github/workflows/beam_Publish_Docker_Snapshots.yml
b/.github/workflows/beam_Publish_Docker_Snapshots.yml
index ad3f0da2296..098e06e447c 100644
--- a/.github/workflows/beam_Publish_Docker_Snapshots.yml
+++ b/.github/workflows/beam_Publish_Docker_Snapshots.yml
@@ -83,7 +83,7 @@ jobs:
arguments: |
-Pdocker-repository-root=gcr.io/apache-beam-testing/beam_portability \
-Pdocker-tag-list=${{ github.sha }}${LATEST_TAG}
- - name: run Publish Docker Snapshots script for Flink
+ - name: run Publish Docker Snapshots script for Flink 1.17
uses: ./.github/actions/gradle-command-self-hosted-action
with:
gradle-command: :runners:flink:1.17:job-server-container:dockerPush
diff --git
a/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Multiple_Keys.txt
b/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Multiple_Keys.txt
similarity index 74%
rename from
.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Multiple_Keys.txt
rename to
.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Multiple_Keys.txt
index 4b8a2f72010..6e26ee72a77 100644
---
a/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Multiple_Keys.txt
+++
b/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Multiple_Keys.txt
@@ -14,15 +14,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
---temp_location=gs://temp-storage-for-perf-tests/loadtests
--publish_to_big_query=true
--metrics_dataset=load_test
--metrics_table=python_flink_batch_cogbk_2
--influx_measurement=python_batch_cogbk_2
---input_options=''{\\"num_records\\":20000000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":5,\\"hot_key_fraction\\":1}''
---co_input_options=''{\\"num_records\\":2000000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":5,\\"hot_key_fraction\\":1}''
+--input_options=''{\\"num_records\\":200000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":5,\\"hot_key_fraction\\":1}''
+--co_input_options=''{\\"num_records\\":20000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":5,\\"hot_key_fraction\\":1}''
--iterations=1
--parallelism=5
---endpoint=localhost:8099
+--runner=PortableRunner
+--job_endpoint=localhost:8099
--environment_type=DOCKER
---environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_go_sdk:latest
\ No newline at end of file
+--environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_python3.9_sdk:latest
diff --git
a/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Single_Key.txt
b/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Single_Key.txt
similarity index 69%
rename from
.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Single_Key.txt
rename to
.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Single_Key.txt
index 3aeb927f04e..e1df7e3fd5f 100644
---
a/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_100b_Single_Key.txt
+++
b/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_100b_Single_Key.txt
@@ -14,15 +14,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
---temp_location=gs://temp-storage-for-perf-tests/loadtests
--publish_to_big_query=true
--metrics_dataset=load_test
--metrics_table=python_flink_batch_cogbk_1
--influx_measurement=python_batch_cogbk_1
---input_options=''{\\"num_records\\":20000000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":1,\\"hot_key_fraction\\":1}''
---co_input_options=''{\\"num_records\\":2000000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":1000,\\"hot_key_fraction\\":1}''
+--input_options=''{\\"num_records\\":200000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":1,\\"hot_key_fraction\\":1}''
+--co_input_options=''{\\"num_records\\":20000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":100,\\"hot_key_fraction\\":1}''
--iterations=1
--parallelism=5
---endpoint=localhost:8099
+--runner=PortableRunner
+--job_endpoint=localhost:8099
--environment_type=DOCKER
---environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_go_sdk:latest
\ No newline at end of file
+--environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_python3.9_sdk:latest
diff --git
a/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_10kB.txt
b/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_10kB.txt
similarity index 69%
rename from
.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_10kB.txt
rename to
.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_10kB.txt
index e350e2d2994..b1f95027c9d 100644
---
a/.github/workflows/load-tests-pipeline-options/python_CoGBK_Dataflow_Flink_Batch_10kB.txt
+++
b/.github/workflows/load-tests-pipeline-options/python_CoGBK_Flink_Batch_10kB.txt
@@ -14,15 +14,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
---temp_location=gs://temp-storage-for-perf-tests/loadtests
--publish_to_big_query=true
--metrics_dataset=load_test
--metrics_table=python_flink_batch_cogbk_3
--influx_measurement=python_batch_cogbk_3
---input_options=''{\\"num_records\\":20000000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":200000,\\"hot_key_fraction\\":1}''
---co_input_options=''{\\"num_records\\":2000000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":1000,\\"hot_key_fraction\\":1}''
+--input_options=''{\\"num_records\\":200000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":200000,\\"hot_key_fraction\\":1}''
+--co_input_options=''{\\"num_records\\":20000,\\"key_size\\":10,\\"value_size\\":90,\\"num_hot_keys\\":1000,\\"hot_key_fraction\\":1}''
--iterations=4
--parallelism=5
---endpoint=localhost:8099
+--runner=PortableRunner
+--job_endpoint=localhost:8099
--environment_type=DOCKER
---environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_go_sdk:latest
\ No newline at end of file
+--environment_config=gcr.io/apache-beam-testing/beam-sdk/beam_python3.9_sdk:latest
diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py
b/sdks/python/apache_beam/runners/portability/prism_runner.py
index bc5d8c2a613..1c60fa3ee01 100644
--- a/sdks/python/apache_beam/runners/portability/prism_runner.py
+++ b/sdks/python/apache_beam/runners/portability/prism_runner.py
@@ -28,6 +28,7 @@ import json
import logging
import os
import platform
+import re
import shutil
import stat
import subprocess
@@ -121,7 +122,18 @@ class PrismRunnerLogFilter(logging.Filter):
try:
message = record.getMessage()
json_record = json.loads(message)
- record.levelno = getattr(logging, json_record["level"])
+ level_str = json_record["level"]
+ # Example level with offset: 'ERROR+2'
+ if "+" in level_str or "-" in level_str:
+ match = re.match(r"([A-Z]+)([+-]\d+)", level_str)
+ if match:
+ base, offset = match.groups()
+ base_level = getattr(logging, base, logging.INFO)
+ record.levelno = base_level + int(offset)
+ else:
+ record.levelno = getattr(logging, level_str, logging.INFO)
+ else:
+ record.levelno = getattr(logging, level_str, logging.INFO)
record.levelname = logging.getLevelName(record.levelno)
if "source" in json_record:
record.funcName = json_record["source"]["function"]