This is an automated email from the ASF dual-hosted git repository. isapego pushed a commit to branch ignite-17607 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit aca718633ececced935c2935e792aff67b4b928f Author: Igor Sapego <[email protected]> AuthorDate: Mon Mar 20 17:23:08 2023 +0300 IGNITE-17607 Execute colocated, untested --- .../ignite/client/detail/compute/compute_impl.cpp | 82 ++++++++++++---------- 1 file changed, 46 insertions(+), 36 deletions(-) diff --git a/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp b/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp index ef845a3ff6..b6ebd4dc0b 100644 --- a/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp +++ b/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp @@ -21,45 +21,62 @@ namespace ignite::detail { -void compute_impl::execute_on_one_node(cluster_node node, std::string_view job_class_name, - const std::vector<primitive>& args, ignite_callback<std::optional<primitive>> callback) { +/** + * Write a collection of primitives as a binary tuple. + * + * @param writer Writer to use. + * @param args Arguments. + */ +void write_primitives_as_binary_tuple(protocol::writer &writer, const std::vector<primitive>& args) { + auto args_num = std::int32_t(args.size()); - auto writer_func = [&node, job_class_name, args](protocol::writer &writer) { - writer.write(node.get_name()); - writer.write(job_class_name); + writer.write(args_num); - // TODO: Move to a separate function. - auto args_num = std::int32_t(args.size()); + binary_tuple_builder args_builder{args_num * 3}; - writer.write(args_num); + args_builder.start(); + for (const auto &arg : args) { + claim_primitive_with_type(args_builder, arg); + } - binary_tuple_builder args_builder{args_num * 3}; + args_builder.layout(); + for (const auto &arg : args) { + append_primitive_with_type(args_builder, arg); + } - args_builder.start(); - for (const auto &arg : args) { - claim_primitive_with_type(args_builder, arg); - } + auto args_data = args_builder.build(); + writer.write_binary(args_data); +} - args_builder.layout(); - for (const auto &arg : args) { - append_primitive_with_type(args_builder, arg); - } +/** + * Read primitive from a stream, which is encoded as a binary tuple. + * + * @param reader Reader. + * @return Value. + */ +std::optional<primitive> read_primitive_from_binary_tuple(protocol::reader &reader) { + auto tuple_data = reader.read_binary(); + binary_tuple_parser parser(3, tuple_data); + + auto typ = ignite_type(binary_tuple_parser::get_int32(parser.get_next().value())); + auto scale = binary_tuple_parser::get_int32(parser.get_next().value()); + return read_next_column(parser, typ, scale); +} + +void compute_impl::execute_on_one_node(cluster_node node, std::string_view job_class_name, + const std::vector<primitive>& args, ignite_callback<std::optional<primitive>> callback) { - auto args_data = args_builder.build(); - writer.write_binary(args_data); + auto writer_func = [&node, job_class_name, args](protocol::writer &writer) { + writer.write(node.get_name()); + writer.write(job_class_name); + write_primitives_as_binary_tuple(writer, args); }; auto reader_func = [](protocol::reader &reader) -> std::optional<primitive> { if (reader.try_read_nil()) return std::nullopt; - auto tuple_data = reader.read_binary(); - binary_tuple_parser parser(3, tuple_data); - - // TODO: Move to separate func - auto typ = ignite_type(binary_tuple_parser::get_int32(parser.get_next().value())); - auto scale = binary_tuple_parser::get_int32(parser.get_next().value()); - return read_next_column(parser, typ, scale); + return read_primitive_from_binary_tuple(reader); }; m_connection->perform_request<std::optional<primitive>>( @@ -86,26 +103,19 @@ void compute_impl::execute_colocated_async(const std::string &table_name, const table->template with_latest_schema_async<std::optional<primitive>>(std::move(callback), [table, key = std::move(key), job = std::move(job), args = std::move(args), conn] // NOLINT(performance-move-const-arg) (const schema& sch, auto callback) mutable { - auto writer_func = [&key, &sch, &table, &job](protocol::writer &writer) { + auto writer_func = [&key, &sch, &table, &job, &args](protocol::writer &writer) { writer.write(table->get_id()); writer.write(sch.version); write_tuple(writer, sch, key, true); writer.write(job); - // TODO: write arguments. + write_primitives_as_binary_tuple(writer, args); }; auto reader_func = [](protocol::reader &reader) -> std::optional<primitive> { if (reader.try_read_nil()) return std::nullopt; - // TODO: Tuple to object. - auto tuple_data = reader.read_binary(); - ignite_tuple res(3); - binary_tuple_parser parser(3, tuple_data); - auto typ = column_type(binary_tuple_parser::get_int32(*parser.get_next())); - auto scale = binary_tuple_parser::get_int32(*parser.get_next()); - - return read_next_column(parser, typ, scale); + return read_primitive_from_binary_tuple(reader); }; conn->perform_request<std::optional<primitive>>(
