WeiZhong94 commented on a change in pull request #14758:
URL: https://github.com/apache/flink/pull/14758#discussion_r565170564
##########
File path: flink-python/pyflink/common/state.py
##########
@@ -94,6 +98,29 @@ def add(self, value: T) -> None:
"""
pass
+
+class MergingState(AppendingState[IN, OUT], ABC):
Review comment:
```suggestion
class MergingState(AppendingState[IN, OUT]):
```
##########
File path: flink-python/pyflink/common/state.py
##########
@@ -67,17 +71,17 @@ def update(self, value: T) -> None:
pass
-class ListState(State, Generic[T]):
+class AppendingState(State, Generic[IN, OUT]):
"""
- :class:`State` interface for partitioned list state in Operations.
- The state is accessed and modified by user functions, and checkpointed
consistently
- by the system as part of the distributed snapshots.
+ Base interface for partitioned state that supports adding elements and
inspecting the current
+ state. Elements can either be kept in a buffer (list-like) or aggregated
into one value.
- Currently only keyed list state is supported.
+ This state is accessed and modified by user functions, and checkpointed
consistently by the
+ system as part of the distributed snapshots.
- When it is a keyed list state, the state key is automatically supplied by
the system, so the
- user function always sees the value mapped to the key of the current
element. That way, the
- system can handle stream and state partitioning consistently together.
+ The state is only accessible by functions applied on a KeyedStream. The
key is automatically
+ supplied by the system, so the function always sees the value mapped to
the key of the current
+ element. That way, the system can handle stream and state partitioning
consistently together.
"""
@abstractmethod
Review comment:
def get(self) -> OUT:
##########
File path: flink-python/pyflink/fn_execution/operations.py
##########
@@ -500,6 +503,43 @@ def generate_func(self, serialized_fn) -> Tuple:
self.keyed_state_backend)
return func, [proc_func]
+ def open(self):
+ for user_defined_func in self.user_defined_funcs:
+ if hasattr(user_defined_func, 'open'):
+ runtime_context =
KeyedProcessFunctionOperation.InternalRuntimeContext(
+ self.spec.serialized_fn.runtime_context.task_name,
+
self.spec.serialized_fn.runtime_context.task_name_with_subtasks,
+
self.spec.serialized_fn.runtime_context.number_of_parallel_subtasks,
+
self.spec.serialized_fn.runtime_context.max_number_of_parallel_subtasks,
+
self.spec.serialized_fn.runtime_context.index_of_this_subtask,
+ self.spec.serialized_fn.runtime_context.attempt_number,
+ {p.key: p.value for p in
+ self.spec.serialized_fn.runtime_context.job_parameters},
+ self.keyed_state_backend)
+ user_defined_func.open(runtime_context)
+
+ class InternalRuntimeContext(RuntimeContext):
Review comment:
The InternalRuntimeContext should be moved out of KeyedProcessFunctionOperation
to module scope, since other operations will need to use it in the future.
##########
File path: flink-python/pyflink/fn_execution/operations.py
##########
@@ -500,6 +503,43 @@ def generate_func(self, serialized_fn) -> Tuple:
self.keyed_state_backend)
return func, [proc_func]
+ def open(self):
+ for user_defined_func in self.user_defined_funcs:
+ if hasattr(user_defined_func, 'open'):
+ runtime_context =
KeyedProcessFunctionOperation.InternalRuntimeContext(
+ self.spec.serialized_fn.runtime_context.task_name,
+
self.spec.serialized_fn.runtime_context.task_name_with_subtasks,
+
self.spec.serialized_fn.runtime_context.number_of_parallel_subtasks,
+
self.spec.serialized_fn.runtime_context.max_number_of_parallel_subtasks,
+
self.spec.serialized_fn.runtime_context.index_of_this_subtask,
+ self.spec.serialized_fn.runtime_context.attempt_number,
+ {p.key: p.value for p in
+ self.spec.serialized_fn.runtime_context.job_parameters},
+ self.keyed_state_backend)
+ user_defined_func.open(runtime_context)
+
+ class InternalRuntimeContext(RuntimeContext):
+
+ def __init__(self, task_name: str, task_name_with_subtasks: str,
+ number_of_parallel_subtasks: int,
max_number_of_parallel_subtasks: int,
+ index_of_this_subtask: int, attempt_number: int,
+ job_parameters: Dict[str, str], keyed_state_backend:
RemoteKeyedStateBackend):
+ super(KeyedProcessFunctionOperation.InternalRuntimeContext,
self).__init__(
+ task_name, task_name_with_subtasks,
number_of_parallel_subtasks,
+ max_number_of_parallel_subtasks, index_of_this_subtask,
attempt_number,
+ job_parameters)
+ self.keyed_state_backend = keyed_state_backend
Review comment:
The private member should have a "_" prefix (i.e. `_keyed_state_backend`).
Users can access instances of this class in their code, so we should avoid
exposing internal attributes that could cause unnecessary confusion.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org