bgeng777 commented on code in PR #20263:
URL: https://github.com/apache/flink/pull/20263#discussion_r930839985


##########
flink-python/pyflink/datastream/connectors/kafka.py:
##########
@@ -816,3 +826,340 @@ def offsets(offsets: Dict['KafkaTopicPartition', int],
             enumerator.initializer.OffsetsInitializer
         return KafkaOffsetsInitializer(JOffsetsInitializer.offsets(
             j_map_wrapper.asMap(), offset_reset_strategy._to_j_offset_reset_strategy()))
+
+
+class KafkaSink(Sink, SupportPreprocessing):
+    """
+    Flink Sink to produce data into a Kafka topic. The sink supports all delivery guarantees
+    described by :class:`DeliveryGuarantee`.
+
+    * :attr:`DeliveryGuarantee.NONE` does not provide any guarantees: messages may be lost in case
+      of issues on the Kafka broker and messages may be duplicated in case of a Flink failure.
+    * :attr:`DeliveryGuarantee.AT_LEAST_ONCE` the sink will wait for all outstanding records in the
+      Kafka buffers to be acknowledged by the Kafka producer on a checkpoint. No messages will be
+      lost in case of any issue with the Kafka brokers but messages may be duplicated when Flink
+      restarts.
+    * :attr:`DeliveryGuarantee.EXACTLY_ONCE`: In this mode the KafkaSink will write all messages in
+      a Kafka transaction that will be committed to Kafka on a checkpoint. Thus, if the consumer
+      reads only committed data (see Kafka consumer config ``isolation.level``), no duplicates
+      will be seen in case of a Flink restart. However, this delays record writing effectively
+      until a checkpoint is written, so adjust the checkpoint duration accordingly. Please ensure
+      that you use unique transactional id prefixes across your applications running on the same
+      Kafka cluster such that multiple running jobs do not interfere in their transactions!
+      Additionally, it is highly recommended to tweak Kafka transaction timeout (link) >> maximum
+      checkpoint duration + maximum restart duration or data loss may happen when Kafka expires an
+      uncommitted transaction.
+
+    .. versionadded:: 1.16.0
+    """
+
+    def __init__(self, j_kafka_sink, preprocessing: TransformAppender = None):
+        super().__init__(j_kafka_sink)
+        self._preprocessing = preprocessing
+
+    @staticmethod
+    def builder() -> 'KafkaSinkBuilder':
+        """
+        Create a :class:`KafkaSinkBuilder` to construct :class:`KafkaSink`.
+        """
+        return KafkaSinkBuilder()
+
+    def need_preprocessing(self) -> bool:
+        return self._preprocessing is not None
+
+    def get_preprocessing(self) -> TransformAppender:
+        return self._preprocessing
+
+
+class KafkaSinkBuilder(object):
+    """
+    Builder to construct :class:`KafkaSink`.
+
+    The following example shows the minimum setup to create a KafkaSink that writes String values
+    to a Kafka topic.
+
+    ::
+
+        >>> record_serializer = KafkaRecordSerializationSchema.builder() \\
+        ...     .set_topic(MY_SINK_TOPIC) \\
+        ...     .set_value_serialization_schema(SimpleStringSchema()) \\
+        ...     .build()
+        >>> sink = KafkaSink.builder() \\
+        ...     .set_bootstrap_servers(MY_BOOTSTRAP_SERVERS) \\
+        ...     .set_record_serializer(record_serializer) \\
+        ...     .build()
+
+    One can also configure different :class:`DeliveryGuarantee` by using
+    :meth:`set_delivery_guarantee` but keep in mind when using
+    :attr:`DeliveryGuarantee.EXACTLY_ONCE`, one must set the transactional id prefix
+    :meth:`set_transactional_id_prefix`.
+
+    .. versionadded:: 1.16.0
+    """
+
+    def __init__(self):
+        jvm = get_gateway().jvm
+        self._j_builder = jvm.org.apache.flink.connector.kafka.sink.KafkaSink.builder()
+        self._preprocessing = None
+
+    def build(self) -> 'KafkaSink':
+        """
+        Constructs the :class:`KafkaSink` with the configured properties.
+        """
+        if self._preprocessing is None:
+            return KafkaSink(self._j_builder.build())
+        else:
+            return KafkaSink(self._j_builder.build(), self._preprocessing)

Review Comment:
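   Could this directly `return KafkaSink(self._j_builder.build(), self._preprocessing)`? `KafkaSink.__init__` already defaults `preprocessing` to `None`, so the `if`/`else` branch is redundant.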



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to