shunping commented on code in PR #35379:
URL: https://github.com/apache/beam/pull/35379#discussion_r2159080067


##########
sdks/python/apache_beam/ml/ts/ordered_sliding_window.py:
##########
@@ -0,0 +1,154 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import apache_beam as beam
+from apache_beam.coders import PickleCoder, BooleanCoder
+from apache_beam.transforms.userstate import OrderedListStateSpec, TimerSpec, 
on_timer, ReadModifyWriteStateSpec
+from apache_beam.utils.timestamp import Timestamp, MIN_TIMESTAMP, MAX_TIMESTAMP
+from apache_beam.transforms.timeutil import TimeDomain
+import typing
+from collections import defaultdict
+import numpy as np
+import logging
+logging.basicConfig(level=logging.INFO)
+
+class OrderedSlidingWindowFn(beam.DoFn):
+
+  ORDERED_BUFFER_STATE = OrderedListStateSpec('ordered_buffer', PickleCoder())
+  WINDOW_TIMER = TimerSpec('window_timer', TimeDomain.WATERMARK)
+  TIMER_STATE = ReadModifyWriteStateSpec('timer_state', BooleanCoder())
+  EARLIEST_TS_STATE = ReadModifyWriteStateSpec('earliest_ts', PickleCoder())
+
+
+  def __init__(self, window_size, slide_interval):
+    self.window_size = window_size
+    self.slide_interval = slide_interval
+
+  def start_bundle(self):
+    logging.info("start bundle")

Review Comment:
   How about using the debug log level for all your messages here, to 
avoid flooding the logs in a common setting?



##########
sdks/python/apache_beam/ml/ts/ordered_sliding_window.py:
##########
@@ -0,0 +1,154 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import apache_beam as beam
+from apache_beam.coders import PickleCoder, BooleanCoder
+from apache_beam.transforms.userstate import OrderedListStateSpec, TimerSpec, 
on_timer, ReadModifyWriteStateSpec
+from apache_beam.utils.timestamp import Timestamp, MIN_TIMESTAMP, MAX_TIMESTAMP
+from apache_beam.transforms.timeutil import TimeDomain
+import typing
+from collections import defaultdict
+import numpy as np
+import logging
+logging.basicConfig(level=logging.INFO)
+
+class OrderedSlidingWindowFn(beam.DoFn):
+
+  ORDERED_BUFFER_STATE = OrderedListStateSpec('ordered_buffer', PickleCoder())
+  WINDOW_TIMER = TimerSpec('window_timer', TimeDomain.WATERMARK)
+  TIMER_STATE = ReadModifyWriteStateSpec('timer_state', BooleanCoder())
+  EARLIEST_TS_STATE = ReadModifyWriteStateSpec('earliest_ts', PickleCoder())
+
+
+  def __init__(self, window_size, slide_interval):
+    self.window_size = window_size
+    self.slide_interval = slide_interval
+
+  def start_bundle(self):
+    logging.info("start bundle")
+
+  def finish_bundle(self):
+    logging.info("finish bundle")
+
+  def process(self,
+              element,
+              timestamp=beam.DoFn.TimestampParam,
+              ordered_buffer=beam.DoFn.StateParam(ORDERED_BUFFER_STATE),
+              window_timer=beam.DoFn.TimerParam(WINDOW_TIMER),
+              timer_state=beam.DoFn.StateParam(TIMER_STATE),
+              earliest_ts_state=beam.DoFn.StateParam(EARLIEST_TS_STATE)):
+
+    key, value = element
+    ordered_buffer.add((timestamp, value))
+
+    logging.info(f"receive {element} at {timestamp}")
+    timer_started = timer_state.read()
+    if not timer_started:
+      earliest_ts_state.write(timestamp)

Review Comment:
   I think we should set `earliest_ts_state` if this is the first slide OR if the 
current input timestamp is smaller than the existing value in 
`earliest_ts_state`. Otherwise, late data arriving here would not update 
`earliest_ts_state`.
   
   I think we can do the following before `if not timer_started`:
   ```python
       earliest = earliest_ts_state.read()
       if not earliest or earliest > timestamp:
         earliest_ts_state.write(timestamp)
   ```
   
   WDYT?



##########
sdks/python/apache_beam/ml/ts/ordered_sliding_window.py:
##########
@@ -0,0 +1,154 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import apache_beam as beam
+from apache_beam.coders import PickleCoder, BooleanCoder
+from apache_beam.transforms.userstate import OrderedListStateSpec, TimerSpec, 
on_timer, ReadModifyWriteStateSpec
+from apache_beam.utils.timestamp import Timestamp, MIN_TIMESTAMP, MAX_TIMESTAMP
+from apache_beam.transforms.timeutil import TimeDomain
+import typing
+from collections import defaultdict
+import numpy as np
+import logging
+logging.basicConfig(level=logging.INFO)
+
+class OrderedSlidingWindowFn(beam.DoFn):
+
+  ORDERED_BUFFER_STATE = OrderedListStateSpec('ordered_buffer', PickleCoder())
+  WINDOW_TIMER = TimerSpec('window_timer', TimeDomain.WATERMARK)
+  TIMER_STATE = ReadModifyWriteStateSpec('timer_state', BooleanCoder())
+  EARLIEST_TS_STATE = ReadModifyWriteStateSpec('earliest_ts', PickleCoder())
+
+
+  def __init__(self, window_size, slide_interval):
+    self.window_size = window_size
+    self.slide_interval = slide_interval
+
+  def start_bundle(self):
+    logging.info("start bundle")
+
+  def finish_bundle(self):
+    logging.info("finish bundle")
+
+  def process(self,
+              element,
+              timestamp=beam.DoFn.TimestampParam,
+              ordered_buffer=beam.DoFn.StateParam(ORDERED_BUFFER_STATE),
+              window_timer=beam.DoFn.TimerParam(WINDOW_TIMER),
+              timer_state=beam.DoFn.StateParam(TIMER_STATE),
+              earliest_ts_state=beam.DoFn.StateParam(EARLIEST_TS_STATE)):
+
+    key, value = element
+    ordered_buffer.add((timestamp, value))
+
+    logging.info(f"receive {element} at {timestamp}")
+    timer_started = timer_state.read()
+    if not timer_started:
+      earliest_ts_state.write(timestamp)
+
+      first_slide_start = int(
+          timestamp.micros / 1e6 // self.slide_interval) * self.slide_interval
+      first_slide_start_ts = Timestamp.of(first_slide_start)
+
+      first_window_end_ts = first_slide_start_ts + self.window_size
+      logging.info(f"set timer to {first_window_end_ts}")
+      window_timer.set(first_window_end_ts)
+
+      timer_state.write(True)
+
+  @on_timer(WINDOW_TIMER)
+  def on_timer(self,
+               key=beam.DoFn.KeyParam,
+               fire_ts=beam.DoFn.TimestampParam,
+               ordered_buffer=beam.DoFn.StateParam(ORDERED_BUFFER_STATE),
+               window_timer=beam.DoFn.TimerParam(WINDOW_TIMER),
+               timer_state=beam.DoFn.StateParam(TIMER_STATE),
+               earliest_ts_state=beam.DoFn.StateParam(EARLIEST_TS_STATE)):
+    logging.info(f"timer fire at {fire_ts}")
+    window_end_ts = fire_ts
+    window_start_ts = window_end_ts - self.window_size
+
+    window_values = list(ordered_buffer.read_range(window_start_ts, 
window_end_ts))
+    logging.info(f"Window [{window_start_ts}, {window_end_ts}) contains 
{len(window_values)} elements.")
+
+
+    logging.info(f"window start: {window_start_ts}, window end: 
{window_end_ts}")
+    logging.info(f"windowed data in buffer {str(window_values)}")
+    if window_values:
+      yield (key, (window_start_ts, window_end_ts, window_values))
+
+    next_window_end_ts = fire_ts + self.slide_interval
+    next_window_start_ts = window_start_ts + self.slide_interval
+
+    earliest_ts = earliest_ts_state.read()
+    ordered_buffer.clear_range(earliest_ts, next_window_start_ts)
+
+    remaining_data = list(ordered_buffer.read_range(next_window_start_ts, 
MAX_TIMESTAMP))
+
+    if not remaining_data:
+      timer_state.clear()
+      earliest_ts_state.clear()

Review Comment:
   I think instead of clearing this state, you probably want to set it to a 
reasonable value like `next_window_start_ts`. 
   
   The main purpose of `earliest_ts` is to keep track of any late values and 
prevent them from staying in the `ordered_buffer` forever.



##########
sdks/python/apache_beam/ml/ts/ordered_sliding_window.py:
##########
@@ -0,0 +1,154 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import apache_beam as beam
+from apache_beam.coders import PickleCoder, BooleanCoder
+from apache_beam.transforms.userstate import OrderedListStateSpec, TimerSpec, 
on_timer, ReadModifyWriteStateSpec
+from apache_beam.utils.timestamp import Timestamp, MIN_TIMESTAMP, MAX_TIMESTAMP
+from apache_beam.transforms.timeutil import TimeDomain
+import typing
+from collections import defaultdict
+import numpy as np
+import logging
+logging.basicConfig(level=logging.INFO)

Review Comment:
   Instead of configuring the root logger globally, we can create our own 
module-level logger and use it throughout this module.
   
   ```
   _LOGGER = logging.getLogger(__name__)
   ```
   
   Then when you want to log an info message, use `_LOGGER.info(...)`.



##########
sdks/python/apache_beam/ml/ts/ordered_sliding_window.py:
##########
@@ -0,0 +1,154 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import apache_beam as beam
+from apache_beam.coders import PickleCoder, BooleanCoder
+from apache_beam.transforms.userstate import OrderedListStateSpec, TimerSpec, 
on_timer, ReadModifyWriteStateSpec
+from apache_beam.utils.timestamp import Timestamp, MIN_TIMESTAMP, MAX_TIMESTAMP
+from apache_beam.transforms.timeutil import TimeDomain
+import typing
+from collections import defaultdict
+import numpy as np
+import logging
+logging.basicConfig(level=logging.INFO)
+
+class OrderedSlidingWindowFn(beam.DoFn):
+
+  ORDERED_BUFFER_STATE = OrderedListStateSpec('ordered_buffer', PickleCoder())
+  WINDOW_TIMER = TimerSpec('window_timer', TimeDomain.WATERMARK)
+  TIMER_STATE = ReadModifyWriteStateSpec('timer_state', BooleanCoder())
+  EARLIEST_TS_STATE = ReadModifyWriteStateSpec('earliest_ts', PickleCoder())

Review Comment:
   I think we can use `TimestampCoder` instead of `PickleCoder` for 
`EARLIEST_TS_STATE`.



##########
sdks/python/apache_beam/ml/ts/ordered_sliding_window.py:
##########
@@ -0,0 +1,154 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import apache_beam as beam
+from apache_beam.coders import PickleCoder, BooleanCoder
+from apache_beam.transforms.userstate import OrderedListStateSpec, TimerSpec, 
on_timer, ReadModifyWriteStateSpec
+from apache_beam.utils.timestamp import Timestamp, MIN_TIMESTAMP, MAX_TIMESTAMP
+from apache_beam.transforms.timeutil import TimeDomain
+import typing
+from collections import defaultdict
+import numpy as np
+import logging
+logging.basicConfig(level=logging.INFO)
+
+class OrderedSlidingWindowFn(beam.DoFn):
+
+  ORDERED_BUFFER_STATE = OrderedListStateSpec('ordered_buffer', PickleCoder())
+  WINDOW_TIMER = TimerSpec('window_timer', TimeDomain.WATERMARK)
+  TIMER_STATE = ReadModifyWriteStateSpec('timer_state', BooleanCoder())
+  EARLIEST_TS_STATE = ReadModifyWriteStateSpec('earliest_ts', PickleCoder())
+
+
+  def __init__(self, window_size, slide_interval):
+    self.window_size = window_size

Review Comment:
   Are `window_size` and `slide_interval` here floating point numbers or 
integers?



##########
sdks/python/apache_beam/ml/ts/ordered_sliding_window_test.py:
##########
@@ -0,0 +1,138 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import apache_beam as beam
+from ordered_sliding_window import OrderedSlidingWindowFn, FillGapsFn
+import unittest
+import apache_beam as beam
+from apache_beam.testing.util import assert_that, equal_to
+import math
+from apache_beam.utils.timestamp import Timestamp, MIN_TIMESTAMP
+from util import PeriodicStream
+import random
+import logging
+from apache_beam.options.pipeline_options import PipelineOptions
+import numpy as np
+
+logging.basicConfig(level=logging.INFO)
+
+def format_for_comparison(element):
+    """Converts np.nan in the data list to the string 'NaN' for stable 
comparison."""
+    key, (start_ts, end_ts, data_list) = element
+    formatted_list = ['NaN' if isinstance(x, float) and np.isnan(x) else x for 
x in data_list]
+    return (key, (start_ts, end_ts, formatted_list))
+
+class DoFnTests(unittest.TestCase):
+
+    def test_pipeline_with_periodic_stream_data(self):

Review Comment:
   Can we also add a test for the case where late data is present?



##########
sdks/python/apache_beam/ml/ts/ordered_sliding_window.py:
##########
@@ -0,0 +1,154 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import apache_beam as beam
+from apache_beam.coders import PickleCoder, BooleanCoder
+from apache_beam.transforms.userstate import OrderedListStateSpec, TimerSpec, 
on_timer, ReadModifyWriteStateSpec
+from apache_beam.utils.timestamp import Timestamp, MIN_TIMESTAMP, MAX_TIMESTAMP
+from apache_beam.transforms.timeutil import TimeDomain
+import typing
+from collections import defaultdict
+import numpy as np
+import logging
+logging.basicConfig(level=logging.INFO)
+
+class OrderedSlidingWindowFn(beam.DoFn):
+
+  ORDERED_BUFFER_STATE = OrderedListStateSpec('ordered_buffer', PickleCoder())
+  WINDOW_TIMER = TimerSpec('window_timer', TimeDomain.WATERMARK)
+  TIMER_STATE = ReadModifyWriteStateSpec('timer_state', BooleanCoder())
+  EARLIEST_TS_STATE = ReadModifyWriteStateSpec('earliest_ts', PickleCoder())
+
+
+  def __init__(self, window_size, slide_interval):
+    self.window_size = window_size
+    self.slide_interval = slide_interval
+
+  def start_bundle(self):
+    logging.info("start bundle")
+
+  def finish_bundle(self):
+    logging.info("finish bundle")
+
+  def process(self,
+              element,
+              timestamp=beam.DoFn.TimestampParam,
+              ordered_buffer=beam.DoFn.StateParam(ORDERED_BUFFER_STATE),
+              window_timer=beam.DoFn.TimerParam(WINDOW_TIMER),
+              timer_state=beam.DoFn.StateParam(TIMER_STATE),
+              earliest_ts_state=beam.DoFn.StateParam(EARLIEST_TS_STATE)):
+
+    key, value = element
+    ordered_buffer.add((timestamp, value))
+
+    logging.info(f"receive {element} at {timestamp}")
+    timer_started = timer_state.read()
+    if not timer_started:
+      earliest_ts_state.write(timestamp)
+
+      first_slide_start = int(
+          timestamp.micros / 1e6 // self.slide_interval) * self.slide_interval
+      first_slide_start_ts = Timestamp.of(first_slide_start)
+
+      first_window_end_ts = first_slide_start_ts + self.window_size
+      logging.info(f"set timer to {first_window_end_ts}")
+      window_timer.set(first_window_end_ts)
+
+      timer_state.write(True)
+
+  @on_timer(WINDOW_TIMER)
+  def on_timer(self,
+               key=beam.DoFn.KeyParam,
+               fire_ts=beam.DoFn.TimestampParam,
+               ordered_buffer=beam.DoFn.StateParam(ORDERED_BUFFER_STATE),
+               window_timer=beam.DoFn.TimerParam(WINDOW_TIMER),
+               timer_state=beam.DoFn.StateParam(TIMER_STATE),
+               earliest_ts_state=beam.DoFn.StateParam(EARLIEST_TS_STATE)):
+    logging.info(f"timer fire at {fire_ts}")
+    window_end_ts = fire_ts
+    window_start_ts = window_end_ts - self.window_size
+
+    window_values = list(ordered_buffer.read_range(window_start_ts, 
window_end_ts))
+    logging.info(f"Window [{window_start_ts}, {window_end_ts}) contains 
{len(window_values)} elements.")
+
+
+    logging.info(f"window start: {window_start_ts}, window end: 
{window_end_ts}")
+    logging.info(f"windowed data in buffer {str(window_values)}")
+    if window_values:
+      yield (key, (window_start_ts, window_end_ts, window_values))
+
+    next_window_end_ts = fire_ts + self.slide_interval
+    next_window_start_ts = window_start_ts + self.slide_interval
+
+    earliest_ts = earliest_ts_state.read()
+    ordered_buffer.clear_range(earliest_ts, next_window_start_ts)
+
+    remaining_data = list(ordered_buffer.read_range(next_window_start_ts, 
MAX_TIMESTAMP))
+
+    if not remaining_data:
+      timer_state.clear()
+      earliest_ts_state.clear()
+      return
+
+    logging.info(f"set timer to {next_window_end_ts}")
+    window_timer.set(next_window_end_ts)
+
+
+class FillGapsFn(beam.DoFn):

Review Comment:
   Shall we just introduce OrderedSlidingWindow in this PR and leave FillGapsFn 
to a different one?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to