BTW, if there are no other blocking issues or comments, I would like to start a VOTE in another thread this Wednesday (June 14).
Thanks,
Aitozi.

On Mon, Jun 12, 2023 at 11:34 PM Aitozi <gjying1...@gmail.com> wrote:

Hi Jing,

Thanks for your explanation. I get your point now.

For the performance part, I think it's a good idea to test a case that returns a big table. Memory consumption deserves attention there, because in ordered mode the head element in the buffer can affect the total memory consumption.

Thanks,
Aitozi.

On Mon, Jun 12, 2023 at 8:28 PM Jing Ge <j...@ververica.com.invalid> wrote:

Hi Aitozi,

Which key is used for the lookup is not the issue; only one row is required for each key in order to enrich it. True, whether multiple rows or a single row is returned for each key depends on the implementation. However, for the lookup-and-enrichment scenario, one row per key is recommended; otherwise, as I mentioned previously, enrichment won't work.

I am a little concerned about returning a big table for each key, since the async call will take longer to return and will need more memory. The performance tests should cover this scenario. This is not a blocking issue for this FLIP.

Best regards,
Jing

On Sat, Jun 10, 2023 at 4:11 AM Aitozi <gjying1...@gmail.com> wrote:

Hi Jing,

I mean that the join key is not necessarily the primary key or a unique index of the database. In that situation, one join key may match multiple rows. I think that's why LookupFunction#lookup returns a collection of RowData.

BTW, I think the behavior of lookup join does not affect the semantics of the async UDTF. We use an async TableFunction here, and a table function can collect multiple rows.

Thanks,
Aitozi.

On Sat, Jun 10, 2023 at 12:15 AM Jing Ge <j...@ververica.com.invalid> wrote:

Hi Aitozi,

The keyRow used in this case contains all keys [1].
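The ordered-mode memory concern raised in the June 12 message can be illustrated with a toy Java model. This is not Flink's AsyncWaitOperator code. It assumes every request is already in flight, so the buffer capacity is not modeled. It also assumes each request has a known completion tick and result size. `OrderedBufferModel` and `peakBufferedRows` are hypothetical names. In ordered mode a result can only be emitted after every earlier result has been emitted, so finished results pile up behind a slow head.

```java
import java.util.Arrays;

/**
 * Toy model (not Flink code) of an ordered-mode async buffer. All requests are
 * assumed to be in flight already; results are emitted strictly in input
 * order, so a slow head keeps later, finished results buffered.
 */
final class OrderedBufferModel {

    /**
     * @param finishTimes finishTimes[i] is the tick at which request i completes
     * @param resultRows  resultRows[i] is the number of rows request i returns
     * @return peak number of rows that are finished but still waiting to be emitted
     */
    static int peakBufferedRows(int[] finishTimes, int[] resultRows) {
        int n = finishTimes.length;
        boolean[] done = new boolean[n];
        int head = 0;      // oldest request that has not been emitted yet
        int buffered = 0;  // rows finished but blocked behind the head
        int peak = 0;
        int[] ticks = Arrays.stream(finishTimes).distinct().sorted().toArray();
        for (int t : ticks) {
            // Every request finishing at this tick adds its rows to the buffer.
            for (int i = 0; i < n; i++) {
                if (finishTimes[i] == t) {
                    done[i] = true;
                    buffered += resultRows[i];
                }
            }
            // Ordered mode: emit only while the head of the queue is finished.
            while (head < n && done[head]) {
                buffered -= resultRows[head];
                head++;
            }
            peak = Math.max(peak, buffered);
        }
        return peak;
    }
}
```

With `finishTimes = {10, 1, 2, 3}` and `resultRows = {1, 1000, 1000, 1000}`, `peakBufferedRows` returns 3000. Three large results wait behind one slow single-row request. If the head finishes first (`{0, 1, 2, 3}`), it returns 0. The big-table case is therefore worth including in the performance tests.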
Best regards,
Jing

[1] https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L49

On Fri, Jun 9, 2023 at 3:42 PM Aitozi <gjying1...@gmail.com> wrote:

Hi Jing,

The performance test has been added to the FLIP.

As far as I know, a lookup join can return multiple rows; it depends on whether the join key is the primary key of the external database. `lookup` [1] returns a collection of joined results, and each of them is collected.

[1]: https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L52

Thanks,
Aitozi.

On Fri, Jun 9, 2023 at 5:05 PM Jing Ge <j...@ververica.com.invalid> wrote:

Hi Aitozi,

Thanks for the feedback. Looking forward to the performance tests.

AFAIK, lookup returns one row for each key [1] [2]. Conceptually, the lookup function is used to enrich column(s) from the dimension table. If there is more than one row for a given key, there is no way to know which row should be used to enrich it.
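The lookup semantics debated here can be sketched in plain Java. This is a stand-in, not Flink code. The key row is modeled as a `List<Object>` holding every join-key value, and the dimension table as an in-memory map. `LookupModel`, `addRow`, `enrich`, and the string row format are hypothetical. The sketch follows the pattern described in the thread: `eval(keys)` builds one key row from all keys, `lookup(keyRow)` returns a collection, and each returned row is collected.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java stand-in (hypothetical names, not Flink classes) for the lookup
 * pattern in this thread: all join-key values form one key row,
 * lookup(keyRow) returns a collection, and every returned row is emitted.
 */
final class LookupModel {
    private final Map<List<Object>, List<String>> dimensionTable = new HashMap<>();

    /** Adds a dimension row under the given join-key values; keys need not be unique. */
    LookupModel addRow(String row, Object... keys) {
        dimensionTable.computeIfAbsent(Arrays.asList(keys), k -> new ArrayList<>()).add(row);
        return this;
    }

    /** Returns every dimension row that matches the whole key row (zero, one, or many). */
    Collection<String> lookup(List<Object> keyRow) {
        return dimensionTable.getOrDefault(keyRow, Collections.emptyList());
    }

    /** Mirrors the eval(Object... keys) shape: wrap all keys, look up, collect each row. */
    List<String> eval(Object... keys) {
        List<Object> keyRow = Arrays.asList(keys);
        return new ArrayList<>(lookup(keyRow));
    }

    /** Inner-join style enrichment: one probe row yields one joined row per match. */
    List<String> enrich(String probeRow, Object... keys) {
        List<String> joined = new ArrayList<>();
        for (String dimRow : eval(keys)) {
            joined.add(probeRow + " | " + dimRow);
        }
        return joined;
    }
}
```

In this model, adding two rows under key 2 makes `enrich("order-b", 2)` return two joined rows rather than choosing one. This is the ambiguity Jing points out for enrichment. A unique key yields exactly one row and a missing key yields none. A composite key such as `(9, "EU")` only matches when all of its values are supplied.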
[1] https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L49
[2] https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableFunction.java#L196

Best regards,
Jing

On Fri, Jun 9, 2023 at 5:18 AM Aitozi <gjying1...@gmail.com> wrote:

Hi Jing,

Thanks for your good questions. I have added the example to the FLIP.

> Only one row for each lookup

Lookup can also return multiple rows, depending on the query result [1].

[1]: https://github.com/apache/flink/blob/191ec6ca3943d7119f14837efe112e074d815c47/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/LookupFunction.java#L56

> If we use async calls with lateral join, my gut feeling is
> that we might have many more async calls than lookup join. I am not really
> sure if we will be facing potential issues in this case or not.

IMO, the work pattern is similar to the lookup function: for each row from the left table, the eval method is evaluated once, so the number of async calls does not change. The maximum number of calls in flight is limited by the async operator's buffer capacity, which is controlled by the option.
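The claim above can be checked with a small `java.util.concurrent` sketch: one async call per left-table row, with at most buffer-capacity calls in flight. It is an assumption-laden stand-in, not the async operator's implementation. A `Semaphore` plays the role of the buffer capacity, and `CompletableFuture.orTimeout` (Java 9+) plays the role of the timeout option. `BoundedAsyncCaller` and `simulatedRpc` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical sketch, not Flink's async operator: one async call per input
 * row, at most `capacity` calls in flight, each call bounded by a timeout.
 */
final class BoundedAsyncCaller {
    final AtomicInteger calls = new AtomicInteger();
    final AtomicInteger inFlight = new AtomicInteger();
    final AtomicInteger maxInFlight = new AtomicInteger();

    List<String> run(List<Integer> rows, int capacity, long timeoutMs) {
        ExecutorService pool = Executors.newFixedThreadPool(8);
        Semaphore permits = new Semaphore(capacity);  // stands in for the buffer capacity
        List<CompletableFuture<String>> futures = new ArrayList<>();
        try {
            for (Integer row : rows) {
                permits.acquireUninterruptibly();     // back-pressure: wait while the buffer is full
                futures.add(CompletableFuture
                        .supplyAsync(() -> simulatedRpc(row), pool)
                        .orTimeout(timeoutMs, TimeUnit.MILLISECONDS)  // stands in for the timeout option
                        .whenComplete((result, error) -> permits.release()));
            }
            List<String> results = new ArrayList<>();
            for (CompletableFuture<String> f : futures) {
                results.add(f.join());                // collected in input order
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    private String simulatedRpc(int row) {
        calls.incrementAndGet();
        maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
        try {
            Thread.sleep(5);                          // pretend network latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            inFlight.decrementAndGet();
        }
        return "result-" + row;
    }
}
```

With 10 input rows and a capacity of 3, the sketch makes exactly 10 calls and never runs more than 3 at once. One caveat: a timed-out call releases its permit even though the underlying task keeps running. A real implementation would need to cancel such calls or keep counting them.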
BTW, regarding the naming of these options, I have updated the FLIP; please refer to the "ConfigOption" and "Rejected Alternatives" sections.

Finally, for the performance evaluation, I'd like to run some tests and will add the results to the FLIP doc.

Thanks,
Aitozi.

On Fri, Jun 9, 2023 at 7:23 AM Jing Ge <j...@ververica.com.invalid> wrote:

Hi Aitozi,

Thanks for the clarification. The code example looks interesting. I would suggest adding it to the FLIP. The description with code examples will help readers understand the motivation and how to use the feature. AFAIAC, it is a valid feature for Flink users.

As we know, lookup join is based on temporal join, i.e. FOR SYSTEM_TIME AS OF, which is also used in your code example. Temporal join performs the lookup based on the processing-time match. Only one row is returned for each lookup (AFAIU; I need to check the source code to double-confirm) for further enrichment. On the other hand, a lateral join has sub-queries correlated with every individual value of the reference table from the preceding part of the query, and each sub-query can return multiple rows. If we use async calls with lateral join, my gut feeling is that we might have many more async calls than with lookup join. I am not really sure whether we will face potential issues in this case or not.
Possible issues I can think of now include too many RPC calls, too many async calls being processed, and sub-queries returning tables that might be (too) big, which could cause performance issues. I would suggest preparing some use cases and running performance tests to check this. These are my concerns about using async calls with lateral join, and I'd like to share them with you. I'm happy to discuss and hear different opinions; hopefully the discussion will help me understand it more deeply. Please correct me if I am wrong.

Best regards,
Jing

On Thu, Jun 8, 2023 at 7:22 AM Aitozi <gjying1...@gmail.com> wrote:

Hi Mason,

Thanks for your input. I think if we support user-defined async table functions, users will be able to use them to hold a batch of data and then handle it at once in the customized function.

AsyncSink is meant for the sink operator. I have not figured out how to integrate it in this case.

Thanks,
Aitozi.

On Thu, Jun 8, 2023 at 12:40 PM Mason Chen <mas.chen6...@gmail.com> wrote:

Hi Aitozi,

I think it makes sense to make it easier for SQL users to make RPCs. Do you think your proposal can extend to the ability to batch data for the RPC?
This is another common strategy to increase throughput. Also, have you considered solving this a bit differently by leveraging Flink's AsyncSink?

Best,
Mason

On Mon, Jun 5, 2023 at 1:50 AM Aitozi <gjying1...@gmail.com> wrote:

One more thing for discussion:

In our internal implementation, we reuse the options `table.exec.async-lookup.buffer-capacity` and `table.exec.async-lookup.timeout` to configure the async UDTF. Do you think we should add two extra options to distinguish them from the lookup options, such as:

`table.exec.async-udtf.buffer-capacity`
`table.exec.async-udtf.timeout`

Best,
Aitozi.

On Mon, Jun 5, 2023 at 12:20 PM Aitozi <gjying1...@gmail.com> wrote:

Hi Jing,

> what is the difference between the RPC call or query you mentioned and the lookup in a very general way

I think the RPC call or query service is quite similar to the lookup join, but a lookup join has to work with a `LookupTableSource`.
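Mason's batching question (June 8) and Aitozi's reply can be sketched as a size-triggered micro-batcher. Aitozi suggested that a user-defined async table function could hold a batch of data and handle it at once. Everything here is hypothetical: `RpcBatcher`, `request`, and the batched backend (a `Function<List<Integer>, Map<Integer, String>>`) are illustrative, not Flink or AsyncSink APIs. Each request parks a `CompletableFuture`, and one backend call completes every future in the batch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/**
 * Hypothetical size-triggered micro-batcher (not a Flink or AsyncSink API):
 * requests are parked as futures and sent to the backend in one batched call
 * once `batchSize` distinct keys are queued.
 */
final class RpcBatcher {
    private final int batchSize;
    private final Function<List<Integer>, Map<Integer, String>> batchedRpc;
    private final Map<Integer, List<CompletableFuture<String>>> pending = new HashMap<>();
    private final List<Integer> queuedKeys = new ArrayList<>();
    int rpcCount = 0;  // backend round trips, for illustration

    RpcBatcher(int batchSize, Function<List<Integer>, Map<Integer, String>> batchedRpc) {
        this.batchSize = batchSize;
        this.batchedRpc = batchedRpc;
    }

    /** What an async eval() might do per input row: park a future, flush when full. */
    synchronized CompletableFuture<String> request(int key) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.computeIfAbsent(key, k -> {
            queuedKeys.add(k);  // first request for this key in the current batch
            return new ArrayList<>();
        }).add(future);
        if (queuedKeys.size() >= batchSize) {
            flush();
        }
        return future;
    }

    /** Sends all queued keys in one backend call and completes every waiting future. */
    synchronized void flush() {
        if (queuedKeys.isEmpty()) {
            return;
        }
        Map<Integer, String> results = batchedRpc.apply(new ArrayList<>(queuedKeys));
        rpcCount++;
        for (Integer key : queuedKeys) {
            for (CompletableFuture<String> f : pending.get(key)) {
                f.complete(results.get(key));
            }
        }
        queuedKeys.clear();
        pending.clear();
    }
}
```

With a batch size of 3, three requests cost a single backend round trip. The sketch only flushes on size. A real function would also need a time-based flush; otherwise the last partial batch waits indefinitely.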
Let's see how we can perform an async RPC call with a lookup join:

(1) Implement an AsyncTableFunction with the RPC call logic.
(2) Implement a `LookupTableSource` connector that runs the async UDTF defined in (1).
(3) Define the DDL of this lookup table in SQL:

    CREATE TEMPORARY TABLE Customers (
      id INT,
      name STRING,
      country STRING,
      zip STRING
    ) WITH (
      'connector' = 'custom'
    );

(4) Run a query like the one below:

    SELECT o.order_id, o.total, c.country, c.zip
    FROM Orders AS o
      JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
        ON o.customer_id = c.id;

This example is from the docs <https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/queries/joins/#lookup-join>. You can imagine the lookup process as an async RPC call.

Let's see how we can perform an async RPC call with a lateral join:

(1) Implement an AsyncTableFunction with the RPC call logic.
(2) Run the query:

    CREATE FUNCTION f1 AS '...';

    SELECT o.order_id, o.total, T.country, T.zip
    FROM Orders AS o, LATERAL TABLE(f1(o.order_id)) AS T(...);

As you can see, the lateral join version is simpler and more intuitive for users. Users do not have to wrap the async UDTF in a LookupTableSource just to use it.

Finally, the user-defined async table function is an enhancement of the current lateral table join, which only supports synchronous lateral joins now.

Best,
Aitozi.

On Fri, Jun 2, 2023 at 7:37 PM Jing Ge <j...@ververica.com.invalid> wrote:

Hi Aitozi,

Thanks for the update. Just out of curiosity, what is the difference, in a very general way, between the RPC call or query you mentioned and the lookup? Since lateral join is used in the FLIP, is there any special thought behind that? Sorry for asking so many questions; the FLIP contains limited information for understanding the motivation.
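The lateral-join variant in the June 5 message only needs step (1), an async table function. The sketch below shows that shape without a Flink dependency. It follows the `eval(CompletableFuture<Collection<T>> future, inputs...)` convention of Flink's `AsyncTableFunction` but does not extend the real class. `OrderRpcFunction`, `fetchCustomer`, and the `country,zip` string rows are hypothetical.

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Self-contained stand-in for a user-defined async table function. It follows
 * the eval(CompletableFuture<Collection<T>>, inputs...) convention of Flink's
 * AsyncTableFunction but does not extend it; all names here are hypothetical.
 */
final class OrderRpcFunction implements AutoCloseable {
    private final ExecutorService ioPool = Executors.newFixedThreadPool(4);

    /** Called once per Orders row; returns at once and completes `future` later. */
    public void eval(CompletableFuture<Collection<String>> future, Integer orderId) {
        CompletableFuture
                .supplyAsync(() -> fetchCustomer(orderId), ioPool)
                .whenComplete((rows, error) -> {
                    if (error != null) {
                        future.completeExceptionally(error);
                    } else {
                        future.complete(rows);  // a table function may emit 0..n rows
                    }
                });
    }

    /** Hypothetical remote call returning "country,zip" rows for an order. */
    private Collection<String> fetchCustomer(int orderId) {
        return orderId % 2 == 0 ? List.of("DE,10115") : List.of("US,94105", "US,10001");
    }

    @Override
    public void close() {
        ioPool.shutdown();
    }
}
```

`eval` returns immediately, and the future is completed later on the IO pool. In the proposal, the AsyncWaitOperator decides how many such futures may be pending, not the function itself.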
Best regards,
Jing

On Fri, Jun 2, 2023 at 3:48 AM Aitozi <gjying1...@gmail.com> wrote:

Hi Jing,

I have added the proposed changes to the FLIP. IMO, lookup has a clear async call requirement because it is an IO-heavy operator. In our usage, SQL users have logic that makes RPC calls or queries third-party services, which is also IO-intensive. In these cases, we'd like to leverage async functions to improve throughput.

Thanks,
Aitozi.

On Thu, Jun 1, 2023 at 10:55 PM Jing Ge <j...@ververica.com.invalid> wrote:

Hi Aitozi,

Sorry for the late reply. Would you like to add the proposed changes, with more details, to the FLIP too? I got your point. It looks like a rational idea. However, since lookup has its clear async call requirement, are there any real use cases that need this change?
This will help us understand the motivation. After all, lateral join and temporal lookup join [1] are quite different.

Best regards,
Jing

[1] https://github.com/apache/flink/blob/d90a72da2fd601ca4e2a46700e91ec5b348de2ad/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/AsyncTableFunction.java#L54

On Wed, May 31, 2023 at 8:53 AM Aitozi <gjying1...@gmail.com> wrote:

Hi Jing,

What do you think about it? Can we move this feature forward?

Thanks,
Aitozi.

On Mon, May 29, 2023 at 9:56 AM Aitozi <gjying1...@gmail.com> wrote:

Hi Jing,

> Do you mean to support the AsyncTableFunction beyond the LookupTableSource?

Yes, I mean to support the AsyncTableFunction beyond the LookupTableSource.
"AsyncTableFunction" is a function that can be executed asynchronously (with the AsyncWaitOperator). The async lookup join is one usage of it, so we don't have to bind AsyncTableFunction to LookupTableSource. If user-defined AsyncTableFunction is supported, users can directly use the lateral table syntax to perform async operations.

> It would be better if you could elaborate the proposed changes wrt the CorrelatedCodeGenerator with more details

In the proposal, we use the lateral table syntax to support the async table function, so the planner will also translate this statement into a CommonExecCorrelate node. The runtime code should therefore be generated in CorrelatedCodeGenerator.
In CorrelatedCodeGenerator, we will know whether the TableFunction's kind is `FunctionKind.TABLE` or `FunctionKind.ASYNC_TABLE`. For `FunctionKind.ASYNC_TABLE`, we can generate an AsyncWaitOperator to execute the async table function.

Thanks,
Aitozi.

On Mon, May 29, 2023 at 3:22 AM Jing Ge <j...@ververica.com.invalid> wrote:

Hi Aitozi,

Thanks for the clarification. The naming "Lookup" might suggest using it for table lookup. But conceptually, what the eval() method does is get a collection of results (Row, RowData) for the given keys. How that is done depends on the implementation, i.e. you can implement your own Source [1][2].
The example in the FLIP could be handled this way.

Do you mean to support the AsyncTableFunction beyond the LookupTableSource? It would be better if you could elaborate on the proposed changes wrt the CorrelatedCodeGenerator in more detail. Thanks!

Best regards,
Jing

[1] https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/LookupTableSource.java#L64
[2] https://github.com/apache/flink/blob/678370b18e1b6c4a23e5ce08f8efd05675a0cc17/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/AsyncTableFunctionProvider.java#L49

On Sat, May 27, 2023 at 9:48 AM Aitozi <gjying1...@gmail.com> wrote:

Hi Jing,

Thanks for your response. As stated in the FLIP, the purpose of this FLIP is to support user-defined async table functions. As described in the Flink documentation [1]:

> Async table functions are special functions for table sources that perform a lookup.

So end users cannot directly define and use async table functions now. A use case is reported in [2].

So, in conclusion, no new interface is introduced, but we extend the ability to support user-defined async table functions.
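The May 29 reply describes a planner change: CorrelatedCodeGenerator checks the function kind and generates an AsyncWaitOperator for `FunctionKind.ASYNC_TABLE`. That decision boils down to a dispatch like the toy below. The enum is a local stand-in that borrows constant names from Flink's `FunctionKind`. The returned strings only label the chosen runtime path; this is not the actual code generator.

```java
/**
 * Toy version of the planner decision described in the thread. The enum is a
 * local stand-in that borrows constant names from Flink's FunctionKind; the
 * returned strings only describe which runtime path would be chosen.
 */
final class CorrelateDispatch {
    enum Kind { TABLE, ASYNC_TABLE, SCALAR }

    static String operatorFor(Kind kind) {
        switch (kind) {
            case TABLE:
                // Existing behavior: synchronous lateral (correlate) join.
                return "sync correlate operator";
            case ASYNC_TABLE:
                // Proposed: wrap the generated function call in an AsyncWaitOperator.
                return "AsyncWaitOperator";
            default:
                // Only table functions can appear in LATERAL TABLE(...).
                throw new IllegalArgumentException("not a table function: " + kind);
        }
    }
}
```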
[1]: https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/functions/udfs/
[2]: https://lists.apache.org/thread/qljwd40v5ntz6733cwcdr8s4z97b343b

Thanks,
Aitozi.

On Sat, May 27, 2023 at 6:40 AM Jing Ge <j...@ververica.com.invalid> wrote:

Hi Aitozi,

Thanks for your proposal. I am not quite sure if I understood your thoughts correctly. You described a special-case implementation of the AsyncTableFunction with no public API changes. Would you please elaborate on your purpose in writing a FLIP, according to the FLIP documentation [1]? Thanks!
[1] https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals

Best regards,
Jing

On Wed, May 24, 2023 at 1:07 PM Aitozi <gjying1...@gmail.com> wrote:

May I ask for some feedback :D

Thanks,
Aitozi

On Tue, May 23, 2023 at 7:14 PM Aitozi <gjying1...@gmail.com> wrote:

Just caught a use case report from Giannis Polyzos for this usage:
https://lists.apache.org/thread/qljwd40v5ntz6733cwcdr8s4z97b343b

On Tue, May 23, 2023 at 5:45 PM Aitozi <gjying1...@gmail.com> wrote:

Hi guys,
I want to bring up a discussion about adding support for user-defined AsyncTableFunction in Flink. Currently, async table functions are special functions for table sources to perform async lookups. However, it's worth supporting user-defined async table functions, because end SQL users could then leverage them to perform async operations. This helps maximize system throughput, especially when IO is the bottleneck.

You can find more details in [1].
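The throughput argument in the proposal can be put as back-of-envelope arithmetic. Assume every call has the same latency L and the remote service never becomes the bottleneck. Then N rows take about N × L with a synchronous UDTF, but only about ceil(N / C) × L when up to C calls overlap. `ThroughputEstimate` is a hypothetical helper for that estimate, not a benchmark.

```java
/**
 * Back-of-envelope latency model (an assumption, not a benchmark): every call
 * takes the same latency and the remote service is never the limit.
 */
final class ThroughputEstimate {
    /** Synchronous UDTF: each row waits for its call before the next one starts. */
    static long syncMillis(long rows, long latencyMs) {
        return rows * latencyMs;
    }

    /** Async UDTF: up to `capacity` calls overlap, so rows finish in waves. */
    static long asyncMillis(long rows, long latencyMs, int capacity) {
        long waves = (rows + capacity - 1) / capacity;  // ceil(rows / capacity)
        return waves * latencyMs;
    }
}
```

For 10,000 rows at 20 ms per call, the estimate is 200,000 ms synchronously versus 2,000 ms with 100 calls in flight. Real gains are capped by the remote service's capacity. The June messages raise further limits: result sizes and ordered-mode buffering.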
Looking forward to feedback.

[1]: https://cwiki.apache.org/confluence/display/FLINK/%5BFLIP-313%5D+Add+support+of+User+Defined+AsyncTableFunction

Thanks,
Aitozi.