Thanks, the workaround works flawlessly on my side.
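For the archives, the duplication boils down to something like the sketch below: build the preprocessed data set twice, feed one copy to the iteration and the other to everything outside it. The class name, data and toy step function are made up for illustration (the real code is the ALS job in the repo linked at the bottom of this thread), and the imports follow the current Java DataSet API, so the exact package paths on 0.6 may differ slightly.

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple2;

public class DuplicatedInputWorkaround {

    // Stand-in for the real "CSV source + preprocessing" pipeline.
    // Each call builds a fresh source, i.e. a separate subprogram in
    // the plan, which is exactly what the workaround asks for.
    private static DataSet<Tuple2<Long, Double>> buildPreprocessed(ExecutionEnvironment env) {
        return env.fromElements(
                new Tuple2<Long, Double>(1L, 1.0),
                new Tuple2<Long, Double>(2L, 2.0),
                new Tuple2<Long, Double>(3L, 3.0));
    }

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Two independent copies instead of one shared data set.
        DataSet<Tuple2<Long, Double>> insideCopy = buildPreprocessed(env);
        DataSet<Tuple2<Long, Double>> outsideCopy = buildPreprocessed(env);

        IterativeDataSet<Tuple2<Long, Double>> loop = insideCopy.iterate(10);

        // Toy step function: joins the partial solution with the
        // "inside" copy only; the "outside" copy never appears here.
        DataSet<Tuple2<Long, Double>> step = loop
                .join(insideCopy).where(0).equalTo(0)
                .with(new JoinFunction<Tuple2<Long, Double>, Tuple2<Long, Double>, Tuple2<Long, Double>>() {
                    @Override
                    public Tuple2<Long, Double> join(Tuple2<Long, Double> partial, Tuple2<Long, Double> fixed) {
                        return new Tuple2<Long, Double>(partial.f0, 0.5 * (partial.f1 + fixed.f1));
                    }
                });

        loop.closeWith(step).writeAsText("file:///tmp/inside-result");

        // Consumed strictly outside the iteration, via its own subprogram.
        outsideCopy.writeAsText("file:///tmp/outside-result");

        env.execute("Duplicated input workaround");
    }
}

With that, the plan contains two independent source-plus-preprocessing subprograms, so no single intermediate result is consumed both inside and outside the loop.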
On Thu, Sep 4, 2014 at 6:49 PM, Stephan Ewen <[email protected]> wrote:

> The above suggested solution would work for the SuperstepKickoffLatch,
> but unfortunately not for the other broker structures.
>
> So the problem does persist in other cases.
>
> A correct solution would be to prevent backpressure on forking flows,
> which will come as part of the network-stack rewrite. That solves it
> properly, so I would prefer to go for that solution.
>
> A temporary workaround would be the following:
> - Find the data sets that are consumed both inside the iteration and
>   outside the iteration. Those are typically preprocessed matrices or so.
> - Duplicate that code to actually have two different subprograms
>   (producing different data sets) for that.
> - Use a different data set inside the iteration and outside the iteration.
>
> Stephan
>
>
> On Thu, Sep 4, 2014 at 6:14 PM, Márton Balassi <[email protected]> wrote:
>
> > Thanks, Ufuk found the relevant part in the stacktrace:
> >
> > "Join(Sends the rows of p with multiple keys)) (1/1)" daemon prio=10
> > tid=0x00007f8928014800 nid=0x998 waiting on condition [0x00007f8912eed000]
> >    java.lang.Thread.State: WAITING (parking)
> >     at sun.misc.Unsafe.park(Native Method)
> >     - parking to wait for <0x00000007d2668ea0> (a
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> >     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> >     at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:374)
> >     at org.apache.flink.runtime.iterative.concurrent.Broker.get(Broker.java:63)
> >     at org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask.run(IterationIntermediatePactTask.java:84)
> >     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:375)
> >     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:265)
> >     at java.lang.Thread.run(Thread.java:744)
> >
> > This part waits for the iteration head, which has not been started yet,
> > and thus induces a deadlock.
> >
> > Opened a JIRA issue on it:
> > https://issues.apache.org/jira/browse/FLINK-1088
> >
> > Thanks for the quick response, by the way!
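A side note for anyone finding this thread later: the park in Broker.get above is a blocking hand-over between the iteration head (the producer) and the intermediate task (the consumer). The toy model below is not Flink's actual Broker implementation, just the shape of the pattern the trace points at, to show why a producer task that never gets scheduled parks the consumer forever:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Toy model of a blocking broker between two tasks. get() parks in
// BlockingQueue.take() -- the same frame as in the stack trace above --
// until the producer calls handIn() under the same key. If the producer
// task is never scheduled, the consumer waits forever: the deadlock above.
public class ToyBroker<V> {

    private final ConcurrentMap<String, BlockingQueue<V>> mediations =
            new ConcurrentHashMap<String, BlockingQueue<V>>();

    // Whichever side arrives first registers the one-slot queue.
    private BlockingQueue<V> queueFor(String key) {
        BlockingQueue<V> queue = new ArrayBlockingQueue<V>(1);
        BlockingQueue<V> existing = mediations.putIfAbsent(key, queue);
        return existing != null ? existing : queue;
    }

    // Producer side (the iteration head in the trace).
    public void handIn(String key, V obj) throws InterruptedException {
        queueFor(key).put(obj);
    }

    // Consumer side (the intermediate task in the trace); blocks.
    public V get(String key) throws InterruptedException {
        return queueFor(key).take();
    }
}

As far as I understand it, this is also what Stephan's point about backpressure on forking flows above comes down to: as long as the shared intermediate result keeps the iteration head from starting, get() never returns.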
> >
> > On Thu, Sep 4, 2014 at 5:39 PM, Fabian Hueske <[email protected]> wrote:
> >
> > > Hi Marton,
> > >
> > > a jstack Java stacktrace can help to identify where the code got stuck.
> > > Can you open a JIRA and post a stacktrace there?
> > >
> > > Cheers, Fabian
> > >
> > >
> > > 2014-09-04 17:25 GMT+02:00 Márton Balassi <[email protected]>:
> > >
> > > > CPU load:
> > > >
> > > > Tested on my 4-core machine, the CPU load spikes at the beginning of
> > > > the job and stays relatively high during the whole job when run with
> > > > version 0.5, then finishes gracefully. On version 0.6 it works
> > > > seemingly well until the hangup. Interestingly enough, even when no
> > > > more log messages appear, my CPU utilization stays 10-15% higher per
> > > > core than without running the job.
> > > >
> > > > logs:
> > > >
> > > > For both implementations it starts like this:
> > > >
> > > > 09/04/2014 17:05:51: Job execution switched to status SCHEDULED
> > > > 09/04/2014 17:05:51: DataSource (CSV Input (|) /home/mbalassi/git/als-comparison/data/sampledb2b.csv.txt) (1/1) switched to SCHEDULED
> > > > 09/04/2014 17:05:51: Reduce(Create q as a random matrix) (1/1) switched to SCHEDULED
> > > > 09/04/2014 17:05:51: PartialSolution (BulkIteration (Bulk Iteration)) (1/1) switched to SCHEDULED
> > > > 09/04/2014 17:05:51: Join(Sends the columns of q with multiple keys) (1/1) switched to SCHEDULED
> > > > 09/04/2014 17:05:51: CoGroup (For fixed q calculates optimal p) (1/1) switched to SCHEDULED
> > > > 09/04/2014 17:05:51: Join(Sends the rows of p with multiple keys)) (1/1) switched to SCHEDULED
> > > > 09/04/2014 17:05:51: CoGroup (For fixed p calculates optimal q) (1/1) switched to SCHEDULED
> > > > 09/04/2014 17:05:51: Fake Tail (1/1) switched to SCHEDULED
> > > > 09/04/2014 17:05:51: Join(Sends the columns of q with multiple keys) (1/1) switched to SCHEDULED
> > > > 09/04/2014 17:05:51: CoGroup (For fixed q calculates optimal p) (1/1) switched to SCHEDULED
> > > >
> > > > [Omitted quite some healthy messages...]
> > > >
> > > > 09/04/2014 17:05:53: Join(Sends the rows of p with multiple keys)) (1/1) switched to READY
> > > > 09/04/2014 17:05:53: Join(Sends the rows of p with multiple keys)) (1/1) switched to STARTING
> > > > 09/04/2014 17:05:53: Join(Sends the rows of p with multiple keys)) (1/1) switched to RUNNING
> > > > 09/04/2014 17:05:53: CoGroup (For fixed p calculates optimal q) (1/1) switched to READY
> > > > 09/04/2014 17:05:53: Fake Tail (1/1) switched to READY
> > > > 09/04/2014 17:05:53: CoGroup (For fixed p calculates optimal q) (1/1) switched to STARTING
> > > > 09/04/2014 17:05:53: Fake Tail (1/1) switched to STARTING
> > > > 09/04/2014 17:05:54: CoGroup (For fixed p calculates optimal q) (1/1) switched to RUNNING
> > > > 09/04/2014 17:05:54: Fake Tail (1/1) switched to RUNNING
> > > > 09/04/2014 17:05:54: Join(Sends the columns of q with multiple keys) (1/1) switched to READY
> > > > 09/04/2014 17:05:54: Join(Sends the columns of q with multiple keys) (1/1) switched to STARTING
> > > > 09/04/2014 17:05:54: Join(Sends the columns of q with multiple keys) (1/1) switched to RUNNING
> > > > 09/04/2014 17:05:54: CoGroup (For fixed q calculates optimal p) (1/1) switched to READY
> > > > 09/04/2014 17:05:54: CoGroup (For fixed q calculates optimal p) (1/1) switched to STARTING
> > > > 09/04/2014 17:05:55: CoGroup (For fixed q calculates optimal p) (1/1) switched to RUNNING
> > > >
> > > > Flink stops here, Strato continues:
> > > >
> > > > 09/04/2014 17:09:01: DataSource(CSV Input (|)) (1/1) switched to FINISHING
> > > > 09/04/2014 17:09:02: PartialSolution (BulkIteration (Bulk Iteration)) (1/1) switched to READY
> > > > 09/04/2014 17:09:02: PartialSolution (BulkIteration (Bulk Iteration)) (1/1) switched to STARTING
> > > > 09/04/2014 17:09:02: PartialSolution (BulkIteration (Bulk Iteration)) (1/1) switched to RUNNING
> > > > 09/04/2014 17:09:03: Reduce(Create q as a random matrix) (1/1) switched to FINISHING
> > > > 09/04/2014 17:09:05: Sync(BulkIteration (Bulk Iteration)) (1/1) switched to READY
> > > > 09/04/2014 17:09:05: Sync(BulkIteration (Bulk Iteration)) (1/1) switched to STARTING
> > > > 09/04/2014 17:09:05: Sync(BulkIteration (Bulk Iteration)) (1/1) switched to RUNNING
> > > > 09/04/2014 17:09:09: Sync(BulkIteration (Bulk Iteration)) (1/1) switched to FINISHING
> > > > 09/04/2014 17:09:09: DataSink(hu.sztaki.ilab.cumulonimbus.als_comparison.strato.ColumnOutputFormatStrato@7ea742a1) (1/1) switched to READY
> > > > 09/04/2014 17:09:09: DataSink(hu.sztaki.ilab.cumulonimbus.als_comparison.strato.ColumnOutputFormatStrato@7ea742a1) (1/1) switched to STARTING
> > > >
> > > > [Omitted quite some healthy messages...]
> > > >
> > > > 09/04/2014 17:09:10: PartialSolution (BulkIteration (Bulk Iteration)) (1/1) switched to FINISHED
> > > > 09/04/2014 17:09:10: CoGroup(For fixed p calculates optimal q) (1/1) switched to FINISHED
> > > > 09/04/2014 17:09:10: DataSink(hu.sztaki.ilab.cumulonimbus.als_comparison.strato.ColumnOutputFormatStrato@5dcde3f3) (1/1) switched to RUNNING
> > > > 09/04/2014 17:09:10: CoGroup(For fixed q calculates optimal p) (1/1) switched to FINISHING
> > > > 09/04/2014 17:09:10: DataSink(hu.sztaki.ilab.cumulonimbus.als_comparison.strato.ColumnOutputFormatStrato@5dcde3f3) (1/1) switched to FINISHING
> > > > 09/04/2014 17:09:11: Join(Sends the columns of q with multiple keys) (1/1) switched to FINISHED
> > > > 09/04/2014 17:09:11: CoGroup(For fixed q calculates optimal p) (1/1) switched to FINISHED
> > > > 09/04/2014 17:09:11: DataSink(hu.sztaki.ilab.cumulonimbus.als_comparison.strato.ColumnOutputFormatStrato@5dcde3f3) (1/1) switched to FINISHED
> > > > 09/04/2014 17:09:11: Job execution switched to status FINISHED
> > > >
> > > >
> > > > On Thu, Sep 4, 2014 at 3:33 PM, Ufuk Celebi <[email protected]> wrote:
> > > >
> > > > > Hey Marton,
> > > > >
> > > > > thanks for reporting the issue and the link to the repo to reproduce
> > > > > the problem. I will look into it later today.
> > > > >
> > > > > If you like, you could provide some more information in the meantime:
> > > > >
> > > > > - How is the CPU load?
> > > > > - What are the TM logs saying?
> > > > > - Can you give a stack trace? Where is it hanging?
> > > > >
> > > > >
> > > > > On Thu, Sep 4, 2014 at 3:14 PM, Márton Balassi <[email protected]> wrote:
> > > > >
> > > > > > Hey,
> > > > > >
> > > > > > We managed to produce code for which the legacy Stratosphere 0.5
> > > > > > release implementation works nicely; however, the updated Flink 0.6
> > > > > > release implementation hangs for slightly larger inputs.
> > > > > >
> > > > > > Please check out the issue here:
> > > > > > https://github.com/mbalassi/als-comparison
> > > > > >
> > > > > > Any suggestions are welcome.
> > > > > >
> > > > > > Cheers,
> > > > > >
> > > > > > Marton
