[
https://issues.apache.org/jira/browse/FLINK-38519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Sebastian YEPES FERNANDEZ updated FLINK-38519:
----------------------------------------------
Description:
Hi all,
I’m currently using a custom Kafka producer but am considering switching to the
native Flink Kafka Sink Connector. However, I’ve encountered some issues and
exceptions in the process.
At the moment, I’m publishing JSON payloads with custom message keys to
optimize partitioning and ordering. From what I can tell, the PyFlink interface
doesn’t support this functionality directly. I even consulted Claude Code,
which suggested that the only solution is to implement a custom Java
serializer, but I’d prefer to avoid creating custom JARs if possible.
Below is a simplified example of what I’m trying to implement:
{code:python}
def create_kafka_record(event):
    """Create Kafka record with kafkaKey = client:client_id"""
    client = event.get("name", "")
    client_id = event.get("id", "")
    kafka_key = f"{client}:{client_id}"
    return (kafka_key, json.dumps(event, default=str))

kafka_records = processed_events.map(
    create_kafka_record,
    output_type=Types.TUPLE([Types.STRING(), Types.STRING()])
).name("Prepare Kafka Records")

kafka_sink = KafkaSink.builder().set_bootstrap_servers(bootstrap_servers).build()

kafka_records.sink_to(kafka_sink).name("Sink to Kafka")
{code}
Does anyone know of any alternatives or workarounds for this? Or could someone
clarify if there are plans to enhance the PyFlink Java interface to support
custom message keys and serializers?
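One workaround that might avoid custom JARs, assuming the Table API is acceptable: the Kafka SQL connector (already among the jars listed in the environment) supports writing message keys through its {{key.format}} and {{key.fields}} options. A sketch with hypothetical topic and field names, not verified against Flink 2.1.0:
{code:sql}
-- Hypothetical sink table; 'events' and the field names are placeholders.
CREATE TABLE kafka_sink (
  kafka_key STRING,
  payload   STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'events',
  'properties.bootstrap.servers' = 'localhost:9092',
  -- serialize kafka_key as the record key, payload as the value
  'key.format' = 'raw',
  'key.fields' = 'kafka_key',
  'value.format' = 'raw',
  'value.fields-include' = 'EXCEPT_KEY'
);
{code}
The DataStream of (key, value) tuples could then be converted with {{t_env.from_data_stream(...)}} and written via an INSERT INTO this table, which keeps the key construction in Python.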
I have attached the Claude Code analysis of the problem; I hope it helps.
Thanks in advance for your help!
> Custom Message Keys with PyFlink Kafka Sink
> -------------------------------------------
>
> Key: FLINK-38519
> URL: https://issues.apache.org/jira/browse/FLINK-38519
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 2.1.0, kafka-4.0.1
> Environment: Apache Flink 2.1.0 with the following jars:
> https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/4.0.1-2.0/flink-sql-connector-kafka-4.0.1-2.0.jar
> https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/4.1.0/kafka-clients-4.1.0.jar
> https://repo1.maven.org/maven2/org/apache/flink/flink-avro/2.1.0/flink-avro-2.1.0.jar
> https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/2.1.0/flink-avro-confluent-registry-2.1.0.jar
> https://packages.confluent.io/maven/io/confluent/kafka-schema-registry-client/8.0.0/kafka-schema-registry-client-8.0.0.jar
> https://packages.confluent.io/maven/io/confluent/kafka-avro-serializer/8.0.0/kafka-avro-serializer-8.0.0.jar
> https://repo1.maven.org/maven2/org/apache/avro/avro/1.12.0/avro-1.12.0.jar
> https://repo1.maven.org/maven2/com/github/luben/zstd-jni/1.5.7-4/zstd-jni-1.5.7-4.jar
> https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.20.0/jackson-databind-2.20.0.jar
> https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.20.0/jackson-core-2.20.0.jar
> https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.20/jackson-annotations-2.20.jar
> Reporter: Sebastian YEPES FERNANDEZ
> Priority: Major
> Labels: Kafka, producer
> Attachments: Issue-Alternative.md, issue.md
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)