Hi Team,
I am using Flink 1.5.0 and tried the code below to set up a Kafka consumer in
Flink. I need some information about Flink POJO types.

       KafkaTableSource kf_source = Kafka010AvroTableSource.builder()
                           .forTopic("test-topic")
                           .withKafkaProperties(props)
                           .withSchema(tb_schema)
                           // ?? How can we create our own custom class to use in this function?
                           .forAvroRecordClass(StoreSales.class)
                           .build();



Scenario:
 I need to read Avro records from a Kafka topic using Kafka010AvroTableSource
with the schema below.

               long datetime, long ss_sold_date_sk, long ss_sold_time_sk,
               long ss_item_sk, long ss_customer_sk, long ss_cdemo_sk,
               long ss_hdemo_sk, long ss_addr_sk, long ss_store_sk,
               long ss_promo_sk, long ss_ticket_number, int ss_quantity,
               double ss_wholesale_cost, double ss_list_price,
               double ss_sales_price, double ss_ext_discount_amt,
               double ss_ext_sales_price, double ss_ext_wholesale_cost,
               double ss_ext_list_price, double ss_ext_tax,
               double ss_coupon_amt, double ss_net_paid,
               double ss_net_paid_inc_tax, double ss_net_profit
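
For reference, here is a rough sketch of how the field list above might be
written as an Avro schema file (.avsc). The namespace "com.example" is only a
placeholder, and I am assuming every field is required (non-nullable). As far
as I understand, Avro's code generator (avro-tools or the avro-maven-plugin)
can produce a SpecificRecord class from a file like this.

    {
      "type": "record",
      "name": "StoreSales",
      "namespace": "com.example",
      "fields": [
        {"name": "datetime",              "type": "long"},
        {"name": "ss_sold_date_sk",       "type": "long"},
        {"name": "ss_sold_time_sk",       "type": "long"},
        {"name": "ss_item_sk",            "type": "long"},
        {"name": "ss_customer_sk",        "type": "long"},
        {"name": "ss_cdemo_sk",           "type": "long"},
        {"name": "ss_hdemo_sk",           "type": "long"},
        {"name": "ss_addr_sk",            "type": "long"},
        {"name": "ss_store_sk",           "type": "long"},
        {"name": "ss_promo_sk",           "type": "long"},
        {"name": "ss_ticket_number",      "type": "long"},
        {"name": "ss_quantity",           "type": "int"},
        {"name": "ss_wholesale_cost",     "type": "double"},
        {"name": "ss_list_price",         "type": "double"},
        {"name": "ss_sales_price",        "type": "double"},
        {"name": "ss_ext_discount_amt",   "type": "double"},
        {"name": "ss_ext_sales_price",    "type": "double"},
        {"name": "ss_ext_wholesale_cost", "type": "double"},
        {"name": "ss_ext_list_price",     "type": "double"},
        {"name": "ss_ext_tax",            "type": "double"},
        {"name": "ss_coupon_amt",         "type": "double"},
        {"name": "ss_net_paid",           "type": "double"},
        {"name": "ss_net_paid_inc_tax",   "type": "double"},
        {"name": "ss_net_profit",         "type": "double"}
      ]
    }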


I have passed this schema via the "withSchema" function, and the record class
via the "forAvroRecordClass" function, in the code snippet above. Below is the
code for tb_schema.


             TableSchema tb_schema = TableSchema.builder()
                           .field("datetime",Types.LONG())
                           .field("ss_sold_date_sk",Types.LONG())
                           .field("ss_sold_time_sk",Types.LONG())
                           .field("ss_item_sk",Types.LONG())
                           .field("ss_customer_sk",Types.LONG())
                           .field("ss_cdemo_sk",Types.LONG())
                           .field("ss_hdemo_sk",Types.LONG())
                           .field("ss_addr_sk",Types.LONG())
                           .field("ss_store_sk",Types.LONG())
                           .field("ss_promo_sk",Types.LONG())
                           .field("ss_ticket_number",Types.LONG())
                           .field("ss_quantity",Types.INT())
                           .field("ss_wholesale_cost",Types.DOUBLE())
                           .field("ss_list_price",Types.DOUBLE())
                           .field("ss_sales_price",Types.DOUBLE())
                           .field("ss_ext_discount_amt",Types.DOUBLE())
                           .field("ss_ext_sales_price",Types.DOUBLE())
                           .field("ss_ext_wholesale_cost",Types.DOUBLE())
                           .field("ss_ext_list_price",Types.DOUBLE())
                           .field("ss_ext_tax",Types.DOUBLE())
                           .field("ss_coupon_amt",Types.DOUBLE())
                           .field("ss_net_paid",Types.DOUBLE())
                           .field("ss_net_paid_inc_tax",Types.DOUBLE())
                           .field("ss_net_profit",Types.DOUBLE())
                           .build();

Issue:
 The StoreSales class that I created is not compatible with the
"forAvroRecordClass" function. Could anyone please suggest a way to make it
compatible with this code?



public class StoreSales extends SpecificRecordBase {
       public long datetime;
       public long ss_sold_date_sk;
       public long ss_sold_time_sk;
       public long ss_item_sk;
       public long ss_customer_sk;
       public long ss_cdemo_sk;
       public long ss_hdemo_sk;
       public long ss_addr_sk;
       public long ss_store_sk;
       public long ss_promo_sk;
       public long ss_ticket_number;
       public int ss_quantity;
       public double ss_wholesale_cost;
       public double ss_list_price;
       public double ss_sales_price;
       public double ss_ext_discount_amt;
       public double ss_ext_sales_price;
       public double ss_ext_wholesale_cost;
       public double ss_ext_list_price;
       public double ss_ext_tax;
       public double ss_coupon_amt;
       public double ss_net_paid;
       public double ss_net_paid_inc_tax;
       public double ss_net_profit;

       @Override
       public Schema getSchema() {
             // TODO Auto-generated method stub
             return null;
       }

       @Override
       public Object get(int field) {
             // TODO Auto-generated method stub
             return null;
       }

       @Override
       public void put(int field, Object value) {
             // TODO Auto-generated method stub

       }
}


Regards,
Kafeel Ansari
