Yun Tang created FLINK-24105:
--------------------------------
Summary: state TTL might not take effect for pyflink
Key: FLINK-24105
URL: https://issues.apache.org/jira/browse/FLINK-24105
Project: Flink
Issue Type: Bug
Components: API / Python, Runtime / State Backends
Reporter: Yun Tang
Fix For: 1.14.0
Since PyFlink keeps its own cache of state data on the Python side, it might
still read data from that cache even after the state's TTL has expired.
The script below reproduces this:
{code:python}
from pyflink.common.time import Time
from pyflink.datastream.state import ValueStateDescriptor, StateTtlConfig,
ListStateDescriptor, MapStateDescriptor
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic,
RuntimeContext, KeyedProcessFunction, \
EmbeddedRocksDBStateBackend
import time
from datetime import datetime
def test_keyed_process_function_with_state():
    """Run a keyed PyFlink job that reads TTL-enabled state after a long pause.

    Every state (value, list, map) is registered with a 1-second TTL that
    never returns expired entries, while ``process_element`` sleeps for 20
    seconds before reading.  Each element therefore reads state that was
    written well over one TTL period earlier, and the printed line shows
    what the Python side still returned for it.
    """
    env = StreamExecutionEnvironment.get_execution_environment()
    env.get_config().set_auto_watermark_interval(2000)
    env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    env.set_state_backend(EmbeddedRocksDBStateBackend())
    data_stream = env.from_collection(
        [(1, 'hi', '1603708211000'),
         (3, 'hi', '1603708226000'),
         (10, 'hi', '1603708226000'),
         (6, 'hello', '1603708293000')],
        type_info=Types.ROW([Types.INT(), Types.STRING(), Types.STRING()]))

    class MyProcessFunction(KeyedProcessFunction):
        """Record each element in three TTL-enabled states and report their prior content."""

        def __init__(self):
            # The state handles are created in open(), once a runtime
            # context is available.
            self.value_state = None
            self.list_state = None
            self.map_state = None

        def open(self, runtime_context: RuntimeContext):
            """Register value, list and map state that share one 1-second TTL config."""
            state_ttl_config = StateTtlConfig \
                .new_builder(Time.seconds(1)) \
                .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) \
                .never_return_expired() \
                .build()

            value_state_descriptor = ValueStateDescriptor('value_state', Types.INT())
            value_state_descriptor.enable_time_to_live(state_ttl_config)
            self.value_state = runtime_context.get_state(value_state_descriptor)

            list_state_descriptor = ListStateDescriptor('list_state', Types.INT())
            list_state_descriptor.enable_time_to_live(state_ttl_config)
            self.list_state = runtime_context.get_list_state(list_state_descriptor)

            map_state_descriptor = MapStateDescriptor(
                'map_state', Types.INT(), Types.STRING())
            map_state_descriptor.enable_time_to_live(state_ttl_config)
            self.map_state = runtime_context.get_map_state(map_state_descriptor)

        def process_element(self, value, ctx):
            """Read the current state, then update it, and yield a summary line.

            ``value`` is one input row; ``value[0]`` is stored in the value
            and list state and used as the map key, ``value[1]`` as the map
            value.  Each state is read *before* it is updated, so the
            yielded string shows what was stored by earlier elements of the
            same key.
            """
            # Sleep far longer than the 1-second TTL so that anything written
            # by a previous element is already past its TTL when read below.
            time.sleep(20)

            current_value = self.value_state.value()
            self.value_state.update(value[0])

            current_list = list(self.list_state.get())
            self.list_state.add(value[0])

            map_entries_string = '{' + ', '.join(
                str(k) + ': ' + str(v) for k, v in self.map_state.items()) + '}'
            self.map_state.put(value[0], value[1])

            current_key = ctx.get_current_key()
            message = (
                "time: {}, current key: {}, current value state: {}, "
                "current list state: {}, current map state: {}, current value: {}"
            )
            yield message.format(str(datetime.now().time()),
                                 str(current_key),
                                 str(current_value),
                                 str(current_list),
                                 map_entries_string,
                                 str(value))

        def on_timer(self, timestamp, ctx):
            """No timers are registered by this function; nothing to do."""
            pass

    # Key by the string field so that the three 'hi' rows share one key and
    # hit the same keyed state.
    data_stream.key_by(lambda x: x[1], key_type=Types.STRING()) \
        .process(MyProcessFunction(), output_type=Types.STRING()) \
        .print()
    env.execute('test time stamp assigner with keyed process function')
# Run the reproduction only when this file is executed as a script.
if __name__ == '__main__':
    test_keyed_process_function_with_state()
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)