Hi Team , I am using Flink 1.5.0 and tried to use below code for Kafka consumer in Flink. I need some information for Flink POJO type .
KafkaTableSource kf_source = Kafka010AvroTableSource.builder() .forTopic("test-topic") .withKafkaProperties(props) .withSchema(tb_schema) .forAvroRecordClass(StoreSales.class) // ?? How can we create our own custom class to use in this function ? .build(); Scenario : I need to collect avro records from Kafka topic using Kafka010AvroTableSource with below schema. long datetime,long ss_sold_date_sk ,long ss_sold_time_sk,long ss_item_sk,long ss_customer_sk ,long ss_cdemo_sk,long ss_hdemo_sk ,long ss_addr_sk,long ss_store_sk ,long ss_promo_sk,long ss_ticket_number,int ss_quantity ,double ss_wholesale_cost,double ss_list_price,double ss_sales_price,double ss_ext_discount_amt,double ss_ext_sales_price,double ss_ext_wholesale_cost,double ss_ext_list_price,double ss_ext_tax,double ss_coupon_amt,double ss_net_paid,double ss_net_paid_inc_tax,double ss_net_profit I have passed the schema in "forAvroRecordClass" function in above code snippet . Below is the code for tb_schema. TableSchema tb_schema = TableSchema.builder() .field("datetime",Types.LONG()) .field("ss_sold_date_sk",Types.LONG()) .field("ss_sold_time_sk",Types.LONG()) .field("ss_item_sk",Types.LONG()) .field("ss_customer_sk",Types.LONG()) .field("ss_cdemo_sk",Types.LONG()) .field("ss_hdemo_sk",Types.LONG()) .field("ss_addr_sk",Types.LONG()) .field("ss_store_sk",Types.LONG()) .field("ss_promo_sk",Types.LONG()) .field("ss_ticket_number",Types.LONG()) .field("ss_quantity",Types.INT()) .field("ss_wholesale_cost",Types.DOUBLE()) .field("ss_list_price",Types.DOUBLE()) .field("ss_sales_price",Types.DOUBLE()) .field("ss_ext_discount_amt",Types.DOUBLE()) .field("ss_ext_sales_price",Types.DOUBLE()) .field("ss_ext_wholesale_cost",Types.DOUBLE()) .field("ss_ext_list_price",Types.DOUBLE()) .field("ss_ext_tax",Types.DOUBLE()) .field("ss_coupon_amt",Types.DOUBLE()) .field("ss_net_paid",Types.DOUBLE()) .field("ss_net_paid_inc_tax",Types.DOUBLE()) .field("ss_net_profit",Types.DOUBLE()) .build(); Issue : The StoreSales class which I created is not compatible with "forAvroRecordClass" function. Could anyone please suggest a way to make it compatible with the code ? public class StoreSales extends SpecificRecordBase { public long datetime; public long ss_sold_date_sk; public long ss_sold_time_sk; public long ss_item_sk; public long ss_customer_sk ; public long ss_cdemo_sk; public long ss_hdemo_sk ; public long ss_addr_sk; public long ss_store_sk ; public long ss_promo_sk; public long ss_ticket_number; public int ss_quantity ; public double ss_wholesale_cost; public double ss_list_price; public double ss_sales_price; public double ss_ext_discount_amt; public double ss_ext_sales_price; public double ss_ext_wholesale_cost; public double ss_ext_list_price; public double ss_ext_tax; public double ss_coupon_amt; public double ss_net_paid; public double ss_net_paid_inc_tax; public double ss_net_profit; @Override public Schema getSchema() { // TODO Auto-generated method stub return null; } @Override public Object get(int field) { // TODO Auto-generated method stub return null; } @Override public void put(int field, Object value) { // TODO Auto-generated method stub } Regards, Kafeel Ansari