This is an automated email from the ASF dual-hosted git repository.
ningk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e39da4c [BEAM-12506] Changed WindowedValueHolder into a Row type
new 739dbb8 Merge pull request #15217 from KevinGG/BEAM-12506
e39da4c is described below
commit e39da4cddd2a020815d2282cf5712c2799605cba
Author: KevinGG <[email protected]>
AuthorDate: Fri Jul 23 15:28:59 2021 -0700
[BEAM-12506] Changed WindowedValueHolder into a Row type
The change avoids introducing a pickled python coder when writing/reading
WindowedValueHolders.
---
sdks/python/apache_beam/testing/test_stream.py | 45 +++++++++++++++++++++-
.../python/apache_beam/testing/test_stream_test.py | 21 ++++++++++
2 files changed, 64 insertions(+), 2 deletions(-)
diff --git a/sdks/python/apache_beam/testing/test_stream.py b/sdks/python/apache_beam/testing/test_stream.py
index b3b4a96..d655a90 100644
--- a/sdks/python/apache_beam/testing/test_stream.py
+++ b/sdks/python/apache_beam/testing/test_stream.py
@@ -207,15 +207,56 @@ class ProcessingTimeEvent(Event):
return 'ProcessingTimeEvent: <{}>'.format(self.advance_by)
-class WindowedValueHolder:
+class WindowedValueHolderMeta(type):
+ """A metaclass that overrides the isinstance check for WindowedValueHolder.
+
+ Python does a quick test for exact match. If an instance is exactly of
+ type WindowedValueHolder, the overridden isinstance check is omitted.
+ The override is needed because WindowedValueHolder elements encoded then
+ decoded become Row elements.
+ """
+ def __instancecheck__(cls, other):
+ """Checks if a beam.Row typed instance is a WindowedValueHolder.
+ """
+ return (
+ isinstance(other, beam.Row) and hasattr(other, 'windowed_value') and
+ hasattr(other, 'urn') and
+ isinstance(other.windowed_value, WindowedValue) and
+ other.urn == common_urns.coders.ROW.urn)
+
+
+class WindowedValueHolder(beam.Row, metaclass=WindowedValueHolderMeta):
"""A class that holds a WindowedValue.
This is a special class that can be used by the runner that implements the
TestStream as a signal that the underlying value should be unreified to the
specified window.
"""
+ # Register WindowedValueHolder to always use RowCoder.
+ coders.registry.register_coder(WindowedValueHolderMeta, coders.RowCoder)
+
def __init__(self, windowed_value):
- self.windowed_value = windowed_value
+ assert isinstance(windowed_value, WindowedValue), (
+ 'WindowedValueHolder can only hold %s type. Instead, %s is given.') % (
+ WindowedValue, windowed_value)
+ super().__init__(
+ **{
+ 'windowed_value': windowed_value, 'urn': common_urns.coders.ROW.urn
+ })
+
+ @classmethod
+ def from_row(cls, row):
+ """Converts a beam.Row typed instance to WindowedValueHolder.
+ """
+ if isinstance(row, WindowedValueHolder):
+ return WindowedValueHolder(row.windowed_value)
+ assert isinstance(row, beam.Row), 'The given row %s must be a %s type' % (
+ row, beam.Row)
+ assert hasattr(row, 'windowed_value'), (
+ 'The given %s must have a windowed_value attribute.') % row
+ assert isinstance(row.windowed_value, WindowedValue), (
+ 'The windowed_value attribute of %s must be a %s type') % (
+ row, WindowedValue)
class TestStream(PTransform):
diff --git a/sdks/python/apache_beam/testing/test_stream_test.py b/sdks/python/apache_beam/testing/test_stream_test.py
index 94445dd..a4580b7 100644
--- a/sdks/python/apache_beam/testing/test_stream_test.py
+++ b/sdks/python/apache_beam/testing/test_stream_test.py
@@ -332,6 +332,27 @@ class TestStreamTest(unittest.TestCase):
('a', timestamp.Timestamp(5), beam.window.IntervalWindow(5, 10)),
]))
+ def test_instance_check_windowed_value_holder(self):
+ windowed_value = WindowedValue(
+ 'a',
+ Timestamp(5), [beam.window.IntervalWindow(5, 10)],
+ PaneInfo(True, True, PaneInfoTiming.ON_TIME, 0, 0))
+ self.assertTrue(
+ isinstance(WindowedValueHolder(windowed_value), WindowedValueHolder))
+ self.assertTrue(
+ isinstance(
+ beam.Row(
+ windowed_value=windowed_value, urn=common_urns.coders.ROW.urn),
+ WindowedValueHolder))
+ self.assertFalse(
+ isinstance(
+ beam.Row(windowed_value=windowed_value), WindowedValueHolder))
+ self.assertFalse(isinstance(windowed_value, WindowedValueHolder))
+ self.assertFalse(
+ isinstance(beam.Row(x=windowed_value), WindowedValueHolder))
+ self.assertFalse(
+ isinstance(beam.Row(windowed_value=1), WindowedValueHolder))
+
def test_gbk_execution_no_triggers(self):
test_stream = (
TestStream().advance_watermark_to(10).add_elements([