HuangXingBo commented on a change in pull request #15212:
URL: https://github.com/apache/flink/pull/15212#discussion_r599193955



##########
File path: flink-python/pyflink/fn_execution/operations.py
##########
@@ -406,6 +420,121 @@ def create_process_function(self, user_defined_aggs, 
input_extractors, filter_ar
             self.index_of_count_star)
 
 
+class 
StreamGroupWindowAggregateOperation(AbstractStreamGroupAggregateOperation):
+    def __init__(self, spec, keyed_state_backend):
+        self._window = spec.serialized_fn.group_window
+        self._named_property_extractor = self._create_named_property_function()
+        self._is_time_window = None
+        super(StreamGroupWindowAggregateOperation, self).__init__(spec, 
keyed_state_backend)
+
+    def create_process_function(self, user_defined_aggs, input_extractors, 
filter_args,
+                                distinct_indexes, distinct_view_descriptors, 
key_selector,
+                                state_value_coder):
+        self._is_time_window = self._window.is_time_window
+        if self._window.window_type == 
flink_fn_execution_pb2.GroupWindow.TUMBLING_GROUP_WINDOW:
+            if self._is_time_window:
+                window_assigner = TumblingWindowAssigner(
+                    self._window.window_size, 0, self._window.is_row_time)
+            else:
+                window_assigner = 
CountTumblingWindowAssigner(self._window.window_size)
+        elif self._window.window_type == 
flink_fn_execution_pb2.GroupWindow.SLIDING_GROUP_WINDOW:
+            raise Exception("General Python UDAF in Sliding window will be 
implemented in "
+                            "FLINK-21629")
+        else:
+            raise Exception("General Python UDAF in Sessiong window will be 
implemented in "
+                            "FLINK-21630")
+        if self._is_time_window:
+            if self._window.is_row_time:
+                trigger = EventTimeTrigger()
+            else:
+                trigger = ProcessingTimeTrigger()
+        else:
+            trigger = CountTrigger(self._window.window_size)
+
+        window_aggregator = SimpleNamespaceAggsHandleFunction(
+            user_defined_aggs,
+            input_extractors,
+            self.index_of_count_star,
+            self.count_star_inserted,
+            self._named_property_extractor,
+            self.data_view_specs,
+            filter_args,
+            distinct_indexes,
+            distinct_view_descriptors)
+        return GroupWindowAggFunction(
+            self._window.allowedLateness,
+            key_selector,
+            self.keyed_state_backend,
+            state_value_coder,
+            window_assigner,
+            window_aggregator,
+            trigger,
+            self._window.time_field_index)
+
+    def process_element_or_timer(self, input_datas: List[Tuple[int, Row, int, 
Row, int, int]]):
+        results = []
+        for input_data in input_datas:
+            if input_data[0] == NORMAL_RECORD:
+                self.group_agg_function.process_watermark(input_data[2])
+                result_datas = 
self.group_agg_function.process_element(input_data[1])
+                for result_data in result_datas:
+                    result = [NORMAL_RECORD, result_data, None]
+                    results.append(result)
+                timers = self.group_agg_function.get_timers()
+                for timer in timers:
+                    timer_operand_type = timer[0]  # type: TimerOperandType
+                    internal_timer = timer[1]  # type: InternalTimer
+                    window = internal_timer.get_namespace()
+                    key = internal_timer.get_key()
+                    timestamp = internal_timer.get_timestamp()
+                    if self._is_time_window:
+                        timer_data = \
+                            [TRIGGER_TIMER, None,
+                             [timer_operand_type.value, key, timestamp, 
window.start, window.end]]

Review comment:
       Yes. Makes sense.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to