[
https://issues.apache.org/jira/browse/FLINK-32600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-32600:
-----------------------------------
Labels: pull-request-available (was: )
> SQL Server JDBC Connector wrong quoteIdentifier + issue when there is no
> update statement in upsert statement
> -------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-32600
> URL: https://issues.apache.org/jira/browse/FLINK-32600
> Project: Flink
> Issue Type: Bug
> Components: Connectors / JDBC
> Affects Versions: jdbc-3.1.1
> Reporter: Maxime Radigue
> Assignee: Maxime Radigue
> Priority: Major
> Labels: pull-request-available
>
> Hi, I've discovered two issues in the MS SQL JDBC connector. Both are located
> in the file SqlServerDialect.java.
>
> * quoteIdentifier is wrong: MS SQL Server quotes identifiers as [identifier].
> * Suppose you want to upsert into the SQL table MyTable(id1,id2), whose
> primary key is the compound key (id1,id2). The generated upsert statement
> has a syntax error because there are no fields left to update.
>
> This is the fix I've tested for the two functions involved:
>
> @Override
> public String quoteIdentifier(String identifier) {
>     return "[" + identifier + "]";
> }
>
> @Override
> public Optional<String> getUpsertStatement(
>         String tableName, String[] fieldNames, String[] uniqueKeyFields) {
>     // Columns that are not part of the unique key; may be empty.
>     List<String> nonUniqueKeyFields =
>             Arrays.stream(fieldNames)
>                     .filter(f -> !Arrays.asList(uniqueKeyFields).contains(f))
>                     .collect(Collectors.toList());
>     String fieldsProjection =
>             Arrays.stream(fieldNames)
>                     .map(this::quoteIdentifier)
>                     .collect(Collectors.joining(", "));
>     String valuesBinding =
>             Arrays.stream(fieldNames)
>                     .map(f -> ":" + f + " " + quoteIdentifier(f))
>                     .collect(Collectors.joining(", "));
>     String usingClause = String.format("SELECT %s", valuesBinding);
>     String onConditions =
>             Arrays.stream(uniqueKeyFields)
>                     .map(
>                             f ->
>                                     "[TARGET]."
>                                             + quoteIdentifier(f)
>                                             + "=[SOURCE]."
>                                             + quoteIdentifier(f))
>                     .collect(Collectors.joining(" AND "));
>     String updateSetClause =
>             nonUniqueKeyFields.stream()
>                     .map(
>                             f ->
>                                     "[TARGET]."
>                                             + quoteIdentifier(f)
>                                             + "=[SOURCE]."
>                                             + quoteIdentifier(f))
>                     .collect(Collectors.joining(", "));
>     String insertValues =
>             Arrays.stream(fieldNames)
>                     .map(f -> "[SOURCE]." + quoteIdentifier(f))
>                     .collect(Collectors.joining(", "));
>     StringBuilder sb = new StringBuilder();
>     sb.append(
>             String.format(
>                     "MERGE INTO %s AS [TARGET] USING (%s) AS [SOURCE] ON (%s)",
>                     quoteIdentifier(tableName), usingClause, onConditions));
>     // Emit the UPDATE branch only when there is something to update.
>     if (StringUtils.isNotEmpty(updateSetClause)) {
>         sb.append(
>                 String.format(
>                         " WHEN MATCHED THEN UPDATE SET %s", updateSetClause));
>     }
>     sb.append(
>             String.format(
>                     " WHEN NOT MATCHED THEN INSERT (%s) VALUES (%s);",
>                     fieldsProjection, insertValues));
>     return Optional.of(sb.toString());
> }
>
>
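
For reference, the statement-building logic described in the issue can be
tried outside Flink with a minimal standalone sketch like the one below. The
class name UpsertSketch and the method name upsert are hypothetical and are
not part of the connector; the logic is assumed to mirror the proposed
getUpsertStatement, using only the JDK instead of StringUtils.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical standalone sketch; not the actual SqlServerDialect class.
class UpsertSketch {

    // Quote an identifier the SQL Server way, as proposed in the issue.
    static String quoteIdentifier(String identifier) {
        return "[" + identifier + "]";
    }

    // "[TARGET].[f]=[SOURCE].[f]" for a single column.
    static String assign(String f) {
        return "[TARGET]." + quoteIdentifier(f) + "=[SOURCE]." + quoteIdentifier(f);
    }

    // Build a MERGE statement; the UPDATE branch is skipped when every
    // column belongs to the unique key.
    static String upsert(String table, String[] fields, String[] keys) {
        List<String> keyList = Arrays.asList(keys);
        String using =
                Arrays.stream(fields)
                        .map(f -> ":" + f + " " + quoteIdentifier(f))
                        .collect(Collectors.joining(", "));
        String on =
                Arrays.stream(keys)
                        .map(UpsertSketch::assign)
                        .collect(Collectors.joining(" AND "));
        String set =
                Arrays.stream(fields)
                        .filter(f -> !keyList.contains(f))
                        .map(UpsertSketch::assign)
                        .collect(Collectors.joining(", "));
        String columns =
                Arrays.stream(fields)
                        .map(UpsertSketch::quoteIdentifier)
                        .collect(Collectors.joining(", "));
        String values =
                Arrays.stream(fields)
                        .map(f -> "[SOURCE]." + quoteIdentifier(f))
                        .collect(Collectors.joining(", "));

        StringBuilder sb = new StringBuilder();
        sb.append("MERGE INTO ").append(quoteIdentifier(table))
                .append(" AS [TARGET] USING (SELECT ").append(using)
                .append(") AS [SOURCE] ON (").append(on).append(")");
        if (!set.isEmpty()) {
            sb.append(" WHEN MATCHED THEN UPDATE SET ").append(set);
        }
        sb.append(" WHEN NOT MATCHED THEN INSERT (").append(columns)
                .append(") VALUES (").append(values).append(");");
        return sb.toString();
    }

    public static void main(String[] args) {
        // The key-only table from the issue: MyTable(id1,id2).
        System.out.println(
                upsert("MyTable", new String[] {"id1", "id2"}, new String[] {"id1", "id2"}));
    }
}
```

For the key-only table MyTable(id1,id2) the sketch produces a MERGE with only
the WHEN NOT MATCHED branch; for a table with at least one non-key column the
WHEN MATCHED THEN UPDATE SET branch is included as well.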
--
This message was sent by Atlassian Jira
(v8.20.10#820010)