This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 897ced12eeb KAFKA-14797: Emit offset sync when offset translation lag
would exceed max.offset.lag (#13367)
897ced12eeb is described below
commit 897ced12eebe7cea7a5ed9227f463107d858fbc2
Author: Greg Harris <[email protected]>
AuthorDate: Tue Mar 21 06:31:08 2023 -0700
KAFKA-14797: Emit offset sync when offset translation lag would exceed
max.offset.lag (#13367)
Reviewers: Chris Egerton <[email protected]>
---
.../kafka/connect/mirror/MirrorSourceTask.java | 16 +++----
.../kafka/connect/mirror/MirrorSourceTaskTest.java | 6 ++-
.../IdentityReplicationIntegrationTest.java | 21 +++------
.../MirrorConnectorsIntegrationBaseTest.java | 50 ++++++++++++++--------
4 files changed, 52 insertions(+), 41 deletions(-)
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
index 09de13aff34..e088e68f31f 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
@@ -305,7 +305,6 @@ public class MirrorSourceTask extends SourceTask {
static class PartitionState {
long previousUpstreamOffset = -1L;
long previousDownstreamOffset = -1L;
- long lastSyncUpstreamOffset = -1L;
long lastSyncDownstreamOffset = -1L;
long maxOffsetLag;
boolean shouldSyncOffsets;
@@ -316,13 +315,14 @@ public class MirrorSourceTask extends SourceTask {
// true if we should emit an offset sync
boolean update(long upstreamOffset, long downstreamOffset) {
- long upstreamStep = upstreamOffset - lastSyncUpstreamOffset;
- long downstreamTargetOffset = lastSyncDownstreamOffset +
upstreamStep;
- if (lastSyncDownstreamOffset == -1L
- || downstreamOffset - downstreamTargetOffset >=
maxOffsetLag
- || upstreamOffset - previousUpstreamOffset != 1L
- || downstreamOffset < previousDownstreamOffset) {
- lastSyncUpstreamOffset = upstreamOffset;
+ // Emit an offset sync if any of the following conditions are true
+ boolean noPreviousSyncThisLifetime = lastSyncDownstreamOffset ==
-1L;
+ // the OffsetSync::translateDownstream method will translate this
offset 1 past the last sync, so add 1.
+ // TODO: share common implementation to enforce this relationship
+ boolean translatedOffsetTooStale = downstreamOffset -
(lastSyncDownstreamOffset + 1) >= maxOffsetLag;
+ boolean skippedUpstreamRecord = upstreamOffset -
previousUpstreamOffset != 1L;
+ boolean truncatedDownstreamTopic = downstreamOffset <
previousDownstreamOffset;
+ if (noPreviousSyncThisLifetime || translatedOffsetTooStale ||
skippedUpstreamRecord || truncatedDownstreamTopic) {
lastSyncDownstreamOffset = downstreamOffset;
shouldSyncOffsets = true;
}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
index b309df79fd9..0c566eb596b 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
@@ -99,11 +99,13 @@ public class MirrorSourceTaskTest {
partitionState.reset();
assertFalse(partitionState.update(3, 152), "no sync");
partitionState.reset();
- assertFalse(partitionState.update(4, 153), "no sync");
+ assertTrue(partitionState.update(4, 153), "one past target offset");
partitionState.reset();
assertFalse(partitionState.update(5, 154), "no sync");
partitionState.reset();
- assertTrue(partitionState.update(6, 205), "one past target offset");
+ assertFalse(partitionState.update(6, 203), "no sync");
+ partitionState.reset();
+ assertTrue(partitionState.update(7, 204), "one past target offset");
partitionState.reset();
assertTrue(partitionState.update(2, 206), "upstream reset");
partitionState.reset();
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
index 17aa9ebc142..f2c8753fe36 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.connect.mirror.integration;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.TopicPartition;
@@ -188,8 +187,8 @@ public class IdentityReplicationIntegrationTest extends
MirrorConnectorsIntegrat
}
}
- @Test
- public void testOneWayReplicationWithAutoOffsetSync() throws
InterruptedException {
+ @Override
+ public void testOneWayReplicationWithOffsetSyncs(int offsetLagMax) throws
InterruptedException {
produceMessages(primary, "test-topic-1");
String consumerGroupName =
"consumer-group-testOneWayReplicationWithAutoOffsetSync";
Map<String, Object> consumerProps = new HashMap<String, Object>() {{
@@ -206,6 +205,7 @@ public class IdentityReplicationIntegrationTest extends
MirrorConnectorsIntegrat
// enable automated consumer group offset sync
mm2Props.put("sync.group.offsets.enabled", "true");
mm2Props.put("sync.group.offsets.interval.seconds", "1");
+ mm2Props.put("offset.lag.max", Integer.toString(offsetLagMax));
// one way replication from primary to backup
mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS +
".enabled", "false");
@@ -221,14 +221,9 @@ public class IdentityReplicationIntegrationTest extends
MirrorConnectorsIntegrat
consumerProps, "test-topic-1")) {
waitForConsumerGroupFullSync(backup,
Collections.singletonList("test-topic-1"),
- consumerGroupName, NUM_RECORDS_PRODUCED);
-
- ConsumerRecords<byte[], byte[]> records =
backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+ consumerGroupName, NUM_RECORDS_PRODUCED, offsetLagMax);
- // the size of consumer record should be zero, because the offsets
of the same consumer group
- // have been automatically synchronized from primary to backup by
the background job, so no
- // more records to consume from the replicated topic by the same
consumer group at backup cluster
- assertEquals(0, records.count(), "consumer record size is not
zero");
+ assertDownstreamRedeliveriesBoundedByMaxLag(backupConsumer,
offsetLagMax);
}
// now create a new topic in primary cluster
@@ -251,11 +246,9 @@ public class IdentityReplicationIntegrationTest extends
MirrorConnectorsIntegrat
"group.id", consumerGroupName), "test-topic-1",
"test-topic-2")) {
waitForConsumerGroupFullSync(backup, Arrays.asList("test-topic-1",
"test-topic-2"),
- consumerGroupName, NUM_RECORDS_PRODUCED);
+ consumerGroupName, NUM_RECORDS_PRODUCED, offsetLagMax);
- ConsumerRecords<byte[], byte[]> records =
backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
- // similar reasoning as above, no more records to consume by the
same consumer group at backup cluster
- assertEquals(0, records.count(), "consumer record size is not
zero");
+ assertDownstreamRedeliveriesBoundedByMaxLag(backupConsumer,
offsetLagMax);
}
assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index 82772ceb06e..b2950d4f19e 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -90,6 +90,7 @@ public class MirrorConnectorsIntegrationBaseTest {
protected static final int NUM_RECORDS_PER_PARTITION = 10;
protected static final int NUM_PARTITIONS = 10;
protected static final int NUM_RECORDS_PRODUCED = NUM_PARTITIONS *
NUM_RECORDS_PER_PARTITION;
+ protected static final int OFFSET_LAG_MAX = 10;
protected static final int RECORD_TRANSFER_DURATION_MS = 30_000;
protected static final int CHECKPOINT_DURATION_MS = 20_000;
private static final int RECORD_CONSUME_DURATION_MS = 20_000;
@@ -416,6 +417,15 @@ public class MirrorConnectorsIntegrationBaseTest {
@Test
public void testOneWayReplicationWithAutoOffsetSync() throws
InterruptedException {
+ testOneWayReplicationWithOffsetSyncs(OFFSET_LAG_MAX);
+ }
+
+ @Test
+ public void testOneWayReplicationWithFrequentOffsetSyncs() throws
InterruptedException {
+ testOneWayReplicationWithOffsetSyncs(0);
+ }
+
+ public void testOneWayReplicationWithOffsetSyncs(int offsetLagMax) throws
InterruptedException {
produceMessages(primary, "test-topic-1");
String consumerGroupName =
"consumer-group-testOneWayReplicationWithAutoOffsetSync";
Map<String, Object> consumerProps = new HashMap<String, Object>() {{
@@ -432,6 +442,7 @@ public class MirrorConnectorsIntegrationBaseTest {
// enable automated consumer group offset sync
mm2Props.put("sync.group.offsets.enabled", "true");
mm2Props.put("sync.group.offsets.interval.seconds", "1");
+ mm2Props.put("offset.lag.max", Integer.toString(offsetLagMax));
// one way replication from primary to backup
mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS +
".enabled", "false");
@@ -448,14 +459,9 @@ public class MirrorConnectorsIntegrationBaseTest {
consumerProps, "primary.test-topic-1")) {
waitForConsumerGroupFullSync(backup,
Collections.singletonList("primary.test-topic-1"),
- consumerGroupName, NUM_RECORDS_PRODUCED);
+ consumerGroupName, NUM_RECORDS_PRODUCED, offsetLagMax);
- ConsumerRecords<byte[], byte[]> records =
backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-
- // the size of consumer record should be zero, because the offsets
of the same consumer group
- // have been automatically synchronized from primary to backup by
the background job, so no
- // more records to consume from the replicated topic by the same
consumer group at backup cluster
- assertEquals(0, records.count(), "consumer record size is not
zero");
+ assertDownstreamRedeliveriesBoundedByMaxLag(backupConsumer,
offsetLagMax);
}
// now create a new topic in primary cluster
@@ -478,11 +484,9 @@ public class MirrorConnectorsIntegrationBaseTest {
"group.id", consumerGroupName), "primary.test-topic-1",
"primary.test-topic-2")) {
waitForConsumerGroupFullSync(backup,
Arrays.asList("primary.test-topic-1", "primary.test-topic-2"),
- consumerGroupName, NUM_RECORDS_PRODUCED);
+ consumerGroupName, NUM_RECORDS_PRODUCED, offsetLagMax);
- ConsumerRecords<byte[], byte[]> records =
backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
- // similar reasoning as above, no more records to consume by the
same consumer group at backup cluster
- assertEquals(0, records.count(), "consumer record size is not
zero");
+ assertDownstreamRedeliveriesBoundedByMaxLag(backupConsumer,
offsetLagMax);
}
assertMonotonicCheckpoints(backup, PRIMARY_CLUSTER_ALIAS +
".checkpoints.internal");
@@ -608,13 +612,14 @@ public class MirrorConnectorsIntegrationBaseTest {
warmUpConsumer(consumerProps);
mm2Props.put("sync.group.offsets.enabled", "true");
mm2Props.put("sync.group.offsets.interval.seconds", "1");
+ mm2Props.put("offset.lag.max", Integer.toString(OFFSET_LAG_MAX));
mm2Config = new MirrorMakerConfig(mm2Props);
waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config,
PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
produceMessages(primary, "test-topic-1");
try (Consumer<byte[], byte[]> primaryConsumer =
primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
waitForConsumingAllRecords(primaryConsumer, NUM_RECORDS_PRODUCED);
}
- waitForConsumerGroupFullSync(backup,
Collections.singletonList(remoteTopic), consumerGroupName,
NUM_RECORDS_PRODUCED);
+ waitForConsumerGroupFullSync(backup,
Collections.singletonList(remoteTopic), consumerGroupName,
NUM_RECORDS_PRODUCED, OFFSET_LAG_MAX);
restartMirrorMakerConnectors(backup, CONNECTOR_LIST);
assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
Thread.sleep(5000);
@@ -622,7 +627,7 @@ public class MirrorConnectorsIntegrationBaseTest {
try (Consumer<byte[], byte[]> primaryConsumer =
primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
waitForConsumingAllRecords(primaryConsumer, NUM_RECORDS_PRODUCED);
}
- waitForConsumerGroupFullSync(backup,
Collections.singletonList(remoteTopic), consumerGroupName, 2 *
NUM_RECORDS_PRODUCED);
+ waitForConsumerGroupFullSync(backup,
Collections.singletonList(remoteTopic), consumerGroupName, 2 *
NUM_RECORDS_PRODUCED, OFFSET_LAG_MAX);
assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
}
@@ -800,7 +805,11 @@ public class MirrorConnectorsIntegrationBaseTest {
* offsets are eventually synced to the expected offset numbers
*/
protected static <T> void waitForConsumerGroupFullSync(
- EmbeddedConnectCluster connect, List<String> topics, String
consumerGroupId, int numRecords
+ EmbeddedConnectCluster connect,
+ List<String> topics,
+ String consumerGroupId,
+ int numRecords,
+ int offsetLagMax
) throws InterruptedException {
int expectedRecords = numRecords * topics.size();
Map<String, Object> consumerProps = new HashMap<>();
@@ -834,7 +843,7 @@ public class MirrorConnectorsIntegrationBaseTest {
for (TopicPartition tp : tps) {
assertTrue(consumerGroupOffsets.containsKey(tp),
"TopicPartition " + tp + " does not have
translated offsets");
- assertTrue(consumerGroupOffsets.get(tp).offset() >
lastOffset.get(tp),
+ assertTrue(consumerGroupOffsets.get(tp).offset() >
lastOffset.get(tp) - offsetLagMax,
"TopicPartition " + tp + " does not have
fully-translated offsets");
assertTrue(consumerGroupOffsets.get(tp).offset() <=
endOffsets.get(tp).offset(),
"TopicPartition " + tp + " has downstream offsets
beyond the log end, this would lead to negative lag metrics");
@@ -870,6 +879,15 @@ public class MirrorConnectorsIntegrationBaseTest {
}
}
+ protected static void
assertDownstreamRedeliveriesBoundedByMaxLag(Consumer<byte[], byte[]>
targetConsumer, int offsetLagMax) {
+ ConsumerRecords<byte[], byte[]> records =
targetConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+ // After a full sync, there should be at most offset.lag.max records
per partition consumed by both upstream and downstream consumers.
+ for (TopicPartition tp : records.partitions()) {
+ int count = records.records(tp).size();
+ assertTrue(count < offsetLagMax, "downstream consumer is
re-reading more than " + offsetLagMax + " records from" + tp);
+ }
+ }
+
/*
* make sure the consumer to consume expected number of records
*/
@@ -903,8 +921,6 @@ public class MirrorConnectorsIntegrationBaseTest {
mm2Props.put("offset.storage.replication.factor", "1");
mm2Props.put("status.storage.replication.factor", "1");
mm2Props.put("replication.factor", "1");
- // Sync offsets as soon as possible to ensure the final record in a
finite test has its offset translated.
- mm2Props.put("offset.lag.max", "0");
mm2Props.put(PRIMARY_CLUSTER_ALIAS + ".offset.flush.interval.ms",
"5000");
mm2Props.put(BACKUP_CLUSTER_ALIAS + ".offset.flush.interval.ms",
"5000");
return mm2Props;