ashajyothi828 opened a new issue #2208:
URL: https://github.com/apache/iceberg/issues/2208


   Hi Team,
   
   I have a Flink job that reads transaction data from a single source and
   writes it to Iceberg tables. I need a separate dataset for each account that
   appears in the transactions. IcebergTableSink takes a TableLoader in its
   constructor, but my table name depends on the account ID of each transaction,
   and the set of account IDs is not known in advance. As written, I have to pick
   the target table when I write the code, but I want it to be chosen dynamically
   from each record's account ID. Is there a better way to handle this? Below is
   my current job, which writes to a single Iceberg table. I'd appreciate any help.
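   The core difficulty is that the sink is bound to one TableLoader when it is constructed, while the target table here is only known per record. The plain-Java sketch below (no Flink or Iceberg dependencies) illustrates the routing idea only: derive a table location from each record's account ID and lazily open one writer the first time that ID is seen. TableWriter, InMemoryWriter, DynamicTableRouter and the "transactions_<id>" naming scheme are hypothetical placeholders, not Iceberg or Flink APIs; a real job would still need an actual Iceberg writer behind the interface and would have to deal with table creation and commits itself.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongFunction;

public class DynamicTableRouter {

    /** Hypothetical per-table writer; NOT an Iceberg or Flink API. */
    interface TableWriter {
        void write(long accountId, long timestamp, double amount);
    }

    /** Toy writer that only records the rows it receives, for illustration. */
    static final class InMemoryWriter implements TableWriter {
        final String location;
        final List<String> rows = new ArrayList<>();

        InMemoryWriter(String location) {
            this.location = location;
        }

        @Override
        public void write(long accountId, long timestamp, double amount) {
            rows.add(accountId + "," + timestamp + "," + amount);
        }
    }

    // Maps an account ID to a table location (hypothetical naming scheme).
    private final LongFunction<String> locationFor;
    // One lazily created writer per distinct account ID seen so far.
    private final Map<Long, InMemoryWriter> writers = new HashMap<>();

    DynamicTableRouter(LongFunction<String> locationFor) {
        this.locationFor = locationFor;
    }

    /** Resolves the target table per record instead of once at construction time. */
    void route(long accountId, long timestamp, double amount) {
        writers.computeIfAbsent(accountId, id -> new InMemoryWriter(locationFor.apply(id)))
               .write(accountId, timestamp, amount);
    }

    Map<Long, InMemoryWriter> writers() {
        return writers;
    }

    public static void main(String[] args) {
        DynamicTableRouter router =
                new DynamicTableRouter(id -> "./data/flinklocal/transactions_" + id);
        router.route(1L, 1000L, 10.0);
        router.route(2L, 1001L, 20.0);
        router.route(1L, 1002L, 30.0);

        // Print each table location and the rows routed to it.
        for (InMemoryWriter w : router.writers().values()) {
            System.out.println(w.location + " -> " + w.rows);
        }
    }
}
```

   Keeping one open writer per account means memory and open-file count grow with the number of distinct account IDs, so this pattern only stays practical if that number is bounded.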
   
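   import org.apache.flink.streaming.api.datastream.DataStream;
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.flink.table.api.DataTypes;
   import org.apache.flink.table.api.TableSchema;
   import org.apache.flink.table.data.RowData;
   import org.apache.hadoop.conf.Configuration;
   import org.apache.iceberg.flink.IcebergTableSink;
   import org.apache.iceberg.flink.TableLoader;
   
   // Transaction, TransactionSource and FraudDetector are defined elsewhere in this project.
   public class MultipleTransactionalDatasets {
   
       public static void main(String[] args) throws Exception {
           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   
           DataStream<Transaction> transactions = env
                   .addSource(new TransactionSource())
                   .name("transactions");
   
           // FraudDetector is assumed to emit RowData matching the schema below.
           DataStream<RowData> rows = transactions
                   .keyBy(Transaction::getAccountId)
                   .process(new FraudDetector())
                   .name("fraud-detector");
   
           TableSchema ts = TableSchema.builder()
                   .field("AccountId", DataTypes.BIGINT())
                   .field("Timestamp", DataTypes.BIGINT())
                   .field("Amount", DataTypes.DOUBLE())
                   .build();
   
           // TODO: write to multiple tables here, chosen by Transaction::getAccountId.
           // For now every row goes to this single, fixed table location.
           String tableLocation = "./data/flinklocal/transactions5";
           Configuration hadoopConf = new Configuration(); // default Hadoop configuration (assumed)
           TableLoader tl = TableLoader.fromHadoopTable(tableLocation, hadoopConf);
           IcebergTableSink sink = new IcebergTableSink(false, tl, ts);
           sink.consumeDataStream(rows);
   
           env.execute("Multiple transactional datasets");
       }
   }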
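   For the TODO in the job above, one way to picture "multiple tables" is to turn the fixed table location into a function of the account ID and group buffered rows by that location before writing each group. The sketch below assumes a hypothetical layout of one directory per account under the job's base path (account_<id>) and a minimal Txn stand-in for the Transaction class; it is plain Java and does not call Iceberg or Flink, so it shows only the grouping step, not how each group would actually be committed.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class PerAccountBatches {

    /** Minimal stand-in for the issue's Transaction type (assumed fields). */
    static final class Txn {
        final long accountId;
        final long timestamp;
        final double amount;

        Txn(long accountId, long timestamp, double amount) {
            this.accountId = accountId;
            this.timestamp = timestamp;
            this.amount = amount;
        }
    }

    /** Hypothetical naming scheme: one table directory per account under the base location. */
    static String tableLocationFor(String baseLocation, long accountId) {
        return baseLocation + "/account_" + accountId;
    }

    /** Groups a buffered batch by target table location; TreeMap keeps the output order deterministic. */
    static Map<String, List<Txn>> groupByTable(String baseLocation, List<Txn> batch) {
        return batch.stream()
                .collect(Collectors.groupingBy(
                        t -> tableLocationFor(baseLocation, t.accountId),
                        TreeMap::new,
                        Collectors.toList()));
    }

    public static void main(String[] args) {
        List<Txn> batch = List.of(
                new Txn(7L, 1L, 12.5),
                new Txn(3L, 2L, 99.0),
                new Txn(7L, 3L, 0.5));

        // Each entry would correspond to one write against one account's table.
        groupByTable("./data/flinklocal/transactions5", batch)
                .forEach((location, rows) ->
                        System.out.println(location + " -> " + rows.size() + " rows"));
    }
}
```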

