[ 
https://issues.apache.org/jira/browse/KAFKA-13632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison reassigned KAFKA-13632:
--------------------------------------

    Assignee: Rens Groothuijsen

> MirrorMaker 2.0 NPE and Warning "Failure to commit records" for filtered 
> records
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-13632
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13632
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>    Affects Versions: 3.1.0, 3.2.0, 3.3.0
>            Reporter: Bert Baron
>            Assignee: Rens Groothuijsen
>            Priority: Minor
>             Fix For: 3.4.0
>
>
> We have a setup where we filter records with MirrorMaker 2.0 (see below). 
> For each filtered record, an NPE in MirrorSourceTask.commitRecord produces 
> the following warning:
> {code:java}
> [2022-01-31 08:01:29,581] WARN [MirrorSourceConnector|task-0] Failure committing record. (org.apache.kafka.connect.mirror.MirrorSourceTask:190)
> java.lang.NullPointerException
>       at org.apache.kafka.connect.mirror.MirrorSourceTask.commitRecord(MirrorSourceTask.java:177)
>       at org.apache.kafka.connect.runtime.WorkerSourceTask.commitTaskRecord(WorkerSourceTask.java:463)
>       at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:358)
>       at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257)
>       at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
>       at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
>       at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>       at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>       at java.base/java.lang.Thread.run(Thread.java:829)
> {code}
> The cause seems to be that the metadata argument is null for filtered 
> records. Note that the javadoc of the overridden SourceTask.commitRecord 
> clearly states that metadata will be null if the record was filtered.
> In our case we use a custom predicate, but the issue can be reproduced with 
> the following configuration:
> {code:java}
> clusters = source,target
> tasks.max = 1
> source.bootstrap.servers = <cluster1>
> target.bootstrap.servers = <cluster2>
> offset.storage.replication.factor=1
> status.storage.replication.factor=1
> config.storage.replication.factor=1
> source->target.enabled = true
> source->target.topics = topic1
> source->target.transforms=Filter
> source->target.transforms.Filter.type=org.apache.kafka.connect.transforms.Filter
> source->target.transforms.Filter.predicate=HeaderPredicate
> source->target.predicates=HeaderPredicate
> source->target.predicates.HeaderPredicate.type=org.apache.kafka.connect.transforms.predicates.HasHeaderKey
> source->target.predicates.HeaderPredicate.name=someheader
>  {code}
> Each record with the header key 'someheader' will result in the NPE and 
> warning message.
> On a side note, we couldn't find clear documentation on how to configure 
> (SMT) filtering with MirrorMaker 2, or whether it is supported at all. Apart 
> from the NPEs and warning messages, though, filtering appears to work 
> correctly for us with our custom filter.
>  
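
The null-metadata case described in the report can be illustrated with a small, self-contained sketch. This is not the MirrorSourceTask code and not necessarily the fix that shipped; the class CommitRecordSketch and its nested Metadata type are hypothetical stand-ins that only mimic the shape of SourceTask.commitRecord(record, metadata), assuming (as the report says) that metadata is null for filtered records.

```java
// Hypothetical stand-in types for illustration only; these are NOT the
// Kafka Connect classes, and this is not the actual MirrorSourceTask code.
public class CommitRecordSketch {

    /** Minimal stand-in for the broker metadata passed to commitRecord. */
    static final class Metadata {
        final String topic;
        final int partition;
        final long offset;

        Metadata(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    /**
     * Mimics the shape of SourceTask.commitRecord(record, metadata).
     * Returns true if the record was processed, false if it was skipped
     * because metadata was null (assumed here to mean "filtered").
     */
    static boolean commitRecord(String record, Metadata metadata) {
        if (metadata == null) {
            // Filtered record: nothing was written downstream, so there is
            // no downstream offset to read. Return early instead of
            // dereferencing null.
            return false;
        }
        // Safe to dereference: the record was actually produced.
        long downstreamOffset = metadata.offset;
        return downstreamOffset >= 0;
    }

    public static void main(String[] args) {
        boolean kept = commitRecord("kept", new Metadata("topic1", 0, 42L));
        boolean filtered = commitRecord("filtered", null);
        System.out.println("kept processed: " + kept);
        System.out.println("filtered processed: " + filtered);
    }
}
```

Without the early return, the access to metadata.offset would throw a NullPointerException for every filtered record, which matches the symptom in the stack trace above.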



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
