ayush-san opened a new issue #2504:
URL: https://github.com/apache/iceberg/issues/2504


   I am using Flink CDC to stream CDC changes into an Iceberg table. When I first run the Flink job against a topic that holds the full history for a table, the job runs out of heap memory, because Flink buffers all of that data during my 15-minute checkpointing interval. Right now, the only workaround I have is to pass `-ytm 8192 -yjm 2048m` for a table with 10M rows and then reduce the memory again after Flink has consumed all the data.
   
![image](https://user-images.githubusercontent.com/57655135/115663412-13d65900-a35e-11eb-9218-9f2f9af2a5c4.png)
   Shouldn't my Iceberg sink have propagated back-pressure up to the source, since it has not written that data yet and will only do so after the checkpointing interval?
   ```java
   FlinkSink.forRowData(rowDataDataStream)
           .table(icebergTable)
           .tableSchema(FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(icebergTable.schema())))
           .tableLoader(tableLoader)
           .equalityFieldColumns(tableConfig.getEqualityColumns())
           .build();
   ```
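   
   For context, here is a minimal sketch of how this sink is wired into the job, assuming a `StreamExecutionEnvironment` named `env` and the 15-minute checkpoint interval mentioned above; the table location is a placeholder, and `rowDataDataStream` / `tableConfig` come from the snippet above rather than being shown here:
   ```java
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
   import org.apache.iceberg.Table;
   import org.apache.iceberg.flink.FlinkSchemaUtil;
   import org.apache.iceberg.flink.TableLoader;
   import org.apache.iceberg.flink.sink.FlinkSink;

   // Sketch only, not the actual job: the Iceberg sink commits data files when a
   // checkpoint completes, so the interval below bounds how long CDC data
   // accumulates between commits.
   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   env.enableCheckpointing(15 * 60 * 1000L); // the 15-minute interval mentioned above

   // Hypothetical table location; the real job builds its own TableLoader.
   TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://warehouse/db/table");
   tableLoader.open();
   Table icebergTable = tableLoader.loadTable();

   // rowDataDataStream is the DataStream<RowData> of CDC changes from the snippet above.
   FlinkSink.forRowData(rowDataDataStream)
           .table(icebergTable)
           .tableSchema(FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(icebergTable.schema())))
           .tableLoader(tableLoader)
           .equalityFieldColumns(tableConfig.getEqualityColumns()) // same equality columns as above
           .build();
   ```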
   
   
   

