[ https://issues.apache.org/jira/browse/FLINK-9242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710599#comment-16710599 ]
Miguel E. Coimbra commented on FLINK-9242:
------------------------------------------
[~NicoK] I am seeing similar behavior again on Apache Flink 1.8-SNAPSHOT.
Execution hangs at a random point.
Have the components pertaining to this issue been modified again?
> LocalEnvironment - Operator threads stuck on java.lang.Thread.State: WAITING
> ----------------------------------------------------------------------------
>
> Key: FLINK-9242
> URL: https://issues.apache.org/jira/browse/FLINK-9242
> Project: Flink
> Issue Type: Bug
> Components: Cluster Management
> Affects Versions: 1.5.0, 1.6.0
> Environment: *SETUP 1*
> - Windows 7 Pro x64
> - Java 1.8.0_162 x64
> - 8 GB RAM
> - Intel i7 620M
> *SETUP 2*
> - Slackware 14.2 x64 GNU/Linux 4.4.88
> - Java openjdk version "1.8.0_151"
> OpenJDK Runtime Environment (IcedTea 3.6.0) (Slackware)
> OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode)
> - 256 GB RAM
> - 8x Intel(R) Xeon(R) CPU E7- 4830
> Reporter: Miguel E. Coimbra
> Priority: Major
> Attachments: flink_debugging.PNG
>
>
> Hello,
> As per Fabian Hueske's advice on the mailing list, I am detailing the problem
> here.
> This happens on my code in both 1.5-SNAPSHOT and 1.6-SNAPSHOT but not on
> 1.4.2 (stable).
> I believe it might be some sort of regression which was introduced post
> 1.4.2.
> I'm getting different DataSet operators blocked on java.lang.Thread.State:
> WAITING for no apparent reason.
> I only tested this using a LocalEnvironment which is created like so:
> {code:java}
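> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.LocalEnvironment;
> import org.apache.flink.configuration.Configuration;
>
> // logPath is a String defined elsewhere in my code.
> final Configuration conf = new Configuration();
> conf.setString("web.log.path", logPath);
> conf.setString("jobmanager.rpc.address", "127.0.0.1");
> conf.setString("web.port", "8081-9000");
> conf.setString("query.server.ports", "2000-30000");
> conf.setString("query.proxy.ports", "30001-60000");
> // createLocalEnvironmentWithWebUI returns an ExecutionEnvironment, hence the cast.
> LocalEnvironment lenv =
>     (LocalEnvironment) ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);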
> {code}
> (I also tried creating the LocalEnvironment without the web interface, and
> the problem still occurs.)
> I have debugged with IntelliJ IDEA and obtained thread dumps from different
> executions, and realized quite a few operator threads are stuck on
> java.lang.Thread.State: WAITING.
> I cannot share my code at the moment, but essentially I have a series of jobs,
> some of which use common data (I made sure it was written to disk in job _i_
> and read back from disk in job _i + 1_).
> There are three major threads that I find to be in this waiting state.
> I'm running in local mode with a parallelism of one.
> The thread dumps I obtained show where the wait calls originated:
>
> {code:java}
> Number 1:
> "CHAIN Join (Join at selectEdges(GraphUtils.java:328)) -> Combine (Distinct at selectEdges(GraphUtils.java:330)) (1/1)@9158" prio=5 tid=0xd93 nid=NA waiting
>   java.lang.Thread.State: WAITING
>     at java.lang.Object.wait(Object.java:-1)
>     at java.lang.Object.wait(Object.java:502)
>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>     at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
>     at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
>     at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
>     at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
>     at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
>     at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
> {code:java}
> Number 2:
> "Join (Join at summaryGraph(SummaryGraphBuilder.java:92)) (1/1)@9153" prio=5 tid=0xd8e nid=NA waiting
>   java.lang.Thread.State: WAITING
>     at java.lang.Object.wait(Object.java:-1)
>     at java.lang.Object.wait(Object.java:502)
>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>     at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
>     at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
>     at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
>     at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
>     at org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator.callWithNextKey(ReusingBuildSecondHashJoinIterator.java:122)
>     at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
> {code:java}
> Number 3:
> "Join (Join at selectEdges(GraphUtils.java:324)) (1/1)@9118" prio=5 tid=0xd75 nid=NA waiting
>   java.lang.Thread.State: WAITING
>     at java.lang.Object.wait(Object.java:-1)
>     at java.lang.Object.wait(Object.java:502)
>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:522)
>     at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:491)
>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>     at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:36)
>     at org.apache.flink.runtime.operators.hash.MutableHashTable$ProbeIterator.next(MutableHashTable.java:1929)
>     at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:505)
>     at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:666)
>     at org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashJoinIterator.callWithNextKey(ReusingBuildFirstHashJoinIterator.java:123)
>     at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:221)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
>
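The dumps above were taken by hand from the IDE. As a rough sketch, the same information could probably be collected from inside the JVM with the standard java.lang.management API, which makes it easier to compare several hung executions. The class name WaitingThreadReport and the helper startWaiter are hypothetical and exist only for this illustration; nothing here is Flink code.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: lists every live thread in state WAITING.
class WaitingThreadReport {

    /** Returns one formatted entry (name, state, lock, stack) per WAITING thread. */
    static List<String> waitingThreads() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        List<String> report = new ArrayList<>();
        // false, false: do not collect locked monitors or ownable synchronizers
        for (ThreadInfo info : mx.dumpAllThreads(false, false)) {
            if (info.getThreadState() != Thread.State.WAITING) {
                continue;
            }
            StringBuilder sb = new StringBuilder();
            sb.append('"').append(info.getThreadName()).append("\" ")
              .append(info.getThreadState());
            if (info.getLockName() != null) {
                sb.append(" on ").append(info.getLockName());
            }
            sb.append('\n');
            for (StackTraceElement frame : info.getStackTrace()) {
                sb.append("    at ").append(frame).append('\n');
            }
            report.add(sb.toString());
        }
        return report;
    }

    /** Demo only: starts a daemon thread that blocks in Object.wait() until interrupted. */
    static Thread startWaiter(String name) {
        final Object lock = new Object();
        Thread t = new Thread(() -> {
            synchronized (lock) {
                try {
                    while (true) {
                        lock.wait(); // nobody ever calls notify() on this lock
                    }
                } catch (InterruptedException e) {
                    // interrupted: let the thread end
                }
            }
        }, name);
        t.setDaemon(true);
        t.start();
        // Spin until the thread has actually reached Object.wait()
        while (t.getState() != Thread.State.WAITING) {
            Thread.yield();
        }
        return t;
    }

    public static void main(String[] args) {
        Thread waiter = startWaiter("demo-waiter");
        waitingThreads().forEach(System.out::println);
        waiter.interrupt();
    }
}
```

Calling waitingThreads() periodically from a watchdog thread would show whether the same operators are always the ones that stop making progress.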
> While I realize these dumps on their own may not be helpful, they at least
> (as far as I know) indicate that the threads are all waiting on something.
> But if it were resource scarcity, I believe the program would terminate with
> an exception.
> And if it were garbage collection activity, I believe the JVM process would
> not be at 0% CPU usage.
> I realize I didn't provide the user code that generates the execution
> plan for Flink which led to the contexts in which the threads are waiting, my
> apologies. I will do so as soon as I get a chance.
> To highlight the symptoms:
> - The memory assigned to the JVM is fully used, but there are no exceptions
> about lack of memory (and the system had plenty more memory available).
> - The CPU usage is at 0% and all threads are in a waiting state, but I
> don't understand what signal they're waiting for exactly.
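Both symptoms can be checked from inside the JVM rather than from the task manager. The following is a minimal sketch using only the standard library; the class name SymptomProbe and its methods are hypothetical names chosen for this illustration. It reports the fraction of the maximum heap in use and the CPU time a given thread consumed over a short sampling window. A thread parked in Object.wait() is expected to show a delta of zero or very close to it.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;

// Hypothetical helper, not part of Flink: probes heap usage and per-thread CPU time.
class SymptomProbe {

    /** Fraction of the maximum heap that is currently occupied. */
    static double usedHeapFraction() {
        Runtime rt = Runtime.getRuntime();
        long used = rt.totalMemory() - rt.freeMemory();
        return (double) used / rt.maxMemory();
    }

    /**
     * CPU nanoseconds consumed by thread t during roughly sampleMillis,
     * or -1 if the JVM cannot measure it or the thread is no longer alive.
     */
    static long cpuNanosDuring(Thread t, long sampleMillis) {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        if (!mx.isThreadCpuTimeSupported() || !mx.isThreadCpuTimeEnabled()) {
            return -1;
        }
        long before = mx.getThreadCpuTime(t.getId());
        try {
            Thread.sleep(sampleMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag for the caller
        }
        long after = mx.getThreadCpuTime(t.getId());
        return (before < 0 || after < 0) ? -1 : after - before;
    }

    public static void main(String[] args) {
        final Object lock = new Object();
        Thread waiter = new Thread(() -> {
            synchronized (lock) {
                try {
                    while (true) {
                        lock.wait(); // never notified: stays WAITING
                    }
                } catch (InterruptedException e) {
                    // interrupted: let the thread end
                }
            }
        }, "demo-waiter");
        waiter.setDaemon(true);
        waiter.start();

        System.out.printf("heap in use: %.1f%%%n", 100 * usedHeapFraction());
        System.out.println("CPU ns used by waiter: " + cpuNanosDuring(waiter, 500));
        waiter.interrupt();
    }
}
```

A heap that is fully used while every operator thread reports no CPU time would match the symptoms listed above and would argue against garbage collection as the cause.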
> I noticed something suspicious as well: I have chains of operators where the
> first operator will ingest the expected amount of records but will not emit
> any, leaving the following operator empty in a "RUNNING" state (see attached
> image).
> My scenario is somewhat more complex than the samples in the Flink
> documentation: when visualizing the job plan, it is necessary to zoom in and
> out to check on specific parts of the execution plan.
> Among the sequence of operations, I am:
> 1 - Creating a DataSet
> 2 - Using it as an initial workset in a DeltaIteration
> 2.1 - Joining the workset on each iteration with the edges of a graph
> 3 - Using the final solution set resulting from the DeltaIteration to build a
> graph and execute an algorithm over it (.run method).
> - The graph is not prohibitively big and I have a very low limit on the
> number of iterations (at most 4 or 5).
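For readers unfamiliar with the pattern, the shape of steps 1 to 2.1 can be sketched in plain Java. This is not Flink code and not the actual job: the update rule (propagating the minimum label along edges) and all names are assumptions made only to show how an initial workset is joined with the edges on each iteration, how the result updates a solution set, and how the iteration count is bounded.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java illustration of a workset/solution-set iteration.
class DeltaIterationSketch {

    /**
     * Propagates the minimum label along undirected edges.
     * Stops when the workset is empty or after maxIterations supersteps.
     */
    static Map<Integer, Integer> run(Map<Integer, Integer> initial,
                                     List<int[]> edges,
                                     int maxIterations) {
        Map<Integer, Integer> solution = new HashMap<>(initial); // solution set
        Map<Integer, Integer> workset = new HashMap<>(initial);  // initial workset
        for (int i = 0; i < maxIterations && !workset.isEmpty(); i++) {
            Map<Integer, Integer> delta = new HashMap<>();
            // "Join" the workset with the edges, in both directions
            for (int[] e : edges) {
                offer(workset, solution, delta, e[0], e[1]);
                offer(workset, solution, delta, e[1], e[0]);
            }
            solution.putAll(delta); // apply the delta to the solution set
            workset = delta;        // only changed vertices stay active
        }
        return solution;
    }

    /** Offers the label of src (if active) to dst and records it when it is smaller. */
    private static void offer(Map<Integer, Integer> workset,
                              Map<Integer, Integer> solution,
                              Map<Integer, Integer> delta,
                              int src, int dst) {
        Integer candidate = workset.get(src);
        if (candidate == null) {
            return; // src did not change in the previous superstep
        }
        int current = delta.getOrDefault(dst, solution.getOrDefault(dst, Integer.MAX_VALUE));
        if (candidate < current) {
            delta.put(dst, candidate);
        }
    }

    public static void main(String[] args) {
        Map<Integer, Integer> labels = new HashMap<>();
        labels.put(1, 1);
        labels.put(2, 2);
        labels.put(3, 3);
        List<int[]> edges = List.of(new int[] {1, 2}, new int[] {2, 3});
        System.out.println(run(labels, edges, 5));
    }
}
```

In this sketch the loop ends as soon as no vertex changes, so a bound of 4 or 5 iterations is only a safety limit on small graphs.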
> I will add more information as soon as it is available.
> It seems, however, that some synchronization is missing and that the
> operators perhaps _become isolated_?