C0urante commented on code in PR #12602: URL: https://github.com/apache/kafka/pull/12602#discussion_r969617914
########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java: ########## @@ -160,6 +160,33 @@ public void testPoll() { } } + @Test + public void testCommitRecordWithNullMetadata() { + // Create a consumer mock + byte[] key1 = "abc".getBytes(); + byte[] value1 = "fgh".getBytes(); + String topicName = "test"; + String headerKey = "key"; + RecordHeaders headers = new RecordHeaders(new Header[] { + new RecordHeader(headerKey, "value".getBytes()), + }); + + @SuppressWarnings("unchecked") + KafkaConsumer<byte[], byte[]> consumer = mock(KafkaConsumer.class); + MirrorMetrics metrics = mock(MirrorMetrics.class); + + String sourceClusterName = "cluster1"; + ReplicationPolicy replicationPolicy = new DefaultReplicationPolicy(); + MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(consumer, metrics, sourceClusterName, + replicationPolicy, 50); + + SourceRecord sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, 0, 0, System.currentTimeMillis(), + TimestampType.CREATE_TIME, key1.length, value1.length, key1, value1, headers, Optional.empty())); + + // Expect that commitRecord will not throw an exception + mirrorSourceTask.commitRecord(sourceRecord, null); Review Comment: It's good that we're ensuring this code path doesn't lead to an exception, but it'd be nice if we could also ensure that we don't try to produce offsets in this case as well. Could we add an extra parameter to the testing-only constructor defined [here](https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L70-L79) for the `offsetProducer` field, and then, using a mocked producer and [Mockito::verifyNoInteractions](https://javadoc.io/doc/org.mockito/mockito-core/latest/org/mockito/Mockito.html#verifyNoInteractions(java.lang.Object...)), check to make sure that the source task doesn't use that producer to try to send any offset sync records? ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java: ########## @@ -170,25 +170,25 @@ public List<SourceRecord> poll() { @Override public void commitRecord(SourceRecord record, RecordMetadata metadata) { - try { - if (stopping) { - return; - } - if (!metadata.hasOffset()) { - log.error("RecordMetadata has no offset -- can't sync offsets for {}.", record.topic()); - return; - } - TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition()); - long latency = System.currentTimeMillis() - record.timestamp(); - metrics.countRecord(topicPartition); - metrics.replicationLatency(topicPartition, latency); - TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition()); - long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset()); - long downstreamOffset = metadata.offset(); - maybeSyncOffsets(sourceTopicPartition, upstreamOffset, downstreamOffset); - } catch (Throwable e) { - log.warn("Failure committing record.", e); + if (stopping) { + return; + } + if (metadata == null) { + log.debug("No RecordMetadata -- can't sync offsets for {}.", record.topic()); Review Comment: Can we note here the likely cause of this scenario? Thinking something like this: ```suggestion log.debug("No RecordMetadata (source record was probably filtered out during transformation) -- can't sync offsets for {}.", record.topic()); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org