This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 897ced12eeb KAFKA-14797: Emit offset sync when offset translation lag 
would exceed max.offset.lag (#13367)
897ced12eeb is described below

commit 897ced12eebe7cea7a5ed9227f463107d858fbc2
Author: Greg Harris <[email protected]>
AuthorDate: Tue Mar 21 06:31:08 2023 -0700

    KAFKA-14797: Emit offset sync when offset translation lag would exceed 
max.offset.lag (#13367)
    
    Reviewers: Chris Egerton <[email protected]>
---
 .../kafka/connect/mirror/MirrorSourceTask.java     | 16 +++----
 .../kafka/connect/mirror/MirrorSourceTaskTest.java |  6 ++-
 .../IdentityReplicationIntegrationTest.java        | 21 +++------
 .../MirrorConnectorsIntegrationBaseTest.java       | 50 ++++++++++++++--------
 4 files changed, 52 insertions(+), 41 deletions(-)

diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
index 09de13aff34..e088e68f31f 100644
--- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
+++ 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
@@ -305,7 +305,6 @@ public class MirrorSourceTask extends SourceTask {
     static class PartitionState {
         long previousUpstreamOffset = -1L;
         long previousDownstreamOffset = -1L;
-        long lastSyncUpstreamOffset = -1L;
         long lastSyncDownstreamOffset = -1L;
         long maxOffsetLag;
         boolean shouldSyncOffsets;
@@ -316,13 +315,14 @@ public class MirrorSourceTask extends SourceTask {
 
         // true if we should emit an offset sync
         boolean update(long upstreamOffset, long downstreamOffset) {
-            long upstreamStep = upstreamOffset - lastSyncUpstreamOffset;
-            long downstreamTargetOffset = lastSyncDownstreamOffset + 
upstreamStep;
-            if (lastSyncDownstreamOffset == -1L
-                    || downstreamOffset - downstreamTargetOffset >= 
maxOffsetLag
-                    || upstreamOffset - previousUpstreamOffset != 1L
-                    || downstreamOffset < previousDownstreamOffset) {
-                lastSyncUpstreamOffset = upstreamOffset;
+            // Emit an offset sync if any of the following conditions are true
+            boolean noPreviousSyncThisLifetime = lastSyncDownstreamOffset == 
-1L;
+            // the OffsetSync::translateDownstream method will translate this 
offset 1 past the last sync, so add 1.
+            // TODO: share common implementation to enforce this relationship
+            boolean translatedOffsetTooStale = downstreamOffset - 
(lastSyncDownstreamOffset + 1) >= maxOffsetLag;
+            boolean skippedUpstreamRecord = upstreamOffset - 
previousUpstreamOffset != 1L;
+            boolean truncatedDownstreamTopic = downstreamOffset < 
previousDownstreamOffset;
+            if (noPreviousSyncThisLifetime || translatedOffsetTooStale || 
skippedUpstreamRecord || truncatedDownstreamTopic) {
                 lastSyncDownstreamOffset = downstreamOffset;
                 shouldSyncOffsets = true;
             }
diff --git 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
index b309df79fd9..0c566eb596b 100644
--- 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
+++ 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
@@ -99,11 +99,13 @@ public class MirrorSourceTaskTest {
         partitionState.reset();
         assertFalse(partitionState.update(3, 152), "no sync");
         partitionState.reset();
-        assertFalse(partitionState.update(4, 153), "no sync");
+        assertTrue(partitionState.update(4, 153), "one past target offset");
         partitionState.reset();
         assertFalse(partitionState.update(5, 154), "no sync");
         partitionState.reset();
-        assertTrue(partitionState.update(6, 205), "one past target offset");
+        assertFalse(partitionState.update(6, 203), "no sync");
+        partitionState.reset();
+        assertTrue(partitionState.update(7, 204), "one past target offset");
         partitionState.reset();
         assertTrue(partitionState.update(2, 206), "upstream reset");
         partitionState.reset();
diff --git 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
index 17aa9ebc142..f2c8753fe36 100644
--- 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
+++ 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.connect.mirror.integration;
 
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.TopicPartition;
@@ -188,8 +187,8 @@ public class IdentityReplicationIntegrationTest extends 
MirrorConnectorsIntegrat
         }
     }
 
-    @Test
-    public void testOneWayReplicationWithAutoOffsetSync() throws 
InterruptedException {
+    @Override
+    public void testOneWayReplicationWithOffsetSyncs(int offsetLagMax) throws 
InterruptedException {
         produceMessages(primary, "test-topic-1");
         String consumerGroupName = 
"consumer-group-testOneWayReplicationWithAutoOffsetSync";
         Map<String, Object> consumerProps  = new HashMap<String, Object>() {{
@@ -206,6 +205,7 @@ public class IdentityReplicationIntegrationTest extends 
MirrorConnectorsIntegrat
         // enable automated consumer group offset sync
         mm2Props.put("sync.group.offsets.enabled", "true");
         mm2Props.put("sync.group.offsets.interval.seconds", "1");
+        mm2Props.put("offset.lag.max", Integer.toString(offsetLagMax));
         // one way replication from primary to backup
         mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + 
".enabled", "false");
 
@@ -221,14 +221,9 @@ public class IdentityReplicationIntegrationTest extends 
MirrorConnectorsIntegrat
                 consumerProps, "test-topic-1")) {
 
             waitForConsumerGroupFullSync(backup, 
Collections.singletonList("test-topic-1"),
-                    consumerGroupName, NUM_RECORDS_PRODUCED);
-
-            ConsumerRecords<byte[], byte[]> records = 
backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+                    consumerGroupName, NUM_RECORDS_PRODUCED, offsetLagMax);
 
-            // the size of consumer record should be zero, because the offsets 
of the same consumer group
-            // have been automatically synchronized from primary to backup by 
the background job, so no
-            // more records to consume from the replicated topic by the same 
consumer group at backup cluster
-            assertEquals(0, records.count(), "consumer record size is not 
zero");
+            assertDownstreamRedeliveriesBoundedByMaxLag(backupConsumer, 
offsetLagMax);
         }
 
         // now create a new topic in primary cluster
@@ -251,11 +246,9 @@ public class IdentityReplicationIntegrationTest extends 
MirrorConnectorsIntegrat
                 "group.id", consumerGroupName), "test-topic-1", 
"test-topic-2")) {
 
             waitForConsumerGroupFullSync(backup, Arrays.asList("test-topic-1", 
"test-topic-2"),
-                    consumerGroupName, NUM_RECORDS_PRODUCED);
+                    consumerGroupName, NUM_RECORDS_PRODUCED, offsetLagMax);
 
-            ConsumerRecords<byte[], byte[]> records = 
backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-            // similar reasoning as above, no more records to consume by the 
same consumer group at backup cluster
-            assertEquals(0, records.count(), "consumer record size is not 
zero");
+            assertDownstreamRedeliveriesBoundedByMaxLag(backupConsumer, 
offsetLagMax);
         }
 
         assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
diff --git 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index 82772ceb06e..b2950d4f19e 100644
--- 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++ 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -90,6 +90,7 @@ public class MirrorConnectorsIntegrationBaseTest {
     protected static final int NUM_RECORDS_PER_PARTITION = 10;
     protected static final int NUM_PARTITIONS = 10;
     protected static final int NUM_RECORDS_PRODUCED = NUM_PARTITIONS * 
NUM_RECORDS_PER_PARTITION;
+    protected static final int OFFSET_LAG_MAX = 10;
     protected static final int RECORD_TRANSFER_DURATION_MS = 30_000;
     protected static final int CHECKPOINT_DURATION_MS = 20_000;
     private static final int RECORD_CONSUME_DURATION_MS = 20_000;
@@ -416,6 +417,15 @@ public class MirrorConnectorsIntegrationBaseTest {
 
     @Test
     public void testOneWayReplicationWithAutoOffsetSync() throws 
InterruptedException {
+        testOneWayReplicationWithOffsetSyncs(OFFSET_LAG_MAX);
+    }
+
+    @Test
+    public void testOneWayReplicationWithFrequentOffsetSyncs() throws 
InterruptedException {
+        testOneWayReplicationWithOffsetSyncs(0);
+    }
+
+    public void testOneWayReplicationWithOffsetSyncs(int offsetLagMax) throws 
InterruptedException {
         produceMessages(primary, "test-topic-1");
         String consumerGroupName = 
"consumer-group-testOneWayReplicationWithAutoOffsetSync";
         Map<String, Object> consumerProps  = new HashMap<String, Object>() {{
@@ -432,6 +442,7 @@ public class MirrorConnectorsIntegrationBaseTest {
         // enable automated consumer group offset sync
         mm2Props.put("sync.group.offsets.enabled", "true");
         mm2Props.put("sync.group.offsets.interval.seconds", "1");
+        mm2Props.put("offset.lag.max", Integer.toString(offsetLagMax));
         // one way replication from primary to backup
         mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + 
".enabled", "false");
 
@@ -448,14 +459,9 @@ public class MirrorConnectorsIntegrationBaseTest {
             consumerProps, "primary.test-topic-1")) {
 
             waitForConsumerGroupFullSync(backup, 
Collections.singletonList("primary.test-topic-1"),
-                    consumerGroupName, NUM_RECORDS_PRODUCED);
+                    consumerGroupName, NUM_RECORDS_PRODUCED, offsetLagMax);
 
-            ConsumerRecords<byte[], byte[]> records = 
backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-
-            // the size of consumer record should be zero, because the offsets 
of the same consumer group
-            // have been automatically synchronized from primary to backup by 
the background job, so no
-            // more records to consume from the replicated topic by the same 
consumer group at backup cluster
-            assertEquals(0, records.count(), "consumer record size is not 
zero");
+            assertDownstreamRedeliveriesBoundedByMaxLag(backupConsumer, 
offsetLagMax);
         }
 
         // now create a new topic in primary cluster
@@ -478,11 +484,9 @@ public class MirrorConnectorsIntegrationBaseTest {
             "group.id", consumerGroupName), "primary.test-topic-1", 
"primary.test-topic-2")) {
 
             waitForConsumerGroupFullSync(backup, 
Arrays.asList("primary.test-topic-1", "primary.test-topic-2"),
-                    consumerGroupName, NUM_RECORDS_PRODUCED);
+                    consumerGroupName, NUM_RECORDS_PRODUCED, offsetLagMax);
 
-            ConsumerRecords<byte[], byte[]> records = 
backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-            // similar reasoning as above, no more records to consume by the 
same consumer group at backup cluster
-            assertEquals(0, records.count(), "consumer record size is not 
zero");
+            assertDownstreamRedeliveriesBoundedByMaxLag(backupConsumer, 
offsetLagMax);
         }
 
         assertMonotonicCheckpoints(backup, PRIMARY_CLUSTER_ALIAS + 
".checkpoints.internal");
@@ -608,13 +612,14 @@ public class MirrorConnectorsIntegrationBaseTest {
         warmUpConsumer(consumerProps);
         mm2Props.put("sync.group.offsets.enabled", "true");
         mm2Props.put("sync.group.offsets.interval.seconds", "1");
+        mm2Props.put("offset.lag.max", Integer.toString(OFFSET_LAG_MAX));
         mm2Config = new MirrorMakerConfig(mm2Props);
         waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, 
PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
         produceMessages(primary, "test-topic-1");
         try (Consumer<byte[], byte[]> primaryConsumer = 
primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
             waitForConsumingAllRecords(primaryConsumer, NUM_RECORDS_PRODUCED);
         }
-        waitForConsumerGroupFullSync(backup, 
Collections.singletonList(remoteTopic), consumerGroupName, 
NUM_RECORDS_PRODUCED);
+        waitForConsumerGroupFullSync(backup, 
Collections.singletonList(remoteTopic), consumerGroupName, 
NUM_RECORDS_PRODUCED, OFFSET_LAG_MAX);
         restartMirrorMakerConnectors(backup, CONNECTOR_LIST);
         assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
         Thread.sleep(5000);
@@ -622,7 +627,7 @@ public class MirrorConnectorsIntegrationBaseTest {
         try (Consumer<byte[], byte[]> primaryConsumer = 
primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
             waitForConsumingAllRecords(primaryConsumer, NUM_RECORDS_PRODUCED);
         }
-        waitForConsumerGroupFullSync(backup, 
Collections.singletonList(remoteTopic), consumerGroupName, 2 * 
NUM_RECORDS_PRODUCED);
+        waitForConsumerGroupFullSync(backup, 
Collections.singletonList(remoteTopic), consumerGroupName, 2 * 
NUM_RECORDS_PRODUCED, OFFSET_LAG_MAX);
         assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
     }
 
@@ -800,7 +805,11 @@ public class MirrorConnectorsIntegrationBaseTest {
      * offsets are eventually synced to the expected offset numbers
      */
     protected static <T> void waitForConsumerGroupFullSync(
-            EmbeddedConnectCluster connect, List<String> topics, String 
consumerGroupId, int numRecords
+            EmbeddedConnectCluster connect,
+            List<String> topics,
+            String consumerGroupId,
+            int numRecords,
+            int offsetLagMax
     ) throws InterruptedException {
         int expectedRecords = numRecords * topics.size();
         Map<String, Object> consumerProps = new HashMap<>();
@@ -834,7 +843,7 @@ public class MirrorConnectorsIntegrationBaseTest {
                 for (TopicPartition tp : tps) {
                     assertTrue(consumerGroupOffsets.containsKey(tp),
                             "TopicPartition " + tp + " does not have 
translated offsets");
-                    assertTrue(consumerGroupOffsets.get(tp).offset() > 
lastOffset.get(tp),
+                    assertTrue(consumerGroupOffsets.get(tp).offset() > 
lastOffset.get(tp) - offsetLagMax,
                             "TopicPartition " + tp + " does not have 
fully-translated offsets");
                     assertTrue(consumerGroupOffsets.get(tp).offset() <= 
endOffsets.get(tp).offset(),
                             "TopicPartition " + tp + " has downstream offsets 
beyond the log end, this would lead to negative lag metrics");
@@ -870,6 +879,15 @@ public class MirrorConnectorsIntegrationBaseTest {
         }
     }
 
+    protected static void 
assertDownstreamRedeliveriesBoundedByMaxLag(Consumer<byte[], byte[]> 
targetConsumer, int offsetLagMax) {
+        ConsumerRecords<byte[], byte[]> records = 
targetConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+        // After a full sync, there should be at most offset.lag.max records 
per partition consumed by both upstream and downstream consumers.
+        for (TopicPartition tp : records.partitions()) {
+            int count = records.records(tp).size();
+            assertTrue(count < offsetLagMax,  "downstream consumer is 
re-reading more than " + offsetLagMax + " records from " + tp);
+        }
+    }
+
     /*
      * make sure the consumer to consume expected number of records
      */
@@ -903,8 +921,6 @@ public class MirrorConnectorsIntegrationBaseTest {
         mm2Props.put("offset.storage.replication.factor", "1");
         mm2Props.put("status.storage.replication.factor", "1");
         mm2Props.put("replication.factor", "1");
-        // Sync offsets as soon as possible to ensure the final record in a 
finite test has its offset translated.
-        mm2Props.put("offset.lag.max", "0");
         mm2Props.put(PRIMARY_CLUSTER_ALIAS + ".offset.flush.interval.ms", 
"5000");
         mm2Props.put(BACKUP_CLUSTER_ALIAS + ".offset.flush.interval.ms", 
"5000");
         return mm2Props;

Reply via email to