Hi Team ,
I am using Flink 1.5.0 and tried to use below code for Kafka consumer in
Flink. I need some information for Flink POJO type .
KafkaTableSource kf_source = Kafka010AvroTableSource.builder()
.forTopic("test-topic")
.withKafkaProperties(props)
.withSchema(tb_schema)
.forAvroRecordClass(StoreSales.class) // ?? How can
we create our own custom class to use in this function ?
.build();
Scenario :
I need to collect avro records from Kafka topic using Kafka010AvroTableSource
with below schema.
long datetime,long ss_sold_date_sk ,long ss_sold_time_sk,long
ss_item_sk,long ss_customer_sk ,long ss_cdemo_sk,long ss_hdemo_sk ,long
ss_addr_sk,long ss_store_sk ,long ss_promo_sk,long ss_ticket_number,int
ss_quantity ,double ss_wholesale_cost,double ss_list_price,double
ss_sales_price,double ss_ext_discount_amt,double ss_ext_sales_price,double
ss_ext_wholesale_cost,double ss_ext_list_price,double ss_ext_tax,double
ss_coupon_amt,double ss_net_paid,double ss_net_paid_inc_tax,double ss_net_profit
I have passed the schema in "forAvroRecordClass" function in above code
snippet . Below is the code for tb_schema.
TableSchema tb_schema = TableSchema.builder()
.field("datetime",Types.LONG())
.field("ss_sold_date_sk",Types.LONG())
.field("ss_sold_time_sk",Types.LONG())
.field("ss_item_sk",Types.LONG())
.field("ss_customer_sk",Types.LONG())
.field("ss_cdemo_sk",Types.LONG())
.field("ss_hdemo_sk",Types.LONG())
.field("ss_addr_sk",Types.LONG())
.field("ss_store_sk",Types.LONG())
.field("ss_promo_sk",Types.LONG())
.field("ss_ticket_number",Types.LONG())
.field("ss_quantity",Types.INT())
.field("ss_wholesale_cost",Types.DOUBLE())
.field("ss_list_price",Types.DOUBLE())
.field("ss_sales_price",Types.DOUBLE())
.field("ss_ext_discount_amt",Types.DOUBLE())
.field("ss_ext_sales_price",Types.DOUBLE())
.field("ss_ext_wholesale_cost",Types.DOUBLE())
.field("ss_ext_list_price",Types.DOUBLE())
.field("ss_ext_tax",Types.DOUBLE())
.field("ss_coupon_amt",Types.DOUBLE())
.field("ss_net_paid",Types.DOUBLE())
.field("ss_net_paid_inc_tax",Types.DOUBLE())
.field("ss_net_profit",Types.DOUBLE())
.build();
Issue :
The StoreSales class which I created is not compatible with
"forAvroRecordClass" function. Could anyone please suggest a way to make it
compatible with the code ?
public class StoreSales extends SpecificRecordBase {
public long datetime;
public long ss_sold_date_sk;
public long ss_sold_time_sk;
public long ss_item_sk;
public long ss_customer_sk ;
public long ss_cdemo_sk;
public long ss_hdemo_sk ;
public long ss_addr_sk;
public long ss_store_sk ;
public long ss_promo_sk;
public long ss_ticket_number;
public int ss_quantity ;
public double ss_wholesale_cost;
public double ss_list_price;
public double ss_sales_price;
public double ss_ext_discount_amt;
public double ss_ext_sales_price;
public double ss_ext_wholesale_cost;
public double ss_ext_list_price;
public double ss_ext_tax;
public double ss_coupon_amt;
public double ss_net_paid;
public double ss_net_paid_inc_tax;
public double ss_net_profit;
@Override
public Schema getSchema() {
// TODO Auto-generated method stub
return null;
}
@Override
public Object get(int field) {
// TODO Auto-generated method stub
return null;
}
@Override
public void put(int field, Object value) {
// TODO Auto-generated method stub
}
Regards,
Kafeel Ansari