AHeise commented on a change in pull request #16769:
URL: https://github.com/apache/flink/pull/16769#discussion_r686585342



##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/sink/KafkaSinkBuilder.java
##########
@@ -76,7 +76,7 @@
      * @param deliveryGuarantee
      * @return {@link KafkaSinkBuilder}
      */
-    KafkaSinkBuilder<IN> setDeliverGuarantee(DeliveryGuarantee 
deliveryGuarantee) {
+    public KafkaSinkBuilder<IN> setDeliverGuarantee(DeliveryGuarantee 
deliveryGuarantee) {

Review comment:
       🙈 

##########
File path: 
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/DeliveryGuarantee.java
##########
@@ -17,25 +17,57 @@
 
 package org.apache.flink.connector.base;
 
+import org.apache.flink.configuration.DescribedEnum;
+import org.apache.flink.configuration.description.InlineElement;
+
+import static org.apache.flink.configuration.description.TextElement.text;
+
 /**
  * DeliverGuarantees that can be chosen. In general your pipeline can only 
offer the lowest delivery
  * guarantee which is supported by your sources and sinks.
  */
-public enum DeliveryGuarantee {
+public enum DeliveryGuarantee implements DescribedEnum {
     /**
      * Records are only delivered exactly-once also under failover scenarios. 
To build a complete
      * exactly-once pipeline is required that the source and sink support 
exactly-once and are
      * properly configured.
      */
-    EXACTLY_ONCE,
+    EXACTLY_ONCE(
+            "exactly-once",
+            text(
+                    "Records are only delivered exactly-once also under 
failover scenarios. To build a complete exactly-once pipeline is required that 
the source and sink support exactly-once and are properly configured.")),
     /**
      * Records are ensured to be delivered but it may happen that the same 
record is delivered
      * multiple times. Usually, this guarantee is faster than the exactly-once 
delivery.
      */
-    AT_LEAST_ONCE,
+    AT_LEAST_ONCE(
+            "at-least-once",
+            text(
+                    "Records are ensured to be delivered but it may happen 
that the same record is delivered multiple times. Usually, this guarantee is 
faster than the exactly-once delivery")),
     /**
      * Records are delivered on a best effort basis. It is often the fastest 
way to process records
      * but it may happen that records are lost or duplicated.
      */
-    NONE
+    NONE(
+            "none",
+            text(
+                    "Records are delivered on a best effort basis. It is often 
the fastest way to process records but it may happen that records are lost or 
duplicated."));
+
+    private final String value;

Review comment:
       name?

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * SerializationSchema used by {@link KafkaDynamicSink} to configure a {@link
+ * org.apache.flink.streaming.connectors.kafka.sink.KafkaSink}.
+ */
+class DynamicKafkaRecordSerializationSchema implements 
KafkaRecordSerializationSchema<RowData> {
+
+    private final String topic;
+    private final FlinkKafkaPartitioner<RowData> partitioner;
+    private final SerializationSchema<RowData> keySerialization;
+    private final SerializationSchema<RowData> valueSerialization;
+    private final RowData.FieldGetter[] keyFieldGetters;
+    private final RowData.FieldGetter[] valueFieldGetters;
+    private final boolean hasMetadata;
+    private final int[] metadataPositions;
+    private final boolean upsertMode;
+
+    private transient boolean isPartitionerOpen = false;
+
+    DynamicKafkaRecordSerializationSchema(
+            String topic,
+            @Nullable FlinkKafkaPartitioner<RowData> partitioner,
+            @Nullable SerializationSchema<RowData> keySerialization,
+            SerializationSchema<RowData> valueSerialization,
+            RowData.FieldGetter[] keyFieldGetters,
+            RowData.FieldGetter[] valueFieldGetters,
+            boolean hasMetadata,
+            int[] metadataPositions,
+            boolean upsertMode) {
+        if (upsertMode) {
+            Preconditions.checkArgument(
+                    keySerialization != null && keyFieldGetters.length > 0,
+                    "Key must be set in upsert mode for serialization 
schema.");
+        }
+        this.topic = topic;
+        this.partitioner = partitioner;
+        this.keySerialization = keySerialization;
+        this.valueSerialization = valueSerialization;
+        this.keyFieldGetters = keyFieldGetters;
+        this.valueFieldGetters = valueFieldGetters;
+        this.hasMetadata = hasMetadata;
+        this.metadataPositions = metadataPositions;
+        this.upsertMode = upsertMode;
+    }

Review comment:
       `checkNotNull`

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * SerializationSchema used by {@link KafkaDynamicSink} to configure a {@link
+ * org.apache.flink.streaming.connectors.kafka.sink.KafkaSink}.
+ */
+class DynamicKafkaRecordSerializationSchema implements 
KafkaRecordSerializationSchema<RowData> {
+
+    private final String topic;
+    private final FlinkKafkaPartitioner<RowData> partitioner;
+    private final SerializationSchema<RowData> keySerialization;
+    private final SerializationSchema<RowData> valueSerialization;
+    private final RowData.FieldGetter[] keyFieldGetters;
+    private final RowData.FieldGetter[] valueFieldGetters;
+    private final boolean hasMetadata;
+    private final int[] metadataPositions;
+    private final boolean upsertMode;
+
+    private transient boolean isPartitionerOpen = false;
+
+    DynamicKafkaRecordSerializationSchema(
+            String topic,
+            @Nullable FlinkKafkaPartitioner<RowData> partitioner,
+            @Nullable SerializationSchema<RowData> keySerialization,
+            SerializationSchema<RowData> valueSerialization,
+            RowData.FieldGetter[] keyFieldGetters,
+            RowData.FieldGetter[] valueFieldGetters,
+            boolean hasMetadata,
+            int[] metadataPositions,
+            boolean upsertMode) {
+        if (upsertMode) {
+            Preconditions.checkArgument(
+                    keySerialization != null && keyFieldGetters.length > 0,
+                    "Key must be set in upsert mode for serialization 
schema.");
+        }
+        this.topic = topic;
+        this.partitioner = partitioner;
+        this.keySerialization = keySerialization;
+        this.valueSerialization = valueSerialization;
+        this.keyFieldGetters = keyFieldGetters;
+        this.valueFieldGetters = valueFieldGetters;
+        this.hasMetadata = hasMetadata;
+        this.metadataPositions = metadataPositions;
+        this.upsertMode = upsertMode;
+    }
+
+    @Override
+    public ProducerRecord<byte[], byte[]> serialize(
+            RowData consumedRow, KafkaSinkContext context, Long timestamp) {
+        if (partitioner != null && !isPartitionerOpen) {
+            partitioner.open(
+                    context.getParallelInstanceId(), 
context.getNumberOfParallelInstances());
+            isPartitionerOpen = true;
+        }

Review comment:
       Idea: Move to `open`. Get rid of `isPartitionerOpen`. Extend 
`KafkaRecordSerializationSchema#open` with 2. parameter `KafkaSinkContext`.

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaSerializationSchema.java
##########
@@ -120,13 +119,17 @@ public void 
open(SerializationSchema.InitializationContext context) throws Excep
         if (keySerialization == null) {

Review comment:
       Why do we still need this class?

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
##########
@@ -172,20 +185,43 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context 
context) {
         final SerializationSchema<RowData> valueSerialization =
                 createSerialization(context, valueEncodingFormat, 
valueProjection, null);
 
-        final FlinkKafkaProducer<RowData> kafkaProducer =
-                createKafkaProducer(keySerialization, valueSerialization);
-
         if (flushMode.isEnabled() && upsertMode) {
             BufferedUpsertSinkFunction buffedSinkFunction =

Review comment:
       So imho we can safely drop everything that contains producer here ;)

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
##########
@@ -323,18 +339,62 @@ public int hashCode() {
                         valueSerialization,
                         keyFieldGetters,
                         valueFieldGetters,
-                        hasMetadata,
+                        hasMetadata(),
                         metadataPositions,
                         upsertMode);
 
         return new FlinkKafkaProducer<>(
                 topic,
                 kafkaSerializer,
                 properties,
-                sinkSemantic.getSemantic(),
+                getSemantic(deliveryGuarantee),
                 FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
     }
 
+    private int[] getMetadataPositions(List<LogicalType> physicalChildren) {

Review comment:
       Do we need to execute it if `hasMetadata = false`? Can we maybe even 
encode it through returning `null` and get rid of `hasMetadata` parameter? 

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
##########
@@ -240,6 +235,18 @@
                                                     + "must be set to be 
greater than zero to enable sink buffer flushing.")
                                     .build());
 
+    public static final ConfigOption<DeliveryGuarantee> DELIVERY_GUARANTEE =
+            ConfigOptions.key("sink.delivery-guarantee")
+                    .enumType(DeliveryGuarantee.class)
+                    .defaultValue(DeliveryGuarantee.NONE)
+                    .withDescription("Optional delivery guarantee when 
committing.");
+
+    public static final ConfigOption<String> TRANSACTIONAL_ID_PREFIX =
+            ConfigOptions.key("sink.transactional-id-prefix")
+                    .stringType()
+                    .defaultValue(null)
+                    .withDescription("Optional delivery guarantee when 
committing.");

Review comment:
       c&p. 

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * SerializationSchema used by {@link KafkaDynamicSink} to configure a {@link
+ * org.apache.flink.streaming.connectors.kafka.sink.KafkaSink}.
+ */
+class DynamicKafkaRecordSerializationSchema implements 
KafkaRecordSerializationSchema<RowData> {

Review comment:
       Please use `@Nullable` consistently and pay attention what your IDE 
tells you (if configured correctly).

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/sink/KafkaWriter.java
##########
@@ -376,6 +378,11 @@ private void 
initMetrics(FlinkKafkaInternalProducer<byte[], byte[]> producer) {
                         others,
                         kafkaSinkContext.getNumberOfParallelInstances())) {
             return log.getTransactionsToAbort();
+        } catch (KafkaException e) {
+            LOG.warn(
+                    "Cannot abort transactions before startup this might leave 
lingering transactions from previous executions.",

Review comment:
       + Lingering transactions may hold new data back from downstream 
consumers.
   Please abort these transactions manually or give this Flink application 
access to the __transaction_state topic.
   
   (or rephrase again - not 100% happy with my proposal)

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * SerializationSchema used by {@link KafkaDynamicSink} to configure a {@link
+ * org.apache.flink.streaming.connectors.kafka.sink.KafkaSink}.
+ */
+class DynamicKafkaRecordSerializationSchema implements 
KafkaRecordSerializationSchema<RowData> {
+
+    private final String topic;
+    private final FlinkKafkaPartitioner<RowData> partitioner;
+    private final SerializationSchema<RowData> keySerialization;
+    private final SerializationSchema<RowData> valueSerialization;
+    private final RowData.FieldGetter[] keyFieldGetters;
+    private final RowData.FieldGetter[] valueFieldGetters;
+    private final boolean hasMetadata;
+    private final int[] metadataPositions;
+    private final boolean upsertMode;
+
+    private transient boolean isPartitionerOpen = false;
+
+    DynamicKafkaRecordSerializationSchema(
+            String topic,
+            @Nullable FlinkKafkaPartitioner<RowData> partitioner,
+            @Nullable SerializationSchema<RowData> keySerialization,

Review comment:
       Fields should also be Nullable. Alternatively, use null-object-pattern 
and use checkNotNull.

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * SerializationSchema used by {@link KafkaDynamicSink} to configure a {@link
+ * org.apache.flink.streaming.connectors.kafka.sink.KafkaSink}.
+ */
+class DynamicKafkaRecordSerializationSchema implements 
KafkaRecordSerializationSchema<RowData> {
+
+    private final String topic;
+    private final FlinkKafkaPartitioner<RowData> partitioner;
+    private final SerializationSchema<RowData> keySerialization;
+    private final SerializationSchema<RowData> valueSerialization;
+    private final RowData.FieldGetter[] keyFieldGetters;
+    private final RowData.FieldGetter[] valueFieldGetters;
+    private final boolean hasMetadata;
+    private final int[] metadataPositions;
+    private final boolean upsertMode;
+
+    private transient boolean isPartitionerOpen = false;
+
+    DynamicKafkaRecordSerializationSchema(
+            String topic,
+            @Nullable FlinkKafkaPartitioner<RowData> partitioner,
+            @Nullable SerializationSchema<RowData> keySerialization,
+            SerializationSchema<RowData> valueSerialization,
+            RowData.FieldGetter[] keyFieldGetters,
+            RowData.FieldGetter[] valueFieldGetters,
+            boolean hasMetadata,
+            int[] metadataPositions,
+            boolean upsertMode) {
+        if (upsertMode) {
+            Preconditions.checkArgument(
+                    keySerialization != null && keyFieldGetters.length > 0,
+                    "Key must be set in upsert mode for serialization 
schema.");
+        }
+        this.topic = topic;
+        this.partitioner = partitioner;
+        this.keySerialization = keySerialization;
+        this.valueSerialization = valueSerialization;
+        this.keyFieldGetters = keyFieldGetters;
+        this.valueFieldGetters = valueFieldGetters;
+        this.hasMetadata = hasMetadata;
+        this.metadataPositions = metadataPositions;
+        this.upsertMode = upsertMode;
+    }
+
+    @Override
+    public ProducerRecord<byte[], byte[]> serialize(
+            RowData consumedRow, KafkaSinkContext context, Long timestamp) {
+        if (partitioner != null && !isPartitionerOpen) {
+            partitioner.open(
+                    context.getParallelInstanceId(), 
context.getNumberOfParallelInstances());
+            isPartitionerOpen = true;
+        }
+        // shortcut in case no input projection is required
+        if (keySerialization == null && !hasMetadata) {

Review comment:
       I feel that this implicitly assumes that !hasMetadata => !upsertMode. 
Does that always hold? @twalthr ?
   
   I'd probably get rid of the shortcut here.

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * SerializationSchema used by {@link KafkaDynamicSink} to configure a {@link
+ * org.apache.flink.streaming.connectors.kafka.sink.KafkaSink}.
+ */
+class DynamicKafkaRecordSerializationSchema implements 
KafkaRecordSerializationSchema<RowData> {
+
+    private final String topic;
+    private final FlinkKafkaPartitioner<RowData> partitioner;
+    private final SerializationSchema<RowData> keySerialization;
+    private final SerializationSchema<RowData> valueSerialization;
+    private final RowData.FieldGetter[] keyFieldGetters;
+    private final RowData.FieldGetter[] valueFieldGetters;

Review comment:
       We should have a delegating `SerializationSchema` that hides the 
projection from this `DynamicKafkaRecordSerializationSchema`.

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.table;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import 
org.apache.flink.streaming.connectors.kafka.sink.KafkaRecordSerializationSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * SerializationSchema used by {@link KafkaDynamicSink} to configure a {@link
+ * org.apache.flink.streaming.connectors.kafka.sink.KafkaSink}.
+ */
+class DynamicKafkaRecordSerializationSchema implements 
KafkaRecordSerializationSchema<RowData> {
+
+    private final String topic;
+    private final FlinkKafkaPartitioner<RowData> partitioner;
+    private final SerializationSchema<RowData> keySerialization;
+    private final SerializationSchema<RowData> valueSerialization;
+    private final RowData.FieldGetter[] keyFieldGetters;
+    private final RowData.FieldGetter[] valueFieldGetters;
+    private final boolean hasMetadata;
+    private final int[] metadataPositions;
+    private final boolean upsertMode;
+
+    private transient boolean isPartitionerOpen = false;
+
+    DynamicKafkaRecordSerializationSchema(
+            String topic,
+            @Nullable FlinkKafkaPartitioner<RowData> partitioner,
+            @Nullable SerializationSchema<RowData> keySerialization,
+            SerializationSchema<RowData> valueSerialization,
+            RowData.FieldGetter[] keyFieldGetters,
+            RowData.FieldGetter[] valueFieldGetters,
+            boolean hasMetadata,
+            int[] metadataPositions,
+            boolean upsertMode) {
+        if (upsertMode) {
+            Preconditions.checkArgument(
+                    keySerialization != null && keyFieldGetters.length > 0,
+                    "Key must be set in upsert mode for serialization 
schema.");
+        }
+        this.topic = topic;
+        this.partitioner = partitioner;
+        this.keySerialization = keySerialization;
+        this.valueSerialization = valueSerialization;
+        this.keyFieldGetters = keyFieldGetters;
+        this.valueFieldGetters = valueFieldGetters;
+        this.hasMetadata = hasMetadata;
+        this.metadataPositions = metadataPositions;
+        this.upsertMode = upsertMode;
+    }
+
+    @Override
+    public ProducerRecord<byte[], byte[]> serialize(
+            RowData consumedRow, KafkaSinkContext context, Long timestamp) {
+        if (partitioner != null && !isPartitionerOpen) {
+            partitioner.open(
+                    context.getParallelInstanceId(), 
context.getNumberOfParallelInstances());
+            isPartitionerOpen = true;
+        }
+        // shortcut in case no input projection is required
+        if (keySerialization == null && !hasMetadata) {
+            final byte[] valueSerialized = 
valueSerialization.serialize(consumedRow);
+            return new ProducerRecord<>(
+                    topic,
+                    extractPartition(
+                            consumedRow,
+                            null,
+                            valueSerialized,
+                            context.getPartitionsForTopic(topic)),
+                    null,
+                    valueSerialized);
+        }
+        final byte[] keySerialized;
+        if (keySerialization == null) {
+            keySerialized = null;
+        } else {
+            final RowData keyRow = createProjectedRow(consumedRow, 
RowKind.INSERT, keyFieldGetters);
+            keySerialized = keySerialization.serialize(keyRow);
+        }
+
+        final byte[] valueSerialized;
+        final RowKind kind = consumedRow.getRowKind();
+        final RowData valueRow =
+                DynamicKafkaRecordSerializationSchema.createProjectedRow(
+                        consumedRow, kind, valueFieldGetters);

Review comment:
       Do on-demand. (Not needed in 50% of the cases)

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/sink/KafkaTransactionLog.java
##########
@@ -117,8 +118,9 @@
      * <li>Applies the rules from {@link TransactionsToAbortChecker}
      *
      * @return all transactionIds which must be aborted before starting new 
transactions.
+     * @throws KafkaException if no transaction information can be fetched.

Review comment:
       @throws KafkaException if no transaction information can be fetched 
(e.g., no access rights to __transaction_state).

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
##########
@@ -195,12 +196,6 @@
                                                     "custom class name (use 
custom FlinkKafkaPartitioner subclass)"))
                                     .build());
 
-    public static final ConfigOption<SinkSemantic> SINK_SEMANTIC =

Review comment:
       :+1: to fallback key `sink.semantic`. Are values compatible?

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
##########
@@ -323,18 +339,62 @@ public int hashCode() {
                         valueSerialization,
                         keyFieldGetters,
                         valueFieldGetters,
-                        hasMetadata,
+                        hasMetadata(),
                         metadataPositions,
                         upsertMode);
 
         return new FlinkKafkaProducer<>(
                 topic,
                 kafkaSerializer,
                 properties,
-                sinkSemantic.getSemantic(),
+                getSemantic(deliveryGuarantee),
                 FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
     }
 
+    private int[] getMetadataPositions(List<LogicalType> physicalChildren) {
+        final int[] metadataPositions =
+                Stream.of(WritableMetadata.values())
+                        .mapToInt(
+                                m -> {
+                                    final int pos = 
metadataKeys.indexOf(m.key);
+                                    if (pos < 0) {
+                                        return -1;
+                                    }
+                                    return physicalChildren.size() + pos;
+                                })
+                        .toArray();
+        return metadataPositions;
+    }
+
+    private boolean hasMetadata() {
+        final boolean hasMetadata = metadataKeys.size() > 0;
+        return hasMetadata;

Review comment:
       inline?

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
##########
@@ -172,20 +185,43 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context 
context) {
         final SerializationSchema<RowData> valueSerialization =
                 createSerialization(context, valueEncodingFormat, 
valueProjection, null);
 
-        final FlinkKafkaProducer<RowData> kafkaProducer =
-                createKafkaProducer(keySerialization, valueSerialization);
-
         if (flushMode.isEnabled() && upsertMode) {
             BufferedUpsertSinkFunction buffedSinkFunction =

Review comment:
       I don't see the benefit of having that here 
`BufferedUpsertSinkFunction`. This is what Kafka producer is doing internally 
already. @twalthr any other opinion?

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptionsUtil.java
##########
@@ -535,6 +538,15 @@ private static void autoCompleteSubject(
         }
     }
 
+    static void validateDeliveryGuarantee(ReadableConfig tableOptions) {
+        if 
(tableOptions.getOptional(DELIVERY_GUARANTEE).orElse(DELIVERY_GUARANTEE.defaultValue())
+                        == DeliveryGuarantee.EXACTLY_ONCE
+                && 
!tableOptions.getOptional(TRANSACTIONAL_ID_PREFIX).isPresent()) {
+            throw new ValidationException(
+                    "sink.transactional-id-prefix must be specified when using 
DeliveryGuarantee.EXACTLY_ONCE.");

Review comment:
       Is there any way to generate a kinda unique string from SQL that 
survives manual restarts? @twalthr 
   Then we could also convert the error into a warning and assign this 
transaction id.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to