Hi Huidong and Apoorv,
Thanks for bringing this up.
This example is indeed incorrect. Explicit acknowledgement requires
that every record returned by a poll must be acknowledged before
the next poll. Actually, the previous example in the KIP is the
correct way to do it, and you cannot abandon a batch of records like
this example attempts to show.
The original behaviour (AK 4.0) of explicit acknowledgement did work like
the example showed, but we changed it because of feedback from
users writing code with the ShareConsumer API.
I've updated the KIP to remove the misleading example. As I mentioned,
the previous example in the KIP is correct.
Thanks,
Andrew
On 2025/12/08 22:57:48 Apoorv Mittal wrote:
> Hi Huidong,
> Thanks for the review and pointing out the error in the example. Yes, I
> agree it should have the break statement and the example should look like
> below.
>
> I'll wait for Andrew's thoughts but I agree with your finding and shall
> update the document accordingly after confirmation.
>
> Properties props = new Properties();
> props.setProperty("bootstrap.servers", "localhost:9092");
> props.setProperty("group.id", "myshare");
> props.setProperty("share.acknowledgement.mode", "explicit");
>
> KafkaShareConsumer<String, String> consumer = new
> KafkaShareConsumer<>(props, new StringDeserializer(), new
> StringDeserializer());
> consumer.subscribe(Arrays.asList("foo"));
> while (true) {
> ConsumerRecords<String, String> records =
> consumer.poll(Duration.ofMillis(100)); // Return a batch of acquired
> records
> for (ConsumerRecord<String, String> record : records) {
> try {
> doProcessing(record);
> consumer.acknowledge(record,
> AcknowledgeType.ACCEPT); // Mark the record as
> processed successfully
> } catch (Exception e) {
> consumer.acknowledge(record,
> AcknowledgeType.REJECT); // Mark this record as
> unprocessable
> break;
> }
> }
>
> consumer.commitAsync();
> //
> Commit the acknowledgements of the acknowledged records only
> }
>
> Regards,
> Apoorv Mittal
>
>
> On Mon, Dec 8, 2025 at 2:48 PM 尹会东 <[email protected]> wrote:
>
> >
> > Hi Andrew and Kafka community,
> >
> > I found an inconsistency in the KIP-932 documentation at:
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255070434
> >
> > In the section "Example - Per-record acknowledgement, ending processing of
> > the batch on an error (explicit acknowledgement)", the description says:
> >
> > > "In this example, the application stops processing the batch when it
> > encounters an exception."
> >
> > However, the example code does not include a break statement in the catch
> > block, so it would continue processing the remaining records in the batch.
> > This contradicts the description.
> >
> > Current code (without break):
> > ```java
> > catch (Exception e) {
> > consumer.acknowledge(record, AcknowledgeType.REJECT);
> > }
> > ```
> > Or if the code is correct, the description should be updated to reflect
> > that processing continues after an error.
> >
> > Best regards,
> >
> > Huidong Yin
> >
> >
> >
>