A lookup-style temporal table join [1] should solve your case, and there is
an ITCase that serves as an example [2].

[1]
https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/LookupableTableSource.java
[2]
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala
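
For illustration, a lookup join for the query in the quoted message might look
roughly like the sketch below. It assumes that badips is backed by a connector
implementing LookupableTableSource and that sourceKafka declares a
processing-time attribute, here called proctime (a hypothetical column name,
not taken from the original message). As far as I know, a lookup join needs an
equality condition on the lookup key, so the sketch only checks `source.ip`;
`destination.ip` would need a second, analogous join.

```sql
-- Sketch only. Assumptions: `proctime` is a processing-time attribute declared
-- on sourceKafka, and badips is a lookup-capable (LookupableTableSource) table.
-- s.* also selects the proctime column, so an explicit column list may be
-- needed to match the schema of sourceKafkaMalicious.
INSERT INTO sourceKafkaMalicious
SELECT s.*
FROM sourceKafka AS s
JOIN badips FOR SYSTEM_TIME AS OF s.proctime AS b
  ON s.`source.ip` = b.ip;
```

Because the inner join only emits rows that find a match in badips, it plays
the role of the IN subquery without comparing a string against a ROW value.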


*Best Regards,*
*Zhenghua Gao*


On Mon, Sep 16, 2019 at 9:23 PM srikanth flink <flink.d...@gmail.com> wrote:

> Hi there,
>
> I'm working with streaming in Flink SQL. I've created two tables: one fed by
> a dynamic stream and the other by periodic updates.
> I would like to keep the periodic table static (it is refreshed with new data
> every day or so by flushing the old data), so that at any point in time the
> static table contains the latest set of data.
> With the dynamic table being populated with stream data, could I do a lookup
> on a column of the static table to check whether a value exists?
>
> This is what I have done:
> dynamic table: sourceKafka
> static table: badips
>
> I tried to build a list using the ROW() function and then, from the dynamic
> table, look up whether the value exists in that list.
> Query: INSERT INTO sourceKafkaMalicious select s.* from sourceKafka as s
> where s.`source.ip` OR s.`destination.ip` IN (select ROW(ip) from badips);
> Response:
> [INFO] Submitting SQL update statement to the cluster...
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.calcite.sql.validate.SqlValidatorException: Values passed to IN
> operator must have compatible types
>
> Is it possible to solve my use case? If so, where am I going wrong?
>
> Thanks
> Srikanth
>
