This is an automated email from the ASF dual-hosted git repository. isapego pushed a commit to branch IGNITE-22478 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 0c344f9e7183258def4c82bb7b94e9f4899be226 Author: Igor Sapego <[email protected]> AuthorDate: Fri Jun 28 17:15:06 2024 +0200 IGNITE-22478 Compute argument -> single --- .../cpp/ignite/client/compute/compute.cpp | 13 +-- .../platforms/cpp/ignite/client/compute/compute.h | 31 ++--- .../ignite/client/detail/compute/compute_impl.cpp | 35 +++--- .../ignite/client/detail/compute/compute_impl.h | 9 +- .../platforms/cpp/ignite/common/binary_object.h | 52 +++++++++ .../cpp/tests/client-test/compute_test.cpp | 129 ++++++++++----------- 6 files changed, 151 insertions(+), 118 deletions(-) diff --git a/modules/platforms/cpp/ignite/client/compute/compute.cpp b/modules/platforms/cpp/ignite/client/compute/compute.cpp index d8a5a1242c..21d9ac31e0 100644 --- a/modules/platforms/cpp/ignite/client/compute/compute.cpp +++ b/modules/platforms/cpp/ignite/client/compute/compute.cpp @@ -22,15 +22,15 @@ namespace ignite { void compute::submit_async(const std::vector<cluster_node> &nodes, std::shared_ptr<job_descriptor> descriptor, - const std::vector<primitive> &args, ignite_callback<job_execution> callback) { + const binary_object &arg, ignite_callback<job_execution> callback) { detail::arg_check::container_non_empty(nodes, "Nodes container"); detail::arg_check::container_non_empty(descriptor->get_job_class_name(), "Job class name"); - m_impl->submit_to_nodes(nodes, descriptor, args, std::move(callback)); + m_impl->submit_to_nodes(nodes, descriptor, arg, std::move(callback)); } void compute::submit_broadcast_async(const std::set<cluster_node> &nodes, std::shared_ptr<job_descriptor> descriptor, - const std::vector<primitive> &args, + const binary_object &arg, ignite_callback<std::map<cluster_node, ignite_result<job_execution>>> callback) { typedef std::map<cluster_node, ignite_result<job_execution>> result_type; @@ -52,7 +52,7 @@ void compute::submit_broadcast_async(const std::set<cluster_node> &nodes, std::s for (const auto &node : nodes) { std::vector<cluster_node> candidates = {node}; - m_impl->submit_to_nodes(candidates, descriptor, args, [node, shared_res](auto &&res) { + m_impl->submit_to_nodes(candidates, descriptor, arg, [node, shared_res](auto &&res) { auto &val = *shared_res; std::lock_guard<std::mutex> lock(val.m_mutex); @@ -65,14 +65,13 @@ void compute::submit_broadcast_async(const std::set<cluster_node> &nodes, std::s } void compute::submit_colocated_async(std::string_view table_name, const ignite_tuple &key, - std::shared_ptr<job_descriptor> descriptor, const std::vector<primitive> &args, - ignite_callback<job_execution> callback) { + std::shared_ptr<job_descriptor> descriptor, const binary_object &arg, ignite_callback<job_execution> callback) { detail::arg_check::container_non_empty(table_name, "Table name"); detail::arg_check::tuple_non_empty(key, "Key tuple"); detail::arg_check::container_non_empty(descriptor->get_job_class_name(), "Job class name"); m_impl->submit_colocated_async( - std::string(table_name), key, descriptor, args, std::move(callback)); + std::string(table_name), key, descriptor, arg, std::move(callback)); } } // namespace ignite diff --git a/modules/platforms/cpp/ignite/client/compute/compute.h b/modules/platforms/cpp/ignite/client/compute/compute.h index 4108d1cc44..1c62618183 100644 --- a/modules/platforms/cpp/ignite/client/compute/compute.h +++ b/modules/platforms/cpp/ignite/client/compute/compute.h @@ -24,6 +24,7 @@ #include "ignite/client/network/cluster_node.h" #include "ignite/client/table/ignite_tuple.h" #include "ignite/client/transaction/transaction.h" +#include "ignite/common/binary_object.h" #include "ignite/common/detail/config.h" #include "ignite/common/ignite_result.h" #include "ignite/common/primitive.h" @@ -56,24 +57,24 @@ public: * * @param nodes Nodes to use for the job execution. * @param descriptor Descriptor. - * @param args Job arguments. + * @param arg Job argument. * @param callback A callback called on operation completion with job execution result. */ IGNITE_API void submit_async(const std::vector<cluster_node> &nodes, std::shared_ptr<job_descriptor> descriptor, - const std::vector<primitive> &args, ignite_callback<job_execution> callback); + const binary_object &arg, ignite_callback<job_execution> callback); /** * Submits for execution a compute job represented by the given class on one of the specified nodes. * * @param nodes Nodes to use for the job execution. * @param descriptor Descriptor. - * @param args Job arguments. + * @param arg Job argument. * @return Job execution result. */ IGNITE_API job_execution submit(const std::vector<cluster_node> &nodes, std::shared_ptr<job_descriptor> descriptor, - const std::vector<primitive> &args) { + const binary_object &arg) { return sync<job_execution>([&](auto callback) mutable { - submit_async(nodes, descriptor, args, std::move(callback)); + submit_async(nodes, descriptor, arg, std::move(callback)); }); } @@ -82,11 +83,11 @@ public: * * @param nodes Nodes to use for the job execution. * @param descriptor Descriptor. - * @param args Job arguments. + * @param arg Job argument. * @param callback A callback called on operation completion with jobs execution result. */ IGNITE_API void submit_broadcast_async(const std::set<cluster_node> &nodes, - std::shared_ptr<job_descriptor> descriptor, const std::vector<primitive> &args, + std::shared_ptr<job_descriptor> descriptor, const binary_object &arg, ignite_callback<std::map<cluster_node, ignite_result<job_execution>>> callback); /** @@ -94,14 +95,14 @@ public: * * @param nodes Nodes to use for the job execution. * @param descriptor Descriptor. - * @param args Job arguments. + * @param arg Job argument. * @return Job execution result. */ IGNITE_API std::map<cluster_node, ignite_result<job_execution>> submit_broadcast( const std::set<cluster_node> &nodes, std::shared_ptr<job_descriptor> descriptor, - const std::vector<primitive> &args) { + const binary_object &arg) { return sync<std::map<cluster_node, ignite_result<job_execution>>>([&](auto callback) mutable { - submit_broadcast_async(nodes, descriptor, args, std::move(callback)); + submit_broadcast_async(nodes, descriptor, arg, std::move(callback)); }); } @@ -111,11 +112,11 @@ public: * @param table_name Name of the table to be used with @c key to determine target node. * @param key Table key to be used to determine the target node for job execution. * @param descriptor Descriptor. - * @param args Job arguments. + * @param arg Job argument. * @param callback A callback called on operation completion with job execution result. */ IGNITE_API void submit_colocated_async(std::string_view table_name, const ignite_tuple &key, - std::shared_ptr<job_descriptor> descriptor, const std::vector<primitive> &args, + std::shared_ptr<job_descriptor> descriptor, const binary_object &arg, ignite_callback<job_execution> callback); /** @@ -124,13 +125,13 @@ public: * @param table_name Name of the table to be used with @c key to determine target node. * @param key Table key to be used to determine the target node for job execution. * @param descriptor Descriptor. - * @param args Job arguments. + * @param arg Job argument. * @return Job execution result. */ IGNITE_API job_execution submit_colocated(std::string_view table_name, const ignite_tuple &key, - std::shared_ptr<job_descriptor> descriptor, const std::vector<primitive> &args) { + std::shared_ptr<job_descriptor> descriptor, const binary_object &arg) { return sync<job_execution>([&](auto callback) mutable { - submit_colocated_async(table_name, key, descriptor, args, std::move(callback)); + submit_colocated_async(table_name, key, descriptor, arg, std::move(callback)); }); } diff --git a/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp b/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp index 52903bcd67..9439f794cb 100644 --- a/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp +++ b/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp @@ -28,24 +28,15 @@ namespace ignite::detail { * Write a collection of primitives as a binary tuple. * * @param writer Writer to use. - * @param args Arguments. + * @param arg Argument. */ -void write_primitives_as_binary_tuple(protocol::writer &writer, const std::vector<primitive> &args) { - auto args_num = std::int32_t(args.size()); - - writer.write(args_num); - - binary_tuple_builder args_builder{args_num * 3}; +void write_object_as_binary_tuple(protocol::writer &writer, const binary_object &arg) { + binary_tuple_builder args_builder{1}; args_builder.start(); - for (const auto &arg : args) { - protocol::claim_primitive_with_type(args_builder, arg); - } - + protocol::claim_primitive_with_type(args_builder, arg.get_primitive()); args_builder.layout(); - for (const auto &arg : args) { - protocol::append_primitive_with_type(args_builder, arg); - } + protocol::append_primitive_with_type(args_builder, arg.get_primitive()); auto args_data = args_builder.build(); writer.write_binary(args_data); @@ -235,9 +226,9 @@ private: }; void compute_impl::submit_to_nodes(const std::vector<cluster_node> &nodes, std::shared_ptr<job_descriptor> descriptor, - const std::vector<primitive> &args, ignite_callback<job_execution> callback) { + const binary_object &arg, ignite_callback<job_execution> callback) { - auto writer_func = [&nodes, &descriptor, args](protocol::writer &writer) { + auto writer_func = [&nodes, &descriptor, arg](protocol::writer &writer) { auto nodes_num = std::int32_t(nodes.size()); writer.write(nodes_num); for (const auto &node : nodes) { @@ -249,7 +240,7 @@ void compute_impl::submit_to_nodes(const std::vector<cluster_node> &nodes, std:: writer.write(descriptor->get_execution_options().get_priority()); writer.write(descriptor->get_execution_options().get_max_retries()); - write_primitives_as_binary_tuple(writer, args); + write_object_as_binary_tuple(writer, arg); }; auto handler = std::make_shared<response_handler_compute>(shared_from_this(), std::move(callback), false); @@ -259,11 +250,11 @@ void compute_impl::submit_to_nodes(const std::vector<cluster_node> &nodes, std:: } void compute_impl::submit_colocated_async(const std::string &table_name, const ignite_tuple &key, - std::shared_ptr<job_descriptor> descriptor, const std::vector<primitive> &args, + std::shared_ptr<job_descriptor> descriptor, const binary_object &arg, ignite_callback<job_execution> callback) { auto self = shared_from_this(); auto conn = m_connection; - auto on_table_get = [self, table_name, key, descriptor, args, conn, callback](auto &&res) mutable { + auto on_table_get = [self, table_name, key, descriptor, arg, conn, callback](auto &&res) mutable { if (res.has_error()) { callback({std::move(res.error())}); return; @@ -276,8 +267,8 @@ void compute_impl::submit_colocated_async(const std::string &table_name, const i auto table = table_impl::from_facade(*table_opt); table->template with_proper_schema_async<job_execution>( - callback, [self, table, key, descriptor, args, conn](const schema &sch, auto callback) mutable { - auto writer_func = [&key, &descriptor, &sch, &table, &args](protocol::writer &writer) { + callback, [self, table, key, descriptor, arg, conn](const schema &sch, auto callback) mutable { + auto writer_func = [&key, &descriptor, &sch, &table, &arg](protocol::writer &writer) { writer.write(table->get_id()); writer.write(sch.version); write_tuple(writer, sch, key, true); @@ -287,7 +278,7 @@ void compute_impl::submit_colocated_async(const std::string &table_name, const i writer.write(descriptor->get_execution_options().get_priority()); writer.write(descriptor->get_execution_options().get_max_retries()); - write_primitives_as_binary_tuple(writer, args); + write_object_as_binary_tuple(writer, arg); }; auto handler = std::make_shared<response_handler_compute>(self, std::move(callback), true); diff --git a/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.h b/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.h index 639450cd29..9df6736e33 100644 --- a/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.h +++ b/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.h @@ -25,6 +25,7 @@ #include "ignite/client/detail/table/tables_impl.h" #include "ignite/client/network/cluster_node.h" #include "ignite/client/table/ignite_tuple.h" +#include "ignite/common/binary_object.h" #include "ignite/common/ignite_result.h" #include "ignite/common/primitive.h" @@ -55,11 +56,11 @@ public: * * @param nodes Candidate node to use for the job execution. * @param descriptor Descriptor. - * @param args Job arguments. + * @param arg Job argument. * @param callback A callback called on operation completion with job execution result. */ void submit_to_nodes(const std::vector<cluster_node> &nodes, std::shared_ptr<job_descriptor> descriptor, - const std::vector<primitive> &args, ignite_callback<job_execution> callback); + const binary_object &arg, ignite_callback<job_execution> callback); /** * Submits a compute job represented by the given class for an execution on one of the nodes where the given key is @@ -68,11 +69,11 @@ public: * @param table_name Name of the table to be used with @c key to determine target node. * @param key Table key to be used to determine the target node for job execution. * @param descriptor Descriptor. - * @param args Job arguments. + * @param arg Job argument. * @param callback A callback called on operation completion with job execution result. */ void submit_colocated_async(const std::string &table_name, const ignite_tuple &key, - std::shared_ptr<job_descriptor> descriptor, const std::vector<primitive> &args, + std::shared_ptr<job_descriptor> descriptor, const binary_object &arg, ignite_callback<job_execution> callback); /** diff --git a/modules/platforms/cpp/ignite/common/binary_object.h b/modules/platforms/cpp/ignite/common/binary_object.h new file mode 100644 index 0000000000..80e575bf3b --- /dev/null +++ b/modules/platforms/cpp/ignite/common/binary_object.h @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "primitive.h" + +namespace ignite { + +/** + * Ignite binary_object type. + */ +class binary_object { +public: + // Default + binary_object() = default; + + /** + * Primitive constructor. + * + * @param value Primitive type value. + */ + binary_object(primitive value) {} // NOLINT(google-explicit-constructor) + + /** + * Get underlying primitive value. + * + * @throw ignite_error If the packed value is not a primitive. + * @return Primitive value. + */ + primitive get_primitive() const { return m_value; } + +private: + /** Value. */ + primitive m_value; +}; + +} // namespace ignite diff --git a/modules/platforms/cpp/tests/client-test/compute_test.cpp b/modules/platforms/cpp/tests/client-test/compute_test.cpp index 8fe007cf18..f1a0949230 100644 --- a/modules/platforms/cpp/tests/client-test/compute_test.cpp +++ b/modules/platforms/cpp/tests/client-test/compute_test.cpp @@ -71,11 +71,17 @@ protected: template<typename T> void check_argument(T value, const std::string &expected_str) { auto cluster_nodes = m_client.get_cluster_nodes(); - auto execution = m_client.get_compute().submit(cluster_nodes, m_echo_job_desc, {value, expected_str}); + auto execution = m_client.get_compute().submit(cluster_nodes, m_echo_job, {value}); auto result = execution.get_result(); ASSERT_TRUE(result.has_value()); EXPECT_EQ(result.value().template get<T>(), value); + + execution = m_client.get_compute().submit(cluster_nodes, m_to_string_job, {value}); + result = execution.get_result(); + + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(result.value().template get<std::string>(), expected_str); } /** @@ -92,20 +98,23 @@ protected: /** Ignite client. */ ignite_client m_client; - /** Node name job descriptor. */ - std::shared_ptr<job_descriptor> m_node_name_job_desc{job_descriptor::builder().set_job_class_name(NODE_NAME_JOB).build()}; + /** Node name job. */ + std::shared_ptr<job_descriptor> m_node_name_job{job_descriptor::builder(NODE_NAME_JOB).build()}; - /** Echo job descriptor. */ - std::shared_ptr<job_descriptor> m_echo_job_desc{job_descriptor::builder().set_job_class_name(ECHO_JOB).build()}; + /** Echo job. */ + std::shared_ptr<job_descriptor> m_echo_job{job_descriptor::builder(ECHO_JOB).build()}; - /** Concat job descriptor. */ - std::shared_ptr<job_descriptor> m_concat_job_desc{job_descriptor::builder().set_job_class_name(CONCAT_JOB).build()}; + /** Concat job. */ + std::shared_ptr<job_descriptor> m_concat_job{job_descriptor::builder(CONCAT_JOB).build()}; - /** Error job descriptor. */ - std::shared_ptr<job_descriptor> m_error_job_desc{job_descriptor::builder().set_job_class_name(ERROR_JOB).build()}; + /** Error job. */ + std::shared_ptr<job_descriptor> m_error_job{job_descriptor::builder(ERROR_JOB).build()}; - /** Sleep job descriptor. */ - std::shared_ptr<job_descriptor> m_sleep_job_desc{job_descriptor::builder().set_job_class_name(SLEEP_JOB).build()}; + /** Sleep job. */ + std::shared_ptr<job_descriptor> m_sleep_job{job_descriptor::builder(SLEEP_JOB).build()}; + + /** ToString job. */ + std::shared_ptr<job_descriptor> m_to_string_job{job_descriptor::builder(TO_STRING_JOB).build()}; }; TEST_F(compute_test, get_cluster_nodes) { @@ -131,7 +140,7 @@ TEST_F(compute_test, get_cluster_nodes) { TEST_F(compute_test, execute_on_random_node) { auto cluster_nodes = m_client.get_cluster_nodes(); - auto execution = m_client.get_compute().submit(cluster_nodes, m_node_name_job_desc, {}); + auto execution = m_client.get_compute().submit(cluster_nodes, m_node_name_job, {}); auto result = execution.get_result(); ASSERT_TRUE(result.has_value()); @@ -139,8 +148,8 @@ TEST_F(compute_test, execute_on_random_node) { } TEST_F(compute_test, execute_on_specific_node) { - auto execution1 = m_client.get_compute().submit({get_node(0)}, m_node_name_job_desc, {"-", 11}); - auto execution2 = m_client.get_compute().submit({get_node(1)}, m_node_name_job_desc, {":", 22}); + auto execution1 = m_client.get_compute().submit({get_node(0)}, m_node_name_job, {"-11"}); + auto execution2 = m_client.get_compute().submit({get_node(1)}, m_node_name_job, {42}); auto res1 = execution1.get_result(); auto res2 = execution2.get_result(); @@ -148,12 +157,12 @@ TEST_F(compute_test, execute_on_specific_node) { ASSERT_TRUE(res1.has_value()); ASSERT_TRUE(res2.has_value()); - EXPECT_EQ(res1.value().get<std::string>(), PLATFORM_TEST_NODE_RUNNER + "-_11"); - EXPECT_EQ(res2.value().get<std::string>(), PLATFORM_TEST_NODE_RUNNER + "_2:_22"); + EXPECT_EQ(res1.value().get<std::string>(), PLATFORM_TEST_NODE_RUNNER + "-11"); + EXPECT_EQ(res2.value().get<std::string>(), PLATFORM_TEST_NODE_RUNNER + "_242"); } TEST_F(compute_test, execute_broadcast_one_node) { - auto res = m_client.get_compute().submit_broadcast({get_node(1)}, m_node_name_job_desc, {"42"}); + auto res = m_client.get_compute().submit_broadcast({get_node(1)}, m_node_name_job, {"42"}); ASSERT_EQ(res.size(), 1); @@ -164,7 +173,7 @@ TEST_F(compute_test, execute_broadcast_one_node) { } TEST_F(compute_test, execute_broadcast_all_nodes) { - auto res = m_client.get_compute().submit_broadcast(get_node_set(), m_node_name_job_desc, {"42"}); + auto res = m_client.get_compute().submit_broadcast(get_node_set(), m_node_name_job, {"42"}); ASSERT_EQ(res.size(), 4); @@ -174,23 +183,13 @@ TEST_F(compute_test, execute_broadcast_all_nodes) { EXPECT_EQ(res[get_node(3)].value().get_result(), get_node(3).get_name() + "42"); } -TEST_F(compute_test, execute_with_args) { - auto cluster_nodes = m_client.get_cluster_nodes(); - - auto execution = m_client.get_compute().submit(cluster_nodes, m_concat_job_desc, {5.3, uuid(), "42", nullptr}); - auto result = execution.get_result(); - - ASSERT_TRUE(result.has_value()); - EXPECT_EQ(result.value().get<std::string>(), "5.3_00000000-0000-0000-0000-000000000000_42_null"); -} - TEST_F(compute_test, job_error_propagates_to_client) { auto cluster_nodes = m_client.get_cluster_nodes(); EXPECT_THROW( { try { - m_client.get_compute().submit(cluster_nodes, m_error_job_desc, {"unused"}).get_result(); + m_client.get_compute().submit(cluster_nodes, m_error_job, {"unused"}).get_result(); } catch (const ignite_error &e) { EXPECT_THAT(e.what_str(), testing::HasSubstr("Custom job error")); // TODO https://issues.apache.org/jira/browse/IGNITE-19603 @@ -210,7 +209,7 @@ TEST_F(compute_test, unknown_node_execute_throws) { EXPECT_THROW( { try { - m_client.get_compute().submit({unknown_node}, m_echo_job_desc, {"unused"}); + m_client.get_compute().submit({unknown_node}, m_echo_job, {"unused"}); } catch (const ignite_error &e) { EXPECT_THAT(e.what_str(), testing::HasSubstr("None of the specified nodes are present in the cluster: [random]")); @@ -227,7 +226,7 @@ TEST_F(compute_test, DISABLED_unknown_node_broadcast_throws) { EXPECT_THROW( { try { - m_client.get_compute().submit_broadcast({unknown_node}, m_echo_job_desc, {"unused"}); + m_client.get_compute().submit_broadcast({unknown_node}, m_echo_job, {"unused"}); } catch (const ignite_error &e) { EXPECT_THAT(e.what_str(), testing::HasSubstr("None of the specified nodes are present in the cluster: [random]")); @@ -288,7 +287,7 @@ TEST_F(compute_test, submit_colocated) { SCOPED_TRACE("key=" + std::to_string(var.first) + ", node=" + var.second); auto key = get_tuple(var.first); - auto execution = m_client.get_compute().submit_colocated(TABLE_1, key, m_node_name_job_desc, {}); + auto execution = m_client.get_compute().submit_colocated(TABLE_1, key, m_node_name_job, {}); auto res_node_name = execution.get_result(); auto expected_node_name = PLATFORM_TEST_NODE_RUNNER + var.second; @@ -300,7 +299,7 @@ TEST_F(compute_test, execute_colocated_throws_when_table_does_not_exist) { EXPECT_THROW( { try { - (void) m_client.get_compute().submit_colocated("unknownTable", get_tuple(42), m_echo_job_desc, {}); + (void) m_client.get_compute().submit_colocated("unknownTable", get_tuple(42), m_echo_job, {}); } catch (const ignite_error &e) { EXPECT_STREQ("Table does not exist: 'unknownTable'", e.what()); throw; @@ -313,7 +312,7 @@ TEST_F(compute_test, execute_colocated_throws_when_key_column_is_missing) { EXPECT_THROW( { try { - (void) m_client.get_compute().submit_colocated(TABLE_1, get_tuple("some"), m_echo_job_desc, {}); + (void) m_client.get_compute().submit_colocated(TABLE_1, get_tuple("some"), m_echo_job, {}); } catch (const ignite_error &e) { EXPECT_THAT(e.what_str(), ::testing::HasSubstr("Missed key column: KEY")); throw; @@ -326,7 +325,7 @@ TEST_F(compute_test, execute_colocated_throws_when_key_is_empty) { EXPECT_THROW( { try { - (void) m_client.get_compute().submit_colocated(TABLE_1, {}, m_echo_job_desc, {}); + (void) m_client.get_compute().submit_colocated(TABLE_1, {}, m_echo_job, {}); } catch (const ignite_error &e) { EXPECT_EQ("Key tuple can not be empty", e.what_str()); throw; @@ -340,9 +339,8 @@ TEST_F(compute_test, unknown_unit) { { try { auto cluster_nodes = m_client.get_cluster_nodes(); - auto job_desc = job_descriptor::builder() - .set_job_class_name(NODE_NAME_JOB) - .set_deployment_units({{"unknown"}}) + auto job_desc = job_descriptor::builder(NODE_NAME_JOB) + .deployment_units({{"unknown"}}) .build(); (void) m_client.get_compute().submit(cluster_nodes, job_desc, {}); @@ -359,9 +357,8 @@ TEST_F(compute_test, execute_unknown_unit_and_version) { { try { auto cluster_nodes = m_client.get_cluster_nodes(); - auto job_desc = job_descriptor::builder() - .set_job_class_name(NODE_NAME_JOB) - .set_deployment_units({{"unknown", "1.2.3"}}) + auto job_desc = job_descriptor::builder(NODE_NAME_JOB) + .deployment_units({{"unknown", "1.2.3"}}) .build(); (void) m_client.get_compute().submit(cluster_nodes, job_desc, {}); @@ -378,9 +375,8 @@ TEST_F(compute_test, execute_colocated_unknown_unit_and_version) { { try { auto comp = m_client.get_compute(); - auto job_desc = job_descriptor::builder() - .set_job_class_name(NODE_NAME_JOB) - .set_deployment_units({{"unknown", "1.2.3"}}) + auto job_desc = job_descriptor::builder(NODE_NAME_JOB) + .deployment_units({{"unknown", "1.2.3"}}) .build(); (void) comp.submit_colocated(TABLE_1, get_tuple(1), job_desc, {}); @@ -393,9 +389,8 @@ TEST_F(compute_test, execute_colocated_unknown_unit_and_version) { } TEST_F(compute_test, broadcast_unknown_unit_and_version) { - auto job_desc = job_descriptor::builder() - .set_job_class_name(NODE_NAME_JOB) - .set_deployment_units({{"unknown", "1.2.3"}}) + auto job_desc = job_descriptor::builder(NODE_NAME_JOB) + .deployment_units({{"unknown", "1.2.3"}}) .build(); auto res = m_client.get_compute().submit_broadcast({get_node(1)}, job_desc, {}); @@ -411,9 +406,8 @@ TEST_F(compute_test, execute_empty_unit_name) { EXPECT_THROW( { try { - auto job_desc = job_descriptor::builder() - .set_job_class_name(NODE_NAME_JOB) - .set_deployment_units({{""}}) + auto job_desc = job_descriptor::builder(NODE_NAME_JOB) + .deployment_units({{""}}) .build(); (void) m_client.get_compute().submit({get_node(1)}, job_desc, {}); @@ -429,9 +423,8 @@ TEST_F(compute_test, execute_empty_unit_version) { EXPECT_THROW( { try { - auto job_desc = job_descriptor::builder() - .set_job_class_name(NODE_NAME_JOB) - .set_deployment_units({{"some", ""}}) + auto job_desc = job_descriptor::builder(NODE_NAME_JOB) + .deployment_units({{"some", ""}}) .build(); (void) m_client.get_compute().submit({get_node(1)}, job_desc, {}); @@ -447,9 +440,8 @@ TEST_F(compute_test, broadcast_empty_unit_name) { EXPECT_THROW( { try { - auto job_desc = job_descriptor::builder() - .set_job_class_name(NODE_NAME_JOB) - .set_deployment_units({{""}}) + auto job_desc = job_descriptor::builder(NODE_NAME_JOB) + .deployment_units({{""}}) .build(); (void) m_client.get_compute().submit_broadcast({get_node(1)}, job_desc, {}); @@ -465,9 +457,8 @@ TEST_F(compute_test, broadcast_empty_unit_version) { EXPECT_THROW( { try { - auto job_desc = job_descriptor::builder() - .set_job_class_name(NODE_NAME_JOB) - .set_deployment_units({{"some", ""}}) + auto job_desc = job_descriptor::builder(NODE_NAME_JOB) + .deployment_units({{"some", ""}}) .build(); (void) m_client.get_compute().submit_broadcast({get_node(1)}, job_desc, {}); @@ -483,9 +474,8 @@ TEST_F(compute_test, execute_colocated_empty_unit_name) { EXPECT_THROW( { try { - auto job_desc = job_descriptor::builder() - .set_job_class_name(NODE_NAME_JOB) - .set_deployment_units({{""}}) + auto job_desc = job_descriptor::builder(NODE_NAME_JOB) + .deployment_units({{""}}) .build(); (void) m_client.get_compute().submit_colocated(TABLE_1, get_tuple(1), job_desc, {}); @@ -502,9 +492,8 @@ TEST_F(compute_test, execute_colocated_empty_unit_version) { { try { auto comp = m_client.get_compute(); - auto job_desc = job_descriptor::builder() - .set_job_class_name(NODE_NAME_JOB) - .set_deployment_units({{"some", ""}}) + auto job_desc = job_descriptor::builder(NODE_NAME_JOB) + .deployment_units({{"some", ""}}) .build(); comp.submit_colocated(TABLE_1, get_tuple(1), job_desc, {}); @@ -519,7 +508,7 @@ TEST_F(compute_test, execute_colocated_empty_unit_version) { TEST_F(compute_test, job_execution_status_executing) { const std::int32_t sleep_ms = 3000; - auto execution = m_client.get_compute().submit({get_node(1)}, m_sleep_job_desc, {sleep_ms}); + auto execution = m_client.get_compute().submit({get_node(1)}, m_sleep_job, {sleep_ms}); auto state = execution.get_state(); @@ -530,7 +519,7 @@ TEST_F(compute_test, job_execution_status_executing) { TEST_F(compute_test, DISABLED_job_execution_status_completed) { const std::int32_t sleep_ms = 1; - auto execution = m_client.get_compute().submit({get_node(1)}, m_sleep_job_desc, {sleep_ms}); + auto execution = m_client.get_compute().submit({get_node(1)}, m_sleep_job, {sleep_ms}); execution.get_result(); auto state = execution.get_state(); @@ -540,7 +529,7 @@ TEST_F(compute_test, DISABLED_job_execution_status_completed) { } TEST_F(compute_test, job_execution_status_failed) { - auto execution = m_client.get_compute().submit({get_node(1)}, m_error_job_desc, {"unused"}); + auto execution = m_client.get_compute().submit({get_node(1)}, m_error_job, {"unused"}); EXPECT_THROW( { @@ -562,7 +551,7 @@ TEST_F(compute_test, job_execution_status_failed) { TEST_F(compute_test, job_execution_cancel) { const std::int32_t sleep_ms = 5000; - auto execution = m_client.get_compute().submit({get_node(1)}, m_sleep_job_desc, {sleep_ms}); + auto execution = m_client.get_compute().submit({get_node(1)}, m_sleep_job, {sleep_ms}); execution.cancel(); auto state = execution.get_state(); @@ -574,7 +563,7 @@ TEST_F(compute_test, job_execution_cancel) { TEST_F(compute_test, job_execution_change_priority) { const std::int32_t sleep_ms = 5000; - auto execution = m_client.get_compute().submit({get_node(1)}, m_sleep_job_desc, {sleep_ms}); + auto execution = m_client.get_compute().submit({get_node(1)}, m_sleep_job, {sleep_ms}); auto res = execution.change_priority(123); EXPECT_EQ(res, job_execution::operation_result::INVALID_STATE);
