dianfu commented on a change in pull request #14758:
URL: https://github.com/apache/flink/pull/14758#discussion_r566662632



##########
File path: flink-python/pyflink/fn_execution/operation_utils.py
##########
@@ -310,7 +310,7 @@ def wrapped_keyed_process_function(value):
             on_timer_ctx.set_timestamp(value[1])
             on_timer_ctx.timer_service().set_current_watermark(value[2])
             current_key = value[3]
-            on_timer_ctx.set_current_key(current_key)
+            on_timer_ctx.set_current_key(current_key[0])

Review comment:
       What's the purpose of this change?

##########
File path: flink-python/pyflink/common/state.py
##########
@@ -67,33 +71,56 @@ def update(self, value: T) -> None:
         pass
 
 
-class ListState(State, Generic[T]):
+class AppendingState(State, Generic[IN, OUT]):
     """
-    :class:`State` interface for partitioned list state in Operations.
-    The state is accessed and modified by user functions, and checkpointed 
consistently
-    by the system as part of the distributed snapshots.
+    Base interface for partitioned state taht supports adding elements and 
inspecting the current
+    state. Elements can either be kept in a buffer (list-like) or aggregated 
into one value.
 
-    Currently only keyed list state is supported.
+    This state is accessed and modified by user functions, and checkpointed 
consistently by the
+    system as part of the distributed snapshots.
 
-    When it is a keyed list state, the state key is automatically supplied by 
the system, so the
-    user function always sees the value mapped to the key of the current 
element. That way, the
-    system can handle stream and state partitioning consistently together.
+    The state is only accessible by functions applied on a KeyedStream. The 
key is automatically
+    supplied by the system, so the function always sees the value mapped to 
the key of the current
+    element. That way, the system can handle stream and state partitioning 
consistently together.
     """
 
     @abstractmethod
-    def get(self) -> Iterable[T]:
+    def get(self) -> OUT:
         """
         Returns the elements under the current key.
         """
         pass
 
     @abstractmethod
-    def add(self, value: T) -> None:
+    def add(self, value: IN) -> None:
         """
         Adding the given value to the tail of this list state.
         """
         pass
 
+
+class MergingState(AppendingState[IN, OUT]):
+    """
+    Extension of AppendingState that allows merging of state. That is, two 
instance of MergingState
+    can be combined into a single instance that contains all the information 
of the two merged
+    states.
+    """
+    pass
+
+
+class ListState(MergingState[T, Iterable[T]]):

Review comment:
       Should we export these kinds of classes to users? If so, what about 
declare them in the __all__ = []?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to