This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ec35441d7f3 CombinePerKey with gbek (Python) (#36382)
ec35441d7f3 is described below
commit ec35441d7f336e329debf56ce5d0d13e87d1d91a
Author: Danny McCormick <[email protected]>
AuthorDate: Tue Oct 7 13:10:48 2025 -0400
CombinePerKey with gbek (Python) (#36382)
* [WIP] CombinePerKey with gbek
* Run on dataflow postcommit
* Run on all postcommit
* Don't lift combinebykey
* Lint
---
.github/trigger_files/beam_PostCommit_Python.json | 2 +-
sdks/python/apache_beam/transforms/core.py | 8 +-
sdks/python/apache_beam/transforms/core_it_test.py | 109 +++++++++++++++++++++
3 files changed, 117 insertions(+), 2 deletions(-)
diff --git a/.github/trigger_files/beam_PostCommit_Python.json
b/.github/trigger_files/beam_PostCommit_Python.json
index 815b511b898..42a6e88b8a2 100644
--- a/.github/trigger_files/beam_PostCommit_Python.json
+++ b/.github/trigger_files/beam_PostCommit_Python.json
@@ -1,5 +1,5 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to
run.",
- "modification": 30
+ "modification": 31
}
diff --git a/sdks/python/apache_beam/transforms/core.py
b/sdks/python/apache_beam/transforms/core.py
index da7602b9cda..3aae9f08344 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -3058,6 +3058,10 @@ class CombinePerKey(PTransformWithSideInputs):
return lambda element, *args, **kwargs: None
def expand(self, pcoll):
+ # When using gbek, don't allow overriding default implementation
+ gbek_option = (pcoll.pipeline._options.view_as(SetupOptions).gbek)
+ self._using_gbek = (gbek_option is not None and len(gbek_option) > 0)
+
args, kwargs = util.insert_values_in_args(
self.args, self.kwargs, self.side_inputs)
return pcoll | GroupByKey() | 'Combine' >> CombineValues(
@@ -3083,7 +3087,9 @@ class CombinePerKey(PTransformWithSideInputs):
self,
context, # type: PipelineContext
):
- # type: (...) -> typing.Tuple[str, beam_runner_api_pb2.CombinePayload]
+ # type: (...) -> tuple[str, typing.Optional[typing.Union[message.Message,
bytes, str]]]
+ if getattr(self, '_using_gbek', False):
+ return super().to_runner_api_parameter(context)
if self.args or self.kwargs:
from apache_beam.transforms.combiners import curry_combine_fn
combine_fn = curry_combine_fn(self.fn, self.args, self.kwargs)
diff --git a/sdks/python/apache_beam/transforms/core_it_test.py
b/sdks/python/apache_beam/transforms/core_it_test.py
new file mode 100644
index 00000000000..50744e28c67
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/core_it_test.py
@@ -0,0 +1,109 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Integration tests for cross-language transform expansion."""
+
+# pytype: skip-file
+
+import random
+import string
+import unittest
+
+import pytest
+
+import apache_beam as beam
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+from apache_beam.transforms.util import GcpSecret
+from apache_beam.transforms.util import Secret
+
+try:
+ from google.cloud import secretmanager
+except ImportError:
+ secretmanager = None # type: ignore[assignment]
+
+
+class GbekIT(unittest.TestCase):
+ def setUp(self):
+ if secretmanager is not None:
+ self.project_id = 'apache-beam-testing'
+ secret_postfix = ''.join(random.choice(string.digits) for _ in range(6))
+ self.secret_id = 'gbek_secret_tests_' + secret_postfix
+ self.client = secretmanager.SecretManagerServiceClient()
+ self.project_path = f'projects/{self.project_id}'
+ self.secret_path = f'{self.project_path}/secrets/{self.secret_id}'
+ try:
+ self.client.get_secret(request={'name': self.secret_path})
+ except Exception:
+ self.client.create_secret(
+ request={
+ 'parent': self.project_path,
+ 'secret_id': self.secret_id,
+ 'secret': {
+ 'replication': {
+ 'automatic': {}
+ }
+ }
+ })
+ self.client.add_secret_version(
+ request={
+ 'parent': self.secret_path,
+ 'payload': {
+ 'data': Secret.generate_secret_bytes()
+ }
+ })
+ version_name = f'{self.secret_path}/versions/latest'
+ self.gcp_secret = GcpSecret(version_name)
+ self.secret_option = f'type:GcpSecret;version_name:{version_name}'
+
+ def tearDown(self):
+ if secretmanager is not None:
+ self.client.delete_secret(request={'name': self.secret_path})
+
+ @pytest.mark.it_postcommit
+ @unittest.skipIf(secretmanager is None, 'GCP dependencies are not installed')
+ def test_gbk_with_gbek_it(self):
+ pipeline = TestPipeline(is_integration_test=True)
+ pipeline.options.view_as(SetupOptions).gbek = self.secret_option
+
+ pcoll_1 = pipeline | 'Start 1' >> beam.Create([('a', 1), ('a', 2), ('b',
3),
+ ('c', 4)])
+ result = (pcoll_1) | beam.GroupByKey()
+ sorted_result = result | beam.Map(lambda x: (x[0], sorted(x[1])))
+ assert_that(
+ sorted_result, equal_to([('a', ([1, 2])), ('b', ([3])), ('c', ([4]))]))
+
+ pipeline.run().wait_until_finish()
+
+ @pytest.mark.it_postcommit
+ @unittest.skipIf(secretmanager is None, 'GCP dependencies are not installed')
+ def test_combineValues_with_gbek_it(self):
+ pipeline = TestPipeline(is_integration_test=True)
+ pipeline.options.view_as(SetupOptions).gbek = self.secret_option
+
+ pcoll_1 = pipeline | 'Start 1' >> beam.Create([('a', 1), ('a', 2), ('b',
3),
+ ('c', 4)])
+ result = (pcoll_1) | beam.CombinePerKey(sum)
+ assert_that(result, equal_to([('a', 3), ('b', 3), ('c', 4)]))
+
+ pipeline.run().wait_until_finish()
+
+
+if __name__ == '__main__':
+ unittest.main()