This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push:
     new dc0866557c KAFKA-14089: Only check for committed seqnos after disabling exactly-once support in Connect integration test (#12429)
dc0866557c is described below
commit dc0866557c5051b8711df67d68621a3306521b10
Author: Chris Egerton <[email protected]>
AuthorDate: Thu Jul 28 11:18:09 2022 -0400
    KAFKA-14089: Only check for committed seqnos after disabling exactly-once support in Connect integration test (#12429)

    Reviewers: Mickael Maison <[email protected]>, Tom Bentley <[email protected]>
---
.../runtime/ExactlyOnceWorkerSourceTask.java | 3 +
.../org/apache/kafka/connect/runtime/Worker.java | 13 ++
.../ExactlyOnceSourceIntegrationTest.java | 192 +++++++++++++++------
.../integration/MonitorableSourceConnector.java | 2 +-
4 files changed, 155 insertions(+), 55 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
index de78e592aa..931917b9e1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
@@ -217,6 +217,9 @@ class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask {
         if (failed) {
             log.debug("Skipping final offset commit as task has failed");
             return;
+        } else if (isCancelled()) {
+            log.debug("Skipping final offset commit as task has been cancelled");
+            return;
         }
 
         // It should be safe to commit here even if we were in the middle of retrying on RetriableExceptions in the
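Read in isolation, the guard order this hunk establishes is: a failed task skips the final offset commit, and now a cancelled task does too. A minimal standalone sketch of that control flow (the class and helper below are hypothetical, not part of Connect; only the control flow mirrors the change):

    // Hypothetical standalone model of the guard order above.
    class FinalOffsetCommitGuard {
        private volatile boolean failed;
        private volatile boolean cancelled;

        boolean isCancelled() {
            return cancelled;
        }

        // A failed task and a cancelled task both skip the final offset
        // commit; only a task that stops cleanly commits its last offsets.
        boolean shouldCommitFinalOffsets() {
            if (failed) {
                return false;
            } else if (isCancelled()) {
                return false;
            }
            return true;
        }
    }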
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 16e48d8f17..5bc67693d0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -756,6 +756,19 @@ public class Worker {
                                                                     ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
                                                                     String clusterId) {
         Map<String, Object> result = baseProducerConfigs(id.connector(), "connector-producer-" + id, config, connConfig, connectorClass, connectorClientConfigOverridePolicy, clusterId);
+        // The base producer properties forcibly disable idempotence; remove it from those properties
+        // if not explicitly requested by the user
+        boolean connectorProducerIdempotenceConfigured = connConfig.originals().containsKey(
+                ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG
+        );
+        if (!connectorProducerIdempotenceConfigured) {
+            boolean workerProducerIdempotenceConfigured = config.originals().containsKey(
+                    "producer." + ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG
+            );
+            if (!workerProducerIdempotenceConfigured) {
+                result.remove(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG);
+            }
+        }
         ConnectUtils.ensureProperty(
                 result, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true",
                 "for connectors when exactly-once source support is enabled",
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
index 90fcaa8a21..bd9bceba06 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
@@ -407,12 +407,12 @@ public class ExactlyOnceSourceIntegrationTest {
         Map<String, Object> consumerProps = new HashMap<>();
         consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
 
         // consume all records from the source topic or fail, to ensure that they were correctly produced
-        ConsumerRecords<byte[], byte[]> sourceRecords = connect.kafka()
-                .consume(
-                        recordsProduced,
-                        TimeUnit.MINUTES.toMillis(1),
-                        consumerProps,
-                        "test-topic");
+        ConsumerRecords<byte[], byte[]> sourceRecords = connect.kafka().consumeAll(
+                CONSUME_RECORDS_TIMEOUT_MS,
+                Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"),
+                null,
+                topic
+        );
         assertTrue("Not enough records produced by source connector. Expected at least: " + recordsProduced + " + but got " + sourceRecords.count(),
                 sourceRecords.count() >= recordsProduced);
@@ -434,8 +434,7 @@ public class ExactlyOnceSourceIntegrationTest {
                 offsetsTopic
         );
-        List<Long> actualOffsetSeqnos = new ArrayList<>();
-        offsetRecords.forEach(record -> actualOffsetSeqnos.add(parseAndAssertOffsetForSingleTask(record)));
+        List<Long> actualOffsetSeqnos = parseAndAssertOffsetsForSingleTask(offsetRecords);
         assertEquals("Committed offsets should match connector-defined transaction boundaries",
                 expectedOffsetSeqnos, actualOffsetSeqnos.subList(0, expectedOffsetSeqnos.size()));
@@ -716,6 +715,20 @@ public class ExactlyOnceSourceIntegrationTest {
         connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(CONNECTOR_NAME, tasksMax, "Connector and task should have restarted successfully");
     }
 
+    /**
+     * This test focuses extensively on the per-connector offsets feature.
+     * <p>
+     * First, a connector is brought up whose producer is configured to write to a different Kafka cluster
+     * than the one the Connect cluster uses for its internal topics, then the contents of the connector's
+     * dedicated offsets topic and the worker's internal offsets topic are inspected to ensure that offsets
+     * have been backed up from the dedicated topic to the global topic.
+     * <p>
+     * Then, a "soft downgrade" is simulated: the Connect cluster is shut down and reconfigured to disable
+     * exactly-once support. The cluster is brought up again, the connector is allowed to produce some data,
+     * the connector is shut down, and this time, the records the connector has produced are inspected for
+     * accuracy. Because of the downgrade, exactly-once guarantees are lost, but we check to make sure that
+     * the task has maintained exactly-once delivery <i>up to the last-committed record</i>.
+     */
     @Test
     public void testSeparateOffsetsTopic() throws Exception {
         final String globalOffsetsTopic = "connect-worker-offsets-topic";
@@ -761,7 +774,7 @@ public class ExactlyOnceSourceIntegrationTest {
             // wait for the connector tasks to commit enough records
             connectorHandle.awaitCommits(TimeUnit.MINUTES.toMillis(1));
 
-            // consume all records from the source topic or fail, to ensure that they were correctly produced
+            // consume at least the expected number of records from the source topic or fail, to ensure that they were correctly produced
             int recordNum = connectorTargetedCluster
                     .consume(
                             recordsProduced,
@@ -772,28 +785,33 @@ public class ExactlyOnceSourceIntegrationTest {
             assertTrue("Not enough records produced by source connector. Expected at least: " + recordsProduced + " + but got " + recordNum,
                     recordNum >= recordsProduced);
 
-            // also consume from the connector's dedicated offsets topic; just need to read one offset record
-            ConsumerRecord<byte[], byte[]> offsetRecord = connectorTargetedCluster
-                    .consume(
-                            1,
+            // also consume from the connector's dedicated offsets topic
+            ConsumerRecords<byte[], byte[]> offsetRecords = connectorTargetedCluster
+                    .consumeAll(
                             TimeUnit.MINUTES.toMillis(1),
                             Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"),
+                            null,
                             offsetsTopic
-                    ).iterator().next();
-            long seqno = parseAndAssertOffsetForSingleTask(offsetRecord);
-            assertEquals("Offset commits should occur on connector-defined poll boundaries, which happen every " + recordsProduced + " records",
-                    0, seqno % recordsProduced);
+                    );
+            List<Long> seqnos = parseAndAssertOffsetsForSingleTask(offsetRecords);
+            seqnos.forEach(seqno ->
+                assertEquals("Offset commits should occur on connector-defined poll boundaries, which happen every " + recordsProduced + " records",
+                        0, seqno % recordsProduced)
+            );
 
-            // also consume from the cluster's global offsets topic; again, just need to read one offset record
-            offsetRecord = connect.kafka()
-                    .consume(
-                            1,
+            // also consume from the cluster's global offsets topic
+            offsetRecords = connect.kafka()
+                    .consumeAll(
                             TimeUnit.MINUTES.toMillis(1),
+                            null,
+                            null,
                             globalOffsetsTopic
-                    ).iterator().next();
-            seqno = parseAndAssertOffsetForSingleTask(offsetRecord);
-            assertEquals("Offset commits should occur on connector-defined poll boundaries, which happen every " + recordsProduced + " records",
-                    0, seqno % recordsProduced);
+                    );
+            seqnos = parseAndAssertOffsetsForSingleTask(offsetRecords);
+            seqnos.forEach(seqno ->
+                assertEquals("Offset commits should occur on connector-defined poll boundaries, which happen every " + recordsProduced + " records",
+                        0, seqno % recordsProduced)
+            );
 
             // Shut down the whole cluster
             connect.workers().forEach(connect::removeWorker);
@@ -831,15 +849,22 @@ public class ExactlyOnceSourceIntegrationTest {
             assertConnectorStopped(connectorStop);
 
             // consume all records from the source topic or fail, to ensure that they were correctly produced
-            ConsumerRecords<byte[], byte[]> records = connectorTargetedCluster.consumeAll(
+            ConsumerRecords<byte[], byte[]> sourceRecords = connectorTargetedCluster.consumeAll(
                     CONSUME_RECORDS_TIMEOUT_MS,
                     Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"),
                     null,
                     topic
             );
-            assertTrue("Not enough records produced by source connector. Expected at least: " + recordsProduced + " + but got " + records.count(),
-                    records.count() >= recordsProduced);
-            assertExactlyOnceSeqnos(records, numTasks);
+            assertTrue("Not enough records produced by source connector. Expected at least: " + recordsProduced + " + but got " + sourceRecords.count(),
+                    sourceRecords.count() >= recordsProduced);
+            // also have to check which offsets have actually been committed, since we no longer have exactly-once guarantees
+            offsetRecords = connectorTargetedCluster.consumeAll(
+                    CONSUME_RECORDS_TIMEOUT_MS,
+                    Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"),
+                    null,
+                    offsetsTopic
+            );
+            assertAtLeastOnceSeqnos(sourceRecords, offsetRecords, numTasks);
         }
     }
@@ -896,27 +921,10 @@ public class ExactlyOnceSourceIntegrationTest {
                 .orElseThrow(() -> new AssertionError("Failed to find configuration validation result for property '" + property + "'"));
     }
 
-    @SuppressWarnings("unchecked")
-    private long parseAndAssertOffsetForSingleTask(ConsumerRecord<byte[], byte[]> offsetRecord) {
-        JsonConverter offsetsConverter = new JsonConverter();
-        // The JSON converter behaves identically for keys and values. If that ever changes, we may need to update this test to use
-        // separate converter instances.
-
-        offsetsConverter.configure(Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"), false);
-        Object keyObject = offsetsConverter.toConnectData("topic name is not used by converter", offsetRecord.key()).value();
-        Object valueObject = offsetsConverter.toConnectData("topic name is not used by converter", offsetRecord.value()).value();
-
-        assertNotNull("Offset value should not be null", valueObject);
-
-        assertEquals("Serialized source partition should match expected format",
-                Arrays.asList(CONNECTOR_NAME, MonitorableSourceConnector.sourcePartition(MonitorableSourceConnector.taskId(CONNECTOR_NAME, 0))),
-                keyObject);
-
-        Map<String, Object> value = assertAndCast(valueObject, Map.class, "Value");
-
-        Object seqnoObject = value.get("saved");
-        assertNotNull("Serialized source offset should contain 'seqno' field from MonitorableSourceConnector", seqnoObject);
-        return assertAndCast(seqnoObject, Long.class, "Seqno offset field");
+    private List<Long> parseAndAssertOffsetsForSingleTask(ConsumerRecords<byte[], byte[]> offsetRecords) {
+        Map<Integer, List<Long>> parsedOffsets = parseOffsetForTasks(offsetRecords);
+        assertEquals("Expected records to only be produced from a single task", Collections.singleton(0), parsedOffsets.keySet());
+        return parsedOffsets.get(0);
     }
 
     private List<Long> parseAndAssertValuesForSingleTask(ConsumerRecords<byte[], byte[]> sourceRecords) {
@@ -927,6 +935,25 @@ public class ExactlyOnceSourceIntegrationTest {
 
     private void assertExactlyOnceSeqnos(ConsumerRecords<byte[], byte[]> sourceRecords, int numTasks) {
         Map<Integer, List<Long>> parsedValues = parseValuesForTasks(sourceRecords);
+        assertSeqnos(parsedValues, numTasks);
+    }
+
+    private void assertAtLeastOnceSeqnos(ConsumerRecords<byte[], byte[]> sourceRecords, ConsumerRecords<byte[], byte[]> offsetRecords, int numTasks) {
+        Map<Integer, List<Long>> parsedValues = parseValuesForTasks(sourceRecords);
+        Map<Integer, Long> lastCommittedValues = parseOffsetForTasks(offsetRecords)
+                .entrySet().stream().collect(Collectors.toMap(
+                        Map.Entry::getKey,
+                        e -> Collections.max(e.getValue())
+                ));
+        parsedValues.replaceAll((task, values) -> {
+            Long committedValue = lastCommittedValues.get(task);
+            assertNotNull("No committed offset found for task " + task, committedValue);
+            return values.stream().filter(v -> v <= committedValue).collect(Collectors.toList());
+        });
+        assertSeqnos(parsedValues, numTasks);
+    }
+
+    private void assertSeqnos(Map<Integer, List<Long>> parsedValues, int numTasks) {
         Set<Integer> expectedKeys = IntStream.range(0, numTasks).boxed().collect(Collectors.toSet());
         assertEquals("Expected records to be produced by each task",
                 expectedKeys, parsedValues.keySet());
@@ -935,10 +962,19 @@ public class ExactlyOnceSourceIntegrationTest {
             // which makes in-order consumption impossible
             Set<Long> expectedSeqnos = LongStream.range(1, seqnos.size() + 1).boxed().collect(Collectors.toSet());
             Set<Long> actualSeqnos = new HashSet<>(seqnos);
-            assertEquals(
-                    "Seqnos for task " + taskId + " should start at 1 and increase strictly by 1 with each record",
-                    expectedSeqnos,
-                    actualSeqnos
+
+            Set<Long> missingSeqnos = new HashSet<>(expectedSeqnos);
+            missingSeqnos.removeAll(actualSeqnos);
+            Set<Long> extraSeqnos = new HashSet<>(actualSeqnos);
+            extraSeqnos.removeAll(expectedSeqnos);
+
+            // Try to provide the most friendly error message possible if this test fails
+            assertTrue(
+                    "Seqnos for task " + taskId + " should start at 1 and increase strictly by 1 with each record, " +
+                            "but the actual seqnos did not.\n" +
+                            "Seqnos that should have been emitted but were not: " + missingSeqnos + "\n" +
+                            "seqnos that should not have been emitted but were: " + extraSeqnos,
+                    missingSeqnos.isEmpty() && extraSeqnos.isEmpty()
             );
         });
     }
@@ -986,6 +1022,54 @@ public class ExactlyOnceSourceIntegrationTest {
         return result;
     }
 
+    private Map<Integer, List<Long>> parseOffsetForTasks(ConsumerRecords<byte[], byte[]> offsetRecords) {
+        JsonConverter offsetsConverter = new JsonConverter();
+        // The JSON converter behaves identically for keys and values. If that ever changes, we may need to update this test to use
+        // separate converter instances.
+        offsetsConverter.configure(Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"), false);
+
+        Map<Integer, List<Long>> result = new HashMap<>();
+        for (ConsumerRecord<byte[], byte[]> offsetRecord : offsetRecords) {
+            Object keyObject = offsetsConverter.toConnectData("topic name is not used by converter", offsetRecord.key()).value();
+            Object valueObject = offsetsConverter.toConnectData("topic name is not used by converter", offsetRecord.value()).value();
+
+            assertNotNull("Offset key should not be null", keyObject);
+            assertNotNull("Offset value should not be null", valueObject);
+
+            @SuppressWarnings("unchecked")
+            List<Object> key = assertAndCast(keyObject, List.class, "Key");
+            assertEquals(
+                    "Offset topic key should be a list containing two elements: the name of the connector, and the connector-provided source partition",
+                    2,
+                    key.size()
+            );
+            assertEquals(CONNECTOR_NAME, key.get(0));
+            @SuppressWarnings("unchecked")
+            Map<String, Object> partition = assertAndCast(key.get(1), Map.class, "Key[1]");
+            Object taskIdObject = partition.get("task.id");
+            assertNotNull("Serialized source partition should contain 'task.id' field from MonitorableSourceConnector", taskIdObject);
+            String taskId = assertAndCast(taskIdObject, String.class, "task ID");
+            assertTrue("task ID should match pattern '<connectorName>-<taskId>'", taskId.startsWith(CONNECTOR_NAME + "-"));
+            String taskIdRemainder = taskId.substring(CONNECTOR_NAME.length() + 1);
+            int taskNum;
+            try {
+                taskNum = Integer.parseInt(taskIdRemainder);
+            } catch (NumberFormatException e) {
+                throw new AssertionError("task ID should match pattern '<connectorName>-<taskId>', where <taskId> is an integer", e);
+            }
+
+            @SuppressWarnings("unchecked")
+            Map<String, Object> value = assertAndCast(valueObject, Map.class, "Value");
+
+            Object seqnoObject = value.get("saved");
+            assertNotNull("Serialized source offset should contain 'seqno' field from MonitorableSourceConnector", seqnoObject);
+            long seqno = assertAndCast(seqnoObject, Long.class, "Seqno offset field");
+
+            result.computeIfAbsent(taskNum, t -> new ArrayList<>()).add(seqno);
+        }
+        return result;
+    }
+
     @SuppressWarnings("unchecked")
     private static <T> T assertAndCast(Object o, Class<T> klass, String objectDescription) {
         String className = o == null ? "null" : o.getClass().getName();
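For reference, the offsets-topic record shape that parseOffsetForTasks() asserts can be illustrated with a hypothetical example (the connector name "exactlyOnceSource" and the seqno 100 are made up; the JSON shapes follow directly from the assertions above):

    // Key: a two-element JSON list of [connector name, connector-provided source partition],
    // as serialized by the schemaless JsonConverter.
    String exampleOffsetKey = "[\"exactlyOnceSource\", {\"task.id\": \"exactlyOnceSource-0\"}]";
    // Value: the connector-provided source offset holding the last-committed seqno.
    String exampleOffsetValue = "{\"saved\": 100}";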
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
index c2820315d6..33ba1588a7 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
@@ -172,7 +172,7 @@ public class MonitorableSourceConnector extends SampleSourceConnector {
             batchSize = Integer.parseInt(props.getOrDefault(MESSAGES_PER_POLL_CONFIG, "1"));
             taskHandle = RuntimeHandles.get().connectorHandle(connectorName).taskHandle(taskId);
             Map<String, Object> offset = Optional.ofNullable(
-                    context.offsetStorageReader().offset(Collections.singletonMap("task.id", taskId)))
+                    context.offsetStorageReader().offset(sourcePartition(taskId)))
                     .orElse(Collections.emptyMap());
             startingSeqno = Optional.ofNullable((Long) offset.get("saved")).orElse(0L);
             seqno = startingSeqno;
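The body of the sourcePartition helper isn't shown in this diff; judging from the singleton map it replaces, it presumably centralizes the partition format along these lines (a sketch, not the verbatim method):

    // Presumed shape of MonitorableSourceConnector.sourcePartition, inferred
    // from the Collections.singletonMap("task.id", taskId) call it replaces.
    public static Map<String, Object> sourcePartition(String taskId) {
        return Collections.singletonMap("task.id", taskId);
    }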