bgeng777 commented on code in PR #27170:
URL: https://github.com/apache/flink/pull/27170#discussion_r2476221681
##########
flink-python/pyflink/fn_execution/datastream/process/async_function/queue.py:
##########
@@ -265,9 +275,10 @@ def put(self, windowed_value, timestamp, watermark,
record) -> ResultFuture[OUT]
return entry
def advance_watermark(self, watermark):
- with self._lock:
- if watermark > self._current_watermark:
- self._current_watermark = watermark
+ if watermark > self._current_watermark:
+ self._current_watermark = watermark
+
+ with self._lock:
Review Comment:
`_current_watermark` is only updated when the main thread calls
`self._queue.advance_watermark(watermark)`, so I think it is fine to
check and update it here without holding `_lock`.
However, I am also curious why this was changed. Did we run into a
deadlock case?
##########
flink-python/pyflink/fn_execution/datastream/process/async_function/queue.py:
##########
@@ -343,3 +354,65 @@ def _add_segment(self, capacity) ->
'UnorderedStreamElementQueue.Segment':
new_segment = UnorderedStreamElementQueue.Segment(capacity)
self._segments.append(new_segment)
return new_segment
+
+
+class OrderedStreamElementQueue(StreamElementQueue):
+
+ def __init__(self, capacity: int, exception_checker):
+ self._capacity = capacity
+ self._exception_checker = exception_checker
+ self._queue = collections.deque()
+ self._lock = threading.RLock()
+ self._not_full = threading.Condition(self._lock)
+ self._not_empty = threading.Condition(self._lock)
+ self._number_of_pending_entries = 0
+
+ def put(self, windowed_value, timestamp, watermark, record) ->
ResultFuture[OUT]:
+ with self._not_full:
+ while self.size() >= self._capacity:
+ self._not_full.wait(1)
+ self._exception_checker()
+
+ entry = StreamRecordQueueEntry(windowed_value, timestamp,
watermark, record)
+ entry.on_complete(self.on_complete_handler)
+ self._queue.append(entry)
+ self._number_of_pending_entries += 1
+ return entry
+
+ def advance_watermark(self, watermark):
+ # do nothing in ordered mode
+ pass
+
+ def emit_completed_element(self, output_processor):
+ with self._not_full:
+ if not self.has_completed_elements():
+ return
+
+ self._queue.popleft().emit_result(output_processor)
+ self._number_of_pending_entries -= 1
+ self._not_full.notify_all()
+
+ def has_completed_elements(self) -> bool:
+ return len(self._queue) > 0 and self._queue[0].is_done()
+
+ def wait_for_completed_elements(self):
+ with self._not_empty:
+ while not self.has_completed_elements():
+ self._not_empty.wait()
+
+ def wait_for_in_flight_elements_processed(self, timeout=1):
+ with self._not_full:
+ if self._number_of_pending_entries != 0:
+ self._not_full.wait(timeout)
+
+ def is_empty(self) -> bool:
+ with self._lock:
+ return self._number_of_pending_entries == 0
+
+ def size(self) -> int:
+ return self._number_of_pending_entries
Review Comment:
Why is `with self._lock:` not needed here?
##########
flink-python/pyflink/fn_execution/datastream/process/async_function/queue.py:
##########
@@ -343,3 +354,65 @@ def _add_segment(self, capacity) ->
'UnorderedStreamElementQueue.Segment':
new_segment = UnorderedStreamElementQueue.Segment(capacity)
self._segments.append(new_segment)
return new_segment
+
+
+class OrderedStreamElementQueue(StreamElementQueue):
+
+ def __init__(self, capacity: int, exception_checker):
+ self._capacity = capacity
+ self._exception_checker = exception_checker
+ self._queue = collections.deque()
+ self._lock = threading.RLock()
+ self._not_full = threading.Condition(self._lock)
+ self._not_empty = threading.Condition(self._lock)
+ self._number_of_pending_entries = 0
+
+ def put(self, windowed_value, timestamp, watermark, record) ->
ResultFuture[OUT]:
+ with self._not_full:
+ while self.size() >= self._capacity:
+ self._not_full.wait(1)
+ self._exception_checker()
+
+ entry = StreamRecordQueueEntry(windowed_value, timestamp,
watermark, record)
+ entry.on_complete(self.on_complete_handler)
+ self._queue.append(entry)
+ self._number_of_pending_entries += 1
+ return entry
+
+ def advance_watermark(self, watermark):
+ # do nothing in ordered mode
+ pass
+
+ def emit_completed_element(self, output_processor):
+ with self._not_full:
+ if not self.has_completed_elements():
+ return
+
+ self._queue.popleft().emit_result(output_processor)
+ self._number_of_pending_entries -= 1
+ self._not_full.notify_all()
+
+ def has_completed_elements(self) -> bool:
+ return len(self._queue) > 0 and self._queue[0].is_done()
Review Comment:
Why is `with self._lock:` not needed here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]