[
https://issues.apache.org/jira/browse/FLINK-17459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17096221#comment-17096221
]
ranqiqiang commented on FLINK-17459:
------------------------------------
Thanks. Maybe I did something wrong: I am using JDBCUpsertTableSink to append.
My code looks like this:
{code:java}
// code placeholder
tableEnv.createTemporaryView("test",streamSource);
Table appendTable = tableEnv.sqlQuery("select order_id,user_id,status from test");
DataStream<Row> stream = tableEnv.toAppendStream(appendTable, Row.class);
JDBCUpsertTableSink sink = JDBCUpsertTableSink.builder()
    .setOptions(options)
    .setTableSchema(schema)
    .setFlushIntervalMills(3000)
    .build();
// error: java.lang.UnsupportedOperationException: JDBCUpsertTableSink can not support
// If I add "sink.setIsAppendOnly(true)":
// error: org.apache.flink.types.Row cannot be cast to org.apache.flink.api.java.tuple.Tuple2
// So I do this instead:
DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(appendTable, Row.class);
sink.setIsAppendOnly(true);
// The result is OK! I just want to insert/append data.
// I don't know where the framework is supposed to set isAppendOnly. The check that throws is:
if (!isAppendOnly && (keyFields == null || keyFields.length == 0)) {
    throw new UnsupportedOperationException("JDBCUpsertTableSink can not support ");
}
{code}
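The guard quoted at the end of the snippet above accounts for both outcomes: the configuration is rejected only when the sink is not append-only and has no key fields. Here is a minimal stand-alone sketch of that condition, assuming nothing beyond the quoted check. The class and method names (UpsertGuardSketch, validate, isAccepted) are hypothetical and not part of Flink.

```java
public class UpsertGuardSketch {

    // Mirrors the condition quoted in the comment; this is NOT the Flink class itself.
    public static void validate(boolean isAppendOnly, String[] keyFields) {
        if (!isAppendOnly && (keyFields == null || keyFields.length == 0)) {
            throw new UnsupportedOperationException("JDBCUpsertTableSink can not support ");
        }
    }

    // Hypothetical helper: returns true when the guard lets the configuration through.
    public static boolean isAccepted(boolean isAppendOnly, String[] keyFields) {
        try {
            validate(isAppendOnly, keyFields);
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Not append-only and no keys: rejected, as in the first error.
        System.out.println(isAccepted(false, null));
        // Append-only with no keys: accepted, as after setIsAppendOnly(true).
        System.out.println(isAccepted(true, null));
        // Not append-only but with a key field: accepted.
        System.out.println(isAccepted(false, new String[] {"order_id"}));
    }
}
```

Under this reading, calling setIsAppendOnly(true) by hand only bypasses the guard; the sink still consumes Tuple2 records, which is consistent with the cast error seen when feeding it the output of toAppendStream.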
> JDBCAppendTableSink not support flush by flushIntervalMills
> --------------------------------------------------------------
>
> Key: FLINK-17459
> URL: https://issues.apache.org/jira/browse/FLINK-17459
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / JDBC
> Affects Versions: 1.10.0
> Reporter: ranqiqiang
> Priority: Major
>
> {{JDBCAppendTableSink}} only supports flushing by
> {{JDBCAppendTableSinkBuilder#batchSize}}; it has no time-based flush like
> {{JDBCUpsertTableSink#flushIntervalMills}}.
>
> If batchSize = 5000 and the input has 5000*N+1 rows, the last record is not
> appended.
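
The arithmetic in the issue description can be reproduced with a small stand-alone simulation. This sketch assumes a buffer that flushes only when it reaches batchSize, with no timer. It does not use any Flink classes, and the name BatchFlushSketch and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchFlushSketch {
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();
    private int flushedRows = 0;

    public BatchFlushSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    // Buffers one row and flushes only when the buffer reaches batchSize.
    public void write(String row) {
        buffer.add(row);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Stands in for executing the JDBC batch; here it only counts the rows.
    public void flush() {
        flushedRows += buffer.size();
        buffer.clear();
    }

    public int flushedRows() {
        return flushedRows;
    }

    public int pendingRows() {
        return buffer.size();
    }

    public static void main(String[] args) {
        BatchFlushSketch sink = new BatchFlushSketch(5000);
        int rows = 5000 * 3 + 1; // 5000*N+1 with N = 3
        for (int i = 0; i < rows; i++) {
            sink.write("row-" + i);
        }
        // prints "flushed=15000 pending=1"
        System.out.println("flushed=" + sink.flushedRows() + " pending=" + sink.pendingRows());
    }
}
```

The one pending row stays in the buffer until something else calls flush(), which is the gap a time-based setting such as flushIntervalMills is meant to close.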
--
This message was sent by Atlassian Jira
(v8.3.4#803005)