This is an automated email from the ASF dual-hosted git repository.
lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 25b13bc KAFKA-7096 : Clear buffered data for partitions that are
explicitly unassigned by user
25b13bc is described below
commit 25b13bca7b696b349bae2e9d5e41b75496bfdb1b
Author: mgharat <[email protected]>
AuthorDate: Mon Sep 10 21:40:08 2018 -0700
KAFKA-7096 : Clear buffered data for partitions that are explicitly
unassigned by user
Author: mgharat <[email protected]>
Reviewers: Dong Lin <[email protected]>
Closes #5289 from MayureshGharat/KAFKA-7096
---
.../kafka/clients/consumer/KafkaConsumer.java | 6 ++--
.../kafka/clients/consumer/internals/Fetcher.java | 34 ++++++++++++++++++++++
.../clients/consumer/internals/FetcherTest.java | 19 ++++++++++++
3 files changed, 57 insertions(+), 2 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 4ea3cfd..4cdc4f8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -918,7 +918,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
throwIfNoAssignorsConfigured();
-
+ fetcher.clearBufferedDataForUnassignedTopics(topics);
log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
this.subscriptions.subscribe(new HashSet<>(topics), listener);
metadata.setTopics(subscriptions.groupSubscription());
@@ -1019,10 +1019,11 @@ public class KafkaConsumer<K, V> implements Consumer<K,
V> {
public void unsubscribe() {
acquireAndEnsureOpen();
try {
- log.info("Unsubscribed all topics or patterns and assigned partitions");
+ fetcher.clearBufferedDataForUnassignedPartitions(Collections.EMPTY_SET);
this.subscriptions.unsubscribe();
this.coordinator.maybeLeaveGroup();
this.metadata.needMetadataForAllTopics(false);
+ log.info("Unsubscribed all topics or patterns and assigned partitions");
} finally {
release();
}
@@ -1063,6 +1064,7 @@ public class KafkaConsumer<K, V> implements Consumer<K,
V> {
throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
topics.add(topic);
}
+ fetcher.clearBufferedDataForUnassignedPartitions(partitions);
// make sure the offsets of topic partitions the consumer is unsubscribing from
// are committed since there will be no following rebalance
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index dc0daa2..a92f57e 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -1015,6 +1015,40 @@ public class Fetcher<K, V> implements
SubscriptionState.Listener, Closeable {
sensors.updatePartitionLagAndLeadSensors(assignment);
}
+ /**
+  * Clear the buffered data which are not a part of newly assigned partitions
+ *
+ * @param assignedPartitions newly assigned {@link TopicPartition}
+ */
+ public void clearBufferedDataForUnassignedPartitions(Collection<TopicPartition> assignedPartitions) {
+ Iterator<CompletedFetch> itr = completedFetches.iterator();
+ while (itr.hasNext()) {
+ TopicPartition tp = itr.next().partition;
+ if (!assignedPartitions.contains(tp)) {
+ itr.remove();
+ }
+ }
+ if (nextInLineRecords != null && !assignedPartitions.contains(nextInLineRecords.partition)) {
+ nextInLineRecords.drain();
+ nextInLineRecords = null;
+ }
+ }
+
+ /**
+ * Clear the buffered data which are not a part of newly assigned topics
+ *
+ * @param assignedTopics newly assigned topics
+ */
+ public void clearBufferedDataForUnassignedTopics(Collection<String> assignedTopics) {
+ Set<TopicPartition> currentTopicPartitions = new HashSet<>();
+ for (TopicPartition tp : subscriptions.assignedPartitions()) {
+ if (assignedTopics.contains(tp.topic())) {
+ currentTopicPartitions.add(tp);
+ }
+ }
+ clearBufferedDataForUnassignedPartitions(currentTopicPartitions);
+ }
+
public static Sensor throttleTimeSensor(Metrics metrics,
FetcherMetricsRegistry metricsRegistry) {
Sensor fetchThrottleTimeSensor = metrics.sensor("fetch-throttle-time");
fetchThrottleTimeSensor.add(metrics.metricInstance(metricsRegistry.fetchThrottleTimeAvg),
new Avg());
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index d314a4d..afe5b2f 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -288,6 +288,25 @@ public class FetcherTest {
}
@Test
+ public void testClearBufferedDataForTopicPartitions() {
+ subscriptions.assignFromUser(singleton(tp0));
+ subscriptions.seek(tp0, 0);
+
+ // normal fetch
+ assertEquals(1, fetcher.sendFetches());
+ assertFalse(fetcher.hasCompletedFetches());
+
+ client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L, 0));
+ consumerClient.poll(time.timer(0));
+ assertTrue(fetcher.hasCompletedFetches());
+ Set<TopicPartition> newAssignedTopicPartitions = new HashSet<>();
+ newAssignedTopicPartitions.add(tp1);
+
+ fetcher.clearBufferedDataForUnassignedPartitions(newAssignedTopicPartitions);
+ assertFalse(fetcher.hasCompletedFetches());
+ }
+
+ @Test
public void testFetchSkipsBlackedOutNodes() {
subscriptions.assignFromUser(singleton(tp0));
subscriptions.seek(tp0, 0);