Qiu created FLINK-35929:
---------------------------
Summary: In Flink insert mode, support modifying the
parallelism of the JDBC sink when the source and sink parallelism are the same.
Key: FLINK-35929
URL: https://issues.apache.org/jira/browse/FLINK-35929
Project: Flink
Issue Type: Improvement
Components: Connectors / JDBC
Affects Versions: jdbc-3.1.2
Reporter: Qiu
Attachments: image-2024-07-30-19-57-45-033.png,
image-2024-07-30-19-57-50-868.png
In insert mode, when the source and sink parallelism are the same, reducing or
increasing the JDBC sink parallelism causes SQL validation to fail. The
following is the error message:
configured sink parallelism is: 8, while the input parallelism is: -1. Since
configured parallelism is different from input parallelism and the changelog
mode contains [INSERT,UPDATE_AFTER,DELETE], which is not INSERT_ONLY mode,
primary key is required but no primary key is found!
{code:java}
// module: flink-connector-jdbc
// class: org.apache.flink.connector.jdbc.table.JdbcDynamicTableSink
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
    validatePrimaryKey(requestedMode);
    ChangelogMode.Builder changelogModeBuilder = ChangelogMode.newBuilder()
            .addContainedKind(RowKind.INSERT);
    // UPDATE_AFTER and DELETE are accepted only when the table declares a primary key.
    if (tableSchema.getPrimaryKey().isPresent()) {
        changelogModeBuilder
                .addContainedKind(RowKind.UPDATE_AFTER)
                .addContainedKind(RowKind.DELETE);
    }
    return changelogModeBuilder.build();
}
{code}
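
The interaction between the quoted getChangelogMode logic and the error message can be sketched in plain Java, without any Flink dependency. This is only an illustrative model: the class SinkParallelismCheck, its local RowKind enum, and the methods sinkChangelogMode and primaryKeyRequired are hypothetical names, and the condition in primaryKeyRequired is an assumption read off the wording of the error message, not a copy of the planner's code.

```java
import java.util.EnumSet;
import java.util.Set;

// Simplified stand-in for the behaviour described in this issue; not Flink's actual classes.
class SinkParallelismCheck {

    // Hypothetical local mirror of the row kinds named in the error message.
    enum RowKind { INSERT, UPDATE_AFTER, DELETE }

    // Mirrors the quoted getChangelogMode logic: INSERT is always accepted,
    // UPDATE_AFTER and DELETE only when a primary key is declared.
    static Set<RowKind> sinkChangelogMode(boolean hasPrimaryKey) {
        Set<RowKind> kinds = EnumSet.of(RowKind.INSERT);
        if (hasPrimaryKey) {
            kinds.add(RowKind.UPDATE_AFTER);
            kinds.add(RowKind.DELETE);
        }
        return kinds;
    }

    // Assumed reading of the error message: a primary key is demanded when the
    // configured sink parallelism differs from the input parallelism and the
    // changelog mode is not insert-only.
    static boolean primaryKeyRequired(int sinkParallelism, int inputParallelism, Set<RowKind> kinds) {
        boolean insertOnly = kinds.equals(EnumSet.of(RowKind.INSERT));
        return sinkParallelism != inputParallelism && !insertOnly;
    }

    public static void main(String[] args) {
        // Values 8 and -1 are taken from the error message in this issue.
        System.out.println(primaryKeyRequired(8, -1, sinkChangelogMode(true)));
        System.out.println(primaryKeyRequired(8, -1, sinkChangelogMode(false)));
        System.out.println(primaryKeyRequired(8, 8, sinkChangelogMode(true)));
    }
}
```

Under these assumptions, only the first call reports that a primary key is required: the sink advertises UPDATE_AFTER and DELETE, so it is not treated as insert-only even when the job itself only inserts rows.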
--
This message was sent by Atlassian Jira
(v8.20.10#820010)