EricJoy2048 commented on code in PR #3085:
URL: https://github.com/apache/incubator-seatunnel/pull/3085#discussion_r995536895


##########
docs/en/connector-v2/sink/Kafka.md:
##########
@@ -50,6 +51,23 @@ In AT_LEAST_ONCE, producer will wait for all outstanding messages in the Kafka b
 
 NON does not provide any guarantees: messages may be lost in case of issues on the Kafka broker and messages may be duplicated.
 
+### partition_key [string]
+
+Determine the partition of the kafka send message based on the key.

Review Comment:
   It would be better to replace this with `Configure which field is used as the key of the kafka message`.
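To illustrate what "configure which field is used as the key" involves, here is a minimal Java sketch. It assumes the row schema exposes its field names as a `String[]` (as `seaTunnelRowType.getFieldNames()` does in the `KafkaSinkWriter` diff). `KeyFieldResolver` and `resolveIndex` are hypothetical names, and failing fast on an unknown field is a design choice of this sketch, not something the PR specifies.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: maps the configured key field (the value of
// partition_key) to a column index in the row schema.
final class KeyFieldResolver {
    private KeyFieldResolver() {}

    // Returns the index of keyField in fieldNames. Throws if the field is
    // missing, so a typo in the config surfaces once at startup rather than
    // being silently ignored for every row.
    static int resolveIndex(String[] fieldNames, String keyField) {
        List<String> fields = Arrays.asList(fieldNames);
        int index = fields.indexOf(keyField);
        if (index < 0) {
            throw new IllegalArgumentException(
                "partition_key field '" + keyField + "' not found in " + fields);
        }
        return index;
    }
}
```

Resolving the index once (for example when the writer is created) avoids rebuilding the field-name list for every row in `write`.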



##########
docs/en/connector-v2/sink/Kafka.md:
##########
@@ -93,7 +111,9 @@ sink {
 
 ###  change log
 ####  next version
- 
+
  - Add kafka sink doc 
  - New feature : Kafka specified partition to send 
- - New feature : Determine the partition that kafka send based on the message content
+ - New feature : Determine the partition that kafka send messag based on the message content
+ - New feature : Determine the partition of the kafka send message based on the field name

Review Comment:
   Would it be better to replace this with `Configure which field is used as the key of the kafka message`?



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java:
##########
@@ -63,7 +66,28 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
     // check config
     @Override
     public void write(SeaTunnelRow element) {
-        ProducerRecord<byte[], byte[]> producerRecord = seaTunnelRowSerializer.serializeRow(element);
+        ProducerRecord<byte[], byte[]> producerRecord = null;
+        //Determine the partition of the kafka send message based on the field name
+        if (pluginConfig.hasPath(PARTITION_KEY)){
+            String keyField = pluginConfig.getString(PARTITION_KEY);
+            List<String> fields = Arrays.asList(seaTunnelRowType.getFieldNames());
+            String key;
+            if (fields.contains(keyField)) {
+                Object field = element.getField(fields.indexOf(keyField));
+                //If the field is null, send the message to the same partition
+                if (field == null) {
+                    key = "null";

Review Comment:
   This is not a good approach, because there may be many `null` values. If you convert `null` to the string `"null"`, all `null` values will be written to the same partition.
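The concern above follows from any deterministic key-to-partition mapping. For records with a non-null key, Kafka's default partitioner hashes the key bytes (using murmur2) and takes the result modulo the partition count, so every record whose key is the string `"null"` lands in the same partition. The sketch below uses `Arrays.hashCode` as a stand-in for the real hash; `KeyPartitionDemo` and `partitionFor` are hypothetical names for illustration only.

```java
import java.util.Arrays;

// Illustrative only: Arrays.hashCode stands in for Kafka's murmur2 hash.
final class KeyPartitionDemo {
    private KeyPartitionDemo() {}

    // Deterministic mapping from key bytes to a partition in [0, numPartitions).
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        // Clear the sign bit so the modulo result is never negative.
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }
}
```

Calling `partitionFor("null".getBytes(StandardCharsets.UTF_8), 8)` for every row with a null key field always returns the same partition, which is the hot-spot the review describes.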
   
   You can keep `null` and update the code in `seaTunnelRowSerializer.serializeRowByKey(key, element)` like this:
   ```java
   return new ProducerRecord<>(topic, key == null ? null : key.getBytes(), jsonSerializationSchema.serialize(row));
   ```
   
   If the key is null, Kafka will send the record to a random partition (a new partition is picked at random every `topic.metadata.refresh.interval.ms`).
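On the writer side, keeping `null` means the key extraction passes it through instead of substituting the string `"null"`. A minimal sketch, assuming the row's fields are available as an `Object[]` and the key column index has already been resolved; `KeyExtractor.extractKey` is a hypothetical name, and using `String.valueOf` for non-null values is an assumption about how the PR turns a field into a key string.

```java
// Hypothetical helper: turns the configured key field of a row into a
// Kafka message key, keeping null as null.
final class KeyExtractor {
    private KeyExtractor() {}

    static String extractKey(Object[] fields, int keyIndex) {
        Object value = fields[keyIndex];
        // Do not map null to "null": a null key lets the producer choose
        // the partition itself instead of hashing every null to one partition.
        return value == null ? null : String.valueOf(value);
    }
}
```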
   



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java:
##########
@@ -48,4 +48,10 @@ public ProducerRecord<byte[], byte[]> serializeRow(SeaTunnelRow row) {
             return new ProducerRecord<>(topic, null, jsonSerializationSchema.serialize(row));
         }
     }
+
+    @Override
+    public ProducerRecord<byte[], byte[]> serializeRowByKey(String key, SeaTunnelRow row) {
+        return new ProducerRecord<>(topic, key.getBytes(), jsonSerializationSchema.serialize(row));

Review Comment:
   Sorry, you need to check whether `key` is null, because calling `key.getBytes()` on a null `key` throws a `NullPointerException`. You can update it to:
   `return new ProducerRecord<>(topic, key == null ? null : key.getBytes(), jsonSerializationSchema.serialize(row));`
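The null check can also be factored into a small helper. This sketch additionally passes `StandardCharsets.UTF_8` explicitly, because the no-argument `String.getBytes()` uses the platform default charset, which can differ between machines; the explicit charset is a suggestion of this sketch, not part of the review. `KeyBytes.toKeyBytes` is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for serializeRowByKey: null-safe key encoding.
final class KeyBytes {
    private KeyBytes() {}

    // Returns null for a null key (the record is then sent without a key)
    // instead of throwing NullPointerException; otherwise encodes as UTF-8.
    static byte[] toKeyBytes(String key) {
        return key == null ? null : key.getBytes(StandardCharsets.UTF_8);
    }
}
```

With it, the serializer line would read `new ProducerRecord<>(topic, KeyBytes.toKeyBytes(key), jsonSerializationSchema.serialize(row))`.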



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
