This is an automated email from the ASF dual-hosted git repository. isapego pushed a commit to branch ignite-17607 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 27c15ab6aa1d9ad5ae2da5e82452403b1b543fa9 Author: Igor Sapego <[email protected]> AuthorDate: Tue Mar 7 17:44:00 2023 +0300 IGNITE-17607 Implement broadcast --- .../cpp/ignite/client/compute/compute.cpp | 32 ++++++++ .../platforms/cpp/ignite/client/compute/compute.h | 40 ++++++++-- .../cpp/ignite/client/network/cluster_node.h | 85 ++++++++++++++++++++++ .../cpp/tests/client-test/compute_test.cpp | 2 + 4 files changed, 154 insertions(+), 5 deletions(-) diff --git a/modules/platforms/cpp/ignite/client/compute/compute.cpp b/modules/platforms/cpp/ignite/client/compute/compute.cpp index 2869346abd..eab6b63092 100644 --- a/modules/platforms/cpp/ignite/client/compute/compute.cpp +++ b/modules/platforms/cpp/ignite/client/compute/compute.cpp @@ -49,6 +49,38 @@ void inline check_non_empty(const T &cont, const std::string& title) { throw ignite_error(title + " can not be empty"); } +void compute::broadcast_async(const std::set<cluster_node>& nodes, std::string_view job_class_name, + const std::vector<primitive>& args, + ignite_callback<std::map<cluster_node, ignite_result<std::optional<primitive>>>> callback) { + typedef std::map<cluster_node, ignite_result<std::optional<primitive>>> result_type; + + check_non_empty(nodes, "Nodes set"); + check_non_empty(job_class_name, "Job class name"); + + struct result_group { + explicit result_group(std::int32_t cnt, ignite_callback<result_type> &&cb) : m_cnt(cnt), m_callback(cb) {} + + std::mutex m_mutex; + result_type m_res_map; + ignite_callback<result_type> m_callback; + std::int32_t m_cnt{0}; + }; + + auto shared_res = std::make_shared<result_group>(std::int32_t(nodes.size()), std::move(callback)); + + for (const auto &node : nodes) { + m_impl->execute_on_one_node(node, job_class_name, args, [node, shared_res](auto &&res) { + auto &val = *shared_res; + + std::lock_guard<std::mutex> lock(val.m_mutex); + val.m_res_map.emplace(node, res); + --val.m_cnt; + if (val.m_cnt == 0) + val.m_callback(std::move(val.m_res_map)); + }); + } 
+} + void compute::execute_async(const std::vector<cluster_node>& nodes, std::string_view job_class_name, const std::vector<primitive>& args, ignite_callback<std::optional<primitive>> callback) { check_non_empty(nodes, "Nodes container"); diff --git a/modules/platforms/cpp/ignite/client/compute/compute.h b/modules/platforms/cpp/ignite/client/compute/compute.h index 5599ac3126..312bc04875 100644 --- a/modules/platforms/cpp/ignite/client/compute/compute.h +++ b/modules/platforms/cpp/ignite/client/compute/compute.h @@ -25,6 +25,9 @@ #include <memory> #include <utility> +#include <vector> +#include <map> +#include <set> namespace ignite { @@ -61,14 +64,41 @@ public: * @param args Job arguments. * @return Job execution result. */ - IGNITE_API std::optional<primitive> execute(std::vector<cluster_node> nodes, std::string_view job_class_name, - const std::vector<primitive>& args) { - return sync<std::optional<primitive>>( - [this, nodes = std::move(nodes), job_class_name, args](auto callback) mutable { - execute_async(std::move(nodes), job_class_name, args, std::move(callback)); + IGNITE_API std::optional<primitive> execute( + const std::vector<cluster_node>& nodes, std::string_view job_class_name, const std::vector<primitive>& args) { + return sync<std::optional<primitive>>([this, nodes, job_class_name, args](auto callback) mutable { + execute_async(nodes, job_class_name, args, std::move(callback)); }); } + /** + * Executes a compute job represented by the given class on all of the specified nodes asynchronously. + * + * @param nodes Nodes to use for the job execution. + * @param job_class_name Java class name of the job to execute. + * @param args Job arguments. + * @param callback A callback called on operation completion with jobs execution result. 
+ */ + IGNITE_API void broadcast_async(const std::set<cluster_node>& nodes, std::string_view job_class_name, + const std::vector<primitive>& args, + ignite_callback<std::map<cluster_node, ignite_result<std::optional<primitive>>>> callback); + + /** + * Executes a compute job represented by the given class on all of the specified nodes. + * + * @param nodes Nodes to use for the job execution. + * @param job_class_name Java class name of the job to execute. + * @param args Job arguments. + * @return Map of job execution results, keyed by node. + */ + IGNITE_API std::map<cluster_node, ignite_result<std::optional<primitive>>> broadcast( + const std::set<cluster_node>& nodes, std::string_view job_class_name, const std::vector<primitive>& args) { + return sync<std::map<cluster_node, ignite_result<std::optional<primitive>>>>( + [this, nodes, job_class_name, args](auto callback) mutable { + broadcast_async(nodes, job_class_name, args, std::move(callback)); + }); + } + private: /** * Constructor diff --git a/modules/platforms/cpp/ignite/client/network/cluster_node.h b/modules/platforms/cpp/ignite/client/network/cluster_node.h index 1392b17d6d..4e0a5f6e08 100644 --- a/modules/platforms/cpp/ignite/client/network/cluster_node.h +++ b/modules/platforms/cpp/ignite/client/network/cluster_node.h @@ -73,6 +73,25 @@ public: return m_address; } + /** + * Compare to another instance. + * + * @param other Another instance. + * @return Negative value if less, positive if greater, and zero if equal to + * another instance. + */ + [[nodiscard]] int compare(const cluster_node &other) const { + auto name_comp = m_name.compare(other.m_name); + if (name_comp) + return name_comp; + + auto id_comp = m_id.compare(other.m_id); + if (id_comp) + return id_comp; + + return m_address.compare(other.m_address); + } + private: /** Local ID. */ std::string m_id{}; @@ -84,4 +103,70 @@ private: network::end_point m_address{}; }; +/** + * Comparison operator. + * + * @param val1 First value. + * @param val2 Second value. 
+ * @return True if equal. + */ +inline bool operator==(const cluster_node &val1, const cluster_node &val2) { + return val1.compare(val2) == 0; +} + +/** + * Comparison operator. + * + * @param val1 First value. + * @param val2 Second value. + * @return True if not equal. + */ +inline bool operator!=(const cluster_node &val1, const cluster_node &val2) { + return !(val1 == val2); +} + +/** + * Comparison operator. + * + * @param val1 First value. + * @param val2 Second value. + * @return True if less. + */ +inline bool operator<(const cluster_node &val1, const cluster_node &val2) { + return val1.compare(val2) < 0; +} + +/** + * Comparison operator. + * + * @param val1 First value. + * @param val2 Second value. + * @return True if less or equal. + */ +inline bool operator<=(const cluster_node &val1, const cluster_node &val2) { + return val1.compare(val2) <= 0; +} + +/** + * Comparison operator. + * + * @param val1 First value. + * @param val2 Second value. + * @return True if greater. + */ +inline bool operator>(const cluster_node &val1, const cluster_node &val2) { + return val1.compare(val2) > 0; +} + +/** + * Comparison operator. + * + * @param val1 First value. + * @param val2 Second value. + * @return True if greater or equal. + */ +inline bool operator>=(const cluster_node &val1, const cluster_node &val2) { + return val1.compare(val2) >= 0; +} + } // namespace ignite diff --git a/modules/platforms/cpp/tests/client-test/compute_test.cpp b/modules/platforms/cpp/tests/client-test/compute_test.cpp index b09fd1b11f..7cbd15cc85 100644 --- a/modules/platforms/cpp/tests/client-test/compute_test.cpp +++ b/modules/platforms/cpp/tests/client-test/compute_test.cpp @@ -100,3 +100,5 @@ TEST_F(compute_test, execute_on_specific_node) { EXPECT_EQ(res1.value().get<std::string>(), PLATFORM_TEST_NODE_RUNNER + "-_11"); EXPECT_EQ(res2.value().get<std::string>(), PLATFORM_TEST_NODE_RUNNER + "_2:_22"); } + +
