twalthr commented on a change in pull request #13862:
URL: https://github.com/apache/flink/pull/13862#discussion_r515878737



##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java
##########
@@ -102,6 +108,36 @@
                .withDescription("Defines the format identifier for encoding 
data. " +
                        "The identifier is used to discover a suitable format 
factory.");
 
+       public static final ConfigOption<List<String>> KEY_FIELDS = 
ConfigOptions

Review comment:
       I was unsure about this as well. We also have `KEY_FORMAT` and `VALUE_FORMAT` in this util. Do we want to move those too?
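
       For reference, a minimal sketch of how the truncated `KEY_FIELDS` declaration above could continue with the `ConfigOptions` builder (the description text and the empty default are assumptions, not necessarily the final wording):

```java
// Sketch only: a possible completion of the KEY_FIELDS option shown in the
// hunk above. The description and the empty default list are assumptions.
public static final ConfigOption<List<String>> KEY_FIELDS = ConfigOptions
        .key("key.fields")
        .stringType()
        .asList()
        .defaultValues()
        .withDescription(
                "Defines an explicit list of physical columns from the table schema " +
                        "that configure the data type for the key format.");
```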

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
##########
@@ -83,38 +97,53 @@
        protected final Properties properties;
 
        /** Partitioner to select Kafka partition for each item. */
-       protected final Optional<FlinkKafkaPartitioner<RowData>> partitioner;
+       protected final @Nullable FlinkKafkaPartitioner<RowData> partitioner;
 
        /** Sink commit semantic. */
        protected final KafkaSinkSemantic semantic;
 
        public KafkaDynamicSink(
                        DataType physicalDataType,
+                       @Nullable EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat,
+                       EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat,
+                       int[] keyProjection,
+                       int[] valueProjection,
+                       @Nullable String keyPrefix,
                        String topic,
                        Properties properties,
-                       Optional<FlinkKafkaPartitioner<RowData>> partitioner,
-                       EncodingFormat<SerializationSchema<RowData>> encodingFormat,
+                       @Nullable FlinkKafkaPartitioner<RowData> partitioner,
                        KafkaSinkSemantic semantic) {
+               // Format attributes
                this.physicalDataType = Preconditions.checkNotNull(physicalDataType, "Physical data type must not be null.");
+               this.keyEncodingFormat = keyEncodingFormat;
+               this.valueEncodingFormat = Preconditions.checkNotNull(valueEncodingFormat, "Value encoding format must not be null.");
+               this.keyProjection = Preconditions.checkNotNull(keyProjection, "Key projection must not be null.");
+               this.valueProjection = Preconditions.checkNotNull(valueProjection, "Value projection must not be null.");
+               this.keyPrefix = keyPrefix;
+               // Mutable attributes
                this.metadataKeys = Collections.emptyList();
+               // Kafka-specific attributes
                this.topic = Preconditions.checkNotNull(topic, "Topic must not be null.");
                this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
-               this.partitioner = Preconditions.checkNotNull(partitioner, "Partitioner must not be null.");
-               this.encodingFormat = Preconditions.checkNotNull(encodingFormat, "Encoding format must not be null.");
+               this.partitioner = partitioner;
                this.semantic = Preconditions.checkNotNull(semantic, "Semantic must not be null.");
        }
 
        @Override
        public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
-               return this.encodingFormat.getChangelogMode();
+               return valueEncodingFormat.getChangelogMode();
        }
 
        @Override
        public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+               final SerializationSchema<RowData> keySerialization =
+                               createSerialization(context, keyEncodingFormat, keyProjection, keyPrefix);
+
                final SerializationSchema<RowData> valueSerialization =
-                               this.encodingFormat.createRuntimeEncoder(context, this.physicalDataType);
+                               createSerialization(context, valueEncodingFormat, valueProjection, keyPrefix);

Review comment:
       Good review! This was one of the things I was struggling with. In the end, I decided to remove the prefix from the values as well. IMHO the `keyPrefix` is just an intermediate helper option that should not affect the serialization data type. I think no user wants to have `user_id, key_name, payload` in the values. Especially when `value.fields-include = ALL`, we need to strip the prefix from the value too, otherwise a "logical" concept ends up in the physical data.
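
       To illustrate the stripping, here is a minimal sketch (the class and method names are made up for illustration; this is not the PR's actual helper) of deriving the physical format data type with the prefix removed:

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;

import javax.annotation.Nullable;

import java.util.stream.IntStream;

// Sketch only: projects a physical row type to the given field indices and
// strips an optional key prefix from the projected field names, so the
// "logical" prefix never reaches the serialization format.
final class ProjectionSketch {

    static DataType projectAndStripPrefix(
            DataType physicalDataType, int[] projection, @Nullable String prefix) {
        final RowType rowType = (RowType) physicalDataType.getLogicalType();
        return DataTypes.ROW(
                IntStream.of(projection)
                        .mapToObj(i -> {
                            String fieldName = rowType.getFieldNames().get(i);
                            if (prefix != null && fieldName.startsWith(prefix)) {
                                fieldName = fieldName.substring(prefix.length());
                            }
                            return DataTypes.FIELD(
                                    fieldName, physicalDataType.getChildren().get(i));
                        })
                        .toArray(DataTypes.Field[]::new));
    }
}
```

       With `value.fields-include = ALL`, the same call would be applied to the value projection, which is why the prefix disappears from the value side too.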

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
##########
@@ -129,78 +142,183 @@ public DynamicTableSource createDynamicTableSource(Context context) {
 
                final DataType physicalDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
 
+               final int[] keyProjection = FactoryUtil.createKeyFormatProjection(tableOptions, physicalDataType);
+
+               final int[] valueProjection = FactoryUtil.createValueFormatProjection(tableOptions, physicalDataType);
+
+               final String keyPrefix = tableOptions.getOptional(FactoryUtil.KEY_FIELDS_PREFIX).orElse(null);
+
                return createKafkaTableSource(

Review comment:
       I kept it for users that might want to extend `KafkaDynamicTableFactory`, similar to how `KafkaDynamicSource` exposes some protected methods that can be overridden if needed.
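
       Illustrative only (the exact signature of the protected method is elided in the diff above), an extension could look roughly like this:

```java
import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory;

// Sketch only: a downstream subclass hooking into the factory. The protected
// createKafkaTableSource(...) override is indicated by a comment because its
// real parameter list is not reproduced here.
public class CustomKafkaDynamicTableFactory extends KafkaDynamicTableFactory {

    @Override
    public String factoryIdentifier() {
        // register the extended factory under its own identifier
        return "custom-kafka";
    }

    // override the protected createKafkaTableSource(...) hook here to return
    // a customized KafkaDynamicSource
}
```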

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
##########
@@ -83,38 +97,53 @@
        protected final Properties properties;
 
        /** Partitioner to select Kafka partition for each item. */
-       protected final Optional<FlinkKafkaPartitioner<RowData>> partitioner;
+       protected final @Nullable FlinkKafkaPartitioner<RowData> partitioner;
 
        /** Sink commit semantic. */
        protected final KafkaSinkSemantic semantic;
 
        public KafkaDynamicSink(
                        DataType physicalDataType,
+                       @Nullable EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat,
+                       EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat,
+                       int[] keyProjection,
+                       int[] valueProjection,
+                       @Nullable String keyPrefix,
                        String topic,
                        Properties properties,
-                       Optional<FlinkKafkaPartitioner<RowData>> partitioner,
-                       EncodingFormat<SerializationSchema<RowData>> encodingFormat,
+                       @Nullable FlinkKafkaPartitioner<RowData> partitioner,
                        KafkaSinkSemantic semantic) {
+               // Format attributes
                this.physicalDataType = Preconditions.checkNotNull(physicalDataType, "Physical data type must not be null.");
+               this.keyEncodingFormat = keyEncodingFormat;
+               this.valueEncodingFormat = Preconditions.checkNotNull(valueEncodingFormat, "Value encoding format must not be null.");
+               this.keyProjection = Preconditions.checkNotNull(keyProjection, "Key projection must not be null.");
+               this.valueProjection = Preconditions.checkNotNull(valueProjection, "Value projection must not be null.");
+               this.keyPrefix = keyPrefix;
+               // Mutable attributes
                this.metadataKeys = Collections.emptyList();
+               // Kafka-specific attributes
                this.topic = Preconditions.checkNotNull(topic, "Topic must not be null.");
                this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
-               this.partitioner = Preconditions.checkNotNull(partitioner, "Partitioner must not be null.");
-               this.encodingFormat = Preconditions.checkNotNull(encodingFormat, "Encoding format must not be null.");
+               this.partitioner = partitioner;
                this.semantic = Preconditions.checkNotNull(semantic, "Semantic must not be null.");
        }
 
        @Override
        public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
-               return this.encodingFormat.getChangelogMode();
+               return valueEncodingFormat.getChangelogMode();
        }
 
        @Override
        public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+               final SerializationSchema<RowData> keySerialization =
+                               createSerialization(context, keyEncodingFormat, keyProjection, keyPrefix);
+
                final SerializationSchema<RowData> valueSerialization =
-                               this.encodingFormat.createRuntimeEncoder(context, this.physicalDataType);
+                               createSerialization(context, valueEncodingFormat, valueProjection, keyPrefix);

Review comment:
       Yes, I'm fine with this limitation. I will update the PR.
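
       For completeness, a minimal sketch of what a `createSerialization(...)` helper inside `KafkaDynamicSink` might look like (the body is an assumption reusing the hypothetical `projectAndStripPrefix` sketch from my earlier comment, not the PR's actual implementation):

```java
// Sketch only: returns null when no key format is configured; otherwise
// projects (and prefix-strips) the physical data type before creating
// the runtime encoder.
private @Nullable SerializationSchema<RowData> createSerialization(
        DynamicTableSink.Context context,
        @Nullable EncodingFormat<SerializationSchema<RowData>> format,
        int[] projection,
        @Nullable String prefix) {
    if (format == null) {
        return null;
    }
    final DataType physicalFormatDataType =
            ProjectionSketch.projectAndStripPrefix(physicalDataType, projection, prefix);
    return format.createRuntimeEncoder(context, physicalFormatDataType);
}
```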



