This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d9b4363ae6 Consider windows with bands outside of [Min, Max] equivalent (#35460)
4d9b4363ae6 is described below

commit 4d9b4363ae68056a24afffff5e36cedfe1a3b4f1
Author: Danny McCormick <[email protected]>
AuthorDate: Tue Jul 1 10:32:31 2025 -0400

    Consider windows with bands outside of [Min, Max] equivalent (#35460)
---
 sdks/python/apache_beam/utils/windowed_value.py | 23 ++++++++++++++++++-----
 1 file changed, 18 insertions(+), 5 deletions(-)

diff --git a/sdks/python/apache_beam/utils/windowed_value.py b/sdks/python/apache_beam/utils/windowed_value.py
index f6232ce2f6b..2775ec4061b 100644
--- a/sdks/python/apache_beam/utils/windowed_value.py
+++ b/sdks/python/apache_beam/utils/windowed_value.py
@@ -432,13 +432,26 @@ class _IntervalWindowBase(object):
     return self._end_object
 
   def __hash__(self):
-    return hash((self._start_micros, self._end_micros))
+    # Cut off window at start/end timestamps for comparison purposes since some
+    # portable runners do this already, and timestamps outside of the bands of
+    # Min/Max timestamps are functionally equal to Min/Max.
+    start = max(self._start_micros, MIN_TIMESTAMP.micros)
+    end = min(self._end_micros, MAX_TIMESTAMP.micros)
+    return hash((start, end))
 
   def __eq__(self, other):
-    return (
-        type(self) == type(other) and
-        self._start_micros == other._start_micros and
-        self._end_micros == other._end_micros)
+    if type(self) != type(other):
+      return False
+
+    # Cut off window at start/end timestamps for comparison purposes since some
+    # portable runners do this already, and timestamps outside of the bands of
+    # Min/Max timestamps are functionally equal to Min/Max.
+    self_start = max(self._start_micros, MIN_TIMESTAMP.micros)
+    self_end = min(self._end_micros, MAX_TIMESTAMP.micros)
+    other_start = max(other._start_micros, MIN_TIMESTAMP.micros)
+    other_end = min(other._end_micros, MAX_TIMESTAMP.micros)
+
+    return (self_start == other_start and self_end == other_end)
 
   def __repr__(self):
     return '[%s, %s)' % (float(self.start), float(self.end))

Reply via email to