This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b4dd434f5e1 KAFKA-17692 Remove KafkaServer references from IntegrationTestUtils (#17372)
b4dd434f5e1 is described below

commit b4dd434f5e138f0ea2fc8898d79ba891adfd9029
Author: Mickael Maison <[email protected]>
AuthorDate: Sun Oct 6 21:27:33 2024 +0200

    KAFKA-17692 Remove KafkaServer references from IntegrationTestUtils (#17372)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../integration/utils/IntegrationTestUtils.java    | 110 ---------------------
 1 file changed, 110 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 536e43b715a..18b4fb21683 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -16,9 +16,6 @@
  */
 package org.apache.kafka.streams.integration.utils;
 
-import kafka.server.KafkaServer;
-import kafka.server.MetadataCache;
-
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.ConsumerGroupDescription;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -35,8 +32,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState;
-import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KafkaStreams;
@@ -53,7 +48,6 @@ import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
-import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentListener;
 import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
 import org.apache.kafka.streams.query.FailureReason;
 import org.apache.kafka.streams.query.QueryResult;
@@ -91,19 +85,15 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
-import scala.Option;
-
 import static java.util.Collections.singletonList;
 import static org.apache.kafka.common.utils.Utils.sleep;
 import static org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout;
-import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.is;
@@ -887,21 +877,6 @@ public class IntegrationTestUtils {
         return accumData;
     }
 
-    /**
-     * Wait until enough data (value records) has been consumed.
-     *
-     * @param consumerConfig     Kafka Consumer configuration
-     * @param topic              Topic to consume from
-     * @param expectedNumRecords Minimum number of expected records
-     * @return All the records consumed, or null if no records are consumed
-     * @throws AssertionError    if the given wait time elapses
-     */
-    public static <V> List<V> waitUntilMinValuesRecordsReceived(final Properties consumerConfig,
-                                                                final String topic,
-                                                                final int expectedNumRecords) throws Exception {
-        return waitUntilMinValuesRecordsReceived(consumerConfig, topic, expectedNumRecords, DEFAULT_TIMEOUT);
-    }
-
     /**
      * Wait until enough data (value records) has been consumed.
      *
@@ -934,53 +909,6 @@ public class IntegrationTestUtils {
         return accumData;
     }
 
-    @SuppressWarnings("WeakerAccess")
-    public static void waitForTopicPartitions(final List<KafkaServer> servers,
-                                              final List<TopicPartition> partitions,
-                                              final long timeout) throws InterruptedException {
-        final long end = System.currentTimeMillis() + timeout;
-        for (final TopicPartition partition : partitions) {
-            final long remaining = end - System.currentTimeMillis();
-            if (remaining <= 0) {
-                throw new AssertionError("timed out while waiting for partitions to become available. Timeout=" + timeout);
-            }
-            waitUntilMetadataIsPropagated(servers, partition.topic(), partition.partition(), remaining);
-        }
-    }
-
-    private static void waitUntilMetadataIsPropagated(final List<KafkaServer> servers,
-                                                     final String topic,
-                                                     final int partition,
-                                                     final long timeout) throws InterruptedException {
-        final String baseReason = String.format("Metadata for topic=%s partition=%d was not propagated to all brokers within %d ms. ",
-            topic, partition, timeout);
-
-        retryOnExceptionWithTimeout(timeout, () -> {
-            final List<KafkaServer> emptyPartitionInfos = new ArrayList<>();
-            final List<KafkaServer> invalidBrokerIds = new ArrayList<>();
-
-            for (final KafkaServer server : servers) {
-                final MetadataCache metadataCache = server.dataPlaneRequestProcessor().metadataCache();
-                final Option<UpdateMetadataPartitionState> partitionInfo =
-                    metadataCache.getPartitionInfo(topic, partition);
-
-                if (partitionInfo.isEmpty()) {
-                    emptyPartitionInfos.add(server);
-                    continue;
-                }
-
-                final UpdateMetadataPartitionState metadataPartitionState = partitionInfo.get();
-                if (!FetchRequest.isValidBrokerId(metadataPartitionState.leader())) {
-                    invalidBrokerIds.add(server);
-                }
-            }
-
-            final String reason = baseReason + ". Brokers without partition info: " + emptyPartitionInfos +
-                ". Brokers with invalid broker id for partition leader: " + invalidBrokerIds;
-            assertThat(reason, emptyPartitionInfos.isEmpty() && invalidBrokerIds.isEmpty());
-        });
-    }
-
     public static void startApplicationAndWaitUntilRunning(final KafkaStreams streams) throws Exception {
         startApplicationAndWaitUntilRunning(singletonList(streams));
     }
@@ -1452,44 +1380,6 @@ public class IntegrationTestUtils {
         });
     }
 
-    public static class StableAssignmentListener implements AssignmentListener {
-        final AtomicInteger numStableAssignments = new AtomicInteger(0);
-        int nextExpectedNumStableAssignments;
-
-        @Override
-        public void onAssignmentComplete(final boolean stable) {
-            if (stable) {
-                numStableAssignments.incrementAndGet();
-            }
-        }
-
-        public int numStableAssignments() {
-            return numStableAssignments.get();
-        }
-
-        /**
-         * Saves the current number of stable rebalances so that we can tell when the next stable assignment has been
-         * reached. This should be called once for every invocation of {@link #waitForNextStableAssignment(long)},
-         * before the rebalance-triggering event.
-         */
-        public void prepareForRebalance() {
-            nextExpectedNumStableAssignments = numStableAssignments.get() + 1;
-        }
-
-        /**
-         * Waits for the assignment to stabilize after the group rebalances. You must call {@link #prepareForRebalance()}
-         * prior to the rebalance-triggering event before using this method to wait.
-         */
-        public void waitForNextStableAssignment(final long maxWaitMs) throws InterruptedException {
-            waitForCondition(
-                () -> numStableAssignments() >= nextExpectedNumStableAssignments,
-                maxWaitMs,
-                () -> "Client did not reach " + nextExpectedNumStableAssignments + " stable assignments on time, " +
-                    "numStableAssignments was " + numStableAssignments()
-            );
-        }
-    }
-
     /**
      * Tracks the offsets and number of restored records on a per-partition basis.
      * Currently assumes only one store in the topology; you will need to update this

Reply via email to