fsk119 commented on a change in pull request #13850:
URL: https://github.com/apache/flink/pull/13850#discussion_r515758607



##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java
##########
@@ -96,32 +142,107 @@ public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData
 
        // --------------------------------------------------------------------------------------------
 
-       private static final class MetadataAppendingCollector implements Collector<RowData>, Serializable {
+       private static final class BufferingCollector implements Collector<RowData>, Serializable {
+
+               private final List<RowData> buffer = new ArrayList<>();
+
+               @Override
+               public void collect(RowData record) {
+                       buffer.add(record);
+               }
+
+               @Override
+               public void close() {
+                       // nothing to do
+               }
+       }
+
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Emits a row with key, value, and metadata fields.
+        *
+        * <p>The collector is able to handle the following kinds of keys:
+        * <ul>
+        *     <li>No key is used.
+        *     <li>A key is used.
+        *     <li>The deserialization schema emits multiple keys.
+        * </ul>
+        */
+       private static final class OutputProjectionCollector implements Collector<RowData>, Serializable {
 
                private static final long serialVersionUID = 1L;
 
+               private final int[] keyProjection;
+
+               private final int[] valueProjection;
+
                private final MetadataConverter[] metadataConverters;
 
                private transient ConsumerRecord<?, ?> inputRecord;
 
+               private transient List<RowData> physicalKeyRows;
+
                private transient Collector<RowData> outputCollector;
 
-               MetadataAppendingCollector(MetadataConverter[] metadataConverters) {
+               OutputProjectionCollector(
+                               int[] keyProjection,
+                               int[] valueProjection,
+                               MetadataConverter[] metadataConverters) {
+                       this.keyProjection = keyProjection;
+                       this.valueProjection = valueProjection;
                        this.metadataConverters = metadataConverters;
                }
 
                @Override
-               public void collect(RowData physicalRow) {
-                       final GenericRowData genericPhysicalRow = (GenericRowData) physicalRow;
-                       final int physicalArity = physicalRow.getArity();
+               public void collect(RowData physicalValueRow) {
+                       // no key defined
+                       if (keyProjection.length == 0) {
+                               emitRow(null, (GenericRowData) physicalValueRow);
+                               return;
+                       }
+
+                       // emit a value for each key
+                       // if parsing the key data runs into problems, ignore the value data
+                       for (RowData physicalKeyRow : physicalKeyRows) {
+                               if (physicalValueRow == null) {
+                                       physicalKeyRow.setRowKind(RowKind.DELETE);
+                               }
+                               emitRow((GenericRowData) physicalKeyRow, (GenericRowData) physicalValueRow);
+                       }
+               }
+
+               @Override
+               public void close() {
+                       // nothing to do
+               }
+
+               /**
+                * There are 4 situations:
+                * 1. key is null && value is not null => project and set rowkind = insert
+                * 2. key is not null && value is not null => project and set rowkind = insert
+                * 3. key is not null && value is null => project and set rowkind = delete
+                * 4. key is null && value is null => impossible
+                *
+                * <p>The situation {@code keyProjection.length > 0 && key == null} cannot occur either.
+                */
+               private void emitRow(@Nullable GenericRowData physicalKeyRow, @Nullable GenericRowData physicalValueRow) {
                        final int metadataArity = metadataConverters.length;
+                       final int physicalArity = keyProjection.length + valueProjection.length;
+                       final RowKind rowkind = physicalValueRow == null ? RowKind.DELETE : physicalValueRow.getRowKind();

Review comment:
       It is an inner collector. It will receive the tombstone only when the outer class `DynamicKafkaDeserializationSchema` decides to emit a row. Therefore, it doesn't need a flag to detect the mode of the deserialization.

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaSerializationSchema.java
##########
@@ -86,39 +98,38 @@ public void open(SerializationSchema.InitializationContext context) throws Excep
 
        @Override
        public ProducerRecord<byte[], byte[]> serialize(RowData consumedRow, @Nullable Long timestamp) {
-               final RowData physicalRow;
-               // shortcut if no metadata is required
-               if (!hasMetadata) {
-                       physicalRow = consumedRow;
-               } else {
-                       final int physicalArity = physicalFieldGetters.length;
-                       final GenericRowData genericRowData = new GenericRowData(
-                                       consumedRow.getRowKind(),
-                                       physicalArity);
-                       for (int i = 0; i < physicalArity; i++) {
-                               genericRowData.setField(i, physicalFieldGetters[i].getFieldOrNull(consumedRow));
-                       }
-                       physicalRow = genericRowData;
+               // shortcut in case no input projection is required
+               if (keySerialization == null && !hasMetadata) {
+                       final byte[] valueSerialized = valueSerialization.serialize(consumedRow);
+                       return new ProducerRecord<>(
+                                       topic,
+                                       extractPartition(consumedRow, null, valueSerialized),
+                                       null,
+                                       valueSerialized);
                }
 
-               final byte[] valueSerialized = valueSerialization.serialize(physicalRow);
-
-               final Integer partition;
-               if (partitioner != null) {
-                       partition = partitioner.partition(physicalRow, null, valueSerialized, topic, partitions);
+               final byte[] keySerialized;
+               final RowKind kind = consumedRow.getRowKind();
+               if (keySerialization == null) {
+                       keySerialized = null;
                } else {
-                       partition = null;
+                       final RowData keyRow = createProjectedRow(consumedRow, kind, keyFieldGetters);
+                       keySerialized = keySerialization.serialize(keyRow);
                }
 
-               // shortcut if no metadata is required
-               if (!hasMetadata) {
-                       return new ProducerRecord<>(topic, partition, null, null, valueSerialized);
+               final byte[] valueSerialized;
+               if (kind == RowKind.DELETE) {
+                       valueSerialized = null;
+               } else {
+                       final RowData valueRow = createProjectedRow(consumedRow, kind, valueFieldGetters);
+                       valueSerialized = valueSerialization.serialize(valueRow);

Review comment:
       If upsert mode is true, we should transform the delete/update-before messages into tombstone messages. But I am not sure what the behaviour of the `DynamicKafkaSerializationSchema` should be: it has no means to discard the rows it receives. If we have the same behaviour, I think it's useless to introduce a flag.
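
       To make the flag being discussed concrete, a hypothetical sketch of what it could look like on the serialization side (an `upsertMode` field does not exist in this PR; it is only an assumption of what such a flag would do):

```java
// Hypothetical sketch only: upsertMode is NOT part of this PR.
// In upsert mode, DELETE/UPDATE_BEFORE rows would become tombstone records
// (serialized key, null value) instead of being serialized as value rows.
final byte[] valueSerialized;
if (upsertMode && (kind == RowKind.DELETE || kind == RowKind.UPDATE_BEFORE)) {
        valueSerialized = null; // tombstone: log compaction will eventually drop the key
} else {
        final RowData valueRow = createProjectedRow(consumedRow, kind, valueFieldGetters);
        valueSerialized = valueSerialization.serialize(valueRow);
}
```

       Note that this only remaps rows to tombstones; it still cannot drop a row entirely, which is why the flag may add little over the current `kind == RowKind.DELETE` check.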




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

