[
https://issues.apache.org/jira/browse/FLINK-34001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17803482#comment-17803482
]
yong yang commented on FLINK-34001:
-----------------------------------
{code:java}
// code placeholder
package com.yy.state.OperatorStateTTL
import org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.{StreamStatementSet,
StreamTableEnvironment}
import java.time.ZoneId
import scala.util.Random
/**
 * Explores how `EXECUTE PLAN` and a StatementSet can be combined, and how
 * state TTL behaves for a compiled plan.
 *
 * Known problem (reported by the author): when this program is packaged as a
 * jar and submitted through the Flink web UI it fails with
 * "Cannot have more than one execute() or executeAsync() call in a single
 * environment"; running it from IntelliJ IDEA works fine.
 *
 * Reference: https://blog.csdn.net/tianlangstudio/article/details/123086300
 */
object TTLDemoV2 {

  /**
   * Registers a Kafka source table (`s1`), an upsert-kafka source table (`s2`)
   * and three print sink tables, then issues two INSERT statements and one
   * `EXECUTE PLAN` statement, each through its own `tEnv.executeSql` call.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val conf = new Configuration
    conf.setInteger(RestOptions.PORT, 28080)
    // Optionally start from a specific checkpoint; when unset the job starts
    // without restored state.
    // NOTE(review): `conf` is never passed to the environment created below,
    // so neither this setting nor the REST port above takes effect as
    // written — TODO confirm whether it should be handed to the environment.
    // conf.setString("execution.savepoint.path","file:///Users/thomas990p/checkpointdir/41f86441111ad4492002188d8d4e1009/chk-136")

    // Use this environment instead of a local one; this way several
    // executeSql calls can be issued and all of them take effect.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Keep checkpoint data when the job is cancelled.
    env.getCheckpointConfig.enableExternalizedCheckpoints(
      CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    env.setParallelism(1)
    env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE)
    env.setStateBackend(new FsStateBackend("file:///Users/thomas990p/checkpointdir"))
    env.disableOperatorChaining() // disable operator chaining for the whole job

    val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
    // Use the Asia/Shanghai (China) local time zone for the session.
    tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))

    tEnv.executeSql(
      """
        |CREATE TABLE s1 (
        | id String
        | ,name string
        |) WITH (
        | 'connector' = 'kafka',
        | 'topic' = 's1',
        | 'scan.startup.mode' = 'latest-offset',
        | 'properties.group.id' = 'g1',
        | 'properties.bootstrap.servers' = 'localhost:9092',
        | 'format' = 'csv'
        |)
        |""".stripMargin)

    /*
     * 'scan.startup.mode' = 'latest-offset' must not be set on this table;
     * otherwise it fails with "Unsupported options found for 'upsert-kafka'".
     * According to the author, upsert-kafka always consumes from the earliest
     * offset and this cannot be changed.
     */
    tEnv.executeSql(
      """
        |CREATE TABLE s2 (
        | id String
        | ,age int
        | ,primary key(id) not enforced
        |) WITH (
        | 'connector' = 'upsert-kafka',
        | 'topic' = 's3',
        | 'properties.bootstrap.servers' = 'localhost:9092',
        | 'key.format' = 'csv'
        | ,'value.format' = 'csv'
        |)
        |""".stripMargin)

    tEnv.executeSql(
      """
        |CREATE TABLE s1_sink1 (
        | id String
        | ,name string
        |) WITH (
        | 'connector' = 'print'
        | ,'print-identifier'='s1 sink>>>>'
        |)
        |""".stripMargin)

    tEnv.executeSql(
      """
        |CREATE TABLE s2_sink1 (
        | id String
        | ,age int
        |) WITH (
        | 'connector' = 'print'
        | ,'print-identifier'='s2 sink>>>>'
        |)
        |""".stripMargin)

    tEnv.executeSql(
      """
        |CREATE TABLE sink1 (
        | id String
        | ,name string
        | ,age int
        |) WITH (
        | 'connector' = 'print'
        | ,'print-identifier'='>>>>'
        |)
        |""".stripMargin)

    tEnv.executeSql("insert into s1_sink1 select * from s1")
    tEnv.executeSql("insert into s2_sink1 select * from s2")
    // The statement must be a single-line literal: a raw newline inside an
    // ordinary "..." string does not compile.
    // Per the author: with both TTL values in the plan at "0 ms", the left and
    // right state is kept forever.
    tEnv.executeSql("EXECUTE PLAN 'file:///Users/thomas990p/flink-plain/plan1.json' ")
  }
}
{code}
> Documentation differs from code behavior
> ------------------
>
> Key: FLINK-34001
> URL: https://issues.apache.org/jira/browse/FLINK-34001
> Project: Flink
> Issue Type: Bug
> Components: API / State Processor
> Affects Versions: 1.18.0
> Reporter: yong yang
> Priority: Major
>
> doc:
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/concepts/overview/#idle-state-retention-time
> The current TTL value for both left and right side is {{"0 ms"}}, which
> means the state retention is not enabled.
>
> But in my test I found that when the current TTL value for both left and
> right side is {{"0 ms"}}, the state is kept forever!
--
This message was sent by Atlassian Jira
(v8.20.10#820010)