Hi Thomas, Thank you very much for your prompt response. It was extremely informative and tremendously helpful.
I've got a couple follow-up questions below. On Fri, Dec 8, 2017 at 2:30 AM, Thomas Heller <[email protected]> wrote: > Hi Shmuel, > > Am 07.12.2017 2:08 nachm. schrieb "Shmuel Levine" <[email protected] > >: > > > ## Broadcasting information to other nodes > > > > It seems to me that this could handled fairly simply if each component had: > > - Boolean flag > - Action to set flag > > > > This way, each node would test for convergence and if the test succeeds, I > can use broadcast to call this action on all of the other nodes, something > like: > hpx::lcos::broadcast_apply<set_converged_flag_action>(hpx::find_remote_localities() > ); > > > That sounds like the best shot, yes. Some more suggestions here: > You can use your component ids directly here. One good approach would be > to register components with a basename and retrieve the vector using that > same basename. More information about that can be found here: > https://stellar-group.github.io/hpx/docs/html/header/hpx/ > runtime/basename_registration_hpp.html > Thanks! The documentation has only this form of the function: std::vector<hpx::future<hpx::id_type> > find_all_from_basename(std::string const& base_name, std::size_t num_ids); Whereas in the actual file <hpx/runtime/basename_registration.hpp> (at least for a fairly current commit) also includes the form: template <typename Client> std::vector<Client> find_all_from_basename(std::string const& base_name, std::size_t num_ids); If I understand correctly, I would want to use the second form, with an explicit template argument for the client-side component class. Then I can iterate through the vector and apply the action? > > The other suggestion is to use a direct action to set the flag in your > component, they don't spawn a separate thread when being invoked on the > remote end. > Firstly, thanks for that little tidbit about direct actions. I've been wondering for the longest time as to the difference between the two. There is no mention at all in the documentation. Also, just to be sure I understand, this suggestion is complementary to the first - i.e. this action should use the 'direct' versions of the macros? > > > > ## Send interim results to primary node for aggregation and reporting > > > > I spent a while typing up a rough proposal for an idea, and ended up with > a clumsy analog of receive_buffer ( I think -- I’m pretty sure that this is > what receive_buffer is intended to do, although that class isn’t documented > other than a regression test.) > > Instead of using receive_buffer, you should use hpx::lcos::channel. It > uses receive_buffer internally and offers the abstractions you need. > Okay, makes sense. I only looked at the documentation ( https://stellar-group.github.io/hpx/docs/html/hpx.html#hpx.manual.lcos) which did not make mention of the generation parameter. As for using it... I am a bit unsure how to write this code. On the sending side, since every component instance has a send_channel, I can create one such channel at an appropriate scope, using the basename to set the 'target' of the channel. On the receiving side, the only way I can think of having receive channels only for the primary locality would be to instantiate a new channel at every iteration. Is this correct? Would there be any potential issues with the 'generation' parameter, given that the sending side is long-lived, but the receiving end is newly instantiated? Given what I understood of the receive_buffer code, I think this should be okay, but then-again, I'm not completely sure.... Or is there a better way to do this? > > > ## Migrate data conditionally between nodes > > > > My expectations are that > > - I would like to send data based on some to-be-defined criteria which > will likely be ‘data-centric’, not based on a fixed number of iterations. > It is most likely that the receiving node will initiate the transfer (i.e. > request data from one or more adjacent nodes). > - I do not want the program to block waiting for data. I am > envisioning a test in the main control loop, which, if deemed necessary, > would send a request to one or more adjacent nodes for data, and continue > looping. Once all of the new data has been received, then it can be > incorporated at the beginning of the following iteration. To be 100% > clear, as an example: at iteration x, node N_d decides it needs new data > from nodes N_c and N_e and executes an action on those 2 localities, then > immediately starts iteration x+1. Some time during x+1, the data from N_e > arrives. Then, the data from N_c arrives in the middle of iteration X+2. > The new data would be integrated into node N_d at the very beginning of > iteration x+3. > - Integrating received data cannot happen at an arbitrary time > determined by the thread scheduler – this would cause major problems with > the code execution. In my mind, this means that the continuation for this > future (or multiple futures) causes the data to be staged into a temporary > structure (e.g. a queue). > > > > After mulling over this problem in my mind for hours, I think there are a > couple possibilities: > > > > 1. Have the receiving node broadcast an action to the relevant nodes. > For example: > > > > Future<vector<data_type> > data_received = > hpx::lcos::broadcast(adjacent_nodes, > migrate_data_action); > > data_received.then([=](vector<data_type> incoming_data){ > incoming_data_stage.push_back(std::move(incoming_data)); } > > > This sounds good. > > > > 1. I could probably also use receive_buffer: since the data transfer > is initiated by the receiving node, I know which was the last index used > for receiving data. This time, I would not use broadcast, but rather > iterate over the vector of localities, calling a remote action on each node > and providing the index to use. Then, I could create a vector of futures > from calling receive on those same indices, and use when_all to set up the > continuation, as before. > > I think that's overkill if you pull the data (instead of pushing). > Do you mean overkill only in the sense that it's an overly-complicated way of managing data migration where I would only be pulling data? Or that it is also likely to add significant runtime overhead? I thought of the above approach when considering how to handle the case of pushing data - although the more I think of it, the less I like the idea.... Nevertheless, I am curious whether this approach is viable in that case. > > 1. My first thought was to use channels; however, I don’t know how > this could possibly work. Among other things, I don’t know whether it is > possible to check if the channel is empty (testing for c.start() == c.end() > definitely doesn’t work). > > > > > > I apologize if this message is overly verbose – I was aiming for clarity, > but I tend to write too much in general… I’d greatly appreciate any advice > on how to best handle these cases. Also, any feedback on my rough concepts > would be sincerely appreciated. > > I hope that I provided enough feedback to get you going. I'm not entirely > sure what to answer to your last question. I think what you envisioned in > the first point is a good start! > Thanks a lot for the feedback, it is extremely helpful. It's also gratifying to hear that I've got the right general idea. > If the data you send around are PODs, then it might make sense to use > serialize_buffer to exchange data. It's optimized for zero copy transfers. > The data are vectors of large matrices with the underlying data contained in a serialize_buffer. My matrix class contains a Matrix_Data class with a custom allocator (so that I can continually reuse the malloc'd blocks of memory for each matrix size) and a serialize_buffer. The allocator is a static member of Matrix_Data. Since you've brought up the serialize_buffer, it got me wondering if there would be any potential issues with having a custom allocator in conjunction with the serialize_buffer? My allocator itself isn't overly complex. I expect to have a limited number of different matrix sizes. Each matrix size has its own free list. The second map allows me to determine which free list I to use when deallocating: using free_list_map_type = std::map<int64_t, boost::lockfree::queue<T*>>; using allocation_list_type = std::map<T *, int64_t>; Matrix_Allocator::deallocate(Matrix_Allocator::T *ptr) { auto size = allocation_list_[ptr]; free_list_[size].push(ptr); } Matrix_Allocator::T* Matrix_Allocator::allocate(int64_t n) { if (n <= 0) HPX_THROW_EXCEPTION(hpx::bad_parameter, "Matrix_Allocator::allocate(int64_t)", "Cannot allocate a negative number of bytes"); using mutex_type_ = hpx::lcos::local::recursive_mutex; static mutex_type_ alloc_mutex; T *addr_; if (free_list_[n].pop(addr_)) return addr_; // There was an available memory block to re-use else { addr_ = reinterpret_cast<T *>(MKL_malloc( n * sizeof(T), fx::core::detail::Matrix_Allocator::alignment)); if (addr_ == nullptr) HPX_THROW_EXCEPTION(hpx::out_of_memory, "Matrix_Allocator::allocate(int64_t)", "Cannot allocate requested number of bytes"); { std::lock_guard<mutex_type_> l(alloc_mutex); allocation_list_[addr_] = n; } } // Let the lock_guard go out of scope and release the lock return addr_; } I thought this would be the most sensible approach, making best of use memory (i.e. being able to 'right-size' the malloc'd blocks for a given matrix dimension) while allowing very cheap matrix construction/destruction. I initialize the serialize_buffer in the Matrix_Data constructor as follows: Matrix_Data::Matrix_Data(int64_t elements) : data_buffer_elements_(elements), data_{alloc_.allocate(data_buffer_elements_ * sizeof(data_type)), static_cast<size_t>(data_buffer_elements_), buffer_type::take, &Matrix_Data::deallocate} { } // Matrix_Data::deallocate is a static function forwarding a pointer to the allocator's deallocate(T*) function. This more-or-less uses the same approach as in the Futurization example in the documentation. Once again, thanks again so much for your help. > > > > Thanks and best regards, > > Shmuel Levine > > _______________________________________________ > hpx-users mailing list > [email protected] > https://mail.cct.lsu.edu/mailman/listinfo/hpx-users > > > > _______________________________________________ > hpx-users mailing list > [email protected] > https://mail.cct.lsu.edu/mailman/listinfo/hpx-users > >
_______________________________________________ hpx-users mailing list [email protected] https://mail.cct.lsu.edu/mailman/listinfo/hpx-users
