Hello Timo,

Thanks for the quick reply. With your suggestion the previous exception is gone, but I now get the following exception:

Expression 'o.get(_id) failed on input check: Cannot access field of non-composite type 'GenericType<com.mongodb.BasicDBObject>'.

-----------------------------------------------
*Amol Suryawanshi*
Java Developer
am...@iprogrammer.com

*iProgrammer Solutions Pvt. Ltd.*

*Office 103, 104, 1st Floor Pride Portal, Shivaji Housing Society,
Bahiratwadi, Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - 411016,
MH, INDIA.*
*Phone: +91 9689077510 | Skype: amols_iprogrammer*
www.iprogrammer.com <sac...@iprogrammer.com>
------------------------------------------------

On Fri, Jul 27, 2018 at 1:08 PM, Timo Walther <twal...@apache.org> wrote:

> Hi Amol,
>
> the dot operation is reserved for calling functions on fields. If you want
> to get a nested field in the Table API, use the `.get("applicationId")`
> operation. See also [1] under "Value access functions".
>
> Regards,
> Timo
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/tableApi.html#built-in-functions
>
> On 27.07.18 at 09:10, Amol S - iProgrammer wrote:
>
>> Hello Fabian,
>>
>> I am streaming my mongodb oplog using flink and want to use flink table API
>> to join multiple tables.
>> My code looks like:
>>
>> DataStream<Oplog> streamSource = env
>>     .addSource(kafkaConsumer)
>>     .setParallelism(4);
>>
>> StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
>> // Convert the DataStream into a Table with default fields "f0", "f1"
>> Table table1 = tableEnv.fromDataStream(streamSource);
>>
>> Table master = table1.filter("ns === 'Master'").select("o as master, o.applicationId as primaryKey");
>> Table child1 = table1.filter("ns === 'Child1'").select("o as child1, o.applicationId as foreignKey");
>> Table child2 = table1.filter("ns === 'Child2'").select("o as child2, o.applicationId as foreignKey2");
>>
>> Table result = master.join(child1).where("primaryKey==foreignKey").join(child2).where("primaryKey==foreignKey2");
>>
>> It throws the error "Method threw
>> 'org.apache.flink.table.api.ValidationException' exception. Undefined
>> function: APPLICATIONID".
>>
>> public class Oplog implements Serializable {
>>     private BasicDBObject o;
>> }
>>
>> Here o is a generic Java type used for fetching the mongodb oplog, and I
>> cannot replace this generic type with static POJOs. Please suggest a
>> workaround for this.
>>
>> BasicDBObject satisfies the following two rules:
>>
>> - The class must be public.
>> - It must have a public constructor without arguments (default
>>   constructor).
>>
>> and we can access class members through basicDBObject.getString("abc").
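
The exception at the top of this message says that `o` is seen as a `GenericType`, so the Table API has no named fields to look into and `.get(...)` is rejected. One possible workaround, given here only as an untested sketch, is to copy the join key out of the document into its own field before the stream is converted to a table. Everything below is hypothetical and plain Java: `OplogFlattening`, `FlatOplog` and `flatten` are illustrative names, and the local `Oplog` stand-in uses a `Map<String, Object>` in place of `BasicDBObject` (which is a `LinkedHashMap<String, Object>` underneath) so the sketch runs without Flink or the MongoDB driver.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; not part of Flink or the MongoDB driver.
class OplogFlattening {

    // Stand-in for the Oplog class from the quoted message. The document "o"
    // is typed as a Map here instead of BasicDBObject (assumption for the sketch).
    static class Oplog {
        String ns;
        Map<String, Object> o;

        Oplog(String ns, Map<String, Object> o) {
            this.ns = ns;
            this.o = o;
        }
    }

    // Flat record: the join key lives in its own named field, the raw
    // document is carried along unchanged.
    public static class FlatOplog {
        public String ns;
        public String applicationId;
        public Map<String, Object> o;

        public FlatOplog() {
        }
    }

    // Copies "applicationId" out of the document; the key stays null when
    // the document or the field is missing.
    static FlatOplog flatten(Oplog in) {
        FlatOplog out = new FlatOplog();
        out.ns = in.ns;
        Object key = (in.o == null) ? null : in.o.get("applicationId");
        out.applicationId = (key == null) ? null : key.toString();
        out.o = in.o;
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("applicationId", 42);
        FlatOplog flat = flatten(new Oplog("Master", doc));
        System.out.println(flat.ns + " " + flat.applicationId);
    }
}
```

In a Flink job the body of `flatten` would presumably sit inside a `MapFunction` applied to `streamSource` before `fromDataStream`, so that the filters and joins refer to `applicationId` as an ordinary top-level field and `o` is only passed through. Whether the generic `o` field can then be selected as-is would still need to be checked against Flink 1.5.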