[ 
https://issues.apache.org/jira/browse/FLINK-9242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710599#comment-16710599
 ] 

Miguel E. Coimbra commented on FLINK-9242:
------------------------------------------

[~NicoK] I am getting similar behavior now on Apache Flink 1.8-SNAPSHOT.

Execution holds at a random point.

Have the components pertaining this issue been modified again?

> LocalEnvironment - Operator threads stuck on java.lang.Thread.State: WAITING
> ----------------------------------------------------------------------------
>
>                 Key: FLINK-9242
>                 URL: https://issues.apache.org/jira/browse/FLINK-9242
>             Project: Flink
>          Issue Type: Bug
>          Components: Cluster Management
>    Affects Versions: 1.5.0, 1.6.0
>         Environment: *SETUP 1*
> - Windows 7 Pro x64
> - Java 1.8.0_162 x64
> - 8 GB RAM
> - Intel i7 620M
> *SETUP 2*
> - Slackware 14.2 x64 GNU/Linux 4.4.88
> - Java openjdk version "1.8.0_151"
> OpenJDK Runtime Environment (IcedTea 3.6.0) (Slackware)
> OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)
> - 256 GB RAM
> - 8x Intel(R) Xeon(R) CPU E7- 4830
>            Reporter: Miguel E. Coimbra
>            Priority: Major
>         Attachments: flink_debugging.PNG
>
>
> Hello,
> As per Fabian Hueske's advice on the mailing list, I am detailing the problem 
> here.
>  This happens on my code in both 1.5-SNAPSHOT and 1.6-SNAPSHOT but not on 
> 1.4.2 (stable).
>  I believe it might be some sort of regression which was introduced post 
> 1.4.2.
> I'm getting different DataSet operators blocked on java.lang.Thread.State: 
> WAITING for no apparent reason.
>  I only tested this using a LocalEnvironment which is created like so:
> {code:java}
> final Configuration conf = new Configuration();
> conf.setString("web.log.path", logPath);
> conf.setString("jobmanager.rpc.address", "127.0.0.1");
> conf.setString("web.port", "8081-9000");
> conf.setString("query.server.ports", "2000-30000");
> conf.setString("query.proxy.ports", "30001-60000");
> LocalEnvironment lenv = (LocalEnvironment) 
> ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
> {code}
> (also tried creating the LocalEnvironment without the web interface and it 
> also happens)
> I have debugged with IntelliJ IDEA and obtained thread dumps from different 
> executions, and realized quite a few operator threads are stuck on 
> java.lang.Thread.State: WAITING.
> I cannot share my code at the moment, but essentially I have a series of jobs 
> and some use common data (I made sure it was written to disk in job _i_ and 
> read back from disk in job _i + 1_)
> There are three major threads that I find to be in this waiting state.
> I'm running on local mode with a parallelism of one.
>  The thread dumps I obtained show me where the wait calls originated:
>  
> {code:java}
> Number 1:
> "CHAIN Join (Join at selectEdges(GraphUtils.java:328)) -> Combine (Distinct 
> at selectEdges(GraphUtils.java:330)) (1/1)@9158" prio=5 tid=0xd93 nid=NA 
> waiting
>   java.lang.Thread.State: WAITING
>       at java.lang.Object.wait(Object.java:-1)
>       at java.lang.Object.wait(Object.java:502)
>       at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>       at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>       at 
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>       at 
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>       at 
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>       at 
> org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
>       at 
> org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
>       at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
>       at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
>       at 
> org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
>       at 
> org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>       at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>       at java.lang.Thread.run(Thread.java:748)
> {code}
> {code:java}
> Number 2:
> "Join (Join at summaryGraph(SummaryGraphBuilder.java:92)) (1/1)@9153" prio=5 
> tid=0xd8e nid=NA waiting
>   java.lang.Thread.State: WAITING
>       at java.lang.Object.wait(Object.java:-1)
>       at java.lang.Object.wait(Object.java:502)
>       at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>       at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>       at 
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>       at 
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>       at 
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>       at 
> org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
>       at 
> org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
>       at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
>       at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
>       at 
> org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
>       at 
> org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>       at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>       at java.lang.Thread.run(Thread.java:748)
> {code}
> {code:java}
> Number 3:
> "Join (Join at selectEdges(GraphUtils.java:324)) (1/1)@9118" prio=5 tid=0xd75 
> nid=NA waiting
>   java.lang.Thread.State: WAITING
>       at java.lang.Object.wait(Object.java:-1)
>       at java.lang.Object.wait(Object.java:502)
>       at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>       at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>       at 
> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>       at 
> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>       at 
> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>       at 
> org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
>       at 
> org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
>       at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
>       at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
>       at 
> org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashJoinIterator.callWithNextKey(ReusingBuildFirstHashJoinIterator.java:123)
>       at 
> org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
>       at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>       at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>       at java.lang.Thread.run(Thread.java:748){code}
>  
> While I realize these dumps on their own may not be helpful, they at least 
> (as far as I know) indicate that the threads are all waiting on something.
>  But if it was resource scarcity I believe the program would terminate with 
> an exception.
>  And if it was garbage collection activity, I believe the JVM process would 
> not be at 0% CPU usage.
> I realize I didn't provide the user-code code that generates the execution 
> plan for Flink which led to the contexts in which the threads are waiting, my 
> apologies. I will do so as soon a I get a chance.
> To highlight the symptoms:
>  - The memory assigned to the JVM is fully used, but there are no exceptions 
> about lack of memory (and the system had plenty more memory available).
>  - The CPU usage is at 0% and all threads are all in a waiting state, but I 
> don't understand what signal they're waiting for exactly.
> I noticed something suspicious as well: I have chains of operators where the 
> first operator will ingest the expected amount of records but will not emit 
> any, leaving the following operator empty in a "RUNNING" state (see attached 
> image).
> I think we may consider there is some complexity in my scenario, at least 
> when compared to samples in the Flink documentation. When visualizing the job 
> plan, it is necessary to zoom in and out to check on specific parts of the 
> execution scheme.
> Among the sequence of operations, I am:
> 1 - Creating a DataSet
> 2 - Using it as an initial workset in a DeltaIteration
> 2.1 - Joining the workset on each iteration with the edges of a graph
> 3 - Using the final solution set resulting from the DeltaIteration to build a 
> graph and execute an algorithm over it (.run method).
> - The graph is not prohibitively big and I have a very low limit on the 
> number of iterations (at most 4 or 5).
>  I will add more information as soon as it is available.
> It seems, however, that there is some sort of lack of synchronization 
> occurring and perhaps the operators _become isolated_?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to