This is an automated email from the ASF dual-hosted git repository.
wcarlson pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.6 by this push:
new 69a5f14b523 KAFKA-15415: On producer-batch retry, skip-backoff on a new leader (#14384) (#14522)
69a5f14b523 is described below
commit 69a5f14b52314a011b52b53f223848339f61bbd7
Author: Mayank Shekhar Narula <[email protected]>
AuthorDate: Wed Oct 11 16:18:12 2023 +0100
KAFKA-15415: On producer-batch retry, skip-backoff on a new leader (#14384) (#14522)
This PR backports https://github.com/apache/kafka/pull/14384
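For context, the change below makes the producer skip the retry backoff for a batch whose partition leader has changed since the previous attempt; waiting out retry.backoff.ms only helps when the same (possibly struggling) leader would be retried. A minimal sketch of that decision, paraphrasing the shouldBackoff() helper introduced in this patch:

    // Sketch only, mirroring the logic added to RecordAccumulator below:
    // a retried batch backs off only if it has not yet waited retryBackoffMs
    // AND the leader hosting its partition is unchanged since the last attempt.
    private boolean shouldBackoff(boolean hasLeaderChanged, ProducerBatch batch, long waitedTimeMs) {
        boolean shouldWaitMore = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
        return !hasLeaderChanged && shouldWaitMore;
    }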
Reviewers: Walker Carlson <[email protected]>
---
checkstyle/suppressions.xml | 2 +-
.../clients/producer/internals/ProducerBatch.java | 49 +++
.../producer/internals/RecordAccumulator.java | 56 ++-
.../kafka/clients/producer/internals/Sender.java | 6 +-
.../producer/internals/ProducerBatchTest.java | 51 +++
.../producer/internals/RecordAccumulatorTest.java | 427 +++++++++++++++++----
.../clients/producer/internals/SenderTest.java | 89 ++++-
.../producer/internals/TransactionManagerTest.java | 24 +-
8 files changed, 600 insertions(+), 104 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 2ded5338710..88c5c2016ca 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -93,7 +93,7 @@
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest|KafkaRaftClientTest).java"/>
<suppress checks="NPathComplexity"
- files="(ConsumerCoordinator|BufferPool|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|KafkaRaftClient|Authorizer|FetchSessionHandler).java"/>
+ files="(ConsumerCoordinator|BufferPool|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster|TasksRequest|KafkaProducer|AbstractStickyAssignor|KafkaRaftClient|Authorizer|FetchSessionHandler|RecordAccumulator).java"/>
<suppress checks="(JavaNCSS|CyclomaticComplexity|MethodLength)"
files="CoordinatorClient.java"/>
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index 4da03627be1..408b8316eb8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.producer.internals;
+import java.util.Optional;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
@@ -79,6 +80,11 @@ public final class ProducerBatch {
private boolean retry;
private boolean reopened;
+ // Tracks the leader epoch of the partition to which this batch will be sent, for the current attempt to produce it.
+ private Optional<Integer> currentLeaderEpoch;
+ // Tracks the attempt in which the leader was first changed to currentLeaderEpoch.
+ private int attemptsWhenLeaderLastChanged;
+
public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long createdMs) {
this(tp, recordsBuilder, createdMs, false);
}
@@ -94,9 +100,42 @@ public final class ProducerBatch {
this.isSplitBatch = isSplitBatch;
float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(),
recordsBuilder.compressionType());
+ this.currentLeaderEpoch = Optional.empty();
+ this.attemptsWhenLeaderLastChanged = 0;
recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
}
+ /**
+ * It will update the leader to which this batch will be produced for the ongoing attempt, if a newer leader is known.
+ * @param latestLeaderEpoch latest leader's epoch.
+ */
+ void maybeUpdateLeaderEpoch(Optional<Integer> latestLeaderEpoch) {
+ if (!currentLeaderEpoch.equals(latestLeaderEpoch)) {
+ log.trace("For {}, leader will be updated, currentLeaderEpoch: {},
attemptsWhenLeaderLastChanged:{}, latestLeaderEpoch: {}, current attempt: {}",
+ this, currentLeaderEpoch, attemptsWhenLeaderLastChanged,
latestLeaderEpoch, attempts);
+ attemptsWhenLeaderLastChanged = attempts();
+ currentLeaderEpoch = latestLeaderEpoch;
+ } else {
+ log.trace("For {}, leader wasn't updated, currentLeaderEpoch: {},
attemptsWhenLeaderLastChanged:{}, latestLeaderEpoch: {}, current attempt: {}",
+ this, currentLeaderEpoch, attemptsWhenLeaderLastChanged,
latestLeaderEpoch, attempts);
+ }
+ }
+
+ /**
+ * Returns true if the batch is being retried and, for the ongoing retry, it will be produced to a newer leader than in the previous attempt.
+ */
+
+ boolean hasLeaderChangedForTheOngoingRetry() {
+ int attempts = attempts();
+ boolean isRetry = attempts >= 1;
+ if (!isRetry)
+ return false;
+ if (attempts == attemptsWhenLeaderLastChanged)
+ return true;
+ return false;
+ }
+
+
/**
* Append the record to the current record set and return the relative offset within that record set
*
@@ -517,4 +556,14 @@ public final class ProducerBatch {
public boolean sequenceHasBeenReset() {
return reopened;
}
+
+ // VisibleForTesting
+ Optional<Integer> currentLeaderEpoch() {
+ return currentLeaderEpoch;
+ }
+
+ // VisibleForTesting
+ int attemptsWhenLeaderLastChanged() {
+ return attemptsWhenLeaderLastChanged;
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 983426316df..98d0bfea22a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.utils.ProducerIdAndEpoch;
@@ -640,7 +641,7 @@ public class RecordAccumulator {
* into the set of ready nodes. If partition has no leader, add the topic to the set of topics with
* no leader. This function also calculates stats for adaptive partitioning.
*
- * @param cluster The cluster metadata
+ * @param metadata The cluster metadata
* @param nowMs The current time
* @param topic The topic
* @param topicInfo The topic info
@@ -649,14 +650,14 @@ public class RecordAccumulator {
* @param unknownLeaderTopics The set of topics with no leader (to be filled in)
* @return The delay for next check
*/
- private long partitionReady(Cluster cluster, long nowMs, String topic, TopicInfo topicInfo,
+ private long partitionReady(Metadata metadata, long nowMs, String topic, TopicInfo topicInfo,
long nextReadyCheckDelayMs, Set<Node> readyNodes, Set<String> unknownLeaderTopics) {
ConcurrentMap<Integer, Deque<ProducerBatch>> batches = topicInfo.batches;
// Collect the queue sizes for available partitions to be used in adaptive partitioning.
int[] queueSizes = null;
int[] partitionIds = null;
- if (enableAdaptivePartitioning && batches.size() >= cluster.partitionsForTopic(topic).size()) {
+ if (enableAdaptivePartitioning && batches.size() >= metadata.fetch().partitionsForTopic(topic).size()) {
// We don't do adaptive partitioning until we scheduled at least a batch for all
// partitions (i.e. we have the corresponding entries in the batches map), we just
// do uniform. The reason is that we build queue sizes from the batches map,
@@ -672,7 +673,9 @@ public class RecordAccumulator {
TopicPartition part = new TopicPartition(topic, entry.getKey());
// Advance queueSizesIndex so that we properly index available
// partitions. Do it here so that it's done for all code paths.
- Node leader = cluster.leaderFor(part);
+
+ Metadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(part);
+ Node leader = leaderAndEpoch.leader.orElse(null);
if (leader != null && queueSizes != null) {
++queueSizesIndex;
assert queueSizesIndex < queueSizes.length;
@@ -701,7 +704,8 @@ public class RecordAccumulator {
}
waitedTimeMs = batch.waitedTimeMs(nowMs);
- backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
+ batch.maybeUpdateLeaderEpoch(leaderAndEpoch.epoch);
+ backingOff = shouldBackoff(batch.hasLeaderChangedForTheOngoingRetry(), batch, waitedTimeMs);
dequeSize = deque.size();
full = dequeSize > 1 || batch.isFull();
}
@@ -760,7 +764,7 @@ public class RecordAccumulator {
* </ul>
* </ol>
*/
- public ReadyCheckResult ready(Cluster cluster, long nowMs) {
+ public ReadyCheckResult ready(Metadata metadata, long nowMs) {
Set<Node> readyNodes = new HashSet<>();
long nextReadyCheckDelayMs = Long.MAX_VALUE;
Set<String> unknownLeaderTopics = new HashSet<>();
@@ -768,7 +772,7 @@ public class RecordAccumulator {
// cumulative frequency table (used in partitioner).
for (Map.Entry<String, TopicInfo> topicInfoEntry : this.topicInfoMap.entrySet()) {
final String topic = topicInfoEntry.getKey();
- nextReadyCheckDelayMs = partitionReady(cluster, nowMs, topic, topicInfoEntry.getValue(), nextReadyCheckDelayMs, readyNodes, unknownLeaderTopics);
+ nextReadyCheckDelayMs = partitionReady(metadata, nowMs, topic, topicInfoEntry.getValue(), nextReadyCheckDelayMs, readyNodes, unknownLeaderTopics);
}
return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}
@@ -788,6 +792,19 @@ public class RecordAccumulator {
return false;
}
+ private boolean shouldBackoff(boolean hasLeaderChanged, final ProducerBatch batch, final long waitedTimeMs) {
+ boolean shouldWaitMore = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
+ boolean shouldBackoff = !hasLeaderChanged && shouldWaitMore;
+ if (shouldBackoff) {
+ log.trace(
+ "For {}, will backoff", batch);
+ } else {
+ log.trace(
+ "For {}, will not backoff, shouldWaitMore {}, hasLeaderChanged
{}", batch, shouldWaitMore, hasLeaderChanged);
+ }
+ return shouldBackoff;
+ }
+
private boolean shouldStopDrainBatchesForPartition(ProducerBatch first, TopicPartition tp) {
ProducerIdAndEpoch producerIdAndEpoch = null;
if (transactionManager != null) {
@@ -826,22 +843,31 @@ public class RecordAccumulator {
return false;
}
- private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) {
+ private List<ProducerBatch> drainBatchesForOneNode(Metadata metadata, Node node, int maxSize, long now) {
int size = 0;
- List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
+ List<PartitionInfo> parts = metadata.fetch().partitionsForNode(node.id());
List<ProducerBatch> ready = new ArrayList<>();
+ if (parts.isEmpty())
+ return ready;
/* to make starvation less likely each node has it's own drainIndex */
int drainIndex = getDrainIndex(node.idString());
int start = drainIndex = drainIndex % parts.size();
do {
PartitionInfo part = parts.get(drainIndex);
+
TopicPartition tp = new TopicPartition(part.topic(), part.partition());
updateDrainIndex(node.idString(), drainIndex);
drainIndex = (drainIndex + 1) % parts.size();
// Only proceed if the partition has no in-flight batches.
if (isMuted(tp))
continue;
-
+ Metadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(tp);
+ // Although unlikely, skip this partition if the leader has changed since the partition -> node assignment was obtained outside the loop.
+ // In that case, skip sending to the old leader, as it would return a NOT_LEADER_OR_FOLLOWER error.
+ if (!leaderAndEpoch.leader.isPresent())
+ continue;
+ if (!node.equals(leaderAndEpoch.leader.get()))
+ continue;
Deque<ProducerBatch> deque = getDeque(tp);
if (deque == null)
continue;
@@ -854,9 +880,9 @@ public class RecordAccumulator {
continue;
// first != null
- boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs;
+ first.maybeUpdateLeaderEpoch(leaderAndEpoch.epoch);
// Only drain the batch if it is not during backoff period.
- if (backoff)
+ if (shouldBackoff(first.hasLeaderChangedForTheOngoingRetry(), first, first.waitedTimeMs(now)))
continue;
if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
@@ -922,19 +948,19 @@ public class RecordAccumulator {
* Drain all the data for the given nodes and collate them into a list of batches that will fit within the specified
* size on a per-node basis. This method attempts to avoid choosing the same topic-node over and over.
*
- * @param cluster The current cluster metadata
+ * @param metadata The current cluster metadata
* @param nodes The list of node to drain
* @param maxSize The maximum number of bytes to drain
* @param now The current unix time in milliseconds
* @return A list of {@link ProducerBatch} for each node specified with total size less than the requested maxSize.
*/
- public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
+ public Map<Integer, List<ProducerBatch>> drain(Metadata metadata, Set<Node> nodes, int maxSize, long now) {
if (nodes.isEmpty())
return Collections.emptyMap();
Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
for (Node node : nodes) {
- List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);
+ List<ProducerBatch> ready = drainBatchesForOneNode(metadata, node, maxSize, now);
batches.put(node.id(), ready);
}
return batches;
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 75968bfba8b..ef2151e9819 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -23,7 +23,6 @@ import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClientUtils;
import org.apache.kafka.clients.RequestCompletionHandler;
-import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
@@ -359,9 +358,8 @@ public class Sender implements Runnable {
}
private long sendProducerData(long now) {
- Cluster cluster = metadata.fetch();
// get the list of partitions with data ready to send
- RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
+ RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(metadata, now);
// if there are any partitions whose leaders are not known yet, force metadata update
if (!result.unknownLeaderTopics.isEmpty()) {
@@ -396,7 +394,7 @@ public class Sender implements Runnable {
}
// create produce requests
- Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
+ Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(metadata, result.readyNodes, this.maxRequestSize, now);
addToInflightBatches(batches);
if (guaranteeMessageOrder) {
// Mute all the partitions drained
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
index 03f5e0b6aa8..24629b612b2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.producer.internals;
+import java.util.Optional;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
@@ -263,6 +264,56 @@ public class ProducerBatchTest {
testCompleteExceptionally(recordCount, topLevelException, null));
}
+ /**
+ * This tests that the leader is correctly maintained & leader-change is correctly detected across retries
+ * of the batch. It does so primarily by testing the methods
+ * 1. maybeUpdateLeaderEpoch
+ * 2. hasLeaderChangedForTheOngoingRetry
+ */
+
+ @Test
+ public void testWithLeaderChangesAcrossRetries() {
+ ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now);
+
+ // Starting state for the batch, no attempt made to send it yet.
+ assertEquals(Optional.empty(), batch.currentLeaderEpoch());
+ assertEquals(0, batch.attemptsWhenLeaderLastChanged()); // default value
+ batch.maybeUpdateLeaderEpoch(Optional.empty());
+ assertFalse(batch.hasLeaderChangedForTheOngoingRetry());
+
+ // 1st attempt[Not a retry] to send the batch.
+ // Check leader isn't flagged as a new leader.
+ int batchLeaderEpoch = 100;
+ batch.maybeUpdateLeaderEpoch(Optional.of(batchLeaderEpoch));
+ assertFalse(batch.hasLeaderChangedForTheOngoingRetry(), "batch leader is assigned for 1st time");
+ assertEquals(batchLeaderEpoch, batch.currentLeaderEpoch().get());
+ assertEquals(0, batch.attemptsWhenLeaderLastChanged());
+
+ // 2nd attempt[1st retry] to send the batch to a new leader.
+ // Check leader change is detected.
+ batchLeaderEpoch = 101;
+ batch.reenqueued(0);
+ batch.maybeUpdateLeaderEpoch(Optional.of(batchLeaderEpoch));
+ assertTrue(batch.hasLeaderChangedForTheOngoingRetry(), "batch leader has changed");
+ assertEquals(batchLeaderEpoch, batch.currentLeaderEpoch().get());
+ assertEquals(1, batch.attemptsWhenLeaderLastChanged());
+
+ // 2nd attempt[1st retry] still ongoing, yet to be made.
+ // Check same leaderEpoch(101) is still considered as a leader-change.
+ batch.maybeUpdateLeaderEpoch(Optional.of(batchLeaderEpoch));
+ assertTrue(batch.hasLeaderChangedForTheOngoingRetry(), "batch leader has changed");
+ assertEquals(batchLeaderEpoch, batch.currentLeaderEpoch().get());
+ assertEquals(1, batch.attemptsWhenLeaderLastChanged());
+
+ // 3rd attempt[2nd retry] to the same leader-epoch(101).
+ // Check same leaderEpoch(101) is not detected as a leader-change.
+ batch.reenqueued(0);
+ batch.maybeUpdateLeaderEpoch(Optional.of(batchLeaderEpoch));
+ assertFalse(batch.hasLeaderChangedForTheOngoingRetry(), "batch leader has not changed");
+ assertEquals(batchLeaderEpoch, batch.currentLeaderEpoch().get());
+ assertEquals(1, batch.attemptsWhenLeaderLastChanged());
+ }
+
private void testCompleteExceptionally(
int recordCount,
RuntimeException topLevelException,
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 3605bc11711..9a0170aa7d5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -16,7 +16,10 @@
*/
package org.apache.kafka.clients.producer.internals;
+import java.util.Optional;
+import java.util.function.Function;
import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Partitioner;
@@ -44,6 +47,7 @@ import org.apache.kafka.common.utils.ProducerIdAndEpoch;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
@@ -63,6 +67,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+
import static java.util.Arrays.asList;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -79,6 +84,7 @@ public class RecordAccumulatorTest {
private int partition3 = 2;
private Node node1 = new Node(0, "localhost", 1111);
private Node node2 = new Node(1, "localhost", 1112);
+
private TopicPartition tp1 = new TopicPartition(topic, partition1);
private TopicPartition tp2 = new TopicPartition(topic, partition2);
private TopicPartition tp3 = new TopicPartition(topic, partition3);
@@ -89,15 +95,22 @@ public class RecordAccumulatorTest {
private byte[] key = "key".getBytes();
private byte[] value = "value".getBytes();
private int msgSize = DefaultRecord.sizeInBytes(0, 0, key.length, value.length, Record.EMPTY_HEADERS);
+ Metadata metadataMock;
private Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3),
Collections.emptySet(), Collections.emptySet());
private Metrics metrics = new Metrics(time);
private final long maxBlockTimeMs = 1000;
private final LogContext logContext = new LogContext();
+ @BeforeEach
+ public void setup() {
+ metadataMock = setupMetadata(cluster);
+ }
+
@AfterEach
public void teardown() {
this.metrics.close();
+ Mockito.reset(metadataMock);
}
@Test
@@ -112,6 +125,7 @@ public class RecordAccumulatorTest {
RecordAccumulator accum = createTestRecordAccumulator((int) batchSize, Integer.MAX_VALUE, CompressionType.NONE, 10);
Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3, part4),
Collections.emptySet(), Collections.emptySet());
+ metadataMock = setupMetadata(cluster);
// initial data
accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster);
@@ -120,7 +134,7 @@ public class RecordAccumulatorTest {
accum.append(topic, partition4, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster);
// drain batches from 2 nodes: node1 => tp1, node2 => tp3, because the max request size is full after the first batch drained
- Map<Integer, List<ProducerBatch>> batches1 = accum.drain(cluster, new HashSet<>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+ Map<Integer, List<ProducerBatch>> batches1 = accum.drain(metadataMock, new HashSet<>(Arrays.asList(node1, node2)), (int) batchSize, 0);
verifyTopicPartitionInBatches(batches1, tp1, tp3);
// add record for tp1, tp3
@@ -129,11 +143,11 @@ public class RecordAccumulatorTest {
// drain batches from 2 nodes: node1 => tp2, node2 => tp4, because the max request size is full after the first batch drained
// The drain index should start from next topic partition, that is, node1 => tp2, node2 => tp4
- Map<Integer, List<ProducerBatch>> batches2 = accum.drain(cluster, new HashSet<>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+ Map<Integer, List<ProducerBatch>> batches2 = accum.drain(metadataMock, new HashSet<>(Arrays.asList(node1, node2)), (int) batchSize, 0);
verifyTopicPartitionInBatches(batches2, tp2, tp4);
// make sure in next run, the drain index will start from the beginning
- Map<Integer, List<ProducerBatch>> batches3 = accum.drain(cluster, new HashSet<>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+ Map<Integer, List<ProducerBatch>> batches3 = accum.drain(metadataMock, new HashSet<>(Arrays.asList(node1, node2)), (int) batchSize, 0);
verifyTopicPartitionInBatches(batches3, tp1, tp3);
// add record for tp2, tp3, tp4 and mute the tp4
@@ -142,7 +156,7 @@ public class RecordAccumulatorTest {
accum.append(topic, partition4, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster);
accum.mutePartition(tp4);
// drain batches from 2 nodes: node1 => tp2, node2 => tp3 (because tp4 is muted)
- Map<Integer, List<ProducerBatch>> batches4 = accum.drain(cluster, new HashSet<>(Arrays.asList(node1, node2)), (int) batchSize, 0);
+ Map<Integer, List<ProducerBatch>> batches4 = accum.drain(metadataMock, new HashSet<>(Arrays.asList(node1, node2)), (int) batchSize, 0);
verifyTopicPartitionInBatches(batches4, tp2, tp3);
// add record for tp1, tp2, tp3, and unmute tp4
@@ -151,7 +165,7 @@ public class RecordAccumulatorTest {
accum.append(topic, partition3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(), cluster);
accum.unmutePartition(tp4);
// set maxSize as a max value, so that the all partitions in 2 nodes should be drained: node1 => [tp1, tp2], node2 => [tp3, tp4]
- Map<Integer, List<ProducerBatch>> batches5 = accum.drain(cluster, new HashSet<>(Arrays.asList(node1, node2)), Integer.MAX_VALUE, 0);
+ Map<Integer, List<ProducerBatch>> batches5 = accum.drain(metadataMock, new HashSet<>(Arrays.asList(node1, node2)), Integer.MAX_VALUE, 0);
verifyTopicPartitionInBatches(batches5, tp1, tp2, tp3, tp4);
}
@@ -188,7 +202,7 @@ public class RecordAccumulatorTest {
ProducerBatch batch = partitionBatches.peekFirst();
assertTrue(batch.isWritable());
- assertEquals(0, accum.ready(cluster, now).readyNodes.size(), "No partitions should be ready.");
+ assertEquals(0, accum.ready(metadataMock, now).readyNodes.size(), "No partitions should be ready.");
}
// this append doesn't fit in the first batch, so a new batch is
created and the first batch is closed
@@ -198,9 +212,9 @@ public class RecordAccumulatorTest {
assertEquals(2, partitionBatches.size());
Iterator<ProducerBatch> partitionBatchesIterator =
partitionBatches.iterator();
assertTrue(partitionBatchesIterator.next().isWritable());
- assertEquals(Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes, "Our partition's leader should be ready");
+ assertEquals(Collections.singleton(node1), accum.ready(metadataMock, time.milliseconds()).readyNodes, "Our partition's leader should be ready");
- List<ProducerBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
+ List<ProducerBatch> batches = accum.drain(metadataMock, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
assertEquals(1, batches.size());
ProducerBatch batch = batches.get(0);
@@ -229,7 +243,7 @@ public class RecordAccumulatorTest {
RecordAccumulator accum = createTestRecordAccumulator(
batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 *
1024, compressionType, 0);
accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS,
null, maxBlockTimeMs, false, time.milliseconds(), cluster);
- assertEquals(Collections.singleton(node1), accum.ready(cluster,
time.milliseconds()).readyNodes, "Our partition's leader should be ready");
+ assertEquals(Collections.singleton(node1), accum.ready(metadataMock,
time.milliseconds()).readyNodes, "Our partition's leader should be ready");
Deque<ProducerBatch> batches = accum.getDeque(tp1);
assertEquals(1, batches.size());
@@ -267,7 +281,7 @@ public class RecordAccumulatorTest {
RecordAccumulator accum = createTestRecordAccumulator(
batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 *
1024, compressionType, 0);
accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS,
null, maxBlockTimeMs, false, time.milliseconds(), cluster);
- assertEquals(Collections.singleton(node1), accum.ready(cluster,
time.milliseconds()).readyNodes, "Our partition's leader should be ready");
+ assertEquals(Collections.singleton(node1), accum.ready(metadataMock,
time.milliseconds()).readyNodes, "Our partition's leader should be ready");
Deque<ProducerBatch> batches = accum.getDeque(tp1);
assertEquals(1, batches.size());
@@ -291,10 +305,10 @@ public class RecordAccumulatorTest {
RecordAccumulator accum = createTestRecordAccumulator(
1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
CompressionType.NONE, lingerMs);
accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS,
null, maxBlockTimeMs, false, time.milliseconds(), cluster);
- assertEquals(0, accum.ready(cluster,
time.milliseconds()).readyNodes.size(), "No partitions should be ready");
+ assertEquals(0, accum.ready(metadataMock,
time.milliseconds()).readyNodes.size(), "No partitions should be ready");
time.sleep(10);
- assertEquals(Collections.singleton(node1), accum.ready(cluster,
time.milliseconds()).readyNodes, "Our partition's leader should be ready");
- List<ProducerBatch> batches = accum.drain(cluster,
Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
+ assertEquals(Collections.singleton(node1), accum.ready(metadataMock,
time.milliseconds()).readyNodes, "Our partition's leader should be ready");
+ List<ProducerBatch> batches = accum.drain(metadataMock,
Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id());
assertEquals(1, batches.size());
ProducerBatch batch = batches.get(0);
@@ -315,9 +329,9 @@ public class RecordAccumulatorTest {
for (int i = 0; i < appends; i++)
accum.append(tp.topic(), tp.partition(), 0L, key, value,
Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(),
cluster);
}
- assertEquals(Collections.singleton(node1), accum.ready(cluster,
time.milliseconds()).readyNodes, "Partition's leader should be ready");
+ assertEquals(Collections.singleton(node1), accum.ready(metadataMock,
time.milliseconds()).readyNodes, "Partition's leader should be ready");
- List<ProducerBatch> batches = accum.drain(cluster,
Collections.singleton(node1), 1024, 0).get(node1.id());
+ List<ProducerBatch> batches = accum.drain(metadataMock,
Collections.singleton(node1), 1024, 0).get(node1.id());
assertEquals(1, batches.size(), "But due to size bound only one
partition should have been retrieved");
}
@@ -346,8 +360,8 @@ public class RecordAccumulatorTest {
int read = 0;
long now = time.milliseconds();
while (read < numThreads * msgs) {
- Set<Node> nodes = accum.ready(cluster, now).readyNodes;
- List<ProducerBatch> batches = accum.drain(cluster, nodes, 5 *
1024, 0).get(node1.id());
+ Set<Node> nodes = accum.ready(metadataMock, now).readyNodes;
+ List<ProducerBatch> batches = accum.drain(metadataMock, nodes, 5 *
1024, 0).get(node1.id());
if (batches != null) {
for (ProducerBatch batch : batches) {
for (Record record : batch.records().records())
@@ -378,7 +392,7 @@ public class RecordAccumulatorTest {
// Partition on node1 only
for (int i = 0; i < appends; i++)
accum.append(topic, partition1, 0L, key, value,
Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(),
cluster);
- RecordAccumulator.ReadyCheckResult result = accum.ready(cluster,
time.milliseconds());
+ RecordAccumulator.ReadyCheckResult result = accum.ready(metadataMock,
time.milliseconds());
assertEquals(0, result.readyNodes.size(), "No nodes should be ready.");
assertEquals(lingerMs, result.nextReadyCheckDelayMs, "Next check time
should be the linger time");
@@ -387,14 +401,14 @@ public class RecordAccumulatorTest {
// Add partition on node2 only
for (int i = 0; i < appends; i++)
accum.append(topic, partition3, 0L, key, value,
Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(),
cluster);
- result = accum.ready(cluster, time.milliseconds());
+ result = accum.ready(metadataMock, time.milliseconds());
assertEquals(0, result.readyNodes.size(), "No nodes should be ready.");
assertEquals(lingerMs / 2, result.nextReadyCheckDelayMs, "Next check
time should be defined by node1, half remaining linger time");
// Add data for another partition on node1, enough to make data
sendable immediately
for (int i = 0; i < appends + 1; i++)
accum.append(topic, partition2, 0L, key, value,
Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(),
cluster);
- result = accum.ready(cluster, time.milliseconds());
+ result = accum.ready(metadataMock, time.milliseconds());
assertEquals(Collections.singleton(node1), result.readyNodes, "Node1
should be ready");
// Note this can actually be < linger time because it may use delays
from partitions that aren't sendable
// but have leaders with other sendable data.
@@ -416,9 +430,9 @@ public class RecordAccumulatorTest {
long now = time.milliseconds();
accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS,
null, maxBlockTimeMs, false, time.milliseconds(), cluster);
- RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, now +
lingerMs + 1);
+ RecordAccumulator.ReadyCheckResult result = accum.ready(metadataMock,
now + lingerMs + 1);
assertEquals(Collections.singleton(node1), result.readyNodes, "Node1
should be ready");
- Map<Integer, List<ProducerBatch>> batches = accum.drain(cluster,
result.readyNodes, Integer.MAX_VALUE, now + lingerMs + 1);
+ Map<Integer, List<ProducerBatch>> batches = accum.drain(metadataMock,
result.readyNodes, Integer.MAX_VALUE, now + lingerMs + 1);
assertEquals(1, batches.size(), "Node1 should be the only ready
node.");
assertEquals(1, batches.get(0).size(), "Partition 0 should only have
one batch drained.");
@@ -428,24 +442,23 @@ public class RecordAccumulatorTest {
// Put message for partition 1 into accumulator
accum.append(topic, partition2, 0L, key, value, Record.EMPTY_HEADERS,
null, maxBlockTimeMs, false, time.milliseconds(), cluster);
- result = accum.ready(cluster, now + lingerMs + 1);
+ result = accum.ready(metadataMock, now + lingerMs + 1);
assertEquals(Collections.singleton(node1), result.readyNodes, "Node1
should be ready");
// tp1 should backoff while tp2 should not
- batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE,
now + lingerMs + 1);
+ batches = accum.drain(metadataMock, result.readyNodes,
Integer.MAX_VALUE, now + lingerMs + 1);
assertEquals(1, batches.size(), "Node1 should be the only ready
node.");
assertEquals(1, batches.get(0).size(), "Node1 should only have one
batch drained.");
assertEquals(tp2, batches.get(0).get(0).topicPartition, "Node1 should
only have one batch for partition 1.");
// Partition 0 can be drained after retry backoff
- result = accum.ready(cluster, now + retryBackoffMs + 1);
+ result = accum.ready(metadataMock, now + retryBackoffMs + 1);
assertEquals(Collections.singleton(node1), result.readyNodes, "Node1
should be ready");
- batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE,
now + retryBackoffMs + 1);
+ batches = accum.drain(metadataMock, result.readyNodes,
Integer.MAX_VALUE, now + retryBackoffMs + 1);
assertEquals(1, batches.size(), "Node1 should be the only ready
node.");
assertEquals(1, batches.get(0).size(), "Node1 should only have one
batch drained.");
assertEquals(tp1, batches.get(0).get(0).topicPartition, "Node1 should
only have one batch for partition 0.");
}
-
@Test
public void testFlush() throws Exception {
int lingerMs = Integer.MAX_VALUE;
@@ -456,14 +469,14 @@ public class RecordAccumulatorTest {
accum.append(topic, i % 3, 0L, key, value, Record.EMPTY_HEADERS,
null, maxBlockTimeMs, false, time.milliseconds(), cluster);
assertTrue(accum.hasIncomplete());
}
- RecordAccumulator.ReadyCheckResult result = accum.ready(cluster,
time.milliseconds());
+ RecordAccumulator.ReadyCheckResult result = accum.ready(metadataMock,
time.milliseconds());
assertEquals(0, result.readyNodes.size(), "No nodes should be ready.");
accum.beginFlush();
- result = accum.ready(cluster, time.milliseconds());
+ result = accum.ready(metadataMock, time.milliseconds());
// drain and deallocate all batches
- Map<Integer, List<ProducerBatch>> results = accum.drain(cluster,
result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
+ Map<Integer, List<ProducerBatch>> results = accum.drain(metadataMock,
result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
assertTrue(accum.hasIncomplete());
for (List<ProducerBatch> batches: results.values())
@@ -523,9 +536,9 @@ public class RecordAccumulatorTest {
}
for (int i = 0; i < numRecords; i++)
accum.append(topic, i % 3, 0L, key, value, null, new
TestCallback(), maxBlockTimeMs, false, time.milliseconds(), cluster);
- RecordAccumulator.ReadyCheckResult result = accum.ready(cluster,
time.milliseconds());
+ RecordAccumulator.ReadyCheckResult result = accum.ready(metadataMock,
time.milliseconds());
assertFalse(result.readyNodes.isEmpty());
- Map<Integer, List<ProducerBatch>> drained = accum.drain(cluster,
result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
+ Map<Integer, List<ProducerBatch>> drained = accum.drain(metadataMock,
result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
assertTrue(accum.hasUndrained());
assertTrue(accum.hasIncomplete());
@@ -568,9 +581,9 @@ public class RecordAccumulatorTest {
}
for (int i = 0; i < numRecords; i++)
accum.append(topic, i % 3, 0L, key, value, null, new
TestCallback(), maxBlockTimeMs, false, time.milliseconds(), cluster);
- RecordAccumulator.ReadyCheckResult result = accum.ready(cluster,
time.milliseconds());
+ RecordAccumulator.ReadyCheckResult result = accum.ready(metadataMock,
time.milliseconds());
assertFalse(result.readyNodes.isEmpty());
- Map<Integer, List<ProducerBatch>> drained = accum.drain(cluster,
result.readyNodes, Integer.MAX_VALUE,
+ Map<Integer, List<ProducerBatch>> drained = accum.drain(metadataMock,
result.readyNodes, Integer.MAX_VALUE,
time.milliseconds());
assertTrue(accum.hasUndrained());
assertTrue(accum.hasIncomplete());
@@ -607,10 +620,10 @@ public class RecordAccumulatorTest {
if (time.milliseconds() < System.currentTimeMillis())
time.setCurrentTimeMs(System.currentTimeMillis());
accum.append(topic, partition1, 0L, key, value,
Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(),
cluster);
- assertEquals(0, accum.ready(cluster,
time.milliseconds()).readyNodes.size(), "No partition should be ready.");
+ assertEquals(0, accum.ready(metadataMock,
time.milliseconds()).readyNodes.size(), "No partition should be ready.");
time.sleep(lingerMs);
- readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
+ readyNodes = accum.ready(metadataMock,
time.milliseconds()).readyNodes;
assertEquals(Collections.singleton(node1), readyNodes, "Our
partition's leader should be ready");
expiredBatches = accum.expiredBatches(time.milliseconds());
@@ -625,7 +638,7 @@ public class RecordAccumulatorTest {
time.sleep(deliveryTimeoutMs - lingerMs);
expiredBatches = accum.expiredBatches(time.milliseconds());
assertEquals(1, expiredBatches.size(), "The batch may expire when
the partition is muted");
- assertEquals(0, accum.ready(cluster,
time.milliseconds()).readyNodes.size(), "No partitions should be ready.");
+ assertEquals(0, accum.ready(metadataMock,
time.milliseconds()).readyNodes.size(), "No partitions should be ready.");
}
}
@@ -656,11 +669,11 @@ public class RecordAccumulatorTest {
// Test batches not in retry
for (int i = 0; i < appends; i++) {
accum.append(topic, partition1, 0L, key, value,
Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(),
cluster);
- assertEquals(0, accum.ready(cluster,
time.milliseconds()).readyNodes.size(), "No partitions should be ready.");
+ assertEquals(0, accum.ready(metadataMock,
time.milliseconds()).readyNodes.size(), "No partitions should be ready.");
}
// Make the batches ready due to batch full
accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS,
null, 0, false, time.milliseconds(), cluster);
- Set<Node> readyNodes = accum.ready(cluster,
time.milliseconds()).readyNodes;
+ Set<Node> readyNodes = accum.ready(metadataMock,
time.milliseconds()).readyNodes;
assertEquals(Collections.singleton(node1), readyNodes, "Our
partition's leader should be ready");
// Advance the clock to expire the batch.
time.sleep(deliveryTimeoutMs + 1);
@@ -671,7 +684,7 @@ public class RecordAccumulatorTest {
accum.unmutePartition(tp1);
expiredBatches = accum.expiredBatches(time.milliseconds());
assertEquals(0, expiredBatches.size(), "All batches should have been
expired earlier");
- assertEquals(0, accum.ready(cluster,
time.milliseconds()).readyNodes.size(), "No partitions should be ready.");
+ assertEquals(0, accum.ready(metadataMock,
time.milliseconds()).readyNodes.size(), "No partitions should be ready.");
// Advance the clock to make the next batch ready due to linger.ms
time.sleep(lingerMs);
@@ -685,15 +698,15 @@ public class RecordAccumulatorTest {
accum.unmutePartition(tp1);
expiredBatches = accum.expiredBatches(time.milliseconds());
assertEquals(0, expiredBatches.size(), "All batches should have been
expired");
- assertEquals(0, accum.ready(cluster,
time.milliseconds()).readyNodes.size(), "No partitions should be ready.");
+ assertEquals(0, accum.ready(metadataMock,
time.milliseconds()).readyNodes.size(), "No partitions should be ready.");
// Test batches in retry.
// Create a retried batch
accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS,
null, 0, false, time.milliseconds(), cluster);
time.sleep(lingerMs);
- readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
+ readyNodes = accum.ready(metadataMock, time.milliseconds()).readyNodes;
assertEquals(Collections.singleton(node1), readyNodes, "Our
partition's leader should be ready");
- Map<Integer, List<ProducerBatch>> drained = accum.drain(cluster,
readyNodes, Integer.MAX_VALUE, time.milliseconds());
+ Map<Integer, List<ProducerBatch>> drained = accum.drain(metadataMock,
readyNodes, Integer.MAX_VALUE, time.milliseconds());
assertEquals(drained.get(node1.id()).size(), 1, "There should be only
one batch.");
time.sleep(1000L);
accum.reenqueue(drained.get(node1.id()).get(0), time.milliseconds());
@@ -715,7 +728,7 @@ public class RecordAccumulatorTest {
// Test that when being throttled muted batches are expired before the
throttle time is over.
accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS,
null, 0, false, time.milliseconds(), cluster);
time.sleep(lingerMs);
- readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
+ readyNodes = accum.ready(metadataMock, time.milliseconds()).readyNodes;
assertEquals(Collections.singleton(node1), readyNodes, "Our
partition's leader should be ready");
// Advance the clock to expire the batch.
time.sleep(requestTimeout + 1);
@@ -733,7 +746,7 @@ public class RecordAccumulatorTest {
time.sleep(throttleTimeMs);
expiredBatches = accum.expiredBatches(time.milliseconds());
assertEquals(0, expiredBatches.size(), "All batches should have been
expired earlier");
- assertEquals(1, accum.ready(cluster,
time.milliseconds()).readyNodes.size(), "No partitions should be ready.");
+ assertEquals(1, accum.ready(metadataMock,
time.milliseconds()).readyNodes.size(), "No partitions should be ready.");
}
@Test
@@ -747,28 +760,28 @@ public class RecordAccumulatorTest {
int appends = expectedNumAppends(batchSize);
for (int i = 0; i < appends; i++) {
accum.append(topic, partition1, 0L, key, value,
Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds(),
cluster);
- assertEquals(0, accum.ready(cluster, now).readyNodes.size(), "No
partitions should be ready.");
+ assertEquals(0, accum.ready(metadataMock, now).readyNodes.size(),
"No partitions should be ready.");
}
time.sleep(2000);
// Test ready with muted partition
accum.mutePartition(tp1);
- RecordAccumulator.ReadyCheckResult result = accum.ready(cluster,
time.milliseconds());
+ RecordAccumulator.ReadyCheckResult result = accum.ready(metadataMock,
time.milliseconds());
assertEquals(0, result.readyNodes.size(), "No node should be ready");
// Test ready without muted partition
accum.unmutePartition(tp1);
- result = accum.ready(cluster, time.milliseconds());
+ result = accum.ready(metadataMock, time.milliseconds());
assertTrue(result.readyNodes.size() > 0, "The batch should be ready");
// Test drain with muted partition
accum.mutePartition(tp1);
- Map<Integer, List<ProducerBatch>> drained = accum.drain(cluster,
result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
+ Map<Integer, List<ProducerBatch>> drained = accum.drain(metadataMock,
result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
assertEquals(0, drained.get(node1.id()).size(), "No batch should have
been drained");
// Test drain without muted partition.
accum.unmutePartition(tp1);
- drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE,
time.milliseconds());
+ drained = accum.drain(metadataMock, result.readyNodes,
Integer.MAX_VALUE, time.milliseconds());
assertTrue(drained.get(node1.id()).size() > 0, "The batch should have
been drained.");
}
@@ -818,20 +831,20 @@ public class RecordAccumulatorTest {
false, time.milliseconds(), cluster);
assertTrue(accumulator.hasUndrained());
- RecordAccumulator.ReadyCheckResult firstResult =
accumulator.ready(cluster, time.milliseconds());
+ RecordAccumulator.ReadyCheckResult firstResult =
accumulator.ready(metadataMock, time.milliseconds());
assertEquals(0, firstResult.readyNodes.size());
- Map<Integer, List<ProducerBatch>> firstDrained =
accumulator.drain(cluster, firstResult.readyNodes,
+ Map<Integer, List<ProducerBatch>> firstDrained =
accumulator.drain(metadataMock, firstResult.readyNodes,
Integer.MAX_VALUE, time.milliseconds());
assertEquals(0, firstDrained.size());
// Once the transaction begins completion, then the batch should be
drained immediately.
Mockito.when(transactionManager.isCompleting()).thenReturn(true);
- RecordAccumulator.ReadyCheckResult secondResult =
accumulator.ready(cluster, time.milliseconds());
+ RecordAccumulator.ReadyCheckResult secondResult =
accumulator.ready(metadataMock, time.milliseconds());
assertEquals(1, secondResult.readyNodes.size());
Node readyNode = secondResult.readyNodes.iterator().next();
- Map<Integer, List<ProducerBatch>> secondDrained =
accumulator.drain(cluster, secondResult.readyNodes,
+ Map<Integer, List<ProducerBatch>> secondDrained =
accumulator.drain(metadataMock, secondResult.readyNodes,
Integer.MAX_VALUE, time.milliseconds());
assertEquals(Collections.singleton(readyNode.id()),
secondDrained.keySet());
List<ProducerBatch> batches = secondDrained.get(readyNode.id());
@@ -861,16 +874,16 @@ public class RecordAccumulatorTest {
accum.reenqueue(batch, now);
time.sleep(101L);
// Drain the batch.
- RecordAccumulator.ReadyCheckResult result = accum.ready(cluster,
time.milliseconds());
+ RecordAccumulator.ReadyCheckResult result = accum.ready(metadataMock,
time.milliseconds());
assertTrue(result.readyNodes.size() > 0, "The batch should be ready");
- Map<Integer, List<ProducerBatch>> drained = accum.drain(cluster,
result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
+ Map<Integer, List<ProducerBatch>> drained = accum.drain(metadataMock,
result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
assertEquals(1, drained.size(), "Only node1 should be drained");
assertEquals(1, drained.get(node1.id()).size(), "Only one batch should
be drained");
// Split and reenqueue the batch.
accum.splitAndReenqueue(drained.get(node1.id()).get(0));
time.sleep(101L);
- drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE,
time.milliseconds());
+ drained = accum.drain(metadataMock, result.readyNodes,
Integer.MAX_VALUE, time.milliseconds());
assertFalse(drained.isEmpty());
assertFalse(drained.get(node1.id()).isEmpty());
drained.get(node1.id()).get(0).complete(acked.get(), 100L);
@@ -878,7 +891,7 @@ public class RecordAccumulatorTest {
assertTrue(future1.isDone());
assertEquals(0, future1.get().offset());
- drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE,
time.milliseconds());
+ drained = accum.drain(metadataMock, result.readyNodes,
Integer.MAX_VALUE, time.milliseconds());
assertFalse(drained.isEmpty());
assertFalse(drained.get(node1.id()).isEmpty());
drained.get(node1.id()).get(0).complete(acked.get(), 100L);
@@ -899,14 +912,14 @@ public class RecordAccumulatorTest {
int numSplitBatches = prepareSplitBatches(accum, seed, 100, 20);
assertTrue(numSplitBatches > 0, "There should be some split batches");
// Drain all the split batches.
- RecordAccumulator.ReadyCheckResult result = accum.ready(cluster,
time.milliseconds());
+ RecordAccumulator.ReadyCheckResult result = accum.ready(metadataMock,
time.milliseconds());
for (int i = 0; i < numSplitBatches; i++) {
Map<Integer, List<ProducerBatch>> drained =
- accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE,
time.milliseconds());
+ accum.drain(metadataMock, result.readyNodes,
Integer.MAX_VALUE, time.milliseconds());
assertFalse(drained.isEmpty());
assertFalse(drained.get(node1.id()).isEmpty());
}
- assertTrue(accum.ready(cluster,
time.milliseconds()).readyNodes.isEmpty(), "All the batches should have been
drained.");
+ assertTrue(accum.ready(metadataMock,
time.milliseconds()).readyNodes.isEmpty(), "All the batches should have been
drained.");
assertEquals(bufferCapacity, accum.bufferPoolAvailableMemory(),
"The split batches should be allocated off the accumulator");
}
@@ -953,16 +966,16 @@ public class RecordAccumulatorTest {
batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 *
batchSize, CompressionType.NONE, lingerMs);
accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS,
null, maxBlockTimeMs, false, time.milliseconds(), cluster);
- Set<Node> readyNodes = accum.ready(cluster,
time.milliseconds()).readyNodes;
- Map<Integer, List<ProducerBatch>> drained = accum.drain(cluster,
readyNodes, Integer.MAX_VALUE, time.milliseconds());
+ Set<Node> readyNodes = accum.ready(metadataMock,
time.milliseconds()).readyNodes;
+ Map<Integer, List<ProducerBatch>> drained = accum.drain(metadataMock,
readyNodes, Integer.MAX_VALUE, time.milliseconds());
assertTrue(drained.isEmpty());
//assertTrue(accum.soonToExpireInFlightBatches().isEmpty());
// advanced clock and send one batch out but it should not be included
in soon to expire inflight
// batches because batch's expiry is quite far.
time.sleep(lingerMs + 1);
- readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
- drained = accum.drain(cluster, readyNodes, Integer.MAX_VALUE,
time.milliseconds());
+ readyNodes = accum.ready(metadataMock, time.milliseconds()).readyNodes;
+ drained = accum.drain(metadataMock, readyNodes, Integer.MAX_VALUE,
time.milliseconds());
assertEquals(1, drained.size(), "A batch did not drain after linger");
//assertTrue(accum.soonToExpireInFlightBatches().isEmpty());
@@ -971,8 +984,8 @@ public class RecordAccumulatorTest {
time.sleep(lingerMs * 4);
// Now drain and check that accumulator picked up the drained batch
because its expiry is soon.
- readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
- drained = accum.drain(cluster, readyNodes, Integer.MAX_VALUE,
time.milliseconds());
+ readyNodes = accum.ready(metadataMock, time.milliseconds()).readyNodes;
+ drained = accum.drain(metadataMock, readyNodes, Integer.MAX_VALUE,
time.milliseconds());
assertEquals(1, drained.size(), "A batch did not drain after linger");
}
@@ -994,9 +1007,9 @@ public class RecordAccumulatorTest {
for (Boolean mute : muteStates) {
accum.append(topic, partition1, 0L, key, value,
Record.EMPTY_HEADERS, null, 0, false, time.milliseconds(), cluster);
time.sleep(lingerMs);
- readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
+ readyNodes = accum.ready(metadataMock,
time.milliseconds()).readyNodes;
assertEquals(Collections.singleton(node1), readyNodes, "Our
partition's leader should be ready");
- Map<Integer, List<ProducerBatch>> drained = accum.drain(cluster,
readyNodes, Integer.MAX_VALUE, time.milliseconds());
+ Map<Integer, List<ProducerBatch>> drained =
accum.drain(metadataMock, readyNodes, Integer.MAX_VALUE, time.milliseconds());
assertEquals(1, drained.get(node1.id()).size(), "There should be
only one batch.");
time.sleep(rtt);
accum.reenqueue(drained.get(node1.id()).get(0),
time.milliseconds());
@@ -1008,7 +1021,7 @@ public class RecordAccumulatorTest {
// test expiration
time.sleep(deliveryTimeoutMs - rtt);
- accum.drain(cluster, Collections.singleton(node1),
Integer.MAX_VALUE, time.milliseconds());
+ accum.drain(metadataMock, Collections.singleton(node1),
Integer.MAX_VALUE, time.milliseconds());
expiredBatches = accum.expiredBatches(time.milliseconds());
assertEquals(mute ? 1 : 0, expiredBatches.size(),
"RecordAccumulator has expired batches if the partition is not muted");
}
@@ -1049,12 +1062,12 @@ public class RecordAccumulatorTest {
// We only appended if we do not retry.
if (!switchPartition) {
appends++;
- assertEquals(0, accum.ready(cluster, now).readyNodes.size(),
"No partitions should be ready.");
+ assertEquals(0, accum.ready(metadataMock,
now).readyNodes.size(), "No partitions should be ready.");
}
}
// Batch should be full.
- assertEquals(1, accum.ready(cluster,
time.milliseconds()).readyNodes.size());
+ assertEquals(1, accum.ready(metadataMock,
time.milliseconds()).readyNodes.size());
assertEquals(appends, expectedAppends);
switchPartition = false;
@@ -1182,7 +1195,7 @@ public class RecordAccumulatorTest {
}
// Let the accumulator generate the probability tables.
- accum.ready(cluster, time.milliseconds());
+ accum.ready(metadataMock, time.milliseconds());
// Set up callbacks so that we know what partition is chosen.
final AtomicInteger partition = new
AtomicInteger(RecordMetadata.UNKNOWN_PARTITION);
@@ -1226,7 +1239,7 @@ public class RecordAccumulatorTest {
// Test that partitions residing on high-latency nodes don't get
switched to.
accum.updateNodeLatencyStats(0, time.milliseconds() - 200, true);
accum.updateNodeLatencyStats(0, time.milliseconds(), false);
- accum.ready(cluster, time.milliseconds());
+ accum.ready(metadataMock, time.milliseconds());
// Do one append, because partition gets switched after append.
accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null,
largeValue, Record.EMPTY_HEADERS,
@@ -1263,9 +1276,9 @@ public class RecordAccumulatorTest {
time.sleep(10);
// We should have one batch ready.
- Set<Node> nodes = accum.ready(cluster,
time.milliseconds()).readyNodes;
+ Set<Node> nodes = accum.ready(metadataMock,
time.milliseconds()).readyNodes;
assertEquals(1, nodes.size(), "Should have 1 leader ready");
- List<ProducerBatch> batches = accum.drain(cluster, nodes,
Integer.MAX_VALUE, 0).entrySet().iterator().next().getValue();
+ List<ProducerBatch> batches = accum.drain(metadataMock, nodes,
Integer.MAX_VALUE, 0).entrySet().iterator().next().getValue();
assertEquals(1, batches.size(), "Should have 1 batch ready");
int actualBatchSize = batches.get(0).records().sizeInBytes();
assertTrue(actualBatchSize > batchSize / 2, "Batch must be greater
than half batch.size");
@@ -1273,6 +1286,237 @@ public class RecordAccumulatorTest {
}
}
+ /**
+ * For a batch being retried, this validates whether ready() and drain() skip the backoff (retry immediately) or back off, based on -
+ * 1. how long it has waited between retry attempts.
+ * 2. change in leader hosting the partition.
+ */
+ @Test
+ public void testReadyAndDrainWhenABatchIsBeingRetried() throws InterruptedException {
+ int part1LeaderEpoch = 100;
+ // Create cluster metadata, partition1 being hosted by node1.
+ part1 = new PartitionInfo(topic, partition1, node1, null, null, null);
+ cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1),
+ Collections.emptySet(), Collections.emptySet());
+ final int finalEpoch = part1LeaderEpoch;
+ metadataMock = setupMetadata(cluster, tp -> finalEpoch);
+
+ int batchSize = 10;
+ int lingerMs = 10;
+ int retryBackoffMs = 100;
+ int deliveryTimeoutMs = Integer.MAX_VALUE;
+ long totalSize = 10 * 1024;
+ String metricGrpName = "producer-metrics";
+ final RecordAccumulator accum = new RecordAccumulator(logContext, batchSize,
+ CompressionType.NONE, lingerMs, retryBackoffMs, deliveryTimeoutMs,
+ metrics, metricGrpName, time, new ApiVersions(), null,
+ new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
+
+ // Create 1 batch(batchA) to be produced to partition1.
+ long now = time.milliseconds();
+ accum.append(topic, partition1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, now, cluster);
+
+ // 1st attempt(not a retry) to produce batchA, it should be ready & drained to be produced.
+ {
+ now += lingerMs + 1;
+ RecordAccumulator.ReadyCheckResult result = accum.ready(metadataMock, now);
+ assertTrue(result.readyNodes.contains(node1), "Node1 is ready");
+
+ Map<Integer, List<ProducerBatch>> batches = accum.drain(metadataMock,
+ result.readyNodes, 999999 /* maxSize */, now);
+ assertTrue(batches.containsKey(node1.id()) && batches.get(node1.id()).size() == 1, "Node1 has 1 batch ready & drained");
+ ProducerBatch batch = batches.get(node1.id()).get(0);
+ assertEquals(Optional.of(part1LeaderEpoch), batch.currentLeaderEpoch());
+ assertEquals(0, batch.attemptsWhenLeaderLastChanged());
+ // Re-enqueue batch for subsequent retries & test-cases
+ accum.reenqueue(batch, now);
+ }
+
+ // In this retry of batchA, wait-time between retries is less than configured and no leader change, so should backoff.
+ {
+ now += 1;
+ RecordAccumulator.ReadyCheckResult result = accum.ready(metadataMock, now);
+ assertFalse(result.readyNodes.contains(node1), "Node1 is not ready");
+
+ // Try to drain from node1, it should return no batches.
+ Map<Integer, List<ProducerBatch>> batches =
accum.drain(metadataMock,
+ new HashSet<>(Arrays.asList(node1)), 999999 /* maxSize */,
now);
+ assertTrue(batches.containsKey(node1.id()) &&
batches.get(node1.id()).isEmpty(),
+ "No batches ready to be drained on Node1");
+ }
+
+ // In this retry of batchA, the wait time between retries is less than
configured but the leader has changed, so it should not back off.
+ {
+ now += 1;
+ part1LeaderEpoch++;
+ // Create cluster metadata, with new leader epoch.
+ part1 = new PartitionInfo(topic, partition1, node1, null, null,
null);
+ cluster = new Cluster(null, Arrays.asList(node1, node2),
Arrays.asList(part1),
+ Collections.emptySet(), Collections.emptySet());
+ final int finalPart1LeaderEpoch = part1LeaderEpoch;
+ metadataMock = setupMetadata(cluster, tp -> finalPart1LeaderEpoch);
+ RecordAccumulator.ReadyCheckResult result =
accum.ready(metadataMock, now);
+ assertTrue(result.readyNodes.contains(node1), "Node1 is ready");
+
+ Map<Integer, List<ProducerBatch>> batches =
accum.drain(metadataMock,
+ result.readyNodes, 999999 /* maxSize */, now);
+ assertTrue(batches.containsKey(node1.id()) &&
batches.get(node1.id()).size() == 1, "Node1 has 1 batch ready & drained");
+ ProducerBatch batch = batches.get(node1.id()).get(0);
+ assertEquals(Optional.of(part1LeaderEpoch),
batch.currentLeaderEpoch());
+ assertEquals(1, batch.attemptsWhenLeaderLastChanged());
+
+ // Re-enqueue batch for subsequent retries/test-cases.
+ accum.reenqueue(batch, now);
+ }
+
+ // In this retry of batchA, the wait time between retries is more than
configured and the leader has not changed, so it should not back off.
+ {
+ now += 2 * retryBackoffMs;
+ // Recreate cluster metadata, with the same leader epoch.
+ part1 = new PartitionInfo(topic, partition1, node1, null, null,
null);
+ cluster = new Cluster(null, Arrays.asList(node1, node2),
Arrays.asList(part1),
+ Collections.emptySet(), Collections.emptySet());
+ final int finalPart1LeaderEpoch = part1LeaderEpoch;
+ metadataMock = setupMetadata(cluster, tp -> finalPart1LeaderEpoch);
+ RecordAccumulator.ReadyCheckResult result =
accum.ready(metadataMock, now);
+ assertTrue(result.readyNodes.contains(node1), "Node1 is ready");
+
+ Map<Integer, List<ProducerBatch>> batches =
accum.drain(metadataMock,
+ result.readyNodes, 999999 /* maxSize */, now);
+ assertTrue(batches.containsKey(node1.id()) &&
batches.get(node1.id()).size() == 1, "Node1 has 1 batch ready & drained");
+ ProducerBatch batch = batches.get(node1.id()).get(0);
+ assertEquals(Optional.of(part1LeaderEpoch),
batch.currentLeaderEpoch());
+ assertEquals(1, batch.attemptsWhenLeaderLastChanged());
+
+ // Re-enqueue batch for subsequent retries/test-cases.
+ accum.reenqueue(batch, now);
+ }
+
+ // In this retry of batchA, the wait time between retries is more than
configured and the leader has changed, so it should not back off.
+ {
+ now += 2 * retryBackoffMs;
+ part1LeaderEpoch++;
+ // Create cluster metadata, with new leader epoch.
+ part1 = new PartitionInfo(topic, partition1, node1, null, null,
null);
+ cluster = new Cluster(null, Arrays.asList(node1, node2),
Arrays.asList(part1),
+ Collections.emptySet(), Collections.emptySet());
+ final int finalPart1LeaderEpoch = part1LeaderEpoch;
+ metadataMock = setupMetadata(cluster, tp -> finalPart1LeaderEpoch);
+ RecordAccumulator.ReadyCheckResult result =
accum.ready(metadataMock, now);
+ assertTrue(result.readyNodes.contains(node1), "Node1 is ready");
+
+ Map<Integer, List<ProducerBatch>> batches =
accum.drain(metadataMock,
+ result.readyNodes, 999999 /* maxSize */, now);
+ assertTrue(batches.containsKey(node1.id()) &&
batches.get(node1.id()).size() == 1, "Node1 has 1 batch ready & drained");
+ ProducerBatch batch = batches.get(node1.id()).get(0);
+ assertEquals(Optional.of(part1LeaderEpoch),
batch.currentLeaderEpoch());
+ assertEquals(3, batch.attemptsWhenLeaderLastChanged());
+
+ // Re-enqueue batch for subsequent retries/test-cases.
+ accum.reenqueue(batch, now);
+ }
+ }
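
The attemptsWhenLeaderLastChanged assertions above (0, 1, 1, 3) follow from the
batch recording, each time it is drained, the leader epoch it is about to be
sent to and the attempt number at which that epoch last changed. A minimal,
self-contained sketch of that bookkeeping, using hypothetical names rather than
the committed ProducerBatch code:

    import java.util.Optional;

    // Sketch only: hypothetical stand-in for the per-batch leader-epoch bookkeeping.
    final class LeaderEpochTracker {
        private Optional<Integer> currentLeaderEpoch = Optional.empty();
        private int attemptsWhenLeaderLastChanged = 0;

        // Called when the batch is drained for an attempt: record the latest epoch and,
        // if it differs from the previously recorded one, the attempt number as well.
        void onDrain(Optional<Integer> latestLeaderEpoch, int attempts) {
            if (latestLeaderEpoch.isPresent() && !latestLeaderEpoch.equals(currentLeaderEpoch)) {
                currentLeaderEpoch = latestLeaderEpoch;
                attemptsWhenLeaderLastChanged = attempts;
            }
        }
    }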
+
+ @Test
+ public void testDrainWithANodeThatDoesntHostAnyPartitions() {
+ int batchSize = 10;
+ int lingerMs = 10;
+ long totalSize = 10 * 1024;
+ RecordAccumulator accum = createTestRecordAccumulator(batchSize,
totalSize,
+ CompressionType.NONE, lingerMs);
+
+ // Create cluster metadata, node2 doesn't host any partitions.
+ part1 = new PartitionInfo(topic, partition1, node1, null, null, null);
+ cluster = new Cluster(null, Arrays.asList(node1, node2),
Arrays.asList(part1),
+ Collections.emptySet(), Collections.emptySet());
+ metadataMock = Mockito.mock(Metadata.class);
+ Mockito.when(metadataMock.fetch()).thenReturn(cluster);
+ Mockito.when(metadataMock.currentLeader(tp1)).thenReturn(
+ new Metadata.LeaderAndEpoch(Optional.of(node1),
+ Optional.of(999 /* dummy value */)));
+
+ // Drain for node2, it should return 0 batches.
+ Map<Integer, List<ProducerBatch>> batches = accum.drain(metadataMock,
+ new HashSet<>(Arrays.asList(node2)), 999999 /* maxSize */,
time.milliseconds());
+ assertTrue(batches.get(node2.id()).isEmpty());
+ }
+
+ @Test
+ public void testDrainOnANodeWhenItCeasesToBeALeader() throws
InterruptedException {
+ int batchSize = 10;
+ int lingerMs = 10;
+ long totalSize = 10 * 1024;
+ RecordAccumulator accum = createTestRecordAccumulator(batchSize,
totalSize,
+ CompressionType.NONE, lingerMs);
+
+ // While node1 is being drained, leader changes from node1 -> node2
for a partition.
+ {
+ // Create cluster metadata, partition1&2 being hosted by node1&2
resp.
+ part1 = new PartitionInfo(topic, partition1, node1, null, null,
null);
+ part2 = new PartitionInfo(topic, partition2, node2, null, null,
null);
+ cluster = new Cluster(null, Arrays.asList(node1, node2),
Arrays.asList(part1, part2),
+ Collections.emptySet(), Collections.emptySet());
+ metadataMock = Mockito.mock(Metadata.class);
+ Mockito.when(metadataMock.fetch()).thenReturn(cluster);
+ // But metadata has a newer leader for partition1, i.e. node2.
+ Mockito.when(metadataMock.currentLeader(tp1)).thenReturn(
+ new Metadata.LeaderAndEpoch(Optional.of(node2),
+ Optional.of(999 /* dummy value */)));
+ Mockito.when(metadataMock.currentLeader(tp2)).thenReturn(
+ new Metadata.LeaderAndEpoch(Optional.of(node2),
+ Optional.of(999 /* dummy value */)));
+
+ // Create 1 batch each for partition1 & partition2.
+ long now = time.milliseconds();
+ accum.append(topic, partition1, 0L, key, value,
Record.EMPTY_HEADERS, null,
+ maxBlockTimeMs, false, now, cluster);
+ accum.append(topic, partition2, 0L, key, value,
Record.EMPTY_HEADERS, null,
+ maxBlockTimeMs, false, now, cluster);
+
+ // Drain for node1, it should return 0 batches, as partition1's
leader in metadata changed.
+ // Drain for node2, it should return 1 batch, for partition2.
+ Map<Integer, List<ProducerBatch>> batches =
accum.drain(metadataMock,
+ new HashSet<>(Arrays.asList(node1, node2)), 999999 /* maxSize
*/, now);
+ assertTrue(batches.get(node1.id()).isEmpty());
+ assertEquals(1, batches.get(node2.id()).size());
+ }
+
+ // Clean up undrained batches so the accum is empty before the next case.
+ accum.abortUndrainedBatches(new RuntimeException());
+
+ // While node1 is being drained, leader changes from node1 ->
"no-leader" for the partition.
+ {
+ // Create cluster metadata, partition1&2 being hosted by node1&2
resp.
+ part1 = new PartitionInfo(topic, partition1, node1, null, null,
null);
+ part2 = new PartitionInfo(topic, partition2, node2, null, null,
null);
+ cluster = new Cluster(null, Arrays.asList(node1, node2),
Arrays.asList(part1, part2),
+ Collections.emptySet(), Collections.emptySet());
+ metadataMock = Mockito.mock(Metadata.class);
+ Mockito.when(metadataMock.fetch()).thenReturn(cluster);
+ // But metadata no longer has a leader for partition1.
+ Mockito.when(metadataMock.currentLeader(tp1)).thenReturn(
+ new Metadata.LeaderAndEpoch(Optional.empty(),
+ Optional.of(999 /* dummy value */)));
+ Mockito.when(metadataMock.currentLeader(tp2)).thenReturn(
+ new Metadata.LeaderAndEpoch(Optional.of(node2),
+ Optional.of(999 /* dummy value */)));
+
+ // Create 1 batch each for partition1 & partition2.
+ long now = time.milliseconds();
+ accum.append(topic, partition1, 0L, key, value,
Record.EMPTY_HEADERS, null,
+ maxBlockTimeMs, false, now, cluster);
+ accum.append(topic, partition2, 0L, key, value,
Record.EMPTY_HEADERS, null,
+ maxBlockTimeMs, false, now, cluster);
+
+ // Drain for node1, it should return 0 batches, as partition1 no
longer has a leader in metadata.
+ // Drain for node2, it should return 1 batch, for partition2.
+ Map<Integer, List<ProducerBatch>> batches =
accum.drain(metadataMock,
+ new HashSet<>(Arrays.asList(node1, node2)), 999999 /* maxSize
*/, now);
+ assertTrue(batches.get(node1.id()).isEmpty());
+ assertEquals(1, batches.get(node2.id()).size());
+ }
+ }
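
Both cases above hinge on drain() consulting the metadata view of the
partition's current leader rather than only the node being drained. A rough,
self-contained sketch of that per-partition check (hypothetical names, not the
committed RecordAccumulator code):

    import java.util.Optional;

    final class DrainLeaderCheck {
        // Sketch only: may a partition's batches be drained to the node currently being drained?
        static boolean drainableTo(int drainNodeId, Optional<Integer> leaderNodeIdInMetadata) {
            // Skip when metadata reports no leader, or the leader has moved to a different node;
            // such batches stay in the accumulator until a later drain against the new leader.
            return leaderNodeIdInMetadata.isPresent() && leaderNodeIdInMetadata.get() == drainNodeId;
        }
    }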
+
private int prepareSplitBatches(RecordAccumulator accum, long seed, int
recordSize, int numRecords)
throws InterruptedException {
Random random = new Random();
@@ -1285,9 +1529,9 @@ public class RecordAccumulatorTest {
accum.append(topic, partition1, 0L, null,
bytesWithPoorCompression(random, recordSize), Record.EMPTY_HEADERS, null, 0,
false, time.milliseconds(), cluster);
}
- RecordAccumulator.ReadyCheckResult result = accum.ready(cluster,
time.milliseconds());
+ RecordAccumulator.ReadyCheckResult result = accum.ready(metadataMock,
time.milliseconds());
assertFalse(result.readyNodes.isEmpty());
- Map<Integer, List<ProducerBatch>> batches = accum.drain(cluster,
result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
+ Map<Integer, List<ProducerBatch>> batches = accum.drain(metadataMock,
result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
assertEquals(1, batches.size());
assertEquals(1, batches.values().iterator().next().size());
ProducerBatch batch = batches.values().iterator().next().get(0);
@@ -1303,8 +1547,8 @@ public class RecordAccumulatorTest {
boolean batchDrained;
do {
batchDrained = false;
- RecordAccumulator.ReadyCheckResult result = accum.ready(cluster,
time.milliseconds());
- Map<Integer, List<ProducerBatch>> batches = accum.drain(cluster,
result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
+ RecordAccumulator.ReadyCheckResult result =
accum.ready(metadataMock, time.milliseconds());
+ Map<Integer, List<ProducerBatch>> batches =
accum.drain(metadataMock, result.readyNodes, Integer.MAX_VALUE,
time.milliseconds());
for (List<ProducerBatch> batchList : batches.values()) {
for (ProducerBatch batch : batchList) {
batchDrained = true;
@@ -1421,4 +1665,27 @@ public class RecordAccumulatorTest {
txnManager,
new BufferPool(totalSize, batchSize, metrics, time,
metricGrpName));
}
+
+ /**
+ * Set up a mocked Metadata object with a dummy leader epoch for every partition.
+ */
+ private Metadata setupMetadata(Cluster cluster) {
+ return setupMetadata(cluster, tp -> 999 /* dummy epoch */);
+ }
+
+ /**
+ * Set up a mocked Metadata object with per-partition leader epochs supplied by the given function.
+ */
+ private Metadata setupMetadata(Cluster cluster, final
Function<TopicPartition, Integer> epochSupplier) {
+ Metadata metadataMock = Mockito.mock(Metadata.class);
+ Mockito.when(metadataMock.fetch()).thenReturn(cluster);
+ for (String topic: cluster.topics()) {
+ for (PartitionInfo partInfo: cluster.partitionsForTopic(topic)) {
+ TopicPartition tp = new TopicPartition(partInfo.topic(),
partInfo.partition());
+ Integer partLeaderEpoch = epochSupplier.apply(tp);
+ Mockito.when(metadataMock.currentLeader(tp)).thenReturn(new
Metadata.LeaderAndEpoch(Optional.of(partInfo.leader()),
Optional.of(partLeaderEpoch)));
+ }
+ }
+ return metadataMock;
+ }
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index c0701c5cb38..06c9448a336 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -497,7 +497,7 @@ public class SenderTest {
Node clusterNode = metadata.fetch().nodes().get(0);
Map<Integer, List<ProducerBatch>> drainedBatches =
- accumulator.drain(metadata.fetch(),
Collections.singleton(clusterNode), Integer.MAX_VALUE, time.milliseconds());
+ accumulator.drain(metadata, Collections.singleton(clusterNode),
Integer.MAX_VALUE, time.milliseconds());
sender.addToInflightBatches(drainedBatches);
// Disconnect the target node for the pending produce request. This
will ensure that sender will try to
@@ -3146,6 +3146,93 @@ public class SenderTest {
txnManager.beginTransaction();
}
+ @Test
+ public void testProducerBatchRetriesWhenPartitionLeaderChanges() throws
Exception {
+ Metrics m = new Metrics();
+ SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+ try {
+ // SETUP
+ String metricGrpName = "producer-metrics-test-stats-1";
+ long totalSize = 1024 * 1024;
+ BufferPool pool = new BufferPool(totalSize, batchSize, metrics,
time,
+ metricGrpName);
+ long retryBackoffMs = 10L;
+ // lingerMs is 0 so the batch is sent as soon as any records are
available in it.
+ this.accumulator = new RecordAccumulator(logContext, batchSize,
+ CompressionType.NONE, 0, retryBackoffMs, DELIVERY_TIMEOUT_MS,
+ metrics, metricGrpName, time, apiVersions, null, pool);
+ Sender sender = new Sender(logContext, client, metadata,
this.accumulator, false,
+ MAX_REQUEST_SIZE, ACKS_ALL,
+ 10, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS,
null,
+ apiVersions);
+ // Update metadata with leader-epochs.
+ int tp0LeaderEpoch = 100;
+ int epoch = tp0LeaderEpoch;
+ this.client.updateMetadata(
+ RequestTestUtils.metadataUpdateWith(1,
Collections.singletonMap("test", 2),
+ tp -> {
+ if (tp0.equals(tp)) {
+ return epoch;
+ } else if (tp1.equals(tp)) {
+ return 0;
+ } else {
+ throw new RuntimeException("unexpected tp " + tp);
+ }
+ }));
+
+ // Produce the batch; it returns with a retriable error like
NOT_LEADER_OR_FOLLOWER and is scheduled for retry.
+ Future<RecordMetadata> futureIsProduced = appendToAccumulator(tp0,
0L, "key", "value");
+ sender.runOnce(); // connect
+ sender.runOnce(); // send produce request
+ assertEquals(1, client.inFlightRequestCount(),
+ "We should have a single produce request in flight.");
+ assertEquals(1, sender.inFlightBatches(tp0).size());
+ assertTrue(client.hasInFlightRequests());
+ client.respond(produceResponse(tp0, -1,
Errors.NOT_LEADER_OR_FOLLOWER, 0));
+ sender.runOnce(); // receive produce response, batch scheduled for
retry
+ assertTrue(!futureIsProduced.isDone(), "Produce request is not yet
done.");
+
+ // TEST that as a new leader (with epochA) is discovered, the batch
is retried immediately, i.e. it skips any backoff period.
+ // Update leader epoch for tp0
+ int newEpoch = ++tp0LeaderEpoch;
+ this.client.updateMetadata(
+ RequestTestUtils.metadataUpdateWith(1,
Collections.singletonMap("test", 2),
+ tp -> {
+ if (tp0.equals(tp)) {
+ return newEpoch;
+ } else if (tp1.equals(tp)) {
+ return 0;
+ } else {
+ throw new RuntimeException("unexpected tp " + tp);
+ }
+ }));
+ sender.runOnce(); // send produce request, immediately.
+ assertEquals(1, sender.inFlightBatches(tp0).size());
+ assertTrue(client.hasInFlightRequests());
+ client.respond(produceResponse(tp0, -1,
Errors.NOT_LEADER_OR_FOLLOWER, 0));
+ sender.runOnce(); // receive produce response, schedule batch for
retry.
+ assertTrue(!futureIsProduced.isDone(), "Produce request is not yet
done.");
+
+ // TEST that a subsequent retry to the same leader (epochA) waits out
the backoff period.
+ sender.runOnce(); //send produce request
+ // No batches in-flight
+ assertEquals(0, sender.inFlightBatches(tp0).size());
+ assertTrue(!client.hasInFlightRequests());
+
+ // TEST that after waiting longer than the backoff period, the batch
is retried again.
+ time.sleep(2 * retryBackoffMs);
+ sender.runOnce(); // send produce request
+ assertEquals(1, sender.inFlightBatches(tp0).size());
+ assertTrue(client.hasInFlightRequests());
+ long offset = 999;
+ client.respond(produceResponse(tp0, offset, Errors.NONE, 0));
+ sender.runOnce(); // receive response.
+ assertTrue(futureIsProduced.isDone(), "Request to tp0 successfully
done");
+ assertEquals(offset, futureIsProduced.get().offset());
+ } finally {
+ m.close();
+ }
+ }
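
End to end, the test above checks the same rule the accumulator tests exercise:
a retried batch is sent immediately when the partition's leader epoch has
advanced since the last attempt, and otherwise waits out retryBackoffMs. A
rough, self-contained sketch of that ready-time decision (hypothetical names,
not the committed Sender/RecordAccumulator code):

    import java.util.Optional;

    final class RetryBackoffCheck {
        // Sketch only: should a retried batch be sent now, or keep backing off?
        static boolean readyToRetry(Optional<Integer> epochAtLastAttempt,
                                    Optional<Integer> currentLeaderEpoch,
                                    long lastAttemptMs, long nowMs, long retryBackoffMs) {
            boolean leaderChanged = currentLeaderEpoch.isPresent()
                    && !currentLeaderEpoch.equals(epochAtLastAttempt);
            boolean backoffElapsed = nowMs - lastAttemptMs >= retryBackoffMs;
            // A new leader skips the backoff; the same leader must honour the configured backoff.
            return leaderChanged || backoffElapsed;
        }
    }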
private void verifyErrorMessage(ProduceResponse response, String
expectedMessage) throws Exception {
Future<RecordMetadata> future = appendToAccumulator(tp0, 0L, "key",
"value");
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 1fbfcec0a3e..bb6e4af7dde 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.clients.producer.internals;
import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.CommitFailedException;
@@ -99,6 +100,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
+import org.mockito.Mockito;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
@@ -2474,10 +2476,11 @@ public class TransactionManagerTest {
Cluster cluster = new Cluster(null, Arrays.asList(node1, node2),
Arrays.asList(part1, part2),
Collections.emptySet(), Collections.emptySet());
+ Metadata metadataMock = setupMetadata(cluster);
Set<Node> nodes = new HashSet<>();
nodes.add(node1);
nodes.add(node2);
- Map<Integer, List<ProducerBatch>> drainedBatches =
accumulator.drain(cluster, nodes, Integer.MAX_VALUE,
+ Map<Integer, List<ProducerBatch>> drainedBatches =
accumulator.drain(metadataMock, nodes, Integer.MAX_VALUE,
time.milliseconds());
// We shouldn't drain batches which haven't been added to the
transaction yet.
@@ -2506,8 +2509,9 @@ public class TransactionManagerTest {
PartitionInfo part1 = new PartitionInfo(topic, 1, node1, null, null);
Cluster cluster = new Cluster(null, Collections.singletonList(node1),
Collections.singletonList(part1),
Collections.emptySet(), Collections.emptySet());
+ Metadata metadataMock = setupMetadata(cluster);
appendToAccumulator(tp1);
- Map<Integer, List<ProducerBatch>> drainedBatches =
accumulator.drain(cluster, Collections.singleton(node1),
+ Map<Integer, List<ProducerBatch>> drainedBatches =
accumulator.drain(metadataMock, Collections.singleton(node1),
Integer.MAX_VALUE,
time.milliseconds());
@@ -2529,9 +2533,11 @@ public class TransactionManagerTest {
Cluster cluster = new Cluster(null, Collections.singletonList(node1),
Collections.singletonList(part1),
Collections.emptySet(), Collections.emptySet());
+ Metadata metadataMock = setupMetadata(cluster);
+
Set<Node> nodes = new HashSet<>();
nodes.add(node1);
- Map<Integer, List<ProducerBatch>> drainedBatches =
accumulator.drain(cluster, nodes, Integer.MAX_VALUE,
+ Map<Integer, List<ProducerBatch>> drainedBatches =
accumulator.drain(metadataMock, nodes, Integer.MAX_VALUE,
time.milliseconds());
// We shouldn't drain batches which haven't been added to the
transaction yet.
@@ -3833,4 +3839,16 @@ public class TransactionManagerTest {
ProducerTestUtils.runUntil(sender, condition);
}
+ private Metadata setupMetadata(Cluster cluster) {
+ Metadata metadataMock = Mockito.mock(Metadata.class);
+ Mockito.when(metadataMock.fetch()).thenReturn(cluster);
+ for (String topic: cluster.topics()) {
+ for (PartitionInfo partInfo: cluster.partitionsForTopic(topic)) {
+ TopicPartition tp = new TopicPartition(partInfo.topic(),
partInfo.partition());
+ Mockito.when(metadataMock.currentLeader(tp)).thenReturn(new
Metadata.LeaderAndEpoch(Optional.of(partInfo.leader()), Optional.of(999 /*
dummy value */)));
+ }
+ }
+ return metadataMock;
+ }
+
}