A lookup-style temporal join [1] should solve your case, and there is an ITCase that serves as an example [2].
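
For illustration, such a lookup join might look roughly like the query below. This is only a sketch under assumptions that are not in your message: it assumes sourceKafka declares a processing-time attribute, here given the hypothetical name `proctime`, and that badips is backed by a connector implementing LookupableTableSource with `ip` as the lookup key.

```sql
-- Sketch only, not a tested statement.
-- Assumptions: `proctime` is a hypothetical processing-time attribute on
-- sourceKafka; badips is served by a LookupableTableSource keyed on `ip`.
-- Note that s.* also selects the proctime column, so the sink schema
-- would need to match (or the columns should be listed explicitly).
INSERT INTO sourceKafkaMalicious
SELECT s.*
FROM sourceKafka AS s
JOIN badips FOR SYSTEM_TIME AS OF s.proctime AS b
  ON s.`source.ip` = b.ip;
```

The same pattern with `s.`destination.ip` = b.ip` in the ON clause would cover the destination side. Because each incoming row looks up badips at processing time, a refreshed badips is seen by later rows without restarting the job. As for the validator error, it likely arises because `ROW(ip)` yields a row type that is compared against a string column, and because the left operand of `OR` is not a boolean expression.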

[1] https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/sources/LookupableTableSource.java
[2] https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AsyncLookupJoinITCase.scala

*Best Regards,*
*Zhenghua Gao*


On Mon, Sep 16, 2019 at 9:23 PM srikanth flink <flink.d...@gmail.com> wrote:

> Hi there,
>
> I'm working with streaming in FlinkSQL. I've two tables created one with
> dynamic stream and the other a periodic updates.
> I would like to keep the periodic table a static (but updates with new data
> every day or so by flushing the old), so at any point of time the static
> table should contain new set of data.
> With dynamic table being populated with stream data, could I do a lookup on
> a column of static table to find if the value exists.
>
> This is what I have done:
> dynamic table: sourceKafka
> static table: badips
>
> Trying to build a list, kind of using ROW() function and done. From dynamic
> table, trying to lookup into the list if the value exists.
> Query: INSERT INTO sourceKafkaMalicious select s.* from sourceKafka as s
> where s.`source.ip` OR s.`destination.ip` IN (select ROW(ip) from badips);
> Response:
> [INFO] Submitting SQL update statement to the cluster...
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.calcite.sql.validate.SqlValidatorException: Values passed to IN
> operator must have compatible types
>
> Is it possible to solve my use case? If so, where am I going wrong?
>
> Thanks
> Srikanth
>