This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 1e8284b71d2c550a202f42a5455ffed62a99bbb7
Author: Bill <[email protected]>
AuthorDate: Tue Nov 26 14:15:52 2024 -0500

    Fixes from rebase
---
 .../kafka/api/IntegrationTestHarness.scala         |  4 --
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  1 -
 .../KafkaStreamsTelemetryIntegrationTest.java      |  3 +-
 .../SmokeTestDriverIntegrationTest.java            | 67 ++++++++--------------
 4 files changed, 27 insertions(+), 48 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index a66752c30f3..b7efed1d495 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -74,10 +74,6 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {
       cfgs.foreach(_.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer,share"))
       cfgs.foreach(_.setProperty(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true"))
     }
-    if (isStreamsGroupTest()) {
-      cfgs.foreach(_.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer,streams"))
-      cfgs.foreach(_.setProperty(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true"))
-    }
     if(isKRaftTest()) {
       cfgs.foreach(_.setProperty(KRaftConfigs.METADATA_LOG_DIR_CONFIG, TestUtils.tempDir().getAbsolutePath))
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index d3edc955211..39a77f7de52 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -77,7 +77,6 @@ import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, ProducerIdAn
 import org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_SESSION_TIMEOUT_MS_CONFIG}
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
 import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinator, GroupCoordinatorConfig}
-import org.apache.kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorConfig}
 import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult
 import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorConfigTest}
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
index fc90c621435..1de8fa21307 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
@@ -432,7 +432,8 @@ public class KafkaStreamsTelemetryIntegrationTest {
         }

         @Override
-        public Consumer<byte[], byte[]> getStreamsRebalanceProtocolConsumer(Map<String, Object> config, StreamsAssignmentInterface assignmentInterface) {
+        public Consumer<byte[], byte[]> getStreamsRebalanceProtocolConsumer(final Map<String, Object> config,
+                                                                            final StreamsAssignmentInterface assignmentInterface) {
             return null;
         }
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
index 4ed0e946426..88da4f1aa0c 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
@@ -16,21 +16,26 @@
  */
 package org.apache.kafka.streams.integration;
-
-

 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.server.config.ServerConfigs;
 import org.apache.kafka.streams.GroupProtocol;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.tests.SmokeTestClient;
 import org.apache.kafka.streams.tests.SmokeTestDriver;

+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;

+import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Locale;
@@ -45,12 +50,25 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 @Timeout(600)
 @Tag("integration")
 public class SmokeTestDriverIntegrationTest {
+
+
+    public static EmbeddedKafkaCluster cluster;
+
+    @BeforeAll
+    public static void startCluster() throws IOException {
+        final Properties props = new Properties();
+        props.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer,streams");
+        props.setProperty(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true");
+        cluster = new EmbeddedKafkaCluster(3, props);
+        cluster.start();
+    }

-    @Override
-    public int brokerCount() {
-        return 3;
+    @AfterAll
+    public static void closeCluster() {
+        cluster.stop();
     }

+
     private static class Driver extends Thread {
         private final String bootstrapServers;
         private final int numKeys;
@@ -86,21 +104,6 @@ public class SmokeTestDriverIntegrationTest {
     }

-    @Override
-    public boolean isStreamsGroupTest() {
-        return true;
-    }
-
-    @Override
-    public boolean isZkMigrationTest() {
-        return false;
-    }
-
-    @Override
-    public boolean isKRaftTest() {
-        return true;
-    }
-
     // In this test, we try to keep creating new stream, and closing the old one, to maintain only 3 streams alive.
     // During the new stream added and old stream left, the stream process should still complete without issue.
     // We set 2 timeout condition to fail the test before passing the verification:
@@ -122,27 +125,9 @@ public class SmokeTestDriverIntegrationTest {
         int numClientsCreated = 0;
         final ArrayList<SmokeTestClient> clients = new ArrayList<>();

-        for (final String topic: SmokeTestDriver.topics()) {
-            deleteTopic(topic, listenerName());
-        }
-
-        for (final String topic: new String[]{
-            "data",
-            "echo",
-            "max",
-            "min", "min-suppressed", "min-raw",
-            "dif",
-            "sum",
-            "sws-raw", "sws-suppressed",
-            "cnt",
-            "avg",
-            "tagg",
-            "fk"
-        }) {
-            createTopic(topic, 3, 1, new Properties(), listenerName(), new Properties());
-        }
+        IntegrationTestUtils.cleanStateBeforeTest(cluster, SmokeTestDriver.topics());

-        final String bootstrapServers = bootstrapServers(listenerName());
+        final String bootstrapServers = cluster.bootstrapServers();
         final Driver driver = new Driver(bootstrapServers, 10, 1000);
         driver.start();
         System.out.println("started driver");
@@ -202,7 +187,5 @@ public class SmokeTestDriverIntegrationTest {
             throw new AssertionError(driver.exception());
         }
         assertTrue(driver.result().passed(), driver.result().result());
-
-
     }
 }
