wuchong commented on a change in pull request #12805:
URL: https://github.com/apache/flink/pull/12805#discussion_r451951070
##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -207,6 +214,19 @@ However, it will cause a lot of network connections between all the Flink instan
 By default, a Kafka sink ingests data with at-least-once guarantees into a Kafka topic if the query is executed with [checkpointing enabled]({% link dev/stream/state/checkpointing.md %}#enabling-and-configuring-checkpointing).
+With Flink's checkpointing enabled, the `kafka` and `kafka-0.11` connectors can provide exactly-once delivery guarantees.
+
+Besides enabling Flink's checkpointing, you can also choose three different modes of operating chosen by passing appropriate `sink.semantic` option:
+
+ * `NONE`: Flink will not guarantee anything. Produced records can be lost or they can be duplicated.
+ * `AT_LEAST_ONCE` (default setting): This guarantees that no records will be lost (although they can be duplicated).
Review comment:
```suggestion
 * `at-least-once` (default setting): This guarantees that no records will be lost (although they can be duplicated).
```
##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -207,6 +214,19 @@ However, it will cause a lot of network connections between all the Flink instan
 By default, a Kafka sink ingests data with at-least-once guarantees into a Kafka topic if the query is executed with [checkpointing enabled]({% link dev/stream/state/checkpointing.md %}#enabling-and-configuring-checkpointing).
+With Flink's checkpointing enabled, the `kafka` and `kafka-0.11` connectors can provide exactly-once delivery guarantees.
+
+Besides enabling Flink's checkpointing, you can also choose three different modes of operating chosen by passing appropriate `sink.semantic` option:
+
+ * `NONE`: Flink will not guarantee anything. Produced records can be lost or they can be duplicated.
+ * `AT_LEAST_ONCE` (default setting): This guarantees that no records will be lost (although they can be duplicated).
+ * `EXACTLY_ONCE`: Kafka transactions will be used to provide exactly-once semantic. Whenever you write
Review comment:
```suggestion
 * `exactly-once`: Kafka transactions will be used to provide exactly-once semantic. Whenever you write
```
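As a usage illustration (not part of the review itself), here is a minimal sketch of how the lowercased `sink.semantic` value would be passed once the suggestion above is applied. The table name, schema, topic, bootstrap servers, format, and checkpoint interval below are assumptions chosen purely for the example:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class KafkaSinkSemanticExample {
	public static void main(String[] args) {
		TableEnvironment tEnv = TableEnvironment.create(
				EnvironmentSettings.newInstance().inStreamingMode().build());

		// Exactly-once delivery only takes effect with checkpointing enabled;
		// one way to enable it for a pure TableEnvironment program (assumed interval).
		tEnv.getConfig().getConfiguration()
				.setString("execution.checkpointing.interval", "10s");

		// Hypothetical sink table; the 'sink.semantic' value matches the
		// lowercased option values proposed in the suggestions above.
		tEnv.executeSql(
			"CREATE TABLE kafka_sink (" +
			"  user_id BIGINT," +
			"  message STRING" +
			") WITH (" +
			"  'connector' = 'kafka'," +
			"  'topic' = 'output-topic'," +
			"  'properties.bootstrap.servers' = 'localhost:9092'," +
			"  'format' = 'json'," +
			"  'sink.semantic' = 'exactly-once'" +
			")");
	}
}
```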
##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -207,6 +214,19 @@ However, it will cause a lot of network connections between all the Flink instan
 By default, a Kafka sink ingests data with at-least-once guarantees into a Kafka topic if the query is executed with [checkpointing enabled]({% link dev/stream/state/checkpointing.md %}#enabling-and-configuring-checkpointing).
+With Flink's checkpointing enabled, the `kafka` and `kafka-0.11` connectors can provide exactly-once delivery guarantees.
+
+Besides enabling Flink's checkpointing, you can also choose three different modes of operating chosen by passing appropriate `sink.semantic` option:
+
+ * `NONE`: Flink will not guarantee anything. Produced records can be lost or they can be duplicated.
Review comment:
```suggestion
 * `none`: Flink will not guarantee anything. Produced records can be lost or they can be duplicated.
```
##########
File path: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka010DynamicTableFactory.java
##########
@@ -65,18 +69,27 @@ protected KafkaDynamicSinkBase createKafkaTableSink(
 			String topic,
 			Properties properties,
 			Optional<FlinkKafkaPartitioner<RowData>> partitioner,
-			EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
+			EncodingFormat<SerializationSchema<RowData>> encodingFormat,
+			KafkaSemantic semantic) {
 		return new Kafka010DynamicSink(
 			consumedDataType,
 			topic,
 			properties,
 			partitioner,
-			encodingFormat);
+			encodingFormat,
+			semantic);
 	}
 
 	@Override
 	public String factoryIdentifier() {
 		return IDENTIFIER;
 	}
+
+	@Override
+	public Set<ConfigOption<?>> optionalOptions() {
+		final Set<ConfigOption<?>> options = super.optionalOptions();
+		options.remove(SINK_SEMANTIC);
Review comment:
Add a comment here to explain why we remove the `sink.semantic` option for Kafka 0.10.
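A possible shape for that comment, as a sketch only (the exact wording is up to the PR author). The rationale assumed here follows from the docs hunk above, which lists only the `kafka` and `kafka-0.11` connectors as capable of exactly-once, since Kafka 0.10 producers predate transaction support:

```java
	@Override
	public Set<ConfigOption<?>> optionalOptions() {
		final Set<ConfigOption<?>> options = super.optionalOptions();
		// Kafka 0.10 producers do not support transactions, so the sink can only
		// provide at-least-once delivery; the 'sink.semantic' option is therefore
		// not exposed for this connector version.
		options.remove(SINK_SEMANTIC);
		return options;
	}
```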
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]