Xuguang zhan created KAFKA-15064:
------------------------------------
Summary: Use KafkaTemplate to send message with below exception -
IllegalMonitorStateException
Key: KAFKA-15064
URL: https://issues.apache.org/jira/browse/KAFKA-15064
Project: Kafka
Issue Type: Bug
Components: clients
Affects Versions: 3.1.2
Reporter: Xuguang zhan
*Running env:*
1.openjdk-17.0.2-5.el7.x86_64
2.Spring-kafka :2.8.11
3. Kafka client: 3.1.2
Special case would be: one Tomcat have three web applications or we say context
, Kafka client put into tomcat share lib.
*Java Stack:*
java.lang.IllegalMonitorStateException: current thread is not owner
at java.lang.Object.wait(Native Method) ~[?:?]
at org.apache.kafka.common.utils.SystemTime.waitObject(SystemTime.java:55)
~[kafka-clients-3.1.2.jar:?]
at
org.apache.kafka.clients.producer.internals.ProducerMetadata.awaitUpdate(ProducerMetadata.java:119)
~[kafka-clients-3.1.2.jar:?]
at
org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:1088)
~[kafka-clients-3.1.2.jar:?]
at
org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:935)
~[kafka-clients-3.1.2.jar:?]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:914)
~[kafka-clients-3.1.2.jar:?]
at
org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:1087)
~[spring-kafka-2.8.11.jar:2.8.11]
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:655)
~[spring-kafka-2.8.11.jar:2.8.11]
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:429)
~[spring-kafka-2.8.11.jar:2.8.11]
Below is low level code stack :
{code:java}
// @Override
public void waitObject(Object obj, Supplier<Boolean> condition, long
deadlineMs) throws InterruptedException {
synchronized (obj) {
while (true) {
if (condition.get())
return;
long currentTimeMs = milliseconds();
if (currentTimeMs >= deadlineMs)
throw new TimeoutException("Condition not satisfied before
deadline");
obj.wait(deadlineMs - currentTimeMs);
}
}
} {code}
{code:java}
// code placeholder
/**
* Wait for metadata update until the current version is larger than the
last version we know of
*/
public synchronized void awaitUpdate(final int lastVersion, final long
timeoutMs) throws InterruptedException {
long currentTimeMs = time.milliseconds();
long deadlineMs = currentTimeMs + timeoutMs < 0 ? Long.MAX_VALUE :
currentTimeMs + timeoutMs;
time.waitObject(this, () -> {
// Throw fatal exceptions, if there are any. Recoverable topic
errors will be handled by the caller.
maybeThrowFatalException();
return updateVersion() > lastVersion || isClosed();
}, deadlineMs);
if (isClosed())
throw new KafkaException("Requested metadata update after close");
}
{code}
I checked same issue check the jira which have been reported
https://issues.apache.org/jira/browse/KAFKA-10902
--
This message was sent by Atlassian Jira
(v8.20.10#820010)