sandeep-mst opened a new pull request, #11:
URL: https://github.com/apache/pulsar-connectors/pull/11

   
   Fixes [#25450](https://github.com/apache/pulsar/pull/25452)
   
   ### Motivation
   
   This PR fixes acknowledgments not being sent when `collapsePartitionedTopics=true` is set on the kafka-connect-adaptor sink. The topic name stored in the [SinkRecord](https://github.com/apache/pulsar/blob/3d33e4874fefa2a570a1ef60e02d9e995c4dde27/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java#L443) and in [currentOffsets](https://github.com/apache/pulsar/blob/3d33e4874fefa2a570a1ef60e02d9e995c4dde27/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java#L500) differs from the topic name stored in the Pulsar record.
   
   The topic name used in SinkRecord / currentOffsets is collapsed (the base topic name), while the topic name in the original Pulsar Record remains non-collapsed (it still includes the `-partition-X` suffix).
   
   As a result, during `ackUntil()`:
   - The topic lookup in `committedOffsets` fails.
   - [lastCommittedOffset](https://github.com/apache/pulsar/blob/3d33e4874fefa2a570a1ef60e02d9e995c4dde27/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java#L312) is always `null` because of the topic name mismatch.
   - Records are never acknowledged.
   
   ### Modifications
   
   - Updated `ackUntil()` to derive the topic and partition from the source Record using the same logic as `toSinkRecord()` (i.e., respecting `collapsePartitionedTopics`).
   - Added a debug log and `ackRequestedCount` for better logging clarity.
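
The idea behind the change to `ackUntil()` can be sketched as routing both the write path and the ack path through one key-derivation helper. The class `AckKeySketch`, the helper `keyFor`, the `topic@partition` string key, and the fallback to partition 0 are all hypothetical; the actual logic lives in `toSinkRecord()` and may differ in detail.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class AckKeySketch {

    private static final Pattern PARTITION_SUFFIX =
            Pattern.compile("^(.*)-partition-(\\d+)$");

    // Hypothetical helper shared by the write path and the ack path.
    // Returns a "topic@partition" key that respects collapsePartitionedTopics.
    static String keyFor(String pulsarTopic, boolean collapsePartitionedTopics) {
        Matcher m = PARTITION_SUFFIX.matcher(pulsarTopic);
        if (collapsePartitionedTopics && m.matches()) {
            // Collapsed: base topic name plus the partition index from the suffix.
            return m.group(1) + "@" + Integer.parseInt(m.group(2));
        }
        // Assumption for this sketch: otherwise keep the full name, partition 0.
        return pulsarTopic + "@0";
    }

    public static void main(String[] args) {
        String pulsarTopic = "persistent://public/default/orders-partition-2";
        boolean collapse = true;

        // Write path stores the offset under the derived key.
        Map<String, Long> committedOffsets = new HashMap<>();
        committedOffsets.put(keyFor(pulsarTopic, collapse), 41L);

        // Ack path derives the key the same way, so the lookup succeeds.
        System.out.println(committedOffsets.get(keyFor(pulsarTopic, collapse))); // prints "41"
    }
}
```

Deriving the key in one place means a future change to the collapsing rules cannot make the two paths disagree again.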
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   
   This change added tests and can be verified as follows:
     - *org.apache.pulsar.io.kafka.connect.KafkaConnectSinkTest#testAckUntilWithCollapsePartitionedTopics*
     - *org.apache.pulsar.io.kafka.connect.KafkaConnectSinkTest#testAckUntilWithoutCollapsePartitionedTopics*
   
   ### Does this pull request potentially affect one of the following parts:
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   *If the box was checked, please highlight the changes*
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [ ] The public API
   - [ ] The schema
   - [ ] The default values of configurations
   - [ ] The threading model
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] The metrics
   - [ ] Anything that affects deployment
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc` <!-- Your PR contains doc changes. -->
   - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
   - [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
   - [ ] `doc-complete` <!-- Docs have been already added -->
   
   ### Matching PR in forked repository
   
   PR in forked repository: <!-- ENTER URL HERE -->
   

