Hi All,
I'm looking for some help and guidance regarding passing along data between
localities. I've got some ideas which I think would work, but I'd really
like some feedback, guidance and/or suggestions on this.
As a very brief background note:
after noting that the first version of my code did not scale as expected
when run on multiple localities, I concluded that I was trying to move too
much data between localities (rather than following the generally
recommended practice of moving actions to the data). Moving the data took so
long that it cancelled out any advantage from the additional processing cores.
I will rectify this by assigning a given amount of data to each locality,
rather than keeping it all in a single, central locality (which was calling
actions, with the data as an argument, on each locality in round-robin fashion).
However, this then requires the localities to share information with one
another. I got a little stuck trying to figure out [ (how) | (the best way) ]
to implement this. I've come up with a few concepts, but I'd like to get
some feedback from those wiser and more experienced in this area.
One more background note, although I'm not sure it's relevant to this matter:
I expect this to run on a heterogeneous cluster, so I anticipate a fairly
large variance in how long an iteration takes on different nodes across the
cluster. Obviously, I want to ensure that this communication is handled
asynchronously and does not block either the sending or the receiving end.
In general, all of the localities run the following conceptual loop:
a. Test for convergence
b. Run current iteration
c. Migrate data / integrate data, if required
There are three communication scenarios I need to account for:
1. Broadcasting some information to all other nodes. For example, if
the convergence test succeeds on one node, notify the other nodes that they
can stop processing.
2. Sending interim results to the primary node for aggregation and reporting.
3. Migrating data conditionally.
I am going to assume, for now at least, that the *best* approach for each
scenario might be different, so I've split my thoughts on each one into the
sections below.
## Broadcasting information to other nodes
It seems to me that this could be handled fairly simply if each component had:
* Boolean flag
* Action to set flag
This way, each node would test for convergence and, if the test succeeds,
use broadcast_apply to call this action on all of the other nodes, something
like:
hpx::lcos::broadcast_apply<set_converged_flag_action>(hpx::find_remote_localities());
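To make the flag idea concrete, here is a minimal sketch of the receiving side only, in plain standard C++ with no HPX calls. `convergence_state` and `run_until_converged` are hypothetical names, and the assumption is that `set_converged_flag_action` would simply forward to `set_converged()` on each locality. An atomic flag is used because the remote call may land on any worker thread while the main loop is polling it.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical per-locality state. In the HPX version, set_converged_flag_action
// would (by assumption) forward to set_converged() on the locality it runs on.
class convergence_state {
public:
    // May be called from any worker thread when another node has converged.
    void set_converged() { converged_.store(true); }

    // Polled at step (a) of the main loop: a single atomic load, never blocks.
    bool converged() const { return converged_.load(); }

private:
    std::atomic<bool> converged_{false};
};

// Runs the local loop until the local test passes, another node reports
// convergence, or max_iterations is reached; returns iterations executed.
template <typename LocalTest, typename Iteration>
int run_until_converged(convergence_state& state, LocalTest local_test,
                        Iteration iterate, int max_iterations) {
    assert(max_iterations >= 0);
    int done = 0;
    while (done < max_iterations && !state.converged()) {
        if (local_test()) {
            // The HPX version would also broadcast_apply to the other nodes here.
            state.set_converged();
            break;
        }
        iterate();
        ++done;
    }
    return done;
}
```

Because setting the flag is fire-and-forget and the check is a single atomic load, neither end blocks; a node that is mid-iteration simply notices the flag at its next convergence test.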
## Send interim results to primary node for aggregation and reporting
I spent a while typing up a rough proposal for an idea, and ended up with a
clumsy analog of receive_buffer (I think; I'm pretty sure that this is
what receive_buffer is intended to do, although that class isn't documented
beyond a regression test).
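For the aggregation side, a receive_buffer-like structure can be approximated in standard C++ with one promise per iteration step. This is only an analog under my own assumptions, not HPX's receive_buffer API: `interim_aggregator` is a hypothetical name, the aggregate is assumed to be a simple sum, and each locality is assumed to report exactly once per step. Results and requests may arrive in any order, which is the property that matters here.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <future>
#include <map>
#include <mutex>

// Hypothetical primary-node aggregator keyed by iteration step. A locality may
// report before or after the primary asks for a step, and steps may interleave.
class interim_aggregator {
public:
    explicit interim_aggregator(std::size_t n_localities) : n_(n_localities) {}

    // Called (e.g. from an action handler) when a locality reports a result.
    void store(std::size_t step, double value) {
        std::lock_guard<std::mutex> lock(mutex_);
        slot& s = slots_[step];
        assert(s.count < n_ && "each locality reports once per step");
        s.sum += value;
        if (++s.count == n_) s.done.set_value(s.sum);  // all localities reported
    }

    // Future that becomes ready once every locality has reported for `step`.
    std::shared_future<double> receive(std::size_t step) {
        std::lock_guard<std::mutex> lock(mutex_);
        slot& s = slots_[step];
        if (!s.result.valid()) s.result = s.done.get_future().share();
        return s.result;
    }

    // Non-blocking check, usable once per iteration of the primary's loop.
    bool ready(std::size_t step) {
        return receive(step).wait_for(std::chrono::seconds(0)) ==
               std::future_status::ready;
    }

private:
    struct slot {
        double sum = 0.0;
        std::size_t count = 0;
        std::promise<double> done;
        std::shared_future<double> result;
    };
    std::size_t n_;
    std::mutex mutex_;
    std::map<std::size_t, slot> slots_;  // completed steps are never erased here
};
```

The primary node could call `ready(step)` once per iteration and report only when it returns true, so a slow node delays that step's report without blocking anything else. A real version would also drop completed steps after reporting.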
## Migrate data conditionally between nodes
My expectations are as follows:
* I would like to send data based on some yet-to-be-defined criteria, which
will likely be 'data-centric' rather than based on a fixed number of iterations.
It is most likely that the receiving node will initiate the transfer (i.e.
request data from one or more adjacent nodes).
* I do not want the program to block waiting for data. I am
envisioning a test in the main control loop which, if deemed necessary,
would send a request for data to one or more adjacent nodes and then continue
looping. Once all of the new data has been received, it can be
incorporated at the beginning of the following iteration. To be 100% clear,
here is an example: at iteration x, node N_d decides it needs new data from
nodes N_c and N_e, executes an action on those 2 localities, and immediately
starts iteration x+1. Sometime during x+1, the data from N_e arrives.
Then, the data from N_c arrives in the middle of iteration x+2. The new
data would be integrated into node N_d at the very beginning of iteration
x+3.
* Integrating received data cannot happen at an arbitrary time
determined by the thread scheduler; this would cause major problems with
the code execution. In my mind, this means that the continuation for this
future (or these futures) should only stage the data into a temporary
structure (e.g. a queue), from which the main loop picks it up.
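The staging requirement, including the rule that integration waits until every requested chunk has arrived, can be sketched as a small thread-safe staging area. `staging_area`, `expect`, `stage` and `take_if_complete` are hypothetical names, and plain standard C++ is used in place of HPX futures; the assumption is that `stage()` would be called from the continuation and `take_if_complete()` at the top of the main loop.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical staging area: continuations push incoming chunks at arbitrary
// times, but the main loop only takes them at the start of an iteration, and
// only once every requested chunk has arrived.
template <typename Chunk>
class staging_area {
public:
    // Main loop: record that `n` more chunks were requested from neighbours.
    void expect(std::size_t n) {
        std::lock_guard<std::mutex> lock(mutex_);
        outstanding_ += n;
    }

    // Continuation (any thread): stage one chunk; never touches live data.
    void stage(Chunk chunk) {
        std::lock_guard<std::mutex> lock(mutex_);
        assert(outstanding_ > 0 && "stage() without a matching expect()");
        staged_.push_back(std::move(chunk));
        --outstanding_;
    }

    // Main loop, start of each iteration: hands over all staged chunks if every
    // requested chunk has arrived; otherwise returns nothing and keeps them.
    std::optional<std::vector<Chunk>> take_if_complete() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (outstanding_ != 0 || staged_.empty()) return std::nullopt;
        return std::exchange(staged_, {});
    }

private:
    std::mutex mutex_;
    std::size_t outstanding_ = 0;
    std::vector<Chunk> staged_;
};
```

Replaying the example above: after `expect(2)` at iteration x and the chunk from N_e staged during x+1, `take_if_complete()` returns nothing at the start of x+2; once the chunk from N_c is staged during x+2, the call at the start of x+3 returns both chunks.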
After mulling over this problem for hours, I think there are a couple of
possibilities:
1. Have the receiving node broadcast an action to the relevant nodes.
For example:
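hpx::future<std::vector<data_type>> data_received =
    hpx::lcos::broadcast<migrate_data_action>(adjacent_nodes);
// The continuation receives the ready future, not the value; it may run on
// any HPX thread, so incoming_data_stage needs its own synchronization.
hpx::future<void> staged = data_received.then(
    [this](hpx::future<std::vector<data_type>> f) {
        incoming_data_stage.push_back(f.get());
    });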
2. I could probably also use receive_buffer: since the data transfer is
initiated by the receiving node, I know the last index used for
receiving data. This time, I would not use broadcast, but rather iterate
over the vector of localities, calling a remote action on each node and
providing the index to use. Then, I could create a vector of futures by
calling receive on those same indices, and use when_all to set up the
continuation, as before.
3. My first thought was to use channels; however, I don't know how this
could possibly work. Among other things, I don't know whether it is
possible to check if the channel is empty (testing for c.start() == c.end()
definitely doesn't work).
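Whatever the underlying mechanism, what the main loop needs is a single non-blocking "take something if it is there" operation. The sketch below is a hypothetical, mutex-protected stand-in written in standard C++; it does not describe HPX's channel interface, and `polling_channel` and `try_receive` are illustrative names only.

```cpp
#include <cassert>
#include <deque>
#include <mutex>
#include <optional>
#include <utility>

// Hypothetical channel-like queue with a non-blocking receive.
template <typename T>
class polling_channel {
public:
    // Producer side (e.g. a continuation or action handler on any thread).
    void send(T value) {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.push_back(std::move(value));
    }

    // Returns the oldest value if one is waiting, otherwise std::nullopt.
    // Never blocks, so the main loop can call it once per iteration.
    std::optional<T> try_receive() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.empty()) return std::nullopt;
        T value = std::move(queue_.front());
        queue_.pop_front();
        return value;
    }

private:
    std::mutex mutex_;
    std::deque<T> queue_;
};
```

Folding the emptiness test and the removal into one locked operation also avoids the race inherent in a separate "is it empty?" check, where another thread could take the value between the check and the subsequent get.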
I apologize if this message is overly verbose; I was aiming for clarity,
but I tend to write too much in general. I'd greatly appreciate any advice
on how best to handle these cases. Also, any feedback on my rough concepts
would be sincerely appreciated.
Thanks and best regards,
Shmuel Levine
_______________________________________________
hpx-users mailing list
[email protected]
https://mail.cct.lsu.edu/mailman/listinfo/hpx-users