This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d137c12877f [FLINK-28688][python] Support DataStream PythonWindowOperator in Thread Mode
d137c12877f is described below

commit d137c12877fa2b21688acd3a89bb947975b0553a
Author: huangxingbo <h...@apache.org>
AuthorDate: Wed Aug 3 17:22:58 2022 +0800

    [FLINK-28688][python] Support DataStream PythonWindowOperator in Thread Mode
    
    This closes #20435.
---
 flink-python/dev/dev-requirements.txt              |   2 +-
 flink-python/pom.xml                               |   2 +-
 flink-python/pyflink/datastream/data_stream.py     |   7 +-
 .../pyflink/datastream/tests/test_window.py        |  90 +++++----
 .../fn_execution/datastream/embedded/operations.py | 116 +++++++++--
 .../datastream/embedded/process_function.py        |  16 ++
 .../datastream/embedded/runtime_context.py         |  35 ++--
 .../fn_execution/datastream/embedded/state_impl.py | 133 ++++++++++---
 .../datastream/embedded/timerservice_impl.py       |  29 +++
 .../datastream/window/window_operator.py           |  14 +-
 .../pyflink/fn_execution/embedded/converters.py    |  37 +++-
 .../fn_execution/embedded/operation_utils.py       |  10 +-
 .../pyflink/fn_execution/embedded/operations.py    |  12 +-
 .../pyflink/fn_execution/embedded/state_impl.py    |  87 +++++++++
 flink-python/setup.py                              |   2 +-
 .../chain/PythonOperatorChainingOptimizer.java     |   4 +-
 ...ractOneInputEmbeddedPythonFunctionOperator.java |   4 +-
 ...ractTwoInputEmbeddedPythonFunctionOperator.java |   4 +-
 .../embedded/EmbeddedPythonWindowOperator.java     | 211 +++++++++++++++++++++
 flink-python/src/main/resources/META-INF/NOTICE    |   2 +-
 20 files changed, 698 insertions(+), 119 deletions(-)

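For context, a minimal sketch of what this change enables (illustrative only, not part of the commit): a windowed DataStream job can now run with the embedded interpreter by setting 'python.execution-mode' to 'thread'. The pipeline and data below are hypothetical, assuming the standard PyFlink API:

    from pyflink.common import Configuration
    from pyflink.common.time import Time
    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.window import TumblingProcessingTimeWindows

    config = Configuration()
    # before this commit, window operators only worked in 'process' mode
    config.set_string("python.execution-mode", "thread")
    env = StreamExecutionEnvironment.get_execution_environment(config)

    env.from_collection([('a', 1), ('a', 2), ('b', 3)],
                        type_info=Types.TUPLE([Types.STRING(), Types.INT()])) \
        .key_by(lambda e: e[0]) \
        .window(TumblingProcessingTimeWindows.of(Time.milliseconds(10))) \
        .reduce(lambda a, b: (a[0], a[1] + b[1])) \
        .print()
    env.execute('thread-mode-window-sketch')
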
diff --git a/flink-python/dev/dev-requirements.txt b/flink-python/dev/dev-requirements.txt
index 61f21b556dc..fcc38508101 100755
--- a/flink-python/dev/dev-requirements.txt
+++ b/flink-python/dev/dev-requirements.txt
@@ -31,6 +31,6 @@ numpy>=1.14.3,<1.20; python_version < '3.7'
 fastavro>=1.1.0,<1.4.8
 grpcio>=1.29.0,<1.47
 grpcio-tools>=1.3.5,<=1.14.2
-pemja==0.2.3; python_version >= '3.7' and platform_system != 'Windows'
+pemja==0.2.4; python_version >= '3.7' and platform_system != 'Windows'
 httplib2>=0.19.0,<=0.20.4
 protobuf<3.18
\ No newline at end of file
diff --git a/flink-python/pom.xml b/flink-python/pom.xml
index bc376f844a6..eba9a397479 100644
--- a/flink-python/pom.xml
+++ b/flink-python/pom.xml
@@ -116,7 +116,7 @@ under the License.
                <dependency>
                        <groupId>com.alibaba</groupId>
                        <artifactId>pemja</artifactId>
-                       <version>0.2.3</version>
+                       <version>0.2.4</version>
                </dependency>
 
                <!-- Protobuf dependencies -->
diff --git a/flink-python/pyflink/datastream/data_stream.py b/flink-python/pyflink/datastream/data_stream.py
index 845cd156afc..4e8b5cbb229 100644
--- a/flink-python/pyflink/datastream/data_stream.py
+++ b/flink-python/pyflink/datastream/data_stream.py
@@ -2738,7 +2738,12 @@ def _get_one_input_stream_operator(data_stream: DataStream,
         else:
             j_namespace_serializer = \
                 gateway.jvm.org.apache.flink.streaming.api.utils.ByteArrayWrapperSerializer()
-        j_python_function_operator = gateway.jvm.ExternalPythonKeyedProcessOperator(
+        if python_execution_mode == 'thread':
+            JDataStreamPythonWindowFunctionOperator = gateway.jvm.EmbeddedPythonWindowOperator
+        else:
+            JDataStreamPythonWindowFunctionOperator = gateway.jvm.ExternalPythonKeyedProcessOperator
+
+        j_python_function_operator = JDataStreamPythonWindowFunctionOperator(
             j_conf,
             j_data_stream_python_function_info,
             j_input_types,
diff --git a/flink-python/pyflink/datastream/tests/test_window.py b/flink-python/pyflink/datastream/tests/test_window.py
index b6433f409ce..886e8456949 100644
--- a/flink-python/pyflink/datastream/tests/test_window.py
+++ b/flink-python/pyflink/datastream/tests/test_window.py
@@ -15,8 +15,11 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
+import sys
 from typing import Iterable, Tuple, Dict
 
+import pytest
+
 from pyflink.common import Configuration
 from pyflink.common.time import Time
 from pyflink.common.typeinfo import Types
@@ -36,7 +39,7 @@ from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase
 from pyflink.util.java_utils import get_j_env_configuration
 
 
-class WindowTests(PyFlinkStreamingTestCase):
+class WindowTests(object):
 
     def setUp(self) -> None:
         super(WindowTests, self).setUp()
@@ -371,41 +374,6 @@ class WindowTests(PyFlinkStreamingTestCase):
         expected = ['(hi,1,7,4)', '(hi,8,12,2)', '(hi,15,18,1)']
         self.assert_equals_sorted(expected, results)
 
-    def test_side_output_late_data(self):
-        self.env.set_parallelism(1)
-        config = Configuration(
-            j_configuration=get_j_env_configuration(self.env._j_stream_execution_environment)
-        )
-        config.set_integer('python.fn-execution.bundle.size', 1)
-        jvm = get_gateway().jvm
-        watermark_strategy = WatermarkStrategy(
-            jvm.org.apache.flink.api.common.eventtime.WatermarkStrategy.forGenerator(
-                jvm.org.apache.flink.streaming.api.functions.python.eventtime.
-                PerElementWatermarkGenerator.getSupplier()
-            )
-        ).with_timestamp_assigner(SecondColumnTimestampAssigner())
-
-        tag = OutputTag('late-data', type_info=Types.ROW([Types.STRING(), Types.INT()]))
-        ds1 = self.env.from_collection([('a', 0), ('a', 8), ('a', 4), ('a', 6)],
-                                       type_info=Types.ROW([Types.STRING(), Types.INT()]))
-        ds2 = ds1.assign_timestamps_and_watermarks(watermark_strategy) \
-            .key_by(lambda e: e[0]) \
-            .window(TumblingEventTimeWindows.of(Time.milliseconds(5))) \
-            .allowed_lateness(0) \
-            .side_output_late_data(tag) \
-            .process(CountWindowProcessFunction(),
-                     Types.TUPLE([Types.STRING(), Types.LONG(), Types.LONG(), Types.INT()]))
-        main_sink = DataStreamTestSinkFunction()
-        ds2.add_sink(main_sink)
-        side_sink = DataStreamTestSinkFunction()
-        ds2.get_side_output(tag).add_sink(side_sink)
-
-        self.env.execute('test_side_output_late_data')
-        main_expected = ['(a,0,5,1)', '(a,5,10,2)']
-        self.assert_equals_sorted(main_expected, main_sink.get_results())
-        side_expected = ['+I[a, 4]']
-        self.assert_equals_sorted(side_expected, side_sink.get_results())
-
     def test_global_window_with_purging_trigger(self):
         self.env.set_parallelism(1)
         data_stream = self.env.from_collection([
@@ -590,6 +558,56 @@ class WindowTests(PyFlinkStreamingTestCase):
         self.assert_equals_sorted(expected, results)
 
 
+class ProcessWindowTests(WindowTests, PyFlinkStreamingTestCase):
+    def setUp(self) -> None:
+        super(ProcessWindowTests, self).setUp()
+        config = get_j_env_configuration(self.env._j_stream_execution_environment)
+        config.setString("python.execution-mode", "process")
+
+    def test_side_output_late_data(self):
+        self.env.set_parallelism(1)
+        config = Configuration(
+            j_configuration=get_j_env_configuration(self.env._j_stream_execution_environment)
+        )
+        config.set_integer('python.fn-execution.bundle.size', 1)
+        jvm = get_gateway().jvm
+        watermark_strategy = WatermarkStrategy(
+            jvm.org.apache.flink.api.common.eventtime.WatermarkStrategy.forGenerator(
+                jvm.org.apache.flink.streaming.api.functions.python.eventtime.
+                PerElementWatermarkGenerator.getSupplier()
+            )
+        ).with_timestamp_assigner(SecondColumnTimestampAssigner())
+
+        tag = OutputTag('late-data', type_info=Types.ROW([Types.STRING(), Types.INT()]))
+        ds1 = self.env.from_collection([('a', 0), ('a', 8), ('a', 4), ('a', 6)],
+                                       type_info=Types.ROW([Types.STRING(), Types.INT()]))
+        ds2 = ds1.assign_timestamps_and_watermarks(watermark_strategy) \
+            .key_by(lambda e: e[0]) \
+            .window(TumblingEventTimeWindows.of(Time.milliseconds(5))) \
+            .allowed_lateness(0) \
+            .side_output_late_data(tag) \
+            .process(CountWindowProcessFunction(),
+                     Types.TUPLE([Types.STRING(), Types.LONG(), Types.LONG(), Types.INT()]))
+        main_sink = DataStreamTestSinkFunction()
+        ds2.add_sink(main_sink)
+        side_sink = DataStreamTestSinkFunction()
+        ds2.get_side_output(tag).add_sink(side_sink)
+
+        self.env.execute('test_side_output_late_data')
+        main_expected = ['(a,0,5,1)', '(a,5,10,2)']
+        self.assert_equals_sorted(main_expected, main_sink.get_results())
+        side_expected = ['+I[a, 4]']
+        self.assert_equals_sorted(side_expected, side_sink.get_results())
+
+
+@pytest.mark.skipif(sys.version_info < (3, 7), reason="requires python3.7")
+class EmbeddedWindowTests(WindowTests, PyFlinkStreamingTestCase):
+    def setUp(self) -> None:
+        super(EmbeddedWindowTests, self).setUp()
+        config = get_j_env_configuration(self.env._j_stream_execution_environment)
+        config.setString("python.execution-mode", "thread")
+
+
 class SecondColumnTimestampAssigner(TimestampAssigner):
 
     def extract_timestamp(self, value, record_timestamp) -> int:
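
A note on the test refactor above: WindowTests no longer extends PyFlinkStreamingTestCase, so its cases are only collected through the concrete subclasses, each of which pins one execution mode in setUp. This is the standard unittest mixin pattern; a generic, runnable sketch (names here are illustrative, not from the commit):

    import unittest

    class SharedCases(object):  # plain mixin: not collected as a TestCase by itself
        def test_mode_is_set(self):
            self.assertIn(self.mode, ('process', 'thread'))

    class ProcessModeTests(SharedCases, unittest.TestCase):
        def setUp(self):
            self.mode = 'process'

    class ThreadModeTests(SharedCases, unittest.TestCase):
        def setUp(self):
            self.mode = 'thread'
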
diff --git a/flink-python/pyflink/fn_execution/datastream/embedded/operations.py b/flink-python/pyflink/fn_execution/datastream/embedded/operations.py
index 46567798d20..69757e7f128 100644
--- a/flink-python/pyflink/fn_execution/datastream/embedded/operations.py
+++ b/flink-python/pyflink/fn_execution/datastream/embedded/operations.py
@@ -15,13 +15,19 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-
+from pyflink.datastream.window import WindowOperationDescriptor
 from pyflink.fn_execution import pickle
+from pyflink.fn_execution.coders import TimeWindowCoder, CountWindowCoder
 from pyflink.fn_execution.datastream import operations
 from pyflink.fn_execution.datastream.embedded.process_function import (
     InternalProcessFunctionContext, InternalKeyedProcessFunctionContext,
-    InternalKeyedProcessFunctionOnTimerContext)
+    InternalKeyedProcessFunctionOnTimerContext, InternalWindowTimerContext)
 from pyflink.fn_execution.datastream.embedded.runtime_context import StreamingRuntimeContext
+from pyflink.fn_execution.datastream.embedded.timerservice_impl import InternalTimerServiceImpl
+from pyflink.fn_execution.datastream.window.window_operator import WindowOperator
+from pyflink.fn_execution.embedded.converters import (TimeWindowConverter, CountWindowConverter,
+                                                      GlobalWindowConverter)
+from pyflink.fn_execution.embedded.state_impl import KeyedStateBackend
 
 
 class OneInputOperation(operations.OneInputOperation):
@@ -73,8 +79,8 @@ class TwoInputOperation(operations.TwoInputOperation):
 
 
 def extract_process_function(
-        user_defined_function_proto, runtime_context, function_context, timer_context,
-        job_parameters):
+        user_defined_function_proto, j_runtime_context, j_function_context, j_timer_context,
+        job_parameters, j_keyed_state_backend):
     from pyflink.fn_execution import flink_fn_execution_pb2
 
     user_defined_func = pickle.loads(user_defined_function_proto.payload)
@@ -83,7 +89,7 @@ def extract_process_function(
 
     UserDefinedDataStreamFunction = flink_fn_execution_pb2.UserDefinedDataStreamFunction
 
-    runtime_context = StreamingRuntimeContext.of(runtime_context, job_parameters)
+    runtime_context = StreamingRuntimeContext.of(j_runtime_context, job_parameters)
 
     def open_func():
         if hasattr(user_defined_func, "open"):
@@ -94,7 +100,7 @@ def extract_process_function(
             user_defined_func.close()
 
     if func_type == UserDefinedDataStreamFunction.PROCESS:
-        function_context = InternalProcessFunctionContext(function_context)
+        function_context = InternalProcessFunctionContext(j_function_context)
 
         process_element = user_defined_func.process_element
 
@@ -106,10 +112,16 @@ def extract_process_function(
     elif func_type == UserDefinedDataStreamFunction.KEYED_PROCESS:
 
         function_context = InternalKeyedProcessFunctionContext(
-            function_context, user_defined_function_proto.key_type_info)
+            j_function_context, user_defined_function_proto.key_type_info)
 
         timer_context = InternalKeyedProcessFunctionOnTimerContext(
-            timer_context, user_defined_function_proto.key_type_info)
+            j_timer_context, user_defined_function_proto.key_type_info)
+
+        keyed_state_backend = KeyedStateBackend(
+            function_context,
+            j_keyed_state_backend)
+
+        runtime_context.set_keyed_state_backend(keyed_state_backend)
 
         on_timer = user_defined_func.on_timer
 
@@ -125,7 +137,7 @@ def extract_process_function(
         return OneInputOperation(open_func, close_func, process_element_func, on_timer_func)
 
     elif func_type == UserDefinedDataStreamFunction.CO_PROCESS:
-        function_context = InternalProcessFunctionContext(function_context)
+        function_context = InternalProcessFunctionContext(j_function_context)
 
         process_element1 = user_defined_func.process_element1
         process_element2 = user_defined_func.process_element2
@@ -142,10 +154,16 @@ def extract_process_function(
     elif func_type == UserDefinedDataStreamFunction.KEYED_CO_PROCESS:
 
         function_context = InternalKeyedProcessFunctionContext(
-            function_context, user_defined_function_proto.key_type_info)
+            j_function_context, user_defined_function_proto.key_type_info)
 
         timer_context = InternalKeyedProcessFunctionOnTimerContext(
-            timer_context, user_defined_function_proto.key_type_info)
+            j_timer_context, user_defined_function_proto.key_type_info)
+
+        keyed_state_backend = KeyedStateBackend(
+            function_context,
+            j_keyed_state_backend)
+
+        runtime_context.set_keyed_state_backend(keyed_state_backend)
 
         on_timer = user_defined_func.on_timer
 
@@ -167,5 +185,81 @@ def extract_process_function(
         return TwoInputOperation(
             open_func, close_func, process_element_func1, process_element_func2, on_timer_func)
 
+    elif func_type == UserDefinedDataStreamFunction.WINDOW:
+
+        window_operation_descriptor = (
+            user_defined_func
+        )  # type: WindowOperationDescriptor
+
+        def user_key_selector(normal_data):
+            return normal_data
+
+        window_assigner = window_operation_descriptor.assigner
+        window_trigger = window_operation_descriptor.trigger
+        allowed_lateness = window_operation_descriptor.allowed_lateness
+        late_data_output_tag = window_operation_descriptor.late_data_output_tag
+        window_state_descriptor = window_operation_descriptor.window_state_descriptor
+        internal_window_function = window_operation_descriptor.internal_window_function
+        window_serializer = window_operation_descriptor.window_serializer
+        window_coder = window_serializer._get_coder()
+
+        if isinstance(window_coder, TimeWindowCoder):
+            window_converter = TimeWindowConverter()
+        elif isinstance(window_coder, CountWindowCoder):
+            window_converter = CountWindowConverter()
+        else:
+            window_converter = GlobalWindowConverter()
+
+        internal_timer_service = InternalTimerServiceImpl(
+            j_timer_context.timerService(), window_converter)
+
+        function_context = InternalKeyedProcessFunctionContext(
+            j_function_context, user_defined_function_proto.key_type_info)
+
+        window_timer_context = InternalWindowTimerContext(
+            j_timer_context,
+            user_defined_function_proto.key_type_info,
+            window_converter)
+
+        keyed_state_backend = KeyedStateBackend(
+            function_context,
+            j_keyed_state_backend,
+            j_function_context.getWindowSerializer(),
+            window_converter)
+
+        runtime_context.set_keyed_state_backend(keyed_state_backend)
+
+        window_operator = WindowOperator(
+            window_assigner,
+            keyed_state_backend,
+            user_key_selector,
+            window_state_descriptor,
+            internal_window_function,
+            window_trigger,
+            allowed_lateness,
+            late_data_output_tag)
+
+        def open_func():
+            window_operator.open(runtime_context, internal_timer_service)
+
+        def close_func():
+            window_operator.close()
+
+        def process_element_func(value):
+            yield from window_operator.process_element(value[1], function_context.timestamp())
+
+        if window_assigner.is_event_time():
+            def on_timer_func(timestamp):
+                window = window_timer_context.window()
+                key = window_timer_context.get_current_key()
+                yield from window_operator.on_event_time(timestamp, key, window)
+        else:
+            def on_timer_func(timestamp):
+                window = window_timer_context.window()
+                key = window_timer_context.get_current_key()
+                yield from window_operator.on_processing_time(timestamp, key, window)
+
+        return OneInputOperation(open_func, close_func, process_element_func, on_timer_func)
+
     else:
         raise Exception("Unknown function type {0}.".format(func_type))
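
Each branch of extract_process_function above wraps the user function into open/process/close callables, with process_element written as a generator so results stream back to the operator one element at a time. A stripped-down, pure-Python sketch of that wiring (make_operation and user_fn are illustrative names, not from the commit):

    def make_operation(user_fn):
        def open_func():
            if hasattr(user_fn, 'open'):
                user_fn.open()

        def process_element_func(value):
            # generator: the caller pulls results lazily, one at a time
            yield from user_fn.process_element(value)

        def close_func():
            if hasattr(user_fn, 'close'):
                user_fn.close()

        return open_func, process_element_func, close_func
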
diff --git a/flink-python/pyflink/fn_execution/datastream/embedded/process_function.py b/flink-python/pyflink/fn_execution/datastream/embedded/process_function.py
index 9ffca6aeedd..9cccce3ae94 100644
--- a/flink-python/pyflink/fn_execution/datastream/embedded/process_function.py
+++ b/flink-python/pyflink/fn_execution/datastream/embedded/process_function.py
@@ -90,3 +90,19 @@ class InternalKeyedProcessFunctionOnTimerContext(KeyedProcessFunction.OnTimerCon
 
     def get_current_key(self):
         return self._key_converter.to_internal(self._context.getCurrentKey())
+
+
+class InternalWindowTimerContext(object):
+    def __init__(self, context, key_type_info, window_converter):
+        self._context = context
+        self._key_converter = from_type_info(key_type_info)
+        self._window_converter = window_converter
+
+    def timestamp(self) -> int:
+        return self._context.timestamp()
+
+    def window(self):
+        return self._window_converter.to_internal(self._context.getWindow())
+
+    def get_current_key(self):
+        return self._key_converter.to_internal(self._context.getCurrentKey())
diff --git a/flink-python/pyflink/fn_execution/datastream/embedded/runtime_context.py b/flink-python/pyflink/fn_execution/datastream/embedded/runtime_context.py
index c7d9508446e..2d744c06a5f 100644
--- a/flink-python/pyflink/fn_execution/datastream/embedded/runtime_context.py
+++ b/flink-python/pyflink/fn_execution/datastream/embedded/runtime_context.py
@@ -20,17 +20,14 @@ from pyflink.datastream.state import (AggregatingStateDescriptor, AggregatingSta
                                       ReducingStateDescriptor, ReducingState, MapStateDescriptor,
                                       MapState, ListStateDescriptor, ListState,
                                       ValueStateDescriptor, ValueState)
-from pyflink.fn_execution.datastream.embedded.state_impl import (ValueStateImpl, ListStateImpl,
-                                                                 MapStateImpl, ReducingStateImpl,
-                                                                 AggregatingStateImpl)
-from pyflink.fn_execution.embedded.converters import from_type_info
-from pyflink.fn_execution.embedded.java_utils import to_java_state_descriptor
+from pyflink.fn_execution.embedded.state_impl import KeyedStateBackend
 
 
 class StreamingRuntimeContext(RuntimeContext):
     def __init__(self, runtime_context, job_parameters):
         self._runtime_context = runtime_context
         self._job_parameters = job_parameters
+        self._keyed_state_backend = None  # type: KeyedStateBackend
 
     def get_task_name(self) -> str:
         """
@@ -82,32 +79,26 @@ class StreamingRuntimeContext(RuntimeContext):
         return self._runtime_context.getMetricGroup()
 
     def get_state(self, state_descriptor: ValueStateDescriptor) -> ValueState:
-        return ValueStateImpl(
-            self._runtime_context.getState(to_java_state_descriptor(state_descriptor)),
-            from_type_info(state_descriptor.type_info))
+        return self._keyed_state_backend.get_value_state(state_descriptor)
 
     def get_list_state(self, state_descriptor: ListStateDescriptor) -> ListState:
-        return ListStateImpl(
-            self._runtime_context.getListState(to_java_state_descriptor(state_descriptor)),
-            from_type_info(state_descriptor.type_info))
+        return self._keyed_state_backend.get_list_state(state_descriptor)
 
     def get_map_state(self, state_descriptor: MapStateDescriptor) -> MapState:
-        return MapStateImpl(
-            self._runtime_context.getMapState(to_java_state_descriptor(state_descriptor)),
-            from_type_info(state_descriptor.type_info))
+        return self._keyed_state_backend.get_map_state(state_descriptor)
 
     def get_reducing_state(self, state_descriptor: ReducingStateDescriptor) -> ReducingState:
-        return ReducingStateImpl(
-            self._runtime_context.getState(to_java_state_descriptor(state_descriptor)),
-            from_type_info(state_descriptor.type_info),
-            state_descriptor.get_reduce_function())
+        return self._keyed_state_backend.get_reducing_state(state_descriptor)
 
     def get_aggregating_state(self,
                               state_descriptor: AggregatingStateDescriptor) -> AggregatingState:
-        return AggregatingStateImpl(
-            self._runtime_context.getState(to_java_state_descriptor(state_descriptor)),
-            from_type_info(state_descriptor.type_info),
-            state_descriptor.get_agg_function())
+        return self._keyed_state_backend.get_aggregating_state(state_descriptor)
+
+    def set_keyed_state_backend(self, keyed_state_backend: KeyedStateBackend):
+        self._keyed_state_backend = keyed_state_backend
+
+    def get_keyed_state_backend(self):
+        return self._keyed_state_backend
 
     @staticmethod
     def of(runtime_context, job_parameters):
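
The user-facing contract is unchanged by this indirection: keyed functions still ask the runtime context for state, and the handles now come from the KeyedStateBackend installed via set_keyed_state_backend(). A minimal sketch of that user path, assuming the standard PyFlink API (CountPerKey is a hypothetical function):

    from pyflink.common.typeinfo import Types
    from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
    from pyflink.datastream.state import ValueStateDescriptor

    class CountPerKey(KeyedProcessFunction):
        def open(self, runtime_context: RuntimeContext):
            # in thread mode this call now resolves through KeyedStateBackend
            self.count = runtime_context.get_state(
                ValueStateDescriptor("count", Types.LONG()))

        def process_element(self, value, ctx):
            current = self.count.value() or 0
            self.count.update(current + 1)
            yield value[0], current + 1
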
diff --git a/flink-python/pyflink/fn_execution/datastream/embedded/state_impl.py b/flink-python/pyflink/fn_execution/datastream/embedded/state_impl.py
index e88c6abc304..6973be038eb 100644
--- a/flink-python/pyflink/fn_execution/datastream/embedded/state_impl.py
+++ b/flink-python/pyflink/fn_execution/datastream/embedded/state_impl.py
@@ -15,27 +15,42 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-from typing import List, Iterable, Tuple, Dict
+from abc import ABC
+from typing import List, Iterable, Tuple, Dict, Collection
 
 from pyflink.datastream import ReduceFunction, AggregateFunction
-from pyflink.datastream.state import (ValueState, T, State, ListState, IN, OUT, ReducingState,
-                                      AggregatingState, MapState, V, K)
+from pyflink.datastream.state import (T, IN, OUT, V, K)
 from pyflink.fn_execution.embedded.converters import (DataConverter, DictDataConverter,
                                                       ListDataConverter)
+from pyflink.fn_execution.internal_state import (InternalValueState, InternalKvState,
+                                                 InternalListState, InternalReducingState,
+                                                 InternalAggregatingState, InternalMapState,
+                                                 N)
 
 
-class StateImpl(State):
-    def __init__(self, state, value_converter: DataConverter):
+class StateImpl(InternalKvState, ABC):
+    def __init__(self,
+                 state,
+                 value_converter: DataConverter,
+                 window_converter: DataConverter = None):
         self._state = state
         self._value_converter = value_converter
+        self._window_converter = window_converter
 
     def clear(self):
         self._state.clear()
 
+    def set_current_namespace(self, namespace) -> None:
+        j_window = self._window_converter.to_external(namespace)
+        self._state.setCurrentNamespace(j_window)
+
 
-class ValueStateImpl(StateImpl, ValueState):
-    def __init__(self, value_state, value_converter: DataConverter):
-        super(ValueStateImpl, self).__init__(value_state, value_converter)
+class ValueStateImpl(StateImpl, InternalValueState):
+    def __init__(self,
+                 value_state,
+                 value_converter: DataConverter,
+                 window_converter: DataConverter = None):
+        super(ValueStateImpl, self).__init__(value_state, value_converter, window_converter)
 
     def value(self) -> T:
         return self._value_converter.to_internal(self._state.value())
@@ -44,10 +59,13 @@ class ValueStateImpl(StateImpl, ValueState):
         self._state.update(self._value_converter.to_external(value))
 
 
-class ListStateImpl(StateImpl, ListState):
+class ListStateImpl(StateImpl, InternalListState):
 
-    def __init__(self, list_state, value_converter: ListDataConverter):
-        super(ListStateImpl, self).__init__(list_state, value_converter)
+    def __init__(self,
+                 list_state,
+                 value_converter: ListDataConverter,
+                 window_converter: DataConverter = None):
+        super(ListStateImpl, self).__init__(list_state, value_converter, window_converter)
         self._element_converter = value_converter._field_converter
 
     def update(self, values: List[T]) -> None:
@@ -57,19 +75,27 @@ class ListStateImpl(StateImpl, ListState):
         self._state.addAll(self._value_converter.to_external(values))
 
     def get(self) -> OUT:
-        return self._value_converter.to_internal(self._state.get())
+        states = self._value_converter.to_internal(self._state.get())
+        if states:
+            yield from states
 
     def add(self, value: IN) -> None:
         self._state.add(self._element_converter.to_external(value))
 
+    def merge_namespaces(self, target: N, sources: Collection[N]) -> None:
+        j_target = self._window_converter.to_external(target)
+        j_sources = [self._window_converter.to_external(window) for window in sources]
+        self._state.mergeNamespaces(j_target, j_sources)
+
 
-class ReducingStateImpl(StateImpl, ReducingState):
+class ReducingStateImpl(StateImpl, InternalReducingState):
 
     def __init__(self,
                  value_state,
                  value_converter: DataConverter,
-                 reduce_function: ReduceFunction):
-        super(ReducingStateImpl, self).__init__(value_state, value_converter)
+                 reduce_function: ReduceFunction,
+                 window_converter: DataConverter = None):
+        super(ReducingStateImpl, self).__init__(value_state, value_converter, window_converter)
         self._reduce_function = reduce_function
 
     def get(self) -> OUT:
@@ -88,13 +114,34 @@ class ReducingStateImpl(StateImpl, ReducingState):
 
             self._state.update(self._value_converter.to_external(reduce_value))
 
+    def merge_namespaces(self, target: N, sources: Collection[N]) -> None:
+        merged = None
+        for source in sources:
+            self.set_current_namespace(source)
+            source_state = self.get()
+
+            if source_state is None:
+                continue
 
-class AggregatingStateImpl(StateImpl, AggregatingState):
+            self.clear()
+
+            if merged is None:
+                merged = source_state
+            else:
+                merged = self._reduce_function.reduce(merged, source_state)
+
+        if merged is not None:
+            self.set_current_namespace(target)
+            self._state.update(self._value_converter.to_external(merged))
+
+
+class AggregatingStateImpl(StateImpl, InternalAggregatingState):
     def __init__(self,
                  value_state,
                  value_converter,
-                 agg_function: AggregateFunction):
-        super(AggregatingStateImpl, self).__init__(value_state, value_converter)
+                 agg_function: AggregateFunction,
+                 window_converter: DataConverter = None):
+        super(AggregatingStateImpl, self).__init__(value_state, value_converter, window_converter)
         self._agg_function = agg_function
 
     def get(self) -> OUT:
@@ -117,15 +164,38 @@ class AggregatingStateImpl(StateImpl, AggregatingState):
             accumulator = self._agg_function.add(value, accumulator)
             self._state.update(self._value_converter.to_external(accumulator))
 
+    def merge_namespaces(self, target: N, sources: Collection[N]) -> None:
+        merged = None
+        for source in sources:
+            self.set_current_namespace(source)
+            source_state = self.get()
+
+            if source_state is None:
+                continue
+
+            self.clear()
 
-class MapStateImpl(StateImpl, MapState):
-    def __init__(self, map_state, map_converter: DictDataConverter):
-        super(MapStateImpl, self).__init__(map_state, map_converter)
+            if merged is None:
+                merged = source_state
+            else:
+                merged = self._agg_function.merge(merged, source_state)
+
+        if merged is not None:
+            self.set_current_namespace(target)
+            self._state.update(self._value_converter.to_external(merged))
+
+
+class MapStateImpl(StateImpl, InternalMapState):
+    def __init__(self,
+                 map_state,
+                 map_converter: DictDataConverter,
+                 window_converter: DataConverter = None):
+        super(MapStateImpl, self).__init__(map_state, map_converter, window_converter)
         self._k_converter = map_converter._key_converter
         self._v_converter = map_converter._value_converter
 
     def get(self, key: K) -> V:
-        return self._value_converter.to_internal(
+        return self._v_converter.to_internal(
             self._state.get(self._k_converter.to_external(key)))
 
     def put(self, key: K, value: V) -> None:
@@ -142,17 +212,22 @@ class MapStateImpl(StateImpl, MapState):
 
     def items(self) -> Iterable[Tuple[K, V]]:
         entries = self._state.entries()
-        for entry in entries:
-            yield (self._k_converter.to_internal(entry.getKey()),
-                   self._v_converter.to_internal(entry.getValue()))
+        if entries:
+            for entry in entries:
+                yield (self._k_converter.to_internal(entry.getKey()),
+                       self._v_converter.to_internal(entry.getValue()))
 
     def keys(self) -> Iterable[K]:
-        for k in self._state.keys():
-            yield self._k_converter.to_internal(k)
+        keys = self._state.keys()
+        if keys:
+            for k in keys:
+                yield self._k_converter.to_internal(k)
 
     def values(self) -> Iterable[V]:
-        for v in self._state.values():
-            yield self._v_converter.to_internal(v)
+        values = self._state.values()
+        if values:
+            for v in values:
+                yield self._v_converter.to_internal(v)
 
     def is_empty(self) -> bool:
         return self._state.isEmpty()
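
The merge_namespaces implementations above share one shape: read each source namespace, clear it, fold the values with the user's reduce/merge function, and write the result under the target namespace (this is what runs when mergeable windows such as session windows collapse). A plain-Python sketch of the fold, with a dict standing in for namespaced state (illustrative, not from the commit):

    def merge_namespaces(state, target, sources, merge_fn):
        merged = None
        for ns in sources:
            value = state.pop(ns, None)  # read and clear the source namespace
            if value is None:
                continue
            merged = value if merged is None else merge_fn(merged, value)
        if merged is not None:
            state[target] = merged

    state = {'w1': 2, 'w2': 3}
    merge_namespaces(state, 'w', ['w1', 'w2'], lambda a, b: a + b)
    assert state == {'w': 5}
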
diff --git a/flink-python/pyflink/fn_execution/datastream/embedded/timerservice_impl.py b/flink-python/pyflink/fn_execution/datastream/embedded/timerservice_impl.py
index fab9440b230..690abe10140 100644
--- a/flink-python/pyflink/fn_execution/datastream/embedded/timerservice_impl.py
+++ b/flink-python/pyflink/fn_execution/datastream/embedded/timerservice_impl.py
@@ -16,6 +16,7 @@
 # limitations under the License.
 
################################################################################
 from pyflink.datastream import TimerService
+from pyflink.fn_execution.datastream.timerservice import InternalTimerService, N
 
 
 class TimerServiceImpl(TimerService):
@@ -39,3 +40,31 @@ class TimerServiceImpl(TimerService):
 
     def delete_event_time_timer(self, timestamp: int):
         self._timer_service.deleteEventTimeTimer(timestamp)
+
+
+class InternalTimerServiceImpl(InternalTimerService[N]):
+    def __init__(self, timer_service, window_converter):
+        self._timer_service = timer_service
+        self._window_converter = window_converter
+
+    def current_processing_time(self):
+        return self._timer_service.currentProcessingTime()
+
+    def current_watermark(self):
+        return self._timer_service.currentWatermark()
+
+    def register_processing_time_timer(self, namespace: N, timestamp: int):
+        window = self._window_converter.to_external(namespace)
+        self._timer_service.registerProcessingTimeTimer(window, timestamp)
+
+    def register_event_time_timer(self, namespace: N, timestamp: int):
+        window = self._window_converter.to_external(namespace)
+        self._timer_service.registerEventTimeTimer(window, timestamp)
+
+    def delete_event_time_timer(self, namespace: N, timestamp: int):
+        window = self._window_converter.to_external(namespace)
+        self._timer_service.deleteEventTimeTimer(window, timestamp)
+
+    def delete_processing_time_timer(self, namespace: N, timestamp: int):
+        window = self._window_converter.to_external(namespace)
+        self._timer_service.deleteProcessingTimeTimer(window, timestamp)
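
Why the converter matters in InternalTimerServiceImpl above: timers are keyed by (namespace, timestamp), so the same timestamp registered under two windows must yield two distinct timers, and the Python window has to be converted to its Java counterpart before reaching the Java timer service. A plain-Python sketch of that invariant (illustrative, not from the commit):

    timers = set()

    def register_event_time_timer(namespace, timestamp):
        timers.add((namespace, timestamp))  # keyed by (window, timestamp)

    register_event_time_timer('w1', 100)
    register_event_time_timer('w2', 100)
    assert len(timers) == 2  # two windows -> two distinct timers
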
diff --git a/flink-python/pyflink/fn_execution/datastream/window/window_operator.py b/flink-python/pyflink/fn_execution/datastream/window/window_operator.py
index a5ceb7b6ab5..a2a8c0a8d06 100644
--- a/flink-python/pyflink/fn_execution/datastream/window/window_operator.py
+++ b/flink-python/pyflink/fn_execution/datastream/window/window_operator.py
@@ -19,6 +19,7 @@ import typing
 from typing import TypeVar, Iterable, Collection, Optional
 
 from pyflink.common.constants import MAX_LONG_VALUE
+from pyflink.common.typeinfo import PickledBytesTypeInfo
 from pyflink.datastream import WindowAssigner, Trigger, MergingWindowAssigner, TriggerResult
 from pyflink.datastream.functions import KeyedStateStore, RuntimeContext, InternalWindowFunction
 from pyflink.datastream.output_tag import OutputTag
@@ -322,9 +323,16 @@ class WindowOperator(object):
             if isinstance(self.window_state, InternalMergingState):
                 self.window_merging_state = self.window_state
 
-            window_coder = self.keyed_state_backend.namespace_coder
-            self.merging_sets_state = self.keyed_state_backend.get_map_state(
-                "merging-window-set", window_coder, window_coder)
+            if hasattr(self.keyed_state_backend, 'namespace_coder'):
+                window_coder = self.keyed_state_backend.namespace_coder
+                self.merging_sets_state = self.keyed_state_backend.get_map_state(
+                    "merging-window-set", window_coder, window_coder)
+            else:
+                state_descriptor = MapStateDescriptor(
+                    "merging-window-set",
+                    PickledBytesTypeInfo(),
+                    PickledBytesTypeInfo())
+                self.merging_sets_state = self.keyed_state_backend.get_map_state(state_descriptor)
 
         self.merge_function = WindowMergeFunction(self)
 
diff --git a/flink-python/pyflink/fn_execution/embedded/converters.py b/flink-python/pyflink/fn_execution/embedded/converters.py
index ee75e6d5beb..d2ba951f884 100644
--- a/flink-python/pyflink/fn_execution/embedded/converters.py
+++ b/flink-python/pyflink/fn_execution/embedded/converters.py
@@ -15,19 +15,26 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-from abc import ABC, abstractmethod
-
 import pickle
+from abc import ABC, abstractmethod
 from typing import TypeVar, List, Tuple
 
+from pemja import findClass
+
 from pyflink.common import Row, RowKind, TypeInformation
 from pyflink.common.typeinfo import (PickledBytesTypeInfo, PrimitiveArrayTypeInfo,
                                      BasicArrayTypeInfo, ObjectArrayTypeInfo, RowTypeInfo,
                                      TupleTypeInfo, MapTypeInfo, ListTypeInfo)
+from pyflink.datastream import TimeWindow, CountWindow, GlobalWindow
 
 IN = TypeVar('IN')
 OUT = TypeVar('OUT')
 
+# Java Window
+JTimeWindow = findClass('org.apache.flink.table.runtime.operators.window.TimeWindow')
+JCountWindow = findClass('org.apache.flink.table.runtime.operators.window.CountWindow')
+JGlobalWindow = findClass('org.apache.flink.streaming.api.windowing.windows.GlobalWindow')
+
 
 class DataConverter(ABC):
 
@@ -180,6 +187,32 @@ class DictDataConverter(DataConverter):
                 for k, v in value.items()}
 
 
+class TimeWindowConverter(DataConverter):
+    def to_internal(self, value) -> TimeWindow:
+        return TimeWindow(value.getStart(), value.getEnd())
+
+    def to_external(self, value: TimeWindow) -> OUT:
+        return JTimeWindow(value.start, value.end)
+
+
+class CountWindowConverter(DataConverter):
+
+    def to_internal(self, value) -> CountWindow:
+        return CountWindow(value.getId())
+
+    def to_external(self, value: CountWindow) -> OUT:
+        return JCountWindow(value.id)
+
+
+class GlobalWindowConverter(DataConverter):
+
+    def to_internal(self, value) -> IN:
+        return GlobalWindow()
+
+    def to_external(self, value) -> OUT:
+        return JGlobalWindow.get()
+
+
 def from_type_info_proto(type_info):
     # for data stream type information.
     from pyflink.fn_execution import flink_fn_execution_pb2
diff --git a/flink-python/pyflink/fn_execution/embedded/operation_utils.py b/flink-python/pyflink/fn_execution/embedded/operation_utils.py
index 673734dbf7e..0c0580671ec 100644
--- a/flink-python/pyflink/fn_execution/embedded/operation_utils.py
+++ b/flink-python/pyflink/fn_execution/embedded/operation_utils.py
@@ -103,7 +103,7 @@ def create_table_operation_from_proto(proto, input_coder_info, output_coder_into
 
 def create_one_input_user_defined_data_stream_function_from_protos(
         function_infos, input_coder_info, output_coder_info, runtime_context,
-        function_context, timer_context, job_parameters):
+        function_context, timer_context, job_parameters, keyed_state_backend):
     serialized_fns = [pare_user_defined_data_stream_function_proto(proto)
                       for proto in function_infos]
     input_data_converter = (
@@ -118,14 +118,15 @@ def create_one_input_user_defined_data_stream_function_from_protos(
         runtime_context,
         function_context,
         timer_context,
-        job_parameters)
+        job_parameters,
+        keyed_state_backend)
 
     return function_operation
 
 
 def create_two_input_user_defined_data_stream_function_from_protos(
         function_infos, input_coder_info1, input_coder_info2, output_coder_info, runtime_context,
-        function_context, timer_context, job_parameters):
+        function_context, timer_context, job_parameters, keyed_state_backend):
     serialized_fns = [pare_user_defined_data_stream_function_proto(proto)
                       for proto in function_infos]
 
@@ -146,6 +147,7 @@ def create_two_input_user_defined_data_stream_function_from_protos(
         runtime_context,
         function_context,
         timer_context,
-        job_parameters)
+        job_parameters,
+        keyed_state_backend)
 
     return function_operation
diff --git a/flink-python/pyflink/fn_execution/embedded/operations.py b/flink-python/pyflink/fn_execution/embedded/operations.py
index 940a98c4677..88eee565224 100644
--- a/flink-python/pyflink/fn_execution/embedded/operations.py
+++ b/flink-python/pyflink/fn_execution/embedded/operations.py
@@ -66,14 +66,16 @@ class OneInputFunctionOperation(FunctionOperation):
                  runtime_context,
                  function_context,
                  timer_context,
-                 job_parameters):
+                 job_parameters,
+                 keyed_state_backend):
         operations = (
             [extract_process_function(
                 serialized_fn,
                 runtime_context,
                 function_context,
                 timer_context,
-                job_parameters)
+                job_parameters,
+                keyed_state_backend)
                 for serialized_fn in serialized_fns])
         super(OneInputFunctionOperation, self).__init__(operations, output_data_converter)
         self._input_data_converter = input_data_converter
@@ -96,14 +98,16 @@ class TwoInputFunctionOperation(FunctionOperation):
                  runtime_context,
                  function_context,
                  timer_context,
-                 job_parameters):
+                 job_parameters,
+                 keyed_state_backend):
         operations = (
             [extract_process_function(
                 serialized_fn,
                 runtime_context,
                 function_context,
                 timer_context,
-                job_parameters)
+                job_parameters,
+                keyed_state_backend)
                 for serialized_fn in serialized_fns])
         super(TwoInputFunctionOperation, self).__init__(operations, output_data_converter)
         self._input_data_converter1 = input_data_converter1
diff --git a/flink-python/pyflink/fn_execution/embedded/state_impl.py b/flink-python/pyflink/fn_execution/embedded/state_impl.py
new file mode 100644
index 00000000000..21600df3211
--- /dev/null
+++ b/flink-python/pyflink/fn_execution/embedded/state_impl.py
@@ -0,0 +1,87 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from pemja import findClass
+
+from pyflink.datastream.state import (ValueStateDescriptor, ListStateDescriptor, MapStateDescriptor,
+                                      StateDescriptor, ReducingStateDescriptor,
+                                      AggregatingStateDescriptor)
+from pyflink.fn_execution.datastream.embedded.state_impl import (ValueStateImpl, ListStateImpl,
+                                                                 MapStateImpl, ReducingStateImpl,
+                                                                 AggregatingStateImpl)
+from pyflink.fn_execution.embedded.converters import from_type_info
+from pyflink.fn_execution.embedded.java_utils import to_java_state_descriptor
+
+JVoidNamespace = findClass('org.apache.flink.runtime.state.VoidNamespace')
+JVoidNamespaceSerializer = findClass('org.apache.flink.runtime.state.VoidNamespaceSerializer')
+
+JVoidNamespace_INSTANCE = JVoidNamespace.INSTANCE
+JVoidNamespaceSerializer_INSTANCE = JVoidNamespaceSerializer.INSTANCE
+
+
+class KeyedStateBackend(object):
+    def __init__(self,
+                 function_context,
+                 keyed_state_backend,
+                 window_serializer=JVoidNamespaceSerializer_INSTANCE,
+                 window_converter=None):
+        self._function_context = function_context
+        self._keyed_state_backend = keyed_state_backend
+        self._window_serializer = window_serializer
+        self._window_converter = window_converter
+
+    def get_current_key(self):
+        return self._function_context.get_current_key()
+
+    def get_value_state(self, state_descriptor: ValueStateDescriptor) -> ValueStateImpl:
+        return ValueStateImpl(
+            self._get_or_create_keyed_state(state_descriptor),
+            from_type_info(state_descriptor.type_info),
+            self._window_converter)
+
+    def get_list_state(self, state_descriptor: ListStateDescriptor) -> ListStateImpl:
+        return ListStateImpl(
+            self._get_or_create_keyed_state(state_descriptor),
+            from_type_info(state_descriptor.type_info),
+            self._window_converter)
+
+    def get_map_state(self, state_descriptor: MapStateDescriptor) -> MapStateImpl:
+        return MapStateImpl(
+            self._get_or_create_keyed_state(state_descriptor),
+            from_type_info(state_descriptor.type_info),
+            self._window_converter)
+
+    def get_reducing_state(self, state_descriptor: ReducingStateDescriptor):
+        return ReducingStateImpl(
+            self._get_or_create_keyed_state(state_descriptor),
+            from_type_info(state_descriptor.type_info),
+            state_descriptor.get_reduce_function(),
+            self._window_converter)
+
+    def get_aggregating_state(self, state_descriptor: AggregatingStateDescriptor):
+        return AggregatingStateImpl(
+            self._get_or_create_keyed_state(state_descriptor),
+            from_type_info(state_descriptor.type_info),
+            state_descriptor.get_agg_function(),
+            self._window_converter)
+
+    def _get_or_create_keyed_state(self, state_descriptor: StateDescriptor):
+        return self._keyed_state_backend.getPartitionedState(
+            JVoidNamespace_INSTANCE,
+            self._window_serializer,
+            to_java_state_descriptor(state_descriptor))
diff --git a/flink-python/setup.py b/flink-python/setup.py
index 1fe2e3ac33a..82e5e9b4488 100644
--- a/flink-python/setup.py
+++ b/flink-python/setup.py
@@ -309,7 +309,7 @@ try:
                        'cloudpickle==2.1.0', 'avro-python3>=1.8.1,!=1.9.2,<1.10.0',
                        'pytz>=2018.3', 'fastavro>=1.1.0,<1.4.8', 'requests>=2.26.0',
                        'protobuf<3.18',
-                        'pemja==0.2.3;'
+                        'pemja==0.2.4;'
                        'python_full_version >= "3.7" and platform_system != "Windows"',
                        'httplib2>=0.19.0,<=0.20.4', apache_flink_libraries_dependency]
 
diff --git a/flink-python/src/main/java/org/apache/flink/python/chain/PythonOperatorChainingOptimizer.java b/flink-python/src/main/java/org/apache/flink/python/chain/PythonOperatorChainingOptimizer.java
index ce682b26de4..a4e99e9a39b 100644
--- a/flink-python/src/main/java/org/apache/flink/python/chain/PythonOperatorChainingOptimizer.java
+++ b/flink-python/src/main/java/org/apache/flink/python/chain/PythonOperatorChainingOptimizer.java
@@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.operators.python.embedded.EmbeddedPythonCo
 import org.apache.flink.streaming.api.operators.python.embedded.EmbeddedPythonKeyedCoProcessOperator;
 import org.apache.flink.streaming.api.operators.python.embedded.EmbeddedPythonKeyedProcessOperator;
 import org.apache.flink.streaming.api.operators.python.embedded.EmbeddedPythonProcessOperator;
+import org.apache.flink.streaming.api.operators.python.embedded.EmbeddedPythonWindowOperator;
 import org.apache.flink.streaming.api.operators.python.process.AbstractExternalDataStreamPythonFunctionOperator;
 import org.apache.flink.streaming.api.operators.python.process.ExternalPythonCoProcessOperator;
 import org.apache.flink.streaming.api.operators.python.process.ExternalPythonKeyedCoProcessOperator;
@@ -429,7 +430,8 @@ public class PythonOperatorChainingOptimizer {
                         && (upOperator instanceof EmbeddedPythonKeyedProcessOperator
                                 || upOperator instanceof EmbeddedPythonKeyedCoProcessOperator
                                 || upOperator instanceof EmbeddedPythonProcessOperator
-                                || upOperator instanceof EmbeddedPythonCoProcessOperator));
+                                || upOperator instanceof EmbeddedPythonCoProcessOperator
+                                || upOperator instanceof EmbeddedPythonWindowOperator));
     }
 
     private static boolean arePythonOperatorsInSameExecutionEnvironment(
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractOneInputEmbeddedPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractOneInputEmbeddedPythonFunctionOperator.java
index 701b5cf84b9..296f0e5a697 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractOneInputEmbeddedPythonFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractOneInputEmbeddedPythonFunctionOperator.java
@@ -115,6 +115,7 @@ public abstract class AbstractOneInputEmbeddedPythonFunctionOperator<IN, OUT>
         interpreter.set("function_context", getFunctionContext());
         interpreter.set("timer_context", getTimerContext());
         interpreter.set("job_parameters", getJobParameters());
+        interpreter.set("keyed_state_backend", getKeyedStateBackend());
 
         interpreter.exec(
                 "from pyflink.fn_execution.embedded.operation_utils import 
create_one_input_user_defined_data_stream_function_from_protos");
@@ -127,7 +128,8 @@ public abstract class AbstractOneInputEmbeddedPythonFunctionOperator<IN, OUT>
                         + "runtime_context,"
                         + "function_context,"
                         + "timer_context,"
-                        + "job_parameters)");
+                        + "job_parameters,"
+                        + "keyed_state_backend)");
 
         interpreter.invokeMethod("operation", "open");
     }
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractTwoInputEmbeddedPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractTwoInputEmbeddedPythonFunctionOperator.java
index 8915766f45d..232ccda4575 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractTwoInputEmbeddedPythonFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractTwoInputEmbeddedPythonFunctionOperator.java
@@ -135,6 +135,7 @@ public abstract class AbstractTwoInputEmbeddedPythonFunctionOperator<IN1, IN2, O
         interpreter.set("runtime_context", getRuntimeContext());
         interpreter.set("function_context", getFunctionContext());
         interpreter.set("timer_context", getTimerContext());
+        interpreter.set("keyed_state_backend", getKeyedStateBackend());
         interpreter.set("job_parameters", getJobParameters());
 
         interpreter.exec(
@@ -149,7 +150,8 @@ public abstract class AbstractTwoInputEmbeddedPythonFunctionOperator<IN1, IN2, O
                         + "runtime_context,"
                         + "function_context,"
                         + "timer_context,"
-                        + "job_parameters)");
+                        + "job_parameters,"
+                        + "keyed_state_backend)");
 
         interpreter.invokeMethod("operation", "open");
     }
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonWindowOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonWindowOperator.java
new file mode 100644
index 00000000000..94c68045205
--- /dev/null
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonWindowOperator.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python.embedded;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.fnexecution.v1.FlinkFnApi;
+import org.apache.flink.python.util.ProtoUtils;
+import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.python.DataStreamPythonFunctionOperator;
+import org.apache.flink.streaming.api.utils.PythonTypeUtils;
+import org.apache.flink.table.runtime.operators.window.Window;
+import org.apache.flink.types.Row;
+
+import pemja.core.object.PyIterator;
+
+import java.util.List;
+
+import static org.apache.flink.python.PythonOptions.MAP_STATE_READ_CACHE_SIZE;
+import static org.apache.flink.python.PythonOptions.MAP_STATE_WRITE_CACHE_SIZE;
+import static org.apache.flink.python.PythonOptions.PYTHON_METRIC_ENABLED;
+import static org.apache.flink.python.PythonOptions.PYTHON_PROFILE_ENABLED;
+import static org.apache.flink.python.PythonOptions.STATE_CACHE_SIZE;
+import static org.apache.flink.streaming.api.utils.PythonOperatorUtils.inBatchExecutionMode;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link EmbeddedPythonWindowOperator} is responsible for executing a user-defined Python
+ * ProcessWindowFunction in the embedded Python environment.
+ */
+@Internal
+public class EmbeddedPythonWindowOperator<K, IN, OUT, W extends Window>
+        extends AbstractOneInputEmbeddedPythonFunctionOperator<IN, OUT>
+        implements Triggerable<K, W> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** For serializing the window in checkpoints. */
+    private final TypeSerializer<W> windowSerializer;
+
+    /** The TypeInformation of the key. */
+    private transient TypeInformation<K> keyTypeInfo;
+
+    private transient PythonTypeUtils.DataConverter<K, Object> keyConverter;
+
+    private transient WindowContextImpl windowContext;
+
+    private transient WindowTimerContextImpl windowTimerContext;
+
+    public EmbeddedPythonWindowOperator(
+            Configuration config,
+            DataStreamPythonFunctionInfo pythonFunctionInfo,
+            TypeInformation<IN> inputTypeInfo,
+            TypeInformation<OUT> outputTypeInfo,
+            TypeSerializer<W> windowSerializer) {
+        super(config, pythonFunctionInfo, inputTypeInfo, outputTypeInfo);
+        this.windowSerializer = checkNotNull(windowSerializer);
+    }
+
+    @Override
+    public void open() throws Exception {
+        keyTypeInfo = ((RowTypeInfo) this.getInputTypeInfo()).getTypeAt(0);
+
+        keyConverter = PythonTypeUtils.TypeInfoToDataConverter.typeInfoDataConverter(keyTypeInfo);
+
+        InternalTimerService<W> internalTimerService =
+                getInternalTimerService("window-timers", windowSerializer, this);
+
+        windowContext = new WindowContextImpl(internalTimerService);
+
+        windowTimerContext = new WindowTimerContextImpl(internalTimerService);
+
+        super.open();
+    }
+
+    @Override
+    public List<FlinkFnApi.UserDefinedDataStreamFunction> createUserDefinedFunctionsProto() {
+        return ProtoUtils.createUserDefinedDataStreamStatefulFunctionProtos(
+                getPythonFunctionInfo(),
+                getRuntimeContext(),
+                getJobParameters(),
+                keyTypeInfo,
+                inBatchExecutionMode(getKeyedStateBackend()),
+                config.get(PYTHON_METRIC_ENABLED),
+                config.get(PYTHON_PROFILE_ENABLED),
+                false,
+                config.get(STATE_CACHE_SIZE),
+                config.get(MAP_STATE_READ_CACHE_SIZE),
+                config.get(MAP_STATE_WRITE_CACHE_SIZE));
+    }
+
+    @Override
+    public void onEventTime(InternalTimer<K, W> timer) throws Exception {
+        collector.setAbsoluteTimestamp(timer.getTimestamp());
+        invokeUserFunction(timer);
+    }
+
+    @Override
+    public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
+        collector.eraseTimestamp();
+        invokeUserFunction(timer);
+    }
+
+    @Override
+    public Object getFunctionContext() {
+        return windowContext;
+    }
+
+    @Override
+    public Object getTimerContext() {
+        return windowTimerContext;
+    }
+
+    @Override
+    public <T> DataStreamPythonFunctionOperator<T> copy(
+            DataStreamPythonFunctionInfo pythonFunctionInfo, TypeInformation<T> outputTypeInfo) {
+        return new EmbeddedPythonWindowOperator<>(
+                config, pythonFunctionInfo, getInputTypeInfo(), outputTypeInfo, windowSerializer);
+    }
+
+    private void invokeUserFunction(InternalTimer<K, W> timer) throws Exception {
+        windowTimerContext.timer = timer;
+
+        PyIterator results =
+                (PyIterator)
+                        interpreter.invokeMethod("operation", "on_timer", timer.getTimestamp());
+
+        while (results.hasNext()) {
+            OUT result = outputDataConverter.toInternal(results.next());
+            collector.collect(result);
+        }
+        results.close();
+
+        windowTimerContext.timer = null;
+    }
+
+    private class WindowContextImpl {
+        private final InternalTimerService<W> timerService;
+
+        WindowContextImpl(InternalTimerService<W> timerService) {
+            this.timerService = timerService;
+        }
+
+        public TypeSerializer<W> getWindowSerializer() {
+            return windowSerializer;
+        }
+
+        public long timestamp() {
+            return timestamp;
+        }
+
+        public InternalTimerService<W> timerService() {
+            return timerService;
+        }
+
+        @SuppressWarnings("unchecked")
+        public Object getCurrentKey() {
+            return keyConverter.toExternal(
+                    (K) ((Row) EmbeddedPythonWindowOperator.this.getCurrentKey()).getField(0));
+        }
+    }
+
+    private class WindowTimerContextImpl {
+        private final InternalTimerService<W> timerService;
+
+        private InternalTimer<K, W> timer;
+
+        WindowTimerContextImpl(InternalTimerService<W> timerService) {
+            this.timerService = timerService;
+        }
+
+        public InternalTimerService<W> timerService() {
+            return timerService;
+        }
+
+        public long timestamp() {
+            return timer.getTimestamp();
+        }
+
+        public W getWindow() {
+            return timer.getNamespace();
+        }
+
+        @SuppressWarnings("unchecked")
+        public Object getCurrentKey() {
+            return keyConverter.toExternal((K) ((Row) timer.getKey()).getField(0));
+        }
+    }
+}
diff --git a/flink-python/src/main/resources/META-INF/NOTICE b/flink-python/src/main/resources/META-INF/NOTICE
index a217a9497e4..61382eb13cc 100644
--- a/flink-python/src/main/resources/META-INF/NOTICE
+++ b/flink-python/src/main/resources/META-INF/NOTICE
@@ -28,7 +28,7 @@ This project bundles the following dependencies under the Apache Software Licens
 - org.apache.beam:beam-vendor-sdks-java-extensions-protobuf:2.38.0
 - org.apache.beam:beam-vendor-guava-26_0-jre:0.1
 - org.apache.beam:beam-vendor-grpc-1_43_2:0.1
-- com.alibaba:pemja:0.2.3
+- com.alibaba:pemja:0.2.4
 
 This project bundles the following dependencies under the BSD license.
 See bundled license files for details
