Hi Jason,

Fine by me; I wanted to be conservative with the return type, but the case
you've outlined sounds enticing enough that adding a little flexibility to
the API seems warranted. I've added your suggestion to the proposed admin
API expansions; let me know what you think.
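
For reference, here's a rough sketch of the shape the expanded API might
take (a sketch only; the exact names and types are still up for discussion
on the KIP, so treat these as illustrative):

    public interface FenceProducersResult {
        // Exposes the producer ID and epoch from the InitProducerId
        // response for each fenced transactional ID
        KafkaFuture<Long> producerId(String transactionalId);
        KafkaFuture<Short> epoch(String transactionalId);
        // Completes once all of the given transactional IDs have been fenced
        KafkaFuture<Void> all();
    }

This would replace the `Void`-only result in the original proposal and
leaves room for the producer-initialization use case you described.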

Cheers,

Chris

On Mon, Feb 1, 2021 at 3:38 PM Jason Gustafson <ja...@confluent.io> wrote:

> Hi Chris,
>
> If we add the new `fenceProducers` admin API, can we return the information
> from the `InitProducerId` response (i.e. producer id and epoch)? We may not
> have a use case for it yet, but I don't see any harm exposing it for the
> future. For example, we could allow this state to be provided to the
> Producer instance on initialization, which would save the need for the
> second `InitProducerId` request in the current proposal. Also, the `Void`
> type doesn't give us much room for extension.
>
> -Jason
>
>
> On Mon, Jan 25, 2021 at 7:29 AM Chris Egerton <chr...@confluent.io> wrote:
>
> > Hi Ning,
> >
> > Apologies for the delay in response. I realized after publishing the KIP
> > that there were some finer points I hadn't considered in my design, and
> > that it was far from providing exactly-once guarantees. In response to
> > your questions:
> >
> > 1) The goal of the KIP is to ensure the accuracy of the offsets that the
> > framework provides to source tasks; if tasks choose to manage offsets
> > outside of the framework, they're on their own. So, the source records
> > and their offsets will be written/committed to Kafka, and the task will
> > be provided with them on startup, but it (or really, its predecessor)
> > may not have had time to clean up resources associated with those
> > records before being killed.
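> >
> > To make the atomicity concrete, the framework-side commit would look
> > roughly like this (a sketch only; the helper and the topic/key names are
> > illustrative, not the KIP's exact internals):
> >
> >     producer.beginTransaction();
> >     try {
> >         // 1. Send the batch of source records to their destination topics.
> >         for (SourceRecord record : batch) {
> >             producer.send(convertToProducerRecord(record)); // hypothetical helper
> >         }
> >         // 2. Send the corresponding offsets to the internal offsets topic
> >         //    within the same transaction.
> >         producer.send(new ProducerRecord<>(offsetsTopic, offsetKey, offsetValue));
> >         // 3. Commit: records and offsets become visible together, or not at all.
> >         producer.commitTransaction();
> >     } catch (KafkaException e) {
> >         producer.abortTransaction();
> >         throw e;
> >     }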
> >
> > 2) I've cleaned up this section and removed the pseudocode, as it seemed
> > too low-level to be worth discussing in a KIP. I'll try to summarize
> > here, though: task.commit() is not what causes offsets provided to the
> > framework by tasks to be committed; it's simply a follow-up hook,
> > provided as a convenience, so that tasks can clean up resources
> > associated with the most recent batch of records (by ack'ing JMS
> > messages, for example). The Connect framework uses an internal Kafka
> > topic to store source task offsets.
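> >
> > For illustration, a JMS-based task might use this hook like so (a sketch
> > under the assumption of a CLIENT_ACKNOWLEDGE session; the other required
> > SourceTask methods are omitted):
> >
> >     public class JmsSourceTask extends SourceTask {
> >         // Messages delivered since the last offset commit
> >         private final List<Message> unacked = new ArrayList<>();
> >
> >         @Override
> >         public void commit() throws InterruptedException {
> >             try {
> >                 // The framework has committed offsets for these records,
> >                 // so it's now safe to ack them back to the JMS broker.
> >                 for (Message message : unacked) {
> >                     message.acknowledge();
> >                 }
> >                 unacked.clear();
> >             } catch (JMSException e) {
> >                 throw new ConnectException("Failed to ack JMS messages", e);
> >             }
> >         }
> >
> >         // poll() (not shown) would deliver records to the framework and
> >         // add each corresponding JMS Message to `unacked`.
> >     }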
> >
> > 3) In order to benefit from the improvements proposed in this KIP, yes,
> > the single source of truth should be the OffsetStorageReader provided to
> > the task by the Connect framework, at least at startup. After startup,
> > tasks should ideally bookkeep their own offset progress, as each request
> > to read offsets requires a read to the end of the offsets topic, which
> > can be expensive in some cases.
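> >
> > In code, that startup pattern looks roughly like this (the partition and
> > offset keys below are made up; a real connector would use whatever keys
> > it emits with its SourceRecords):
> >
> >     @Override
> >     public void start(Map<String, String> props) {
> >         Map<String, String> sourcePartition =
> >             Collections.singletonMap("table", "orders"); // illustrative key
> >         Map<String, Object> offset =
> >             context.offsetStorageReader().offset(sourcePartition);
> >         if (offset != null) {
> >             // Resume from the last position the framework committed...
> >             this.lastId = (Long) offset.get("id"); // illustrative key
> >         }
> >         // ...and from here on, track `lastId` locally rather than
> >         // re-reading the offsets topic on every poll.
> >     }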
> >
> > I've since expanded the KIP to include general exactly-once support for
> > source connectors that should cover the points I neglected in my initial
> > design, so it should be ready for review again.
> >
> > Cheers,
> >
> > Chris
> >
> > On Mon, Jul 27, 2020 at 11:42 PM Ning Zhang <ning2008w...@gmail.com>
> > wrote:
> >
> > > Hello Chris,
> > >
> > > That is an interesting KIP. I have a couple of questions:
> > >
> > > (1) in the pseudocode section, what if the failure happens between 4(b)
> > > and 5(a), meaning after the producer commits the transaction and before
> > > task.commitRecord()?
> > >
> > > (2) in the section "source task life time", what is the difference
> > > between "commit offset" and "offsets to commit"? Given that the offset
> > > storage can be a Kafka topic (KafkaOffsetBackingStore.java) and a
> > > producer can only produce to a Kafka topic, are the topics the same?
> > > (i.e. the topic that the producer writes offsets to and the topic that
> > > task.commit() writes to)
> > >
> > > (3) for the JDBC source task, it relies on `context.offsetStorageReader()`
> > > (
> > > https://github.com/confluentinc/kafka-connect-jdbc/blob/master/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java#L140
> > > )
> > > to retrieve the previously committed offset (whether from a fresh start
> > > or resuming from failure). So it seems that the single source of truth
> > > for where to consume from (the last known / committed position) is the
> > > offset storage (e.g. a Kafka topic) managed by the periodic
> > > task.commit()?
> > >
> > > On 2020/05/22 06:20:51, Chris Egerton <chr...@confluent.io> wrote:
> > > > Hi all,
> > > >
> > > > I know it's a busy time with the upcoming 2.6 release and I don't
> > > > expect this to get a lot of traction until that's done, but I've
> > > > published a KIP for allowing atomic commit of offsets and records for
> > > > source connectors and would appreciate your feedback:
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Atomic+commit+of+source+connector+records+and+offsets
> > > >
> > > > This feature should make it possible to implement source connectors
> > > > with exactly-once delivery guarantees, and even allow a wide range of
> > > > existing source connectors to provide exactly-once delivery guarantees
> > > > with no changes required.
> > > >
> > > > Cheers,
> > > >
> > > > Chris
> > > >
> > >
> >
>
