claudevdm commented on code in PR #36575: URL: https://github.com/apache/beam/pull/36575#discussion_r2452073845
########## sdks/python/apache_beam/examples/cookbook/ordered_batch_elements.py: ########## @@ -0,0 +1,436 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +import apache_beam as beam +from apache_beam.coders import BooleanCoder +from apache_beam.coders import PickleCoder +from apache_beam.coders import TimestampCoder +from apache_beam.coders import VarIntCoder +from apache_beam.transforms.ptransform import PTransform +from apache_beam.transforms.timeutil import TimeDomain +from apache_beam.transforms.userstate import OrderedListStateSpec +from apache_beam.transforms.userstate import ReadModifyWriteStateSpec +from apache_beam.transforms.userstate import TimerSpec +from apache_beam.transforms.userstate import on_timer +from apache_beam.transforms.window import FixedWindows +from apache_beam.transforms.window import GlobalWindows +from apache_beam.transforms.window import TimestampedValue +from apache_beam.typehints.typehints import TupleConstraint +from apache_beam.utils.timestamp import MIN_TIMESTAMP +from apache_beam.utils.timestamp import DurationTypes # pylint: disable=unused-import +from apache_beam.utils.timestamp import Timestamp +from apache_beam.utils.timestamp import TimestampTypes # pylint: disable=unused-import + +_LOGGER = logging.getLogger("ordered_batch_elements") +"""An example of using states and timers to batch elements in time order. + +The PTransform is a turn-key transform that can handle different input window +settings and element types. + +Not only does it buffer elements, it will also prepend a window with +the last seen element if the window is empty or there is a gap between +the beginning of the window and the timestamp of its first element. +This facilitates the subsequent forward-filling operation. +""" + + +class OrderedBatchElementsDoFn(beam.DoFn): + """A DoFn that batches elements into ordered, fixed-size windows. + + This DoFn uses Beam's stateful processing capabilities to buffer elements + and emit them in order within fixed windows. It handles out-of-order data, + late data, and idle periods by leveraging `OrderedListState`, `TimerSpec`, + and `ReadModifyWriteStateSpec`. + + Attributes: + ORDERED_BUFFER_STATE: An `OrderedListStateSpec` for storing incoming + elements (timestamp, value) in a time-ordered buffer. + WINDOW_TIMER: A `TimerSpec` set to the watermark time domain, used to + trigger the emission of batched elements at window boundaries. + TIMER_STATE: A `ReadModifyWriteStateSpec` (BooleanCoder) to track whether + the window timer has been initialized and set for the current key. + IDLE_COUNT: A `ReadModifyWriteStateSpec` (VarIntCoder) to count + consecutive empty windows, used to detect and potentially abort + processing for idle keys. + LAST_VALUE: A `ReadModifyWriteStateSpec` (PickleCoder) to store the last + emitted value for a key, used to fill gaps in windows or provide a + default for empty windows. + BUFFER_MIN_TS_STATE: A `ReadModifyWriteStateSpec` (TimestampCoder) to + keep track of the minimum timestamp currently present in the + `ordered_buffer` for efficient clearing. + ESTIMATED_WM_STATE: A `ReadModifyWriteStateSpec` (TimestampCoder) to + store the highest observed timestamp for a key, used as an estimated + watermark to detect and filter excessively late data. + """ + ORDERED_BUFFER_STATE = OrderedListStateSpec('ordered_buffer', PickleCoder()) + WINDOW_TIMER = TimerSpec('window_timer', TimeDomain.WATERMARK) + TIMER_STATE = ReadModifyWriteStateSpec('timer_state', BooleanCoder()) + IDLE_COUNT = ReadModifyWriteStateSpec('idle_count', VarIntCoder()) + LAST_VALUE = ReadModifyWriteStateSpec('last_value', PickleCoder()) + BUFFER_MIN_TS_STATE = ReadModifyWriteStateSpec( + 'buffer_min_ts', TimestampCoder()) + ESTIMATED_WM_STATE = ReadModifyWriteStateSpec( + 'estimated_wm', TimestampCoder()) + + def __init__( + self, + size, Review Comment: I see. Maybe size is the correct term, but it makes me first think this is the number of elements that will be present in the window. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
