tzulitai commented on code in PR #7:
URL: https://github.com/apache/flink-connector-kafka/pull/7#discussion_r1189345491


##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TwoPhaseCommittingStatefulSink.java:
##########
@@ -0,0 +1,34 @@
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * A combination of {@link TwoPhaseCommittingSink} and {@link StatefulSink}.
+ *
+ * <p>The purpose of this interface is to be able to pass an interface rather than a {@link
+ * KafkaSink} implementation into the reducing sink which simplifies unit testing.

Review Comment:
   I guess it's probably not only for unit testing? Without the mixin interface, the `ReducingUpsertSink` and `ReducingUpsertWriter` wouldn't be able to properly wrap the `KafkaSink` / `KafkaWriter` delegates.
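
To illustrate the point, here is a minimal sketch that uses simplified stand-in interfaces rather than the real Flink `sink2` API. Every name in it (`CommittingWriter`, `StatefulWriter`, `CommittingStatefulWriter`, `SimpleWriter`, `ReducingWriter`) is hypothetical, and the `key=value` string records are an assumption made only to keep the example small. It shows a decorator that can forward both capabilities only because its delegate is typed as the combined interface.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class MixinDecoratorSketch {
    public static void main(String[] args) {
        ReducingWriter writer = new ReducingWriter(new SimpleWriter());
        writer.write("a=1");
        writer.write("a=2"); // replaces the buffered value for key "a"
        writer.write("b=3");
        System.out.println(writer.prepareCommit()); // prints [a=2, b=3]
    }
}

// Hypothetical stand-ins for the two writer capabilities (NOT the Flink API).
interface Writer<T> {
    void write(T element);
}

interface CommittingWriter<T> extends Writer<T> {
    List<T> prepareCommit();
}

interface StatefulWriter<T> extends Writer<T> {
    List<T> snapshotState();
}

// The "tag" mixin: one named type that promises both capabilities.
interface CommittingStatefulWriter<T> extends CommittingWriter<T>, StatefulWriter<T> {}

// A trivial delegate that collects pending elements until commit.
final class SimpleWriter implements CommittingStatefulWriter<String> {
    private final List<String> pending = new ArrayList<>();

    @Override
    public void write(String element) {
        pending.add(element);
    }

    @Override
    public List<String> prepareCommit() {
        List<String> committables = new ArrayList<>(pending);
        pending.clear();
        return committables;
    }

    @Override
    public List<String> snapshotState() {
        return new ArrayList<>(pending);
    }
}

// The decorator keeps only the latest record per key and forwards on flush.
// It can delegate both prepareCommit() and snapshotState() because the
// delegate field is declared with the combined type.
final class ReducingWriter implements CommittingStatefulWriter<String> {
    private final CommittingStatefulWriter<String> delegate;
    private final Map<String, String> buffer = new LinkedHashMap<>();

    ReducingWriter(CommittingStatefulWriter<String> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void write(String keyValue) {
        int separator = keyValue.indexOf('=');
        // Records without '=' are keyed by their full text.
        String key = separator < 0 ? keyValue : keyValue.substring(0, separator);
        buffer.put(key, keyValue);
    }

    private void flush() {
        buffer.values().forEach(delegate::write);
        buffer.clear();
    }

    @Override
    public List<String> prepareCommit() {
        flush();
        return delegate.prepareCommit();
    }

    @Override
    public List<String> snapshotState() {
        flush();
        return delegate.snapshotState();
    }
}
```

If the delegate field were typed as only one of the two capability interfaces, the wrapper could not forward the other method without a cast. A generic intersection bound such as `<W extends CommittingWriter<T> & StatefulWriter<T>>` is the other way to express this in Java, at the cost of threading an extra type parameter through the wrapper.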



##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertSink.java:
##########
@@ -32,19 +33,21 @@
  * A wrapper of a {@link Sink}. It will buffer the data emitted by the wrapper {@link SinkWriter}
  * and only emit it when the buffer is full or a timer is triggered or a checkpoint happens.
  *
- * <p>The sink provides eventual consistency guarantees without the need of a two-phase protocol
- * because the updates are idempotent therefore duplicates have no effect.
+ * <p>The sink provides eventual consistency guarantees without under {@link
+ * org.apache.flink.connector.base.DeliveryGuarantee#AT_LEAST_ONCE} because the updates are
+ * idempotent therefore duplicates have no effect.

Review Comment:
   ```suggestion
    * <p>The sink provides eventual consistency guarantees under {@link
    * org.apache.flink.connector.base.DeliveryGuarantee#AT_LEAST_ONCE} because the updates are
    * idempotent therefore duplicates have no effect.
   ```



##########
docs/content/docs/connectors/table/upsert-kafka.md:
##########
@@ -277,6 +293,19 @@ connector is working in the upsert mode, the last record on the same key will ta
 reading back as a source. Therefore, the upsert-kafka connector achieves idempotent writes just like
 the [HBase sink]({{< ref "docs/connectors/table/hbase" >}}).
 
+With Flink's checkpointing enabled, the `upsert-kafka` connector can provide exactly-once delivery guarantees.
+
+Besides enabling Flink's checkpointing, you can also choose three different modes of operating chosen by passing appropriate `sink.delivery-guarantee` option:
+
+* `none`: Flink will not guarantee anything. Produced records can be lost or they can be duplicated.
+* `at-least-once` (default setting): This guarantees that no records will be lost (although they can be duplicated).
+* `exactly-once`: Kafka transactions will be used to provide exactly-once semantic. Whenever you write
+  to Kafka using transactions, do not forget about setting desired `isolation.level` (`read_uncommitted`
+  or `read_committed` - the latter one is the default value) for any application consuming records
+  from Kafka.
+
+Please refer to [Kafka documentation]({{< ref "docs/connectors/datastream/kafka" >}}#kafka-producers-and-fault-tolerance) for more caveats about delivery guarantees.

Review Comment:
   ```suggestion
   Please refer to [Kafka connector documentation]({{< ref "docs/connectors/datastream/kafka" >}}#kafka-producers-and-fault-tolerance) for more caveats about delivery guarantees.
   ```



##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/TwoPhaseCommittingStatefulSink.java:
##########
@@ -0,0 +1,34 @@
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * A combination of {@link TwoPhaseCommittingSink} and {@link StatefulSink}.
+ *
+ * <p>The purpose of this interface is to be able to pass an interface rather than a {@link
+ * KafkaSink} implementation into the reducing sink which simplifies unit testing.
+ *
+ * @param <InputT> The type of the sink's input
+ * @param <WriterStateT> The type of the sink writer's state
+ * @param <CommT> The type of the committables.
+ */
+@Internal
+public interface TwoPhaseCommittingStatefulSink<InputT, WriterStateT, CommT>

Review Comment:
   I'm a bit torn on whether or not we should just accept this awkwardness and move on. I understand the need for this "tag" mixin interface, but it just feels really odd.
   
   Interface mixins in Java really don't play well with the decorator pattern 😕



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

