mjsax commented on code in PR #14108: URL: https://github.com/apache/kafka/pull/14108#discussion_r1282569306
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java: ########## @@ -715,23 +1087,49 @@ private Map<UUID, Map<String, Optional<String>>> getRandomProcessRacks(final int return processRacks; } - private SortedMap<TaskId, Set<TopicPartition>> getTaskTopicPartitionMap(final int tpSize) { + private SortedMap<TaskId, Set<TopicPartition>> getTaskTopicPartitionMap(final int tpSize, final boolean changelog) { final SortedMap<TaskId, Set<TopicPartition>> taskTopicPartitionMap = new TreeMap<>(); + final String topicName = changelog ? CHANGELOG_TOPIC_PREFIX : TOPIC_PREFIX; for (int i = 0; i < tpSize; i++) { - taskTopicPartitionMap.put(new TaskId(i, 0), mkSet(new TopicPartition("topic" + i, 0))); + taskTopicPartitionMap.put(new TaskId(i, 0), mkSet( + new TopicPartition(topicName + i, 0), + new TopicPartition(topicName + (i + 1) % tpSize, 0) + )); } return taskTopicPartitionMap; } - private SortedMap<UUID, ClientState> getRandomClientState(final int clientSize, final int tpSize) { + private InternalTopicManager mockInternalTopicManagerForRandomChangelog(final int nodeSize, final int tpSize) { + final Set<String> changelogNames = new HashSet<>(); + final List<Node> nodeList = getRandomNodes(nodeSize); + final Map<String, List<TopicPartitionInfo>> topicPartitionInfo = new HashMap<>(); + for (int i = 0; i < tpSize; i++) { + final String topicName = CHANGELOG_TOPIC_PREFIX + i; + changelogNames.add(topicName); + + final Node firstNode = nodeList.get(i % nodeSize); + final Node secondNode = nodeList.get((i + 1) % nodeSize); + final TopicPartitionInfo info = new TopicPartitionInfo(0, firstNode, Arrays.asList(firstNode, secondNode), Collections.emptyList()); + + topicPartitionInfo.computeIfAbsent(topicName, tp -> new ArrayList<>()).add(info); + } + + final MockInternalTopicManager spyTopicManager = spy(mockInternalTopicManager); + doReturn(topicPartitionInfo).when(spyTopicManager).getTopicPartitionInfo(changelogNames); + return spyTopicManager; + } + + private SortedMap<UUID, ClientState> getRandomClientState(final int clientSize, final int tpSize, final int maxCapacity) { final SortedMap<UUID, ClientState> clientStates = new TreeMap<>(); final List<TaskId> taskIds = new ArrayList<>(tpSize); for (int i = 0; i < tpSize; i++) { taskIds.add(new TaskId(i, 0)); } Collections.shuffle(taskIds); + final Random random = new Random(); Review Comment: We should generate our own seed and print/log the seed -- in case a test fails, we can use the seed to reproduce the issue. ``` final long seed = System.currentTimeMillis(); log.debug("seed: " + seed); final Random random = new Random(seed); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java: ########## @@ -715,23 +1087,49 @@ private Map<UUID, Map<String, Optional<String>>> getRandomProcessRacks(final int return processRacks; } - private SortedMap<TaskId, Set<TopicPartition>> getTaskTopicPartitionMap(final int tpSize) { + private SortedMap<TaskId, Set<TopicPartition>> getTaskTopicPartitionMap(final int tpSize, final boolean changelog) { final SortedMap<TaskId, Set<TopicPartition>> taskTopicPartitionMap = new TreeMap<>(); + final String topicName = changelog ? CHANGELOG_TOPIC_PREFIX : TOPIC_PREFIX; for (int i = 0; i < tpSize; i++) { - taskTopicPartitionMap.put(new TaskId(i, 0), mkSet(new TopicPartition("topic" + i, 0))); + taskTopicPartitionMap.put(new TaskId(i, 0), mkSet( + new TopicPartition(topicName + i, 0), + new TopicPartition(topicName + (i + 1) % tpSize, 0) + )); } return taskTopicPartitionMap; } - private SortedMap<UUID, ClientState> getRandomClientState(final int clientSize, final int tpSize) { + private InternalTopicManager mockInternalTopicManagerForRandomChangelog(final int nodeSize, final int tpSize) { + final Set<String> changelogNames = new HashSet<>(); + final List<Node> nodeList = getRandomNodes(nodeSize); + final Map<String, List<TopicPartitionInfo>> topicPartitionInfo = new HashMap<>(); + for (int i = 0; i < tpSize; i++) { + final String topicName = CHANGELOG_TOPIC_PREFIX + i; + changelogNames.add(topicName); + + final Node firstNode = nodeList.get(i % nodeSize); + final Node secondNode = nodeList.get((i + 1) % nodeSize); + final TopicPartitionInfo info = new TopicPartitionInfo(0, firstNode, Arrays.asList(firstNode, secondNode), Collections.emptyList()); + + topicPartitionInfo.computeIfAbsent(topicName, tp -> new ArrayList<>()).add(info); + } + + final MockInternalTopicManager spyTopicManager = spy(mockInternalTopicManager); + doReturn(topicPartitionInfo).when(spyTopicManager).getTopicPartitionInfo(changelogNames); + return spyTopicManager; + } + + private SortedMap<UUID, ClientState> getRandomClientState(final int clientSize, final int tpSize, final int maxCapacity) { final SortedMap<UUID, ClientState> clientStates = new TreeMap<>(); final List<TaskId> taskIds = new ArrayList<>(tpSize); for (int i = 0; i < tpSize; i++) { taskIds.add(new TaskId(i, 0)); } Collections.shuffle(taskIds); + final Random random = new Random(); Review Comment: We should generate our own seed and print/log the seed -- in case a test fails, we can use the seed to reproduce the issue. ``` final long seed = System.currentTimeMillis(); log.debug("seed: " + seed); final Random random = new Random(seed); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org