This is an automated email from the ASF dual-hosted git repository.
edocomar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 867fb295d0b KAFKA-14742: Throttle connectors in
ExactlyOnceSourceIntegrationTest to fix flaky OOMEs (#13291)
867fb295d0b is described below
commit 867fb295d0b4e91d650d21594eeb515bd9accc54
Author: Greg Harris <[email protected]>
AuthorDate: Tue Feb 28 02:21:36 2023 -0800
KAFKA-14742: Throttle connectors in ExactlyOnceSourceIntegrationTest to fix
flaky OOMEs (#13291)
KAFKA-14742: Throttle connectors in ExactlyOnceSourceIntegrationTest to fix
flaky OOMEs
fixup: rename config constant, use meaningful constant names in EOSIT
Signed-off-by: Greg Harris <[email protected]>
Reviewers: Edoardo Comar <[email protected]>
---
.../ExactlyOnceSourceIntegrationTest.java | 106 +++++++++++----------
.../integration/MonitorableSourceConnector.java | 3 +-
2 files changed, 56 insertions(+), 53 deletions(-)
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
index a2dd730b344..bacfd311f5d 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
@@ -81,6 +81,7 @@ import static
org.apache.kafka.clients.producer.ProducerConfig.TRANSACTIONAL_ID_
import static
org.apache.kafka.connect.integration.MonitorableSourceConnector.CUSTOM_EXACTLY_ONCE_SUPPORT_CONFIG;
import static
org.apache.kafka.connect.integration.MonitorableSourceConnector.CUSTOM_TRANSACTION_BOUNDARIES_CONFIG;
import static
org.apache.kafka.connect.integration.MonitorableSourceConnector.MESSAGES_PER_POLL_CONFIG;
+import static
org.apache.kafka.connect.integration.MonitorableSourceConnector.MAX_MESSAGES_PER_SECOND_CONFIG;
import static
org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
import static
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
import static
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX;
@@ -115,6 +116,12 @@ public class ExactlyOnceSourceIntegrationTest {
private static final int SOURCE_TASK_PRODUCE_TIMEOUT_MS = 30_000;
private static final int DEFAULT_NUM_WORKERS = 3;
+ // Tests require that a minimum but not unreasonably large number of
records is sourced.
+ // Throttle the poll such that a reasonable number of records is produced
while the test runs.
+ private static final int MINIMUM_MESSAGES = 100;
+ private static final String MESSAGES_PER_POLL =
Integer.toString(MINIMUM_MESSAGES);
+ private static final String MESSAGES_PER_SECOND =
Long.toString(MINIMUM_MESSAGES / 2);
+
private Properties brokerProps;
private Map<String, String> workerProps;
private EmbeddedConnectCluster.Builder connectBuilder;
@@ -255,7 +262,6 @@ public class ExactlyOnceSourceIntegrationTest {
connect.kafka().createTopic(topic, 3);
int numTasks = 1;
- int recordsProduced = 100;
Map<String, String> props = new HashMap<>();
props.put(CONNECTOR_CLASS_CONFIG,
MonitorableSourceConnector.class.getName());
@@ -265,11 +271,12 @@ public class ExactlyOnceSourceIntegrationTest {
props.put(VALUE_CONVERTER_CLASS_CONFIG,
StringConverter.class.getName());
props.put(NAME_CONFIG, CONNECTOR_NAME);
props.put(TRANSACTION_BOUNDARY_CONFIG, POLL.toString());
- props.put(MESSAGES_PER_POLL_CONFIG, Integer.toString(recordsProduced));
+ props.put(MESSAGES_PER_POLL_CONFIG, MESSAGES_PER_POLL);
+ props.put(MAX_MESSAGES_PER_SECOND_CONFIG, MESSAGES_PER_SECOND);
// expect all records to be consumed and committed by the connector
- connectorHandle.expectedRecords(recordsProduced);
- connectorHandle.expectedCommits(recordsProduced);
+ connectorHandle.expectedRecords(MINIMUM_MESSAGES);
+ connectorHandle.expectedCommits(MINIMUM_MESSAGES);
// start a source connector
connect.configureConnector(CONNECTOR_NAME, props);
@@ -293,8 +300,8 @@ public class ExactlyOnceSourceIntegrationTest {
null,
topic
);
- assertTrue("Not enough records produced by source connector. Expected
at least: " + recordsProduced + " + but got " + records.count(),
- records.count() >= recordsProduced);
+ assertTrue("Not enough records produced by source connector. Expected
at least: " + MINIMUM_MESSAGES + " + but got " + records.count(),
+ records.count() >= MINIMUM_MESSAGES);
assertExactlyOnceSeqnos(records, numTasks);
}
@@ -314,7 +321,6 @@ public class ExactlyOnceSourceIntegrationTest {
connect.kafka().createTopic(topic, 3);
int numTasks = 1;
- int recordsProduced = 100;
Map<String, String> props = new HashMap<>();
props.put(CONNECTOR_CLASS_CONFIG,
MonitorableSourceConnector.class.getName());
@@ -325,11 +331,12 @@ public class ExactlyOnceSourceIntegrationTest {
props.put(NAME_CONFIG, CONNECTOR_NAME);
props.put(TRANSACTION_BOUNDARY_CONFIG, INTERVAL.toString());
props.put(TRANSACTION_BOUNDARY_INTERVAL_CONFIG, "10000");
- props.put(MESSAGES_PER_POLL_CONFIG, Integer.toString(recordsProduced));
+ props.put(MESSAGES_PER_POLL_CONFIG, MESSAGES_PER_POLL);
+ props.put(MAX_MESSAGES_PER_SECOND_CONFIG, MESSAGES_PER_SECOND);
// expect all records to be consumed and committed by the connector
- connectorHandle.expectedRecords(recordsProduced);
- connectorHandle.expectedCommits(recordsProduced);
+ connectorHandle.expectedRecords(MINIMUM_MESSAGES);
+ connectorHandle.expectedCommits(MINIMUM_MESSAGES);
// start a source connector
connect.configureConnector(CONNECTOR_NAME, props);
@@ -353,8 +360,8 @@ public class ExactlyOnceSourceIntegrationTest {
null,
topic
);
- assertTrue("Not enough records produced by source connector. Expected
at least: " + recordsProduced + " + but got " + records.count(),
- records.count() >= recordsProduced);
+ assertTrue("Not enough records produced by source connector. Expected
at least: " + MINIMUM_MESSAGES + " + but got " + records.count(),
+ records.count() >= MINIMUM_MESSAGES);
assertExactlyOnceSeqnos(records, numTasks);
}
@@ -375,8 +382,6 @@ public class ExactlyOnceSourceIntegrationTest {
String topic = "test-topic";
connect.kafka().createTopic(topic, 3);
- int recordsProduced = 100;
-
Map<String, String> props = new HashMap<>();
props.put(CONNECTOR_CLASS_CONFIG,
MonitorableSourceConnector.class.getName());
props.put(TASKS_MAX_CONFIG, "1");
@@ -386,11 +391,12 @@ public class ExactlyOnceSourceIntegrationTest {
props.put(NAME_CONFIG, CONNECTOR_NAME);
props.put(TRANSACTION_BOUNDARY_CONFIG, CONNECTOR.toString());
props.put(CUSTOM_TRANSACTION_BOUNDARIES_CONFIG,
MonitorableSourceConnector.TRANSACTION_BOUNDARIES_SUPPORTED);
- props.put(MESSAGES_PER_POLL_CONFIG, Integer.toString(recordsProduced));
+ props.put(MESSAGES_PER_POLL_CONFIG, MESSAGES_PER_POLL);
+ props.put(MAX_MESSAGES_PER_SECOND_CONFIG, MESSAGES_PER_SECOND);
// expect all records to be consumed and committed by the connector
- connectorHandle.expectedRecords(recordsProduced);
- connectorHandle.expectedCommits(recordsProduced);
+ connectorHandle.expectedRecords(MINIMUM_MESSAGES);
+ connectorHandle.expectedCommits(MINIMUM_MESSAGES);
// start a source connector
connect.configureConnector(CONNECTOR_NAME, props);
@@ -412,15 +418,15 @@ public class ExactlyOnceSourceIntegrationTest {
null,
topic
);
- assertTrue("Not enough records produced by source connector. Expected
at least: " + recordsProduced + " + but got " + sourceRecords.count(),
- sourceRecords.count() >= recordsProduced);
+ assertTrue("Not enough records produced by source connector. Expected
at least: " + MINIMUM_MESSAGES + " + but got " + sourceRecords.count(),
+ sourceRecords.count() >= MINIMUM_MESSAGES);
// also consume from the cluster's offsets topic to verify that the
expected offsets (which should correspond to the connector's
// custom transaction boundaries) were committed
List<Long> expectedOffsetSeqnos = new ArrayList<>();
long lastExpectedOffsetSeqno = 1;
long nextExpectedOffsetSeqno = 1;
- while (nextExpectedOffsetSeqno <= recordsProduced) {
+ while (nextExpectedOffsetSeqno <= MINIMUM_MESSAGES) {
expectedOffsetSeqnos.add(nextExpectedOffsetSeqno);
nextExpectedOffsetSeqno += lastExpectedOffsetSeqno;
lastExpectedOffsetSeqno = nextExpectedOffsetSeqno -
lastExpectedOffsetSeqno;
@@ -438,7 +444,7 @@ public class ExactlyOnceSourceIntegrationTest {
assertEquals("Committed offsets should match connector-defined
transaction boundaries",
expectedOffsetSeqnos, actualOffsetSeqnos.subList(0,
expectedOffsetSeqnos.size()));
- List<Long> expectedRecordSeqnos = LongStream.range(1, recordsProduced
+ 1).boxed().collect(Collectors.toList());
+ List<Long> expectedRecordSeqnos = LongStream.range(1, MINIMUM_MESSAGES
+ 1).boxed().collect(Collectors.toList());
long priorBoundary = 1;
long nextBoundary = 2;
while (priorBoundary <
expectedRecordSeqnos.get(expectedRecordSeqnos.size() - 1)) {
@@ -474,7 +480,6 @@ public class ExactlyOnceSourceIntegrationTest {
connect.kafka().createTopic(topic, 3);
int numTasks = 1;
- int recordsProduced = 100;
Map<String, String> props = new HashMap<>();
props.put(CONNECTOR_CLASS_CONFIG,
MonitorableSourceConnector.class.getName());
@@ -484,11 +489,12 @@ public class ExactlyOnceSourceIntegrationTest {
props.put(VALUE_CONVERTER_CLASS_CONFIG,
StringConverter.class.getName());
props.put(NAME_CONFIG, CONNECTOR_NAME);
props.put(TRANSACTION_BOUNDARY_CONFIG, POLL.toString());
- props.put(MESSAGES_PER_POLL_CONFIG, Integer.toString(recordsProduced));
+ props.put(MESSAGES_PER_POLL_CONFIG, MESSAGES_PER_POLL);
+ props.put(MAX_MESSAGES_PER_SECOND_CONFIG, MESSAGES_PER_SECOND);
// expect all records to be consumed and committed by the connector
- connectorHandle.expectedRecords(recordsProduced);
- connectorHandle.expectedCommits(recordsProduced);
+ connectorHandle.expectedRecords(MINIMUM_MESSAGES);
+ connectorHandle.expectedCommits(MINIMUM_MESSAGES);
// make sure the worker is actually up (otherwise, it may fence out
our simulated zombie leader, instead of the other way around)
assertEquals(404,
connect.requestGet(connect.endpointForResource("connectors/nonexistent")).getStatus());
@@ -526,8 +532,8 @@ public class ExactlyOnceSourceIntegrationTest {
null,
topic
);
- assertTrue("Not enough records produced by source connector. Expected
at least: " + recordsProduced + " + but got " + records.count(),
- records.count() >= recordsProduced);
+ assertTrue("Not enough records produced by source connector. Expected
at least: " + MINIMUM_MESSAGES + " + but got " + records.count(),
+ records.count() >= MINIMUM_MESSAGES);
assertExactlyOnceSeqnos(records, numTasks);
}
@@ -545,19 +551,18 @@ public class ExactlyOnceSourceIntegrationTest {
String topic = "test-topic";
connect.kafka().createTopic(topic, 3);
- int recordsProduced = 100;
-
Map<String, String> props = new HashMap<>();
props.put(CONNECTOR_CLASS_CONFIG,
MonitorableSourceConnector.class.getName());
props.put(TOPIC_CONFIG, topic);
props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
props.put(VALUE_CONVERTER_CLASS_CONFIG,
StringConverter.class.getName());
props.put(NAME_CONFIG, CONNECTOR_NAME);
- props.put(MESSAGES_PER_POLL_CONFIG, Integer.toString(recordsProduced));
+ props.put(MESSAGES_PER_POLL_CONFIG, MESSAGES_PER_POLL);
+ props.put(MAX_MESSAGES_PER_SECOND_CONFIG, MESSAGES_PER_SECOND);
// expect all records to be consumed and committed by the connector
- connectorHandle.expectedRecords(recordsProduced);
- connectorHandle.expectedCommits(recordsProduced);
+ connectorHandle.expectedRecords(MINIMUM_MESSAGES);
+ connectorHandle.expectedCommits(MINIMUM_MESSAGES);
StartAndStopLatch connectorStart = connectorAndTaskStart(3);
props.put(TASKS_MAX_CONFIG, "3");
@@ -590,8 +595,8 @@ public class ExactlyOnceSourceIntegrationTest {
null,
topic
);
- assertTrue("Not enough records produced by source connector. Expected
at least: " + recordsProduced + " + but got " + records.count(),
- records.count() >= recordsProduced);
+ assertTrue("Not enough records produced by source connector. Expected
at least: " + MINIMUM_MESSAGES + " + but got " + records.count(),
+ records.count() >= MINIMUM_MESSAGES);
// We used at most five tasks during the tests; each of them should
have been able to produce records
assertExactlyOnceSeqnos(records, 5);
}
@@ -743,7 +748,6 @@ public class ExactlyOnceSourceIntegrationTest {
connectorTargetedCluster.createTopic(topic, 3);
int numTasks = 1;
- int recordsProduced = 100;
Map<String, String> props = new HashMap<>();
props.put(CONNECTOR_CLASS_CONFIG,
MonitorableSourceConnector.class.getName());
@@ -753,7 +757,8 @@ public class ExactlyOnceSourceIntegrationTest {
props.put(VALUE_CONVERTER_CLASS_CONFIG,
StringConverter.class.getName());
props.put(NAME_CONFIG, CONNECTOR_NAME);
props.put(TRANSACTION_BOUNDARY_CONFIG, POLL.toString());
- props.put(MESSAGES_PER_POLL_CONFIG,
Integer.toString(recordsProduced));
+ props.put(MESSAGES_PER_POLL_CONFIG, MESSAGES_PER_POLL);
+ props.put(MAX_MESSAGES_PER_SECOND_CONFIG, MESSAGES_PER_SECOND);
props.put(CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX +
BOOTSTRAP_SERVERS_CONFIG, connectorTargetedCluster.bootstrapServers());
props.put(CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX +
BOOTSTRAP_SERVERS_CONFIG, connectorTargetedCluster.bootstrapServers());
props.put(CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX +
BOOTSTRAP_SERVERS_CONFIG, connectorTargetedCluster.bootstrapServers());
@@ -761,8 +766,8 @@ public class ExactlyOnceSourceIntegrationTest {
props.put(OFFSETS_TOPIC_CONFIG, offsetsTopic);
// expect all records to be consumed and committed by the connector
- connectorHandle.expectedRecords(recordsProduced);
- connectorHandle.expectedCommits(recordsProduced);
+ connectorHandle.expectedRecords(MINIMUM_MESSAGES);
+ connectorHandle.expectedCommits(MINIMUM_MESSAGES);
// start a source connector
connect.configureConnector(CONNECTOR_NAME, props);
@@ -778,13 +783,13 @@ public class ExactlyOnceSourceIntegrationTest {
// consume at least the expected number of records from the source
topic or fail, to ensure that they were correctly produced
int recordNum = connectorTargetedCluster
.consume(
- recordsProduced,
+ MINIMUM_MESSAGES,
TimeUnit.MINUTES.toMillis(1),
Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
"read_committed"),
"test-topic")
.count();
- assertTrue("Not enough records produced by source connector.
Expected at least: " + recordsProduced + " + but got " + recordNum,
- recordNum >= recordsProduced);
+ assertTrue("Not enough records produced by source connector.
Expected at least: " + MINIMUM_MESSAGES + " + but got " + recordNum,
+ recordNum >= MINIMUM_MESSAGES);
// also consume from the connector's dedicated offsets topic
ConsumerRecords<byte[], byte[]> offsetRecords =
connectorTargetedCluster
@@ -796,8 +801,8 @@ public class ExactlyOnceSourceIntegrationTest {
);
List<Long> seqnos =
parseAndAssertOffsetsForSingleTask(offsetRecords);
seqnos.forEach(seqno ->
- assertEquals("Offset commits should occur on connector-defined
poll boundaries, which happen every " + recordsProduced + " records",
- 0, seqno % recordsProduced)
+ assertEquals("Offset commits should occur on connector-defined
poll boundaries, which happen every " + MINIMUM_MESSAGES + " records",
+ 0, seqno % MINIMUM_MESSAGES)
);
// also consume from the cluster's global offsets topic
@@ -810,8 +815,8 @@ public class ExactlyOnceSourceIntegrationTest {
);
seqnos = parseAndAssertOffsetsForSingleTask(offsetRecords);
seqnos.forEach(seqno ->
- assertEquals("Offset commits should occur on connector-defined
poll boundaries, which happen every " + recordsProduced + " records",
- 0, seqno % recordsProduced)
+ assertEquals("Offset commits should occur on connector-defined
poll boundaries, which happen every " + MINIMUM_MESSAGES + " records",
+ 0, seqno % MINIMUM_MESSAGES)
);
// Shut down the whole cluster
@@ -820,8 +825,8 @@ public class ExactlyOnceSourceIntegrationTest {
workerProps.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "disabled");
// Establish new expectations for records+offsets
- connectorHandle.expectedRecords(recordsProduced);
- connectorHandle.expectedCommits(recordsProduced);
+ connectorHandle.expectedRecords(MINIMUM_MESSAGES);
+ connectorHandle.expectedCommits(MINIMUM_MESSAGES);
// Restart the whole cluster
for (int i = 0; i < DEFAULT_NUM_WORKERS; i++) {
@@ -856,8 +861,8 @@ public class ExactlyOnceSourceIntegrationTest {
null,
topic
);
- assertTrue("Not enough records produced by source connector.
Expected at least: " + recordsProduced + " + but got " + sourceRecords.count(),
- sourceRecords.count() >= recordsProduced);
+ assertTrue("Not enough records produced by source connector.
Expected at least: " + MINIMUM_MESSAGES + " + but got " + sourceRecords.count(),
+ sourceRecords.count() >= MINIMUM_MESSAGES);
// also have to check which offsets have actually been committed,
since we no longer have exactly-once semantics
offsetRecords = connectorTargetedCluster.consumeAll(
CONSUME_RECORDS_TIMEOUT_MS,
@@ -896,8 +901,6 @@ public class ExactlyOnceSourceIntegrationTest {
String topic = "test-topic";
connect.kafka().createTopic(topic, 3);
- int recordsProduced = 100;
-
Map<String, String> props = new HashMap<>();
// See below; this connector does nothing except request offsets from
the worker in SourceTask::poll
// and then return a single record targeted at its offsets topic
@@ -905,7 +908,6 @@ public class ExactlyOnceSourceIntegrationTest {
props.put(TASKS_MAX_CONFIG, "1");
props.put(NAME_CONFIG, CONNECTOR_NAME);
props.put(TRANSACTION_BOUNDARY_CONFIG, INTERVAL.toString());
- props.put(MESSAGES_PER_POLL_CONFIG, Integer.toString(recordsProduced));
props.put(OFFSETS_TOPIC_CONFIG, "whoops");
// start a source connector
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
index 33ba1588a7d..102b10d7fb7 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
@@ -52,6 +52,7 @@ public class MonitorableSourceConnector extends
SampleSourceConnector {
public static final String TOPIC_CONFIG = "topic";
public static final String MESSAGES_PER_POLL_CONFIG = "messages.per.poll";
+ public static final String MAX_MESSAGES_PER_SECOND_CONFIG = "throughput";
public static final String CUSTOM_EXACTLY_ONCE_SUPPORT_CONFIG =
"custom.exactly.once.support";
public static final String EXACTLY_ONCE_SUPPORTED = "supported";
@@ -177,7 +178,7 @@ public class MonitorableSourceConnector extends
SampleSourceConnector {
startingSeqno = Optional.ofNullable((Long)
offset.get("saved")).orElse(0L);
seqno = startingSeqno;
log.info("Started {} task {} with properties {}",
this.getClass().getSimpleName(), taskId, props);
- throttler = new
ThroughputThrottler(Long.parseLong(props.getOrDefault("throughput", "-1")),
System.currentTimeMillis());
+ throttler = new
ThroughputThrottler(Long.parseLong(props.getOrDefault(MAX_MESSAGES_PER_SECOND_CONFIG,
"-1")), System.currentTimeMillis());
taskHandle.recordTaskStart();
priorTransactionBoundary = 0;
nextTransactionBoundary = 1;