Hi Team,

I have a Flink job that reads transaction data from a single source and creates
Iceberg tables. I need to create a separate dataset for each account that
appears in the stream of transactions. IcebergTableSink accepts the TableLoader
in its constructor, but my table name is based on the account ID of each
transaction, and the account IDs are not a predefined list. This means I have
to decide which table to load data into while writing the code, but I want that
decision to be dynamic, based on the account ID. Is there a better way to
handle this? Below is the job I have that creates one Iceberg table.
Appreciate any help here.

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Transaction> transactions = env
            .addSource(new TransactionSource())
            .name("transactions");

    DataStream<RowData> rows = transactions
            .keyBy(Transaction::getAccountId)
            .process(new FraudDetector())
            .name("fraud-detector");

    TableSchema ts = TableSchema.builder()
            .field("AccountId", DataTypes.BIGINT())
            .field("Timestamp", DataTypes.BIGINT())
            .field("Amount", DataTypes.DOUBLE())
            .build();

    // TODO: need to write to multiple tables here based on
    // Transaction::getAccountId
    String tableLocation = "./data/flinklocal/transactions5";
    TableLoader tl = TableLoader.fromHadoopTable(tableLocation, hadoopConf);
    IcebergTableSink sink = new IcebergTableSink(false, tl, ts);
    sink.consumeDataStream(rows);

    env.execute("Multiple transactional datasets");
}
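
For context, the routing I have in mind would look roughly like the generic
sketch below: lazily create one writer per account ID the first time a record
for that account arrives. This is plain Java, not the actual Iceberg/Flink API;
the AccountWriter class and the table-path layout are hypothetical stand-ins
for whatever would wrap a TableLoader per account.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical demultiplexing sketch: route each record to a per-account
// writer, creating the writer on first use so account IDs need not be
// known up front.
public class DemuxSketch {

    // Stand-in for a per-table writer; in the real job this would hold a
    // TableLoader / sink created from the account ID.
    static class AccountWriter {
        final String tableLocation;
        final List<String> rows = new ArrayList<>();

        AccountWriter(long accountId) {
            // Table path derived from the account ID (assumed layout).
            this.tableLocation = "./data/flinklocal/transactions_" + accountId;
        }

        void write(String row) {
            rows.add(row);
        }
    }

    private final Map<Long, AccountWriter> writers = new HashMap<>();

    // Look up the writer for this account, creating it lazily if absent.
    public AccountWriter writerFor(long accountId) {
        return writers.computeIfAbsent(accountId, AccountWriter::new);
    }
}
```

The open question is how to do the equivalent of writerFor inside the Flink
job, given that IcebergTableSink wants its TableLoader at construction time.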
Thanks,
Asha Desu
