This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 9c60fd5 Revert "[BEAM-2914] Add portable merging window support to Python. (#12995)"
new 871e023 Merge pull request #14004 from ajamato/rollback
9c60fd5 is described below
commit 9c60fd546458f48912285d38bce52a31d2eb4472
Author: Alex Amato <[email protected]>
AuthorDate: Wed Feb 17 11:04:53 2021 -0800
Revert "[BEAM-2914] Add portable merging window support to Python. (#12995)"
This reverts commit 625ee1f6e27636f26672e973ecbcecf19a8cb361.
---
.../runners/portability/flink_runner_test.py | 3 -
.../runners/portability/fn_api_runner/execution.py | 241 +--------------------
.../portability/fn_api_runner/fn_runner_test.py | 37 ----
.../runners/portability/spark_runner_test.py | 3 -
.../apache_beam/runners/worker/bundle_processor.py | 43 ----
5 files changed, 12 insertions(+), 315 deletions(-)
diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
index e97ce49..94e30bf 100644
--- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
@@ -398,9 +398,6 @@ class FlinkRunnerTest(portable_runner_test.PortableRunnerTest):
def test_register_finalizations(self):
raise unittest.SkipTest("BEAM-11021")
- def test_custom_merging_window(self):
- raise unittest.SkipTest("BEAM-11004")
-
# Inherits all other tests.
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
index a08aa5f..bc69123 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py
@@ -24,8 +24,6 @@ from __future__ import absolute_import
import collections
import copy
import itertools
-import uuid
-import weakref
from typing import TYPE_CHECKING
from typing import Any
from typing import DefaultDict
@@ -57,7 +55,6 @@ from
apache_beam.runners.portability.fn_api_runner.translations import only_elem
from apache_beam.runners.portability.fn_api_runner.translations import
split_buffer_id
from apache_beam.runners.portability.fn_api_runner.translations import
unique_name
from apache_beam.runners.worker import bundle_processor
-from apache_beam.transforms import core
from apache_beam.transforms import trigger
from apache_beam.transforms import window
from apache_beam.transforms.window import GlobalWindow
@@ -72,6 +69,7 @@ if TYPE_CHECKING:
from apache_beam.runners.portability.fn_api_runner.fn_runner import
DataOutput
from apache_beam.runners.portability.fn_api_runner.fn_runner import
OutputTimers
from apache_beam.runners.portability.fn_api_runner.translations import
DataSideInput
+ from apache_beam.transforms import core
from apache_beam.transforms.window import BoundedWindow
ENCODED_IMPULSE_VALUE = WindowedValueCoder(
@@ -340,222 +338,6 @@ class GenericNonMergingWindowFn(window.NonMergingWindowFn):
context.coders[window_coder_id.decode('utf-8')])
-class GenericMergingWindowFn(window.WindowFn):
-
- URN = 'internal-generic-merging'
-
- TO_SDK_TRANSFORM = 'read'
- FROM_SDK_TRANSFORM = 'write'
-
- _HANDLES = {} # type: Dict[str, GenericMergingWindowFn]
-
- def __init__(self, execution_context, windowing_strategy_proto):
- # type: (FnApiRunnerExecutionContext,
beam_runner_api_pb2.WindowingStrategy) -> None
- self._worker_handler = None # type:
Optional[worker_handlers.WorkerHandler]
- self._handle_id = handle_id = uuid.uuid4().hex
- self._HANDLES[handle_id] = self
- # ExecutionContexts are expensive, we don't want to keep them in the
- # static dictionary forever. Instead we hold a weakref and pop self
- # out of the dict once this context goes away.
- self._execution_context_ref_obj = weakref.ref(
- execution_context, lambda _: self._HANDLES.pop(handle_id, None))
- self._windowing_strategy_proto = windowing_strategy_proto
- self._counter = 0
- # Lazily created in make_process_bundle_descriptor()
- self._process_bundle_descriptor = None
- self._bundle_processor_id = None # type: Optional[str]
- self.windowed_input_coder_impl = None # type: Optional[CoderImpl]
- self.windowed_output_coder_impl = None # type: Optional[CoderImpl]
-
- def _execution_context_ref(self):
- # type: () -> FnApiRunnerExecutionContext
- result = self._execution_context_ref_obj()
- assert result is not None
- return result
-
- def payload(self):
- # type: () -> bytes
- return self._handle_id.encode('utf-8')
-
- @staticmethod
- @window.urns.RunnerApiFn.register_urn(URN, bytes)
- def from_runner_api_parameter(handle_id, unused_context):
- # type: (bytes, Any) -> GenericMergingWindowFn
- return GenericMergingWindowFn._HANDLES[handle_id.decode('utf-8')]
-
- def assign(self, assign_context):
- # type: (window.WindowFn.AssignContext) -> Iterable[window.BoundedWindow]
- raise NotImplementedError()
-
- def merge(self, merge_context):
- # type: (window.WindowFn.MergeContext) -> None
- worker_handler = self.worker_handle()
-
- assert self.windowed_input_coder_impl is not None
- assert self.windowed_output_coder_impl is not None
- process_bundle_id = self.uid('process')
- to_worker = worker_handler.data_conn.output_stream(
- process_bundle_id, self.TO_SDK_TRANSFORM)
- to_worker.write(
- self.windowed_input_coder_impl.encode_nested(
- window.GlobalWindows.windowed_value((b'', merge_context.windows))))
- to_worker.close()
-
- process_bundle_req = beam_fn_api_pb2.InstructionRequest(
- instruction_id=process_bundle_id,
- process_bundle=beam_fn_api_pb2.ProcessBundleRequest(
- process_bundle_descriptor_id=self._bundle_processor_id))
- result_future = worker_handler.control_conn.push(process_bundle_req)
- for output in worker_handler.data_conn.input_elements(
- process_bundle_id, [self.FROM_SDK_TRANSFORM],
- abort_callback=lambda: bool(result_future.is_done() and result_future.
- get().error)):
- if isinstance(output, beam_fn_api_pb2.Elements.Data):
- windowed_result = self.windowed_output_coder_impl.decode_nested(
- output.data)
- for merge_result, originals in windowed_result.value[1][1]:
- merge_context.merge(originals, merge_result)
- else:
- raise RuntimeError("Unexpected data: %s" % output)
-
- result = result_future.get()
- if result.error:
- raise RuntimeError(result.error)
- # The result was "returned" via the merge callbacks on merge_context above.
-
- def get_window_coder(self):
- # type: () -> coders.Coder
- return self._execution_context_ref().pipeline_context.coders[
- self._windowing_strategy_proto.window_coder_id]
-
- def worker_handle(self):
- # type: () -> worker_handlers.WorkerHandler
- if self._worker_handler is None:
- worker_handler_manager = self._execution_context_ref(
- ).worker_handler_manager
- self._worker_handler = worker_handler_manager.get_worker_handlers(
- self._windowing_strategy_proto.environment_id, 1)[0]
- process_bundle_decriptor = self.make_process_bundle_descriptor(
- self._worker_handler.data_api_service_descriptor(),
- self._worker_handler.state_api_service_descriptor())
- worker_handler_manager.register_process_bundle_descriptor(
- process_bundle_decriptor)
- return self._worker_handler
-
- def make_process_bundle_descriptor(
- self, data_api_service_descriptor, state_api_service_descriptor):
- # type: (Optional[endpoints_pb2.ApiServiceDescriptor],
Optional[endpoints_pb2.ApiServiceDescriptor]) ->
beam_fn_api_pb2.ProcessBundleDescriptor
-
- """Creates a ProcessBundleDescriptor for invoking the WindowFn's
- merge operation.
- """
- def make_channel_payload(coder_id):
- # type: (str) -> bytes
- data_spec = beam_fn_api_pb2.RemoteGrpcPort(coder_id=coder_id)
- if data_api_service_descriptor:
- data_spec.api_service_descriptor.url =
(data_api_service_descriptor.url)
- return data_spec.SerializeToString()
-
- pipeline_context = self._execution_context_ref().pipeline_context
- global_windowing_strategy_id = self.uid('global_windowing_strategy')
- global_windowing_strategy_proto = core.Windowing(
- window.GlobalWindows()).to_runner_api(pipeline_context)
- coders = dict(pipeline_context.coders.get_id_to_proto_map())
-
- def make_coder(urn, *components):
- # type: (str, str) -> str
- coder_proto = beam_runner_api_pb2.Coder(
- spec=beam_runner_api_pb2.FunctionSpec(urn=urn),
- component_coder_ids=components)
- coder_id = self.uid('coder')
- coders[coder_id] = coder_proto
- pipeline_context.coders.put_proto(coder_id, coder_proto)
- return coder_id
-
- bytes_coder_id = make_coder(common_urns.coders.BYTES.urn)
- window_coder_id = self._windowing_strategy_proto.window_coder_id
- global_window_coder_id = make_coder(common_urns.coders.GLOBAL_WINDOW.urn)
- iter_window_coder_id = make_coder(
- common_urns.coders.ITERABLE.urn, window_coder_id)
- input_coder_id = make_coder(
- common_urns.coders.KV.urn, bytes_coder_id, iter_window_coder_id)
- output_coder_id = make_coder(
- common_urns.coders.KV.urn,
- bytes_coder_id,
- make_coder(
- common_urns.coders.KV.urn,
- iter_window_coder_id,
- make_coder(
- common_urns.coders.ITERABLE.urn,
- make_coder(
- common_urns.coders.KV.urn,
- window_coder_id,
- iter_window_coder_id))))
- windowed_input_coder_id = make_coder(
- common_urns.coders.WINDOWED_VALUE.urn,
- input_coder_id,
- global_window_coder_id)
- windowed_output_coder_id = make_coder(
- common_urns.coders.WINDOWED_VALUE.urn,
- output_coder_id,
- global_window_coder_id)
-
- self.windowed_input_coder_impl = pipeline_context.coders[
- windowed_input_coder_id].get_impl()
- self.windowed_output_coder_impl = pipeline_context.coders[
- windowed_output_coder_id].get_impl()
-
- self._bundle_processor_id = self.uid('merge_windows')
- return beam_fn_api_pb2.ProcessBundleDescriptor(
- id=self._bundle_processor_id,
- transforms={
- self.TO_SDK_TRANSFORM: beam_runner_api_pb2.PTransform(
- unique_name='MergeWindows/Read',
- spec=beam_runner_api_pb2.FunctionSpec(
- urn=bundle_processor.DATA_INPUT_URN,
- payload=make_channel_payload(windowed_input_coder_id)),
- outputs={'input': 'input'}),
- 'Merge': beam_runner_api_pb2.PTransform(
- unique_name='MergeWindows/Merge',
- spec=beam_runner_api_pb2.FunctionSpec(
- urn=common_urns.primitives.MERGE_WINDOWS.urn,
- payload=self._windowing_strategy_proto.window_fn.
- SerializeToString()),
- inputs={'input': 'input'},
- outputs={'output': 'output'}),
- self.FROM_SDK_TRANSFORM: beam_runner_api_pb2.PTransform(
- unique_name='MergeWindows/Write',
- spec=beam_runner_api_pb2.FunctionSpec(
- urn=bundle_processor.DATA_OUTPUT_URN,
- payload=make_channel_payload(windowed_output_coder_id)),
- inputs={'output': 'output'}),
- },
- pcollections={
- 'input': beam_runner_api_pb2.PCollection(
- unique_name='input',
- windowing_strategy_id=global_windowing_strategy_id,
- coder_id=input_coder_id),
- 'output': beam_runner_api_pb2.PCollection(
- unique_name='output',
- windowing_strategy_id=global_windowing_strategy_id,
- coder_id=output_coder_id),
- },
- coders=coders,
- windowing_strategies={
- global_windowing_strategy_id: global_windowing_strategy_proto,
- },
- environments=dict(
- self._execution_context_ref().pipeline_components.environments.
- items()),
- state_api_service_descriptor=state_api_service_descriptor,
- timer_api_service_descriptor=data_api_service_descriptor)
-
- def uid(self, name=''):
- # type: (str) -> str
- self._counter += 1
- return '%s_%s_%s' % (self._handle_id, name, self._counter)
-
-
class FnApiRunnerExecutionContext(object):
"""
:var pcoll_buffers: (dict): Mapping of
@@ -661,22 +443,23 @@ class FnApiRunnerExecutionContext(object):
windowing_strategy_proto =
self.pipeline_components.windowing_strategies[id]
if windowing_strategy_proto.window_fn.urn in SAFE_WINDOW_FNS:
return id
- else:
+ elif (windowing_strategy_proto.merge_status ==
+ beam_runner_api_pb2.MergeStatus.NON_MERGING) or True:
safe_id = id + '_safe'
while safe_id in self.pipeline_components.windowing_strategies:
safe_id += '_'
safe_proto = copy.copy(windowing_strategy_proto)
- if (windowing_strategy_proto.merge_status ==
- beam_runner_api_pb2.MergeStatus.NON_MERGING):
- safe_proto.window_fn.urn = GenericNonMergingWindowFn.URN
- safe_proto.window_fn.payload = (
- windowing_strategy_proto.window_coder_id.encode('utf-8'))
- else:
- window_fn = GenericMergingWindowFn(self, windowing_strategy_proto)
- safe_proto.window_fn.urn = GenericMergingWindowFn.URN
- safe_proto.window_fn.payload = window_fn.payload()
+ safe_proto.window_fn.urn = GenericNonMergingWindowFn.URN
+ safe_proto.window_fn.payload = (
+ windowing_strategy_proto.window_coder_id.encode('utf-8'))
self.pipeline_context.windowing_strategies.put_proto(safe_id, safe_proto)
return safe_id
+ elif windowing_strategy_proto.window_fn.urn ==
python_urns.PICKLED_WINDOWFN:
+ return id
+ else:
+ raise NotImplementedError(
+ '[BEAM-10119] Unknown merging WindowFn: %s' %
+ windowing_strategy_proto)
@property
def state_servicer(self):
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
index 912074e..7bbdd3c 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
@@ -20,7 +20,6 @@ from __future__ import absolute_import
from __future__ import print_function
import collections
-import gc
import logging
import os
import random
@@ -47,7 +46,6 @@ from tenacity import retry
from tenacity import stop_after_attempt
import apache_beam as beam
-from apache_beam.coders import coders
from apache_beam.coders.coders import StrUtf8Coder
from apache_beam.io import restriction_trackers
from apache_beam.io.watermark_estimators import ManualWatermarkEstimator
@@ -782,21 +780,6 @@ class FnApiRunnerTest(unittest.TestCase):
| beam.Map(lambda k_vs1: (k_vs1[0], sorted(k_vs1[1]))))
assert_that(res, equal_to([('k', [1, 2]), ('k', [100, 101, 102])]))
- def test_custom_merging_window(self):
- with self.create_pipeline() as p:
- res = (
- p
- | beam.Create([1, 2, 100, 101, 102])
- | beam.Map(lambda t: window.TimestampedValue(('k', t), t))
- | beam.WindowInto(CustomMergingWindowFn())
- | beam.GroupByKey()
- | beam.Map(lambda k_vs1: (k_vs1[0], sorted(k_vs1[1]))))
- assert_that(
- res, equal_to([('k', [1]), ('k', [101]), ('k', [2, 100, 102])]))
- gc.collect()
- from apache_beam.runners.portability.fn_api_runner.execution import
GenericMergingWindowFn
- self.assertEqual(GenericMergingWindowFn._HANDLES, {})
-
@unittest.skip('BEAM-9119: test is flaky')
def test_large_elements(self):
with self.create_pipeline() as p:
@@ -2019,26 +2002,6 @@ class FnApiBasedStateBackedCoderTest(unittest.TestCase):
assert_that(r, equal_to([VALUES_PER_ELEMENT * NUM_OF_ELEMENTS]))
-# TODO(robertwb): Why does pickling break when this is inlined?
-class CustomMergingWindowFn(window.WindowFn):
- def assign(self, assign_context):
- return [
- window.IntervalWindow(
- assign_context.timestamp, assign_context.timestamp + 1)
- ]
-
- def merge(self, merge_context):
- evens = [w for w in merge_context.windows if w.start % 2 == 0]
- if evens:
- merge_context.merge(
- evens,
- window.IntervalWindow(
- min(w.start for w in evens), max(w.end for w in evens)))
-
- def get_window_coder(self):
- return coders.IntervalWindowCoder()
-
-
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()
diff --git a/sdks/python/apache_beam/runners/portability/spark_runner_test.py b/sdks/python/apache_beam/runners/portability/spark_runner_test.py
index 3473cad..062e06f 100644
--- a/sdks/python/apache_beam/runners/portability/spark_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/spark_runner_test.py
@@ -181,9 +181,6 @@ class SparkRunnerTest(portable_runner_test.PortableRunnerTest):
super(SparkRunnerTest,
self).test_flattened_side_input(with_transcoding=False)
- def test_custom_merging_window(self):
- raise unittest.SkipTest("BEAM-11004")
-
# Inherits all other tests from PortableRunnerTest.
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index f05228e..1e1c058 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -76,7 +76,6 @@ from apache_beam.transforms import TimeDomain
from apache_beam.transforms import core
from apache_beam.transforms import sideinputs
from apache_beam.transforms import userstate
-from apache_beam.transforms import window
from apache_beam.utils import counters
from apache_beam.utils import proto_utils
from apache_beam.utils import timestamp
@@ -1857,48 +1856,6 @@ def create_map_windows(
factory, transform_id, transform_proto, consumers, MapWindows())
-@BeamTransformFactory.register_urn(
- common_urns.primitives.MERGE_WINDOWS.urn, beam_runner_api_pb2.FunctionSpec)
-def create_merge_windows(
- factory, # type: BeamTransformFactory
- transform_id, # type: str
- transform_proto, # type: beam_runner_api_pb2.PTransform
- mapping_fn_spec, # type: beam_runner_api_pb2.FunctionSpec
- consumers # type: Dict[str, List[operations.Operation]]
-):
- assert mapping_fn_spec.urn == python_urns.PICKLED_WINDOWFN
- window_fn = pickler.loads(mapping_fn_spec.payload)
-
- class MergeWindows(beam.DoFn):
- def process(self, element):
- nonce, windows = element
-
- original_windows = set(windows) # type: Set[window.BoundedWindow]
- merged_windows = collections.defaultdict(
- set
- ) # type: MutableMapping[window.BoundedWindow,
Set[window.BoundedWindow]]
-
- class RecordingMergeContext(window.WindowFn.MergeContext):
- def merge(
- self,
- to_be_merged, # type: Iterable[window.BoundedWindow]
- merge_result, # type: window.BoundedWindow
- ):
- originals = merged_windows[merge_result]
- for window in to_be_merged:
- if window in original_windows:
- originals.add(window)
- original_windows.remove(window)
- else:
- originals.update(merged_windows.pop(window))
-
- window_fn.merge(RecordingMergeContext(windows))
- yield nonce, (original_windows, merged_windows.items())
-
- return _create_simple_pardo_operation(
- factory, transform_id, transform_proto, consumers, MergeWindows())
-
-
@BeamTransformFactory.register_urn(common_urns.primitives.TO_STRING.urn, None)
def create_to_string_fn(
factory, # type: BeamTransformFactory