[ https://issues.apache.org/jira/browse/FLINK-14951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16982556#comment-16982556 ]

Andrey Zagrebin commented on FLINK-14951:
-----------------------------------------

[~karmagyz] 
Could we fix the underlying concurrency problem?
This should be doable if we replace the _freeze/unfreeze_ methods with, e.g., a method of this kind:
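{code:java}
import org.apache.flink.util.function.FunctionWithException;

// Inside MonotonicTTLTimeProvider; assumes `lock` and getCurrentTimestamp() are its existing static members.
// FunctionWithException (rather than Function) lets the action throw, since state access can throw.
static <T> T doWithFrozenTime(FunctionWithException<Long, T, Exception> action) throws Exception {
  synchronized (lock) {
    // The lock is held for the whole action, so no other slot's thread can touch the clock in between.
    return action.apply(getCurrentTimestamp());
  }
}{code}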
and use it in TtlVerifyUpdateFunction#performUpdate like:
{code:java}
return MonotonicTTLTimeProvider.doWithFrozenTime(frozenTimestamp -> {
  State state = states.get(verifier.getId());
  Object valueBeforeUpdate = verifier.get(state);
  verifier.update(state, update);
  Object updatedValue = verifier.get(state);
  return new TtlUpdateContext<>(valueBeforeUpdate, update, updatedValue, frozenTimestamp);
});
{code}
wdyt?
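
A minimal, self-contained sketch of the idea, assuming a simplified provider whose state is static (shared by every slot in one TaskManager JVM) and guarded by a single lock. The class `FrozenTimeSketch`, its fields and `countMismatches` are hypothetical illustrations, not Flink's actual `MonotonicTTLTimeProvider`. Freezing, running the update and unfreezing happen inside one `synchronized` block, so another slot cannot unfreeze the clock halfway through an update. The sketch additionally stores the frozen value in `frozenAt` so that clock reads made during the action (which re-enter the same monitor on the same thread) return it; whether the real provider needs this depends on how the state backend reads time, so treat it as an assumption. `LongFunction` is used instead of `Function<Long, T>` only to avoid boxing.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongFunction;

// Hypothetical, simplified stand-in for MonotonicTTLTimeProvider (not Flink's code):
// all state is static, i.e. shared by every slot in the JVM, and guarded by one lock.
final class FrozenTimeSketch {
    private static final Object lock = new Object();
    private static long lastReturned = Long.MIN_VALUE;
    private static long frozenAt = Long.MIN_VALUE; // Long.MIN_VALUE means "not frozen"

    // Monotonic wall-clock read that never goes backwards; callers must hold `lock`.
    private static long getCurrentTimestamp() {
        lastReturned = Math.max(lastReturned, System.currentTimeMillis());
        return lastReturned;
    }

    // What the TTL state backend would call; returns the frozen time while an action runs.
    static long currentTimestamp() {
        synchronized (lock) {
            return frozenAt != Long.MIN_VALUE ? frozenAt : getCurrentTimestamp();
        }
    }

    // Freeze, run the action, and unfreeze as one atomic step under the lock.
    static <T> T doWithFrozenTime(LongFunction<T> action) {
        synchronized (lock) {
            frozenAt = getCurrentTimestamp();
            try {
                return action.apply(frozenAt);
            } finally {
                frozenAt = Long.MIN_VALUE; // unfreeze even if the action throws
            }
        }
    }

    // Runs `iterations` frozen actions on each of `threads` threads (think: task slots)
    // and counts how often a clock read inside an action differs from the frozen value.
    static int countMismatches(int threads, int iterations) {
        AtomicInteger mismatches = new AtomicInteger();
        ExecutorService slots = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            slots.submit(() -> {
                for (int j = 0; j < iterations; j++) {
                    doWithFrozenTime(frozen -> {
                        if (currentTimestamp() != frozen) {
                            mismatches.incrementAndGet();
                        }
                        return null;
                    });
                }
            });
        }
        slots.shutdown();
        try {
            slots.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return mismatches.get();
    }

    public static void main(String[] args) {
        System.out.println("mismatches: " + countMismatches(3, 10_000)); // prints "mismatches: 0"
    }
}
```

The trade-off is that updates from different slots are fully serialized while the lock is held, which seems acceptable for a test-only time provider.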

> State TTL backend end-to-end test fail when taskManager has multiple slot
> -------------------------------------------------------------------------
>
>                 Key: FLINK-14951
>                 URL: https://issues.apache.org/jira/browse/FLINK-14951
>             Project: Flink
>          Issue Type: Bug
>          Components: Tests
>         Environment: centos 7
> java 8
>            Reporter: Yangze Guo
>            Priority: Minor
>
> When I run the Flink end-to-end tests, the State TTL backend tests fail. The
> TaskManager log is shown below:
> 2019-11-26 20:22:03,837 INFO  org.apache.flink.runtime.taskmanager.Task - TtlVerifyUpdateFunction -> Sink: PrintFailedVerifications (3/3) (23f969ddb3e13fcdd3ba9823f50b0eab) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Timestamps before and after the update do not match.
>       at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>       at org.apache.flink.streaming.tests.TtlVerifyUpdateFunction.performUpdate(TtlVerifyUpdateFunction.java:124)
>       at org.apache.flink.streaming.tests.TtlVerifyUpdateFunction.generateUpdateAndVerificationContext(TtlVerifyUpdateFunction.java:101)
>       at org.apache.flink.streaming.tests.TtlVerifyUpdateFunction.flatMap(TtlVerifyUpdateFunction.java:88)
>       at org.apache.flink.streaming.tests.TtlVerifyUpdateFunction.flatMap(TtlVerifyUpdateFunction.java:67)
>       at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
>       at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
>       at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
>       at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
>       at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:284)
>       at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:155)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:445)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:702)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:527)
>       at java.lang.Thread.run(Thread.java:834)
> It is caused by MonotonicTTLTimeProvider:freeze and
> MonotonicTTLTimeProvider:unfreezeTime being called concurrently from multiple
> task threads, which share the provider's frozen-time state, when
> taskmanager.numberOfTaskSlots is set to more than 1. We could set it to 1 in
> test_stream_state_ttl.sh, which would fix the problem.
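
The failure mode described in the issue can be replayed deterministically. The following is a hypothetical, simplified model of the freeze/unfreeze protocol based on the description above, not Flink's source: one provider shared by all slots, each method individually synchronized, and an injectable clock (the class `FreezeUnfreezeRace` and its `replay` method are illustrative names) so that the interleaving of two slots, A and B, can be stepped through on a single thread. Every call is atomic on its own, yet the sequence is not: slot B's unfreeze also releases the freeze that slot A is still relying on.

```java
import java.util.function.LongSupplier;

// Hypothetical, simplified model of the freeze/unfreeze protocol (not Flink's code):
// one provider instance stands in for the state shared by all slots in the JVM.
final class FreezeUnfreezeRace {
    private final Object lock = new Object();
    private final LongSupplier clock; // injectable clock so the replay is deterministic
    private boolean frozen = false;
    private long lastReturned = Long.MIN_VALUE;

    FreezeUnfreezeRace(LongSupplier clock) {
        this.clock = clock;
    }

    // Monotonic clock read; callers must hold `lock`.
    private long tick() {
        lastReturned = Math.max(lastReturned, clock.getAsLong());
        return lastReturned;
    }

    long freezeTime() {
        synchronized (lock) {
            if (!frozen || lastReturned == Long.MIN_VALUE) {
                frozen = true;
                return tick();
            }
            return lastReturned; // already frozen by some other slot
        }
    }

    long unfreezeTime() {
        synchronized (lock) {
            frozen = false; // unfreezes for every slot, not just the caller
            return lastReturned;
        }
    }

    // Called by the TTL state backend while a slot applies its update.
    long currentTimestamp() {
        synchronized (lock) {
            return frozen ? lastReturned : tick();
        }
    }

    // Replays one bad interleaving of slots A and B; returns true if A sees a mismatch.
    static boolean replay() {
        long[] now = {100};
        FreezeUnfreezeRace p = new FreezeUnfreezeRace(() -> now[0]);
        long beforeA = p.freezeTime();  // slot A freezes at 100
        p.freezeTime();                 // slot B "freezes" (already frozen)
        p.unfreezeTime();               // slot B finishes and unfreezes for everyone
        now[0] = 105;                   // wall clock moves on
        p.currentTimestamp();           // slot A's state update now reads 105
        long afterA = p.unfreezeTime(); // slot A unfreezes and sees 105
        return beforeA != afterA;       // the checkState in performUpdate would fail here
    }

    public static void main(String[] args) {
        System.out.println("mismatch: " + replay()); // prints "mismatch: true"
    }
}
```

With a single slot per TaskManager, B's steps never run between A's freeze and unfreeze, which is why the workaround of setting taskmanager.numberOfTaskSlots to 1 hides the race.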



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
