This is an automated email from the ASF dual-hosted git repository.

isapego pushed a commit to branch ignite-17607
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 27c15ab6aa1d9ad5ae2da5e82452403b1b543fa9
Author: Igor Sapego <[email protected]>
AuthorDate: Tue Mar 7 17:44:00 2023 +0300

    IGNITE-17607 Implement broadcast
---
 .../cpp/ignite/client/compute/compute.cpp          | 32 ++++++++
 .../platforms/cpp/ignite/client/compute/compute.h  | 40 ++++++++--
 .../cpp/ignite/client/network/cluster_node.h       | 85 ++++++++++++++++++++++
 .../cpp/tests/client-test/compute_test.cpp         |  2 +
 4 files changed, 154 insertions(+), 5 deletions(-)

diff --git a/modules/platforms/cpp/ignite/client/compute/compute.cpp 
b/modules/platforms/cpp/ignite/client/compute/compute.cpp
index 2869346abd..eab6b63092 100644
--- a/modules/platforms/cpp/ignite/client/compute/compute.cpp
+++ b/modules/platforms/cpp/ignite/client/compute/compute.cpp
@@ -49,6 +49,38 @@ void inline check_non_empty(const T &cont, const 
std::string& title) {
         throw ignite_error(title + " can not be empty");
 }
 
+void compute::broadcast_async(const std::set<cluster_node>& nodes, 
std::string_view job_class_name,
+    const std::vector<primitive>& args,
+    ignite_callback<std::map<cluster_node, 
ignite_result<std::optional<primitive>>>> callback) {
+    typedef std::map<cluster_node, ignite_result<std::optional<primitive>>> 
result_type;
+
+    check_non_empty(nodes, "Nodes set");
+    check_non_empty(job_class_name, "Job class name");
+
+    struct result_group {
+        explicit result_group(std::int32_t cnt, ignite_callback<result_type> 
&&cb) : m_cnt(cnt), m_callback(cb) {}
+
+        std::mutex m_mutex;
+        result_type m_res_map;
+        ignite_callback<result_type> m_callback;
+        std::int32_t m_cnt{0};
+    };
+
+    auto shared_res = 
std::make_shared<result_group>(std::int32_t(nodes.size()), std::move(callback));
+
+    for (const auto &node : nodes) {
+        m_impl->execute_on_one_node(node, job_class_name, args, [node, 
shared_res](auto &&res) {
+            auto &val = *shared_res;
+
+            std::lock_guard<std::mutex> lock(val.m_mutex);
+            val.m_res_map.emplace(node, res);
+            --val.m_cnt;
+            if (val.m_cnt == 0)
+                val.m_callback(std::move(val.m_res_map));
+        });
+    }
+}
+
 void compute::execute_async(const std::vector<cluster_node>& nodes, 
std::string_view job_class_name,
     const std::vector<primitive>& args, 
ignite_callback<std::optional<primitive>> callback) {
     check_non_empty(nodes, "Nodes container");
diff --git a/modules/platforms/cpp/ignite/client/compute/compute.h 
b/modules/platforms/cpp/ignite/client/compute/compute.h
index 5599ac3126..312bc04875 100644
--- a/modules/platforms/cpp/ignite/client/compute/compute.h
+++ b/modules/platforms/cpp/ignite/client/compute/compute.h
@@ -25,6 +25,9 @@
 
 #include <memory>
 #include <utility>
+#include <vector>
+#include <map>
+#include <set>
 
 namespace ignite {
 
@@ -61,14 +64,41 @@ public:
      * @param args Job arguments.
      * @return Job execution result.
      */
-    IGNITE_API std::optional<primitive> execute(std::vector<cluster_node> 
nodes, std::string_view job_class_name,
-        const std::vector<primitive>& args) {
-        return sync<std::optional<primitive>>(
-            [this, nodes = std::move(nodes), job_class_name, args](auto 
callback) mutable {
-            execute_async(std::move(nodes), job_class_name, args, 
std::move(callback));
+    IGNITE_API std::optional<primitive> execute(
+        const std::vector<cluster_node>& nodes, std::string_view 
job_class_name, const std::vector<primitive>& args) {
+        return sync<std::optional<primitive>>([this, nodes, job_class_name, 
args](auto callback) mutable {
+            execute_async(nodes, job_class_name, args, std::move(callback));
         });
     }
 
+    /**
+     * Executes a compute job represented by the given class on all of the 
specified nodes asynchronously.
+     *
+     * @param nodes Nodes to use for the job execution.
+     * @param job_class_name Java class name of the job to execute.
+     * @param args Job arguments.
+     * @param callback A callback called on operation completion with per-node job 
execution results.
+     */
+    IGNITE_API void broadcast_async(const std::set<cluster_node>& nodes, 
std::string_view job_class_name,
+        const std::vector<primitive>& args,
+        ignite_callback<std::map<cluster_node, 
ignite_result<std::optional<primitive>>>> callback);
+
+    /**
+     * Executes a compute job represented by the given class on all of the 
specified nodes.
+     *
+     * @param nodes Nodes to use for the job execution.
+     * @param job_class_name Java class name of the job to execute.
+     * @param args Job arguments.
+     * @return Map from each node to its job execution result.
+     */
+    IGNITE_API std::map<cluster_node, ignite_result<std::optional<primitive>>> 
broadcast(
+        const std::set<cluster_node>& nodes, std::string_view job_class_name, 
const std::vector<primitive>& args) {
+        return sync<std::map<cluster_node, 
ignite_result<std::optional<primitive>>>>(
+            [this, nodes, job_class_name, args](auto callback) mutable {
+                broadcast_async(nodes, job_class_name, args, 
std::move(callback));
+            });
+    }
+
 private:
     /**
      * Constructor
diff --git a/modules/platforms/cpp/ignite/client/network/cluster_node.h 
b/modules/platforms/cpp/ignite/client/network/cluster_node.h
index 1392b17d6d..4e0a5f6e08 100644
--- a/modules/platforms/cpp/ignite/client/network/cluster_node.h
+++ b/modules/platforms/cpp/ignite/client/network/cluster_node.h
@@ -73,6 +73,25 @@ public:
         return m_address;
     }
 
+    /**
+     * Compare to another instance.
+     *
+     * @param other Another instance.
+     * @return Negative value if less than, positive if greater than, and zero if
+     *   equal to another instance.
+     */
+    [[nodiscard]] int compare(const cluster_node &other) const {
+        auto name_comp = m_name.compare(other.m_name);
+        if (name_comp)
+            return name_comp;
+
+        auto id_comp = m_id.compare(other.m_id);
+        if (id_comp)
+            return id_comp;
+
+        return m_address.compare(other.m_address);
+    }
+
 private:
     /** Local ID. */
     std::string m_id{};
@@ -84,4 +103,70 @@ private:
     network::end_point m_address{};
 };
 
+/**
+ * Comparison operator.
+ *
+ * @param val1 First value.
+ * @param val2 Second value.
+ * @return True if equal.
+ */
+inline bool operator==(const cluster_node &val1, const cluster_node &val2) {
+    return val1.compare(val2) == 0;
+}
+
+/**
+ * Comparison operator.
+ *
+ * @param val1 First value.
+ * @param val2 Second value.
+ * @return True if not equal.
+ */
+inline bool operator!=(const cluster_node &val1, const cluster_node &val2) {
+    return !(val1 == val2);
+}
+
+/**
+ * Comparison operator.
+ *
+ * @param val1 First value.
+ * @param val2 Second value.
+ * @return True if less.
+ */
+inline bool operator<(const cluster_node &val1, const cluster_node &val2) {
+    return val1.compare(val2) < 0;
+}
+
+/**
+ * Comparison operator.
+ *
+ * @param val1 First value.
+ * @param val2 Second value.
+ * @return True if less or equal.
+ */
+inline bool operator<=(const cluster_node &val1, const cluster_node &val2) {
+    return val1.compare(val2) <= 0;
+}
+
+/**
+ * Comparison operator.
+ *
+ * @param val1 First value.
+ * @param val2 Second value.
+ * @return True if greater.
+ */
+inline bool operator>(const cluster_node &val1, const cluster_node &val2) {
+    return val1.compare(val2) > 0;
+}
+
+/**
+ * Comparison operator.
+ *
+ * @param val1 First value.
+ * @param val2 Second value.
+ * @return True if greater or equal.
+ */
+inline bool operator>=(const cluster_node &val1, const cluster_node &val2) {
+    return val1.compare(val2) >= 0;
+}
+
 } // namespace ignite
diff --git a/modules/platforms/cpp/tests/client-test/compute_test.cpp 
b/modules/platforms/cpp/tests/client-test/compute_test.cpp
index b09fd1b11f..7cbd15cc85 100644
--- a/modules/platforms/cpp/tests/client-test/compute_test.cpp
+++ b/modules/platforms/cpp/tests/client-test/compute_test.cpp
@@ -100,3 +100,5 @@ TEST_F(compute_test, execute_on_specific_node) {
     EXPECT_EQ(res1.value().get<std::string>(), PLATFORM_TEST_NODE_RUNNER + 
"-_11");
     EXPECT_EQ(res2.value().get<std::string>(), PLATFORM_TEST_NODE_RUNNER + 
"_2:_22");
 }
+
+

Reply via email to