[ 
https://issues.apache.org/jira/browse/FLINK-38519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sebastian YEPES FERNANDEZ updated FLINK-38519:
----------------------------------------------
    Description: 
Hi all,

I’m currently using a custom Kafka producer but am considering switching to the 
native Flink Kafka Sink Connector. However, I’ve encountered some issues and 
exceptions in the process.
At the moment, I’m publishing JSON payloads with custom message keys to 
optimize partitioning and ordering. From what I can tell, the PyFlink interface 
doesn’t support this functionality directly. I even consulted Claude Code, 
which suggested that the only solution is to implement a custom Java 
serializer—but I’d prefer to avoid creating custom JARs if possible.

Below is a simplified example of what I’m trying to implement:

 
{code:python}
import json

from pyflink.common import Types
from pyflink.datastream.connectors.kafka import KafkaSink


def create_kafka_record(event):
    """Create a Kafka record with kafka_key = client:client_id."""
    client = event.get("name", "")
    client_id = event.get("id", "")
    kafka_key = f"{client}:{client_id}"
    return (kafka_key, json.dumps(event, default=str))


kafka_records = processed_events.map(
    create_kafka_record,
    output_type=Types.TUPLE([Types.STRING(), Types.STRING()])
).name("Prepare Kafka Records")

# Simplified: build() also requires a record serializer (sketched below);
# the gap is that no builder option maps the tuple's first field to the
# Kafka message key.
kafka_sink = KafkaSink.builder() \
    .set_bootstrap_servers(bootstrap_servers) \
    .build()

kafka_records.sink_to(kafka_sink).name("Sink to Kafka")
{code}
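
For completeness: the closest the current Python API seems to get is KafkaRecordSerializationSchema.builder(), which does expose set_key_serialization_schema. As far as I can tell, though, the key and value schemas both receive the entire stream element, so the key cannot be derived from only the first field of the tuple. A minimal sketch of that attempt (the topic name is a placeholder):

{code:python}
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.connectors.kafka import (KafkaSink,
                                                 KafkaRecordSerializationSchema)

# Both schemas serialize the whole element, so the key ends up being the
# serialized record itself rather than "client:client_id".
record_serializer = KafkaRecordSerializationSchema.builder() \
    .set_topic("events") \
    .set_key_serialization_schema(SimpleStringSchema()) \
    .set_value_serialization_schema(SimpleStringSchema()) \
    .build()

kafka_sink = KafkaSink.builder() \
    .set_bootstrap_servers(bootstrap_servers) \
    .set_record_serializer(record_serializer) \
    .build()
{code}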
 

Does anyone know of any alternatives or workarounds for this? Or could someone 
clarify whether there are plans to enhance the PyFlink Java interface to support 
custom message keys and serializers?
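
One workaround I am evaluating, in case it is useful to others, is handing the (key, value) stream to the Table API, since the Kafka SQL connector does support routing a column into the message key via 'key.fields'. A rough sketch of that route, assuming env is the existing StreamExecutionEnvironment, with the topic, bootstrap servers, and column names as placeholders:

{code:python}
from pyflink.table import StreamTableEnvironment

t_env = StreamTableEnvironment.create(env)

# The Kafka SQL connector writes 'kafka_key' into the message key and,
# with EXCEPT_KEY, keeps only 'payload' in the message value.
t_env.execute_sql("""
    CREATE TABLE kafka_out (
        kafka_key STRING,
        payload STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'events',
        'properties.bootstrap.servers' = 'localhost:9092',
        'key.format' = 'raw',
        'key.fields' = 'kafka_key',
        'value.format' = 'raw',
        'value.fields-include' = 'EXCEPT_KEY'
    )
""")

table = t_env.from_data_stream(kafka_records).alias("kafka_key", "payload")
table.execute_insert("kafka_out")
{code}

The obvious downside is pulling the Table API into an otherwise pure DataStream job, which is why first-class key support in the Python KafkaSink would still be preferable.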

I have attached the Claude Code analysis of the problem; I hope it helps.

Thanks in advance for your help!



> Custom Message Keys with PyFlink Kafka Sink
> -------------------------------------------
>
>                 Key: FLINK-38519
>                 URL: https://issues.apache.org/jira/browse/FLINK-38519
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 2.1.0, kafka-4.0.1
>            Environment: Apache Flink 2.1.0 with the JARs below
> https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/4.0.1-2.0/flink-sql-connector-kafka-4.0.1-2.0.jar
> https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/4.1.0/kafka-clients-4.1.0.jar
> https://repo1.maven.org/maven2/org/apache/flink/flink-avro/2.1.0/flink-avro-2.1.0.jar
> https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/2.1.0/flink-avro-confluent-registry-2.1.0.jar
> https://packages.confluent.io/maven/io/confluent/kafka-schema-registry-client/8.0.0/kafka-schema-registry-client-8.0.0.jar
> https://packages.confluent.io/maven/io/confluent/kafka-avro-serializer/8.0.0/kafka-avro-serializer-8.0.0.jar
> https://repo1.maven.org/maven2/org/apache/avro/avro/1.12.0/avro-1.12.0.jar
> https://repo1.maven.org/maven2/com/github/luben/zstd-jni/1.5.7-4/zstd-jni-1.5.7-4.jar
> https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.20.0/jackson-databind-2.20.0.jar
> https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.20.0/jackson-core-2.20.0.jar
> https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.20/jackson-annotations-2.20.jar
>            Reporter: Sebastian YEPES FERNANDEZ
>            Priority: Major
>              Labels: Kafka, producer
>         Attachments: Issue-Alternative.md, issue.md
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
