This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 4597e954d91b05b64ff694cc4c2022713c266eba Author: Chris Egerton <[email protected]> AuthorDate: Mon Feb 6 04:53:58 2023 -0500 KAFKA-14610: Publish Mirror Maker 2 offset syncs in task commit() method (#13181) Reviewers: Mickael Maison <[email protected]>, Greg Harris <[email protected]> --- .../kafka/connect/mirror/MirrorSourceTask.java | 96 ++++++++++++++-------- .../kafka/connect/mirror/MirrorSourceTaskTest.java | 31 +++++-- 2 files changed, 88 insertions(+), 39 deletions(-) diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java index 4e714a6bf46..b9dd470d013 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java @@ -36,6 +36,8 @@ import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.Map; import java.util.HashMap; import java.util.List; @@ -62,6 +64,7 @@ public class MirrorSourceTask extends SourceTask { private ReplicationPolicy replicationPolicy; private MirrorMetrics metrics; private boolean stopping = false; + private final Map<TopicPartition, OffsetSync> pendingOffsetSyncs = new LinkedHashMap<>(); private Semaphore outstandingOffsetSyncs; private Semaphore consumerAccess; @@ -87,6 +90,7 @@ public class MirrorSourceTask extends SourceTask { @Override public void start(Map<String, String> props) { MirrorTaskConfig config = new MirrorTaskConfig(props); + pendingOffsetSyncs.clear(); outstandingOffsetSyncs = new Semaphore(MAX_OUTSTANDING_OFFSET_SYNCS); consumerAccess = new Semaphore(1); // let one thread at a time access the consumer sourceClusterAlias = config.sourceClusterAlias(); @@ -111,7 +115,9 @@ public class MirrorSourceTask extends SourceTask { @Override public void commit() { 
- // nop + // Publish any offset syncs that we've queued up, but have not yet been able to publish + // (likely because we previously reached our limit for number of outstanding syncs) + firePendingOffsetSyncs(); } @Override @@ -176,59 +182,81 @@ public class MirrorSourceTask extends SourceTask { @Override public void commitRecord(SourceRecord record, RecordMetadata metadata) { - try { - if (stopping) { - return; - } - if (!metadata.hasOffset()) { - log.error("RecordMetadata has no offset -- can't sync offsets for {}.", record.topic()); - return; - } - TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition()); - long latency = System.currentTimeMillis() - record.timestamp(); - metrics.countRecord(topicPartition); - metrics.replicationLatency(topicPartition, latency); - TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition()); - long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset()); - long downstreamOffset = metadata.offset(); - maybeSyncOffsets(sourceTopicPartition, upstreamOffset, downstreamOffset); - } catch (Throwable e) { - log.warn("Failure committing record.", e); + if (stopping) { + return; + } + if (metadata == null) { + log.debug("No RecordMetadata (source record was probably filtered out during transformation) -- can't sync offsets for {}.", record.topic()); + return; } + if (!metadata.hasOffset()) { + log.error("RecordMetadata has no offset -- can't sync offsets for {}.", record.topic()); + return; + } + TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition()); + long latency = System.currentTimeMillis() - record.timestamp(); + metrics.countRecord(topicPartition); + metrics.replicationLatency(topicPartition, latency); + TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition()); + long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset()); + long downstreamOffset = metadata.offset(); + 
maybeQueueOffsetSyncs(sourceTopicPartition, upstreamOffset, downstreamOffset); + // We may be able to immediately publish an offset sync that we've queued up here + firePendingOffsetSyncs(); } - // updates partition state and sends OffsetSync if necessary - private void maybeSyncOffsets(TopicPartition topicPartition, long upstreamOffset, - long downstreamOffset) { + // updates partition state and queues up OffsetSync if necessary + private void maybeQueueOffsetSyncs(TopicPartition topicPartition, long upstreamOffset, + long downstreamOffset) { PartitionState partitionState = partitionStates.computeIfAbsent(topicPartition, x -> new PartitionState(maxOffsetLag)); if (partitionState.update(upstreamOffset, downstreamOffset)) { - if (sendOffsetSync(topicPartition, upstreamOffset, downstreamOffset)) { - partitionState.reset(); + OffsetSync offsetSync = new OffsetSync(topicPartition, upstreamOffset, downstreamOffset); + synchronized (this) { + pendingOffsetSyncs.put(topicPartition, offsetSync); } + partitionState.reset(); } } - // sends OffsetSync record upstream to internal offsets topic - private boolean sendOffsetSync(TopicPartition topicPartition, long upstreamOffset, - long downstreamOffset) { - if (!outstandingOffsetSyncs.tryAcquire()) { - // Too many outstanding offset syncs. 
- return false; + private void firePendingOffsetSyncs() { + while (true) { + OffsetSync pendingOffsetSync; + synchronized (this) { + Iterator<OffsetSync> syncIterator = pendingOffsetSyncs.values().iterator(); + if (!syncIterator.hasNext()) { + // Nothing to sync + log.trace("No more pending offset syncs"); + return; + } + pendingOffsetSync = syncIterator.next(); + if (!outstandingOffsetSyncs.tryAcquire()) { + // Too many outstanding syncs + log.trace("Too many in-flight offset syncs; will try to send remaining offset syncs later"); + return; + } + syncIterator.remove(); + } + // Publish offset sync outside of synchronized block; we may have to + // wait for producer metadata to update before Producer::send returns + sendOffsetSync(pendingOffsetSync); + log.trace("Dispatched offset sync for {}", pendingOffsetSync.topicPartition()); } - OffsetSync offsetSync = new OffsetSync(topicPartition, upstreamOffset, downstreamOffset); + } + + // sends OffsetSync record to internal offsets topic + private void sendOffsetSync(OffsetSync offsetSync) { ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(offsetSyncsTopic, 0, offsetSync.recordKey(), offsetSync.recordValue()); offsetProducer.send(record, (x, e) -> { if (e != null) { log.error("Failure sending offset sync.", e); } else { - log.trace("Sync'd offsets for {}: {}=={}", topicPartition, - upstreamOffset, downstreamOffset); + log.trace("Sync'd offsets for {}: {}=={}", offsetSync.topicPartition(), + offsetSync.upstreamOffset(), offsetSync.downstreamOffset()); } outstandingOffsetSyncs.release(); }); - return true; } private Map<TopicPartition, Long> loadOffsets(Set<TopicPartition> topicPartitions) { diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java index bbd9ec3aff5..300a40087c6 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java +++ 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java @@ -283,7 +283,11 @@ public class MirrorSourceTaskTest { }); mirrorSourceTask.commitRecord(sourceRecord, recordMetadata); + // We should have dispatched this sync to the producer + verify(producer, times(1)).send(any(), any()); + mirrorSourceTask.commit(); + // No more syncs should take place; we've been able to publish all of them so far verify(producer, times(1)).send(any(), any()); recordOffset = 2; @@ -297,7 +301,11 @@ public class MirrorSourceTaskTest { doReturn(null).when(producer).send(any(), producerCallback.capture()); mirrorSourceTask.commitRecord(sourceRecord, recordMetadata); + // We should have dispatched this sync to the producer + verify(producer, times(2)).send(any(), any()); + mirrorSourceTask.commit(); + // No more syncs should take place; we've been able to publish all of them so far verify(producer, times(2)).send(any(), any()); // Do not send sync event @@ -309,22 +317,35 @@ public class MirrorSourceTaskTest { recordValue.length, recordKey, recordValue, headers, Optional.empty())); mirrorSourceTask.commitRecord(sourceRecord, recordMetadata); + mirrorSourceTask.commit(); + // We should not have dispatched any more syncs to the producer; there were too many already in flight verify(producer, times(2)).send(any(), any()); + // Now the in-flight sync has been ack'd + producerCallback.getValue().onCompletion(null, null); + mirrorSourceTask.commit(); + // We should dispatch the offset sync that was queued but previously not sent to the producer now + verify(producer, times(3)).send(any(), any()); + + // Ack the latest sync immediately + producerCallback.getValue().onCompletion(null, null); + // Should send sync event - recordOffset = 5; - metadataOffset = 150; + recordOffset = 6; + metadataOffset = 106; recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition); sourceRecord = mirrorSourceTask.convertRecord(new 
ConsumerRecord<>(topicName, recordPartition, recordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length, recordValue.length, recordKey, recordValue, headers, Optional.empty())); - producerCallback.getValue().onCompletion(null, null); - mirrorSourceTask.commitRecord(sourceRecord, recordMetadata); + // We should have dispatched this sync to the producer + verify(producer, times(4)).send(any(), any()); - verify(producer, times(3)).send(any(), any()); + mirrorSourceTask.commit(); + // No more syncs should take place; we've been able to publish all of them so far + verify(producer, times(4)).send(any(), any()); } private void compareHeaders(List<Header> expectedHeaders, List<org.apache.kafka.connect.header.Header> taskHeaders) {
