This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 1e8284b71d2c550a202f42a5455ffed62a99bbb7
Author: Bill <[email protected]>
AuthorDate: Tue Nov 26 14:15:52 2024 -0500

    Fixes from rebase
---
 .../kafka/api/IntegrationTestHarness.scala         |  4 --
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  1 -
 .../KafkaStreamsTelemetryIntegrationTest.java      |  3 +-
 .../SmokeTestDriverIntegrationTest.java            | 67 ++++++++--------------
 4 files changed, 27 insertions(+), 48 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala 
b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index a66752c30f3..b7efed1d495 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -74,10 +74,6 @@ abstract class IntegrationTestHarness extends 
KafkaServerTestHarness {
       
cfgs.foreach(_.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG,
 "classic,consumer,share"))
       
cfgs.foreach(_.setProperty(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, 
"true"))
     }
-    if (isStreamsGroupTest()) {
-      
cfgs.foreach(_.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG,
 "classic,consumer,streams"))
-      
cfgs.foreach(_.setProperty(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, 
"true"))
-    }
 
     if(isKRaftTest()) {
       cfgs.foreach(_.setProperty(KRaftConfigs.METADATA_LOG_DIR_CONFIG, 
TestUtils.tempDir().getAbsolutePath))
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index d3edc955211..39a77f7de52 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -77,7 +77,6 @@ import 
org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, ProducerIdAn
 import 
org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
 CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, 
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG, 
SHARE_SESSION_TIMEOUT_MS_CONFIG}
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
 import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinator, 
GroupCoordinatorConfig}
-import org.apache.kafka.coordinator.group.{GroupCoordinator, 
GroupCoordinatorConfig}
 import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult
 import org.apache.kafka.coordinator.share.{ShareCoordinator, 
ShareCoordinatorConfigTest}
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
index fc90c621435..1de8fa21307 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
@@ -432,7 +432,8 @@ public class KafkaStreamsTelemetryIntegrationTest {
         }
 
         @Override
-        public Consumer<byte[], byte[]> 
getStreamsRebalanceProtocolConsumer(Map<String, Object> config, 
StreamsAssignmentInterface assignmentInterface) {
+        public Consumer<byte[], byte[]> 
getStreamsRebalanceProtocolConsumer(final Map<String, Object> config,
+                                                                            
final StreamsAssignmentInterface assignmentInterface) {
             return null;
         }
 
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
index 4ed0e946426..88da4f1aa0c 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
@@ -16,21 +16,26 @@
  */
 package org.apache.kafka.streams.integration;
 
-
-
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.server.config.ServerConfigs;
 import org.apache.kafka.streams.GroupProtocol;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.tests.SmokeTestClient;
 import org.apache.kafka.streams.tests.SmokeTestDriver;
 
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
 
+import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Locale;
@@ -45,12 +50,25 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 @Timeout(600)
 @Tag("integration")
 public class SmokeTestDriverIntegrationTest {
+    
+    
+    public static EmbeddedKafkaCluster cluster;
+
+    @BeforeAll
+    public static void startCluster() throws IOException {
+        final Properties props = new Properties();
+        
props.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG,
 "classic,consumer,streams");
+        props.setProperty(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, 
"true");
+        cluster = new EmbeddedKafkaCluster(3, props);
+        cluster.start();
+    }
 
-    @Override
-    public int brokerCount() {
-        return 3;
+    @AfterAll
+    public static void closeCluster() {
+        cluster.stop();
     }
 
+
     private static class Driver extends Thread {
         private final String bootstrapServers;
         private final int numKeys;
@@ -86,21 +104,6 @@ public class SmokeTestDriverIntegrationTest {
 
     }
 
-    @Override
-    public boolean isStreamsGroupTest() {
-        return true;
-    }
-
-    @Override
-    public boolean isZkMigrationTest() {
-        return false;
-    }
-
-    @Override
-    public boolean isKRaftTest() {
-        return true;
-    }
-
     // In this test, we try to keep creating new stream, and closing the old 
one, to maintain only 3 streams alive.
     // During the new stream added and old stream left, the stream process 
should still complete without issue.
     // We set 2 timeout condition to fail the test before passing the 
verification:
@@ -122,27 +125,9 @@ public class SmokeTestDriverIntegrationTest {
         int numClientsCreated = 0;
         final ArrayList<SmokeTestClient> clients = new ArrayList<>();
 
-        for (final String topic: SmokeTestDriver.topics()) {
-            deleteTopic(topic, listenerName());
-        }
-
-        for (final String topic: new String[]{
-            "data",
-            "echo",
-            "max",
-            "min", "min-suppressed", "min-raw",
-            "dif",
-            "sum",
-            "sws-raw", "sws-suppressed",
-            "cnt",
-            "avg",
-            "tagg",
-            "fk"
-        }) {
-            createTopic(topic, 3, 1, new Properties(), listenerName(), new 
Properties());
-        }
+        IntegrationTestUtils.cleanStateBeforeTest(cluster, 
SmokeTestDriver.topics());
 
-        final String bootstrapServers = bootstrapServers(listenerName());
+        final String bootstrapServers = cluster.bootstrapServers();
         final Driver driver = new Driver(bootstrapServers, 10, 1000);
         driver.start();
         System.out.println("started driver");
@@ -202,7 +187,5 @@ public class SmokeTestDriverIntegrationTest {
             throw new AssertionError(driver.exception());
         }
         assertTrue(driver.result().passed(), driver.result().result());
-
-
     }
 }

Reply via email to