[ https://issues.apache.org/jira/browse/KAFKA-13632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chris Egerton resolved KAFKA-13632. ----------------------------------- Resolution: Fixed > MirrorMaker 2.0 NPE and Warning "Failure to commit records" for filtered > records > -------------------------------------------------------------------------------- > > Key: KAFKA-13632 > URL: https://issues.apache.org/jira/browse/KAFKA-13632 > Project: Kafka > Issue Type: Bug > Components: mirrormaker > Affects Versions: 3.1.0 > Reporter: Bert Baron > Priority: Minor > > We have a setup where we filter records with MirrorMaker 2.0 (see below). > This results in the following warning messages as a result of NPE's in > MirrorSourceTask.commitRecord for each filtered record: > {code:java} > [2022-01-31 08:01:29,581] WARN [MirrorSourceConnector|task-0] Failure > committing record. (org.apache.kafka.connect.mirror.MirrorSourceTask:190) > java.lang.NullPointerException > at > org.apache.kafka.connect.mirror.MirrorSourceTask.commitRecord(MirrorSourceTask.java:177) > at > org.apache.kafka.connect.runtime.WorkerSourceTask.commitTaskRecord(WorkerSourceTask.java:463) > at > org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:358) > at > org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257) > at > org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188) > at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243) > at > java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) > at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > at java.base/java.lang.Thread.run(Thread.java:829) {code} > The reason seems to be that for filtered records metadata is null. Note that > in the overridden SourceTask.commitRecord the javadoc clearly states that > metadata will be null if the record was filtered. > In our case we use a custom predicate, but the issue can be reproduced with > the following configuration: > {code:java} > clusters = source,target > tasks.max = 1 > source.bootstrap.servers = <cluster1> > target.bootstrap.servers = <cluster2> > offset.storage.replication.factor=1 > status.storage.replication.factor=1 > config.storage.replication.factor=1 > source->target.enabled = true > source->target.topics = topic1 > source->target.transforms=Filter > source->target.transforms.Filter.type=org.apache.kafka.connect.transforms.Filter > source->target.transforms.Filter.predicate=HeaderPredicate > source->target.predicates=HeaderPredicate > source->target.predicates.HeaderPredicate.type=org.apache.kafka.connect.transforms.predicates.HasHeaderKey > source->target.predicates.HeaderPredicate.name=someheader > {code} > Each record with the header key 'someheader' will result in the NPE and > warning message. > On a side note, we couldn't find clear documentation on how to configure > (SMT) filtering with MirrorMaker 2 or whether this is supported at all, but > apart from the NPE's and warning messages this seems to functionally work for > us with our custom filter. > -- This message was sent by Atlassian Jira (v8.20.10#820010)