C0urante commented on code in PR #12602:
URL: https://github.com/apache/kafka/pull/12602#discussion_r969617914


##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java:
##########
@@ -160,6 +160,33 @@ public void testPoll() {
         }
     }
 
+    @Test
+    public void testCommitRecordWithNullMetadata() {
+        // Create a consumer mock
+        byte[] key1 = "abc".getBytes();
+        byte[] value1 = "fgh".getBytes();
+        String topicName = "test";
+        String headerKey = "key";
+        RecordHeaders headers = new RecordHeaders(new Header[] {
+            new RecordHeader(headerKey, "value".getBytes()),
+        });
+
+        @SuppressWarnings("unchecked")
+        KafkaConsumer<byte[], byte[]> consumer = mock(KafkaConsumer.class);
+        MirrorMetrics metrics = mock(MirrorMetrics.class);
+
+        String sourceClusterName = "cluster1";
+        ReplicationPolicy replicationPolicy = new DefaultReplicationPolicy();
+        MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(consumer, 
metrics, sourceClusterName,
+                replicationPolicy, 50);
+
+        SourceRecord sourceRecord = mirrorSourceTask.convertRecord(new 
ConsumerRecord<>(topicName, 0, 0, System.currentTimeMillis(),
+                TimestampType.CREATE_TIME, key1.length, value1.length, key1, 
value1, headers, Optional.empty()));
+
+        // Expect that commitRecord will not throw an exception
+        mirrorSourceTask.commitRecord(sourceRecord, null);

Review Comment:
   It's good that we're ensuring this code path doesn't lead to an exception, 
but it'd be nice if we could also ensure that we don't try to produce offsets 
in this case as well.
   
   Could we add an extra parameter to the testing-only constructor defined 
[here](https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L70-L79)
 for the `offsetProducer` field, and then, using a mocked producer and 
[Mockito::verifyNoInteractions](https://javadoc.io/doc/org.mockito/mockito-core/latest/org/mockito/Mockito.html#verifyNoInteractions(java.lang.Object...)),
 check to make sure that the source task doesn't use that producer to try to 
send any offset sync records?



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -170,25 +170,25 @@ public List<SourceRecord> poll() {
  
     @Override
     public void commitRecord(SourceRecord record, RecordMetadata metadata) {
-        try {
-            if (stopping) {
-                return;
-            }
-            if (!metadata.hasOffset()) {
-                log.error("RecordMetadata has no offset -- can't sync offsets 
for {}.", record.topic());
-                return;
-            }
-            TopicPartition topicPartition = new TopicPartition(record.topic(), 
record.kafkaPartition());
-            long latency = System.currentTimeMillis() - record.timestamp();
-            metrics.countRecord(topicPartition);
-            metrics.replicationLatency(topicPartition, latency);
-            TopicPartition sourceTopicPartition = 
MirrorUtils.unwrapPartition(record.sourcePartition());
-            long upstreamOffset = 
MirrorUtils.unwrapOffset(record.sourceOffset());
-            long downstreamOffset = metadata.offset();
-            maybeSyncOffsets(sourceTopicPartition, upstreamOffset, 
downstreamOffset);
-        } catch (Throwable e) {
-            log.warn("Failure committing record.", e);
+        if (stopping) {
+            return;
+        }
+        if (metadata == null) {
+            log.debug("No RecordMetadata -- can't sync offsets for {}.", 
record.topic());

Review Comment:
   Can we note here the likely cause of this scenario? Thinking something like 
this:
   
   ```suggestion
               log.debug("No RecordMetadata (source record was probably 
filtered out during transformation) -- can't sync offsets for {}.", 
record.topic());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to