[ https://issues.apache.org/jira/browse/FLINK-15728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17020991#comment-17020991 ]
Bhagavan commented on FLINK-15728:
----------------------------------

Sorry, I should have added more context.

I added an H2Dialect as shown below (i.e. with no support for UPSERT). It is the same as the Derby dialect apart from the driver name and URL prefix.

Note: like the Derby dialect, it does not override *_getUpsertStatement_* from JDBCDialect, so it uses *UpsertWriterUsingInsertUpdateStatement*.

{code:java}
private static class H2Dialect implements JDBCDialect {

    private static final long serialVersionUID = 1L;

    // Handle any H2 JDBC URL.
    @Override
    public boolean canHandle(String url) {
        return url.startsWith("jdbc:h2:");
    }

    @Override
    public Optional<String> defaultDriverName() {
        return Optional.of("org.h2.Driver");
    }

    // Identifiers are passed through unquoted.
    @Override
    public String quoteIdentifier(String identifier) {
        return identifier;
    }
}
{code}

With this dialect, if you run the test JDBCUpsertOutputFormatTest, you will get:

{code:java}
Caused by: org.h2.jdbc.JdbcSQLDataException: Parameter "#<index>" is not set [90012-200]
{code}

Also, with the current *UpsertWriter* implementation, a dialect's upsert statement can only have as many bind parameters ('?') as the Row has fields, and they must be in the same order as the Row:

{code:sql}
-- MySQL uses VALUES(), so the number of ? equals the Row length
INSERT INTO `TAB`(`id`, `msg`) VALUES (?, ?)
  ON DUPLICATE KEY UPDATE `id`=VALUES(`id`), `msg`=VALUES(`msg`)

-- Postgres uses EXCLUDED, so the number of ? equals the Row length
INSERT INTO TAB(id, msg) VALUES (?, ?)
  ON CONFLICT (id) DO UPDATE SET id=EXCLUDED.id, msg=EXCLUDED.msg
{code}

But if I write an Oracle upsert as below, setRecordToStatement in UpsertWriter will not work, because the statement has more bind parameters than the Row has fields:

{code:sql}
MERGE INTO TAB USING dual ON ( "id"=? )
WHEN MATCHED THEN UPDATE SET "msg"=?
WHEN NOT MATCHED THEN INSERT ("id","msg") VALUES ( ?, ? )
{code}

I hope this clarifies the issue.

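To make the MERGE case concrete, here is a minimal sketch of one way the bind values could be resolved when the '?' placeholders do not match the Row layout. It is not Flink code: the class `MergeBindOrderSketch`, the method `bindValues` and the `paramOrder` list are hypothetical names introduced only for illustration, and the sketch assumes the dialect can declare which column stands behind each placeholder.

```java
import java.util.Arrays;
import java.util.List;

/** Illustration only: resolve one bind value per '?' from a Row-ordered array. */
final class MergeBindOrderSketch {

    /**
     * @param fieldNames column names in Row order
     * @param row        field values in Row order
     * @param paramOrder hypothetical per-dialect list naming the column behind
     *                   each '?', in the order the placeholders appear
     * @return the values to bind, one per placeholder
     */
    static Object[] bindValues(List<String> fieldNames, Object[] row, List<String> paramOrder) {
        Object[] values = new Object[paramOrder.size()];
        for (int i = 0; i < values.length; i++) {
            int fieldIndex = fieldNames.indexOf(paramOrder.get(i));
            if (fieldIndex < 0) {
                throw new IllegalArgumentException("Unknown column: " + paramOrder.get(i));
            }
            values[i] = row[fieldIndex];
        }
        return values;
    }

    public static void main(String[] args) {
        List<String> fieldNames = Arrays.asList("id", "msg");
        Object[] row = {1, "hello"};
        // Placeholders of the MERGE statement: ON ("id"=?), SET "msg"=?, VALUES (?, ?)
        List<String> paramOrder = Arrays.asList("id", "msg", "id", "msg");
        // prints [1, hello, 1, hello]
        System.out.println(Arrays.toString(bindValues(fieldNames, row, paramOrder)));
    }
}
```

A two-field Row yields four bind values here, which a writer that binds exactly one value per Row field cannot produce.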
> JDBCUpsertOutputFormat does not set bind parameter keyFields in updateStatement
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-15728
>                 URL: https://issues.apache.org/jira/browse/FLINK-15728
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC
>    Affects Versions: 1.9.1
>            Reporter: Bhagavan
>            Assignee: Bhagavan
>            Priority: Major
>
> When using JDBCUpsertOutputFormat with a custom dialect (e.g. H2 or Oracle) that uses
> UpsertWriterUsingInsertUpdateStatement, the code fails with the error below.
> {code:java}
> Caused by: org.h2.jdbc.JdbcSQLDataException: Parameter "#6" is not set [90012-200]
>   at org.h2.message.DbException.getJdbcSQLException(DbException.java:590)
>   at org.h2.message.DbException.getJdbcSQLException(DbException.java:429)
>   at org.h2.message.DbException.get(DbException.java:205)
>   at org.h2.message.DbException.get(DbException.java:181)
>   at org.h2.expression.Parameter.checkSet(Parameter.java:83)
>   at org.h2.jdbc.JdbcPreparedStatement.addBatch(JdbcPreparedStatement.java:1275)
>   at org.apache.flink.api.java.io.jdbc.writer.UpsertWriter$UpsertWriterUsingInsertUpdateStatement.processOneRowInBatch(UpsertWriter.java:233)
>   at org.apache.flink.api.java.io.jdbc.writer.UpsertWriter.executeBatch(UpsertWriter.java:111)
> {code}
> This is because UpsertWriterUsingInsertUpdateStatement#processOneRowInBatch
> does not set all bind parameters in the update case.
> This bug does not surface when using Derby DB.
> If we replace Derby with H2 in JDBCUpsertOutputFormatTest, we can reproduce
> the bug.
> The fix is trivial. Happy to raise a PR.
> {code:java}
> // For the update case, replace the call below
> setRecordToStatement(updateStatement, fieldTypes, row);
> // with
> setRecordToStatement(updateStatement, fieldTypes + pkTypes, row + pkRow);
> // NOTE: the prepared updateStatement contains an additional WHERE clause, so we need
> // to pass the additional bind values and their SQL types.
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
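
The `fieldTypes + pkTypes` and `row + pkRow` notation in the quoted fix is shorthand for concatenation. A minimal sketch of that idea follows, assuming an update statement of the shape `UPDATE TAB SET id=?, msg=? WHERE id=?`. The class `UpdateBindSketch` and its methods are hypothetical names for illustration and are not part of Flink.

```java
import java.sql.Types;
import java.util.Arrays;

/** Illustration only: extend the Row values and SQL types with the key columns. */
final class UpdateBindSketch {

    /** Returns the row values followed by the values of the key fields. */
    static Object[] appendKeyValues(Object[] row, int[] pkFields) {
        Object[] out = Arrays.copyOf(row, row.length + pkFields.length);
        for (int i = 0; i < pkFields.length; i++) {
            out[row.length + i] = row[pkFields[i]];
        }
        return out;
    }

    /** Returns the field types followed by the types of the key fields. */
    static int[] appendKeyTypes(int[] fieldTypes, int[] pkFields) {
        int[] out = Arrays.copyOf(fieldTypes, fieldTypes.length + pkFields.length);
        for (int i = 0; i < pkFields.length; i++) {
            out[fieldTypes.length + i] = fieldTypes[pkFields[i]];
        }
        return out;
    }

    public static void main(String[] args) {
        Object[] row = {1, "hello"};
        int[] fieldTypes = {Types.INTEGER, Types.VARCHAR};
        int[] pkFields = {0}; // assumed: "id" is the only key column

        // Assumed statement shape: UPDATE TAB SET id=?, msg=? WHERE id=?
        // prints [1, hello, 1]
        System.out.println(Arrays.toString(appendKeyValues(row, pkFields)));
        // prints [4, 12, 4] (java.sql.Types.INTEGER = 4, VARCHAR = 12)
        System.out.println(Arrays.toString(appendKeyTypes(fieldTypes, pkFields)));
    }
}
```

With the key values appended, every placeholder in the update statement, including those in the WHERE clause, has a value and a type to bind.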