artemlivshits commented on code in PR #12956:
URL: https://github.com/apache/kafka/pull/12956#discussion_r1045132016
##########
core/src/test/scala/integration/kafka/server/FetchFromFollowerIntegrationTest.scala:
##########
@@ -103,44 +105,89 @@ class FetchFromFollowerIntegrationTest extends BaseFetchRequestTest {
TestUtils.generateAndProduceMessages(brokers, topic, numMessages = 10)
+ assertEquals(1, getPreferredReplica)
+
+ // Shutdown follower broker. Consumer will reach out to leader after metadata.max.age.ms
Review Comment:
Removed.
##########
clients/src/test/java/org/apache/kafka/common/requests/RequestTestUtils.java:
##########
@@ -208,10 +209,10 @@ public static MetadataResponse metadataUpdateWith(final String clusterId,
for (int i = 0; i < numPartitions; i++) {
TopicPartition tp = new TopicPartition(topic, i);
Node leader = nodes.get(i % nodes.size());
- List<Integer> replicaIds = Collections.singletonList(leader.id());
+ List<Integer> replicaIds = nodes.stream().map(Node::id).collect(Collectors.toList());
partitionMetadata.add(partitionSupplier.supply(
Errors.NONE, tp, Optional.of(leader.id()),
Optional.ofNullable(epochSupplier.apply(tp)),
- replicaIds, replicaIds, replicaIds));
+ replicaIds, replicaIds, Collections.emptyList()));
Review Comment:
It's needed; otherwise all replicas are marked as offline.
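
To illustrate the point above, here is a minimal, self-contained sketch. It assumes the last list passed to `partitionSupplier.supply(...)` is the offline-replica list, as the comment implies. The class `OfflineReplicaSketch` and its methods are hypothetical stand-ins, not Kafka APIs, and replicas are modeled as plain integer ids.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical model, not Kafka code: replicas are plain integer ids.
final class OfflineReplicaSketch {

    // A replica counts as online only if it is absent from the offline list.
    static boolean isOnline(int nodeId, List<Integer> offlineReplicaIds) {
        return !offlineReplicaIds.contains(nodeId);
    }

    // Counts how many of the given replicas pass the online check.
    static int countOnline(List<Integer> replicaIds, List<Integer> offlineReplicaIds) {
        int online = 0;
        for (int id : replicaIds) {
            if (isOnline(id, offlineReplicaIds)) {
                online++;
            }
        }
        return online;
    }

    public static void main(String[] args) {
        List<Integer> replicaIds = Arrays.asList(0, 1, 2);
        // Reusing the replica list as the offline list leaves nothing online.
        System.out.println(countOnline(replicaIds, replicaIds));
        // An empty offline list leaves every replica eligible.
        System.out.println(countOnline(replicaIds, Collections.<Integer>emptyList()));
    }
}
```

With one replica per partition the old call may not have mattered to existing tests; once every node is listed as a replica, reusing that list as the offline list would exclude all of them.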
##########
clients/src/test/java/org/apache/kafka/common/requests/RequestTestUtils.java:
##########
@@ -208,10 +233,10 @@ public static MetadataResponse metadataUpdateWith(final String clusterId,
for (int i = 0; i < numPartitions; i++) {
TopicPartition tp = new TopicPartition(topic, i);
Node leader = nodes.get(i % nodes.size());
- List<Integer> replicaIds = Collections.singletonList(leader.id());
+ List<Integer> replicaIds = leaderOnly ? Collections.singletonList(leader.id()) : nodes.stream().map(Node::id).collect(Collectors.toList());
Review Comment:
Updated to use an explicit flag that selects all replicas vs just the leader.
@hachikuji
##########
clients/src/main/java/org/apache/kafka/common/Cluster.java:
##########
@@ -253,7 +253,11 @@ public Node nodeById(int id) {
public Optional<Node> nodeIfOnline(TopicPartition partition, int id) {
Node node = nodeById(id);
PartitionInfo partitionInfo = partition(partition);
- if (node != null && partitionInfo != null && !Arrays.asList(partitionInfo.offlineReplicas()).contains(node)) {
+
+ if (node != null && partitionInfo != null &&
+ !Arrays.asList(partitionInfo.offlineReplicas()).contains(node) &&
Review Comment:
Looking at the source code, asList doesn't copy the array: it just creates a
new adaptor over the same data, so constructing it is O(1) and it shouldn't add
much overhead. Let me know if you still want helper functions and I'll file
a bug (if we're going to do that, then I think we should build a generic library
of functions to be used everywhere).
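
As a quick check of the claim above, the sketch below shows that `Arrays.asList` returns a view backed by the original array rather than a copy. The class and method names (`AsListViewDemo`, `containsViaView`, `viewSharesStorage`) and the broker strings are made up for illustration. Note that only creating the view is O(1); `contains` still scans the elements linearly.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class; names and sample values are illustrative only.
final class AsListViewDemo {

    // Membership check through a list view over the array.
    // Wrapping is O(1); contains() itself is a linear scan.
    static boolean containsViaView(String[] offline, String node) {
        return Arrays.asList(offline).contains(node);
    }

    // Returns true if a write to the array is visible through the list,
    // which shows the list shares storage with the array instead of copying it.
    static boolean viewSharesStorage() {
        String[] offline = {"broker-1", "broker-2"};
        List<String> view = Arrays.asList(offline);
        offline[0] = "broker-9";
        return view.get(0).equals("broker-9");
    }

    public static void main(String[] args) {
        System.out.println(viewSharesStorage());
        System.out.println(containsViaView(new String[] {"broker-1"}, "broker-1"));
    }
}
```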