This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b4dd434f5e1 KAFKA-17692 Remove KafkaServer references from
IntegrationTestUtils (#17372)
b4dd434f5e1 is described below
commit b4dd434f5e138f0ea2fc8898d79ba891adfd9029
Author: Mickael Maison <[email protected]>
AuthorDate: Sun Oct 6 21:27:33 2024 +0200
KAFKA-17692 Remove KafkaServer references from IntegrationTestUtils (#17372)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../integration/utils/IntegrationTestUtils.java | 110 ---------------------
1 file changed, 110 deletions(-)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 536e43b715a..18b4fb21683 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -16,9 +16,6 @@
*/
package org.apache.kafka.streams.integration.utils;
-import kafka.server.KafkaServer;
-import kafka.server.MetadataCache;
-
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.consumer.Consumer;
@@ -35,8 +32,6 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.Headers;
-import
org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState;
-import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
@@ -53,7 +48,6 @@ import
org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StreamThread;
import
org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
-import
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentListener;
import
org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
import org.apache.kafka.streams.query.FailureReason;
import org.apache.kafka.streams.query.QueryResult;
@@ -91,19 +85,15 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
-import scala.Option;
-
import static java.util.Collections.singletonList;
import static org.apache.kafka.common.utils.Utils.sleep;
import static org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout;
-import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
@@ -887,21 +877,6 @@ public class IntegrationTestUtils {
return accumData;
}
- /**
- * Wait until enough data (value records) has been consumed.
- *
- * @param consumerConfig Kafka Consumer configuration
- * @param topic Topic to consume from
- * @param expectedNumRecords Minimum number of expected records
- * @return All the records consumed, or null if no records are consumed
- * @throws AssertionError if the given wait time elapses
- */
- public static <V> List<V> waitUntilMinValuesRecordsReceived(final
Properties consumerConfig,
- final String
topic,
- final int
expectedNumRecords) throws Exception {
- return waitUntilMinValuesRecordsReceived(consumerConfig, topic,
expectedNumRecords, DEFAULT_TIMEOUT);
- }
-
/**
* Wait until enough data (value records) has been consumed.
*
@@ -934,53 +909,6 @@ public class IntegrationTestUtils {
return accumData;
}
- @SuppressWarnings("WeakerAccess")
- public static void waitForTopicPartitions(final List<KafkaServer> servers,
- final List<TopicPartition>
partitions,
- final long timeout) throws
InterruptedException {
- final long end = System.currentTimeMillis() + timeout;
- for (final TopicPartition partition : partitions) {
- final long remaining = end - System.currentTimeMillis();
- if (remaining <= 0) {
- throw new AssertionError("timed out while waiting for
partitions to become available. Timeout=" + timeout);
- }
- waitUntilMetadataIsPropagated(servers, partition.topic(),
partition.partition(), remaining);
- }
- }
-
- private static void waitUntilMetadataIsPropagated(final List<KafkaServer>
servers,
- final String topic,
- final int partition,
- final long timeout)
throws InterruptedException {
- final String baseReason = String.format("Metadata for topic=%s
partition=%d was not propagated to all brokers within %d ms. ",
- topic, partition, timeout);
-
- retryOnExceptionWithTimeout(timeout, () -> {
- final List<KafkaServer> emptyPartitionInfos = new ArrayList<>();
- final List<KafkaServer> invalidBrokerIds = new ArrayList<>();
-
- for (final KafkaServer server : servers) {
- final MetadataCache metadataCache =
server.dataPlaneRequestProcessor().metadataCache();
- final Option<UpdateMetadataPartitionState> partitionInfo =
- metadataCache.getPartitionInfo(topic, partition);
-
- if (partitionInfo.isEmpty()) {
- emptyPartitionInfos.add(server);
- continue;
- }
-
- final UpdateMetadataPartitionState metadataPartitionState =
partitionInfo.get();
- if
(!FetchRequest.isValidBrokerId(metadataPartitionState.leader())) {
- invalidBrokerIds.add(server);
- }
- }
-
- final String reason = baseReason + ". Brokers without partition
info: " + emptyPartitionInfos +
- ". Brokers with invalid broker id for partition leader: " +
invalidBrokerIds;
- assertThat(reason, emptyPartitionInfos.isEmpty() &&
invalidBrokerIds.isEmpty());
- });
- }
-
public static void startApplicationAndWaitUntilRunning(final KafkaStreams
streams) throws Exception {
startApplicationAndWaitUntilRunning(singletonList(streams));
}
@@ -1452,44 +1380,6 @@ public class IntegrationTestUtils {
});
}
- public static class StableAssignmentListener implements AssignmentListener
{
- final AtomicInteger numStableAssignments = new AtomicInteger(0);
- int nextExpectedNumStableAssignments;
-
- @Override
- public void onAssignmentComplete(final boolean stable) {
- if (stable) {
- numStableAssignments.incrementAndGet();
- }
- }
-
- public int numStableAssignments() {
- return numStableAssignments.get();
- }
-
- /**
- * Saves the current number of stable rebalances so that we can tell
when the next stable assignment has been
- * reached. This should be called once for every invocation of {@link
#waitForNextStableAssignment(long)},
- * before the rebalance-triggering event.
- */
- public void prepareForRebalance() {
- nextExpectedNumStableAssignments = numStableAssignments.get() + 1;
- }
-
- /**
- * Waits for the assignment to stabilize after the group rebalances.
You must call {@link #prepareForRebalance()}
- * prior to the rebalance-triggering event before using this method to
wait.
- */
- public void waitForNextStableAssignment(final long maxWaitMs) throws
InterruptedException {
- waitForCondition(
- () -> numStableAssignments() >=
nextExpectedNumStableAssignments,
- maxWaitMs,
- () -> "Client did not reach " +
nextExpectedNumStableAssignments + " stable assignments on time, " +
- "numStableAssignments was " + numStableAssignments()
- );
- }
- }
-
/**
* Tracks the offsets and number of restored records on a per-partition
basis.
* Currently assumes only one store in the topology; you will need to
update this