Dumitru Postoronca created KAFKA-4950:
-----------------------------------------
Summary: ConcurrentModificationException when iterating over Kafka Metrics
Key: KAFKA-4950
URL: https://issues.apache.org/jira/browse/KAFKA-4950
Project: Kafka
Issue Type: Bug
Affects Versions: 0.10.1.1
Reporter: Dumitru Postoronca
Priority: Minor
It looks like when {{PartitionStates.partitionSet()}} is called, the internal
state of the partition assignments can change while the resulting HashSet is
being built, which leads to a ConcurrentModificationException during the copy
operation.
{code}
java.util.ConcurrentModificationException
    at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
    at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
    at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
    at java.util.HashSet.<init>(HashSet.java:119)
    at org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:66)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:291)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:783)
    at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
    at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
{code}
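The fail-fast check behind the top frames of this trace can be shown in isolation. The following is a minimal single-threaded sketch, not Kafka code: the class name {{FailFastDemo}} and the {{topic-N}} keys are made up for illustration, and in the reported case the modification presumably comes from a different thread than the one reading the metric.

```java
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

public class FailFastDemo {

    // Returns true if the key iterator detects a structural change
    // made to the map after the iterator was created.
    static boolean iteratorFailsAfterPut() {
        Map<String, Integer> state = new LinkedHashMap<>();
        state.put("topic-0", 0);
        state.put("topic-1", 1);

        Iterator<String> it = state.keySet().iterator();
        it.next();

        // Structural modification while the iteration is still in progress.
        state.put("topic-2", 2);

        try {
            it.next(); // the iterator notices the changed modification count
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(iteratorFailsAfterPut());
    }
}
```

Copying the key set with {{new HashSet<>(map.keySet())}} walks the same iterator through {{addAll}}, which would be consistent with the {{HashSet.<init>}} and {{AbstractCollection.addAll}} frames above.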
{code}
// client code:
private final KafkaConsumer client;

Map<MetricName, org.apache.kafka.common.Metric> m = client.metrics();
for (Map.Entry<MetricName, org.apache.kafka.common.Metric> e : m.entrySet()) {
    gauges.put(name(e.getKey().group(), e.getKey().name(), "count"),
        new Gauge<Double>() {
            @Override
            public Double getValue() {
                return e.getValue().value(); // exception thrown here
            }
        });
}
{code}
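Until the underlying race is addressed, one possible client-side mitigation is to guard the read and fall back to the last value that was read successfully. This is only a sketch under assumptions: {{GuardedGauge}} is a hypothetical helper, not part of Kafka or of any metrics library, and it hides the symptom rather than fixing the cause.

```java
import java.util.ConcurrentModificationException;
import java.util.function.Supplier;

// Hypothetical wrapper: returns the last successfully read value when the
// underlying read fails with ConcurrentModificationException.
public class GuardedGauge {
    private final Supplier<Double> delegate;
    private volatile double lastValue = Double.NaN; // NaN until the first successful read

    public GuardedGauge(Supplier<Double> delegate) {
        this.delegate = delegate;
    }

    public double getValue() {
        try {
            lastValue = delegate.get();
        } catch (ConcurrentModificationException e) {
            // The underlying state changed during the read; keep the previous value.
        }
        return lastValue;
    }
}
```

In the client code above, the delegate would be something like {{() -> e.getValue().value()}}, with the gauge's {{getValue()}} forwarding to the wrapper.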