[ https://issues.apache.org/jira/browse/FLINK-14524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jark Wu resolved FLINK-14524.
-----------------------------
Fix Version/s: 1.9.2, 1.10.0
Resolution: Fixed
1.10.0: 1cefbb8b5d0ba0e687635441c04d1790e33ef76c
1.9.2: 7086a8958a40804a604171483d2f374de236cfdf
> PostgreSQL JDBC sink generates invalid SQL in upsert mode
> ---------------------------------------------------------
>
> Key: FLINK-14524
> URL: https://issues.apache.org/jira/browse/FLINK-14524
> Project: Flink
> Issue Type: Bug
> Components: Connectors / JDBC
> Affects Versions: 1.10.0, 1.9.1
> Reporter: Fawad Halim
> Assignee: Fawad Halim
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.10.0, 1.9.2
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
> The "upsert" query generated for the PostgreSQL dialect is missing a closing
> parenthesis in the ON CONFLICT clause, so the INSERT statement fails with the
> following error:
>
> {{ERROR o.a.f.s.runtime.tasks.StreamTask - Error during disposal of stream operator.}}
> {{java.lang.RuntimeException: Writing records to JDBC failed.}}
> {{ at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.checkFlushException(JDBCUpsertOutputFormat.java:135)}}
> {{ at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.close(JDBCUpsertOutputFormat.java:184)}}
> {{ at org.apache.flink.api.java.io.jdbc.JDBCUpsertSinkFunction.close(JDBCUpsertSinkFunction.java:61)}}
> {{ at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)}}
> {{ at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117)}}
> {{ at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:585)}}
> {{ at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:484)}}
> {{ at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)}}
> {{ at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)}}
> {{ at java.lang.Thread.run(Thread.java:748)}}
> {{Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "public.temperature"("id", "timestamp", "temperature") VALUES ('sensor_17', '2019-10-25 00:39:10-05', 20.27573964210997) ON CONFLICT ("id", "timestamp" DO UPDATE SET "id"=EXCLUDED."id", "timestamp"=EXCLUDED."timestamp", "temperature"=EXCLUDED."temperature" was aborted: ERROR: syntax error at or near "DO"}}
> {{ Position: 119 Call getNextException to see other errors in the batch.}}
> {{ at org.postgresql.jdbc.BatchResultHandler.handleCompletion(BatchResultHandler.java:163)}}
> {{ at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:838)}}
> {{ at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1546)}}
> {{ at org.apache.flink.api.java.io.jdbc.writer.UpsertWriter$UpsertWriterUsingUpsertStatement.internalExecuteBatch(UpsertWriter.java:177)}}
> {{ at org.apache.flink.api.java.io.jdbc.writer.UpsertWriter.executeBatch(UpsertWriter.java:117)}}
> {{ at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159)}}
> {{ at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:124)}}
> {{ at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)}}
> {{ at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)}}
> {{ at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)}}
> {{ at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)}}
> {{ at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)}}
> {{ at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)}}
> {{ ... 1 common frames omitted}}
> {{Caused by: org.postgresql.util.PSQLException: ERROR: syntax error at or near "DO"}}
> {{ Position: 119}}
> {{ at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2497)}}
> {{ at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2233)}}
> {{ at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:310)}}
> {{ at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:834)}}
> {{ ... 12 common frames omitted}}
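
For illustration only, here is a minimal sketch of how a well-formed PostgreSQL upsert statement of the shape seen in the trace could be assembled, with the ON CONFLICT target list closed before DO UPDATE. This is not the Flink dialect code or the committed patch; the class name PostgresUpsertSketch and the helpers quote and upsertStatement are hypothetical and exist only for this example.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Flink: builds a PostgreSQL upsert statement.
final class PostgresUpsertSketch {

    // Wrap an identifier in double quotes, as in the statement from the trace.
    static String quote(String identifier) {
        return "\"" + identifier + "\"";
    }

    static String upsertStatement(String table, String[] fields, String[] uniqueKeys) {
        String columns = Arrays.stream(fields)
                .map(PostgresUpsertSketch::quote)
                .collect(Collectors.joining(", "));
        // One JDBC placeholder per column.
        String placeholders = Arrays.stream(fields)
                .map(f -> "?")
                .collect(Collectors.joining(", "));
        String conflictTarget = Arrays.stream(uniqueKeys)
                .map(PostgresUpsertSketch::quote)
                .collect(Collectors.joining(", "));
        String updates = Arrays.stream(fields)
                .map(f -> quote(f) + "=EXCLUDED." + quote(f))
                .collect(Collectors.joining(", "));

        return "INSERT INTO " + quote(table) + "(" + columns + ")"
                + " VALUES (" + placeholders + ")"
                // The conflict target is closed with ")" before DO UPDATE;
                // the statement in the trace lacks this parenthesis.
                + " ON CONFLICT (" + conflictTarget + ")"
                + " DO UPDATE SET " + updates;
    }
}
```

Calling upsertStatement("temperature", new String[] {"id", "timestamp", "temperature"}, new String[] {"id", "timestamp"}) yields a statement whose conflict clause reads ON CONFLICT ("id", "timestamp") DO UPDATE SET ..., which PostgreSQL can parse.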