[ 
https://issues.apache.org/jira/browse/FLINK-34625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17859585#comment-17859585
 ] 

marksix commented on FLINK-34625:
---------------------------------

Is it time yet?

> TTL doesn't seem to work in pyflink 
> ------------------------------------
>
>                 Key: FLINK-34625
>                 URL: https://issues.apache.org/jira/browse/FLINK-34625
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.18.1
>         Environment: Image used: flink:1.18.1-scala_2.12-java11
>            Reporter: Mark Lidenberg
>            Priority: Major
>
> I've made a simple example to test TTL and couldn't get the expected
> results. I then replicated the same example in Java, and it worked as
> expected. Since the behavior is inconsistent between the two APIs, something
> seems to be wrong in PyFlink or in my PyFlink setup.
> Here is code to reproduce it. The example creates a state with a TTL of
> 1 second, then processes an event every 1.5 seconds and prints the current
> state. I expect it to print `None, None, None, ...` (each value expires
> 1 second after it is written, before the next event arrives 1.5 seconds
> later), but instead it prints `None, 'state', 'state', ...`. In Java it
> works as expected and prints `null, null, ...`.
> ```python
> import time
>
> from pyflink.common import Time, Types
> from pyflink.datastream import (
>     KeyedProcessFunction,
>     RuntimeContext,
>     StreamExecutionEnvironment,
> )
> from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor
>
>
> class Processor(KeyedProcessFunction):
>     def open(self, runtime_context: RuntimeContext):
>         state_descriptor = ValueStateDescriptor(
>             name="my_state",
>             value_type_info=Types.STRING(),
>         )
>         # TTL of 1 second, refreshed on create/write; expired values are never returned
>         state_descriptor.enable_time_to_live(
>             ttl_config=StateTtlConfig.new_builder(Time.seconds(1))
>             .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite)
>             .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>             .build()
>         )
>         self.state = runtime_context.get_state(state_descriptor)
>
>     def process_element(self, value: int, ctx: KeyedProcessFunction.Context):
>         # Print current state.
>         # Expected to print `None` every time, but prints `None, 'state', 'state', ...`
>         print(self.state.value())
>         # Update state
>         self.state.update("state")
>         # Sleep longer than the TTL so the state should expire before the next event
>         time.sleep(1.5)
>
>
> if __name__ == "__main__":
>     # Init environment
>     environment = StreamExecutionEnvironment.get_execution_environment()
>     environment.set_parallelism(1)
>     # Setup pipeline: 10 events, all on the same key
>     (
>         environment.from_collection(
>             collection=list(range(10)),
>         )
>         .key_by(lambda value: 0)
>         .process(Processor())
>     )
>     # Execute pipeline
>     environment.execute("ttl_test")
> ```
>  
> ```java
> import org.apache.flink.api.common.state.StateTtlConfig;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.time.Time;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.util.Collector;
>
> import java.io.IOException;
> import java.time.LocalDateTime;
>
> public class Processor extends KeyedProcessFunction<Integer, String, String> {
>     private transient ValueState<String> state;
>
>     @Override
>     public void open(Configuration parameters) {
>         // TTL of 1 second, refreshed on create/write; expired values are never returned
>         var stateTtlConfig = StateTtlConfig
>                 .newBuilder(Time.seconds(1))
>                 .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>                 .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>                 .build();
>         var stateDescriptor = new ValueStateDescriptor<>("state", String.class);
>         stateDescriptor.enableTimeToLive(stateTtlConfig);
>         state = getRuntimeContext().getState(stateDescriptor);
>     }
>
>     @Override
>     public void processElement(String event, Context context, Collector<String> collector)
>             throws IOException, InterruptedException {
>         // Print current state; prints `null, null, ...`
>         String current = state.value();
>         System.out.println(current);
>         // Update state
>         state.update(LocalDateTime.now().toString());
>         // Sleep longer than the TTL so the state expires before the next event
>         Thread.sleep(1500);
>     }
> }
> ```
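The expected output follows from the TTL settings alone: with `OnCreateAndWrite` only writes refresh the timestamp, and with `NeverReturnExpired` a read after the TTL has elapsed returns nothing. The sketch below is a toy, pure-Python model of those two settings on a simulated clock, assuming a value counts as expired once the TTL has fully elapsed since its last write. `TtlValueState` and `simulate` are hypothetical names for illustration; this is not Flink's implementation and does not use the PyFlink API.

```python
from typing import List, Optional


class TtlValueState:
    """Hypothetical toy model of a value state with OnCreateAndWrite updates
    and NeverReturnExpired visibility. Not Flink's implementation."""

    def __init__(self, ttl_seconds: float):
        self.ttl_seconds = ttl_seconds
        self._value: Optional[str] = None
        self._last_write: Optional[float] = None

    def value(self, now: float) -> Optional[str]:
        # NeverReturnExpired: hide the value once the TTL has elapsed since the last write
        if self._last_write is None or now - self._last_write >= self.ttl_seconds:
            return None
        return self._value

    def update(self, new_value: str, now: float) -> None:
        # OnCreateAndWrite: only writes refresh the timestamp, reads do not
        self._value = new_value
        self._last_write = now


def simulate(ttl_seconds: float, interval_seconds: float, events: int) -> List[Optional[str]]:
    """Replay the reproducer's loop on a simulated clock: read, write, advance time."""
    state = TtlValueState(ttl_seconds)
    now = 0.0
    seen: List[Optional[str]] = []
    for _ in range(events):
        seen.append(state.value(now))
        state.update("state", now)
        now += interval_seconds
    return seen


print(simulate(ttl_seconds=1.0, interval_seconds=1.5, events=5))
# → [None, None, None, None, None]
print(simulate(ttl_seconds=2.0, interval_seconds=1.5, events=3))
# → [None, 'state', 'state']
```

With the reported settings (1-second TTL, 1.5 seconds between events) every read in the model returns `None`, matching the Java output. The `None, 'state', 'state', ...` pattern only shows up in the model when the TTL is longer than the gap between events, i.e. as if expiry were not being applied when PyFlink reads the state.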



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
