dianfu commented on a change in pull request #14758:
URL: https://github.com/apache/flink/pull/14758#discussion_r566662632
##########
File path: flink-python/pyflink/fn_execution/operation_utils.py
##########
@@ -310,7 +310,7 @@ def wrapped_keyed_process_function(value):
on_timer_ctx.set_timestamp(value[1])
on_timer_ctx.timer_service().set_current_watermark(value[2])
current_key = value[3]
- on_timer_ctx.set_current_key(current_key)
+ on_timer_ctx.set_current_key(current_key[0])
Review comment:
What's the purpose of this change?
##########
File path: flink-python/pyflink/common/state.py
##########
@@ -67,33 +71,56 @@ def update(self, value: T) -> None:
pass
-class ListState(State, Generic[T]):
+class AppendingState(State, Generic[IN, OUT]):
"""
- :class:`State` interface for partitioned list state in Operations.
- The state is accessed and modified by user functions, and checkpointed consistently
- by the system as part of the distributed snapshots.
+ Base interface for partitioned state that supports adding elements and inspecting the current
+ state. Elements can either be kept in a buffer (list-like) or aggregated into one value.
- Currently only keyed list state is supported.
+ This state is accessed and modified by user functions, and checkpointed consistently by the
+ system as part of the distributed snapshots.
- When it is a keyed list state, the state key is automatically supplied by the system, so the
- user function always sees the value mapped to the key of the current element. That way, the
- system can handle stream and state partitioning consistently together.
+ The state is only accessible by functions applied on a KeyedStream. The key is automatically
+ supplied by the system, so the function always sees the value mapped to the key of the current
+ element. That way, the system can handle stream and state partitioning consistently together.
"""
@abstractmethod
- def get(self) -> Iterable[T]:
+ def get(self) -> OUT:
"""
Returns the elements under the current key.
"""
pass
@abstractmethod
- def add(self, value: T) -> None:
+ def add(self, value: IN) -> None:
"""
Adding the given value to the tail of this list state.
"""
pass
+
+class MergingState(AppendingState[IN, OUT]):
+ """
+ Extension of AppendingState that allows merging of state. That is, two instances of MergingState
+ can be combined into a single instance that contains all the information of the two merged
+ states.
+ """
+ pass
+
+
+class ListState(MergingState[T, Iterable[T]]):
Review comment:
Should we export these classes (AppendingState, MergingState) to users? If so, how about
declaring them in the module's `__all__` list?