Hello Fabian, I am streaming my mongodb oplog using flink and want to use flink table API to join multiple tables. My code looks like
DataStream<Oplog> streamSource = env .addSource(kafkaConsumer) .setParallelism(4); StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // Convert the DataStream into a Table with default fields "f0", "f1" Table table1 = tableEnv.fromDataStream(streamSource); Table master = table1.filter("ns === 'Master'").select("o as master, o.applicationId as primaryKey"); Table child1 = table1.filter("ns === 'Child1'").select("o as child1, o.applicationId as foreignKey"); Table child2 = table1.filter("ns === 'Child2'").select("o as child2, o.applicationId as foreignKey2"); Table result = master.join(child1).where("primaryKey==foreignKey").join(child2).where("primaryKey==foreignKey2"); it is throwing error "Method threw 'org.apache.flink.table.api.ValidationException' exception. Undefined function: APPLICATIONID" public class Oplog implements Serializable{ private BasicDBObject o; } Where o is generic java type for fetching mongodb oplog and I can not replace this generic type with static pojo's. please tell me any work around on this. BasicDBObject suffice following two rules - The class must be public. - It must have a public constructor without arguments (default constructor) and we can access class members through basicDBObject.getString("abc") ----------------------------------------------- *Amol Suryawanshi* Java Developer am...@iprogrammer.com *iProgrammer Solutions Pvt. Ltd.* *Office 103, 104, 1st Floor Pride Portal,Shivaji Housing Society, Bahiratwadi,Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - 411016, MH, INDIA.**Phone: +91 9689077510 | Skype: amols_iprogrammer* www.iprogrammer.com <sac...@iprogrammer.com> ------------------------------------------------