WeiZhong94 commented on a change in pull request #14800:
URL: https://github.com/apache/flink/pull/14800#discussion_r590104166



##########
File path: flink-python/pyflink/fn_execution/state_impl.py
##########
@@ -80,59 +81,99 @@ def __iter__(self):
         return iter(self._cache.values())
 
 
-class SynchronousValueRuntimeState(ValueState):
+class InternalKVState(object):
+    def __init__(self, name: str, remote_state_backend: 'RemoteKeyedStateBackend'):
+        self.name = name
+        self._remote_state_backend = remote_state_backend
+        self._internal_state = None
+        self.namespace = None

Review comment:
       We can store the encoded namespace and encode it only the first time it is needed.
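
       A minimal sketch of the caching idea, not the actual PyFlink code. The class name `CachedNamespaceState`, the `encode_namespace` callable, and the `_encoded_namespace` attribute are hypothetical names used only for illustration:

```python
class CachedNamespaceState(object):
    """Hypothetical stand-in for InternalKVState, reduced to the namespace handling."""

    def __init__(self, name, encode_namespace):
        self.name = name
        # Assumption: a callable that turns a namespace object into bytes.
        self._encode_namespace = encode_namespace
        self.namespace = None
        self._encoded_namespace = None  # filled lazily on first use

    def set_current_namespace(self, namespace):
        # Invalidate the cached bytes only when the namespace really changes.
        if namespace != self.namespace:
            self.namespace = namespace
            self._encoded_namespace = None

    def get_encoded_namespace(self):
        if self.namespace is None:
            return None
        if self._encoded_namespace is None:
            self._encoded_namespace = self._encode_namespace(self.namespace)
        return self._encoded_namespace
```

       With this shape, repeated state accesses under the same namespace pay the encoding cost once.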

##########
File path: flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java
##########
@@ -543,12 +543,14 @@ private ExecutableStage createExecutableStage(RunnerApi.Environment environment)
     private static StateRequestHandler getStateRequestHandler(
             KeyedStateBackend keyedStateBackend,
             TypeSerializer keySerializer,
+            TypeSerializer windowSerializer,

Review comment:
       If we decide to rename `window_coder` to `namespace_coder` on the Python
side, we also need to rename `windowSerializer` here.

##########
File path: flink-python/pyflink/fn_execution/state_impl.py
##########
@@ -798,6 +857,11 @@ def __init__(self,
             state_handler, map_state_read_cache_size)
         from pyflink.fn_execution.coders import FlattenRowCoder
         self._key_coder_impl = FlattenRowCoder(key_coder._field_coders).get_impl()
+        self.window_coder = window_coder

Review comment:
       I think we do not need to store `window_coder`, because it is not used
anywhere except in the `__init__` method.

##########
File path: flink-python/pyflink/fn_execution/state_impl.py
##########
@@ -959,7 +1028,8 @@ def clear_cached_iterators(self):
 
     @staticmethod
     def commit_internal_state(internal_state):
-        internal_state.commit()
+        if internal_state is not None:
+            internal_state.commit()

Review comment:
       Otherwise return early; the code that follows does not need to be executed either.

##########
File path: flink-python/pyflink/fn_execution/state_impl.py
##########
@@ -798,6 +857,11 @@ def __init__(self,
             state_handler, map_state_read_cache_size)
         from pyflink.fn_execution.coders import FlattenRowCoder
         self._key_coder_impl = FlattenRowCoder(key_coder._field_coders).get_impl()
+        self.window_coder = window_coder
+        if window_coder:

Review comment:
       It would be better to rename it to `namespace_coder` and its
implementation to `_namespace_coder_impl` to make the meaning clearer.



