Hi Tamas,

I think `StreamStatementSet.attachAsDataStream()` is what you are looking
for.
Here is the detailed documentation; please take a look:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/data_stream_api/#adding-table-api-pipelines-to-datastream-api
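
Roughly, the pattern looks like the sketch below. The table names and the
query are only placeholders for illustration, so please adapt them to your
own sources and sinks; I also used toChangelogStream as the newer
counterpart of the retract stream conversion you mentioned:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamStatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class SingleJobExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Buffer the SQL inserts in a StreamStatementSet
        // (this replaces the old sqlUpdate() calls).
        StreamStatementSet statementSet = tableEnv.createStatementSet();
        statementSet.addInsertSql("INSERT INTO my_sink_a SELECT * FROM my_source_a");
        statementSet.addInsertSql("INSERT INTO my_sink_b SELECT * FROM my_source_b");

        // Attach the buffered Table API pipelines to the StreamExecutionEnvironment.
        // Nothing is submitted yet; the statement set is cleared afterwards.
        statementSet.attachAsDataStream();

        // Define the remaining DataStream API parts on the same environment,
        // e.g. a changelog (retract) stream converted from a query.
        DataStream<Row> changelogStream =
                tableEnv.toChangelogStream(tableEnv.sqlQuery("SELECT * FROM my_source_a"));
        changelogStream.print();

        // One call, one Flink job containing both the SQL inserts
        // and the DataStream pipelines.
        env.execute("single-job-example");
    }
}
```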

Best,
Jark


On Fri, 5 Aug 2022 at 10:06, Tamas Kiss <tak...@cloudera.com.invalid> wrote:

> Hi Experts,
>
> We are trying to upgrade to flink-1.1.5 and have to refactor our code
> because the following method has been removed from TableEnvironment: void
> sqlUpdate(String sql). In our code we mix the SQL and DataStream APIs: we
> use the sqlUpdate method to buffer several insert statements and also add
> other inserts by getting and configuring a retract stream from the same
> TableEnvironment object. We then call tableEnv.executeAsync() in a later
> step and get all the previous inserts running in one Flink job.
>
> This FLIP
> <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878>
> suggests using the following pattern instead of sqlUpdate:
>
> createStatementSet() -> StatementSet -> execute()
>
> This basically works for the inserts previously defined via sqlUpdate, but
> the ones managed through the DataStream API do not get executed.
>
> I was considering the following workarounds, but all of them have issues:
> 1. Replacing sqlUpdate with executeSql does not work because it submits
> the job immediately and does not allow bundling multiple inserts.
> 2. Replacing the DataStream API calls with the SQL API. This could work but
> requires further investigation and seems to be a lot of work at the moment.
> 3. Keeping both and calling statementSet.execute() and
> tableEnv.executeAsync() leads to two separate jobs that would be hard
> to manage.
>
> So at the moment the only approach I can think of is to use StatementSet
> and mix it with calls to the DataStream API. Is there any solution that
> makes this scenario work with only one Flink job being executed?
>
> Thanks
> Tamas
>
