[ 
https://issues.apache.org/jira/browse/FLINK-27738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17541231#comment-17541231
 ] 

LCER edited comment on FLINK-27738 at 5/24/22 7:17 AM:
-------------------------------------------------------

[~martijnvisser], I use Flink MySQL CDC to collect data from MySQL into Kafka, but 
some table rows are larger than Kafka's default size limit. In this 
case, an exception is thrown:
org.apache.kafka.common.errors.RecordTooLargeException: The message is XXX 
bytes when serialized which is larger than the maximum request size you have 
configured with the max.request.size configuration.
So I want to use KafkaSinkBuilder to configure the topic's {{max.message.bytes}} 
property to resolve the problem.
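
The exception quoted above names max.request.size, which is a Kafka producer setting rather than a topic property, so one possible workaround is to raise it in the Properties object that is already passed to setKafkaProducerConfig. The following is only a minimal sketch under that assumption: the class name, the helper name and the 5 MB value are illustrative and do not come from the report, and the topic-level {{max.message.bytes}} (or the broker's message.max.bytes) may still need to be raised separately on the Kafka side.

```java
import java.util.Properties;

// Hypothetical helper, not part of Flink or Kafka: builds the producer
// properties that would be handed to KafkaSinkBuilder#setKafkaProducerConfig.
public class ProducerConfigSketch {

    // "max.request.size" is the standard Kafka producer config key that the
    // RecordTooLargeException message refers to.
    static Properties producerConfig(int maxRequestBytes) {
        Properties props = new Properties();
        props.setProperty("max.request.size", Integer.toString(maxRequestBytes));
        return props;
    }

    public static void main(String[] args) {
        // 5 MB is an assumed value; choose one larger than the biggest row.
        Properties properties = producerConfig(5 * 1024 * 1024);
        System.out.println(properties.getProperty("max.request.size"));
        // In the job, this object would then be passed as
        // .setKafkaProducerConfig(properties) on the KafkaSink builder.
    }
}
```

This only lifts the client-side limit; records larger than the topic or broker limit would still be rejected by Kafka itself.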
 


was (Author: JIRAUSER289859):
[~martijnvisser], I use Flink MySQL CDC to collect data from MySQL into Kafka, but 
some table rows are larger than Kafka's default size limit. In this 
case, an exception is thrown:
org.apache.kafka.common.errors.RecordTooLargeException: The message is XXX 
bytes when serialized which is larger than XXX, which is the value of the 
max.request.size configuration.
So I want to use KafkaSinkBuilder to configure topic properties to resolve the 
problem.
 

> instance KafkaSink support config topic properties
> --------------------------------------------------
>
>                 Key: FLINK-27738
>                 URL: https://issues.apache.org/jira/browse/FLINK-27738
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: 1.15.0
>            Reporter: LCER
>            Priority: Major
>
> I use KafkaSink to configure Kafka as follows:
> *KafkaSink.<String>builder()*
>  *.setBootstrapServers(brokers)*
>  *.setRecordSerializer(KafkaRecordSerializationSchema.builder()*
>  *.setTopicSelector(topicSelector)*
>  *.setValueSerializationSchema(new SimpleStringSchema())*
>  *.build()*
>  *)*
>  *.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)*
>  *.setKafkaProducerConfig(properties)*
>  *.build();*
> *----------------*
> *I can't find any method for configuring topic properties.*



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
