drcrallen edited a comment on issue #6014: Optionally refuse to consume new data until the prior chunk is being consumed URL: https://github.com/apache/incubator-druid/pull/6014#issuecomment-406045358 I'm a bit baffled what is blocking here. There are a few competing threads for locks: ``` "HttpClient-Netty-Worker-87" - Thread t@123 java.lang.Thread.State: WAITING at sun.misc.Unsafe.park(Native Method) - parking to wait for <28fa59c3> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350) at io.druid.client.DirectDruidClient$1.handleChunk(DirectDruidClient.java:335) at io.druid.java.util.http.client.NettyHttpClient$1.messageReceived(NettyHttpClient.java:225) at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) at org.jboss.netty.handler.timeout.ReadTimeoutHandler.messageReceived(ReadTimeoutHandler.java:184) at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) at org.jboss.netty.handler.codec.http.HttpContentDecoder.messageReceived(HttpContentDecoder.java:135) at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:459) at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:536) at org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:485) at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:92) at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564) at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337) at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) Locked ownable synchronizers: - locked <576b7c74> (a java.util.concurrent.ThreadPoolExecutor$Worker) ``` for producing into the inputstream queue, and ``` "processing-fjp-3" - Thread t@437 java.lang.Thread.State: WAITING at sun.misc.Unsafe.park(Native Method) - parking to wait for <4e912efa> (a com.google.common.util.concurrent.AbstractFuture$Sync) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) at com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:285) at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116) at io.druid.client.DirectDruidClient$JsonParserIterator$1.block(DirectDruidClient.java:619) at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3313) at io.druid.client.DirectDruidClient$JsonParserIterator.init(DirectDruidClient.java:613) at io.druid.client.DirectDruidClient$JsonParserIterator.hasNext(DirectDruidClient.java:574) at io.druid.java.util.common.guava.BaseSequence.makeYielder(BaseSequence.java:87) at io.druid.java.util.common.guava.BaseSequence.toYielder(BaseSequence.java:67) at io.druid.java.util.common.guava.MappedSequence.toYielder(MappedSequence.java:49) at io.druid.java.util.common.guava.MergeSequence.lambda$toYielder$0(MergeSequence.java:56) at io.druid.java.util.common.guava.MergeSequence$$Lambda$177/186424324.accumulate(Unknown Source) at io.druid.java.util.common.guava.BaseSequence.accumulate(BaseSequence.java:45) at io.druid.java.util.common.guava.MergeSequence.toYielder(MergeSequence.java:53) at io.druid.java.util.common.guava.YieldingSequenceBase.accumulate(YieldingSequenceBase.java:32) at io.druid.java.util.common.guava.Sequence.toList(Sequence.java:76) at io.druid.java.util.common.guava.MergeWorkTask.exec(MergeWorkTask.java:199) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) Locked ownable synchronizers: - None ``` for initializing the results response. Also ``` "processing-fjp-0" - Thread t@434 java.lang.Thread.State: TIMED_WAITING at sun.misc.Unsafe.park(Native Method) - parking to wait for <6b5e2920> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078) at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467) at io.druid.client.DirectDruidClient$1$2.nextElement(DirectDruidClient.java:303) at io.druid.client.DirectDruidClient$1$2.nextElement(DirectDruidClient.java:279) at java.io.SequenceInputStream.nextStream(SequenceInputStream.java:110) at java.io.SequenceInputStream.read(SequenceInputStream.java:211) at com.fasterxml.jackson.dataformat.smile.SmileParser.loadMore(SmileParser.java:412) at com.fasterxml.jackson.dataformat.smile.SmileParser.nextToken(SmileParser.java:590) at com.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializer$Vanilla.mapObject(UntypedObjectDeserializer.java:652) at com.fasterxml.jackson.databind.deser.std.UntypedObjectDeserializer$Vanilla.deserialize(UntypedObjectDeserializer.java:496) at com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:245) at com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:217) at com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:25) at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromArray(BeanDeserializerBase.java:1229) at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:157) at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:136) at com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:520) at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeWithErrorWrapping(BeanDeserializer.java:463) at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:378) at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1099) at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:296) at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:133) at com.fasterxml.jackson.databind.ObjectMapper._readValue(ObjectMapper.java:3708) at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2052) at io.druid.client.DirectDruidClient$JsonParserIterator.next(DirectDruidClient.java:593) at io.druid.java.util.common.guava.BaseSequence.makeYielder(BaseSequence.java:88) at io.druid.java.util.common.guava.BaseSequence.toYielder(BaseSequence.java:67) at io.druid.java.util.common.guava.MappedSequence.toYielder(MappedSequence.java:49) at io.druid.java.util.common.guava.MergeSequence.lambda$toYielder$0(MergeSequence.java:56) at io.druid.java.util.common.guava.MergeSequence$$Lambda$177/186424324.accumulate(Unknown Source) at io.druid.java.util.common.guava.BaseSequence.accumulate(BaseSequence.java:45) at io.druid.java.util.common.guava.MergeSequence.toYielder(MergeSequence.java:53) at io.druid.java.util.common.guava.YieldingSequenceBase.accumulate(YieldingSequenceBase.java:32) at io.druid.java.util.common.guava.Sequence.toList(Sequence.java:76) at io.druid.java.util.common.guava.MergeWorkTask.exec(MergeWorkTask.java:199) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) Locked ownable synchronizers: - None ``` for moving data from the input streams into the merge sequences Still looking to see where things are deadlocking
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
