This is an automated email from the ASF dual-hosted git repository.

isapego pushed a commit to branch ignite-17607
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit aca718633ececced935c2935e792aff67b4b928f
Author: Igor Sapego <[email protected]>
AuthorDate: Mon Mar 20 17:23:08 2023 +0300

    IGNITE-17607 Execute colocated, untested
---
 .../ignite/client/detail/compute/compute_impl.cpp  | 82 ++++++++++++----------
 1 file changed, 46 insertions(+), 36 deletions(-)

diff --git a/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp b/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp
index ef845a3ff6..b6ebd4dc0b 100644
--- a/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp
+++ b/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp
@@ -21,45 +21,62 @@
 
 namespace ignite::detail {
 
-void compute_impl::execute_on_one_node(cluster_node node, std::string_view job_class_name,
-    const std::vector<primitive>& args, ignite_callback<std::optional<primitive>> callback) {
+/**
+ * Write a collection of primitives as a binary tuple.
+ *
+ * @param writer Writer to use.
+ * @param args Arguments.
+ */
+void write_primitives_as_binary_tuple(protocol::writer &writer, const std::vector<primitive>& args) {
+    auto args_num = std::int32_t(args.size());
 
-    auto writer_func = [&node, job_class_name, args](protocol::writer &writer) {
-        writer.write(node.get_name());
-        writer.write(job_class_name);
+    writer.write(args_num);
 
-        // TODO: Move to a separate function.
-        auto args_num = std::int32_t(args.size());
+    binary_tuple_builder args_builder{args_num * 3};
 
-        writer.write(args_num);
+    args_builder.start();
+    for (const auto &arg : args) {
+        claim_primitive_with_type(args_builder, arg);
+    }
 
-        binary_tuple_builder args_builder{args_num * 3};
+    args_builder.layout();
+    for (const auto &arg : args) {
+        append_primitive_with_type(args_builder, arg);
+    }
 
-        args_builder.start();
-        for (const auto &arg : args) {
-            claim_primitive_with_type(args_builder, arg);
-        }
+    auto args_data = args_builder.build();
+    writer.write_binary(args_data);
+}
 
-        args_builder.layout();
-        for (const auto &arg : args) {
-            append_primitive_with_type(args_builder, arg);
-        }
+/**
+ * Read primitive from a stream, which is encoded as a binary tuple.
+ *
+ * @param reader Reader.
+ * @return Value.
+ */
+std::optional<primitive> read_primitive_from_binary_tuple(protocol::reader &reader) {
+    auto tuple_data = reader.read_binary();
+    binary_tuple_parser parser(3, tuple_data);
+
+    auto typ = ignite_type(binary_tuple_parser::get_int32(parser.get_next().value()));
+    auto scale = binary_tuple_parser::get_int32(parser.get_next().value());
+    return read_next_column(parser, typ, scale);
+}
+
+void compute_impl::execute_on_one_node(cluster_node node, std::string_view job_class_name,
+    const std::vector<primitive>& args, ignite_callback<std::optional<primitive>> callback) {
 
-        auto args_data = args_builder.build();
-        writer.write_binary(args_data);
+    auto writer_func = [&node, job_class_name, args](protocol::writer &writer) {
+        writer.write(node.get_name());
+        writer.write(job_class_name);
+        write_primitives_as_binary_tuple(writer, args);
     };
 
     auto reader_func = [](protocol::reader &reader) -> std::optional<primitive> {
         if (reader.try_read_nil())
             return std::nullopt;
 
-        auto tuple_data = reader.read_binary();
-        binary_tuple_parser parser(3, tuple_data);
-
-        // TODO: Move to separate func
-        auto typ = ignite_type(binary_tuple_parser::get_int32(parser.get_next().value()));
-        auto scale = binary_tuple_parser::get_int32(parser.get_next().value());
-        return read_next_column(parser, typ, scale);
+        return read_primitive_from_binary_tuple(reader);
     };
 
     m_connection->perform_request<std::optional<primitive>>(
@@ -86,26 +103,19 @@ void compute_impl::execute_colocated_async(const std::string &table_name, const
         table->template with_latest_schema_async<std::optional<primitive>>(std::move(callback),
             [table, key = std::move(key), job = std::move(job), args = std::move(args), conn] // NOLINT(performance-move-const-arg)
             (const schema& sch, auto callback) mutable {
-                auto writer_func = [&key, &sch, &table, &job](protocol::writer &writer) {
+                auto writer_func = [&key, &sch, &table, &job, &args](protocol::writer &writer) {
                     writer.write(table->get_id());
                     writer.write(sch.version);
                     write_tuple(writer, sch, key, true);
                     writer.write(job);
-                    // TODO: write arguments.
+                    write_primitives_as_binary_tuple(writer, args);
                 };
 
                 auto reader_func = [](protocol::reader &reader) -> std::optional<primitive> {
                     if (reader.try_read_nil())
                         return std::nullopt;
 
-                    // TODO: Tuple to object.
-                    auto tuple_data = reader.read_binary();
-                    ignite_tuple res(3);
-                    binary_tuple_parser parser(3, tuple_data);
-                    auto typ = column_type(binary_tuple_parser::get_int32(*parser.get_next()));
-                    auto scale = binary_tuple_parser::get_int32(*parser.get_next());
-
-                    return read_next_column(parser, typ, scale);
+                    return read_primitive_from_binary_tuple(reader);
                 };
 
                 conn->perform_request<std::optional<primitive>>(

Reply via email to