[
https://issues.apache.org/jira/browse/FLINK-20552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17252928#comment-17252928
]
mei jie commented on FLINK-20552:
---------------------------------
Hi [~jark],
Here is my test code. It seems that a SinkOperator cannot be created from a
TableResult, so I think I may try using JdbcDynamicTableSink to create the
SinkOperator instead. I look forward to your advice; I will try again tomorrow night.
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
List<Long> data = new LinkedList<>();
data.add(1L);
data.add(2L);
DataStream<Long> stream = env.fromCollection(data);

// Transformation<Long> transformation = stream.getTransformation();
// StreamSource<Long, ?> operator =
//     ((LegacySourceTransformation<Long>) transformation).getOperator();
// operator.setup(
//     new SourceOperatorStreamTask<Long>(SourceOperatorStateContextUtil.getTestingEnvironment()),
//     new MockStreamConfig(new Configuration(), 1),
//     new MockOutput<>(new ArrayList<>()));

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
Table t = tEnv.fromDataStream(stream, $("id"));
tEnv.createTemporaryView("data_table", t);
tEnv.executeSql("CREATE TABLE checkpoint_sink (\n" +
    "  id BIGINT\n" +
    ") WITH (\n" +
    "  'connector' = 'jdbc',\n" +
    "  'url' = '" + DB_URL + "',\n" +
    "  'table-name' = '" + OUTPUT_TABLE5 + "',\n" +
    "  'sink.buffer-flush.interval' = '0'\n" + // disable async flush
    ")");
tEnv.executeSql("INSERT INTO checkpoint_sink \n" +
    "SELECT id from data_table");

// operator.initializeState(new StreamTaskStateInitializerImpl(
//     SourceOperatorStateContextUtil.getTestingEnvironment(), new MemoryStateBackend()));
// operator.snapshotState(new StateSnapshotContextSynchronousImpl(100L, 100L));

check(new Row[]{Row.of(1L), Row.of(2L)}, DB_URL, OUTPUT_TABLE5, new String[]{"id"});
{code}
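To make the behaviour I want the test to exercise more concrete, here is a minimal, Flink-free sketch of the problem described in this issue. Every type in it (CheckpointHook, BufferingFormat, PlainSink, FlushOnCheckpointSink, CheckpointFlushSketch) is a hypothetical stand-in written for this illustration, not Flink's API: BufferingFormat plays the role of JdbcBatchingOutputFormat, PlainSink the role of a wrapper like OutputFormatSinkFunction with no checkpoint hook, and CheckpointHook the role of an interface like CheckpointedFunction. It only assumes that the runtime can call the snapshot hook on sinks that expose one.

```java
import java.util.ArrayList;
import java.util.List;

// All types below are simplified stand-ins for this sketch; none of them is Flink's API.

/** Stand-in for a checkpoint hook such as CheckpointedFunction. */
interface CheckpointHook {
    void snapshotState();
}

/** Stand-in for a batching output format: rows stay buffered until flush(). */
class BufferingFormat {
    private final List<Long> buffer = new ArrayList<>();
    private final List<Long> table; // plays the role of the JDBC table

    BufferingFormat(List<Long> table) {
        this.table = table;
    }

    void writeRecord(long row) {
        buffer.add(row);
    }

    void flush() {
        table.addAll(buffer);
        buffer.clear();
    }
}

/** Stand-in for a wrapper with no checkpoint hook: it only forwards rows. */
class PlainSink {
    protected final BufferingFormat format;

    PlainSink(BufferingFormat format) {
        this.format = format;
    }

    void invoke(long row) {
        format.writeRecord(row);
    }
}

/** Same wrapper plus a hook that flushes the buffer when a checkpoint is taken. */
class FlushOnCheckpointSink extends PlainSink implements CheckpointHook {
    FlushOnCheckpointSink(BufferingFormat format) {
        super(format);
    }

    @Override
    public void snapshotState() {
        format.flush();
    }
}

class CheckpointFlushSketch {
    /** Writes rows 1 and 2, triggers one simulated checkpoint, returns what reached the table. */
    static List<Long> rowsInTableAfterCheckpoint(boolean flushOnCheckpoint) {
        List<Long> table = new ArrayList<>();
        BufferingFormat format = new BufferingFormat(table);
        PlainSink sink = flushOnCheckpoint ? new FlushOnCheckpointSink(format) : new PlainSink(format);
        sink.invoke(1L);
        sink.invoke(2L);
        // The simulated runtime can only call the hook on sinks that expose it.
        if (sink instanceof CheckpointHook) {
            ((CheckpointHook) sink).snapshotState();
        }
        return table;
    }

    public static void main(String[] args) {
        System.out.println(rowsInTableAfterCheckpoint(false)); // prints []
        System.out.println(rowsInTableAfterCheckpoint(true));  // prints [1, 2]
    }
}
```

With the plain wrapper the two rows are still sitting in the buffer after the simulated checkpoint, which is the symptom reported here; with the hook in place they reach the table. The real fix may well look different, this is only meant to show what the test should assert.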
> JdbcDynamicTableSink doesn't sink buffered data on checkpoint
> -------------------------------------------------------------
>
> Key: FLINK-20552
> URL: https://issues.apache.org/jira/browse/FLINK-20552
> Project: Flink
> Issue Type: Bug
> Components: Connectors / JDBC, Table SQL / Ecosystem
> Reporter: mei jie
> Assignee: mei jie
> Priority: Major
> Labels: starter
> Fix For: 1.13.0
>
>
> JdbcBatchingOutputFormat is wrapped in an OutputFormatSinkFunction when
> createSinkTransformation is called in the CommonPhysicalSink class. However,
> OutputFormatSinkFunction doesn't implement the CheckpointedFunction interface, so
> the flush method of JdbcBatchingOutputFormat can't be called on checkpoint
--
This message was sent by Atlassian Jira
(v8.3.4#803005)