This is an automated email from the ASF dual-hosted git repository. hxb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new d137c12877f [FLINK-28688][python] Support DataStream PythonWindowOperator in Thread Mode d137c12877f is described below commit d137c12877fa2b21688acd3a89bb947975b0553a Author: huangxingbo <h...@apache.org> AuthorDate: Wed Aug 3 17:22:58 2022 +0800 [FLINK-28688][python] Support DataStream PythonWindowOperator in Thread Mode This closes #20435. --- flink-python/dev/dev-requirements.txt | 2 +- flink-python/pom.xml | 2 +- flink-python/pyflink/datastream/data_stream.py | 7 +- .../pyflink/datastream/tests/test_window.py | 90 +++++---- .../fn_execution/datastream/embedded/operations.py | 116 +++++++++-- .../datastream/embedded/process_function.py | 16 ++ .../datastream/embedded/runtime_context.py | 35 ++-- .../fn_execution/datastream/embedded/state_impl.py | 133 ++++++++++--- .../datastream/embedded/timerservice_impl.py | 29 +++ .../datastream/window/window_operator.py | 14 +- .../pyflink/fn_execution/embedded/converters.py | 37 +++- .../fn_execution/embedded/operation_utils.py | 10 +- .../pyflink/fn_execution/embedded/operations.py | 12 +- .../pyflink/fn_execution/embedded/state_impl.py | 87 +++++++++ flink-python/setup.py | 2 +- .../chain/PythonOperatorChainingOptimizer.java | 4 +- ...ractOneInputEmbeddedPythonFunctionOperator.java | 4 +- ...ractTwoInputEmbeddedPythonFunctionOperator.java | 4 +- .../embedded/EmbeddedPythonWindowOperator.java | 211 +++++++++++++++++++++ flink-python/src/main/resources/META-INF/NOTICE | 2 +- 20 files changed, 698 insertions(+), 119 deletions(-) diff --git a/flink-python/dev/dev-requirements.txt b/flink-python/dev/dev-requirements.txt index 61f21b556dc..fcc38508101 100755 --- a/flink-python/dev/dev-requirements.txt +++ b/flink-python/dev/dev-requirements.txt @@ -31,6 +31,6 @@ numpy>=1.14.3,<1.20; python_version < '3.7' fastavro>=1.1.0,<1.4.8 grpcio>=1.29.0,<1.47 grpcio-tools>=1.3.5,<=1.14.2 -pemja==0.2.3; python_version >= '3.7' and platform_system != 'Windows' 
+pemja==0.2.4; python_version >= '3.7' and platform_system != 'Windows' httplib2>=0.19.0,<=0.20.4 protobuf<3.18 \ No newline at end of file diff --git a/flink-python/pom.xml b/flink-python/pom.xml index bc376f844a6..eba9a397479 100644 --- a/flink-python/pom.xml +++ b/flink-python/pom.xml @@ -116,7 +116,7 @@ under the License. <dependency> <groupId>com.alibaba</groupId> <artifactId>pemja</artifactId> - <version>0.2.3</version> + <version>0.2.4</version> </dependency> <!-- Protobuf dependencies --> diff --git a/flink-python/pyflink/datastream/data_stream.py b/flink-python/pyflink/datastream/data_stream.py index 845cd156afc..4e8b5cbb229 100644 --- a/flink-python/pyflink/datastream/data_stream.py +++ b/flink-python/pyflink/datastream/data_stream.py @@ -2738,7 +2738,12 @@ def _get_one_input_stream_operator(data_stream: DataStream, else: j_namespace_serializer = \ gateway.jvm.org.apache.flink.streaming.api.utils.ByteArrayWrapperSerializer() - j_python_function_operator = gateway.jvm.ExternalPythonKeyedProcessOperator( + if python_execution_mode == 'thread': + JDataStreamPythonWindowFunctionOperator = gateway.jvm.EmbeddedPythonWindowOperator + else: + JDataStreamPythonWindowFunctionOperator = gateway.jvm.ExternalPythonKeyedProcessOperator + + j_python_function_operator = JDataStreamPythonWindowFunctionOperator( j_conf, j_data_stream_python_function_info, j_input_types, diff --git a/flink-python/pyflink/datastream/tests/test_window.py b/flink-python/pyflink/datastream/tests/test_window.py index b6433f409ce..886e8456949 100644 --- a/flink-python/pyflink/datastream/tests/test_window.py +++ b/flink-python/pyflink/datastream/tests/test_window.py @@ -15,8 +15,11 @@ # See the License for the specific language governing permissions and # limitations under the License. 
################################################################################ +import sys from typing import Iterable, Tuple, Dict +import pytest + from pyflink.common import Configuration from pyflink.common.time import Time from pyflink.common.typeinfo import Types @@ -36,7 +39,7 @@ from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase from pyflink.util.java_utils import get_j_env_configuration -class WindowTests(PyFlinkStreamingTestCase): +class WindowTests(object): def setUp(self) -> None: super(WindowTests, self).setUp() @@ -371,41 +374,6 @@ class WindowTests(PyFlinkStreamingTestCase): expected = ['(hi,1,7,4)', '(hi,8,12,2)', '(hi,15,18,1)'] self.assert_equals_sorted(expected, results) - def test_side_output_late_data(self): - self.env.set_parallelism(1) - config = Configuration( - j_configuration=get_j_env_configuration(self.env._j_stream_execution_environment) - ) - config.set_integer('python.fn-execution.bundle.size', 1) - jvm = get_gateway().jvm - watermark_strategy = WatermarkStrategy( - jvm.org.apache.flink.api.common.eventtime.WatermarkStrategy.forGenerator( - jvm.org.apache.flink.streaming.api.functions.python.eventtime. 
- PerElementWatermarkGenerator.getSupplier() - ) - ).with_timestamp_assigner(SecondColumnTimestampAssigner()) - - tag = OutputTag('late-data', type_info=Types.ROW([Types.STRING(), Types.INT()])) - ds1 = self.env.from_collection([('a', 0), ('a', 8), ('a', 4), ('a', 6)], - type_info=Types.ROW([Types.STRING(), Types.INT()])) - ds2 = ds1.assign_timestamps_and_watermarks(watermark_strategy) \ - .key_by(lambda e: e[0]) \ - .window(TumblingEventTimeWindows.of(Time.milliseconds(5))) \ - .allowed_lateness(0) \ - .side_output_late_data(tag) \ - .process(CountWindowProcessFunction(), - Types.TUPLE([Types.STRING(), Types.LONG(), Types.LONG(), Types.INT()])) - main_sink = DataStreamTestSinkFunction() - ds2.add_sink(main_sink) - side_sink = DataStreamTestSinkFunction() - ds2.get_side_output(tag).add_sink(side_sink) - - self.env.execute('test_side_output_late_data') - main_expected = ['(a,0,5,1)', '(a,5,10,2)'] - self.assert_equals_sorted(main_expected, main_sink.get_results()) - side_expected = ['+I[a, 4]'] - self.assert_equals_sorted(side_expected, side_sink.get_results()) - def test_global_window_with_purging_trigger(self): self.env.set_parallelism(1) data_stream = self.env.from_collection([ @@ -590,6 +558,56 @@ class WindowTests(PyFlinkStreamingTestCase): self.assert_equals_sorted(expected, results) +class ProcessWindowTests(WindowTests, PyFlinkStreamingTestCase): + def setUp(self) -> None: + super(ProcessWindowTests, self).setUp() + config = get_j_env_configuration(self.env._j_stream_execution_environment) + config.setString("python.execution-mode", "process") + + def test_side_output_late_data(self): + self.env.set_parallelism(1) + config = Configuration( + j_configuration=get_j_env_configuration(self.env._j_stream_execution_environment) + ) + config.set_integer('python.fn-execution.bundle.size', 1) + jvm = get_gateway().jvm + watermark_strategy = WatermarkStrategy( + jvm.org.apache.flink.api.common.eventtime.WatermarkStrategy.forGenerator( + 
jvm.org.apache.flink.streaming.api.functions.python.eventtime. + PerElementWatermarkGenerator.getSupplier() + ) + ).with_timestamp_assigner(SecondColumnTimestampAssigner()) + + tag = OutputTag('late-data', type_info=Types.ROW([Types.STRING(), Types.INT()])) + ds1 = self.env.from_collection([('a', 0), ('a', 8), ('a', 4), ('a', 6)], + type_info=Types.ROW([Types.STRING(), Types.INT()])) + ds2 = ds1.assign_timestamps_and_watermarks(watermark_strategy) \ + .key_by(lambda e: e[0]) \ + .window(TumblingEventTimeWindows.of(Time.milliseconds(5))) \ + .allowed_lateness(0) \ + .side_output_late_data(tag) \ + .process(CountWindowProcessFunction(), + Types.TUPLE([Types.STRING(), Types.LONG(), Types.LONG(), Types.INT()])) + main_sink = DataStreamTestSinkFunction() + ds2.add_sink(main_sink) + side_sink = DataStreamTestSinkFunction() + ds2.get_side_output(tag).add_sink(side_sink) + + self.env.execute('test_side_output_late_data') + main_expected = ['(a,0,5,1)', '(a,5,10,2)'] + self.assert_equals_sorted(main_expected, main_sink.get_results()) + side_expected = ['+I[a, 4]'] + self.assert_equals_sorted(side_expected, side_sink.get_results()) + + +@pytest.mark.skipif(sys.version_info < (3, 7), reason="requires python3.7") +class EmbeddedWindowTests(WindowTests, PyFlinkStreamingTestCase): + def setUp(self) -> None: + super(EmbeddedWindowTests, self).setUp() + config = get_j_env_configuration(self.env._j_stream_execution_environment) + config.setString("python.execution-mode", "thread") + + class SecondColumnTimestampAssigner(TimestampAssigner): def extract_timestamp(self, value, record_timestamp) -> int: diff --git a/flink-python/pyflink/fn_execution/datastream/embedded/operations.py b/flink-python/pyflink/fn_execution/datastream/embedded/operations.py index 46567798d20..69757e7f128 100644 --- a/flink-python/pyflink/fn_execution/datastream/embedded/operations.py +++ b/flink-python/pyflink/fn_execution/datastream/embedded/operations.py @@ -15,13 +15,19 @@ # See the License for the 
specific language governing permissions and # limitations under the License. ################################################################################ - +from pyflink.datastream.window import WindowOperationDescriptor from pyflink.fn_execution import pickle +from pyflink.fn_execution.coders import TimeWindowCoder, CountWindowCoder from pyflink.fn_execution.datastream import operations from pyflink.fn_execution.datastream.embedded.process_function import ( InternalProcessFunctionContext, InternalKeyedProcessFunctionContext, - InternalKeyedProcessFunctionOnTimerContext) + InternalKeyedProcessFunctionOnTimerContext, InternalWindowTimerContext) from pyflink.fn_execution.datastream.embedded.runtime_context import StreamingRuntimeContext +from pyflink.fn_execution.datastream.embedded.timerservice_impl import InternalTimerServiceImpl +from pyflink.fn_execution.datastream.window.window_operator import WindowOperator +from pyflink.fn_execution.embedded.converters import (TimeWindowConverter, CountWindowConverter, + GlobalWindowConverter) +from pyflink.fn_execution.embedded.state_impl import KeyedStateBackend class OneInputOperation(operations.OneInputOperation): @@ -73,8 +79,8 @@ class TwoInputOperation(operations.TwoInputOperation): def extract_process_function( - user_defined_function_proto, runtime_context, function_context, timer_context, - job_parameters): + user_defined_function_proto, j_runtime_context, j_function_context, j_timer_context, + job_parameters, j_keyed_state_backend): from pyflink.fn_execution import flink_fn_execution_pb2 user_defined_func = pickle.loads(user_defined_function_proto.payload) @@ -83,7 +89,7 @@ def extract_process_function( UserDefinedDataStreamFunction = flink_fn_execution_pb2.UserDefinedDataStreamFunction - runtime_context = StreamingRuntimeContext.of(runtime_context, job_parameters) + runtime_context = StreamingRuntimeContext.of(j_runtime_context, job_parameters) def open_func(): if hasattr(user_defined_func, "open"): @@ -94,7 
+100,7 @@ def extract_process_function( user_defined_func.close() if func_type == UserDefinedDataStreamFunction.PROCESS: - function_context = InternalProcessFunctionContext(function_context) + function_context = InternalProcessFunctionContext(j_function_context) process_element = user_defined_func.process_element @@ -106,10 +112,16 @@ def extract_process_function( elif func_type == UserDefinedDataStreamFunction.KEYED_PROCESS: function_context = InternalKeyedProcessFunctionContext( - function_context, user_defined_function_proto.key_type_info) + j_function_context, user_defined_function_proto.key_type_info) timer_context = InternalKeyedProcessFunctionOnTimerContext( - timer_context, user_defined_function_proto.key_type_info) + j_timer_context, user_defined_function_proto.key_type_info) + + keyed_state_backend = KeyedStateBackend( + function_context, + j_keyed_state_backend) + + runtime_context.set_keyed_state_backend(keyed_state_backend) on_timer = user_defined_func.on_timer @@ -125,7 +137,7 @@ def extract_process_function( return OneInputOperation(open_func, close_func, process_element_func, on_timer_func) elif func_type == UserDefinedDataStreamFunction.CO_PROCESS: - function_context = InternalProcessFunctionContext(function_context) + function_context = InternalProcessFunctionContext(j_function_context) process_element1 = user_defined_func.process_element1 process_element2 = user_defined_func.process_element2 @@ -142,10 +154,16 @@ def extract_process_function( elif func_type == UserDefinedDataStreamFunction.KEYED_CO_PROCESS: function_context = InternalKeyedProcessFunctionContext( - function_context, user_defined_function_proto.key_type_info) + j_function_context, user_defined_function_proto.key_type_info) timer_context = InternalKeyedProcessFunctionOnTimerContext( - timer_context, user_defined_function_proto.key_type_info) + j_timer_context, user_defined_function_proto.key_type_info) + + keyed_state_backend = KeyedStateBackend( + function_context, + 
j_keyed_state_backend) + + runtime_context.set_keyed_state_backend(keyed_state_backend) on_timer = user_defined_func.on_timer @@ -167,5 +185,81 @@ def extract_process_function( return TwoInputOperation( open_func, close_func, process_element_func1, process_element_func2, on_timer_func) + elif func_type == UserDefinedDataStreamFunction.WINDOW: + + window_operation_descriptor = ( + user_defined_func + ) # type: WindowOperationDescriptor + + def user_key_selector(normal_data): + return normal_data + + window_assigner = window_operation_descriptor.assigner + window_trigger = window_operation_descriptor.trigger + allowed_lateness = window_operation_descriptor.allowed_lateness + late_data_output_tag = window_operation_descriptor.late_data_output_tag + window_state_descriptor = window_operation_descriptor.window_state_descriptor + internal_window_function = window_operation_descriptor.internal_window_function + window_serializer = window_operation_descriptor.window_serializer + window_coder = window_serializer._get_coder() + + if isinstance(window_coder, TimeWindowCoder): + window_converter = TimeWindowConverter() + elif isinstance(window_coder, CountWindowCoder): + window_converter = CountWindowConverter() + else: + window_converter = GlobalWindowConverter() + + internal_timer_service = InternalTimerServiceImpl( + j_timer_context.timerService(), window_converter) + + function_context = InternalKeyedProcessFunctionContext( + j_function_context, user_defined_function_proto.key_type_info) + + window_timer_context = InternalWindowTimerContext( + j_timer_context, + user_defined_function_proto.key_type_info, + window_converter) + + keyed_state_backend = KeyedStateBackend( + function_context, + j_keyed_state_backend, + j_function_context.getWindowSerializer(), + window_converter) + + runtime_context.set_keyed_state_backend(keyed_state_backend) + + window_operator = WindowOperator( + window_assigner, + keyed_state_backend, + user_key_selector, + window_state_descriptor, + 
internal_window_function, + window_trigger, + allowed_lateness, + late_data_output_tag) + + def open_func(): + window_operator.open(runtime_context, internal_timer_service) + + def close_func(): + window_operator.close() + + def process_element_func(value): + yield from window_operator.process_element(value[1], function_context.timestamp()) + + if window_assigner.is_event_time(): + def on_timer_func(timestamp): + window = window_timer_context.window() + key = window_timer_context.get_current_key() + yield from window_operator.on_event_time(timestamp, key, window) + else: + def on_timer_func(timestamp): + window = window_timer_context.window() + key = window_timer_context.get_current_key() + yield from window_operator.on_processing_time(timestamp, key, window) + + return OneInputOperation(open_func, close_func, process_element_func, on_timer_func) + else: raise Exception("Unknown function type {0}.".format(func_type)) diff --git a/flink-python/pyflink/fn_execution/datastream/embedded/process_function.py b/flink-python/pyflink/fn_execution/datastream/embedded/process_function.py index 9ffca6aeedd..9cccce3ae94 100644 --- a/flink-python/pyflink/fn_execution/datastream/embedded/process_function.py +++ b/flink-python/pyflink/fn_execution/datastream/embedded/process_function.py @@ -90,3 +90,19 @@ class InternalKeyedProcessFunctionOnTimerContext(KeyedProcessFunction.OnTimerCon def get_current_key(self): return self._key_converter.to_internal(self._context.getCurrentKey()) + + +class InternalWindowTimerContext(object): + def __init__(self, context, key_type_info, window_converter): + self._context = context + self._key_converter = from_type_info(key_type_info) + self._window_converter = window_converter + + def timestamp(self) -> int: + return self._context.timestamp() + + def window(self): + return self._window_converter.to_internal(self._context.getWindow()) + + def get_current_key(self): + return self._key_converter.to_internal(self._context.getCurrentKey()) diff --git 
a/flink-python/pyflink/fn_execution/datastream/embedded/runtime_context.py b/flink-python/pyflink/fn_execution/datastream/embedded/runtime_context.py index c7d9508446e..2d744c06a5f 100644 --- a/flink-python/pyflink/fn_execution/datastream/embedded/runtime_context.py +++ b/flink-python/pyflink/fn_execution/datastream/embedded/runtime_context.py @@ -20,17 +20,14 @@ from pyflink.datastream.state import (AggregatingStateDescriptor, AggregatingSta ReducingStateDescriptor, ReducingState, MapStateDescriptor, MapState, ListStateDescriptor, ListState, ValueStateDescriptor, ValueState) -from pyflink.fn_execution.datastream.embedded.state_impl import (ValueStateImpl, ListStateImpl, - MapStateImpl, ReducingStateImpl, - AggregatingStateImpl) -from pyflink.fn_execution.embedded.converters import from_type_info -from pyflink.fn_execution.embedded.java_utils import to_java_state_descriptor +from pyflink.fn_execution.embedded.state_impl import KeyedStateBackend class StreamingRuntimeContext(RuntimeContext): def __init__(self, runtime_context, job_parameters): self._runtime_context = runtime_context self._job_parameters = job_parameters + self._keyed_state_backend = None # type: KeyedStateBackend def get_task_name(self) -> str: """ @@ -82,32 +79,26 @@ class StreamingRuntimeContext(RuntimeContext): return self._runtime_context.getMetricGroup() def get_state(self, state_descriptor: ValueStateDescriptor) -> ValueState: - return ValueStateImpl( - self._runtime_context.getState(to_java_state_descriptor(state_descriptor)), - from_type_info(state_descriptor.type_info)) + return self._keyed_state_backend.get_value_state(state_descriptor) def get_list_state(self, state_descriptor: ListStateDescriptor) -> ListState: - return ListStateImpl( - self._runtime_context.getListState(to_java_state_descriptor(state_descriptor)), - from_type_info(state_descriptor.type_info)) + return self._keyed_state_backend.get_list_state(state_descriptor) def get_map_state(self, state_descriptor: MapStateDescriptor) 
-> MapState: - return MapStateImpl( - self._runtime_context.getMapState(to_java_state_descriptor(state_descriptor)), - from_type_info(state_descriptor.type_info)) + return self._keyed_state_backend.get_map_state(state_descriptor) def get_reducing_state(self, state_descriptor: ReducingStateDescriptor) -> ReducingState: - return ReducingStateImpl( - self._runtime_context.getState(to_java_state_descriptor(state_descriptor)), - from_type_info(state_descriptor.type_info), - state_descriptor.get_reduce_function()) + return self._keyed_state_backend.get_reducing_state(state_descriptor) def get_aggregating_state(self, state_descriptor: AggregatingStateDescriptor) -> AggregatingState: - return AggregatingStateImpl( - self._runtime_context.getState(to_java_state_descriptor(state_descriptor)), - from_type_info(state_descriptor.type_info), - state_descriptor.get_agg_function()) + return self._keyed_state_backend.get_aggregating_state(state_descriptor) + + def set_keyed_state_backend(self, keyed_state_backend: KeyedStateBackend): + self._keyed_state_backend = keyed_state_backend + + def get_keyed_state_backend(self): + return self._keyed_state_backend @staticmethod def of(runtime_context, job_parameters): diff --git a/flink-python/pyflink/fn_execution/datastream/embedded/state_impl.py b/flink-python/pyflink/fn_execution/datastream/embedded/state_impl.py index e88c6abc304..6973be038eb 100644 --- a/flink-python/pyflink/fn_execution/datastream/embedded/state_impl.py +++ b/flink-python/pyflink/fn_execution/datastream/embedded/state_impl.py @@ -15,27 +15,42 @@ # See the License for the specific language governing permissions and # limitations under the License. 
################################################################################ -from typing import List, Iterable, Tuple, Dict +from abc import ABC +from typing import List, Iterable, Tuple, Dict, Collection from pyflink.datastream import ReduceFunction, AggregateFunction -from pyflink.datastream.state import (ValueState, T, State, ListState, IN, OUT, ReducingState, - AggregatingState, MapState, V, K) +from pyflink.datastream.state import (T, IN, OUT, V, K) from pyflink.fn_execution.embedded.converters import (DataConverter, DictDataConverter, ListDataConverter) +from pyflink.fn_execution.internal_state import (InternalValueState, InternalKvState, + InternalListState, InternalReducingState, + InternalAggregatingState, InternalMapState, + N) -class StateImpl(State): - def __init__(self, state, value_converter: DataConverter): +class StateImpl(InternalKvState, ABC): + def __init__(self, + state, + value_converter: DataConverter, + window_converter: DataConverter = None): self._state = state self._value_converter = value_converter + self._window_converter = window_converter def clear(self): self._state.clear() + def set_current_namespace(self, namespace) -> None: + j_window = self._window_converter.to_external(namespace) + self._state.setCurrentNamespace(j_window) + -class ValueStateImpl(StateImpl, ValueState): - def __init__(self, value_state, value_converter: DataConverter): - super(ValueStateImpl, self).__init__(value_state, value_converter) +class ValueStateImpl(StateImpl, InternalValueState): + def __init__(self, + value_state, + value_converter: DataConverter, + window_converter: DataConverter = None): + super(ValueStateImpl, self).__init__(value_state, value_converter, window_converter) def value(self) -> T: return self._value_converter.to_internal(self._state.value()) @@ -44,10 +59,13 @@ class ValueStateImpl(StateImpl, ValueState): self._state.update(self._value_converter.to_external(value)) -class ListStateImpl(StateImpl, ListState): +class 
ListStateImpl(StateImpl, InternalListState): - def __init__(self, list_state, value_converter: ListDataConverter): - super(ListStateImpl, self).__init__(list_state, value_converter) + def __init__(self, + list_state, + value_converter: ListDataConverter, + window_converter: DataConverter = None): + super(ListStateImpl, self).__init__(list_state, value_converter, window_converter) self._element_converter = value_converter._field_converter def update(self, values: List[T]) -> None: @@ -57,19 +75,27 @@ class ListStateImpl(StateImpl, ListState): self._state.addAll(self._value_converter.to_external(values)) def get(self) -> OUT: - return self._value_converter.to_internal(self._state.get()) + states = self._value_converter.to_internal(self._state.get()) + if states: + yield from states def add(self, value: IN) -> None: self._state.add(self._element_converter.to_external(value)) + def merge_namespaces(self, target: N, sources: Collection[N]) -> None: + j_target = self._window_converter.to_external(target) + j_sources = [self._window_converter.to_external(window) for window in sources] + self._state.mergeNamespaces(j_target, j_sources) + -class ReducingStateImpl(StateImpl, ReducingState): +class ReducingStateImpl(StateImpl, InternalReducingState): def __init__(self, value_state, value_converter: DataConverter, - reduce_function: ReduceFunction): - super(ReducingStateImpl, self).__init__(value_state, value_converter) + reduce_function: ReduceFunction, + window_converter: DataConverter = None): + super(ReducingStateImpl, self).__init__(value_state, value_converter, window_converter) self._reduce_function = reduce_function def get(self) -> OUT: @@ -88,13 +114,34 @@ class ReducingStateImpl(StateImpl, ReducingState): self._state.update(self._value_converter.to_external(reduce_value)) + def merge_namespaces(self, target: N, sources: Collection[N]) -> None: + merged = None + for source in sources: + self.set_current_namespace(source) + source_state = self.get() + + if 
source_state is None: + continue -class AggregatingStateImpl(StateImpl, AggregatingState): + self.clear() + + if merged is None: + merged = source_state + else: + merged = self._reduce_function.reduce(merged, source_state) + + if merged is not None: + self.set_current_namespace(target) + self._state.update(self._value_converter.to_external(merged)) + + +class AggregatingStateImpl(StateImpl, InternalAggregatingState): def __init__(self, value_state, value_converter, - agg_function: AggregateFunction): - super(AggregatingStateImpl, self).__init__(value_state, value_converter) + agg_function: AggregateFunction, + window_converter: DataConverter = None): + super(AggregatingStateImpl, self).__init__(value_state, value_converter, window_converter) self._agg_function = agg_function def get(self) -> OUT: @@ -117,15 +164,38 @@ class AggregatingStateImpl(StateImpl, AggregatingState): accumulator = self._agg_function.add(value, accumulator) self._state.update(self._value_converter.to_external(accumulator)) + def merge_namespaces(self, target: N, sources: Collection[N]) -> None: + merged = None + for source in sources: + self.set_current_namespace(source) + source_state = self.get() + + if source_state is None: + continue + + self.clear() -class MapStateImpl(StateImpl, MapState): - def __init__(self, map_state, map_converter: DictDataConverter): - super(MapStateImpl, self).__init__(map_state, map_converter) + if merged is None: + merged = source_state + else: + merged = self._agg_function.merge(merged, source_state) + + if merged is not None: + self.set_current_namespace(target) + self._state.update(self._value_converter.to_external(merged)) + + +class MapStateImpl(StateImpl, InternalMapState): + def __init__(self, + map_state, + map_converter: DictDataConverter, + window_converter: DataConverter = None): + super(MapStateImpl, self).__init__(map_state, map_converter, window_converter) self._k_converter = map_converter._key_converter self._v_converter = 
map_converter._value_converter def get(self, key: K) -> V: - return self._value_converter.to_internal( + return self._v_converter.to_internal( self._state.get(self._k_converter.to_external(key))) def put(self, key: K, value: V) -> None: @@ -142,17 +212,22 @@ class MapStateImpl(StateImpl, MapState): def items(self) -> Iterable[Tuple[K, V]]: entries = self._state.entries() - for entry in entries: - yield (self._k_converter.to_internal(entry.getKey()), - self._v_converter.to_internal(entry.getValue())) + if entries: + for entry in entries: + yield (self._k_converter.to_internal(entry.getKey()), + self._v_converter.to_internal(entry.getValue())) def keys(self) -> Iterable[K]: - for k in self._state.keys(): - yield self._k_converter.to_internal(k) + keys = self._state.keys() + if keys: + for k in keys: + yield self._k_converter.to_internal(k) def values(self) -> Iterable[V]: - for v in self._state.values(): - yield self._v_converter.to_internal(v) + values = self._state.values() + if values: + for v in values: + yield self._v_converter.to_internal(v) def is_empty(self) -> bool: return self._state.isEmpty() diff --git a/flink-python/pyflink/fn_execution/datastream/embedded/timerservice_impl.py b/flink-python/pyflink/fn_execution/datastream/embedded/timerservice_impl.py index fab9440b230..690abe10140 100644 --- a/flink-python/pyflink/fn_execution/datastream/embedded/timerservice_impl.py +++ b/flink-python/pyflink/fn_execution/datastream/embedded/timerservice_impl.py @@ -16,6 +16,7 @@ # limitations under the License. 
################################################################################ from pyflink.datastream import TimerService +from pyflink.fn_execution.datastream.timerservice import InternalTimerService, N class TimerServiceImpl(TimerService): @@ -39,3 +40,31 @@ class TimerServiceImpl(TimerService): def delete_event_time_timer(self, timestamp: int): self._timer_service.deleteEventTimeTimer(timestamp) + + +class InternalTimerServiceImpl(InternalTimerService[N]): + def __init__(self, timer_service, window_converter): + self._timer_service = timer_service + self._window_converter = window_converter + + def current_processing_time(self): + return self._timer_service.currentProcessingTime() + + def current_watermark(self): + return self._timer_service.currentWatermark() + + def register_processing_time_timer(self, namespace: N, timestamp: int): + window = self._window_converter.to_external(namespace) + self._timer_service.registerProcessingTimeTimer(window, timestamp) + + def register_event_time_timer(self, namespace: N, timestamp: int): + window = self._window_converter.to_external(namespace) + self._timer_service.registerEventTimeTimer(window, timestamp) + + def delete_event_time_timer(self, namespace: N, timestamp: int): + window = self._window_converter.to_external(namespace) + self._timer_service.deleteEventTimeTimer(window, timestamp) + + def delete_processing_time_timer(self, namespace: N, timestamp: int): + window = self._window_converter.to_external(namespace) + self._timer_service.deleteProcessingTimeTimer(window, timestamp) diff --git a/flink-python/pyflink/fn_execution/datastream/window/window_operator.py b/flink-python/pyflink/fn_execution/datastream/window/window_operator.py index a5ceb7b6ab5..a2a8c0a8d06 100644 --- a/flink-python/pyflink/fn_execution/datastream/window/window_operator.py +++ b/flink-python/pyflink/fn_execution/datastream/window/window_operator.py @@ -19,6 +19,7 @@ import typing from typing import TypeVar, Iterable, Collection, Optional 
from pyflink.common.constants import MAX_LONG_VALUE +from pyflink.common.typeinfo import PickledBytesTypeInfo from pyflink.datastream import WindowAssigner, Trigger, MergingWindowAssigner, TriggerResult from pyflink.datastream.functions import KeyedStateStore, RuntimeContext, InternalWindowFunction from pyflink.datastream.output_tag import OutputTag @@ -322,9 +323,16 @@ class WindowOperator(object): if isinstance(self.window_state, InternalMergingState): self.window_merging_state = self.window_state - window_coder = self.keyed_state_backend.namespace_coder - self.merging_sets_state = self.keyed_state_backend.get_map_state( - "merging-window-set", window_coder, window_coder) + if hasattr(self.keyed_state_backend, 'namespace_coder'): + window_coder = self.keyed_state_backend.namespace_coder + self.merging_sets_state = self.keyed_state_backend.get_map_state( + "merging-window-set", window_coder, window_coder) + else: + state_descriptor = MapStateDescriptor( + "merging-window-set", + PickledBytesTypeInfo(), + PickledBytesTypeInfo()) + self.merging_sets_state = self.keyed_state_backend.get_map_state(state_descriptor) self.merge_function = WindowMergeFunction(self) diff --git a/flink-python/pyflink/fn_execution/embedded/converters.py b/flink-python/pyflink/fn_execution/embedded/converters.py index ee75e6d5beb..d2ba951f884 100644 --- a/flink-python/pyflink/fn_execution/embedded/converters.py +++ b/flink-python/pyflink/fn_execution/embedded/converters.py @@ -15,19 +15,26 @@ # See the License for the specific language governing permissions and # limitations under the License. 
################################################################################ -from abc import ABC, abstractmethod - import pickle +from abc import ABC, abstractmethod from typing import TypeVar, List, Tuple +from pemja import findClass + from pyflink.common import Row, RowKind, TypeInformation from pyflink.common.typeinfo import (PickledBytesTypeInfo, PrimitiveArrayTypeInfo, BasicArrayTypeInfo, ObjectArrayTypeInfo, RowTypeInfo, TupleTypeInfo, MapTypeInfo, ListTypeInfo) +from pyflink.datastream import TimeWindow, CountWindow, GlobalWindow IN = TypeVar('IN') OUT = TypeVar('OUT') +# Java Window +JTimeWindow = findClass('org.apache.flink.table.runtime.operators.window.TimeWindow') +JCountWindow = findClass('org.apache.flink.table.runtime.operators.window.CountWindow') +JGlobalWindow = findClass('org.apache.flink.streaming.api.windowing.windows.GlobalWindow') + class DataConverter(ABC): @@ -180,6 +187,32 @@ class DictDataConverter(DataConverter): for k, v in value.items()} +class TimeWindowConverter(DataConverter): + def to_internal(self, value) -> TimeWindow: + return TimeWindow(value.getStart(), value.getEnd()) + + def to_external(self, value: TimeWindow) -> OUT: + return JTimeWindow(value.start, value.end) + + +class CountWindowConverter(DataConverter): + + def to_internal(self, value) -> CountWindow: + return CountWindow(value.getId()) + + def to_external(self, value: CountWindow) -> OUT: + return JCountWindow(value.id) + + +class GlobalWindowConverter(DataConverter): + + def to_internal(self, value) -> IN: + return GlobalWindow() + + def to_external(self, value) -> OUT: + return JGlobalWindow.get() + + def from_type_info_proto(type_info): # for data stream type information. 
from pyflink.fn_execution import flink_fn_execution_pb2 diff --git a/flink-python/pyflink/fn_execution/embedded/operation_utils.py b/flink-python/pyflink/fn_execution/embedded/operation_utils.py index 673734dbf7e..0c0580671ec 100644 --- a/flink-python/pyflink/fn_execution/embedded/operation_utils.py +++ b/flink-python/pyflink/fn_execution/embedded/operation_utils.py @@ -103,7 +103,7 @@ def create_table_operation_from_proto(proto, input_coder_info, output_coder_into def create_one_input_user_defined_data_stream_function_from_protos( function_infos, input_coder_info, output_coder_info, runtime_context, - function_context, timer_context, job_parameters): + function_context, timer_context, job_parameters, keyed_state_backend): serialized_fns = [pare_user_defined_data_stream_function_proto(proto) for proto in function_infos] input_data_converter = ( @@ -118,14 +118,15 @@ def create_one_input_user_defined_data_stream_function_from_protos( runtime_context, function_context, timer_context, - job_parameters) + job_parameters, + keyed_state_backend) return function_operation def create_two_input_user_defined_data_stream_function_from_protos( function_infos, input_coder_info1, input_coder_info2, output_coder_info, runtime_context, - function_context, timer_context, job_parameters): + function_context, timer_context, job_parameters, keyed_state_backend): serialized_fns = [pare_user_defined_data_stream_function_proto(proto) for proto in function_infos] @@ -146,6 +147,7 @@ def create_two_input_user_defined_data_stream_function_from_protos( runtime_context, function_context, timer_context, - job_parameters) + job_parameters, + keyed_state_backend) return function_operation diff --git a/flink-python/pyflink/fn_execution/embedded/operations.py b/flink-python/pyflink/fn_execution/embedded/operations.py index 940a98c4677..88eee565224 100644 --- a/flink-python/pyflink/fn_execution/embedded/operations.py +++ b/flink-python/pyflink/fn_execution/embedded/operations.py @@ -66,14 +66,16 @@ 
class OneInputFunctionOperation(FunctionOperation): runtime_context, function_context, timer_context, - job_parameters): + job_parameters, + keyed_state_backend): operations = ( [extract_process_function( serialized_fn, runtime_context, function_context, timer_context, - job_parameters) + job_parameters, + keyed_state_backend) for serialized_fn in serialized_fns]) super(OneInputFunctionOperation, self).__init__(operations, output_data_converter) self._input_data_converter = input_data_converter @@ -96,14 +98,16 @@ class TwoInputFunctionOperation(FunctionOperation): runtime_context, function_context, timer_context, - job_parameters): + job_parameters, + keyed_state_backend): operations = ( [extract_process_function( serialized_fn, runtime_context, function_context, timer_context, - job_parameters) + job_parameters, + keyed_state_backend) for serialized_fn in serialized_fns]) super(TwoInputFunctionOperation, self).__init__(operations, output_data_converter) self._input_data_converter1 = input_data_converter1 diff --git a/flink-python/pyflink/fn_execution/embedded/state_impl.py b/flink-python/pyflink/fn_execution/embedded/state_impl.py new file mode 100644 index 00000000000..21600df3211 --- /dev/null +++ b/flink-python/pyflink/fn_execution/embedded/state_impl.py @@ -0,0 +1,87 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +from pemja import findClass + +from pyflink.datastream.state import (ValueStateDescriptor, ListStateDescriptor, MapStateDescriptor, + StateDescriptor, ReducingStateDescriptor, + AggregatingStateDescriptor) +from pyflink.fn_execution.datastream.embedded.state_impl import (ValueStateImpl, ListStateImpl, + MapStateImpl, ReducingStateImpl, + AggregatingStateImpl) +from pyflink.fn_execution.embedded.converters import from_type_info +from pyflink.fn_execution.embedded.java_utils import to_java_state_descriptor + +JVoidNamespace = findClass('org.apache.flink.runtime.state.VoidNamespace') +JVoidNamespaceSerializer = findClass('org.apache.flink.runtime.state.VoidNamespaceSerializer') + +JVoidNamespace_INSTANCE = JVoidNamespace.INSTANCE +JVoidNamespaceSerializer_INSTANCE = JVoidNamespaceSerializer.INSTANCE + + +class KeyedStateBackend(object): + def __init__(self, + function_context, + keyed_state_backend, + window_serializer=JVoidNamespaceSerializer_INSTANCE, + window_converter=None): + self._function_context = function_context + self._keyed_state_backend = keyed_state_backend + self._window_serializer = window_serializer + self._window_converter = window_converter + + def get_current_key(self): + return self._function_context.get_current_key() + + def get_value_state(self, state_descriptor: ValueStateDescriptor) -> ValueStateImpl: + return ValueStateImpl( + self._get_or_create_keyed_state(state_descriptor), + from_type_info(state_descriptor.type_info), + self._window_converter) + + 
def get_list_state(self, state_descriptor: ListStateDescriptor) -> ListStateImpl: + return ListStateImpl( + self._get_or_create_keyed_state(state_descriptor), + from_type_info(state_descriptor.type_info), + self._window_converter) + + def get_map_state(self, state_descriptor: MapStateDescriptor) -> MapStateImpl: + return MapStateImpl( + self._get_or_create_keyed_state(state_descriptor), + from_type_info(state_descriptor.type_info), + self._window_converter) + + def get_reducing_state(self, state_descriptor: ReducingStateDescriptor): + return ReducingStateImpl( + self._get_or_create_keyed_state(state_descriptor), + from_type_info(state_descriptor.type_info), + state_descriptor.get_reduce_function(), + self._window_converter) + + def get_aggregating_state(self, state_descriptor: AggregatingStateDescriptor): + return AggregatingStateImpl( + self._get_or_create_keyed_state(state_descriptor), + from_type_info(state_descriptor.type_info), + state_descriptor.get_agg_function(), + self._window_converter) + + def _get_or_create_keyed_state(self, state_descriptor: StateDescriptor): + return self._keyed_state_backend.getPartitionedState( + JVoidNamespace_INSTANCE, + self._window_serializer, + to_java_state_descriptor(state_descriptor)) diff --git a/flink-python/setup.py b/flink-python/setup.py index 1fe2e3ac33a..82e5e9b4488 100644 --- a/flink-python/setup.py +++ b/flink-python/setup.py @@ -309,7 +309,7 @@ try: 'cloudpickle==2.1.0', 'avro-python3>=1.8.1,!=1.9.2,<1.10.0', 'pytz>=2018.3', 'fastavro>=1.1.0,<1.4.8', 'requests>=2.26.0', 'protobuf<3.18', - 'pemja==0.2.3;' + 'pemja==0.2.4;' 'python_full_version >= "3.7" and platform_system != "Windows"', 'httplib2>=0.19.0,<=0.20.4', apache_flink_libraries_dependency] diff --git a/flink-python/src/main/java/org/apache/flink/python/chain/PythonOperatorChainingOptimizer.java b/flink-python/src/main/java/org/apache/flink/python/chain/PythonOperatorChainingOptimizer.java index ce682b26de4..a4e99e9a39b 100644 --- 
a/flink-python/src/main/java/org/apache/flink/python/chain/PythonOperatorChainingOptimizer.java +++ b/flink-python/src/main/java/org/apache/flink/python/chain/PythonOperatorChainingOptimizer.java @@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.operators.python.embedded.EmbeddedPythonCo import org.apache.flink.streaming.api.operators.python.embedded.EmbeddedPythonKeyedCoProcessOperator; import org.apache.flink.streaming.api.operators.python.embedded.EmbeddedPythonKeyedProcessOperator; import org.apache.flink.streaming.api.operators.python.embedded.EmbeddedPythonProcessOperator; +import org.apache.flink.streaming.api.operators.python.embedded.EmbeddedPythonWindowOperator; import org.apache.flink.streaming.api.operators.python.process.AbstractExternalDataStreamPythonFunctionOperator; import org.apache.flink.streaming.api.operators.python.process.ExternalPythonCoProcessOperator; import org.apache.flink.streaming.api.operators.python.process.ExternalPythonKeyedCoProcessOperator; @@ -429,7 +430,8 @@ public class PythonOperatorChainingOptimizer { && (upOperator instanceof EmbeddedPythonKeyedProcessOperator || upOperator instanceof EmbeddedPythonKeyedCoProcessOperator || upOperator instanceof EmbeddedPythonProcessOperator - || upOperator instanceof EmbeddedPythonCoProcessOperator)); + || upOperator instanceof EmbeddedPythonCoProcessOperator + || upOperator instanceof EmbeddedPythonWindowOperator)); } private static boolean arePythonOperatorsInSameExecutionEnvironment( diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractOneInputEmbeddedPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractOneInputEmbeddedPythonFunctionOperator.java index 701b5cf84b9..296f0e5a697 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractOneInputEmbeddedPythonFunctionOperator.java +++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractOneInputEmbeddedPythonFunctionOperator.java @@ -115,6 +115,7 @@ public abstract class AbstractOneInputEmbeddedPythonFunctionOperator<IN, OUT> interpreter.set("function_context", getFunctionContext()); interpreter.set("timer_context", getTimerContext()); interpreter.set("job_parameters", getJobParameters()); + interpreter.set("keyed_state_backend", getKeyedStateBackend()); interpreter.exec( "from pyflink.fn_execution.embedded.operation_utils import create_one_input_user_defined_data_stream_function_from_protos"); @@ -127,7 +128,8 @@ public abstract class AbstractOneInputEmbeddedPythonFunctionOperator<IN, OUT> + "runtime_context," + "function_context," + "timer_context," - + "job_parameters)"); + + "job_parameters," + + "keyed_state_backend)"); interpreter.invokeMethod("operation", "open"); } diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractTwoInputEmbeddedPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractTwoInputEmbeddedPythonFunctionOperator.java index 8915766f45d..232ccda4575 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractTwoInputEmbeddedPythonFunctionOperator.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractTwoInputEmbeddedPythonFunctionOperator.java @@ -135,6 +135,7 @@ public abstract class AbstractTwoInputEmbeddedPythonFunctionOperator<IN1, IN2, O interpreter.set("runtime_context", getRuntimeContext()); interpreter.set("function_context", getFunctionContext()); interpreter.set("timer_context", getTimerContext()); + interpreter.set("keyed_state_backend", getKeyedStateBackend()); interpreter.set("job_parameters", getJobParameters()); interpreter.exec( @@ -149,7 +150,8 @@ public abstract class 
AbstractTwoInputEmbeddedPythonFunctionOperator<IN1, IN2, O + "runtime_context," + "function_context," + "timer_context," - + "job_parameters)"); + + "job_parameters," + + "keyed_state_backend)"); interpreter.invokeMethod("operation", "open"); } diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonWindowOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonWindowOperator.java new file mode 100644 index 00000000000..94c68045205 --- /dev/null +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonWindowOperator.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.python.embedded; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.fnexecution.v1.FlinkFnApi; +import org.apache.flink.python.util.ProtoUtils; +import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.operators.python.DataStreamPythonFunctionOperator; +import org.apache.flink.streaming.api.utils.PythonTypeUtils; +import org.apache.flink.table.runtime.operators.window.Window; +import org.apache.flink.types.Row; + +import pemja.core.object.PyIterator; + +import java.util.List; + +import static org.apache.flink.python.PythonOptions.MAP_STATE_READ_CACHE_SIZE; +import static org.apache.flink.python.PythonOptions.MAP_STATE_WRITE_CACHE_SIZE; +import static org.apache.flink.python.PythonOptions.PYTHON_METRIC_ENABLED; +import static org.apache.flink.python.PythonOptions.PYTHON_PROFILE_ENABLED; +import static org.apache.flink.python.PythonOptions.STATE_CACHE_SIZE; +import static org.apache.flink.streaming.api.utils.PythonOperatorUtils.inBatchExecutionMode; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link EmbeddedPythonWindowOperator} is responsible for executing user defined python + * ProcessWindowFunction in embedded Python environment. 
+ */ +@Internal +public class EmbeddedPythonWindowOperator<K, IN, OUT, W extends Window> + extends AbstractOneInputEmbeddedPythonFunctionOperator<IN, OUT> + implements Triggerable<K, W> { + + private static final long serialVersionUID = 1L; + + /** For serializing the window in checkpoints. */ + private final TypeSerializer<W> windowSerializer; + + /** The TypeInformation of the key. */ + private transient TypeInformation<K> keyTypeInfo; + + private transient PythonTypeUtils.DataConverter<K, Object> keyConverter; + + private transient WindowContextImpl windowContext; + + private transient WindowTimerContextImpl windowTimerContext; + + public EmbeddedPythonWindowOperator( + Configuration config, + DataStreamPythonFunctionInfo pythonFunctionInfo, + TypeInformation<IN> inputTypeInfo, + TypeInformation<OUT> outputTypeInfo, + TypeSerializer<W> windowSerializer) { + super(config, pythonFunctionInfo, inputTypeInfo, outputTypeInfo); + this.windowSerializer = checkNotNull(windowSerializer); + } + + @Override + public void open() throws Exception { + keyTypeInfo = ((RowTypeInfo) this.getInputTypeInfo()).getTypeAt(0); + + keyConverter = PythonTypeUtils.TypeInfoToDataConverter.typeInfoDataConverter(keyTypeInfo); + + InternalTimerService<W> internalTimerService = + getInternalTimerService("window-timers", windowSerializer, this); + + windowContext = new WindowContextImpl(internalTimerService); + + windowTimerContext = new WindowTimerContextImpl(internalTimerService); + + super.open(); + } + + @Override + public List<FlinkFnApi.UserDefinedDataStreamFunction> createUserDefinedFunctionsProto() { + return ProtoUtils.createUserDefinedDataStreamStatefulFunctionProtos( + getPythonFunctionInfo(), + getRuntimeContext(), + getJobParameters(), + keyTypeInfo, + inBatchExecutionMode(getKeyedStateBackend()), + config.get(PYTHON_METRIC_ENABLED), + config.get(PYTHON_PROFILE_ENABLED), + false, + config.get(STATE_CACHE_SIZE), + config.get(MAP_STATE_READ_CACHE_SIZE), + 
config.get(MAP_STATE_WRITE_CACHE_SIZE)); + } + + @Override + public void onEventTime(InternalTimer<K, W> timer) throws Exception { + collector.setAbsoluteTimestamp(timer.getTimestamp()); + invokeUserFunction(timer); + } + + @Override + public void onProcessingTime(InternalTimer<K, W> timer) throws Exception { + collector.eraseTimestamp(); + invokeUserFunction(timer); + } + + @Override + public Object getFunctionContext() { + return windowContext; + } + + @Override + public Object getTimerContext() { + return windowTimerContext; + } + + @Override + public <T> DataStreamPythonFunctionOperator<T> copy( + DataStreamPythonFunctionInfo pythonFunctionInfo, TypeInformation<T> outputTypeInfo) { + return null; + } + + private void invokeUserFunction(InternalTimer<K, W> timer) throws Exception { + windowTimerContext.timer = timer; + + PyIterator results = + (PyIterator) + interpreter.invokeMethod("operation", "on_timer", timer.getTimestamp()); + + while (results.hasNext()) { + OUT result = outputDataConverter.toInternal(results.next()); + collector.collect(result); + } + results.close(); + + windowTimerContext.timer = null; + } + + private class WindowContextImpl { + private final InternalTimerService<W> timerService; + + WindowContextImpl(InternalTimerService<W> timerService) { + this.timerService = timerService; + } + + public TypeSerializer<W> getWindowSerializer() { + return windowSerializer; + } + + public long timestamp() { + return timestamp; + } + + public InternalTimerService<W> timerService() { + return timerService; + } + + @SuppressWarnings("unchecked") + public Object getCurrentKey() { + return keyConverter.toExternal( + (K) ((Row) EmbeddedPythonWindowOperator.this.getCurrentKey()).getField(0)); + } + } + + private class WindowTimerContextImpl { + private final InternalTimerService<W> timerService; + + private InternalTimer<K, W> timer; + + WindowTimerContextImpl(InternalTimerService<W> 
timerService) { + this.timerService = timerService; + } + + public InternalTimerService<W> timerService() { + return timerService; + } + + public long timestamp() { + return timer.getTimestamp(); + } + + public W getWindow() { + return timer.getNamespace(); + } + + @SuppressWarnings("unchecked") + public Object getCurrentKey() { + return keyConverter.toExternal((K) ((Row) timer.getKey()).getField(0)); + } + } +} diff --git a/flink-python/src/main/resources/META-INF/NOTICE b/flink-python/src/main/resources/META-INF/NOTICE index a217a9497e4..61382eb13cc 100644 --- a/flink-python/src/main/resources/META-INF/NOTICE +++ b/flink-python/src/main/resources/META-INF/NOTICE @@ -28,7 +28,7 @@ This project bundles the following dependencies under the Apache Software Licens - org.apache.beam:beam-vendor-sdks-java-extensions-protobuf:2.38.0 - org.apache.beam:beam-vendor-guava-26_0-jre:0.1 - org.apache.beam:beam-vendor-grpc-1_43_2:0.1 -- com.alibaba:pemja:0.2.3 +- com.alibaba:pemja:0.2.4 This project bundles the following dependencies under the BSD license. See bundled license files for details