[
https://issues.apache.org/jira/browse/FLINK-35929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17869788#comment-17869788
]
Qiu commented on FLINK-35929:
-----------------------------
@[~githubbot] Hello, please help me look into this issue.
> In flink insert mode, it supports modifying the parallelism of jdbc sink when
> the parallelism of source and sink is the same.
> -----------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-35929
> URL: https://issues.apache.org/jira/browse/FLINK-35929
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / JDBC
> Affects Versions: jdbc-3.1.2
> Reporter: Qiu
> Priority: Major
>
> In insert-only mode, when the source and sink start with the same
> parallelism, increasing or decreasing the JDBC sink parallelism causes SQL
> validation to fail. The error message is:
> configured sink parallelism is: 8, while the input parallelism is: -1. Since
> configured parallelism is different from input parallelism and the changelog
> mode contains [INSERT,UPDATE_AFTER,DELETE], which is not INSERT_ONLY mode,
> primary key is required but no primary key is found!
> {code:java}
> // Code placeholder
> // module: flink-connector-jdbc
> // class: org.apache.flink.connector.jdbc.table.JdbcDynamicTableSink
> public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
>     validatePrimaryKey(requestedMode);
>     ChangelogMode.Builder changelogModeBuilder = ChangelogMode.newBuilder()
>             .addContainedKind(RowKind.INSERT);
>     // Add UPDATE_AFTER and DELETE only when the table declares a primary key.
>     if (tableSchema.getPrimaryKey().isPresent()) {
>         changelogModeBuilder
>                 .addContainedKind(RowKind.UPDATE_AFTER)
>                 .addContainedKind(RowKind.DELETE);
>     }
>     return changelogModeBuilder.build();
> }
> {code}
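
For illustration only, here is a minimal sketch of the logic in the quoted snippet, written without any Flink dependency. `ChangelogModeSketch`, its local `RowKind` enum, and `passesParallelismCheck` are hypothetical stand-ins. The parallelism check is a simplified assumption about the validation that produces the error message above, not the actual Flink implementation.

```java
import java.util.EnumSet;
import java.util.Set;

public class ChangelogModeSketch {
    // Stand-in for org.apache.flink.types.RowKind, reduced to the kinds discussed here.
    enum RowKind { INSERT, UPDATE_AFTER, DELETE }

    // Mirrors the quoted snippet: INSERT is always accepted; UPDATE_AFTER and
    // DELETE are added only when the table declares a primary key.
    static Set<RowKind> containedKinds(boolean hasPrimaryKey) {
        Set<RowKind> kinds = EnumSet.of(RowKind.INSERT);
        if (hasPrimaryKey) {
            kinds.add(RowKind.UPDATE_AFTER);
            kinds.add(RowKind.DELETE);
        }
        return kinds;
    }

    // Simplified assumption: a sink/input parallelism mismatch is rejected only
    // when the sink is not insert-only and no primary key is defined.
    static boolean passesParallelismCheck(boolean hasPrimaryKey, boolean parallelismDiffers) {
        boolean insertOnly = containedKinds(hasPrimaryKey).equals(EnumSet.of(RowKind.INSERT));
        return !parallelismDiffers || insertOnly || hasPrimaryKey;
    }

    public static void main(String[] args) {
        System.out.println(containedKinds(false));
        System.out.println(containedKinds(true));
        System.out.println(passesParallelismCheck(false, true));
    }
}
```

Under these assumptions, a table without a primary key reports an insert-only changelog mode, so a sink parallelism that differs from the input parallelism would no longer trip the primary-key requirement.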
--
This message was sent by Atlassian Jira
(v8.20.10#820010)