[ https://issues.apache.org/jira/browse/FLINK-34625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17859585#comment-17859585 ]
marksix commented on FLINK-34625:
---------------------------------
Is it time yet?
> TTL doesn't seem to work in pyflink
> ------------------------------------
>
> Key: FLINK-34625
> URL: https://issues.apache.org/jira/browse/FLINK-34625
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Affects Versions: 1.18.1
> Environment: Image used: flink:1.18.1-scala_2.12-java11
> Reporter: Mark Lidenberg
> Priority: Major
>
> I've made a simple example to test TTL and couldn't get the expected
> results. I then replicated the example in Java and it worked as expected.
> Since the behavior is inconsistent, something is wrong either in PyFlink
> or in my PyFlink setup.
> Here is code to reproduce. In the example I create a state with a TTL of 1
> second, then process events every 1.5 seconds and print the current state.
> I expect it to print `None, None, None, ...` (because the 1-second TTL
> expires before the next event arrives 1.5 seconds later), but instead it
> prints `None, 'state', 'state', ...`. In Java it works as expected and
> prints `null, null, ...`.
> ```python
> import time
>
> from pyflink.common import Time, Types
> from pyflink.datastream import (
>     KeyedProcessFunction,
>     RuntimeContext,
>     StreamExecutionEnvironment,
> )
> from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor
>
>
> class Processor(KeyedProcessFunction):
>     def open(self, runtime_context: RuntimeContext):
>         state_descriptor = ValueStateDescriptor(
>             name="my_state",
>             value_type_info=Types.STRING(),
>         )
>         state_descriptor.enable_time_to_live(
>             ttl_config=StateTtlConfig.new_builder(Time.seconds(1))
>             .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite)
>             .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>             .build()
>         )
>         self.state = runtime_context.get_state(state_descriptor)
>
>     def process_element(self, value: int, ctx: KeyedProcessFunction.Context):
>         # Print current state
>         print(self.state.value())
>         # expect to print `None` all the time, but prints: `None, 'state', 'state', ...` instead
>         # Update state
>         self.state.update("state")
>         # Sleep longer than the 1-second TTL so the state should expire
>         time.sleep(1.5)
>
>
> if __name__ == "__main__":
>     # Init environment
>     environment = StreamExecutionEnvironment.get_execution_environment().set_parallelism(1)
>     # Setup pipeline
>     (
>         environment.from_collection(
>             collection=list(range(10)),
>         )
>         .key_by(lambda value: 0)
>         .process(Processor())
>     )
>     # Execute pipeline
>     environment.execute("ttl_test")
> ```
>
> ```java
> import org.apache.flink.api.common.state.StateTtlConfig;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.time.Time;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.util.Collector;
>
> import java.io.IOException;
> import java.time.LocalDateTime;
>
> public class Processor extends KeyedProcessFunction<Integer, String, String> {
>     private transient ValueState<String> state;
>
>     @Override
>     public void open(Configuration parameters) {
>         var stateTtlConfig = StateTtlConfig
>                 .newBuilder(Time.seconds(1))
>                 .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>                 .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>                 .build();
>         var stateDescriptor = new ValueStateDescriptor<>("state", String.class);
>         stateDescriptor.enableTimeToLive(stateTtlConfig);
>         state = getRuntimeContext().getState(stateDescriptor);
>     }
>
>     @Override
>     public void processElement(String event, Context context, Collector<String> collector)
>             throws IOException, InterruptedException {
>         // Print current state (a separate local avoids shadowing the `state` field)
>         String current = state.value();
>         System.out.println(current); // prints `null, null, ...`
>         // Update state
>         state.update(LocalDateTime.now().toString());
>         // Sleep longer than the 1-second TTL so the state expires
>         Thread.sleep(1500);
>     }
> }
> ```
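The expected timeline in the report can be checked without a Flink cluster. The sketch below is a hypothetical pure-Python model, not PyFlink's implementation: `TtlValueState`, `FakeClock` and `simulate` are illustrative names. It models only a value state configured with `OnCreateAndWrite` (writes refresh the TTL timestamp, reads do not) and `NeverReturnExpired` (expired values read as `None`), and it assumes an entry counts as expired once at least the TTL has elapsed since its last write. A fake clock replaces `time.sleep` so the run is deterministic.

```python
from typing import Callable, Generic, List, Optional, TypeVar

T = TypeVar("T")


class FakeClock:
    """Manually advanced clock standing in for wall-clock time."""

    def __init__(self) -> None:
        self.now = 0.0

    def __call__(self) -> float:
        return self.now


class TtlValueState(Generic[T]):
    """Hypothetical model of a value state with a TTL (not PyFlink's code).

    Models only UpdateType.OnCreateAndWrite (writes refresh the timestamp,
    reads do not) and StateVisibility.NeverReturnExpired (expired values
    read as None).
    """

    def __init__(self, ttl_seconds: float, clock: Callable[[], float]) -> None:
        self._ttl = ttl_seconds
        self._clock = clock
        self._value: Optional[T] = None
        self._last_write: Optional[float] = None

    def value(self) -> Optional[T]:
        if self._last_write is None:
            return None
        # Assumption: expired once at least `ttl` has passed since the last write.
        if self._clock() - self._last_write >= self._ttl:
            self._value = None
            self._last_write = None
            return None
        return self._value

    def update(self, value: T) -> None:
        # OnCreateAndWrite: only writes refresh the TTL timestamp.
        self._value = value
        self._last_write = self._clock()


def simulate(num_events: int, ttl: float, interval: float) -> List[Optional[str]]:
    """Mimic process_element: read, write "state", then wait `interval`."""
    clock = FakeClock()
    state: TtlValueState[str] = TtlValueState(ttl, clock)
    seen: List[Optional[str]] = []
    for _ in range(num_events):
        seen.append(state.value())
        state.update("state")
        clock.now += interval  # stands in for time.sleep(interval)
    return seen


if __name__ == "__main__":
    print(simulate(5, ttl=1.0, interval=1.5))  # [None, None, None, None, None]
    print(simulate(4, ttl=1.0, interval=0.5))  # [None, 'state', 'state', 'state']
```

With a 1-second TTL and a 1.5-second gap, every read in the model happens after expiry, so it prints only `None`, matching the Java output. The `None, 'state', 'state', ...` sequence reported for PyFlink only appears in the model when reads arrive inside the TTL window (the 0.5-second case); the model says nothing about why PyFlink behaves that way.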