This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e3f953483cb KAFKA-17857 Move AbstractResetIntegrationTest and
subclasses to tools (#17594)
e3f953483cb is described below
commit e3f953483cb480631bf041698770b47ddb82796f
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Tue Nov 5 04:16:19 2024 +0800
KAFKA-17857 Move AbstractResetIntegrationTest and subclasses to tools
(#17594)
Reviewers: Chia-Ping Tsai <[email protected]>
---
build.gradle | 4 ++++
checkstyle/import-control.xml | 1 +
.../kafka/tools}/AbstractResetIntegrationTest.java | 16 +++++++---------
.../org/apache/kafka/tools}/ResetIntegrationTest.java | 16 +++++++---------
.../apache/kafka/tools}/ResetIntegrationWithSslTest.java | 2 +-
5 files changed, 20 insertions(+), 19 deletions(-)
diff --git a/build.gradle b/build.gradle
index aff84c949ce..2a40ab4b9b1 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2405,6 +2405,9 @@ project(':tools') {
testImplementation project(':connect:runtime').sourceSets.test.output
testImplementation project(':storage:storage-api').sourceSets.main.output
testImplementation project(':storage').sourceSets.test.output
+ testImplementation project(':streams')
+ testImplementation project(':streams').sourceSets.test.output
+ testImplementation
project(':streams:integration-tests').sourceSets.test.output
testImplementation project(':test-common')
testImplementation libs.junitJupiter
testImplementation libs.mockitoCore
@@ -2422,6 +2425,7 @@ project(':tools') {
testImplementation libs.apachedsLdifPartition
testRuntimeOnly libs.junitPlatformLanucher
+ testRuntimeOnly libs.hamcrest
testRuntimeOnly project(':test-common')
}
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 77d668c6994..cb632c3daaa 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -295,6 +295,7 @@
<allow pkg="org.apache.kafka.server.log.remote.metadata.storage" />
<allow pkg="org.apache.kafka.server.log.remote.storage" />
<allow pkg="org.apache.kafka.server.quota" />
+ <allow pkg="org.apache.kafka.streams" />
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.clients.admin" />
<allow pkg="org.apache.kafka.clients.producer" />
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
b/tools/src/test/java/org/apache/kafka/tools/AbstractResetIntegrationTest.java
similarity index 97%
rename from
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
rename to
tools/src/test/java/org/apache/kafka/tools/AbstractResetIntegrationTest.java
index f4e33e36ce8..9d0f030b5b1 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/AbstractResetIntegrationTest.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.streams.integration;
+package org.apache.kafka.tools;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
@@ -41,7 +41,6 @@ import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.test.TestUtils;
-import org.apache.kafka.tools.StreamsResetter;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
@@ -61,8 +60,7 @@ import java.util.stream.Collectors;
import static java.time.Duration.ofMillis;
import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(600)
@@ -247,7 +245,7 @@ public abstract class AbstractResetIntegrationTest {
final List<KeyValue<Long, Long>> resultRerun =
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig,
OUTPUT_TOPIC, 10);
streams.close();
- assertThat(resultRerun, equalTo(result));
+ assertEquals(result, resultRerun);
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER *
STREAMS_CONSUMER_TIMEOUT);
cleanGlobal(false, null, null, appID);
@@ -307,17 +305,17 @@ public abstract class AbstractResetIntegrationTest {
final List<KeyValue<Long, Long>> resultRerun2 =
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig,
OUTPUT_TOPIC_2_RERUN, 40);
streams.close();
- assertThat(resultRerun, equalTo(result));
- assertThat(resultRerun2, equalTo(result2));
+ assertEquals(result, resultRerun);
+ assertEquals(result2, resultRerun2);
if (!useRepartitioned) {
final Properties props =
TestUtils.consumerConfig(cluster.bootstrapServers(), appID +
"-result-consumer", LongDeserializer.class, StringDeserializer.class,
commonClientConfig);
final List<KeyValue<Long, String>> resultIntermediate =
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(props,
INTERMEDIATE_USER_TOPIC, 21);
for (int i = 0; i < 10; i++) {
- assertThat(resultIntermediate.get(i),
equalTo(resultIntermediate.get(i + 11)));
+ assertEquals(resultIntermediate.get(i + 11),
resultIntermediate.get(i));
}
- assertThat(resultIntermediate.get(10), equalTo(badMessage));
+ assertEquals(badMessage, resultIntermediate.get(10));
}
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER *
STREAMS_CONSUMER_TIMEOUT);
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
b/tools/src/test/java/org/apache/kafka/tools/ResetIntegrationTest.java
similarity index 96%
rename from
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
rename to tools/src/test/java/org/apache/kafka/tools/ResetIntegrationTest.java
index a197de03ec6..fc2e8ffcfdf 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ResetIntegrationTest.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.streams.integration;
+package org.apache.kafka.tools;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.network.SocketServerConfigs;
@@ -24,7 +24,6 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.test.TestUtils;
-import org.apache.kafka.tools.StreamsResetter;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
@@ -48,10 +47,9 @@ import java.util.Properties;
import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.isEmptyConsumerGroup;
import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Tests local state store and global application cleanup.
@@ -210,7 +208,7 @@ public class ResetIntegrationTest extends
AbstractResetIntegrationTest {
// Reset will success with --force, it will force delete active
members on broker side
cleanGlobal(false, "--force", null, appID);
- assertThat("Group is not empty after cleanGlobal",
isEmptyConsumerGroup(adminClient, appID));
+ assertTrue(isEmptyConsumerGroup(adminClient, appID), "Group is not
empty after cleanGlobal");
assertInternalTopicsGotDeleted(null);
@@ -219,7 +217,7 @@ public class ResetIntegrationTest extends
AbstractResetIntegrationTest {
final List<KeyValue<Long, Long>> resultRerun =
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig,
OUTPUT_TOPIC, 10);
streams.close();
- assertThat(resultRerun, equalTo(result));
+ assertEquals(result, resultRerun);
cleanGlobal(false, "--force", null, appID);
}
@@ -259,7 +257,7 @@ public class ResetIntegrationTest extends
AbstractResetIntegrationTest {
streams.close();
result.remove(0);
- assertThat(resultRerun, equalTo(result));
+ assertEquals(result, resultRerun);
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER *
STREAMS_CONSUMER_TIMEOUT);
cleanGlobal(false, null, null, appID);
@@ -306,7 +304,7 @@ public class ResetIntegrationTest extends
AbstractResetIntegrationTest {
final List<KeyValue<Long, Long>> resultRerun =
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig,
OUTPUT_TOPIC, 10);
streams.close();
- assertThat(resultRerun, equalTo(result));
+ assertEquals(result, resultRerun);
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER *
STREAMS_CONSUMER_TIMEOUT);
cleanGlobal(false, null, null, appID);
@@ -348,7 +346,7 @@ public class ResetIntegrationTest extends
AbstractResetIntegrationTest {
final List<KeyValue<Long, Long>> resultRerun =
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig,
OUTPUT_TOPIC, 10);
streams.close();
- assertThat(resultRerun, equalTo(result));
+ assertEquals(result, resultRerun);
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER *
STREAMS_CONSUMER_TIMEOUT);
cleanGlobal(false, null, null, appID);
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java
b/tools/src/test/java/org/apache/kafka/tools/ResetIntegrationWithSslTest.java
similarity index 98%
rename from
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java
rename to
tools/src/test/java/org/apache/kafka/tools/ResetIntegrationWithSslTest.java
index 7f47233361e..9a072877eda 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/ResetIntegrationWithSslTest.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.streams.integration;
+package org.apache.kafka.tools;
import org.apache.kafka.common.network.ConnectionMode;
import org.apache.kafka.network.SocketServerConfigs;