JingsongLi commented on code in PR #580:
URL: https://github.com/apache/flink-table-store/pull/580#discussion_r1126161296
##########
flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/action/ActionBase.java:
##########
@@ -125,12 +131,41 @@ protected boolean compatibleCheck(List<DataType> actualTypes, List<DataType> exp
return true;
}
- /** Sink {@link DataStream} dataStream to table. */
- protected void sink(DataStream<RowData> dataStream) throws Exception {
- new FlinkSinkBuilder((FileStoreTable) table)
- .withInput(dataStream)
- .withLockFactory(Lock.factory(catalog.lockFactory().orElse(null), identifier))
- .build();
- env.execute();
+ /** Sink {@link DataStream} dataStream to table with Flink Table API in batch environment. */
+ protected void batchSink(DataStream<RowData> dataStream) {
+ List<Transformation<?>> transformations =
+ Collections.singletonList(
+ new FlinkSinkBuilder((FileStoreTable) table)
+ .withInput(dataStream)
+ .withLockFactory(
+ Lock.factory(
+ catalog.lockFactory().orElse(null), identifier))
+ .build()
+ .getTransformation());
+
+ List<String> sinkIdentifierNames = Collections.singletonList(identifier.getFullName());
+
+ // invoke TableEnvironmentImpl#executeInternal through reflecting
+ try {
+ // for Flink 1.14
Review Comment:
This looks complicated to maintain.
Can we just introduce a class `TableEnvironmentUtils` for this? The 1.14 module
and the common module can each have their own version of the class.
You could also add a simple ITCase for 1.14 and 1.15.
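To illustrate the suggestion, here is a minimal sketch of how the reflective call could be isolated in one helper. Everything in it is an assumption for illustration: `FakeTableEnvironment` is a stand-in so the example runs without Flink on the classpath, and the `executeInternal(List, List)` signature and the shape of `TableEnvironmentUtils` are hypothetical, not the actual Flink or table store API. The idea is that each version module would ship its own copy of the helper with the lookup that version needs, so `ActionBase` contains no version-specific branches.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a table environment whose executeInternal is not public. */
class FakeTableEnvironment {
    final List<String> executed = new ArrayList<>();

    // Mimics a non-public method that callers can only reach through reflection.
    protected String executeInternal(List<?> transformations, List<String> sinkIdentifierNames) {
        executed.addAll(sinkIdentifierNames);
        return "submitted " + transformations.size() + " transformation(s)";
    }
}

/**
 * Hypothetical helper. The 1.14 module and the common module would each provide
 * their own class with this name, differing only in the method lookup.
 */
final class TableEnvironmentUtils {
    private TableEnvironmentUtils() {}

    /** Invokes the non-public executeInternal(List, List) method on the given environment. */
    static Object executeInternal(
            Object tEnv, List<?> transformations, List<String> sinkIdentifierNames) {
        try {
            Method method = findMethod(tEnv.getClass(), "executeInternal", List.class, List.class);
            method.setAccessible(true);
            return method.invoke(tEnv, transformations, sinkIdentifierNames);
        } catch (InvocationTargetException e) {
            // Surface the failure thrown by the target method itself.
            throw new RuntimeException("executeInternal failed", e.getCause());
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Could not invoke executeInternal reflectively", e);
        }
    }

    /** Walks up the class hierarchy, because getDeclaredMethod does not search superclasses. */
    private static Method findMethod(Class<?> type, String name, Class<?>... parameterTypes)
            throws NoSuchMethodException {
        for (Class<?> current = type; current != null; current = current.getSuperclass()) {
            try {
                return current.getDeclaredMethod(name, parameterTypes);
            } catch (NoSuchMethodException ignored) {
                // Not declared on this class; try the superclass next.
            }
        }
        throw new NoSuchMethodException(type.getName() + "#" + name);
    }
}
```

With a helper like this, `batchSink` would reduce to building the transformation list and making a single `TableEnvironmentUtils.executeInternal(...)` call, and an ITCase per Flink version could exercise each copy of the helper.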