tzulitai commented on code in PR #7:
URL:
https://github.com/apache/flink-connector-kafka/pull/7#discussion_r1189345491
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TwoPhaseCommittingStatefulSink.java:
##########
@@ -0,0 +1,34 @@
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * A combination of {@link TwoPhaseCommittingSink} and {@link StatefulSink}.
+ *
+ * <p>The purpose of this interface is to be able to pass an interface rather than a {@link
+ * KafkaSink} implementation into the reducing sink which simplifies unit testing.
Review Comment:
I guess it's probably not only for unit testing? Without the mixin interface
the `ReducingUpsertSink` and `ReducingUpsertWriter` wouldn't be able to
properly wrap the KafkaSink / KafkaWriter delegates.
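To illustrate the wrapping point, here is a minimal, self-contained sketch of why a decorator needs its delegate typed with a combined interface. All interfaces and classes below (`SimpleSink`, `CommittingSink`, `StateSink`, `CommittingStateSink`, `ToyKafkaSink`, `ReducingSink`) are hypothetical, heavily simplified stand-ins, not Flink's actual `sink2` signatures. The decorator can forward both `prepareCommit()` and `snapshotState()` only because its delegate field is declared with the combined type.

```java
import java.util.ArrayList;
import java.util.List;

public class MixinDecoratorSketch {

    // Hypothetical, simplified stand-ins for Flink's sink2 interfaces (NOT the real signatures).
    interface SimpleSink<InputT> {
        void write(InputT element);
    }

    interface CommittingSink<InputT, CommT> extends SimpleSink<InputT> {
        List<CommT> prepareCommit();
    }

    interface StateSink<InputT, StateT> extends SimpleSink<InputT> {
        List<StateT> snapshotState();
    }

    // The "tag" mixin: one named type carrying both capabilities.
    interface CommittingStateSink<InputT, StateT, CommT>
            extends CommittingSink<InputT, CommT>, StateSink<InputT, StateT> {}

    // Toy delegate standing in for KafkaSink: buffers elements until they are committed.
    static class ToyKafkaSink implements CommittingStateSink<String, Integer, String> {
        private final List<String> pending = new ArrayList<>();

        @Override
        public void write(String element) { pending.add(element); }

        @Override
        public List<String> prepareCommit() {
            List<String> out = new ArrayList<>(pending);
            pending.clear();
            return out;
        }

        // State here is just the number of uncommitted elements.
        @Override
        public List<Integer> snapshotState() { return List.of(pending.size()); }
    }

    // Decorator standing in for ReducingUpsertSink. Because the delegate is typed with the
    // combined interface, both capabilities can be forwarded without casts.
    static class ReducingSink<InputT, StateT, CommT>
            implements CommittingStateSink<InputT, StateT, CommT> {
        private final CommittingStateSink<InputT, StateT, CommT> delegate;

        ReducingSink(CommittingStateSink<InputT, StateT, CommT> delegate) {
            this.delegate = delegate;
        }

        @Override
        public void write(InputT element) { delegate.write(element); } // buffering/reduction omitted

        @Override
        public List<CommT> prepareCommit() { return delegate.prepareCommit(); }

        @Override
        public List<StateT> snapshotState() { return delegate.snapshotState(); }
    }

    public static void main(String[] args) {
        ReducingSink<String, Integer, String> sink = new ReducingSink<>(new ToyKafkaSink());
        sink.write("a");
        sink.write("b");
        System.out.println(sink.snapshotState()); // [2]
        System.out.println(sink.prepareCommit()); // [a, b]
    }
}
```

If `ReducingSink` instead accepted only a `CommittingSink` delegate, `delegate.snapshotState()` would not compile without a cast, which is the wrapping problem the comment describes.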
##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java:
##########
@@ -32,19 +33,21 @@
* A wrapper of a {@link Sink}. It will buffer the data emitted by the wrapper {@link SinkWriter}
* and only emit it when the buffer is full or a timer is triggered or a checkpoint happens.
*
- * <p>The sink provides eventual consistency guarantees without the need of a two-phase protocol
- * because the updates are idempotent therefore duplicates have no effect.
+ * <p>The sink provides eventual consistency guarantees without under {@link
+ * org.apache.flink.connector.base.DeliveryGuarantee#AT_LEAST_ONCE} because the updates are
+ * idempotent therefore duplicates have no effect.
Review Comment:
```suggestion
* <p>The sink provides eventual consistency guarantees under {@link
* org.apache.flink.connector.base.DeliveryGuarantee#AT_LEAST_ONCE} because the updates are
* idempotent therefore duplicates have no effect.
```
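The idempotence argument can be sketched as follows, assuming an upsert reader that keeps only the last value per key (as the upsert-kafka docs describe for reading the topic back as a source). `Upsert`, `materialize`, and `IdempotentUpsertSketch` are hypothetical names for illustration. Replaying a suffix of the log, as can happen after a failure under at-least-once, leaves the materialized table unchanged.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class IdempotentUpsertSketch {

    // Hypothetical upsert record: a key plus a value; a null value acts as a delete (tombstone).
    static final class Upsert {
        final String key;
        final String value;

        Upsert(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Materialize a changelog the way an upsert reader does: the last record per key wins.
    static Map<String, String> materialize(List<Upsert> log) {
        Map<String, String> table = new HashMap<>();
        for (Upsert u : log) {
            if (u.value == null) {
                table.remove(u.key); // tombstone removes the key
            } else {
                table.put(u.key, u.value);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Upsert> once = List.of(
                new Upsert("k1", "v1"), new Upsert("k2", "v1"), new Upsert("k1", "v2"));
        // At-least-once after a failure: the last two writes are delivered a second time.
        List<Upsert> withReplay = List.of(
                new Upsert("k1", "v1"), new Upsert("k2", "v1"), new Upsert("k1", "v2"),
                new Upsert("k2", "v1"), new Upsert("k1", "v2"));
        System.out.println(materialize(once).equals(materialize(withReplay))); // true
    }
}
```

This only holds when replayed records keep their per-key order; a stale value re-delivered after a newer one for the same key would overwrite it.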
##########
docs/content/docs/connectors/table/upsert-kafka.md:
##########
@@ -277,6 +293,19 @@ connector is working in the upsert mode, the last record on the same key will ta
reading back as a source. Therefore, the upsert-kafka connector achieves idempotent writes just like
the [HBase sink]({{< ref "docs/connectors/table/hbase" >}}).
+With Flink's checkpointing enabled, the `upsert-kafka` connector can provide exactly-once delivery guarantees.
+
+Besides enabling Flink's checkpointing, you can also choose three different modes of operating chosen by passing appropriate `sink.delivery-guarantee` option:
+
+* `none`: Flink will not guarantee anything. Produced records can be lost or they can be duplicated.
+* `at-least-once` (default setting): This guarantees that no records will be lost (although they can be duplicated).
+* `exactly-once`: Kafka transactions will be used to provide exactly-once semantic. Whenever you write
+  to Kafka using transactions, do not forget about setting desired `isolation.level` (`read_uncommitted`
+  or `read_committed` - the latter one is the default value) for any application consuming records
+  from Kafka.
+
+Please refer to [Kafka documentation]({{< ref "docs/connectors/datastream/kafka" >}}#kafka-producers-and-fault-tolerance) for more caveats about delivery guarantees.
Review Comment:
```suggestion
Please refer to [Kafka connector documentation]({{< ref "docs/connectors/datastream/kafka" >}}#kafka-producers-and-fault-tolerance) for more caveats about delivery guarantees.
```
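For the consumer side of the `exactly-once` mode described in this hunk, here is a minimal sketch assuming plain `java.util.Properties` handed to a Kafka consumer. `Guarantee` is a hypothetical local enum mirroring the three `sink.delivery-guarantee` values listed in the docs (Flink's own type is `org.apache.flink.connector.base.DeliveryGuarantee`), and `consumerProps` is a hypothetical helper. `isolation.level` is a real Kafka consumer setting whose Kafka default is `read_uncommitted`, so a consumer has to opt into `read_committed` to skip records from aborted or still-open transactions.

```java
import java.util.Properties;

public class DeliveryGuaranteeSketch {

    // Hypothetical local stand-in mirroring the values of Flink's DeliveryGuarantee;
    // this is not the Flink class itself.
    enum Guarantee {
        NONE("none"),
        AT_LEAST_ONCE("at-least-once"),
        EXACTLY_ONCE("exactly-once");

        final String optionValue;

        Guarantee(String optionValue) {
            this.optionValue = optionValue;
        }

        // Parse a 'sink.delivery-guarantee' value; an unset option falls back to the
        // documented default, at-least-once.
        static Guarantee fromOption(String value) {
            if (value == null) {
                return AT_LEAST_ONCE;
            }
            for (Guarantee g : values()) {
                if (g.optionValue.equals(value)) {
                    return g;
                }
            }
            throw new IllegalArgumentException("Unknown sink.delivery-guarantee: " + value);
        }
    }

    // Hypothetical helper building properties for a downstream Kafka consumer. Kafka's own
    // default isolation.level is read_uncommitted, so it is set explicitly for transactional output.
    static Properties consumerProps(String bootstrapServers, Guarantee sinkGuarantee) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        if (sinkGuarantee == Guarantee.EXACTLY_ONCE) {
            props.setProperty("isolation.level", "read_committed");
        }
        return props;
    }

    public static void main(String[] args) {
        Guarantee g = Guarantee.fromOption("exactly-once");
        Properties props = consumerProps("localhost:9092", g);
        System.out.println(g + " -> " + props.getProperty("isolation.level")); // EXACTLY_ONCE -> read_committed
    }
}
```

With `at-least-once` or `none`, the sketch leaves `isolation.level` unset, so the consumer falls back to Kafka's default.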
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TwoPhaseCommittingStatefulSink.java:
##########
@@ -0,0 +1,34 @@
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * A combination of {@link TwoPhaseCommittingSink} and {@link StatefulSink}.
+ *
+ * <p>The purpose of this interface is to be able to pass an interface rather than a {@link
+ * KafkaSink} implementation into the reducing sink which simplifies unit testing.
+ *
+ * @param <InputT> The type of the sink's input
+ * @param <WriterStateT> The type of the sink writer's state
+ * @param <CommT> The type of the committables.
+ */
+@Internal
+public interface TwoPhaseCommittingStatefulSink<InputT, WriterStateT, CommT>
Review Comment:
I'm a bit torn on whether we should just accept this awkwardness and
move on. I understand the need for this "tag" mixin interface, but it just
feels really odd.
Interface mixins in Java really don't play well with the decorator pattern
😕
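A minimal sketch of the tension with the decorator pattern, assuming a runtime that discovers optional sink capabilities through `instanceof` checks. The interfaces and classes (`Sink`, `Stateful`, `Committing`, `FullSink`, `NaiveDecorator`, `capabilities`) are hypothetical simplifications, not Flink's `sink2` types. A decorator that implements only the base interface hides every mixin of its delegate.

```java
public class DecoratorHidesMixinSketch {

    // Hypothetical, simplified capability interfaces (not Flink's real sink2 types).
    interface Sink { void write(String element); }
    interface Stateful { int snapshotState(); }
    interface Committing { int prepareCommit(); }

    // Delegate with both optional capabilities mixed in.
    static class FullSink implements Sink, Stateful, Committing {
        private int buffered;
        public void write(String element) { buffered++; }
        public int snapshotState() { return buffered; }
        public int prepareCommit() { return buffered; }
    }

    // Naive decorator: implements only the base interface, so the delegate's mixins are hidden.
    static class NaiveDecorator implements Sink {
        private final Sink delegate;
        NaiveDecorator(Sink delegate) { this.delegate = delegate; }
        public void write(String element) { delegate.write(element); }
    }

    // Stand-in for a framework that detects optional capabilities with instanceof.
    static String capabilities(Sink sink) {
        return "stateful=" + (sink instanceof Stateful)
                + ", committing=" + (sink instanceof Committing);
    }

    public static void main(String[] args) {
        Sink delegate = new FullSink();
        System.out.println(capabilities(delegate));                     // stateful=true, committing=true
        System.out.println(capabilities(new NaiveDecorator(delegate))); // stateful=false, committing=false
    }
}
```

Preserving the capabilities means the decorator must implement every mixin its delegate has; a combined interface such as `TwoPhaseCommittingStatefulSink` does that for one fixed combination, and each further combination would need its own type.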
--
This is an automated message from the Apache Git Service.