I'd disagree that I can fix the issue as you suggest, because:
- if I remove the `Collections.synchronizedMap` from the `commitMap` I
get unsynchronized map and therefore the asynchronous writes to this map
would result in undefined state
- if I remove the manual synchronization then there is a race
condition between the call to `commitSync` and `clear` of the
`commitMap` - some other thread could write to the `commitMap` between
calls to `commitSync` and `clear` and therefore the update to the map
would be lost - this is the same reason why I cannot use
ConcurrentHashMap, because there would be no synchronization between
commiting the map and clearing it
It seems to me quite natural to clone the map in call to synchronous
commit, if it cannot be guaranteed that synchronous responses are
handled by the same thread that issued the request (which in my point of
view would be the best choice, but I still don't enough understand the
details of kafka network stack).
Jan
On 02/02/2017 01:25 PM, Ismael Juma wrote:
OK, you can fix this by removing `Collections.synchronizedMap` from the
following line or by removing the synchronized blocks.
Map<TopicPartition, OffsetAndMetadata> commitMap =
Collections.synchronizedMap(...);
There is no reason to do manual and automatic synchronization at the same
time in this case. Because `Collections.synchonizedMap` uses the returned
map for synchronization, it means that even calling `get` on it will block
in this case. The consumer could copy the map to avoid this scenario as the
heartbeat thread is meant to be an implementation detail. Jason, what do
you think?
Let me know if this fixes your issue.
Ismael
On Thu, Feb 2, 2017 at 12:17 PM, Jan Lukavský <je...@seznam.cz> wrote:
Hi Ismael,
yes, no problem:
The following thread is the main thread interacting with the KafkaConsumer
(polling topic and committing offsets):
"pool-3-thread-1" #14 prio=5 os_prio=0 tid=0x00007f00f4434800 nid=0x32a9
runnable [0x00007f00b6662000]
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java
:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x00000005c0abb218> (a sun.nio.ch.Util$3)
- locked <0x00000005c0abb208> (a java.util.Collections$Unmodifi
ableSet)
- locked <0x00000005c0abaa48> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.kafka.common.network.Selector.select(Selector.jav
a:470)
at org.apache.kafka.common.network.Selector.poll(Selector.java:
286)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.ja
va:260)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkC
lient.poll(ConsumerNetworkClient.java:232)
- locked <0x00000005c0acf630> (a org.apache.kafka.clients.consu
mer.internals.ConsumerNetworkClient)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkC
lient.poll(ConsumerNetworkClient.java:180)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordina
tor.commitOffsetsSync(ConsumerCoordinator.java:499)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(K
afkaConsumer.java:1104)
at cz.o2.<package hidden>.KafkaCommitLog.lambda$
observePartitions$7(KafkaCommitLog.java:204)
- locked <0x00000005c0612c88> (a java.util.Collections$Synchron
izedMap)
at cz.o2.<package
hidden>.KafkaCommitLog$$Lambda$62/1960388071.run(Unknown
Source) <- here is the synchronized block that takes monitor of the
`commitMap`
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool
Executor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo
lExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
This thread just spins around in epoll returning 0. The other thread is
the coordinator
"kafka-coordinator-heartbeat-thread | consumer" #15 daemon prio=5
os_prio=0 tid=0x00007f0084067000 nid=0x32aa waiting for monitor entry
[0x00007f00b6361000]
java.lang.Thread.State: BLOCKED (on object monitor)
at java.util.Collections$SynchronizedMap.get(Collections.java:2
584)
- waiting to lock <0x00000005c0612c88> (a
java.util.Collections$SynchronizedMap) <- waiting for the `commitMap`,
which will never happen
at org.apache.kafka.clients.consumer.internals.ConsumerCoordina
tor$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:635) <-
handles response to the commitSync request
at org.apache.kafka.clients.consumer.internals.ConsumerCoordina
tor$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:615)
at org.apache.kafka.clients.consumer.internals.AbstractCoordina
tor$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742)
at org.apache.kafka.clients.consumer.internals.AbstractCoordina
tor$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.
onSuccess(RequestFuture.java:186)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fi
reSuccess(RequestFuture.java:149)
at org.apache.kafka.clients.consumer.internals.RequestFuture.co
mplete(RequestFuture.java:116)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkC
lient$RequestFutureCompletionHandler.fireCompletion(Consumer
NetworkClient.java:479)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkC
lient.firePendingCompletedRequests(ConsumerNetworkClient.java:316)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkC
lient.poll(ConsumerNetworkClient.java:219)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkC
lient.pollNoWakeup(ConsumerNetworkClient.java:266)
at org.apache.kafka.clients.consumer.internals.AbstractCoordina
tor$HeartbeatThread.run(AbstractCoordinator.java:865)
- locked <0x00000005c0acefc8> (a org.apache.kafka.clients.consu
mer.internals.ConsumerCoordinator)
Hope this helps, if you needed any more debug info, I'm here to help. :)
Cheers,
Jan
On 02/02/2017 12:48 PM, Ismael Juma wrote:
Hi Jan,
Do you have stacktraces showing the issue? That would help. Also, if you
can test 0.10.1.1, which is the latest stable release, that would be even
better. From looking at the code briefly, I don't see where the consumer
is
locking on the received offsets map, so not sure what would cause it to
block in the way you describe. Hopefully a stacktrace when the consumer is
blocked would clarify. You can get a stacktrace via the jstack tool.
Ismael
On Thu, Feb 2, 2017 at 10:45 AM, je.ik <je...@seznam.cz> wrote:
Hi all,
I have a question about a very suspicious behavior I see during consuming
messages using manual synchronous commit with Kafka 0.10.1.0. The code
looks something like this:
try (KafkaConsumer<...> consumer = ...) {
Map<TopicPartition, OffsetAndMetadata> commitMap =
Collections.synchronizedMap(...);
while (!Thread.currentThread().isInterrupted()) {
ConsumerRecords records = consumer.poll(..);
for (...) {
// queue records for asynchronous processing in different thread.
// when the asynchronous processing finishes, it updates the
// `commitMap', so it has to be synchronized somehow
}
synchronized (commitMap) {
// commit if we have anything to commit
if (!commitMap.isEmpty()) {
consumer.commitSync(commitMap);
commitMap.clear();
}
}
}
}
Now, what time to time happens in my case is that the consumer thread is
stuck in the call to `commitSync`. By straing the PID I found out that it
periodically epolls on an *empty* list of file descriptors. By further
investigation I found out, that response to the `commitSync` is being
handled by the kafka-coordinator-heartbeat-thread, which during handling
of the response needs to access the `commitMap`, and therefore blocks,
because the lock is being held by the application main thread. Therefore,
the whole consumption stops and ends in live-lock. The solution in my
case
was to clone the map and unsynchronize the call to `commitSync` like
this:
final Map<TopicPartition, OffsetAndMetadata> clone;
synchronized (commitMap) {
if (!commitMap.isEmpty()) {
clone = new HashMap<>(commitMap);
commitMap.clear();
} else {
clone = null;
}
}
if (clone != null) {
consumer.commitSync(clone);
}
which seems to work fine. My question is whether my interpretation of the
problem is correct and if so, should be anything done to avoid this? I
see
two possibilities - either the call to `commitSync` should clone the map
itself, or there should be somehow guaranteed that the same thread that
issues synchronous requests receives the response. Am I right?
Thanks for comments,
best,
Jan