wuchong commented on a change in pull request #12805:
URL: https://github.com/apache/flink/pull/12805#discussion_r449967255
##########
File path:
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTestBase.java
##########
@@ -386,6 +411,7 @@ private CatalogTable createKafkaSinkCatalogTable(Map<String, String> options) {
 		tableOptions.put("properties.group.id", "dummy");
 		tableOptions.put("properties.bootstrap.servers", "dummy");
 		tableOptions.put("sink.partitioner", KafkaOptions.SINK_PARTITIONER_VALUE_FIXED);
+		tableOptions.put("sink.semantic", KafkaOptions.SINK_SEMANTIC_VALUE_AT_LEAST_ONCE);
Review comment:
Use exactly-once to verify the configuration works, because the default value is at-least-once.
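For example, a one-line sketch of the change this comment asks for (the `SINK_SEMANTIC_VALUE_EXACTLY_ONCE` constant name is an assumption, mirroring the at-least-once constant used in the diff):
```java
// Assumed constant name; picking a non-default value proves the option is
// actually forwarded rather than silently falling back to the default.
tableOptions.put("sink.semantic", KafkaOptions.SINK_SEMANTIC_VALUE_EXACTLY_ONCE);
```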
##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -165,6 +165,14 @@ Connector Options
</ul>
</td>
</tr>
+ <tr>
+ <td><h5>sink.semantic</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">at-least-once</td>
+ <td>String</td>
+      <td>Optional semantic when commit. Valid enumerationns are ['at-lease-once', 'exactly-once', 'none'].
+      Only Kafka whose version greater than 1.0.0 support 'exactly-once' with checkpointing enabled.</td>
Review comment:
```suggestion
      <td>Defines the delivery semantic for the Kafka sink. Valid enumerations are
      <code>'at-least-once'</code>, <code>'exactly-once'</code> and <code>'none'</code>.
      <code>'kafka-0.10'</code> doesn't support this option. See
      <a href='#consistency-guarantees'>Consistency guarantees</a> for more details.
      </td>
```
##########
File path: docs/dev/table/connectors/kafka.md
##########
@@ -206,6 +214,8 @@ However, it will cause a lot of network connections between all the Flink instan
### Consistency guarantees
 By default, a Kafka sink ingests data with at-least-once guarantees into a Kafka topic if the query is executed with [checkpointing enabled]({% link dev/stream/state/checkpointing.md %}#enabling-and-configuring-checkpointing).
+With Kafka whose version is greater than 1.0.0, `sink.semantic` can provide exactly-once delivery guarantee. Whenever you write to Kafka using transactions, do not forget about setting the desired `isolation.level`
+(`read_committed` or `read_uncommitted` - latter one is the default value) for any application consuming records from Kafka.
Review comment:
```suggestion
With Flink's checkpointing enabled, the `kafka` and `kafka-0.11` connectors can provide exactly-once delivery guarantees.
Besides enabling Flink's checkpointing, you can also choose one of three different modes of operation by passing the appropriate `sink.semantic` option:
 * `NONE`: Flink will not guarantee anything. Produced records can be lost or they can be duplicated.
 * `AT_LEAST_ONCE` (default setting): This guarantees that no records will be lost (although they can be duplicated).
 * `EXACTLY_ONCE`: Kafka transactions will be used to provide exactly-once semantics. Whenever you write to Kafka using transactions, do not forget about setting the desired `isolation.level` (`read_committed` or `read_uncommitted` - the latter one is the default value) for any application consuming records from Kafka.

Please refer to [Kafka documentation]({% link dev/connectors/kafka.md %}#kafka-producers-and-fault-tolerance) for more caveats about delivery guarantees.
```
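For context, the `isolation.level` advice in this suggestion translates into a consumer-side setting. A minimal sketch (broker address and group id are placeholders, not from this PR):
```java
import java.util.Properties;

public class ReadCommittedConsumerProps {
	// Builds properties for any application consuming records written by an
	// 'exactly-once' Kafka sink, so it only sees committed transactions.
	public static Properties create() {
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
		props.setProperty("group.id", "my-consumer-group");       // placeholder
		// "read_uncommitted" is the default; "read_committed" hides records
		// from aborted or still-open transactions.
		props.setProperty("isolation.level", "read_committed");
		return props;
	}
}
```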
##########
File path:
flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicSink.java
##########
@@ -42,26 +44,31 @@ public Kafka011DynamicSink(
 			String topic,
 			Properties properties,
 			Optional<FlinkKafkaPartitioner<RowData>> partitioner,
-			EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
+			EncodingFormat<SerializationSchema<RowData>> encodingFormat,
+			String semantic) {
 		super(
 			consumedDataType,
 			topic,
 			properties,
 			partitioner,
-			encodingFormat);
+			encodingFormat,
+			semantic);
 	}

 	@Override
 	protected SinkFunction<RowData> createKafkaProducer(
 			String topic,
 			Properties properties,
 			SerializationSchema<RowData> serializationSchema,
-			Optional<FlinkKafkaPartitioner<RowData>> partitioner) {
+			Optional<FlinkKafkaPartitioner<RowData>> partitioner,
+			String semantic) {
 		return new FlinkKafkaProducer011<>(
 			topic,
-			serializationSchema,
+			new KeyedSerializationSchemaWrapper<>(serializationSchema),
 			properties,
-			partitioner);
+			partitioner,
+			getSemantic(semantic),
Review comment:
I suggest moving this logic into `KafkaOptions` to avoid duplicated code. You can have a table-level enum class `SinkSemantic`, and simply call `FlinkKafkaProducer.Semantic.valueOf(semantic.name())` here.
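A minimal sketch of the suggested table-level enum (the `fromOption` helper is an assumed convenience, not part of this PR):
```java
// Hypothetical enum in the table layer, as suggested above. Its constant
// names deliberately mirror FlinkKafkaProducer.Semantic, so the sinks can
// translate with FlinkKafkaProducer.Semantic.valueOf(semantic.name()).
public enum SinkSemantic {
	AT_LEAST_ONCE,
	EXACTLY_ONCE,
	NONE;

	// Maps option strings such as "at-least-once" to the enum constant.
	public static SinkSemantic fromOption(String value) {
		return SinkSemantic.valueOf(value.toUpperCase().replace('-', '_'));
	}
}
```
Because the constant names line up, each sink avoids a hand-written mapping from option strings to producer semantics.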
##########
File path:
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaOptions.java
##########
@@ -191,6 +208,21 @@ private static void validateSinkPartitioner(ReadableConfig tableOptions) {
 		});
 	}

+	private static void validateSinkSemantic(ReadableConfig tableOptions, String kafkaVersion) {
+		tableOptions.getOptional(SINK_SEMANTIC).ifPresent(semantic -> {
+			if (!SINK_SEMANTIC_ENUMS.contains(semantic)) {
+				throw new ValidationException(
+						String.format("Unsupported value '%s' for '%s'. Supported values are ['at-least-once', 'exactly-once', 'none'].",
+								semantic, SINK_SEMANTIC.key()));
+			}
+
+			if (kafkaVersion.equals("kafka-0.10") && (!SINK_SEMANTIC_VALUE_AT_LEAST_ONCE.equals(semantic))) {
Review comment:
This is indeed what we should avoid: a base class shouldn't depend on a specific connector implementation. We should move this special logic into `Kafka010DynamicTableFactory`.
A simple way to disallow this option in kafka-0.10 is to override `optionalOptions()` and remove `SINK_SEMANTIC` from it.
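A minimal sketch of that override (assuming `java.util.HashSet` and Flink's `ConfigOption` imports; the copy guards against the parent returning an unmodifiable set):
```java
// In Kafka010DynamicTableFactory: stop advertising SINK_SEMANTIC so the
// framework rejects the option for the kafka-0.10 connector.
@Override
public Set<ConfigOption<?>> optionalOptions() {
	Set<ConfigOption<?>> options = new HashSet<>(super.optionalOptions());
	options.remove(KafkaOptions.SINK_SEMANTIC);
	return options;
}
```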
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]