This is an automated email from the ASF dual-hosted git repository.

edocomar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 867fb295d0b KAFKA-14742: Throttle connectors in ExactlyOnceSourceIntegrationTest to fix flakey OOMEs (#13291)
867fb295d0b is described below

commit 867fb295d0b4e91d650d21594eeb515bd9accc54
Author: Greg Harris <[email protected]>
AuthorDate: Tue Feb 28 02:21:36 2023 -0800

    KAFKA-14742: Throttle connectors in ExactlyOnceSourceIntegrationTest to fix flakey OOMEs (#13291)
    
    KAFKA-14742: Throttle connectors in ExactlyOnceSourceIntegrationTest to fix flakey OOMEs
    fixup: rename config constant, use meaningful constant names in EOSIT
    
    Signed-off-by: Greg Harris <[email protected]>
    
    Reviewers: Edoardo Comar <[email protected]>
---
 .../ExactlyOnceSourceIntegrationTest.java          | 106 +++++++++++----------
 .../integration/MonitorableSourceConnector.java    |   3 +-
 2 files changed, 56 insertions(+), 53 deletions(-)

diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
index a2dd730b344..bacfd311f5d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
@@ -81,6 +81,7 @@ import static 
org.apache.kafka.clients.producer.ProducerConfig.TRANSACTIONAL_ID_
 import static 
org.apache.kafka.connect.integration.MonitorableSourceConnector.CUSTOM_EXACTLY_ONCE_SUPPORT_CONFIG;
 import static 
org.apache.kafka.connect.integration.MonitorableSourceConnector.CUSTOM_TRANSACTION_BOUNDARIES_CONFIG;
 import static 
org.apache.kafka.connect.integration.MonitorableSourceConnector.MESSAGES_PER_POLL_CONFIG;
+import static 
org.apache.kafka.connect.integration.MonitorableSourceConnector.MAX_MESSAGES_PER_SECOND_CONFIG;
 import static 
org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
 import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
 import static 
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX;
@@ -115,6 +116,12 @@ public class ExactlyOnceSourceIntegrationTest {
     private static final int SOURCE_TASK_PRODUCE_TIMEOUT_MS = 30_000;
     private static final int DEFAULT_NUM_WORKERS = 3;
 
+    // Tests require that a minimum but not unreasonably large number of records are sourced.
+    // Throttle the poll such that a reasonable amount of records are produced while the test runs.
+    private static final int MINIMUM_MESSAGES = 100;
+    private static final String MESSAGES_PER_POLL = 
Integer.toString(MINIMUM_MESSAGES);
+    private static final String MESSAGES_PER_SECOND = 
Long.toString(MINIMUM_MESSAGES / 2);
+
     private Properties brokerProps;
     private Map<String, String> workerProps;
     private EmbeddedConnectCluster.Builder connectBuilder;
@@ -255,7 +262,6 @@ public class ExactlyOnceSourceIntegrationTest {
         connect.kafka().createTopic(topic, 3);
 
         int numTasks = 1;
-        int recordsProduced = 100;
 
         Map<String, String> props = new HashMap<>();
         props.put(CONNECTOR_CLASS_CONFIG, 
MonitorableSourceConnector.class.getName());
@@ -265,11 +271,12 @@ public class ExactlyOnceSourceIntegrationTest {
         props.put(VALUE_CONVERTER_CLASS_CONFIG, 
StringConverter.class.getName());
         props.put(NAME_CONFIG, CONNECTOR_NAME);
         props.put(TRANSACTION_BOUNDARY_CONFIG, POLL.toString());
-        props.put(MESSAGES_PER_POLL_CONFIG, Integer.toString(recordsProduced));
+        props.put(MESSAGES_PER_POLL_CONFIG, MESSAGES_PER_POLL);
+        props.put(MAX_MESSAGES_PER_SECOND_CONFIG, MESSAGES_PER_SECOND);
 
         // expect all records to be consumed and committed by the connector
-        connectorHandle.expectedRecords(recordsProduced);
-        connectorHandle.expectedCommits(recordsProduced);
+        connectorHandle.expectedRecords(MINIMUM_MESSAGES);
+        connectorHandle.expectedCommits(MINIMUM_MESSAGES);
 
         // start a source connector
         connect.configureConnector(CONNECTOR_NAME, props);
@@ -293,8 +300,8 @@ public class ExactlyOnceSourceIntegrationTest {
                 null,
                 topic
         );
-        assertTrue("Not enough records produced by source connector. Expected 
at least: " + recordsProduced + " + but got " + records.count(),
-                records.count() >= recordsProduced);
+        assertTrue("Not enough records produced by source connector. Expected 
at least: " + MINIMUM_MESSAGES + " + but got " + records.count(),
+                records.count() >= MINIMUM_MESSAGES);
         assertExactlyOnceSeqnos(records, numTasks);
     }
 
@@ -314,7 +321,6 @@ public class ExactlyOnceSourceIntegrationTest {
         connect.kafka().createTopic(topic, 3);
 
         int numTasks = 1;
-        int recordsProduced = 100;
 
         Map<String, String> props = new HashMap<>();
         props.put(CONNECTOR_CLASS_CONFIG, 
MonitorableSourceConnector.class.getName());
@@ -325,11 +331,12 @@ public class ExactlyOnceSourceIntegrationTest {
         props.put(NAME_CONFIG, CONNECTOR_NAME);
         props.put(TRANSACTION_BOUNDARY_CONFIG, INTERVAL.toString());
         props.put(TRANSACTION_BOUNDARY_INTERVAL_CONFIG, "10000");
-        props.put(MESSAGES_PER_POLL_CONFIG, Integer.toString(recordsProduced));
+        props.put(MESSAGES_PER_POLL_CONFIG, MESSAGES_PER_POLL);
+        props.put(MAX_MESSAGES_PER_SECOND_CONFIG, MESSAGES_PER_SECOND);
 
         // expect all records to be consumed and committed by the connector
-        connectorHandle.expectedRecords(recordsProduced);
-        connectorHandle.expectedCommits(recordsProduced);
+        connectorHandle.expectedRecords(MINIMUM_MESSAGES);
+        connectorHandle.expectedCommits(MINIMUM_MESSAGES);
 
         // start a source connector
         connect.configureConnector(CONNECTOR_NAME, props);
@@ -353,8 +360,8 @@ public class ExactlyOnceSourceIntegrationTest {
                 null,
                 topic
         );
-        assertTrue("Not enough records produced by source connector. Expected 
at least: " + recordsProduced + " + but got " + records.count(),
-                records.count() >= recordsProduced);
+        assertTrue("Not enough records produced by source connector. Expected 
at least: " + MINIMUM_MESSAGES + " + but got " + records.count(),
+                records.count() >= MINIMUM_MESSAGES);
         assertExactlyOnceSeqnos(records, numTasks);
     }
 
@@ -375,8 +382,6 @@ public class ExactlyOnceSourceIntegrationTest {
         String topic = "test-topic";
         connect.kafka().createTopic(topic, 3);
 
-        int recordsProduced = 100;
-
         Map<String, String> props = new HashMap<>();
         props.put(CONNECTOR_CLASS_CONFIG, 
MonitorableSourceConnector.class.getName());
         props.put(TASKS_MAX_CONFIG, "1");
@@ -386,11 +391,12 @@ public class ExactlyOnceSourceIntegrationTest {
         props.put(NAME_CONFIG, CONNECTOR_NAME);
         props.put(TRANSACTION_BOUNDARY_CONFIG, CONNECTOR.toString());
         props.put(CUSTOM_TRANSACTION_BOUNDARIES_CONFIG, 
MonitorableSourceConnector.TRANSACTION_BOUNDARIES_SUPPORTED);
-        props.put(MESSAGES_PER_POLL_CONFIG, Integer.toString(recordsProduced));
+        props.put(MESSAGES_PER_POLL_CONFIG, MESSAGES_PER_POLL);
+        props.put(MAX_MESSAGES_PER_SECOND_CONFIG, MESSAGES_PER_SECOND);
 
         // expect all records to be consumed and committed by the connector
-        connectorHandle.expectedRecords(recordsProduced);
-        connectorHandle.expectedCommits(recordsProduced);
+        connectorHandle.expectedRecords(MINIMUM_MESSAGES);
+        connectorHandle.expectedCommits(MINIMUM_MESSAGES);
 
         // start a source connector
         connect.configureConnector(CONNECTOR_NAME, props);
@@ -412,15 +418,15 @@ public class ExactlyOnceSourceIntegrationTest {
                 null,
                 topic
         );
-        assertTrue("Not enough records produced by source connector. Expected 
at least: " + recordsProduced + " + but got " + sourceRecords.count(),
-                sourceRecords.count() >= recordsProduced);
+        assertTrue("Not enough records produced by source connector. Expected 
at least: " + MINIMUM_MESSAGES + " + but got " + sourceRecords.count(),
+                sourceRecords.count() >= MINIMUM_MESSAGES);
 
         // also consume from the cluster's offsets topic to verify that the 
expected offsets (which should correspond to the connector's
         // custom transaction boundaries) were committed
         List<Long> expectedOffsetSeqnos = new ArrayList<>();
         long lastExpectedOffsetSeqno = 1;
         long nextExpectedOffsetSeqno = 1;
-        while (nextExpectedOffsetSeqno <= recordsProduced) {
+        while (nextExpectedOffsetSeqno <= MINIMUM_MESSAGES) {
             expectedOffsetSeqnos.add(nextExpectedOffsetSeqno);
             nextExpectedOffsetSeqno += lastExpectedOffsetSeqno;
             lastExpectedOffsetSeqno = nextExpectedOffsetSeqno - 
lastExpectedOffsetSeqno;
@@ -438,7 +444,7 @@ public class ExactlyOnceSourceIntegrationTest {
         assertEquals("Committed offsets should match connector-defined 
transaction boundaries",
                 expectedOffsetSeqnos, actualOffsetSeqnos.subList(0, 
expectedOffsetSeqnos.size()));
 
-        List<Long> expectedRecordSeqnos = LongStream.range(1, recordsProduced 
+ 1).boxed().collect(Collectors.toList());
+        List<Long> expectedRecordSeqnos = LongStream.range(1, MINIMUM_MESSAGES 
+ 1).boxed().collect(Collectors.toList());
         long priorBoundary = 1;
         long nextBoundary = 2;
         while (priorBoundary < 
expectedRecordSeqnos.get(expectedRecordSeqnos.size() - 1)) {
@@ -474,7 +480,6 @@ public class ExactlyOnceSourceIntegrationTest {
         connect.kafka().createTopic(topic, 3);
 
         int numTasks = 1;
-        int recordsProduced = 100;
 
         Map<String, String> props = new HashMap<>();
         props.put(CONNECTOR_CLASS_CONFIG, 
MonitorableSourceConnector.class.getName());
@@ -484,11 +489,12 @@ public class ExactlyOnceSourceIntegrationTest {
         props.put(VALUE_CONVERTER_CLASS_CONFIG, 
StringConverter.class.getName());
         props.put(NAME_CONFIG, CONNECTOR_NAME);
         props.put(TRANSACTION_BOUNDARY_CONFIG, POLL.toString());
-        props.put(MESSAGES_PER_POLL_CONFIG, Integer.toString(recordsProduced));
+        props.put(MESSAGES_PER_POLL_CONFIG, MESSAGES_PER_POLL);
+        props.put(MAX_MESSAGES_PER_SECOND_CONFIG, MESSAGES_PER_SECOND);
 
         // expect all records to be consumed and committed by the connector
-        connectorHandle.expectedRecords(recordsProduced);
-        connectorHandle.expectedCommits(recordsProduced);
+        connectorHandle.expectedRecords(MINIMUM_MESSAGES);
+        connectorHandle.expectedCommits(MINIMUM_MESSAGES);
 
         // make sure the worker is actually up (otherwise, it may fence out 
our simulated zombie leader, instead of the other way around)
         assertEquals(404, 
connect.requestGet(connect.endpointForResource("connectors/nonexistent")).getStatus());
@@ -526,8 +532,8 @@ public class ExactlyOnceSourceIntegrationTest {
                 null,
                 topic
         );
-        assertTrue("Not enough records produced by source connector. Expected 
at least: " + recordsProduced + " + but got " + records.count(),
-                records.count() >= recordsProduced);
+        assertTrue("Not enough records produced by source connector. Expected 
at least: " + MINIMUM_MESSAGES + " + but got " + records.count(),
+                records.count() >= MINIMUM_MESSAGES);
         assertExactlyOnceSeqnos(records, numTasks);
     }
 
@@ -545,19 +551,18 @@ public class ExactlyOnceSourceIntegrationTest {
         String topic = "test-topic";
         connect.kafka().createTopic(topic, 3);
 
-        int recordsProduced = 100;
-
         Map<String, String> props = new HashMap<>();
         props.put(CONNECTOR_CLASS_CONFIG, 
MonitorableSourceConnector.class.getName());
         props.put(TOPIC_CONFIG, topic);
         props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
         props.put(VALUE_CONVERTER_CLASS_CONFIG, 
StringConverter.class.getName());
         props.put(NAME_CONFIG, CONNECTOR_NAME);
-        props.put(MESSAGES_PER_POLL_CONFIG, Integer.toString(recordsProduced));
+        props.put(MESSAGES_PER_POLL_CONFIG, MESSAGES_PER_POLL);
+        props.put(MAX_MESSAGES_PER_SECOND_CONFIG, MESSAGES_PER_SECOND);
 
         // expect all records to be consumed and committed by the connector
-        connectorHandle.expectedRecords(recordsProduced);
-        connectorHandle.expectedCommits(recordsProduced);
+        connectorHandle.expectedRecords(MINIMUM_MESSAGES);
+        connectorHandle.expectedCommits(MINIMUM_MESSAGES);
 
         StartAndStopLatch connectorStart = connectorAndTaskStart(3);
         props.put(TASKS_MAX_CONFIG, "3");
@@ -590,8 +595,8 @@ public class ExactlyOnceSourceIntegrationTest {
                 null,
                 topic
         );
-        assertTrue("Not enough records produced by source connector. Expected 
at least: " + recordsProduced + " + but got " + records.count(),
-                records.count() >= recordsProduced);
+        assertTrue("Not enough records produced by source connector. Expected 
at least: " + MINIMUM_MESSAGES + " + but got " + records.count(),
+                records.count() >= MINIMUM_MESSAGES);
         // We used at most five tasks during the tests; each of them should 
have been able to produce records
         assertExactlyOnceSeqnos(records, 5);
     }
@@ -743,7 +748,6 @@ public class ExactlyOnceSourceIntegrationTest {
             connectorTargetedCluster.createTopic(topic, 3);
 
             int numTasks = 1;
-            int recordsProduced = 100;
 
             Map<String, String> props = new HashMap<>();
             props.put(CONNECTOR_CLASS_CONFIG, 
MonitorableSourceConnector.class.getName());
@@ -753,7 +757,8 @@ public class ExactlyOnceSourceIntegrationTest {
             props.put(VALUE_CONVERTER_CLASS_CONFIG, 
StringConverter.class.getName());
             props.put(NAME_CONFIG, CONNECTOR_NAME);
             props.put(TRANSACTION_BOUNDARY_CONFIG, POLL.toString());
-            props.put(MESSAGES_PER_POLL_CONFIG, 
Integer.toString(recordsProduced));
+            props.put(MESSAGES_PER_POLL_CONFIG, MESSAGES_PER_POLL);
+            props.put(MAX_MESSAGES_PER_SECOND_CONFIG, MESSAGES_PER_SECOND);
             props.put(CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + 
BOOTSTRAP_SERVERS_CONFIG, connectorTargetedCluster.bootstrapServers());
             props.put(CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + 
BOOTSTRAP_SERVERS_CONFIG, connectorTargetedCluster.bootstrapServers());
             props.put(CONNECTOR_CLIENT_ADMIN_OVERRIDES_PREFIX + 
BOOTSTRAP_SERVERS_CONFIG, connectorTargetedCluster.bootstrapServers());
@@ -761,8 +766,8 @@ public class ExactlyOnceSourceIntegrationTest {
             props.put(OFFSETS_TOPIC_CONFIG, offsetsTopic);
 
             // expect all records to be consumed and committed by the connector
-            connectorHandle.expectedRecords(recordsProduced);
-            connectorHandle.expectedCommits(recordsProduced);
+            connectorHandle.expectedRecords(MINIMUM_MESSAGES);
+            connectorHandle.expectedCommits(MINIMUM_MESSAGES);
 
             // start a source connector
             connect.configureConnector(CONNECTOR_NAME, props);
@@ -778,13 +783,13 @@ public class ExactlyOnceSourceIntegrationTest {
             // consume at least the expected number of records from the source 
topic or fail, to ensure that they were correctly produced
             int recordNum = connectorTargetedCluster
                     .consume(
-                            recordsProduced,
+                        MINIMUM_MESSAGES,
                             TimeUnit.MINUTES.toMillis(1),
                             
Collections.singletonMap(ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
"read_committed"),
                             "test-topic")
                     .count();
-            assertTrue("Not enough records produced by source connector. 
Expected at least: " + recordsProduced + " + but got " + recordNum,
-                    recordNum >= recordsProduced);
+            assertTrue("Not enough records produced by source connector. 
Expected at least: " + MINIMUM_MESSAGES + " + but got " + recordNum,
+                    recordNum >= MINIMUM_MESSAGES);
 
             // also consume from the connector's dedicated offsets topic
             ConsumerRecords<byte[], byte[]> offsetRecords = 
connectorTargetedCluster
@@ -796,8 +801,8 @@ public class ExactlyOnceSourceIntegrationTest {
                     );
             List<Long> seqnos = 
parseAndAssertOffsetsForSingleTask(offsetRecords);
             seqnos.forEach(seqno ->
-                assertEquals("Offset commits should occur on connector-defined 
poll boundaries, which happen every " + recordsProduced + " records",
-                        0, seqno % recordsProduced)
+                assertEquals("Offset commits should occur on connector-defined 
poll boundaries, which happen every " + MINIMUM_MESSAGES + " records",
+                        0, seqno % MINIMUM_MESSAGES)
             );
 
             // also consume from the cluster's global offsets topic
@@ -810,8 +815,8 @@ public class ExactlyOnceSourceIntegrationTest {
                     );
             seqnos = parseAndAssertOffsetsForSingleTask(offsetRecords);
             seqnos.forEach(seqno ->
-                assertEquals("Offset commits should occur on connector-defined 
poll boundaries, which happen every " + recordsProduced + " records",
-                        0, seqno % recordsProduced)
+                assertEquals("Offset commits should occur on connector-defined 
poll boundaries, which happen every " + MINIMUM_MESSAGES + " records",
+                        0, seqno % MINIMUM_MESSAGES)
             );
 
             // Shut down the whole cluster
@@ -820,8 +825,8 @@ public class ExactlyOnceSourceIntegrationTest {
             workerProps.put(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG, "disabled");
 
             // Establish new expectations for records+offsets
-            connectorHandle.expectedRecords(recordsProduced);
-            connectorHandle.expectedCommits(recordsProduced);
+            connectorHandle.expectedRecords(MINIMUM_MESSAGES);
+            connectorHandle.expectedCommits(MINIMUM_MESSAGES);
 
             // Restart the whole cluster
             for (int i = 0; i < DEFAULT_NUM_WORKERS; i++) {
@@ -856,8 +861,8 @@ public class ExactlyOnceSourceIntegrationTest {
                     null,
                     topic
             );
-            assertTrue("Not enough records produced by source connector. 
Expected at least: " + recordsProduced + " + but got " + sourceRecords.count(),
-                    sourceRecords.count() >= recordsProduced);
+            assertTrue("Not enough records produced by source connector. 
Expected at least: " + MINIMUM_MESSAGES + " + but got " + sourceRecords.count(),
+                    sourceRecords.count() >= MINIMUM_MESSAGES);
             // also have to check which offsets have actually been committed, 
since we no longer have exactly-once semantics
             offsetRecords = connectorTargetedCluster.consumeAll(
                     CONSUME_RECORDS_TIMEOUT_MS,
@@ -896,8 +901,6 @@ public class ExactlyOnceSourceIntegrationTest {
         String topic = "test-topic";
         connect.kafka().createTopic(topic, 3);
 
-        int recordsProduced = 100;
-
         Map<String, String> props = new HashMap<>();
         // See below; this connector does nothing except request offsets from 
the worker in SourceTask::poll
         // and then return a single record targeted at its offsets topic
@@ -905,7 +908,6 @@ public class ExactlyOnceSourceIntegrationTest {
         props.put(TASKS_MAX_CONFIG, "1");
         props.put(NAME_CONFIG, CONNECTOR_NAME);
         props.put(TRANSACTION_BOUNDARY_CONFIG, INTERVAL.toString());
-        props.put(MESSAGES_PER_POLL_CONFIG, Integer.toString(recordsProduced));
         props.put(OFFSETS_TOPIC_CONFIG, "whoops");
 
         // start a source connector
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
index 33ba1588a7d..102b10d7fb7 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
@@ -52,6 +52,7 @@ public class MonitorableSourceConnector extends 
SampleSourceConnector {
 
     public static final String TOPIC_CONFIG = "topic";
     public static final String MESSAGES_PER_POLL_CONFIG = "messages.per.poll";
+    public static final String MAX_MESSAGES_PER_SECOND_CONFIG = "throughput";
 
     public static final String CUSTOM_EXACTLY_ONCE_SUPPORT_CONFIG = 
"custom.exactly.once.support";
     public static final String EXACTLY_ONCE_SUPPORTED = "supported";
@@ -177,7 +178,7 @@ public class MonitorableSourceConnector extends 
SampleSourceConnector {
             startingSeqno = Optional.ofNullable((Long) 
offset.get("saved")).orElse(0L);
             seqno = startingSeqno;
             log.info("Started {} task {} with properties {}", 
this.getClass().getSimpleName(), taskId, props);
-            throttler = new 
ThroughputThrottler(Long.parseLong(props.getOrDefault("throughput", "-1")), 
System.currentTimeMillis());
+            throttler = new 
ThroughputThrottler(Long.parseLong(props.getOrDefault(MAX_MESSAGES_PER_SECOND_CONFIG,
 "-1")), System.currentTimeMillis());
             taskHandle.recordTaskStart();
             priorTransactionBoundary = 0;
             nextTransactionBoundary = 1;

Reply via email to