This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e3f953483cb KAFKA-17857 Move AbstractResetIntegrationTest and subclasses to tools (#17594)
e3f953483cb is described below

commit e3f953483cb480631bf041698770b47ddb82796f
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Tue Nov 5 04:16:19 2024 +0800

    KAFKA-17857 Move AbstractResetIntegrationTest and subclasses to tools (#17594)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 build.gradle                                             |  4 ++++
 checkstyle/import-control.xml                            |  1 +
 .../kafka/tools}/AbstractResetIntegrationTest.java       | 16 +++++++---------
 .../org/apache/kafka/tools}/ResetIntegrationTest.java    | 16 +++++++---------
 .../apache/kafka/tools}/ResetIntegrationWithSslTest.java |  2 +-
 5 files changed, 20 insertions(+), 19 deletions(-)

diff --git a/build.gradle b/build.gradle
index aff84c949ce..2a40ab4b9b1 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2405,6 +2405,9 @@ project(':tools') {
     testImplementation project(':connect:runtime').sourceSets.test.output
     testImplementation project(':storage:storage-api').sourceSets.main.output
     testImplementation project(':storage').sourceSets.test.output
+    testImplementation project(':streams')
+    testImplementation project(':streams').sourceSets.test.output
+    testImplementation project(':streams:integration-tests').sourceSets.test.output
     testImplementation project(':test-common')
     testImplementation libs.junitJupiter
     testImplementation libs.mockitoCore
@@ -2422,6 +2425,7 @@ project(':tools') {
     testImplementation libs.apachedsLdifPartition
 
     testRuntimeOnly libs.junitPlatformLanucher
+    testRuntimeOnly libs.hamcrest
     testRuntimeOnly project(':test-common')
   }
 
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 77d668c6994..cb632c3daaa 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -295,6 +295,7 @@
     <allow pkg="org.apache.kafka.server.log.remote.metadata.storage" />
     <allow pkg="org.apache.kafka.server.log.remote.storage" />
     <allow pkg="org.apache.kafka.server.quota" />
+    <allow pkg="org.apache.kafka.streams" />
     <allow pkg="org.apache.kafka.clients" />
     <allow pkg="org.apache.kafka.clients.admin" />
     <allow pkg="org.apache.kafka.clients.producer" />
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java b/tools/src/test/java/org/apache/kafka/tools/AbstractResetIntegrationTest.java
similarity index 97%
rename from streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
rename to tools/src/test/java/org/apache/kafka/tools/AbstractResetIntegrationTest.java
index f4e33e36ce8..9d0f030b5b1 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/AbstractResetIntegrationTest.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.integration;
+package org.apache.kafka.tools;
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.Admin;
@@ -41,7 +41,6 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.test.TestUtils;
-import org.apache.kafka.tools.StreamsResetter;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
@@ -61,8 +60,7 @@ import java.util.stream.Collectors;
 import static java.time.Duration.ofMillis;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
 import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @Timeout(600)
@@ -247,7 +245,7 @@ public abstract class AbstractResetIntegrationTest {
         final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
         streams.close();
 
-        assertThat(resultRerun, equalTo(result));
+        assertEquals(result, resultRerun);
 
         waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
         cleanGlobal(false, null, null, appID);
@@ -307,17 +305,17 @@ public abstract class AbstractResetIntegrationTest {
         final List<KeyValue<Long, Long>> resultRerun2 = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC_2_RERUN, 40);
         streams.close();
 
-        assertThat(resultRerun, equalTo(result));
-        assertThat(resultRerun2, equalTo(result2));
+        assertEquals(result, resultRerun);
+        assertEquals(result2, resultRerun2);
 
         if (!useRepartitioned) {
             final Properties props = TestUtils.consumerConfig(cluster.bootstrapServers(), appID + "-result-consumer", LongDeserializer.class, StringDeserializer.class, commonClientConfig);
             final List<KeyValue<Long, String>> resultIntermediate = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(props, INTERMEDIATE_USER_TOPIC, 21);
 
             for (int i = 0; i < 10; i++) {
-                assertThat(resultIntermediate.get(i), equalTo(resultIntermediate.get(i + 11)));
+                assertEquals(resultIntermediate.get(i + 11), resultIntermediate.get(i));
             }
-            assertThat(resultIntermediate.get(10), equalTo(badMessage));
+            assertEquals(badMessage, resultIntermediate.get(10));
         }
 
         waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/tools/src/test/java/org/apache/kafka/tools/ResetIntegrationTest.java
similarity index 96%
rename from streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
rename to tools/src/test/java/org/apache/kafka/tools/ResetIntegrationTest.java
index a197de03ec6..fc2e8ffcfdf 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ResetIntegrationTest.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.integration;
+package org.apache.kafka.tools;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.network.SocketServerConfigs;
@@ -24,7 +24,6 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.test.TestUtils;
-import org.apache.kafka.tools.StreamsResetter;
 
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
@@ -48,10 +47,9 @@ import java.util.Properties;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.isEmptyConsumerGroup;
 import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
 import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * Tests local state store and global application cleanup.
@@ -210,7 +208,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
 
         // Reset will success with --force, it will force delete active members on broker side
         cleanGlobal(false, "--force", null, appID);
-        assertThat("Group is not empty after cleanGlobal", isEmptyConsumerGroup(adminClient, appID));
+        assertTrue(isEmptyConsumerGroup(adminClient, appID), "Group is not empty after cleanGlobal");
 
         assertInternalTopicsGotDeleted(null);
 
@@ -219,7 +217,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
         final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
         streams.close();
 
-        assertThat(resultRerun, equalTo(result));
+        assertEquals(result, resultRerun);
         cleanGlobal(false, "--force", null, appID);
     }
 
@@ -259,7 +257,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
         streams.close();
 
         result.remove(0);
-        assertThat(resultRerun, equalTo(result));
+        assertEquals(result, resultRerun);
 
         waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
         cleanGlobal(false, null, null, appID);
@@ -306,7 +304,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
         final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
         streams.close();
 
-        assertThat(resultRerun, equalTo(result));
+        assertEquals(result, resultRerun);
 
         waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
         cleanGlobal(false, null, null, appID);
@@ -348,7 +346,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest {
         final List<KeyValue<Long, Long>> resultRerun = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10);
         streams.close();
 
-        assertThat(resultRerun, equalTo(result));
+        assertEquals(result, resultRerun);
 
         waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER * STREAMS_CONSUMER_TIMEOUT);
         cleanGlobal(false, null, null, appID);
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java b/tools/src/test/java/org/apache/kafka/tools/ResetIntegrationWithSslTest.java
similarity index 98%
rename from streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java
rename to tools/src/test/java/org/apache/kafka/tools/ResetIntegrationWithSslTest.java
index 7f47233361e..9a072877eda 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ResetIntegrationWithSslTest.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.integration;
+package org.apache.kafka.tools;
 
 import org.apache.kafka.common.network.ConnectionMode;
 import org.apache.kafka.network.SocketServerConfigs;

Reply via email to