This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 811c643 KAFKA-4950; Fix ConcurrentModificationException on assigned-partitions metric update (#3907)
811c643 is described below
commit 811c6433bdd6c888860d18e28bd9751d57ee8e0d
Author: Sébastien Launay <[email protected]>
AuthorDate: Wed Aug 8 15:43:43 2018 -0700
KAFKA-4950; Fix ConcurrentModificationException on assigned-partitions metric update (#3907)
Use a volatile field to track the size of the set of assigned partitions to
avoid the concurrent access to the underlying linked hash map.
Reviewers: Vahid Hashemian <[email protected]>, Rajini Sivaram
<[email protected]>, Ismael Juma <[email protected]>, Jason
Gustafson <[email protected]>
---
.../consumer/internals/ConsumerCoordinator.java | 3 +-
.../consumer/internals/SubscriptionState.java | 8 +++
.../kafka/common/internals/PartitionStates.java | 19 ++++++-
.../internals/ConsumerCoordinatorTest.java | 62 +++++++++++++++++++++-
.../consumer/internals/SubscriptionStateTest.java | 23 ++++++++
5 files changed, 111 insertions(+), 4 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 8762480..ce2db35 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -942,7 +942,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
Measurable numParts =
new Measurable() {
public double measure(MetricConfig config, long now) {
- return subscriptions.assignedPartitions().size();
+ // Get the number of assigned partitions in a thread safe manner
+ return subscriptions.numAssignedPartitions();
}
};
metrics.addMetric(metrics.metricName("assigned-partitions",
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index e289734..542c413 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -268,6 +268,14 @@ public class SubscriptionState {
return this.assignment.partitionSet();
}
+ /**
+ * Provides the number of assigned partitions in a thread safe manner.
+ * @return the number of assigned partitions.
+ */
+ public int numAssignedPartitions() {
+ return this.assignment.size();
+ }
+
public List<TopicPartition> fetchablePartitions() {
List<TopicPartition> fetchable = new ArrayList<>(assignment.size());
for (PartitionStates.PartitionState<TopicPartitionState> state :
assignment.partitionStates()) {
diff --git
a/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java
b/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java
index 605372c..5b904c2 100644
---
a/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java
+++
b/clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java
@@ -36,11 +36,17 @@ import java.util.Set;
* topic would "wrap around" and appear twice. However, as partitions are
fetched in different orders and partition
* leadership changes, we will deviate from the optimal. If this turns out to
be an issue in practice, we can improve
* it by tracking the partitions per node or calling `set` every so often.
+ *
+ * Note that this class is not thread-safe with the exception of {@link #size()} which returns the number of
+ * partitions currently tracked.
*/
public class PartitionStates<S> {
private final LinkedHashMap<TopicPartition, S> map = new LinkedHashMap<>();
+ /* the number of partitions that are currently assigned available in a thread safe manner */
+ private volatile int size = 0;
+
public PartitionStates() {}
public void moveToEnd(TopicPartition topicPartition) {
@@ -52,10 +58,12 @@ public class PartitionStates<S> {
public void updateAndMoveToEnd(TopicPartition topicPartition, S state) {
map.remove(topicPartition);
map.put(topicPartition, state);
+ updateSize();
}
public void remove(TopicPartition topicPartition) {
map.remove(topicPartition);
+ updateSize();
}
/**
@@ -67,6 +75,7 @@ public class PartitionStates<S> {
public void clear() {
map.clear();
+ updateSize();
}
public boolean contains(TopicPartition topicPartition) {
@@ -95,8 +104,11 @@ public class PartitionStates<S> {
return map.get(topicPartition);
}
+ /**
+ * Get the number of partitions that are currently being tracked. This is thread-safe.
+ */
public int size() {
- return map.size();
+ return size;
}
/**
@@ -108,6 +120,11 @@ public class PartitionStates<S> {
public void set(Map<TopicPartition, S> partitionToState) {
map.clear();
update(partitionToState);
+ updateSize();
+ }
+
+ private void updateSize() {
+ size = map.size();
}
private void update(Map<TopicPartition, S> partitionToState) {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 62c70a0..2b6d303 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -30,6 +30,8 @@ import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ApiException;
@@ -55,6 +57,7 @@ import org.apache.kafka.common.requests.SyncGroupRequest;
import org.apache.kafka.common.requests.SyncGroupResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Before;
@@ -77,6 +80,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import static java.util.Collections.singleton;
@@ -440,7 +444,7 @@ public class ConsumerCoordinatorTest {
coordinator.poll(time.timer(Long.MAX_VALUE));
assertFalse(coordinator.rejoinNeededOrPending());
- assertEquals(2, subscriptions.assignedPartitions().size());
+ assertEquals(2, subscriptions.numAssignedPartitions());
assertEquals(2, subscriptions.groupSubscription().size());
assertEquals(2, subscriptions.subscription().size());
assertEquals(1, rebalanceListener.revokedCount);
@@ -685,7 +689,7 @@ public class ConsumerCoordinatorTest {
coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));
assertFalse(coordinator.rejoinNeededOrPending());
- assertEquals(2, subscriptions.assignedPartitions().size());
+ assertEquals(2, subscriptions.numAssignedPartitions());
assertEquals(2, subscriptions.subscription().size());
assertEquals(1, rebalanceListener.revokedCount);
assertEquals(1, rebalanceListener.assignedCount);
@@ -1741,6 +1745,60 @@ public class ConsumerCoordinatorTest {
}
@Test
+ public void testThreadSafeAssignedPartitionsMetric() throws Exception {
+ // Get the assigned-partitions metric
+ final Metric metric = metrics.metric(new MetricName("assigned-partitions", "consumer" + groupId + "-coordinator-metrics",
+ "", Collections.<String, String>emptyMap()));
+
+ // Start polling the metric in the background
+ final AtomicBoolean doStop = new AtomicBoolean();
+ final AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
+ final AtomicInteger observedSize = new AtomicInteger();
+
+ Thread poller = new Thread() {
+ @Override
+ public void run() {
+ // Poll as fast as possible to reproduce ConcurrentModificationException
+ while (!doStop.get()) {
+ try {
+ int size = ((Double) metric.metricValue()).intValue();
+ observedSize.set(size);
+ } catch (Exception e) {
+ exceptionHolder.set(e);
+ return;
+ }
+ }
+ }
+ };
+ poller.start();
+
+ // Assign two partitions to trigger a metric change that can lead to ConcurrentModificationException
+ client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+ coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+ // Change the assignment several times to increase likelihood of concurrent updates
+ Set<TopicPartition> partitions = new HashSet<>();
+ int totalPartitions = 10;
+ for (int partition = 0; partition < totalPartitions; partition++) {
+ partitions.add(new TopicPartition(topic1, partition));
+ subscriptions.assignFromUser(partitions);
+ }
+
+ // Wait for the metric poller to observe the final assignment change or raise an error
+ TestUtils.waitForCondition(new TestCondition() {
+ @Override
+ public boolean conditionMet() {
+ return observedSize.get() == totalPartitions || exceptionHolder.get() != null;
+ }
+ }, "Failed to observe expected assignment change");
+
+ doStop.set(true);
+ poller.join();
+
+ assertNull("Failed fetching the metric at least once", exceptionHolder.get());
+ }
+
+ @Test
public void testCloseDynamicAssignment() throws Exception {
ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true,
true, true);
gracefulCloseTest(coordinator, true);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index 24255e8..05287e0 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -49,12 +49,14 @@ public class SubscriptionStateTest {
public void partitionAssignment() {
state.assignFromUser(singleton(tp0));
assertEquals(singleton(tp0), state.assignedPartitions());
+ assertEquals(1, state.numAssignedPartitions());
assertFalse(state.hasAllFetchPositions());
state.seek(tp0, 1);
assertTrue(state.isFetchable(tp0));
assertEquals(1L, state.position(tp0).longValue());
state.assignFromUser(Collections.<TopicPartition>emptySet());
assertTrue(state.assignedPartitions().isEmpty());
+ assertEquals(0, state.numAssignedPartitions());
assertFalse(state.isAssigned(tp0));
assertFalse(state.isFetchable(tp0));
}
@@ -64,28 +66,34 @@ public class SubscriptionStateTest {
state.assignFromUser(new HashSet<>(Arrays.asList(tp0, tp1)));
// assigned partitions should immediately change
assertEquals(2, state.assignedPartitions().size());
+ assertEquals(2, state.numAssignedPartitions());
assertTrue(state.assignedPartitions().contains(tp0));
assertTrue(state.assignedPartitions().contains(tp1));
state.unsubscribe();
// assigned partitions should immediately change
assertTrue(state.assignedPartitions().isEmpty());
+ assertEquals(0, state.numAssignedPartitions());
state.subscribe(singleton(topic1), rebalanceListener);
// assigned partitions should remain unchanged
assertTrue(state.assignedPartitions().isEmpty());
+ assertEquals(0, state.numAssignedPartitions());
state.assignFromSubscribed(singleton(t1p0));
// assigned partitions should immediately change
assertEquals(singleton(t1p0), state.assignedPartitions());
+ assertEquals(1, state.numAssignedPartitions());
state.subscribe(singleton(topic), rebalanceListener);
// assigned partitions should remain unchanged
assertEquals(singleton(t1p0), state.assignedPartitions());
+ assertEquals(1, state.numAssignedPartitions());
state.unsubscribe();
// assigned partitions should immediately change
assertTrue(state.assignedPartitions().isEmpty());
+ assertEquals(0, state.numAssignedPartitions());
}
@Test
@@ -93,37 +101,45 @@ public class SubscriptionStateTest {
state.subscribe(Pattern.compile(".*"), rebalanceListener);
// assigned partitions should remain unchanged
assertTrue(state.assignedPartitions().isEmpty());
+ assertEquals(0, state.numAssignedPartitions());
state.subscribeFromPattern(new
HashSet<>(Collections.singletonList(topic)));
// assigned partitions should remain unchanged
assertTrue(state.assignedPartitions().isEmpty());
+ assertEquals(0, state.numAssignedPartitions());
state.assignFromSubscribed(singleton(tp1));
// assigned partitions should immediately change
assertEquals(singleton(tp1), state.assignedPartitions());
+ assertEquals(1, state.numAssignedPartitions());
assertEquals(singleton(topic), state.subscription());
state.assignFromSubscribed(Collections.singletonList(t1p0));
// assigned partitions should immediately change
assertEquals(singleton(t1p0), state.assignedPartitions());
+ assertEquals(1, state.numAssignedPartitions());
assertEquals(singleton(topic), state.subscription());
state.subscribe(Pattern.compile(".*t"), rebalanceListener);
// assigned partitions should remain unchanged
assertEquals(singleton(t1p0), state.assignedPartitions());
+ assertEquals(1, state.numAssignedPartitions());
state.subscribeFromPattern(singleton(topic));
// assigned partitions should remain unchanged
assertEquals(singleton(t1p0), state.assignedPartitions());
+ assertEquals(1, state.numAssignedPartitions());
state.assignFromSubscribed(Collections.singletonList(tp0));
// assigned partitions should immediately change
assertEquals(singleton(tp0), state.assignedPartitions());
+ assertEquals(1, state.numAssignedPartitions());
assertEquals(singleton(topic), state.subscription());
state.unsubscribe();
// assigned partitions should immediately change
assertTrue(state.assignedPartitions().isEmpty());
+ assertEquals(0, state.numAssignedPartitions());
}
@Test
@@ -169,6 +185,7 @@ public class SubscriptionStateTest {
state.subscribe(singleton(topic), rebalanceListener);
assertEquals(1, state.subscription().size());
assertTrue(state.assignedPartitions().isEmpty());
+ assertEquals(0, state.numAssignedPartitions());
assertTrue(state.partitionsAutoAssigned());
state.assignFromSubscribed(singleton(tp0));
state.seek(tp0, 1);
@@ -178,6 +195,7 @@ public class SubscriptionStateTest {
assertFalse(state.isAssigned(tp0));
assertFalse(state.isFetchable(tp1));
assertEquals(singleton(tp1), state.assignedPartitions());
+ assertEquals(1, state.numAssignedPartitions());
}
@Test
@@ -261,6 +279,7 @@ public class SubscriptionStateTest {
state.unsubscribe();
state.assignFromUser(singleton(tp0));
assertEquals(singleton(tp0), state.assignedPartitions());
+ assertEquals(1, state.numAssignedPartitions());
}
@Test
@@ -269,17 +288,21 @@ public class SubscriptionStateTest {
state.subscribeFromPattern(new HashSet<>(Arrays.asList(topic,
topic1)));
state.assignFromSubscribed(singleton(tp1));
assertEquals(singleton(tp1), state.assignedPartitions());
+ assertEquals(1, state.numAssignedPartitions());
state.unsubscribe();
assertEquals(0, state.subscription().size());
assertTrue(state.assignedPartitions().isEmpty());
+ assertEquals(0, state.numAssignedPartitions());
state.assignFromUser(singleton(tp0));
assertEquals(singleton(tp0), state.assignedPartitions());
+ assertEquals(1, state.numAssignedPartitions());
state.unsubscribe();
assertEquals(0, state.subscription().size());
assertTrue(state.assignedPartitions().isEmpty());
+ assertEquals(0, state.numAssignedPartitions());
}
private static class MockRebalanceListener implements
ConsumerRebalanceListener {