[ https://issues.apache.org/jira/browse/FLINK-24105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Huang Xingbo reassigned FLINK-24105:
------------------------------------
Assignee: Huang Xingbo
> state TTL might not take effect for pyflink
> -------------------------------------------
>
> Key: FLINK-24105
> URL: https://issues.apache.org/jira/browse/FLINK-24105
> Project: Flink
> Issue Type: Sub-task
> Components: API / Python, Runtime / State Backends
> Reporter: Yun Tang
> Assignee: Huang Xingbo
> Priority: Blocker
> Fix For: 1.14.0
>
>
> Since PyFlink keeps its own state cache on the Python side, it may still
> read data from that cache even after the state's TTL has expired.
> The script below reproduces this:
> {code:python}
import time
from datetime import datetime

from pyflink.common.time import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream import (EmbeddedRocksDBStateBackend, KeyedProcessFunction,
                                RuntimeContext, StreamExecutionEnvironment,
                                TimeCharacteristic)
from pyflink.datastream.state import (ListStateDescriptor, MapStateDescriptor,
                                      StateTtlConfig, ValueStateDescriptor)
def test_keyed_process_function_with_state():
    """Reproduce FLINK-24105: PyFlink may return keyed state whose TTL expired.

    Runs a small keyed stream on the embedded RocksDB state backend. For each
    element, the process function sleeps 20 seconds and then reads the value,
    list and map state of the element's key before writing to them. All three
    states use a 1-second TTL with ``never_return_expired()``, so no element
    should observe data written by an earlier element of the same key. If the
    printed state is non-empty for a later ``'hi'`` element, expired data was
    served (per the issue, from the Python-side state cache).
    """
    env = StreamExecutionEnvironment.get_execution_environment()
    # NOTE(review): the two event-time settings below look unused by this
    # repro (no watermark strategy is assigned and no timers are registered);
    # they are kept as in the original report.
    env.get_config().set_auto_watermark_interval(2000)
    env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    env.set_state_backend(EmbeddedRocksDBStateBackend())
    data_stream = env.from_collection(
        [(1, 'hi', '1603708211000'),
         (3, 'hi', '1603708226000'),
         (10, 'hi', '1603708226000'),
         (6, 'hello', '1603708293000')],
        type_info=Types.ROW([Types.INT(), Types.STRING(), Types.STRING()]))

    class MyProcessFunction(KeyedProcessFunction):
        """Emit each key's TTL-guarded state as read before it is updated."""

        def __init__(self):
            super().__init__()
            # State handles are created in open(), where the runtime context
            # is available.
            self.value_state = None
            self.list_state = None
            self.map_state = None

        def open(self, runtime_context: RuntimeContext):
            # One shared TTL config: entries live 1 second after being created
            # or written, and expired entries must never be returned.
            state_ttl_config = StateTtlConfig \
                .new_builder(Time.seconds(1)) \
                .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) \
                .never_return_expired() \
                .build()

            value_state_descriptor = ValueStateDescriptor('value_state', Types.INT())
            value_state_descriptor.enable_time_to_live(state_ttl_config)
            self.value_state = runtime_context.get_state(value_state_descriptor)

            list_state_descriptor = ListStateDescriptor('list_state', Types.INT())
            list_state_descriptor.enable_time_to_live(state_ttl_config)
            self.list_state = runtime_context.get_list_state(list_state_descriptor)

            map_state_descriptor = MapStateDescriptor(
                'map_state', Types.INT(), Types.STRING())
            map_state_descriptor.enable_time_to_live(state_ttl_config)
            self.map_state = runtime_context.get_map_state(map_state_descriptor)

        def process_element(self, value, ctx):
            # Sleep far longer than the 1-second TTL so that anything written
            # by the previous element of this key has expired before we read.
            time.sleep(20)

            # Each state is read first (the value under test), then updated.
            current_value = self.value_state.value()
            self.value_state.update(value[0])

            current_list = list(self.list_state.get())
            self.list_state.add(value[0])

            map_entries_string = '{' + ', '.join(
                str(k) + ': ' + str(v) for k, v in self.map_state.items()) + '}'
            self.map_state.put(value[0], value[1])

            current_key = ctx.get_current_key()
            template = ("time: {}, current key: {}, current value state: {}, "
                        "current list state: {}, current map state: {}, "
                        "current value: {}")
            yield template.format(str(datetime.now().time()),
                                  str(current_key),
                                  str(current_value),
                                  str(current_list),
                                  map_entries_string,
                                  str(value))

        def on_timer(self, timestamp, ctx):
            # No timers are registered by process_element; nothing to do.
            pass

    # Key by the string field of each row ('hi' / 'hello').
    data_stream.key_by(lambda x: x[1], key_type=Types.STRING()) \
        .process(MyProcessFunction(), output_type=Types.STRING()) \
        .print()
    env.execute('test time stamp assigner with keyed process function')
# Run the FLINK-24105 reproduction only when executed as a script.
if "__main__" == __name__:
    test_keyed_process_function_with_state()
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)