Hello Fabian,

Can you please tell me how to convert a Table back into a DataStream? I just want to print the table result.
-----------------------------------------------
*Amol Suryawanshi*
Java Developer
am...@iprogrammer.com

*iProgrammer Solutions Pvt. Ltd.*

*Office 103, 104, 1st Floor Pride Portal, Shivaji Housing Society,
Bahiratwadi, Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - 411016,
MH, INDIA.*
*Phone: +91 9689077510 | Skype: amols_iprogrammer*
www.iprogrammer.com <sac...@iprogrammer.com>
------------------------------------------------

On Mon, Jul 2, 2018 at 4:20 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> You can also use Row, but then you cannot rely on automatic type extraction
> and must provide the TypeInformation yourself.
>
> Amol S - iProgrammer <am...@iprogrammer.com> wrote on Mon, Jul 2, 2018,
> 12:37:
>
> > Hello Fabian,
> >
> > According to my requirements, I cannot create static POJOs for all classes,
> > because I want to create dynamic jobs for all tables based on the rule
> > engine config. Please suggest whether there is any other way to achieve this.
> >
> > On Mon, Jul 2, 2018 at 4:02 PM, Fabian Hueske <fhue...@gmail.com> wrote:
> >
> > > Hi Amol,
> > >
> > > These are the requirements for POJOs [1] that are fully supported by
> > > Flink.
> > >
> > > Best, Fabian
> > >
> > > [1]
> > > https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/api_concepts.html#pojos
> > >
> > > 2018-07-02 12:19 GMT+02:00 Amol S - iProgrammer <am...@iprogrammer.com>:
> > >
> > > > Hello Xingcan,
> > > >
> > > > As mentioned earlier in this mail thread, I am streaming the MongoDB
> > > > oplog to join multiple Mongo tables on a unique key (primary key). To
> > > > achieve this I have created one Java POJO, shown below, where o
> > > > represents the generic MongoDB POJO type that holds my table fields,
> > > > i.e. it is dynamic. Now I want to use a Table API join over this
> > > > BasicDBObject, but it seems Flink does not allow generic POJOs.
> > > > Please advise.
> > > >
> > > > public class Oplog {
> > > >     private OplogTimestamp ts;
> > > >     private BasicDBObject o;
> > > > }
> > > >
> > > > On Mon, Jul 2, 2018 at 3:03 PM, Amol S - iProgrammer <am...@iprogrammer.com> wrote:
> > > >
> > > > > Hello Xingcan,
> > > > >
> > > > > DataStream<Oplog> streamSource = env
> > > > >     .addSource(kafkaConsumer)
> > > > >     .setParallelism(4);
> > > > >
> > > > > StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
> > > > > // Convert the DataStream into a Table with default fields "f0", "f1"
> > > > > Table table1 = tableEnv.fromDataStream(streamSource);
> > > > >
> > > > > Table customerMISMaster = table1.filter("ns === 'local.customerMISMaster'").select("o as master");
> > > > > Table customerMISChild1 = table1.filter("ns === 'local.customerMISChild1'").select("o as child1");
> > > > > Table customerMISChild2 = table1.filter("ns === 'local.customerMISChild2'").select("o as child2");
> > > > > Table result = customerMISMaster.join(customerMISChild1).where("master.loanApplicationId=child1.loanApplicationId");
> > > > >
> > > > > It is throwing the error "Method threw
> > > > > 'org.apache.flink.table.api.ValidationException' exception.
> > > > > Undefined function: LOANAPPLICATIONID"
> > > > >
> > > > > On Mon, Jul 2, 2018 at 2:56 PM, Xingcan Cui <xingc...@gmail.com> wrote:
> > > > >
> > > > >> Hi Amol,
> > > > >>
> > > > >> The “dynamic table” is just a logical concept, following which the
> > > > >> Flink Table API is designed.
> > > > >> That means you don’t need to implement dynamic tables yourself.
> > > > >>
> > > > >> The Flink Table API provides different kinds of stream-to-stream
> > > > >> joins in recent versions (from 1.4).
> > > > >> The related docs can be found here:
> > > > >> https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/tableApi.html#joins
> > > > >>
> > > > >> Best,
> > > > >> Xingcan
> > > > >>
> > > > >> > On Jul 2, 2018, at 4:49 PM, Amol S - iProgrammer <am...@iprogrammer.com> wrote:
> > > > >> >
> > > > >> > Hello,
> > > > >> >
> > > > >> > I am streaming the MongoDB oplog using Kafka and Flink and want to
> > > > >> > join multiple tables using the Flink Table API, but I have some
> > > > >> > concerns: is it possible to join streamed tables in Flink, and if
> > > > >> > so, please provide an example of a stream join using the Table API.
> > > > >> >
> > > > >> > I went through your dynamic table API doc. It is quite interesting,
> > > > >> > but I haven't found any example or tutorial on how to implement a
> > > > >> > dynamic table.
> > > > >> >
> > > > >> > I have tried to implement a Table API join using a POJO class, but
> > > > >> > it throws org.apache.flink.table.api.TableException: Cannot
> > > > >> > generate a valid execution plan for the given query