[
https://issues.apache.org/jira/browse/FLINK-34001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17803483#comment-17803483
]
yong yang commented on FLINK-34001:
-----------------------------------
{code:java}
// code placeholder
// plan1.json
{
"flinkVersion" : "1.18",
"nodes" : [ {
"id" : 7,
"type" : "stream-exec-table-source-scan_1",
"scanTableSource" : {
"table" : {
"identifier" : "`default_catalog`.`default_database`.`s1`",
"resolvedTable" : {
"schema" : {
"columns" : [ {
"name" : "id",
"dataType" : "VARCHAR(2147483647)"
}, {
"name" : "name",
"dataType" : "VARCHAR(2147483647)"
} ],
"watermarkSpecs" : [ ]
},
"partitionKeys" : [ ],
"options" : {
"properties.bootstrap.servers" : "localhost:9092",
"connector" : "kafka",
"format" : "csv",
"topic" : "s1",
"properties.group.id" : "g1",
"scan.startup.mode" : "latest-offset"
}
}
}
},
"outputType" : "ROW<`id` VARCHAR(2147483647), `name` VARCHAR(2147483647)>",
"description" : "TableSourceScan(table=[[default_catalog, default_database, s1]], fields=[id, name])",
"inputProperties" : [ ]
}, {
"id" : 8,
"type" : "stream-exec-exchange_1",
"inputProperties" : [ {
"requiredDistribution" : {
"type" : "HASH",
"keys" : [ 0 ]
},
"damBehavior" : "PIPELINED",
"priority" : 0
} ],
"outputType" : "ROW<`id` VARCHAR(2147483647), `name` VARCHAR(2147483647)>",
"description" : "Exchange(distribution=[hash[id]])"
}, {
"id" : 9,
"type" : "stream-exec-table-source-scan_1",
"scanTableSource" : {
"table" : {
"identifier" : "`default_catalog`.`default_database`.`s2`",
"resolvedTable" : {
"schema" : {
"columns" : [ {
"name" : "id",
"dataType" : "VARCHAR(2147483647) NOT NULL"
}, {
"name" : "age",
"dataType" : "INT"
} ],
"watermarkSpecs" : [ ],
"primaryKey" : {
"name" : "PK_3386",
"type" : "PRIMARY_KEY",
"columns" : [ "id" ]
}
},
"partitionKeys" : [ ],
"options" : {
"properties.bootstrap.servers" : "localhost:9092",
"key.format" : "csv",
"topic" : "s3",
"connector" : "upsert-kafka",
"value.format" : "csv"
}
}
}
},
"outputType" : "ROW<`id` VARCHAR(2147483647) NOT NULL, `age` INT>",
"description" : "TableSourceScan(table=[[default_catalog, default_database, s2]], fields=[id, age])",
"inputProperties" : [ ]
}, {
"id" : 10,
"type" : "stream-exec-exchange_1",
"inputProperties" : [ {
"requiredDistribution" : {
"type" : "HASH",
"keys" : [ 0 ]
},
"damBehavior" : "PIPELINED",
"priority" : 0
} ],
"outputType" : "ROW<`id` VARCHAR(2147483647) NOT NULL, `age` INT>",
"description" : "Exchange(distribution=[hash[id]])"
}, {
"id" : 11,
"type" : "stream-exec-changelog-normalize_1",
"configuration" : {
"table.exec.mini-batch.enabled" : "false",
"table.exec.mini-batch.size" : "-1"
},
"uniqueKeys" : [ 0 ],
"generateUpdateBefore" : true,
"state" : [ {
"index" : 0,
"ttl" : "0 ms",
"name" : "changelogNormalizeState"
} ],
"inputProperties" : [ {
"requiredDistribution" : {
"type" : "UNKNOWN"
},
"damBehavior" : "PIPELINED",
"priority" : 0
} ],
"outputType" : "ROW<`id` VARCHAR(2147483647) NOT NULL, `age` INT>",
"description" : "ChangelogNormalize(key=[id])"
}, {
"id" : 12,
"type" : "stream-exec-exchange_1",
"inputProperties" : [ {
"requiredDistribution" : {
"type" : "HASH",
"keys" : [ 0 ]
},
"damBehavior" : "PIPELINED",
"priority" : 0
} ],
"outputType" : "ROW<`id` VARCHAR(2147483647) NOT NULL, `age` INT>",
"description" : "Exchange(distribution=[hash[id]])"
}, {
"id" : 13,
"type" : "stream-exec-join_1",
"joinSpec" : {
"joinType" : "INNER",
"leftKeys" : [ 0 ],
"rightKeys" : [ 0 ],
"filterNulls" : [ true ],
"nonEquiCondition" : null
},
"rightUpsertKeys" : [ [ 0 ] ],
"state" : [ {
"index" : 0,
"ttl" : "0 ms",
"name" : "leftState"
}, {
"index" : 1,
"ttl" : "0 ms",
"name" : "rightState"
} ],
"inputProperties" : [ {
"requiredDistribution" : {
"type" : "UNKNOWN"
},
"damBehavior" : "PIPELINED",
"priority" : 0
}, {
"requiredDistribution" : {
"type" : "UNKNOWN"
},
"damBehavior" : "PIPELINED",
"priority" : 0
} ],
"outputType" : "ROW<`id` VARCHAR(2147483647), `name` VARCHAR(2147483647), `id0` VARCHAR(2147483647) NOT NULL, `age` INT>",
"description" : "Join(joinType=[InnerJoin], where=[(id = id0)], select=[id, name, id0, age], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])"
}, {
"id" : 14,
"type" : "stream-exec-calc_1",
"projection" : [ {
"kind" : "INPUT_REF",
"inputIndex" : 0,
"type" : "VARCHAR(2147483647)"
}, {
"kind" : "INPUT_REF",
"inputIndex" : 1,
"type" : "VARCHAR(2147483647)"
}, {
"kind" : "INPUT_REF",
"inputIndex" : 3,
"type" : "INT"
} ],
"condition" : null,
"inputProperties" : [ {
"requiredDistribution" : {
"type" : "UNKNOWN"
},
"damBehavior" : "PIPELINED",
"priority" : 0
} ],
"outputType" : "ROW<`id` VARCHAR(2147483647), `name` VARCHAR(2147483647), `age` INT>",
"description" : "Calc(select=[id, name, age])"
}, {
"id" : 15,
"type" : "stream-exec-sink_1",
"configuration" : {
"table.exec.sink.keyed-shuffle" : "AUTO",
"table.exec.sink.not-null-enforcer" : "ERROR",
"table.exec.sink.rowtime-inserter" : "ENABLED",
"table.exec.sink.type-length-enforcer" : "IGNORE",
"table.exec.sink.upsert-materialize" : "AUTO"
},
"dynamicTableSink" : {
"table" : {
"identifier" : "`default_catalog`.`default_database`.`sink1`",
"resolvedTable" : {
"schema" : {
"columns" : [ {
"name" : "id",
"dataType" : "VARCHAR(2147483647)"
}, {
"name" : "name",
"dataType" : "VARCHAR(2147483647)"
}, {
"name" : "age",
"dataType" : "INT"
} ],
"watermarkSpecs" : [ ]
},
"partitionKeys" : [ ],
"options" : {
"connector" : "print",
"print-identifier" : ">>>>"
}
}
}
},
"inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER",
"DELETE" ],
"inputProperties" : [ {
"requiredDistribution" : {
"type" : "UNKNOWN"
},
"damBehavior" : "PIPELINED",
"priority" : 0
} ],
"outputType" : "ROW<`id` VARCHAR(2147483647), `name` VARCHAR(2147483647), `age` INT>",
"description" : "Sink(table=[default_catalog.default_database.sink1], fields=[id, name, age])"
} ],
"edges" : [ {
"source" : 7,
"target" : 8,
"shuffle" : {
"type" : "FORWARD"
},
"shuffleMode" : "PIPELINED"
}, {
"source" : 9,
"target" : 10,
"shuffle" : {
"type" : "FORWARD"
},
"shuffleMode" : "PIPELINED"
}, {
"source" : 10,
"target" : 11,
"shuffle" : {
"type" : "FORWARD"
},
"shuffleMode" : "PIPELINED"
}, {
"source" : 11,
"target" : 12,
"shuffle" : {
"type" : "FORWARD"
},
"shuffleMode" : "PIPELINED"
}, {
"source" : 8,
"target" : 13,
"shuffle" : {
"type" : "FORWARD"
},
"shuffleMode" : "PIPELINED"
}, {
"source" : 12,
"target" : 13,
"shuffle" : {
"type" : "FORWARD"
},
"shuffleMode" : "PIPELINED"
}, {
"source" : 13,
"target" : 14,
"shuffle" : {
"type" : "FORWARD"
},
"shuffleMode" : "PIPELINED"
}, {
"source" : 14,
"target" : 15,
"shuffle" : {
"type" : "FORWARD"
},
"shuffleMode" : "PIPELINED"
} ]
}{code}
> doc diff from code
> ------------------
>
> Key: FLINK-34001
> URL: https://issues.apache.org/jira/browse/FLINK-34001
> Project: Flink
> Issue Type: Bug
> Components: API / State Processor
> Affects Versions: 1.18.0
> Reporter: yong yang
> Priority: Major
>
> doc:
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/concepts/overview/#idle-state-retention-time
> The current TTL value for both left and right side is {{{}"0 ms"{}}}, which
> means the state retention is not enabled.
>
> But in my test I find:
> The current TTL value for both left and right side is {{{}"0 ms"{}}}, which
> means the state is kept permanently!
--
This message was sent by Atlassian Jira
(v8.20.10#820010)