[ 
https://issues.apache.org/jira/browse/FLINK-25696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-25696:
---------------------------------
    Description: 
In Table Store, we want to obtain the offsets written by the Kafka writer. 
Only the offset returned by the callback inside the KafkaWriter is accurate, 
so we need this callback mechanism.

This ticket proposes adding metadataConsumer to InitContext in Sink:
{code:java}
/**
 * Returns a metadata consumer. The {@link SinkWriter} can publish metadata
 * events of type {@link MetaT} to the consumer. The consumer can accept
 * metadata events in an asynchronous thread, and the {@link Consumer#accept}
 * method is executed very fast.
 */
default <MetaT> Optional<Consumer<MetaT>> metadataConsumer() {
    return Optional.empty();
}{code}
A SinkWriter can obtain this consumer and publish metadata to it; the consumer 
itself is implemented by the Table Store sink.
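
As a rough illustration of how a writer might use the proposed method, here is 
a minimal, self-contained sketch. It does not use the real Flink or Kafka 
classes: InitContext below is a reduced stand-in that contains only the 
proposed method, and OffsetMetadata, OffsetReportingWriter and 
CollectingContext are hypothetical names invented for this example. The offset 
is simulated with a counter; in a real Kafka writer it would presumably come 
from the producer's send callback.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

public class MetadataConsumerSketch {

    /** Assumption: reduced stand-in for Sink.InitContext with only the proposed method. */
    public interface InitContext {
        default <MetaT> Optional<Consumer<MetaT>> metadataConsumer() {
            return Optional.empty();
        }
    }

    /** Hypothetical metadata event: the offset acknowledged for one record. */
    public static final class OffsetMetadata {
        public final int partition;
        public final long offset;

        public OffsetMetadata(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    /** Hypothetical writer that publishes offsets when the context offers a consumer. */
    public static final class OffsetReportingWriter {
        private final Consumer<OffsetMetadata> metadataConsumer;
        private long nextOffset = 0L; // simulated; a real writer learns this asynchronously

        public OffsetReportingWriter(InitContext context) {
            // Fall back to a no-op consumer when the sink does not provide one.
            this.metadataConsumer =
                    context.<OffsetMetadata>metadataConsumer().orElse(meta -> {});
        }

        public void write(String record) {
            long offset = nextOffset++;
            // Publish the metadata event; accept() is expected to return quickly.
            metadataConsumer.accept(new OffsetMetadata(0, offset));
        }
    }

    /** Hypothetical context, as a table store sink might supply it. */
    public static final class CollectingContext implements InitContext {
        // Synchronized because events may arrive from an asynchronous thread.
        public final List<Object> received =
                Collections.synchronizedList(new ArrayList<>());

        @Override
        public <MetaT> Optional<Consumer<MetaT>> metadataConsumer() {
            Consumer<MetaT> consumer = meta -> received.add(meta);
            return Optional.of(consumer);
        }
    }

    public static void main(String[] args) {
        CollectingContext context = new CollectingContext();
        OffsetReportingWriter writer = new OffsetReportingWriter(context);
        writer.write("a");
        writer.write("b");
        System.out.println("events received: " + context.received.size());
    }
}
```

Because the method is a default returning Optional.empty(), a context that 
does not override it (for example new InitContext() {}) leaves the writer with 
the no-op consumer, so existing sinks would not need to change.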

  was:
In Table Store, we want to obtain the offsets written by the Kafka writer. 
Only the offset returned by the callback inside the KafkaWriter is accurate, 
so we need this callback mechanism.

This ticket proposes adding an interface, MetadataPublisher:
{code:java}
public interface MetadataPublisher<MetaT> {
    void subscribe(Consumer<MetaT> consumer);
} {code}
A SinkWriter can implement this interface so that Table Store can subscribe 
to metadata from the SinkWriter.


> Introduce metadataConsumer to InitContext in Sink
> -------------------------------------------------
>
>                 Key: FLINK-25696
>                 URL: https://issues.apache.org/jira/browse/FLINK-25696
>             Project: Flink
>          Issue Type: New Feature
>          Components: API / DataStream, Connectors / Kafka, Table Store
>            Reporter: Jingsong Lee
>            Assignee: Jingsong Lee
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.15.0
>
>
> In Table Store, we want to obtain the offsets written by the Kafka writer. 
> Only the offset returned by the callback inside the KafkaWriter is accurate, 
> so we need this callback mechanism.
> This ticket proposes adding metadataConsumer to InitContext in Sink:
> {code:java}
> /**
>  * Returns a metadata consumer. The {@link SinkWriter} can publish metadata
>  * events of type {@link MetaT} to the consumer. The consumer can accept
>  * metadata events in an asynchronous thread, and the {@link Consumer#accept}
>  * method is executed very fast.
>  */
> default <MetaT> Optional<Consumer<MetaT>> metadataConsumer() {
>     return Optional.empty();
> }{code}
> A SinkWriter can obtain this consumer and publish metadata to it; the 
> consumer itself is implemented by the Table Store sink.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
