[
https://issues.apache.org/jira/browse/BEAM-6153?focusedWorklogId=172227&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-172227
]
ASF GitHub Bot logged work on BEAM-6153:
----------------------------------------
Author: ASF GitHub Bot
Created on: 05/Dec/18 11:04
Start Date: 05/Dec/18 11:04
Worklog Time Spent: 10m
Work Description: robertwb closed pull request #7170: [BEAM-6153]
Re-enable coder optimization.
URL: https://github.com/apache/beam/pull/7170
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/sdks/python/apache_beam/coders/coder_impl.pxd
b/sdks/python/apache_beam/coders/coder_impl.pxd
index 03d0a56c053e..db724e652f83 100644
--- a/sdks/python/apache_beam/coders/coder_impl.pxd
+++ b/sdks/python/apache_beam/coders/coder_impl.pxd
@@ -25,11 +25,14 @@ cimport libc.stdint
cimport libc.stdlib
cimport libc.string
+cdef extern from "math.h":
+ libc.stdint.int64_t abs "llabs"(libc.stdint.int64_t)
+
from .stream cimport InputStream, OutputStream
from apache_beam.utils cimport windowed_value
-cdef object loads, dumps, create_InputStream, create_OutputStream,
ByteCountingOutputStream, get_varint_size
+cdef object loads, dumps, create_InputStream, create_OutputStream,
ByteCountingOutputStream, get_varint_size, past_unicode
# Temporarily untyped to allow monkeypatching on failed import.
#cdef type WindowedValue
@@ -75,8 +78,11 @@ cdef unsigned char SET_TYPE
cdef class FastPrimitivesCoderImpl(StreamCoderImpl):
cdef CoderImpl fallback_coder_impl
- @cython.locals(dict_value=dict, int_value=libc.stdint.int64_t)
+ @cython.locals(dict_value=dict, int_value=libc.stdint.int64_t,
+ unicode_value=unicode)
cpdef encode_to_stream(self, value, OutputStream stream, bint nested)
+ @cython.locals(t=int)
+ cpdef decode_from_stream(self, InputStream stream, bint nested)
cdef class BytesCoderImpl(CoderImpl):
@@ -123,6 +129,9 @@ cdef class TupleCoderImpl(AbstractComponentCoderImpl):
cdef class SequenceCoderImpl(StreamCoderImpl):
cdef CoderImpl _elem_coder
cpdef _construct_from_sequence(self, values)
+ @cython.locals(buffer=OutputStream, target_buffer_size=libc.stdint.int64_t,
+ index=libc.stdint.int64_t)
+ cpdef encode_to_stream(self, value, OutputStream stream, bint nested)
cdef class TupleSequenceCoderImpl(SequenceCoderImpl):
@@ -133,8 +142,41 @@ cdef class IterableCoderImpl(SequenceCoderImpl):
pass
+cdef object IntervalWindow
+
+cdef class IntervalWindowCoderImpl(StreamCoderImpl):
+ cdef libc.stdint.uint64_t _to_normal_time(self, libc.stdint.int64_t value)
+ cdef libc.stdint.int64_t _from_normal_time(self, libc.stdint.uint64_t value)
+
+ @cython.locals(typed_value=windowed_value._IntervalWindowBase,
+ span_millis=libc.stdint.int64_t)
+ cpdef encode_to_stream(self, value, OutputStream stream, bint nested)
+
+ @cython.locals(typed_value=windowed_value._IntervalWindowBase)
+ cpdef decode_from_stream(self, InputStream stream, bint nested)
+
+ @cython.locals(typed_value=windowed_value._IntervalWindowBase,
+ span_millis=libc.stdint.int64_t)
+ cpdef estimate_size(self, value, bint nested=?)
+
+
+cdef int PaneInfoTiming_UNKNOWN
+cdef int PaneInfoEncoding_FIRST
+
+
cdef class PaneInfoCoderImpl(StreamCoderImpl):
- cdef int _choose_encoding(self, value)
+ cdef int _choose_encoding(self, windowed_value.PaneInfo value)
+
+ @cython.locals(pane_info=windowed_value.PaneInfo, encoding_type=int)
+ cpdef encode_to_stream(self, value, OutputStream stream, bint nested)
+
+ @cython.locals(encoded_first_byte=int, encoding_type=int)
+ cpdef decode_from_stream(self, InputStream stream, bint nested)
+
+
+cdef libc.stdint.uint64_t _TIME_SHIFT
+cdef libc.stdint.int64_t MIN_TIMESTAMP_micros
+cdef libc.stdint.int64_t MAX_TIMESTAMP_micros
cdef class WindowedValueCoderImpl(StreamCoderImpl):
@@ -144,8 +186,18 @@ cdef class WindowedValueCoderImpl(StreamCoderImpl):
cdef CoderImpl _windows_coder
cdef CoderImpl _pane_info_coder
+ cdef libc.stdint.uint64_t _to_normal_time(self, libc.stdint.int64_t value)
+ cdef libc.stdint.int64_t _from_normal_time(self, libc.stdint.uint64_t value)
+
@cython.locals(c=CoderImpl)
cpdef get_estimated_size_and_observables(self, value, bint nested=?)
- @cython.locals(wv=windowed_value.WindowedValue)
+ @cython.locals(timestamp=libc.stdint.int64_t)
+ cpdef decode_from_stream(self, InputStream stream, bint nested)
+
+ @cython.locals(wv=windowed_value.WindowedValue, restore_sign=int)
cpdef encode_to_stream(self, value, OutputStream stream, bint nested)
+
+
+cdef class LengthPrefixCoderImpl(StreamCoderImpl):
+ cdef CoderImpl _value_coder
diff --git a/sdks/python/apache_beam/coders/coder_impl.py
b/sdks/python/apache_beam/coders/coder_impl.py
index 070e07d0d370..c3768dd10f4e 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -37,8 +37,8 @@
from builtins import chr
from builtins import object
+from past.builtins import unicode as past_unicode
from past.builtins import long
-from past.builtins import unicode
from apache_beam.coders import observable
from apache_beam.utils import windowed_value
@@ -71,6 +71,11 @@
# pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
+_TIME_SHIFT = 1 << 63
+MIN_TIMESTAMP_micros = MIN_TIMESTAMP.micros
+MAX_TIMESTAMP_micros = MAX_TIMESTAMP.micros
+
+
class CoderImpl(object):
"""For internal use only; no backwards-compatibility guarantees."""
@@ -216,7 +221,7 @@ def __init__(self, coder, step_label):
self._step_label = step_label
def _check_safe(self, value):
- if isinstance(value, (bytes, unicode, long, int, float)):
+ if isinstance(value, (bytes, past_unicode, long, int, float)):
pass
elif value is None:
pass
@@ -321,10 +326,10 @@ def encode_to_stream(self, value, stream, nested):
elif t is bytes:
stream.write_byte(BYTES_TYPE)
stream.write(value, nested)
- elif t is unicode:
- text_value = value # for typing
+ elif t is past_unicode:
+ unicode_value = value # for typing
stream.write_byte(UNICODE_TYPE)
- stream.write(text_value.encode('utf-8'), nested)
+ stream.write(unicode_value.encode('utf-8'), nested)
elif t is list or t is tuple or t is set:
stream.write_byte(
LIST_TYPE if t is list else TUPLE_TYPE if t is tuple else SET_TYPE)
@@ -413,37 +418,47 @@ def estimate_size(self, unused_value, nested=False):
return 8
+IntervalWindow = None
+
+
class IntervalWindowCoderImpl(StreamCoderImpl):
"""For internal use only; no backwards-compatibility guarantees."""
# TODO: Fn Harness only supports millis. Is this important enough to fix?
def _to_normal_time(self, value):
"""Convert "lexicographically ordered unsigned" to signed."""
- return value - (1 << 63)
+ return value - _TIME_SHIFT
def _from_normal_time(self, value):
"""Convert signed to "lexicographically ordered unsigned"."""
- return value + (1 << 63)
+ return value + _TIME_SHIFT
def encode_to_stream(self, value, out, nested):
- span_micros = value.end.micros - value.start.micros
+ typed_value = value
+ span_millis = (typed_value._end_micros // 1000
+ - typed_value._start_micros // 1000)
out.write_bigendian_uint64(
- self._from_normal_time(value.end.micros // 1000))
- out.write_var_int64(span_micros // 1000)
+ self._from_normal_time(typed_value._end_micros // 1000))
+ out.write_var_int64(span_millis)
def decode_from_stream(self, in_, nested):
- end_millis = self._to_normal_time(in_.read_bigendian_uint64())
- start_millis = end_millis - in_.read_var_int64()
- from apache_beam.transforms.window import IntervalWindow
- ret = IntervalWindow(start=Timestamp(micros=start_millis * 1000),
- end=Timestamp(micros=end_millis * 1000))
- return ret
+ global IntervalWindow
+ if IntervalWindow is None:
+ from apache_beam.transforms.window import IntervalWindow
+ typed_value = IntervalWindow(None, None)
+ typed_value._end_micros = (
+ 1000 * self._to_normal_time(in_.read_bigendian_uint64()))
+ typed_value._start_micros = (
+ typed_value._end_micros - 1000 * in_.read_var_int64())
+ return typed_value
def estimate_size(self, value, nested=False):
    # An IntervalWindow is context-insensitive, with a timestamp (8 bytes)
    # and a varint timespan.
- span = value.end.micros - value.start.micros
- return 8 + get_varint_size(span // 1000)
+ typed_value = value
+ span_millis = (typed_value._end_micros // 1000
+ - typed_value._start_micros // 1000)
+ return 8 + get_varint_size(span_millis)
class TimestampCoderImpl(StreamCoderImpl):
@@ -647,10 +662,11 @@ def encode_to_stream(self, value, out, nested):
# -1 to indicate that the length is not known.
out.write_bigendian_int32(-1)
buffer = create_OutputStream()
+ target_buffer_size = self._DEFAULT_BUFFER_SIZE
prev_index = index = -1
for index, elem in enumerate(value):
self._elem_coder.encode_to_stream(elem, buffer, True)
- if out.size() > self._DEFAULT_BUFFER_SIZE:
+ if buffer.size() > target_buffer_size:
out.write_var_int64(index - prev_index)
out.write(buffer.get())
prev_index = index
@@ -739,25 +755,31 @@ class PaneInfoEncoding(object):
TWO_INDICES = 2
+# These are cdef'd to ints to optimize the common case.
+PaneInfoTiming_UNKNOWN = windowed_value.PaneInfoTiming.UNKNOWN
+PaneInfoEncoding_FIRST = PaneInfoEncoding.FIRST
+
+
class PaneInfoCoderImpl(StreamCoderImpl):
"""For internal use only; no backwards-compatibility guarantees.
Coder for a PaneInfo descriptor."""
def _choose_encoding(self, value):
- if ((value.index == 0 and value.nonspeculative_index == 0) or
- value.timing == windowed_value.PaneInfoTiming.UNKNOWN):
- return PaneInfoEncoding.FIRST
- elif (value.index == value.nonspeculative_index or
- value.timing == windowed_value.PaneInfoTiming.EARLY):
+ if ((value._index == 0 and value._nonspeculative_index == 0) or
+ value._timing == PaneInfoTiming_UNKNOWN):
+ return PaneInfoEncoding_FIRST
+ elif (value._index == value._nonspeculative_index or
+ value._timing == windowed_value.PaneInfoTiming.EARLY):
return PaneInfoEncoding.ONE_INDEX
else:
return PaneInfoEncoding.TWO_INDICES
def encode_to_stream(self, value, out, nested):
- encoding_type = self._choose_encoding(value)
- out.write_byte(value.encoded_byte | (encoding_type << 4))
- if encoding_type == PaneInfoEncoding.FIRST:
+ pane_info = value # cast
+ encoding_type = self._choose_encoding(pane_info)
+ out.write_byte(pane_info._encoded_byte | (encoding_type << 4))
+ if encoding_type == PaneInfoEncoding_FIRST:
return
elif encoding_type == PaneInfoEncoding.ONE_INDEX:
out.write_var_int64(value.index)
@@ -772,7 +794,7 @@ def decode_from_stream(self, in_stream, nested):
base = windowed_value._BYTE_TO_PANE_INFO[encoded_first_byte & 0xF]
assert base is not None
encoding_type = encoded_first_byte >> 4
- if encoding_type == PaneInfoEncoding.FIRST:
+ if encoding_type == PaneInfoEncoding_FIRST:
return base
elif encoding_type == PaneInfoEncoding.ONE_INDEX:
index = in_stream.read_var_int64()
@@ -811,11 +833,11 @@ class WindowedValueCoderImpl(StreamCoderImpl):
# byte representation of timestamps.
def _to_normal_time(self, value):
"""Convert "lexicographically ordered unsigned" to signed."""
- return value - (1 << 63)
+ return value - _TIME_SHIFT
def _from_normal_time(self, value):
"""Convert signed to "lexicographically ordered unsigned"."""
- return value + (1 << 63)
+ return value + _TIME_SHIFT
def __init__(self, value_coder, timestamp_coder, window_coder):
# TODO(lcwik): Remove the timestamp coder field
@@ -849,16 +871,12 @@ def decode_from_stream(self, in_stream, nested):
# were indeed MIN/MAX timestamps.
# TODO(BEAM-1524): Clean this up once we have a BEAM wide consensus on
# precision of timestamps.
- if timestamp == -(abs(MIN_TIMESTAMP.micros) // 1000):
- timestamp = MIN_TIMESTAMP.micros
- elif timestamp == (MAX_TIMESTAMP.micros // 1000):
- timestamp = MAX_TIMESTAMP.micros
+ if timestamp <= -(abs(MIN_TIMESTAMP_micros) // 1000):
+ timestamp = MIN_TIMESTAMP_micros
+ elif timestamp >= MAX_TIMESTAMP_micros // 1000:
+ timestamp = MAX_TIMESTAMP_micros
else:
timestamp *= 1000
- if timestamp > MAX_TIMESTAMP.micros:
- timestamp = MAX_TIMESTAMP.micros
- if timestamp < MIN_TIMESTAMP.micros:
- timestamp = MIN_TIMESTAMP.micros
windows = self._windows_coder.decode_from_stream(in_stream, True)
# Read PaneInfo encoded byte.
diff --git a/sdks/python/apache_beam/coders/coders.py
b/sdks/python/apache_beam/coders/coders.py
index a17bb0808a0a..f5c90a8ec99e 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -995,7 +995,7 @@ def __init__(self, value_coder):
self._value_coder = value_coder
def _create_impl(self):
- return coder_impl.LengthPrefixCoderImpl(self._value_coder)
+ return coder_impl.LengthPrefixCoderImpl(self._value_coder.get_impl())
def is_deterministic(self):
return self._value_coder.is_deterministic()
diff --git a/sdks/python/apache_beam/tools/coders_microbenchmark.py
b/sdks/python/apache_beam/tools/coders_microbenchmark.py
index 9453d61067c9..695bdd1dd620 100644
--- a/sdks/python/apache_beam/tools/coders_microbenchmark.py
+++ b/sdks/python/apache_beam/tools/coders_microbenchmark.py
@@ -31,7 +31,9 @@
from __future__ import absolute_import
from __future__ import print_function
+import argparse
import random
+import re
import string
import sys
@@ -102,7 +104,8 @@ def small_list():
def large_list():
- return list_int(1000)
+ # Bool is the last item in FastPrimitiveCoders before pickle.
+ return [bool(k) for k in list_int(1000)]
def small_tuple():
@@ -122,6 +125,19 @@ def large_dict():
return {i: i for i in large_list()}
+def large_iterable():
+ yield 'a' * coders.coder_impl.SequenceCoderImpl._DEFAULT_BUFFER_SIZE
+ for k in range(1000):
+ yield k
+
+
+def globally_windowed_value():
+ return windowed_value.WindowedValue(
+ value=small_int(),
+ timestamp=12345678,
+ windows=(window.GlobalWindow(),))
+
+
def random_windowed_value(num_windows):
return windowed_value.WindowedValue(
value=small_int(),
@@ -140,7 +156,8 @@ def wv_with_multiple_windows():
return random_windowed_value(num_windows=32)
-def run_coder_benchmarks(num_runs, input_size, seed, verbose):
+def run_coder_benchmarks(
+ num_runs, input_size, seed, verbose, filter_regex='.*'):
random.seed(seed)
# TODO(BEAM-4441): Pick coders using type hints, for example:
@@ -166,6 +183,9 @@ def run_coder_benchmarks(num_runs, input_size, seed,
verbose):
coder_benchmark_factory(
coders.IterableCoder(coders.FastPrimitivesCoder()),
large_list),
+ coder_benchmark_factory(
+ coders.IterableCoder(coders.FastPrimitivesCoder()),
+ large_iterable),
coder_benchmark_factory(
coders.FastPrimitivesCoder(),
small_tuple),
@@ -182,20 +202,38 @@ def run_coder_benchmarks(num_runs, input_size, seed,
verbose):
coders.WindowedValueCoder(coders.FastPrimitivesCoder()),
wv_with_one_window),
coder_benchmark_factory(
- coders.WindowedValueCoder(coders.FastPrimitivesCoder()),
+ coders.WindowedValueCoder(coders.FastPrimitivesCoder(),
+ coders.IntervalWindowCoder()),
wv_with_multiple_windows),
+ coder_benchmark_factory(
+ coders.WindowedValueCoder(coders.FastPrimitivesCoder(),
+ coders.GlobalWindowCoder()),
+ globally_windowed_value),
+ coder_benchmark_factory(
+ coders.LengthPrefixCoder(coders.FastPrimitivesCoder()),
+ small_int)
]
- suite = [utils.BenchmarkConfig(b, input_size, num_runs) for b in benchmarks]
+ suite = [utils.BenchmarkConfig(b, input_size, num_runs) for b in benchmarks
+ if re.search(filter_regex, b.__name__, flags=re.I)]
utils.run_benchmarks(suite, verbose=verbose)
if __name__ == "__main__":
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument('--filter', default='.*')
+ parser.add_argument('--num_runs', default=20, type=int)
+ parser.add_argument('--num_elements_per_benchmark', default=1000, type=int)
+ parser.add_argument('--seed', default=42, type=int)
+ options = parser.parse_args()
+
utils.check_compiled("apache_beam.coders.coder_impl")
num_runs = 20
num_elements_per_benchmark = 1000
seed = 42 # Fix the seed for better consistency
- run_coder_benchmarks(num_runs, num_elements_per_benchmark, seed,
- verbose=True)
+ run_coder_benchmarks(
+ options.num_runs, options.num_elements_per_benchmark, options.seed,
+ verbose=True, filter_regex=options.filter)
diff --git a/sdks/python/apache_beam/transforms/window.py
b/sdks/python/apache_beam/transforms/window.py
index 0970a28d685a..1990532e2671 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -66,6 +66,7 @@
from apache_beam.transforms import timeutil
from apache_beam.utils import proto_utils
from apache_beam.utils import urns
+from apache_beam.utils import windowed_value
from apache_beam.utils.timestamp import MIN_TIMESTAMP
from apache_beam.utils.timestamp import Duration
from apache_beam.utils.timestamp import Timestamp
@@ -225,31 +226,18 @@ def __repr__(self):
return '[?, %s)' % float(self.end)
-class IntervalWindow(BoundedWindow):
+@total_ordering
+class IntervalWindow(windowed_value._IntervalWindowBase, BoundedWindow):
"""A window for timestamps in range [start, end).
Attributes:
start: Start of window as seconds since Unix epoch.
end: End of window as seconds since Unix epoch.
"""
-
- def __init__(self, start, end):
- super(IntervalWindow, self).__init__(end)
- self.start = Timestamp.of(start)
-
- def __hash__(self):
- return hash((self.start, self.end))
-
- def __eq__(self, other):
- return (self.start == other.start
- and self.end == other.end
- and type(self) == type(other))
-
- def __ne__(self, other):
- return not self == other
-
- def __repr__(self):
- return '[%s, %s)' % (float(self.start), float(self.end))
+ def __lt__(self, other):
+ if self.end != other.end:
+ return self.end < other.end
+ return hash(self) < hash(other)
def intersects(self, other):
return other.start < self.end or self.start < other.end
diff --git a/sdks/python/apache_beam/utils/windowed_value.pxd
b/sdks/python/apache_beam/utils/windowed_value.pxd
index 8755c939d364..5d867c83c384 100644
--- a/sdks/python/apache_beam/utils/windowed_value.pxd
+++ b/sdks/python/apache_beam/utils/windowed_value.pxd
@@ -21,6 +21,18 @@ from libc.stdint cimport int64_t
cdef type Timestamp
+
+cdef list _BYTE_TO_PANE_INFO
+
[email protected]
+cdef class PaneInfo(object):
+ cdef readonly bint _is_first
+ cdef readonly bint _is_last
+ cdef readonly int _timing
+ cdef readonly int _index
+ cdef readonly int _nonspeculative_index
+ cdef readonly unsigned char _encoded_byte
+
@cython.final
cdef class WindowedValue(object):
cdef public object value
@@ -34,3 +46,10 @@ cdef class WindowedValue(object):
@cython.locals(wv=WindowedValue)
cpdef WindowedValue create(
object value, int64_t timestamp_micros, object windows, object pane_info=*)
+
+
+cdef class _IntervalWindowBase(object):
+ cdef object _start_object
+ cdef readonly int64_t _start_micros
+ cdef object _end_object
+ cdef readonly int64_t _end_micros
diff --git a/sdks/python/apache_beam/utils/windowed_value.py
b/sdks/python/apache_beam/utils/windowed_value.py
index 64984977da60..8239abf409cb 100644
--- a/sdks/python/apache_beam/utils/windowed_value.py
+++ b/sdks/python/apache_beam/utils/windowed_value.py
@@ -58,11 +58,11 @@ def __init__(self, is_first, is_last, timing, index,
nonspeculative_index):
def _get_encoded_byte(self):
byte = 0
- if self.is_first:
+ if self._is_first:
byte |= 1
- if self.is_last:
+ if self._is_last:
byte |= 2
- byte |= self.timing << 2
+ byte |= self._timing << 2
return byte
@staticmethod
@@ -121,6 +121,10 @@ def __hash__(self):
return hash((self.is_first, self.is_last, self.timing, self.index,
self.nonspeculative_index))
+ def __reduce__(self):
+ return PaneInfo, (self._is_first, self._is_last, self._timing, self._index,
+ self._nonspeculative_index)
+
def _construct_well_known_pane_infos():
pane_infos = []
@@ -233,3 +237,54 @@ def create(value, timestamp_micros, windows,
pane_info=PANE_INFO_UNKNOWN):
# the cdef class, but in this case it's OK as it's already present
# on each instance.
pass
+
+
+class _IntervalWindowBase(object):
+ """Optimized form of IntervalWindow storing only microseconds for endpoints.
+ """
+
+ def __init__(self, start, end):
+ if start is not None or end is not None:
+ self._start_object = Timestamp.of(start)
+ self._end_object = Timestamp.of(end)
+ try:
+ self._start_micros = self._start_object.micros
+ except OverflowError:
+ self._start_micros = (
+ MIN_TIMESTAMP.micros if self._start_object.micros < 0
+ else MAX_TIMESTAMP.micros)
+ try:
+ self._end_micros = self._end_object.micros
+ except OverflowError:
+ self._end_micros = (
+ MIN_TIMESTAMP.micros if self._end_object.micros < 0
+ else MAX_TIMESTAMP.micros)
+ else:
+ # Micros must be populated elsewhere.
+ self._start_object = self._end_object = None
+
+ @property
+ def start(self):
+ if self._start_object is None:
+ self._start_object = Timestamp(0, self._start_micros)
+ return self._start_object
+
+ @property
+ def end(self):
+ if self._end_object is None:
+ self._end_object = Timestamp(0, self._end_micros)
+ return self._end_object
+
+ def __hash__(self):
+ return hash((self._start_micros, self._end_micros))
+
+ def __eq__(self, other):
+ return (type(self) == type(other)
+ and self._start_micros == other._start_micros
+ and self._end_micros == other._end_micros)
+
+ def __ne__(self, other):
+ return not self == other
+
+ def __repr__(self):
+ return '[%s, %s)' % (float(self.start), float(self.end))
diff --git a/sdks/python/scripts/generate_pydoc.sh
b/sdks/python/scripts/generate_pydoc.sh
index e447d5e54fad..4929c5eb01fc 100755
--- a/sdks/python/scripts/generate_pydoc.sh
+++ b/sdks/python/scripts/generate_pydoc.sh
@@ -160,6 +160,7 @@ ignore_identifiers = [
'apache_beam.typehints.typehints.CompositeTypeHint',
'apache_beam.typehints.typehints.TypeConstraint',
'apache_beam.typehints.typehints.validate_composite_type_param()',
+ 'apache_beam.utils.windowed_value._IntervalWindowBase',
# Private classes which are used within the same module
'WindowedTypeConstraint', # apache_beam.typehints.typehints
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 172227)
Time Spent: 1h 20m (was: 1h 10m)
> PR 7130 causes transforms/util_test to fail
> -------------------------------------------
>
> Key: BEAM-6153
> URL: https://issues.apache.org/jira/browse/BEAM-6153
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Reporter: Charles Chen
> Assignee: Robert Bradshaw
> Priority: Major
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> http://github.com/apache/beam/pull/7130 seems to cause transforms/util_test
> to fail.
> ======================================================================
> ERROR: test_reshuffle_sliding_window (__main__.ReshuffleTest)
> ----------------------------------------------------------------------
> BeamAssertException: Failed assert: [(1, [1, 2, 4]), (1, [1, 2, 4]), (2, [1,
> 2]), (2, [1, 2]), (3, [1]), (3, [1])] == [(1, [1]), (1, [1]), (1, [2]), (1,
> [2]), (1, [4]), (1, [4]), (2, [1]), (2, [1]), (2, [2]), (2, [2]), (3, [1]),
> (3, [1])] [while running 'before_reshuffle/Match']
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)