EMsnap commented on code in PR #6298:
URL: https://github.com/apache/inlong/pull/6298#discussion_r1006357137


##########
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java:
##########
@@ -172,6 +189,101 @@ public ProducerRecord<byte[], byte[]> serialize(RowData 
consumedRow, @Nullable L
                 readMetadata(consumedRow, 
KafkaDynamicSink.WritableMetadata.HEADERS));
     }
 
+    /**
+     * Serialize for list it is used for multiple sink scenes when a record 
contains mulitple real records.
+     *
+     * @param consumedRow The consumeRow
+     * @param timestamp The timestamp
+     * @return List of ProducerRecord
+     */
+    public List<ProducerRecord<byte[], byte[]>> serializeForList(RowData 
consumedRow, @Nullable Long timestamp) {
+        if (!multipleSink) {
+            return Collections.singletonList(serialize(consumedRow, 
timestamp));
+        }
+        List<ProducerRecord<byte[], byte[]>> values = new ArrayList<>();
+        try {
+            JsonNode rootNode = 
jsonDynamicSchemaFormat.deserialize(consumedRow.getBinary(0));
+            boolean isDDL = jsonDynamicSchemaFormat.extractDDLFlag(rootNode);
+            if (isDDL) {
+                values.add(new ProducerRecord<>(
+                        jsonDynamicSchemaFormat.parse(rootNode, topicPattern),
+                        extractPartition(consumedRow, null, 
consumedRow.getBinary(0)),
+                        null,
+                        consumedRow.getBinary(0)));
+                return values;
+            }
+            JsonNode updateBeforeNode = 
jsonDynamicSchemaFormat.getUpdateBefore(rootNode);
+            JsonNode updateAfterNode = 
jsonDynamicSchemaFormat.getUpdateAfter(rootNode);
+            boolean splitRequired = (updateAfterNode != null && 
updateAfterNode.isArray()

Review Comment:
   maybe a method for such operation would make the code cleaner



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to