Reo-LEI commented on issue #2918:
URL: https://github.com/apache/iceberg/issues/2918#issuecomment-903093340


   > I am not familiar with SQL or CDC parts of Flink. So the binlog is not 
shipped to Kafka first here, right? The Flink CDC source directly reads the 
binlog from MySQL?
   
   Yes, you are right: flink-cdc-connector reads the binlog directly from MySQL.
   
   > Personally, I don't see too much benefit of just scale up the parallelism 
for the IcebergStreamWriter operator. All the extra network shuffle/rebalance 
can be expensive too. I do see the need to scale up the job parallelism with 
higher traffic. hence I am wondering if we can also scale up the Flink CDC 
source operator along with other operators so that we can keep the chaining 
behavior.
   
   Actually, I encountered this problem when I scaled up the job parallelism, 
and that is Case 2 above. In some situations we cannot scale up the CDC source 
operator; for example, with flink-cdc-connector the source parallelism will 
always be 1. And I don't think we should depend on keeping the source 
parallelism equal to the FlinkSink parallelism to guarantee result correctness.
   
   > As you said already, this only works if the hash shuffle (based on the 
primary key) is the only network shuffle before the IcebergStreamWriter 
operator. That is why I have some reservations of putting this in the Flink 
Iceberg sink. This won't work if there is some other network shuffle before the 
Flink sink, which is outside of the control of the sink. Hence I am wondering 
if this should be handled outside of the Flink sink and we should just document 
the expected behavior of the input data to the Flink Iceberg sink.
   
   In the CDC case, if there is some other network shuffle (not based on the 
primary key) before the FlinkSink, that will put the CDC records out of order, 
and yes, we could not get the correct result, because that is out of the 
control of the FlinkSink. But I think that is the user's fault: in the CDC 
case, the user should make sure the CDC data is sent downstream in the correct 
order.
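   To see why a non-key shuffle breaks correctness, here is a toy simulation of 
a round-robin rebalance (plain Python, not Flink code; `rebalance` is a 
hypothetical stand-in for Flink's default rebalance partitioning, which ignores 
the key):

   ```python
   def rebalance(events, num_writers):
       """Round-robin shuffle: distributes events without looking at the key."""
       writers = [[] for _ in range(num_writers)]
       for i, event in enumerate(events):
           writers[i % num_writers].append(event)
       return writers

   binlog = [("+I", 1, "a"), ("-D", 1, "a"), ("+I", 1, "b")]
   writers = rebalance(binlog, num_writers=2)

   # Events for key 1 are now split across writers:
   # writer 0 holds [("+I", 1, "a"), ("+I", 1, "b")]
   # writer 1 holds [("-D", 1, "a")]
   # The relative order between writers is undefined; if the delete on writer 1
   # is applied after writer 0's inserts, row 1 disappears, even though the
   # binlog ends with key 1 present as "b".
   ```

   So once the shuffle stops being keyed on the primary key, the apply order at 
the sink is no longer the binlog order, and no sink-side logic can recover it.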
   
   Besides, we should also consider the situation where Flink inserts its 
default network shuffle before the FlinkSink and the user cannot control it 
manually (for example in Flink SQL, where we cannot easily control the network 
shuffle between two operators). I think that is something Iceberg should 
resolve, without requiring the user to do it manually. The user should only 
need to set the correct `equalityFieldColumns` and send the CDC/upsert data to 
the FlinkSink, and Iceberg will handle the rest of the work.
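   As a rough illustration of what "handling the rest" means, here is a toy 
model (plain Python, not Iceberg's actual implementation; `apply_cdc` and its 
semantics are hypothetical) of applying CDC records to a table keyed on the 
equality fields, analogous to what the user configures via 
`equalityFieldColumns`:

   ```python
   def apply_cdc(records, equality_fields):
       """Apply CDC records to a table keyed on the equality fields (toy model)."""
       table = {}
       for op, row in records:
           key = tuple(row[f] for f in equality_fields)
           if op in ("+I", "+U"):
               table[key] = row          # insert / upsert the row
           elif op in ("-D", "-U"):
               table.pop(key, None)      # delete by equality key
       return table

   records = [
       ("+I", {"id": 1, "name": "a"}),
       ("-U", {"id": 1, "name": "a"}),
       ("+U", {"id": 1, "name": "b"}),
       ("+I", {"id": 2, "name": "x"}),
   ]
   final = apply_cdc(records, equality_fields=["id"])
   # final == {(1,): {"id": 1, "name": "b"}, (2,): {"id": 2, "name": "x"}}
   ```

   Note the result is only correct because the records for each key arrive in 
binlog order, which is exactly the ordering guarantee discussed above.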


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


