[ https://issues.apache.org/jira/browse/FLINK-28345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17561403#comment-17561403 ]
Feng Jin commented on FLINK-28345:
----------------------------------
I don't think we can solve this in the dialect: as the error message below shows, the exception is raised inside the driver itself. A better approach is to skip the batch statement entirely when the batch is empty, before flushing.
{code:java}
Caused by: java.lang.RuntimeException: Writing records to JDBC failed.
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.checkFlushException(JdbcBatchingOutputFormat.java:154)
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:160)
    at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:87)
    at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:49)
    at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:73)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:113)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:94)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:40)
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:163)
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)
    at org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:45)
    at org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:35)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:128)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:275)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:67)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:398)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:619)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:583)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: java.sql.SQLException: Please call addBatch method at least once before batch execution
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:190)
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.lambda$open$0(JdbcBatchingOutputFormat.java:128)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: java.sql.SQLException: Please call addBatch method at least once before batch execution
    at com.clickhouse.jdbc.SqlExceptionUtils.clientError(SqlExceptionUtils.java:43)
    at com.clickhouse.jdbc.SqlExceptionUtils.emptyBatchError(SqlExceptionUtils.java:111)
    at com.clickhouse.jdbc.internal.InputBasedPreparedStatement.executeAny(InputBasedPreparedStatement.java:95)
    at com.clickhouse.jdbc.internal.AbstractPreparedStatement.executeLargeBatch(AbstractPreparedStatement.java:85)
    at com.clickhouse.jdbc.internal.ClickHouseStatementImpl.executeBatch(ClickHouseStatementImpl.java:568)
    at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65)
    at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64)
    at org.apache.flink.connector.jdbc.internal.executor.TableBufferedStatementExecutor.executeBatch(TableBufferedStatementExecutor.java:64)
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216)
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184)
    ... 8 more{code}
> Flink JDBC connector should check batch count before flush
> ----------------------------------------------------------
>
> Key: FLINK-28345
> URL: https://issues.apache.org/jira/browse/FLINK-28345
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / JDBC
> Affects Versions: 1.15.0, 1.14.5
> Reporter: Feng Jin
> Priority: Major
>
> org.apache.flink.connector.jdbc.internal.JdbcOutputFormat#flush
> {code:java}
> @Override
> public synchronized void flush() throws IOException {
>     checkFlushException();
>     for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
>         try {
>             attemptFlush();
>             batchCount = 0;
>             break;
>             ....{code}
> When flushing the batch, we should check that batchCount is greater than 0.
> Otherwise it can cause problems with drivers that do not support empty
> batches, such as the ClickHouse JDBC driver.
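The driver behaviour referenced above can be reproduced outside of Flink; a standalone sketch (the connection URL and table name are placeholders, not taken from the failing job):
{code:java}
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class EmptyBatchRepro {
    public static void main(String[] args) throws Exception {
        // Placeholder URL and table; any reachable ClickHouse instance will do.
        try (Connection conn =
                        DriverManager.getConnection("jdbc:clickhouse://localhost:8123/default");
                PreparedStatement ps = conn.prepareStatement("INSERT INTO t (c) VALUES (?)")) {
            // No addBatch() was called, so the batch is empty. clickhouse-jdbc throws
            // "Please call addBatch method at least once before batch execution" here,
            // whereas most drivers simply return an empty int[].
            ps.executeBatch();
        }
    }
}
{code}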
--
This message was sent by Atlassian Jira
(v8.20.10#820010)