[ 
https://issues.apache.org/jira/browse/FLINK-24234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zichen Liu updated FLINK-24234:
-------------------------------
    Description: 
*User stories*

 * As a Sink user, I’d like to configure the batch size for items to send to 
the destination at once (e.g. “flush once the batch contains x items”)
 * As a Sink user, I’d like to configure the batching logic so that I can flush 
the batch of requests based on a time period (e.g. “flush every 2 seconds”)
 * As a Sink user, I’d like to specify the number of bytes at which the batch of 
requests is flushed (e.g. “submit the batch once its total size exceeds 1 KB”)
 * As a Sink developer, I’d like to use the configuration mechanism provided to 
allow Sink users to configure my Sink implementation

*Scope*
 * Allow Sink developers and users to pass batch size config to the 
AsyncSinkWriter
 * Add support for time-based flushing (e.g. “flush after x milliseconds”) using 
the ProcessingTimeService which is part of the Sink interface
 * Add support for byte-based flushing
 * Consider the combination of time-based flushing and byte-based flushing: if 
a time-triggered batch holds more bytes than the configured limit, the last few 
items (however many are necessary) should be moved to the next batch so that 
the byte limit is respected.
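
The flushing rules in the scope above can be sketched roughly as follows. This is 
only an illustration under stated assumptions, not the AsyncSinkWriter 
implementation: the class BatchBuffer, its method names, and the parameters 
maxBatchSize, maxBatchSizeInBytes and maxTimeInBufferMs are hypothetical, and the 
current time is passed in explicitly instead of being obtained from the 
ProcessingTimeService.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical buffer illustrating count-, byte- and time-based flushing. */
final class BatchBuffer {
    private final int maxBatchSize;         // flush once this many items are buffered
    private final long maxBatchSizeInBytes; // flush once buffered bytes reach this total
    private final long maxTimeInBufferMs;   // flush once the oldest item has waited this long

    private final Deque<byte[]> buffer = new ArrayDeque<>();
    private long bufferedBytes = 0;
    private long oldestItemTimestampMs = -1;

    BatchBuffer(int maxBatchSize, long maxBatchSizeInBytes, long maxTimeInBufferMs) {
        this.maxBatchSize = maxBatchSize;
        this.maxBatchSizeInBytes = maxBatchSizeInBytes;
        this.maxTimeInBufferMs = maxTimeInBufferMs;
    }

    /** Buffers an item; returns a batch if the count or byte threshold is reached, else an empty list. */
    List<byte[]> add(byte[] item, long nowMs) {
        if (buffer.isEmpty()) {
            oldestItemTimestampMs = nowMs;
        }
        buffer.addLast(item);
        bufferedBytes += item.length;
        if (buffer.size() >= maxBatchSize || bufferedBytes >= maxBatchSizeInBytes) {
            return nextBatch(nowMs);
        }
        return new ArrayList<>();
    }

    /** Intended to be called from a timer; returns a batch if the oldest item has waited long enough. */
    List<byte[]> onTimer(long nowMs) {
        if (!buffer.isEmpty() && nowMs - oldestItemTimestampMs >= maxTimeInBufferMs) {
            return nextBatch(nowMs);
        }
        return new ArrayList<>();
    }

    /** Takes items from the head of the buffer without exceeding the count or byte limit. */
    private List<byte[]> nextBatch(long nowMs) {
        List<byte[]> batch = new ArrayList<>();
        long batchBytes = 0;
        while (!buffer.isEmpty() && batch.size() < maxBatchSize) {
            byte[] head = buffer.peekFirst();
            // Always take at least one item so that a single oversized item cannot block the buffer.
            if (!batch.isEmpty() && batchBytes + head.length > maxBatchSizeInBytes) {
                break; // the remaining items go into the next batch
            }
            batch.add(buffer.pollFirst());
            batchBytes += head.length;
        }
        bufferedBytes -= batchBytes;
        // Simplification: leftover items restart their wait time at the moment of the flush.
        oldestItemTimestampMs = buffer.isEmpty() ? -1 : nowMs;
        return batch;
    }

    int pendingItems() {
        return buffer.size();
    }
}
```

For example, with new BatchBuffer(3, 10, 2000) and three 4-byte items, the third 
add triggers a flush, but only the first two items fit under the 10-byte limit, 
so the third stays buffered until a later add or timer call flushes it.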

*References*

More details can be found at 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink]

  was:
h2. Motivation

Apache Flink has a rich connector ecosystem that can persist data in various 
destinations. Flink natively supports Apache Kafka, Amazon Kinesis Data 
Streams, Elasticsearch, HBase, and many more destinations. Additional 
connectors are maintained in Apache Bahir or directly on GitHub. The basic 
functionality of these sinks is quite similar. They batch events according to 
user-defined buffering hints, sign requests and send them to the respective 
endpoint, retry unsuccessful or throttled requests, and participate in 
checkpointing. They primarily just differ in the way they interface with the 
destination. Yet, all the above-mentioned sinks are developed and maintained 
independently.

We hence propose to create a sink that abstracts away this common functionality 
into a generic sink. Adding support for a new destination then just means 
creating a lightweight shim that only implements the specific interfaces of the 
destination using a client that supports async requests. Having a common 
abstraction will reduce the effort required to maintain all these individual 
sinks. It will also make it much easier and faster to create integrations with 
additional destinations. Moreover, improvements or bug fixes to the core of the 
sink will benefit all implementations that are based on it.

The design of the sink focusses on extensibility and a broad support of 
destinations. The core of the sink is kept generic and free of any connector 
specific dependencies. The sink is designed to participate in checkpointing to 
provide at-least-once semantics, but it is limited to destinations that provide 
a client that supports async requests. 
h2. References

More details can be found at 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-171%3A+Async+Sink


> [FLIP-171] Byte Based & Time Based Flushing for AsyncSinkBase
> -------------------------------------------------------------
>
>                 Key: FLINK-24234
>                 URL: https://issues.apache.org/jira/browse/FLINK-24234
>             Project: Flink
>          Issue Type: New Feature
>          Components: Connectors / Common
>            Reporter: Zichen Liu
>            Assignee: Zichen Liu
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.15.0


