Hey Fabian,

I am aware of how open(), preSuperstep(), postSuperstep(), etc. can help me
within an iteration; unfortunately, I am writing my own method here. Let me
briefly describe it:

public static final class PropagateNeighborValues implements
NeighborsFunctionWithVertexValue(...) {
    @Override
    public void iterateNeighbors(Iterable..., Collector...) {
        // count the neighbors of this vertex
        while (iterator.hasNext()) {
            iterator.next();
            neighbors++;
        }
        // and I would need something like
        appendToFile(myAwesomeFile, neighbors);
    }
}

open() and synchronized are definitely not doing the trick for me right
now.
Is there any other way? :(
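
One option that avoids contention on a single file is to let each parallel
task append to its own file and merge the files after the job. Below is a
minimal sketch in plain Java, with threads standing in for parallel tasks.
The class name PerTaskFiles, the neighbors-<index>.txt naming and the
taskIndex parameter are assumptions made for illustration; none of them are
part of Flink or Gelly, and how a task learns its index depends on the
function type being used.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

public class PerTaskFiles {

    // Appends one line to the file owned by the given task.
    // No other task writes to this file, so no locking is needed.
    static void appendToFile(Path dir, int taskIndex, String line) throws IOException {
        Path file = dir.resolve("neighbors-" + taskIndex + ".txt");
        Files.write(file,
                (line + System.lineSeparator()).getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    // Starts `tasks` threads, each appending `linesPerTask` lines to its own
    // file, then reads the per-task files back in task order.
    static List<String> runAndMerge(Path dir, int tasks, int linesPerTask)
            throws IOException, InterruptedException {
        List<Thread> threads = new ArrayList<>();
        for (int t = 0; t < tasks; t++) {
            final int taskIndex = t;
            Thread thread = new Thread(() -> {
                try {
                    for (int i = 0; i < linesPerTask; i++) {
                        appendToFile(dir, taskIndex, taskIndex + "," + i);
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            threads.add(thread);
            thread.start();
        }
        for (Thread thread : threads) {
            thread.join();
        }

        List<String> merged = new ArrayList<>();
        for (int t = 0; t < tasks; t++) {
            Path file = dir.resolve("neighbors-" + t + ".txt");
            merged.addAll(Files.readAllLines(file, StandardCharsets.UTF_8));
        }
        return merged;
    }

    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("per-task");
        List<String> lines = runAndMerge(dir, 4, 100);
        System.out.println("merged lines: " + lines.size());
    }
}
```

On a cluster the files would end up on different machines, so the merge step
would have to collect them first; the sketch only covers the single-JVM case.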

On Mon, Jun 29, 2015 at 11:36 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> You can measure the time of each iteration in the open() methods of the
> operators within an iteration. open() will be called before each iteration.
> The times can be collected either by printing to stdout (you need to
> collect the files then...) or by implementing a list accumulator. Each time
> should include the iteration number and the parallel task id.
> After the execution, the accumulator will be available in the execution
> result.
>
> Accumulators can of course also be used to collect number of messages, etc.
>
> Best, Fabian
>
> 2015-06-29 9:55 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>
> > Why don't you use Flink dataset output functions (like writeAsText,
> > writeAsCsv, etc..)?
> > Or, if they are not sufficient, you can implement/override your own
> > OutputFormat.
> >
> > In my experience, static variables are evil in distributed
> > environments.
> > Moreover, one of the main strengths of Flink is its input/output APIs, so I
> > would avoid writing to a file in that way.
> >
> > Of course, dataset.append() would be a very convenient API to add (IMHO).
> >
> > Best,
> > Flavio
> >
> >
> > On Sat, Jun 27, 2015 at 4:19 PM, Andra Lungu <lungu.an...@gmail.com>
> > wrote:
> >
> > > Hey guys,
> > >
> > > Me again :) So now that my wonderful job finishes, I would like to monitor
> > > it a bit (i.e. build some charts on the number of messages per vertex,
> > > compute the total amount of time elapsed per computation per vertex, etc).
> > >
> > > The main computationally intensive operation is a coGroup. There, within the
> > > iteration, I count the number of "messages" sent and then simply do:
> > >
> > > Files.append(messages, messagesTempFile, Charsets.UTF_8);
> > >
> > > The problem is that with this approach I get a deadlock (yes!! Now that I
> > > know the code itself works, I am positive that the deadlock comes from the
> > > append; this is regarding my previous mail). That is to be expected, come to
> > > think of it: 200-something threads are trying to write to the same file...
> > >
> > > A possible workaround is this one:
> > >
> > > public class Singleton {
> > >     private static final Singleton inst = new Singleton();
> > >
> > >     private Singleton() {
> > >     }
> > >
> > >     public synchronized void writeToFile(String str) {
> > >         // Do whatever
> > >     }
> > >
> > >     // must be static so it can be called without an instance
> > >     public static Singleton getInstance() {
> > >         return inst;
> > >     }
> > > }
> > >
> > > Singleton.getInstance().writeToFile("Hello!!");
> > >
> > > However, I am not sure how well Flink plays with synchronized...
> > >
> > > Is there a smarter way to do it?
> > >
> > > Thanks!
> > >
> > > Andra
> > >
> >
>
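
Fabian's suggestion above (collect one entry per iteration and parallel task,
then read everything after the execution) can be illustrated without Flink.
The sketch below is a plain-Java stand-in, not Flink's accumulator API: the
names TimingLog, Entry, add and result are hypothetical, and threads play the
role of parallel tasks.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class TimingLog {

    // One measurement: which iteration, which parallel task, elapsed time.
    static final class Entry {
        final int iteration;
        final int taskId;
        final long elapsedNanos;

        Entry(int iteration, int taskId, long elapsedNanos) {
            this.iteration = iteration;
            this.taskId = taskId;
            this.elapsedNanos = elapsedNanos;
        }
    }

    // Thread-safe list standing in for a list accumulator.
    private final List<Entry> entries =
            Collections.synchronizedList(new ArrayList<>());

    void add(int iteration, int taskId, long elapsedNanos) {
        entries.add(new Entry(iteration, taskId, elapsedNanos));
    }

    // Snapshot of all entries, meant to be read once every task is done.
    List<Entry> result() {
        synchronized (entries) {
            return new ArrayList<>(entries);
        }
    }

    // Runs `iterations` rounds of `tasks` threads; every thread records one entry.
    static TimingLog run(int iterations, int tasks) throws InterruptedException {
        TimingLog log = new TimingLog();
        for (int it = 1; it <= iterations; it++) {
            final int iteration = it;
            List<Thread> threads = new ArrayList<>();
            for (int t = 0; t < tasks; t++) {
                final int taskId = t;
                Thread thread = new Thread(() -> {
                    long start = System.nanoTime();
                    // the work of one task in one iteration would go here
                    log.add(iteration, taskId, System.nanoTime() - start);
                });
                threads.add(thread);
                thread.start();
            }
            // wait for all tasks before starting the next iteration
            for (Thread thread : threads) {
                thread.join();
            }
        }
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        TimingLog log = run(3, 4);
        for (Entry e : log.result()) {
            System.out.println(e.iteration + "\t" + e.taskId + "\t" + e.elapsedNanos);
        }
    }
}
```

Nothing is written to a shared file while the tasks run; the entries are only
turned into output once, after all of them have finished.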
