dannycranmer commented on code in PR #517:
URL: https://github.com/apache/flink-web/pull/517#discussion_r866808673


##########
_posts/2022-03-16-async-sink-base.md:
##########
@@ -0,0 +1,169 @@
+---
+layout: post
+title: "The Generic Asynchronous Base Sink"
+date: 2022-04-30 16:00:00
+authors:
+- CrynetLogistics:
+  name: "Zichen Liu"
+excerpt: An overview of the new AsyncSinkBase and how to use it for building your own concrete sink
+---
+
+Flink sinks share a lot of similar behavior. Most sinks batch records according to user-defined buffering hints, sign requests, write them to the destination, retry unsuccessful or throttled requests, and participate in checkpointing.
+
+This is why, for Flink 1.15, we have created the [`AsyncSinkBase` (FLIP-171)](https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink), an abstract sink that extracts this common functionality.
+
+It is a base implementation for asynchronous sinks that you should use whenever you need to implement a sink that doesn't offer transactional capabilities. Adding support for a new destination now only requires a lightweight shim that implements the destination-specific interfaces using a client that supports asynchronous requests.
+
+This common abstraction reduces the effort required to maintain the individual sinks that extend it: bug fixes and improvements to the sink core benefit every implementation. The design of `AsyncSinkBase` focuses on extensibility and broad support for destinations. The core of the sink is kept generic and free of any connector-specific dependencies.
+
+The sink base is designed to participate in checkpointing to provide at-least-once semantics and can work directly with destinations that provide a client that supports asynchronous requests.
+
+In this post, we will go over the details of `AsyncSinkBase` so that you can start using it to build your own concrete sink.
+
+{% toc %}
+
+# Adding the base sink as a dependency
+
+To use the base sink, you will need to add the following dependency to your project. The example below uses Maven syntax:
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-base</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+```
+
+# The Public Interfaces of AsyncSinkBase
+
+## Generic Types
+
+`<InputT>` – type of elements in a DataStream that should be passed to the sink
+
+`<RequestEntryT>` – type of a payload containing the element and additional metadata that is required to submit a single element to the destination
+
+
+## Element Converter Interface
+
+[ElementConverter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java)
+
+```java
+public interface ElementConverter<InputT, RequestEntryT> extends Serializable {
+    RequestEntryT apply(InputT element, SinkWriter.Context context);
+}
+```
+The concrete sink implementation should provide a way to convert an element in the DataStream into the payload type that contains all the additional metadata the sink needs to submit that element to the destination. Ideally, this conversion is hidden from the end user, since that lets concrete sink implementers adapt to changes in the destination API without breaking end-user code.
+
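To make the two generic types concrete, here is a minimal sketch of a converter for a hypothetical destination that needs a partition key alongside each record. So that it compiles on its own, it declares a local stand-in for `ElementConverter` whose `context` parameter is a plain `Object` rather than Flink's `SinkWriter.Context`; in a real connector you would implement the `ElementConverter` interface from `flink-connector-base` instead. `SensorReading`, `PutEntry`, and `SensorReadingConverter` are hypothetical names chosen for illustration.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Local stand-in mirroring the shape of Flink's ElementConverter so this sketch
// compiles without Flink. The real interface takes a SinkWriter.Context, not an Object.
interface ElementConverter<InputT, RequestEntryT> extends Serializable {
    RequestEntryT apply(InputT element, Object context);
}

// Hypothetical InputT: the element type flowing through the DataStream.
final class SensorReading {
    final String sensorId;
    final double value;

    SensorReading(String sensorId, double value) {
        this.sensorId = sensorId;
        this.value = value;
    }
}

// Hypothetical RequestEntryT: the payload plus the metadata (here, a partition key)
// that the destination needs in order to accept a single record.
final class PutEntry implements Serializable {
    final String partitionKey;
    final byte[] payload;

    PutEntry(String partitionKey, byte[] payload) {
        this.partitionKey = partitionKey;
        this.payload = payload;
    }
}

// Builds the destination-specific entry; end users only ever deal with SensorReading.
final class SensorReadingConverter implements ElementConverter<SensorReading, PutEntry> {
    @Override
    public PutEntry apply(SensorReading element, Object context) {
        String body = element.sensorId + "," + element.value;
        return new PutEntry(element.sensorId, body.getBytes(StandardCharsets.UTF_8));
    }
}
```

Because end users only see `SensorReading`, the connector is free to change how `PutEntry` is built, for example to add a field the destination starts requiring, without touching user code.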
+## Sink Writer Interface
+
+[AsyncSinkWriter](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java)
+
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
+        implements StatefulSink.StatefulSinkWriter<InputT, BufferedRequestState<RequestEntryT>> {
+    // ...
+    protected abstract void submitRequestEntries(
+            List<RequestEntryT> requestEntries, Consumer<List<RequestEntryT>> requestResult);
+    // ...
+}
+```
+
+In this method, sink implementers should use the destination client to submit `requestEntries` asynchronously.
+
+Should any elements fail to be persisted, they should be requeued in the buffer for retry using `requestResult.accept(...list of failed entries...)`. However, an element that is known to be faulty and fails consistently would be requeued forever, so a sensible strategy for deciding what to retry is highly recommended. If no errors were returned, we must indicate this with `requestResult.accept(Collections.emptyList())`.
+
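The sketch below shows one way to honor this contract. It assumes a hypothetical `AsyncBatchClient` whose `putBatch` call returns a `CompletableFuture` with one result per entry, in submission order, and it uses `String` entries for brevity. It is written as a plain class rather than an `AsyncSinkWriter` subclass so that it runs without Flink; the method body is roughly what you would place inside `submitRequestEntries`. Requeuing the whole batch when the call itself fails is only sensible for transient errors such as throttling.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical per-entry result returned by the hypothetical batch API.
final class EntryResult {
    final boolean succeeded;

    EntryResult(boolean succeeded) {
        this.succeeded = succeeded;
    }
}

// Hypothetical asynchronous destination client. Assumption: results come back
// in the same order, and with the same size, as the submitted entries.
interface AsyncBatchClient {
    CompletableFuture<List<EntryResult>> putBatch(List<String> entries);
}

final class ExampleSubmitter {
    private final AsyncBatchClient client;

    ExampleSubmitter(AsyncBatchClient client) {
        this.client = client;
    }

    // Same shape as AsyncSinkWriter#submitRequestEntries. This sketch hands the batch
    // to the async client and reports back through requestResult exactly once per call.
    void submitRequestEntries(List<String> requestEntries, Consumer<List<String>> requestResult) {
        client.putBatch(requestEntries).whenComplete((results, error) -> {
            if (error != null) {
                // The whole call failed (e.g. throttled): requeue every entry.
                requestResult.accept(requestEntries);
                return;
            }
            // Collect only the entries the destination reported as failed.
            List<String> failed = new ArrayList<>();
            for (int i = 0; i < results.size(); i++) {
                if (!results.get(i).succeeded) {
                    failed.add(requestEntries.get(i));
                }
            }
            if (failed.isEmpty()) {
                // Signal that nothing needs to be retried.
                requestResult.accept(Collections.emptyList());
            } else {
                requestResult.accept(failed);
            }
        });
    }
}
```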
+If at any point it is determined that a fatal error has occurred and that the sink should throw a runtime exception, we can call `getFatalExceptionCons().accept(...);` from anywhere in the concrete sink writer.
+
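One possible retry strategy is to classify failures before requeuing: transient failures go back into the buffer, while failures that can never succeed are reported as fatal instead of being retried forever. The sketch below assumes a hypothetical per-entry `Outcome` returned by the destination. The `Consumer<Exception>` passed to the hypothetical `FailureRouter` stands in for the consumer returned by `getFatalExceptionCons()`, and in this sketch the returned list is what would then be handed to `requestResult.accept(...)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical per-entry outcome reported by a destination client.
enum Outcome { OK, THROTTLED, INVALID_PAYLOAD }

final class FailureRouter {
    private final Consumer<Exception> fatalExceptionConsumer;

    // In an AsyncSinkWriter subclass, this would be the consumer from getFatalExceptionCons().
    FailureRouter(Consumer<Exception> fatalExceptionConsumer) {
        this.fatalExceptionConsumer = fatalExceptionConsumer;
    }

    // Returns the entries worth retrying and reports a single fatal error if any entry
    // can never succeed. Assumes outcomes.get(i) describes entries.get(i).
    <T> List<T> retryableOrFail(List<T> entries, List<Outcome> outcomes) {
        List<T> retry = new ArrayList<>();
        int invalid = 0;
        for (int i = 0; i < entries.size(); i++) {
            switch (outcomes.get(i)) {
                case THROTTLED:
                    retry.add(entries.get(i)); // transient: worth another attempt
                    break;
                case INVALID_PAYLOAD:
                    invalid++; // permanent: retrying would requeue it forever
                    break;
                default:
                    break; // OK: persisted, nothing to do
            }
        }
        if (invalid > 0) {
            fatalExceptionConsumer.accept(
                    new IllegalArgumentException(invalid + " entries were rejected as invalid"));
        }
        return retry;
    }
}
```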
+```java
+public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>

Review Comment:
   Java serialization is not actually used in the `AsyncSinkWriterStateSerializer`. It was used in the original PR, which was rejected and subsequently reimplemented. It looks like `extends Serializable` was not removed at that point, although it should have been. This is a smell in the interface. Since the class is `@PublicEvolving`, we can consider removing it in Flink 1.16 with community buy-in.



-- 
This is an automated message from the Apache Git Service.