Small correction. I didn't mean to declare the new method `abstract`.
I agree with Randall's suggestion to give it a default implementation that
will call the old `put` and at the same time deprecate the old `put`.
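
For readers skimming the thread, here is a minimal sketch of what that could look like. It is an illustration only, not the final KIP API: `SketchRecord`, `SketchSinkTask`, `LegacyTask` and `DefaultPutSketch` are hypothetical stand-ins so the example compiles without Kafka on the classpath, and the reporter type assumes option 2b from Randall's message below.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Future;
import java.util.function.BiFunction;

// Stand-in for SinkRecord so the sketch compiles on its own (hypothetical).
final class SketchRecord {
    final String value;
    SketchRecord(String value) { this.value = value; }
}

// Hypothetical shape of the task base class after the change discussed here.
abstract class SketchSinkTask {
    /** Existing method: still abstract for compatibility, but deprecated. */
    @Deprecated
    public abstract void put(Collection<SketchRecord> records);

    /** New overload: not abstract; by default it ignores the reporter and calls the old method. */
    public void put(Collection<SketchRecord> records,
                    BiFunction<SketchRecord, Throwable, Future<Void>> failedRecordReporter) {
        put(records);
    }
}

// A task written before the change: it only overrides the one-argument put.
class LegacyTask extends SketchSinkTask {
    final List<String> written = new ArrayList<>();

    @Override
    public void put(Collection<SketchRecord> records) {
        for (SketchRecord r : records) {
            written.add(r.value);
        }
    }
}

class DefaultPutSketch {
    // Plays the role of the runtime, which would call the two-argument overload.
    static List<String> runLegacyTask(List<String> values) {
        LegacyTask task = new LegacyTask();
        List<SketchRecord> batch = new ArrayList<>();
        for (String v : values) {
            batch.add(new SketchRecord(v));
        }
        // A legacy task never touches the reporter, so a no-op lambda is enough here.
        task.put(batch, (record, error) -> null);
        return task.written;
    }

    public static void main(String[] args) {
        System.out.println(runLegacyTask(Arrays.asList("a", "b")));
    }
}
```

The point of the default body is that a task which only overrides the old `put` keeps working when the runtime switches to calling the new overload.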

Konstantine

On Fri, May 15, 2020 at 10:19 AM Konstantine Karantasis <
konstant...@confluent.io> wrote:

>
> I was on the fence between the various overloading methods myself, liking
> `start(...)` the least.
>
> Initially, I thought we were interested in offering the ability to call
> the reporter out of band, outside `put`.
> But after your replies I understand you don't think that's the case, and I
> also agree that keeping the reporter in `put(...)` makes the intended use
> case more clear.
> In most cases it won't even require storing it as a member variable in the
> task class.
>
> So, I'm also happy with
> `public abstract void put(Collection<SinkRecord> records, BiFunction<...>
> failedRecordReporter)`
>
> Konstantine
>
> On Fri, May 15, 2020 at 9:10 AM Randall Hauch <rha...@gmail.com> wrote:
>
>> Konstantine said:
>>
>> > I notice Randall also used BiFunction in his example, I wonder if it's
>> for
>> > similar reasons.
>> >
>>
>> Nope. Just a typo on my part.
>>
>> There appear to be three outstanding questions.
>>
>> First, Konstantine suggested calling this "failedRecordReporter". I think
>> this is minor, but using this new name may be a bit more precise and I'd
>> be
>> fine with this.
>>
>> Second, should the reporter method be synchronous? I think the two options
>> are:
>>
>> 2a. Use `BiConsumer<SinkRecord, Throwable>` that returns nothing and
>> blocks (at this time).
>> 2b. Use `BiFunction<SinkRecord, Throwable, Future<Void>>` that returns a
>> future that the user can optionally use to be synchronous.
>>
>> I do agree with Konstantine that option 2b gives us more room for future
>> semantic changes, and since the producer write is already asynchronous
>> this
>> should be straightforward to implement. I think the concern here is that
>> if
>> the sink task does not *use* the future to make this synchronous, it is
>> very possible that the error records could be written out of order (due to
>> retries). But this won't be an issue if the implementation uses
>> `max.in.flight.requests.per.connection=1` for writing the error records.
>> It's a little less clear, but honestly IMO passing the reporter in the
>> `put(...)` method helps make this lambda easier to understand, for some
>> strange reason. So unless there are good reasons to avoid this, I'd be in
>> favor of 2b and returning a Future.
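
As a rough illustration of option 2b, the sketch below shows a task-side loop that reports bad records and optionally waits on the returned future. Everything here is an assumption made for the example: records are plain strings, `FutureReporterSketch`, `reporterInto` and `waitForEachReport` are hypothetical names, and the stand-in reporter completes immediately rather than writing to Kafka.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.BiFunction;

class FutureReporterSketch {
    /** Stand-in reporter: remembers the failed record and returns an already-completed future. */
    static BiFunction<String, Throwable, Future<Void>> reporterInto(List<String> sink) {
        return (record, error) -> {
            sink.add(record);
            return CompletableFuture.<Void>completedFuture(null);
        };
    }

    /**
     * Processes a batch; records that are not integers are handed to the reporter.
     * Returns the number of records that were processed successfully.
     */
    static int put(List<String> records,
                   BiFunction<String, Throwable, Future<Void>> reporter,
                   boolean waitForEachReport) {
        int ok = 0;
        for (String record : records) {
            try {
                Integer.parseInt(record);
                ok++;
            } catch (NumberFormatException e) {
                Future<Void> ack = reporter.apply(record, e);
                if (waitForEachReport) {
                    try {
                        ack.get(); // synchronous use: block until this report completes
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException(ie);
                    } catch (ExecutionException ee) {
                        throw new IllegalStateException(ee);
                    }
                }
                // Otherwise the future is dropped and reporting stays asynchronous.
            }
        }
        return ok;
    }

    public static void main(String[] args) {
        List<String> failed = new ArrayList<>();
        int ok = put(Arrays.asList("1", "x", "2"), reporterInto(failed), true);
        System.out.println(ok + " ok, failed: " + failed);
    }
}
```

A task that ignores the future gets the asynchronous behavior; one that calls `get()` gets the blocking behavior of option 2a without a different signature.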
>>
>> Third, how do we pass the reporter lambda / method reference to the task?
>> My proposal to pass the reporter via an overload `put(...)` still is the
>> most attractive to me, for several reasons:
>>
>> 3a. There's no need to pass the reporter separately *and* to describe the
>> changes in method call ordering.
>> 3b. As mentioned above, for some reason passing it via `put(...)` makes
>> the
>> intent more clear that it be used when processing the SinkRecord, and that
>> it shouldn't be used in `start(...)`, `preCommit(...)`,
>> `onPartitionsAssigned(...)`, or any of the other task methods. As Andrew
>> pointed out earlier, *describing* this in the KIP and in JavaDoc will be
>> tough to be exact yet succinct.
>> 3c. There is already precedent for evolving
>> `SourceTask.commitRecord(...)`, and the pattern is identical.
>> 3d. Backward compatibility is easy to understand, and at the same time
>> it's
>> pretty easy to describe what implementations that want to take advantage
>> of
>> this feature should do.
>> 3e. Minimal changes to the interface: we're just *adding* one default
>> method that calls the existing method and deprecating the existing
>> `put(...)`.
>> 3f. Deprecating the existing `put(...)` makes it more clear in a
>> programmatic sense that new sink implementations should use the reporter,
>> and that we recommend old sinks evolve to use it.
>>
>> Some of these benefits apply to some of the other suggestions, but I think
>> none of the other suggestions have all of these benefits. For example,
>> overloading `initialize(...)` is more difficult since most sink connectors
>> don't override it and therefore would be less subject to deprecation
>> warnings. Overloading `start(...)` is less attractive. Adding a method IMO
>> shares the fewest of these benefits.
>>
>> The one disadvantage of this approach is that sink task implementations
>> can't rely upon the reporter upon startup. IMO that's an acceptable
>> tradeoff to get the cleaner and more explicit API, especially if the API
>> contract is that Connect will pass the same reporter instance to each call
>> to `put(...)` on a single task instance.
>>
>> Best regards,
>>
>> Randall
>>
>> On Fri, May 15, 2020 at 6:59 AM Andrew Schofield <
>> andrew_schofi...@live.com>
>> wrote:
>>
>> > Hi,
>> > Randall's suggestion is really good. I think it gives the flexibility
>> > required and also
>> > keeps the interface the right way round.
>> >
>> > Thanks,
>> > Andrew Schofield
>> >
>> > On 15/05/2020, 02:07, "Aakash Shah" <as...@confluent.io> wrote:
>> >
>> > > Hi Randall,
>> > >
>> > > Thanks for the feedback.
>> > >
>> > > 1. This is a great suggestion, but I find that adding an overloaded
>> > > put(...) which essentially deprecates the old put(...) to only be used
>> > when
>> > > a connector is deployed on older versions of Connect adds enough of a
>> > > complication that could cause connectors to break if the old put(...)
>> > > doesn't correctly invoke the overloaded put(...); either that, or it
>> will
>> > > add duplication of functionality across the two put(...) methods. I
>> think
>> > > the older method simplifies things with the idea that a DLQ/error
>> > reporter
>> > > will or will not be passed into the method depending on the version of
>> > AK.
>> > > However, I also understand the aesthetic advantage of this method vs
>> the
>> > > setter method, so I am okay with going in this direction if others
>> agree
>> > > with adding the overloaded put(...).
>> > >
>> > > 2. Yes, your assumption is correct. Yes, we can remove the "Order of
>> > > Operations" if we go with the overloaded put(...) direction.
>> > >
>> > > 3. Great point, I will remove them from the KIP.
>> > >
>> > > 4. Yeah, accept(...) will be synchronous. I will change it to be
>> clearer,
>> > > thanks.
>> > >
>> > > 5. This KIP will use existing metrics as well as introduce new metrics. I
>> > will
>> > > update this section to fully specify the metrics.
>> > >
>> > > Please let me know what you think.
>> > >
>> > > Thanks,
>> > > Aakash
>> > >
>> > > On Thu, May 14, 2020 at 3:52 PM Randall Hauch <rha...@gmail.com>
>> wrote:
>> > >
>> > > > Hi, Aakash.
>> > > >
>> > > > Thanks for the KIP. Connect does need an improved ability for sink
>> > > > connectors to report individual records as being problematic, and
>> this
>> > > > integrates nicely with the existing DLQ feature.
>> > > >
>> > > > I also appreciate the desire to maintain compatibility so that
>> > connectors
>> > > > can take advantage of this feature when deployed in a runtime that
>> > supports
>> > > > this feature, but can safely and easily do without the feature when
>> > > > deployed to an older runtime. But I do understand Andrew's concern
>> > about
>> > > > the aesthetics. Have you considered overloading the `put(...)`
>> method
>> > and
>> > > > adding the `reporter` as a second parameter? Essentially it would
>> add
>> > the
>> > > > one method (with proper JavaDoc) to `SinkTask` only:
>> > > >
>> > > > ```
>> > > >     public void put(Collection<SinkRecord> records,
>> > > >             BiFunction<SinkRecord, Throwable> reporter) {
>> > > >         put(records);
>> > > >     }
>> > > > ```
>> > > > and the WorkerSinkTask would be changed to call `put(Collection,
>> > > > BiFunction)` instead.
>> > > >
>> > > > Sink connector implementations that don't do anything different can
>> > still
>> > > > override `put(Collection)`, and it still works as before. Developers
>> > that
>> > > > want to change their sink connector implementations to support this
>> new
>> > > > feature would do the following, which would work in older and newer
>> > Connect
>> > > > runtimes:
>> > > > ```
>> > > >     public void put(Collection<SinkRecord> records) {
>> > > >         put(records, null);
>> > > >     }
>> > > >     public void put(Collection<SinkRecord> records,
>> > > >             BiFunction<SinkRecord, Throwable> reporter) {
>> > > >         // the normal `put(Collection)` logic goes here, but can
>> > > >         // optionally use `reporter` if non-null
>> > > >     }
>> > > > ```
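
A self-contained version of this pattern might look like the sketch below. It is an illustration under stated assumptions: records are plain strings, `DualRuntimeTaskSketch` is a hypothetical name, and the reporter is typed as `BiConsumer<String, Throwable>` because `BiFunction` needs three type parameters (the two-parameter form above is acknowledged later in the thread as a typo).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.BiConsumer;

class DualRuntimeTaskSketch {
    final List<String> written = new ArrayList<>();

    // Entry point an older runtime would call; it knows nothing about reporters.
    public void put(Collection<String> records) {
        put(records, null);
    }

    // Entry point a newer runtime would call, passing a reporter.
    public void put(Collection<String> records, BiConsumer<String, Throwable> reporter) {
        for (String record : records) {
            if (!record.isEmpty()) {
                written.add(record);
            } else if (reporter != null) {
                reporter.accept(record, new IllegalArgumentException("empty record"));
            }
            // Without a reporter this sketch skips the bad record; a real task
            // would more likely fall back to its existing error handling.
        }
    }

    public static void main(String[] args) {
        DualRuntimeTaskSketch onOldRuntime = new DualRuntimeTaskSketch();
        onOldRuntime.put(Arrays.asList("a", "", "b"));
        System.out.println("old runtime wrote " + onOldRuntime.written);

        List<String> failures = new ArrayList<>();
        DualRuntimeTaskSketch onNewRuntime = new DualRuntimeTaskSketch();
        onNewRuntime.put(Arrays.asList("a", "", "b"), (r, e) -> failures.add(e.getMessage()));
        System.out.println("new runtime reported " + failures);
    }
}
```

The null check is what lets the same task class run on both runtimes: the older one only ever reaches the one-argument method, so the reporter is simply absent.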
>> > > >
>> > > > I think this has all the same benefits of the current KIP, but
>> > > > it's noticeably simpler and hopefully more aesthetically pleasing.
>> > > >
>> > > > As for Andrew's second concern about "the task can send errant
>> records
>> > to
>> > > > it within put(...)" being too restrictive. My guess is that this was
>> > more
>> > > > an attempt at describing the basic behavior, and less about
>> requiring
>> > the
>> > > > reporter only being called within the `put(...)` method and not by
>> > methods
>> > > > to which `put(...)` synchronously or asynchronously delegates. Can
>> you
>> > > > confirm whether my assumption is correct? If so, then perhaps my
>> > suggestion
>> > > > helps work around this issue as well, since there would be no
>> > restriction
>> > > > on when the reporter is called, and the whole "Order of Operations"
>> > section
>> > > > could potentially be removed.
>> > > >
>> > > > Third, it's not clear to me why the "Error Reporter Object"
>> subsection
>> > in
>> > > > the "Proposal" section lists the worker configuration properties
>> that
>> > were
>> > > > previously introduced with
>> > > >
>> > > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect
>> > > > .
>> > > > Maybe it's worth mentioning that the error reporter functionality
>> will
>> > > > reuse or build upon KIP-298, including reusing the configuration
>> > properties
>> > > > defined in KIP-298. But IIUC, the KIP does not propose changing any
>> > > > technical or semantic aspect of these configuration properties, and
>> > > > therefore the KIP would be more clear and succinct without them.
>> > *That* the
>> > > > error reporter will use these properties is part of the UX and
>> > therefore
>> > > > necessary to mention, but *how* it uses those properties is really
>> up
>> > to
>> > > > the implementation.
>> > > >
>> > > > Fourth, the "Synchrony" section has a sentence that is confusing, or
>> > not as
>> > > > clear as it could be.
>> > > >
>> > > >     "If a record is sent to the error reporter, processing of the
>> next
>> > > > errant record in accept(...) will not begin until the producer
>> > successfully
>> > > > sends the errant record to Kafka."
>> > > >
>> > > > This sentence is a bit difficult to understand, but IIUC this really
>> > just
>> > > > means that "accept(...)" will be synchronous and will block until
>> the
>> > > > errant record has been successfully written to Kafka. If so, let's
>> say
>> > > > that. The rest of the paragraph is fine.
>> > > >
>> > > > Finally, is this KIP proposing new metrics, or that existing metrics
>> > would
>> > > > be used to track the error reporter usage? If the former, then
>> please
>> > > > fully-specify what these metrics will be, similarly to how metrics
>> are
>> > > > specified in
>> > > >
>> > > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-196%3A+Add+metrics+to+Kafka+Connect+framework
>> > > > .
>> > > >
>> > > > Thoughts?
>> > > >
>> > > > Best regards,
>> > > >
>> > > > Randall
>> > > >
>> > > > On Mon, May 11, 2020 at 4:49 PM Andrew Schofield <
>> > > > andrew_schofi...@live.com>
>> > > > wrote:
>> > > >
>> > > > > Hi Aakash,
>> > > > > Thanks for sorting out the replies to the mailing list.
>> > > > >
>> > > > > First, I do like the idea of improving error reporting in sink
>> > > > connectors.
>> > > > > I'd like a simple
>> > > > > way to put bad records onto the DLQ.
>> > > > >
>> > > > > I think this KIP is considerably more complicated than it seems.
>> The
>> > > > > guidance on the
>> > > > > SinkTask.put() method is that it should send the records
>> > asynchronously
>> > > > > and immediately
>> > > > > return, so the task is likely to want to report errors
>> asynchronously
>> > > > > too.  Currently the KIP
>> > > > > states that "the task can send errant records to it within
>> put(...)"
>> > and
>> > > > > that's too restrictive.
>> > > > > The task ought to be able to report any unflushed records, but the
>> > > > > synchronisation of this is going
>> > > > > to be tricky. I suppose the connector author needs to make sure
>> that
>> > all
>> > > > > errant records have
>> > > > > been reported before returning control from SinkTask.flush(...) or
>> > > > perhaps
>> > > > > SinkTask.preCommit(...).
>> > > > >
>> > > > > I think the interface is a little strange too. I can see that this
>> > was
>> > > > > done so it's possible to deliver a connector
>> > > > > that supports error reporting but it can also work in earlier
>> > versions of
>> > > > > the KC runtime. But, the
>> > > > > pattern so far is that the task uses the methods of
>> SinkTaskContext
>> > to
>> > > > > access utilities in the Kafka
>> > > > > Connect runtime, and I suggest that reporting a bad record is
>> such a
>> > > > > utility. SinkTaskContext has
>> > > > > changed before when the configs() method was added, so I think
>> > there is
>> > > > > precedent for adding a method.
>> > > > > The way the KIP adds a method to SinkTask that the KC runtime
>> calls
>> > to
>> > > > > provide the error reporting utility
>> > > > > seems not to match what has gone before.
>> > > > >
>> > > > > Thanks,
>> > > > > Andrew
>> > > > >
>> > > > > On 11/05/2020, 19:05, "Aakash Shah" <as...@confluent.io> wrote:
>> > > > >
>> > > > >     I wasn't previously added to the dev mailing list, so I'd
>> like to
>> > > > post
>> > > > > my
>> > > > >     discussion with Andrew Schofield below for visibility and
>> further
>> > > > >     discussion:
>> > > > >
>> > > > >     Hi Andrew,
>> > > > >
>> > > > >     Thanks for the reply. The main concern with this approach
>> would
>> > be
>> > > > its
>> > > > >     backward compatibility. I’ve highlighted the thoughts around
>> the
>> > > > > backwards
>> > > > >     compatibility of the initial approach, please let me know what
>> > you
>> > > > > think.
>> > > > >
>> > > > >     Thanks,
>> > > > >     Aakash
>> > > > >
>> > > > >
>> > > > >
>> > > >
>> >
>> ____________________________________________________________________________________________________________________________
>> > > > >
>> > > > >     Hi,
>> > > > >     By adding a new method to the SinkContext interface in say
>> Kafka
>> > > > 2.6, a
>> > > > >     connector that calls it would require a Kafka 2.6 connect
>> > runtime. I
>> > > > > don't
>> > > > >     quite see how that's a backward compatibility problem. It's
>> just
>> > that
>> > > > > new
>> > > > >     connectors need the latest interface. I might not quite be
>> > > > > understanding,
>> > > > >     but I think it would be fine.
>> > > > >
>> > > > >     Thanks,
>> > > > >     Andrew
>> > > > >
>> > > > >
>> > > > >
>> > > >
>> >
>> ____________________________________________________________________________________________________________________________
>> > > > >
>> > > > >     Hi Andrew,
>> > > > >
>> > > > >     I apologize for the way the reply was sent. I just subscribed
>> to
>> > the
>> > > > > dev
>> > > > >     mailing list so it should be resolved now.
>> > > > >
>> > > > >     You are correct, new connectors would simply require the
>> latest
>> > > > > interface.
>> > > > >     However, we want to remove that requirement - in other words,
>> we
>> > want
>> > > > > to
>> > > > >     allow the possibility that someone wants the latest
>> connector/to
>> > > > > upgrade to
>> > > > >     the latest version, but deploys it on an older version of AK.
>> > > > > Basically, we
>> > > > >     don't want to enforce the necessity of upgrading AK to get the
>> > latest
>> > > > >     interface. In the current approach, there would be no issue of
>> > > > > deploying a
>> > > > >     new connector on an older version of AK, as the Connect
>> framework
>> > > > would
>> > > > >     simply not invoke the new method.
>> > > > >
>> > > > >     Please let me know what you think and if I need to clarify
>> > anything.
>> > > > >
>> > > > >     Thanks,
>> > > > >     Aakash
>> > > > >
>> > > > >
>> > > >
>> > >
>> >
>> >
>>
>
