hailin0 commented on code in PR #8724:
URL: https://github.com/apache/seatunnel/pull/8724#discussion_r1958223215


##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java:
##########
@@ -110,8 +115,12 @@ public KafkaSinkWriter(
 
     @Override
     public void write(SeaTunnelRow element) {
-        ProducerRecord<byte[], byte[]> producerRecord =
-                seaTunnelRowSerializer.serializeRow(element);
+        ProducerRecord<byte[], byte[]> producerRecord;
+        if (isNative) {
+            producerRecord = 
seaTunnelRowSerializer.serializeNativeRow(element, seaTunnelRowType);

Review Comment:
   Why not override `topicExtractor`, `partitionExtractor`, ...
   
   remove this if-else



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java:
##########
@@ -211,7 +221,55 @@ private CatalogTable createCatalogTable(ReadonlyConfig 
readonlyConfig) {
                 readonlyConfig.getOptional(KafkaSourceOptions.SCHEMA);
         TablePath tablePath = TablePath.of(null, readonlyConfig.get(TOPIC));
         TableSchema tableSchema;
-        if (schemaOptions.isPresent()) {
+        MessageFormat format = readonlyConfig.get(FORMAT);
+
+        if (format == MessageFormat.NATIVE) {
+            tableSchema =
+                    TableSchema.builder()

Review Comment:
   move to static final field



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to