[ https://issues.apache.org/jira/browse/KAFKA-13985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chris Egerton resolved KAFKA-13985. ----------------------------------- Resolution: Fixed > MirrorSourceTask commitRecord throws NPE if SMT is filtering out source record > ------------------------------------------------------------------------------ > > Key: KAFKA-13985 > URL: https://issues.apache.org/jira/browse/KAFKA-13985 > Project: Kafka > Issue Type: Bug > Components: mirrormaker > Affects Versions: 3.1.0, 3.2.0, 3.3.0 > Reporter: Jacopo Riciputi > Assignee: Rens Groothuijsen > Priority: Minor > Fix For: 3.4.0 > > > Applying a SMT that filters out messages it can brings to enter in this path: > From WorkerSourceTask.java > {code:java} > final SourceRecord record = transformationChain.apply(preTransformRecord); > final ProducerRecord<byte[], byte[]> producerRecord = > convertTransformedRecord(record); > if (producerRecord == null || retryWithToleranceOperator.failed()) { > counter.skipRecord(); > commitTaskRecord(preTransformRecord, null); > continue; > } {code} > > Then to: > {code:java} > private void commitTaskRecord(SourceRecord record, RecordMetadata metadata) { > try { > task.commitRecord(record, metadata); > } catch (Throwable t) { > log.error("{} Exception thrown while calling > task.commitRecord()", this, t); > } > }{code} > Finally > From MirrorSourceTask.java > {code:java} > @Override > public void commitRecord(SourceRecord record, RecordMetadata metadata) { > try { > if (stopping) { > return; > } > if (!metadata.hasOffset()) { > log.error("RecordMetadata has no offset -- can't sync offsets > for {}.", record.topic()); > return; > } > ...{code} > > Causing a NPE because metadata is null. > This the exception. > {code:java} > [2022-06-13 12:31:33,094] WARN Failure committing record. > (org.apache.kafka.connect.mirror.MirrorSourceTask:190) > java.lang.NullPointerException > at > org.apache.kafka.connect.mirror.MirrorSourceTask.commitRecord(MirrorSourceTask.java:177) > at > org.apache.kafka.connect.runtime.WorkerSourceTask.commitTaskRecord(WorkerSourceTask.java:463) > at > org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:358) > at > org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257) > at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188) > at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243) > at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown > Source) > at java.base/java.util.concurrent.FutureTask.run(Unknown Source) > at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown > Source) > at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown > Source) > at java.base/java.lang.Thread.run(Unknown Source) {code} > In my understanding this is well handled and it does not have negative > impacts because it's handled by MirrorSourceTask.commitRecord, without > leaving the exception be forwarded outside of it. > But probably is preferred to handle it checking if metadata != null. > So skipping commit but safely and silently > [EDIT] > Actually, going a bit in deep, there is a small side-effect. > If the latest message elaborated was filtered out (so not committed by > MirrorSourceTask), if MM2 instance is rebooted, this message will be re-read > by consumer, because offset was not committed (and probably filtered out if > configurations wasn't change). > But probably this behavior is fine considering MM2's nature > -- This message was sent by Atlassian Jira (v8.20.10#820010)