[ 
https://issues.apache.org/jira/browse/FLINK-27462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob updated FLINK-27462:
------------------------------
    Description: 
I found that it could not trigger checkpoints despite I had enable it in 
environment. 

I think this problem may be related to I had used a table function in my DML. 
When I deploy the application the task dedicated to the table function turned 
to be finished immediately despite the table function had declarate the 
property `isDeterministic` to false.

Below is my basic code to recur the issue:

 
{code:java}
// table function
public class GetStreamingModelSinkFilter extends TableFunction<Integer> {
    private boolean status = false;
    private String statusRedisValue;
    private final String redisKeyOfStatus;
    private final RedisInfo redisInfo;
    private RedisManager redisManager;
    private RedisCommands<String, String> redisCommands;
    private long lastCheckTimestamp = 0L;
    private long currentTimestamp;

    public GetStreamingModelSinkFilter() {
        ...initial something
        }
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        redisManager = new RedisManager(redisInfo);
        redisCommands = redisManager.getCommands();
    }

    @Override
    public boolean isDeterministic() {
        return false;
    }

    public void eval() {
        if (status) {
            collect(1);
        } else {
            currentTimestamp = System.currentTimeMillis();
            if (currentTimestamp - lastCheckTimestamp < 1000) {
                collect(0);
            } else {
                statusRedisValue = redisCommands.get(redisKeyOfStatus);
                if (Objects.equals(statusRedisValue, "1")) {
                    status = true;
                    collect(1);
                } else {
                    lastCheckTimestamp = currentTimestamp;
                    collect(0);
                }
            }

        }
    }

    @Override
    public void close() throws Exception {
        redisManager.close();
    }
} {code}
Below's the DML:

 

 
{code:java}
INSERT INTO TEST SELECT .... T1 JOIN LATERAL TABLE (MY_TABLE_FUNCTION()) AS 
T(MARK) ON TRUE WHERE MARK = 1
{code}
TASKS SNAPSHOT OF THE APPLICATION

!image-2022-05-01-11-22-03-127.png!

!image-2022-05-01-11-23-07-063.png!

!image-2022-05-01-11-23-24-938.png!

 

  was:
I found that it could not trigger checkpoints despite I had enable it in 
environment. 

I think this problem may be related to I had used a table function in my DML. 
When I deploy the application the task dedicated to the table function turned 
to be finished immediately despite the table function had declarate the 
property `isDeterministic` to false.

Below is my basic code to recur the issue:

 
{code:java}
// table function
public class GetStreamingModelSinkFilter extends TableFunction<Integer> {
    private boolean status = false;
    private String statusRedisValue;
    private final String redisKeyOfStatus;
    private final RedisInfo redisInfo;
    private RedisManager redisManager;
    private RedisCommands<String, String> redisCommands;
    private long lastCheckTimestamp = 0L;
    private long currentTimestamp;

    public GetStreamingModelSinkFilter() {
        ...initial something
        }
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        redisManager = new RedisManager(redisInfo);
        redisCommands = redisManager.getCommands();
    }

    @Override
    public boolean isDeterministic() {
        return false;
    }

    public void eval() {
        if (status) {
            collect(1);
        } else {
            currentTimestamp = System.currentTimeMillis();
            if (currentTimestamp - lastCheckTimestamp < 1000) {
                collect(0);
            } else {
                statusRedisValue = redisCommands.get(redisKeyOfStatus);
                if (Objects.equals(statusRedisValue, "1")) {
                    status = true;
                    collect(1);
                } else {
                    lastCheckTimestamp = currentTimestamp;
                    collect(0);
                }
            }

        }
    }

    @Override
    public void close() throws Exception {
        redisManager.close();
    }
} {code}
Below's the DML:

 

 
{code:java}
INSERT INTO TEST SELECT .... T1 JOIN LATERAL TABLE (MY_TABLE_FUNCTION()) AS 
T(MARK) ON TRUE WHERE MARK = 1
{code}
TASKS SNAPSHOT OF THE APPLICATION

!image-2022-05-01-11-22-03-127.png!

 


> could not trigger checkpoint when using table function
> ------------------------------------------------------
>
>                 Key: FLINK-27462
>                 URL: https://issues.apache.org/jira/browse/FLINK-27462
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core
>    Affects Versions: 1.14.3
>            Reporter: Spongebob
>            Priority: Major
>         Attachments: image-2022-05-01-11-22-03-127.png, 
> image-2022-05-01-11-23-07-063.png, image-2022-05-01-11-23-24-938.png
>
>
> I found that it could not trigger checkpoints despite I had enable it in 
> environment. 
> I think this problem may be related to I had used a table function in my DML. 
> When I deploy the application the task dedicated to the table function turned 
> to be finished immediately despite the table function had declarate the 
> property `isDeterministic` to false.
> Below is my basic code to recur the issue:
>  
> {code:java}
> // table function
> public class GetStreamingModelSinkFilter extends TableFunction<Integer> {
>     private boolean status = false;
>     private String statusRedisValue;
>     private final String redisKeyOfStatus;
>     private final RedisInfo redisInfo;
>     private RedisManager redisManager;
>     private RedisCommands<String, String> redisCommands;
>     private long lastCheckTimestamp = 0L;
>     private long currentTimestamp;
>     public GetStreamingModelSinkFilter() {
>         ...initial something
>         }
>     }
>     @Override
>     public void open(FunctionContext context) throws Exception {
>         redisManager = new RedisManager(redisInfo);
>         redisCommands = redisManager.getCommands();
>     }
>     @Override
>     public boolean isDeterministic() {
>         return false;
>     }
>     public void eval() {
>         if (status) {
>             collect(1);
>         } else {
>             currentTimestamp = System.currentTimeMillis();
>             if (currentTimestamp - lastCheckTimestamp < 1000) {
>                 collect(0);
>             } else {
>                 statusRedisValue = redisCommands.get(redisKeyOfStatus);
>                 if (Objects.equals(statusRedisValue, "1")) {
>                     status = true;
>                     collect(1);
>                 } else {
>                     lastCheckTimestamp = currentTimestamp;
>                     collect(0);
>                 }
>             }
>         }
>     }
>     @Override
>     public void close() throws Exception {
>         redisManager.close();
>     }
> } {code}
> Below's the DML:
>  
>  
> {code:java}
> INSERT INTO TEST SELECT .... T1 JOIN LATERAL TABLE (MY_TABLE_FUNCTION()) AS 
> T(MARK) ON TRUE WHERE MARK = 1
> {code}
> TASKS SNAPSHOT OF THE APPLICATION
> !image-2022-05-01-11-22-03-127.png!
> !image-2022-05-01-11-23-07-063.png!
> !image-2022-05-01-11-23-24-938.png!
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to