This is an automated email from the ASF dual-hosted git repository.
isapego pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new cb69d4f6f2 IGNITE-22527 C++ Add parameter object to Compute API (#4013)
cb69d4f6f2 is described below
commit cb69d4f6f29ad55b2ecaa35e76b74d7ec6ae1fe4
Author: Igor Sapego <[email protected]>
AuthorDate: Fri Jun 28 16:18:38 2024 +0400
IGNITE-22527 C++ Add parameter object to Compute API (#4013)
---
modules/platforms/cpp/ignite/client/CMakeLists.txt | 6 +
.../cpp/ignite/client/compute/compute.cpp | 25 ++---
.../platforms/cpp/ignite/client/compute/compute.h | 54 ++++-----
.../cpp/ignite/client/compute/job_descriptor.h | 116 +++++++++++++++++++
.../ignite/client/compute/job_execution_options.h | 4 +-
.../ignite/client/detail/compute/compute_impl.cpp | 33 +++---
.../ignite/client/detail/compute/compute_impl.h | 17 +--
.../cpp/tests/client-test/compute_test.cpp | 123 ++++++++++++++++-----
.../client-test/schema_synchronization_test.cpp | 3 +-
.../cpp/tests/test-common/basic_auth_test_suite.h | 4 +-
10 files changed, 277 insertions(+), 108 deletions(-)
diff --git a/modules/platforms/cpp/ignite/client/CMakeLists.txt
b/modules/platforms/cpp/ignite/client/CMakeLists.txt
index 32485c55ff..83c55f32ad 100644
--- a/modules/platforms/cpp/ignite/client/CMakeLists.txt
+++ b/modules/platforms/cpp/ignite/client/CMakeLists.txt
@@ -51,11 +51,17 @@ set(PUBLIC_HEADERS
type_mapping.h
compute/compute.h
compute/deployment_unit.h
+ compute/job_descriptor.h
+ compute/job_execution.h
+ compute/job_execution_options.h
+ compute/job_state.h
+ compute/job_status.h
detail/type_mapping_utils.h
network/cluster_node.h
sql/column_metadata.h
sql/column_origin.h
sql/result_set.h
+ sql/result_set_metadata.h
sql/sql.h
sql/sql_statement.h
table/ignite_tuple.h
diff --git a/modules/platforms/cpp/ignite/client/compute/compute.cpp
b/modules/platforms/cpp/ignite/client/compute/compute.cpp
index 2aa50fad0a..d8a5a1242c 100644
--- a/modules/platforms/cpp/ignite/client/compute/compute.cpp
+++ b/modules/platforms/cpp/ignite/client/compute/compute.cpp
@@ -21,22 +21,21 @@
namespace ignite {
-void compute::submit_async(const std::vector<cluster_node> &nodes, const
std::vector<deployment_unit> &units,
- std::string_view job_class_name, const std::vector<primitive> &args, const
job_execution_options &options,
- ignite_callback<job_execution> callback) {
+void compute::submit_async(const std::vector<cluster_node> &nodes,
std::shared_ptr<job_descriptor> descriptor,
+ const std::vector<primitive> &args, ignite_callback<job_execution>
callback) {
detail::arg_check::container_non_empty(nodes, "Nodes container");
- detail::arg_check::container_non_empty(job_class_name, "Job class name");
+ detail::arg_check::container_non_empty(descriptor->get_job_class_name(),
"Job class name");
- m_impl->submit_to_nodes(nodes, units, job_class_name, args, options,
std::move(callback));
+ m_impl->submit_to_nodes(nodes, descriptor, args, std::move(callback));
}
-void compute::submit_broadcast_async(const std::set<cluster_node> &nodes,
const std::vector<deployment_unit> &units,
- std::string_view job_class_name, const std::vector<primitive> &args, const
job_execution_options &options,
+void compute::submit_broadcast_async(const std::set<cluster_node> &nodes,
std::shared_ptr<job_descriptor> descriptor,
+ const std::vector<primitive> &args,
ignite_callback<std::map<cluster_node, ignite_result<job_execution>>>
callback) {
typedef std::map<cluster_node, ignite_result<job_execution>> result_type;
detail::arg_check::container_non_empty(nodes, "Nodes set");
- detail::arg_check::container_non_empty(job_class_name, "Job class name");
+ detail::arg_check::container_non_empty(descriptor->get_job_class_name(),
"Job class name");
struct result_group {
explicit result_group(std::int32_t cnt, ignite_callback<result_type>
&&cb)
@@ -53,7 +52,7 @@ void compute::submit_broadcast_async(const
std::set<cluster_node> &nodes, const
for (const auto &node : nodes) {
std::vector<cluster_node> candidates = {node};
- m_impl->submit_to_nodes(candidates, units, job_class_name, args,
options, [node, shared_res](auto &&res) {
+ m_impl->submit_to_nodes(candidates, descriptor, args, [node,
shared_res](auto &&res) {
auto &val = *shared_res;
std::lock_guard<std::mutex> lock(val.m_mutex);
@@ -66,14 +65,14 @@ void compute::submit_broadcast_async(const
std::set<cluster_node> &nodes, const
}
void compute::submit_colocated_async(std::string_view table_name, const
ignite_tuple &key,
- const std::vector<deployment_unit> &units, std::string_view
job_class_name, const std::vector<primitive> &args,
- const job_execution_options &options, ignite_callback<job_execution>
callback) {
+ std::shared_ptr<job_descriptor> descriptor, const std::vector<primitive>
&args,
+ ignite_callback<job_execution> callback) {
detail::arg_check::container_non_empty(table_name, "Table name");
detail::arg_check::tuple_non_empty(key, "Key tuple");
- detail::arg_check::container_non_empty(job_class_name, "Job class name");
+ detail::arg_check::container_non_empty(descriptor->get_job_class_name(),
"Job class name");
m_impl->submit_colocated_async(
- std::string(table_name), key, units, std::string(job_class_name),
args, options, std::move(callback));
+ std::string(table_name), key, descriptor, args, std::move(callback));
}
} // namespace ignite
diff --git a/modules/platforms/cpp/ignite/client/compute/compute.h
b/modules/platforms/cpp/ignite/client/compute/compute.h
index 2aa789066b..4108d1cc44 100644
--- a/modules/platforms/cpp/ignite/client/compute/compute.h
+++ b/modules/platforms/cpp/ignite/client/compute/compute.h
@@ -18,6 +18,7 @@
#pragma once
#include "ignite/client/compute/deployment_unit.h"
+#include "ignite/client/compute/job_descriptor.h"
#include "ignite/client/compute/job_execution.h"
#include "ignite/client/compute/job_execution_options.h"
#include "ignite/client/network/cluster_node.h"
@@ -54,30 +55,25 @@ public:
* nodes.
*
* @param nodes Nodes to use for the job execution.
- * @param units Deployment units. Can be empty.
- * @param job_class_name Java class name of the job to submit.
+ * @param descriptor Descriptor.
* @param args Job arguments.
- * @param options Job execution options.
* @param callback A callback called on operation completion with job
execution result.
*/
- IGNITE_API void submit_async(const std::vector<cluster_node> &nodes, const
std::vector<deployment_unit> &units,
- std::string_view job_class_name, const std::vector<primitive> &args,
const job_execution_options &options,
- ignite_callback<job_execution> callback);
+ IGNITE_API void submit_async(const std::vector<cluster_node> &nodes,
std::shared_ptr<job_descriptor> descriptor,
+ const std::vector<primitive> &args, ignite_callback<job_execution>
callback);
/**
* Submits for execution a compute job represented by the given class on
one of the specified nodes.
*
* @param nodes Nodes to use for the job execution.
- * @param units Deployment units. Can be empty.
- * @param job_class_name Java class name of the job to submit.
+ * @param descriptor Descriptor.
* @param args Job arguments.
- * @param options Job execution options.
* @return Job execution result.
*/
- IGNITE_API job_execution submit(const std::vector<cluster_node> &nodes,
const std::vector<deployment_unit> &units,
- std::string_view job_class_name, const std::vector<primitive> &args,
const job_execution_options &options) {
+ IGNITE_API job_execution submit(const std::vector<cluster_node> &nodes,
std::shared_ptr<job_descriptor> descriptor,
+ const std::vector<primitive> &args) {
return sync<job_execution>([&](auto callback) mutable {
- submit_async(nodes, units, job_class_name, args, options,
std::move(callback));
+ submit_async(nodes, descriptor, args, std::move(callback));
});
}
@@ -85,32 +81,27 @@ public:
* Broadcast a compute job represented by the given class for an execution
on all of the specified nodes.
*
* @param nodes Nodes to use for the job execution.
- * @param units Deployment units. Can be empty.
- * @param job_class_name Java class name of the job to submit.
+ * @param descriptor Descriptor.
* @param args Job arguments.
- * @param options Job execution options.
* @param callback A callback called on operation completion with jobs
execution result.
*/
IGNITE_API void submit_broadcast_async(const std::set<cluster_node> &nodes,
- const std::vector<deployment_unit> &units, std::string_view
job_class_name, const std::vector<primitive> &args,
- const job_execution_options &options,
+ std::shared_ptr<job_descriptor> descriptor, const
std::vector<primitive> &args,
ignite_callback<std::map<cluster_node, ignite_result<job_execution>>>
callback);
/**
* Broadcast a compute job represented by the given class on all of the
specified nodes.
*
* @param nodes Nodes to use for the job execution.
- * @param units Deployment units. Can be empty.
- * @param job_class_name Java class name of the job to submit.
+ * @param descriptor Descriptor.
* @param args Job arguments.
- * @param options Job execution options.
* @return Job execution result.
*/
IGNITE_API std::map<cluster_node, ignite_result<job_execution>>
submit_broadcast(
- const std::set<cluster_node> &nodes, const
std::vector<deployment_unit> &units, std::string_view job_class_name,
- const std::vector<primitive> &args, const job_execution_options
&options) {
+ const std::set<cluster_node> &nodes, std::shared_ptr<job_descriptor>
descriptor,
+ const std::vector<primitive> &args) {
return sync<std::map<cluster_node,
ignite_result<job_execution>>>([&](auto callback) mutable {
- submit_broadcast_async(nodes, units, job_class_name, args,
options, std::move(callback));
+ submit_broadcast_async(nodes, descriptor, args,
std::move(callback));
});
}
@@ -119,32 +110,27 @@ public:
*
* @param table_name Name of the table to be used with @c key to determine
target node.
* @param key Table key to be used to determine the target node for job
execution.
- * @param units Deployment units. Can be empty.
- * @param job_class_name Java class name of the job to submit.
+ * @param descriptor Descriptor.
* @param args Job arguments.
- * @param options Job execution options.
* @param callback A callback called on operation completion with job
execution result.
*/
IGNITE_API void submit_colocated_async(std::string_view table_name, const
ignite_tuple &key,
- const std::vector<deployment_unit> &units, std::string_view
job_class_name, const std::vector<primitive> &args,
- const job_execution_options &options, ignite_callback<job_execution>
callback);
+ std::shared_ptr<job_descriptor> descriptor, const
std::vector<primitive> &args,
+ ignite_callback<job_execution> callback);
/**
* Synchronously executes a job represented by the given class on one node
where the given key is located.
*
* @param table_name Name of the table to be used with @c key to determine
target node.
* @param key Table key to be used to determine the target node for job
execution.
- * @param units Deployment units. Can be empty.
- * @param job_class_name Java class name of the job to submit.
+ * @param descriptor Descriptor.
* @param args Job arguments.
- * @param options Job execution options.
* @return Job execution result.
*/
IGNITE_API job_execution submit_colocated(std::string_view table_name,
const ignite_tuple &key,
- const std::vector<deployment_unit> &units, std::string_view
job_class_name, const std::vector<primitive> &args,
- const job_execution_options &options) {
+ std::shared_ptr<job_descriptor> descriptor, const
std::vector<primitive> &args) {
return sync<job_execution>([&](auto callback) mutable {
- submit_colocated_async(table_name, key, units, job_class_name,
args, options, std::move(callback));
+ submit_colocated_async(table_name, key, descriptor, args,
std::move(callback));
});
}
diff --git a/modules/platforms/cpp/ignite/client/compute/job_descriptor.h
b/modules/platforms/cpp/ignite/client/compute/job_descriptor.h
new file mode 100644
index 0000000000..d895a18155
--- /dev/null
+++ b/modules/platforms/cpp/ignite/client/compute/job_descriptor.h
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "ignite/client/compute/deployment_unit.h"
+#include "ignite/client/compute/job_execution_options.h"
+
+#include <string>
+#include <vector>
+#include <memory>
+
+namespace ignite {
+
+/**
+ * Compute job descriptor.
+ */
+class job_descriptor {
+public:
+ /**
+ * Default constructor.
+ */
+ job_descriptor() = default;
+
+ /**
+ * Get job class name.
+ *
+ * @return Job class name.
+ */
+ [[nodiscard]] const std::string &get_job_class_name() const { return
m_job_class_name; }
+
+ /**
+ * Get deployment units.
+ *
+ * @return Deployment units.
+ */
+ [[nodiscard]] const std::vector<deployment_unit> &get_deployment_units()
const { return m_units; }
+
+ /**
+ * Get execution options.
+ *
+ * @return Execution options.
+ */
+ [[nodiscard]] const job_execution_options &get_execution_options() const {
return m_options; }
+
+ /**
+ * Builder.
+ */
+ class builder {
+ public:
+ /**
+ * Set job class name.
+ *
+ * @param job_class_name Job class name.
+ */
+ builder& set_job_class_name(const std::string &job_class_name) {
+ m_descriptor->m_job_class_name = job_class_name;
+ return *this;
+ }
+
+ /**
+ * Set deployment units.
+ *
+ * @param units Deployment units to set.
+ */
+ builder& set_deployment_units(std::vector<deployment_unit> units) {
+ m_descriptor->m_units = std::move(units);
+ return *this;
+ }
+
+ /**
+ * Set execution options.
+ *
+ * @param options Execution options.
+ */
+ builder& set_execution_options(const job_execution_options &options) {
+ m_descriptor->m_options = options;
+ return *this;
+ }
+
+ /**
+ * Build Job Descriptor.
+ *
+ * @return An instance of Job Descriptor.
+ */
+ std::shared_ptr<job_descriptor> build() { return
std::move(m_descriptor); }
+ private:
+ /** Descriptor. */
+ std::shared_ptr<job_descriptor>
m_descriptor{std::make_shared<job_descriptor>()};
+ };
+private:
+ /** Job class name. */
+ std::string m_job_class_name;
+
+ /** Units. */
+ std::vector<deployment_unit> m_units;
+
+ /** Options. */
+ job_execution_options m_options;
+};
+
+} // namespace ignite
diff --git
a/modules/platforms/cpp/ignite/client/compute/job_execution_options.h
b/modules/platforms/cpp/ignite/client/compute/job_execution_options.h
index 1a759c05e9..0ff34793aa 100644
--- a/modules/platforms/cpp/ignite/client/compute/job_execution_options.h
+++ b/modules/platforms/cpp/ignite/client/compute/job_execution_options.h
@@ -61,10 +61,10 @@ public:
private:
/** Job execution priority. */
- const std::int32_t m_priority{0};
+ std::int32_t m_priority{0};
/** Max re-tries. */
- const std::int32_t m_max_retries{0};
+ std::int32_t m_max_retries{0};
};
} // namespace ignite
diff --git
a/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp
b/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp
index a41427a4b2..52903bcd67 100644
--- a/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp
+++ b/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp
@@ -234,21 +234,20 @@ private:
std::shared_ptr<job_execution_impl> m_execution{};
};
-void compute_impl::submit_to_nodes(const std::vector<cluster_node> &nodes,
const std::vector<deployment_unit> &units,
- std::string_view job_class_name, const std::vector<primitive> &args, const
job_execution_options &options,
- ignite_callback<job_execution> callback) {
+void compute_impl::submit_to_nodes(const std::vector<cluster_node> &nodes,
std::shared_ptr<job_descriptor> descriptor,
+ const std::vector<primitive> &args, ignite_callback<job_execution>
callback) {
- auto writer_func = [&nodes, job_class_name, &units, args,
options](protocol::writer &writer) {
+ auto writer_func = [&nodes, &descriptor, args](protocol::writer &writer) {
auto nodes_num = std::int32_t(nodes.size());
writer.write(nodes_num);
for (const auto &node : nodes) {
writer.write(node.get_name());
}
- write_units(writer, units);
- writer.write(job_class_name);
+ write_units(writer, descriptor->get_deployment_units());
+ writer.write(descriptor->get_job_class_name());
- writer.write(options.get_priority());
- writer.write(options.get_max_retries());
+ writer.write(descriptor->get_execution_options().get_priority());
+ writer.write(descriptor->get_execution_options().get_max_retries());
write_primitives_as_binary_tuple(writer, args);
};
@@ -260,11 +259,11 @@ void compute_impl::submit_to_nodes(const
std::vector<cluster_node> &nodes, const
}
void compute_impl::submit_colocated_async(const std::string &table_name, const
ignite_tuple &key,
- const std::vector<deployment_unit> &units, const std::string &job, const
std::vector<primitive> &args,
- const job_execution_options &options, ignite_callback<job_execution>
callback) {
+ std::shared_ptr<job_descriptor> descriptor, const std::vector<primitive>
&args,
+ ignite_callback<job_execution> callback) {
auto self = shared_from_this();
auto conn = m_connection;
- auto on_table_get = [self, table_name, key, units, job, args, conn,
options, callback](auto &&res) mutable {
+ auto on_table_get = [self, table_name, key, descriptor, args, conn,
callback](auto &&res) mutable {
if (res.has_error()) {
callback({std::move(res.error())});
return;
@@ -277,16 +276,16 @@ void compute_impl::submit_colocated_async(const
std::string &table_name, const i
auto table = table_impl::from_facade(*table_opt);
table->template with_proper_schema_async<job_execution>(
- callback, [self, table, key, units, job, args, conn,
options](const schema &sch, auto callback) mutable {
- auto writer_func = [&key, &units, &sch, &table, &job, &args,
&options](protocol::writer &writer) {
+ callback, [self, table, key, descriptor, args, conn](const schema
&sch, auto callback) mutable {
+ auto writer_func = [&key, &descriptor, &sch, &table,
&args](protocol::writer &writer) {
writer.write(table->get_id());
writer.write(sch.version);
write_tuple(writer, sch, key, true);
- write_units(writer, units);
- writer.write(job);
+ write_units(writer, descriptor->get_deployment_units());
+ writer.write(descriptor->get_job_class_name());
- writer.write(options.get_priority());
- writer.write(options.get_max_retries());
+
writer.write(descriptor->get_execution_options().get_priority());
+
writer.write(descriptor->get_execution_options().get_max_retries());
write_primitives_as_binary_tuple(writer, args);
};
diff --git a/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.h
b/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.h
index 68b837a4bc..639450cd29 100644
--- a/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.h
+++ b/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.h
@@ -18,6 +18,7 @@
#pragma once
#include "ignite/client/compute/deployment_unit.h"
+#include "ignite/client/compute/job_descriptor.h"
#include "ignite/client/compute/job_execution.h"
#include "ignite/client/compute/job_execution_options.h"
#include "ignite/client/detail/cluster_connection.h"
@@ -53,15 +54,12 @@ public:
* asynchronously. If the node leaves the cluster, it will be restarted on
one of the candidate nodes.
*
* @param nodes Candidate node to use for the job execution.
- * @param units Deployment units. Can be empty.
- * @param job_class_name Java class name of the job to submit.
+ * @param descriptor Descriptor.
* @param args Job arguments.
- * @param options Job execution options.
* @param callback A callback called on operation completion with job
execution result.
*/
- void submit_to_nodes(const std::vector<cluster_node> &nodes, const
std::vector<deployment_unit> &units,
- std::string_view job_class_name, const std::vector<primitive> &args,
const job_execution_options &options,
- ignite_callback<job_execution> callback);
+ void submit_to_nodes(const std::vector<cluster_node> &nodes,
std::shared_ptr<job_descriptor> descriptor,
+ const std::vector<primitive> &args, ignite_callback<job_execution>
callback);
/**
* Submits a compute job represented by the given class for an execution
on one of the nodes where the given key is
@@ -69,15 +67,12 @@ public:
*
* @param table_name Name of the table to be used with @c key to determine
target node.
* @param key Table key to be used to determine the target node for job
execution.
- * @param units Deployment units. Can be empty.
- * @param job_class_name Java class name of the job to submit.
+ * @param descriptor Descriptor.
* @param args Job arguments.
- * @param options Job execution options.
* @param callback A callback called on operation completion with job
execution result.
*/
void submit_colocated_async(const std::string &table_name, const
ignite_tuple &key,
- const std::vector<deployment_unit> &units, const std::string
&job_class_name,
- const std::vector<primitive> &args, const job_execution_options
&options,
+ std::shared_ptr<job_descriptor> descriptor, const
std::vector<primitive> &args,
ignite_callback<job_execution> callback);
/**
diff --git a/modules/platforms/cpp/tests/client-test/compute_test.cpp
b/modules/platforms/cpp/tests/client-test/compute_test.cpp
index 2868abe78f..8fe007cf18 100644
--- a/modules/platforms/cpp/tests/client-test/compute_test.cpp
+++ b/modules/platforms/cpp/tests/client-test/compute_test.cpp
@@ -71,7 +71,7 @@ protected:
template<typename T>
void check_argument(T value, const std::string &expected_str) {
auto cluster_nodes = m_client.get_cluster_nodes();
- auto execution = m_client.get_compute().submit(cluster_nodes, {},
ECHO_JOB, {value, expected_str}, {});
+ auto execution = m_client.get_compute().submit(cluster_nodes,
m_echo_job_desc, {value, expected_str});
auto result = execution.get_result();
ASSERT_TRUE(result.has_value());
@@ -91,6 +91,21 @@ protected:
/** Ignite client. */
ignite_client m_client;
+
+ /** Node name job descriptor. */
+ std::shared_ptr<job_descriptor>
m_node_name_job_desc{job_descriptor::builder().set_job_class_name(NODE_NAME_JOB).build()};
+
+ /** Echo job descriptor. */
+ std::shared_ptr<job_descriptor>
m_echo_job_desc{job_descriptor::builder().set_job_class_name(ECHO_JOB).build()};
+
+ /** Concat job descriptor. */
+ std::shared_ptr<job_descriptor>
m_concat_job_desc{job_descriptor::builder().set_job_class_name(CONCAT_JOB).build()};
+
+ /** Error job descriptor. */
+ std::shared_ptr<job_descriptor>
m_error_job_desc{job_descriptor::builder().set_job_class_name(ERROR_JOB).build()};
+
+ /** Sleep job descriptor. */
+ std::shared_ptr<job_descriptor>
m_sleep_job_desc{job_descriptor::builder().set_job_class_name(SLEEP_JOB).build()};
};
TEST_F(compute_test, get_cluster_nodes) {
@@ -116,7 +131,7 @@ TEST_F(compute_test, get_cluster_nodes) {
TEST_F(compute_test, execute_on_random_node) {
auto cluster_nodes = m_client.get_cluster_nodes();
- auto execution = m_client.get_compute().submit(cluster_nodes, {},
NODE_NAME_JOB, {}, {});
+ auto execution = m_client.get_compute().submit(cluster_nodes,
m_node_name_job_desc, {});
auto result = execution.get_result();
ASSERT_TRUE(result.has_value());
@@ -124,8 +139,8 @@ TEST_F(compute_test, execute_on_random_node) {
}
TEST_F(compute_test, execute_on_specific_node) {
- auto execution1 = m_client.get_compute().submit({get_node(0)}, {},
NODE_NAME_JOB, {"-", 11}, {});
- auto execution2 = m_client.get_compute().submit({get_node(1)}, {},
NODE_NAME_JOB, {":", 22}, {});
+ auto execution1 = m_client.get_compute().submit({get_node(0)},
m_node_name_job_desc, {"-", 11});
+ auto execution2 = m_client.get_compute().submit({get_node(1)},
m_node_name_job_desc, {":", 22});
auto res1 = execution1.get_result();
auto res2 = execution2.get_result();
@@ -138,7 +153,7 @@ TEST_F(compute_test, execute_on_specific_node) {
}
TEST_F(compute_test, execute_broadcast_one_node) {
- auto res = m_client.get_compute().submit_broadcast({get_node(1)}, {},
NODE_NAME_JOB, {"42"}, {});
+ auto res = m_client.get_compute().submit_broadcast({get_node(1)},
m_node_name_job_desc, {"42"});
ASSERT_EQ(res.size(), 1);
@@ -149,7 +164,7 @@ TEST_F(compute_test, execute_broadcast_one_node) {
}
TEST_F(compute_test, execute_broadcast_all_nodes) {
- auto res = m_client.get_compute().submit_broadcast(get_node_set(), {},
NODE_NAME_JOB, {"42"}, {});
+ auto res = m_client.get_compute().submit_broadcast(get_node_set(),
m_node_name_job_desc, {"42"});
ASSERT_EQ(res.size(), 4);
@@ -162,7 +177,7 @@ TEST_F(compute_test, execute_broadcast_all_nodes) {
TEST_F(compute_test, execute_with_args) {
auto cluster_nodes = m_client.get_cluster_nodes();
- auto execution = m_client.get_compute().submit(cluster_nodes, {},
CONCAT_JOB, {5.3, uuid(), "42", nullptr}, {});
+ auto execution = m_client.get_compute().submit(cluster_nodes,
m_concat_job_desc, {5.3, uuid(), "42", nullptr});
auto result = execution.get_result();
ASSERT_TRUE(result.has_value());
@@ -175,7 +190,7 @@ TEST_F(compute_test, job_error_propagates_to_client) {
EXPECT_THROW(
{
try {
- m_client.get_compute().submit(cluster_nodes, {}, ERROR_JOB,
{"unused"}, {}).get_result();
+ m_client.get_compute().submit(cluster_nodes, m_error_job_desc,
{"unused"}).get_result();
} catch (const ignite_error &e) {
EXPECT_THAT(e.what_str(), testing::HasSubstr("Custom job
error"));
// TODO https://issues.apache.org/jira/browse/IGNITE-19603
@@ -195,7 +210,7 @@ TEST_F(compute_test, unknown_node_execute_throws) {
EXPECT_THROW(
{
try {
- m_client.get_compute().submit({unknown_node}, {}, ECHO_JOB,
{"unused"}, {});
+ m_client.get_compute().submit({unknown_node}, m_echo_job_desc,
{"unused"});
} catch (const ignite_error &e) {
EXPECT_THAT(e.what_str(),
testing::HasSubstr("None of the specified nodes are
present in the cluster: [random]"));
@@ -212,7 +227,7 @@ TEST_F(compute_test,
DISABLED_unknown_node_broadcast_throws) {
EXPECT_THROW(
{
try {
- m_client.get_compute().submit_broadcast({unknown_node}, {},
ECHO_JOB, {"unused"}, {});
+ m_client.get_compute().submit_broadcast({unknown_node},
m_echo_job_desc, {"unused"});
} catch (const ignite_error &e) {
EXPECT_THAT(e.what_str(),
testing::HasSubstr("None of the specified nodes are
present in the cluster: [random]"));
@@ -273,7 +288,7 @@ TEST_F(compute_test, submit_colocated) {
SCOPED_TRACE("key=" + std::to_string(var.first) + ", node=" +
var.second);
auto key = get_tuple(var.first);
- auto execution = m_client.get_compute().submit_colocated(TABLE_1, key,
{}, NODE_NAME_JOB, {}, {});
+ auto execution = m_client.get_compute().submit_colocated(TABLE_1, key,
m_node_name_job_desc, {});
auto res_node_name = execution.get_result();
auto expected_node_name = PLATFORM_TEST_NODE_RUNNER + var.second;
@@ -285,7 +300,7 @@ TEST_F(compute_test,
execute_colocated_throws_when_table_does_not_exist) {
EXPECT_THROW(
{
try {
- (void) m_client.get_compute().submit_colocated("unknownTable",
get_tuple(42), {}, ECHO_JOB, {}, {});
+ (void) m_client.get_compute().submit_colocated("unknownTable",
get_tuple(42), m_echo_job_desc, {});
} catch (const ignite_error &e) {
EXPECT_STREQ("Table does not exist: 'unknownTable'", e.what());
throw;
@@ -298,7 +313,7 @@ TEST_F(compute_test,
execute_colocated_throws_when_key_column_is_missing) {
EXPECT_THROW(
{
try {
- (void) m_client.get_compute().submit_colocated(TABLE_1,
get_tuple("some"), {}, ECHO_JOB, {}, {});
+ (void) m_client.get_compute().submit_colocated(TABLE_1,
get_tuple("some"), m_echo_job_desc, {});
} catch (const ignite_error &e) {
EXPECT_THAT(e.what_str(), ::testing::HasSubstr("Missed key
column: KEY"));
throw;
@@ -311,7 +326,7 @@ TEST_F(compute_test,
execute_colocated_throws_when_key_is_empty) {
EXPECT_THROW(
{
try {
- (void) m_client.get_compute().submit_colocated(TABLE_1, {},
{}, ECHO_JOB, {}, {});
+ (void) m_client.get_compute().submit_colocated(TABLE_1, {},
m_echo_job_desc, {});
} catch (const ignite_error &e) {
EXPECT_EQ("Key tuple can not be empty", e.what_str());
throw;
@@ -325,7 +340,12 @@ TEST_F(compute_test, unknown_unit) {
{
try {
auto cluster_nodes = m_client.get_cluster_nodes();
- (void) m_client.get_compute().submit(cluster_nodes,
{{"unknown"}}, NODE_NAME_JOB, {}, {});
+ auto job_desc = job_descriptor::builder()
+ .set_job_class_name(NODE_NAME_JOB)
+ .set_deployment_units({{"unknown"}})
+ .build();
+
+ (void) m_client.get_compute().submit(cluster_nodes, job_desc,
{});
} catch (const ignite_error &e) {
EXPECT_THAT(e.what_str(), ::testing::HasSubstr("Deployment
unit unknown:latest doesn't exist"));
throw;
@@ -339,7 +359,12 @@ TEST_F(compute_test, execute_unknown_unit_and_version) {
{
try {
auto cluster_nodes = m_client.get_cluster_nodes();
- (void) m_client.get_compute().submit(cluster_nodes,
{{"unknown", "1.2.3"}}, NODE_NAME_JOB, {}, {});
+ auto job_desc = job_descriptor::builder()
+ .set_job_class_name(NODE_NAME_JOB)
+ .set_deployment_units({{"unknown", "1.2.3"}})
+ .build();
+
+ (void) m_client.get_compute().submit(cluster_nodes, job_desc,
{});
} catch (const ignite_error &e) {
EXPECT_THAT(e.what_str(), ::testing::HasSubstr("Deployment
unit unknown:1.2.3 doesn't exist"));
throw;
@@ -353,7 +378,12 @@ TEST_F(compute_test,
execute_colocated_unknown_unit_and_version) {
{
try {
auto comp = m_client.get_compute();
- (void) comp.submit_colocated(TABLE_1, get_tuple(1),
{{"unknown", "1.2.3"}}, NODE_NAME_JOB, {}, {});
+ auto job_desc = job_descriptor::builder()
+ .set_job_class_name(NODE_NAME_JOB)
+ .set_deployment_units({{"unknown", "1.2.3"}})
+ .build();
+
+ (void) comp.submit_colocated(TABLE_1, get_tuple(1), job_desc,
{});
} catch (const ignite_error &e) {
EXPECT_THAT(e.what_str(), ::testing::HasSubstr("Deployment
unit unknown:1.2.3 doesn't exist"));
throw;
@@ -363,7 +393,12 @@ TEST_F(compute_test,
execute_colocated_unknown_unit_and_version) {
}
TEST_F(compute_test, broadcast_unknown_unit_and_version) {
- auto res = m_client.get_compute().submit_broadcast({get_node(1)},
{{"unknown", "1.2.3"}}, NODE_NAME_JOB, {}, {});
+ auto job_desc = job_descriptor::builder()
+ .set_job_class_name(NODE_NAME_JOB)
+ .set_deployment_units({{"unknown", "1.2.3"}})
+ .build();
+
+ auto res = m_client.get_compute().submit_broadcast({get_node(1)},
job_desc, {});
ASSERT_EQ(res.size(), 1);
@@ -376,7 +411,12 @@ TEST_F(compute_test, execute_empty_unit_name) {
EXPECT_THROW(
{
try {
- (void) m_client.get_compute().submit({get_node(1)}, {{""}},
NODE_NAME_JOB, {}, {});
+ auto job_desc = job_descriptor::builder()
+ .set_job_class_name(NODE_NAME_JOB)
+ .set_deployment_units({{""}})
+ .build();
+
+ (void) m_client.get_compute().submit({get_node(1)}, job_desc,
{});
} catch (const ignite_error &e) {
EXPECT_EQ("Deployment unit name can not be empty",
e.what_str());
throw;
@@ -389,7 +429,12 @@ TEST_F(compute_test, execute_empty_unit_version) {
EXPECT_THROW(
{
try {
- (void) m_client.get_compute().submit({get_node(1)}, {{"some",
""}}, NODE_NAME_JOB, {}, {});
+ auto job_desc = job_descriptor::builder()
+ .set_job_class_name(NODE_NAME_JOB)
+ .set_deployment_units({{"some", ""}})
+ .build();
+
+ (void) m_client.get_compute().submit({get_node(1)}, job_desc,
{});
} catch (const ignite_error &e) {
EXPECT_EQ("Deployment unit version can not be empty",
e.what_str());
throw;
@@ -402,7 +447,12 @@ TEST_F(compute_test, broadcast_empty_unit_name) {
EXPECT_THROW(
{
try {
- (void) m_client.get_compute().submit_broadcast({get_node(1)},
{{""}}, NODE_NAME_JOB, {}, {});
+ auto job_desc = job_descriptor::builder()
+ .set_job_class_name(NODE_NAME_JOB)
+ .set_deployment_units({{""}})
+ .build();
+
+ (void) m_client.get_compute().submit_broadcast({get_node(1)},
job_desc, {});
} catch (const ignite_error &e) {
EXPECT_EQ("Deployment unit name can not be empty",
e.what_str());
throw;
@@ -415,7 +465,12 @@ TEST_F(compute_test, broadcast_empty_unit_version) {
EXPECT_THROW(
{
try {
- (void) m_client.get_compute().submit_broadcast({get_node(1)},
{{"some", ""}}, NODE_NAME_JOB, {}, {});
+ auto job_desc = job_descriptor::builder()
+ .set_job_class_name(NODE_NAME_JOB)
+ .set_deployment_units({{"some", ""}})
+ .build();
+
+ (void) m_client.get_compute().submit_broadcast({get_node(1)},
job_desc, {});
} catch (const ignite_error &e) {
EXPECT_EQ("Deployment unit version can not be empty",
e.what_str());
throw;
@@ -428,7 +483,12 @@ TEST_F(compute_test, execute_colocated_empty_unit_name) {
EXPECT_THROW(
{
try {
- (void) m_client.get_compute().submit_colocated(TABLE_1,
get_tuple(1), {{""}}, NODE_NAME_JOB, {}, {});
+ auto job_desc = job_descriptor::builder()
+ .set_job_class_name(NODE_NAME_JOB)
+ .set_deployment_units({{""}})
+ .build();
+
+ (void) m_client.get_compute().submit_colocated(TABLE_1,
get_tuple(1), job_desc, {});
} catch (const ignite_error &e) {
EXPECT_EQ("Deployment unit name can not be empty",
e.what_str());
throw;
@@ -442,7 +502,12 @@ TEST_F(compute_test, execute_colocated_empty_unit_version)
{
{
try {
auto comp = m_client.get_compute();
- comp.submit_colocated(TABLE_1, get_tuple(1), {{"some", ""}},
NODE_NAME_JOB, {}, {});
+ auto job_desc = job_descriptor::builder()
+ .set_job_class_name(NODE_NAME_JOB)
+ .set_deployment_units({{"some", ""}})
+ .build();
+
+ comp.submit_colocated(TABLE_1, get_tuple(1), job_desc, {});
} catch (const ignite_error &e) {
EXPECT_EQ("Deployment unit version can not be empty",
e.what_str());
throw;
@@ -454,7 +519,7 @@ TEST_F(compute_test, execute_colocated_empty_unit_version) {
TEST_F(compute_test, job_execution_status_executing) {
const std::int32_t sleep_ms = 3000;
- auto execution = m_client.get_compute().submit({get_node(1)}, {},
SLEEP_JOB, {sleep_ms}, {});
+ auto execution = m_client.get_compute().submit({get_node(1)},
m_sleep_job_desc, {sleep_ms});
auto state = execution.get_state();
@@ -465,7 +530,7 @@ TEST_F(compute_test, job_execution_status_executing) {
TEST_F(compute_test, DISABLED_job_execution_status_completed) {
const std::int32_t sleep_ms = 1;
- auto execution = m_client.get_compute().submit({get_node(1)}, {},
SLEEP_JOB, {sleep_ms}, {});
+ auto execution = m_client.get_compute().submit({get_node(1)},
m_sleep_job_desc, {sleep_ms});
execution.get_result();
auto state = execution.get_state();
@@ -475,7 +540,7 @@ TEST_F(compute_test,
DISABLED_job_execution_status_completed) {
}
TEST_F(compute_test, job_execution_status_failed) {
- auto execution = m_client.get_compute().submit({get_node(1)}, {},
ERROR_JOB, {"unused"}, {});
+ auto execution = m_client.get_compute().submit({get_node(1)},
m_error_job_desc, {"unused"});
EXPECT_THROW(
{
@@ -497,7 +562,7 @@ TEST_F(compute_test, job_execution_status_failed) {
TEST_F(compute_test, job_execution_cancel) {
const std::int32_t sleep_ms = 5000;
- auto execution = m_client.get_compute().submit({get_node(1)}, {},
SLEEP_JOB, {sleep_ms}, {});
+ auto execution = m_client.get_compute().submit({get_node(1)},
m_sleep_job_desc, {sleep_ms});
execution.cancel();
auto state = execution.get_state();
@@ -509,7 +574,7 @@ TEST_F(compute_test, job_execution_cancel) {
TEST_F(compute_test, job_execution_change_priority) {
const std::int32_t sleep_ms = 5000;
- auto execution = m_client.get_compute().submit({get_node(1)}, {},
SLEEP_JOB, {sleep_ms}, {});
+ auto execution = m_client.get_compute().submit({get_node(1)},
m_sleep_job_desc, {sleep_ms});
auto res = execution.change_priority(123);
EXPECT_EQ(res, job_execution::operation_result::INVALID_STATE);
diff --git
a/modules/platforms/cpp/tests/client-test/schema_synchronization_test.cpp
b/modules/platforms/cpp/tests/client-test/schema_synchronization_test.cpp
index 001f87e10d..42eb8fcdc6 100644
--- a/modules/platforms/cpp/tests/client-test/schema_synchronization_test.cpp
+++ b/modules/platforms/cpp/tests/client-test/schema_synchronization_test.cpp
@@ -98,8 +98,9 @@ TEST_F(schema_synchronization_test,
upsert_add_column_compute) {
tuple_view.upsert(nullptr, val1);
m_client.get_sql().execute(nullptr, {"ALTER TABLE SCHEMA_SYN_TEST ADD
COLUMN VAL2 INT"}, {});
+ auto descriptor =
job_descriptor::builder().set_job_class_name(NODE_NAME_JOB).build();
- m_client.get_compute().submit_colocated("SCHEMA_SYN_TEST", {key}, {},
NODE_NAME_JOB, {}, {}).get_result();
+ m_client.get_compute().submit_colocated("SCHEMA_SYN_TEST", {key},
descriptor, {}).get_result();
}
TEST_F(schema_synchronization_test, upsert_add_column_upsert_all) {
diff --git a/modules/platforms/cpp/tests/test-common/basic_auth_test_suite.h
b/modules/platforms/cpp/tests/test-common/basic_auth_test_suite.h
index d81ecf5dee..8f2c8e1074 100644
--- a/modules/platforms/cpp/tests/test-common/basic_auth_test_suite.h
+++ b/modules/platforms/cpp/tests/test-common/basic_auth_test_suite.h
@@ -97,7 +97,9 @@ public:
auto client = ignite_client::start(cfg, std::chrono::seconds(30));
auto nodes = client.get_cluster_nodes();
- client.get_compute().submit(nodes, {}, ENABLE_AUTHN_JOB, {enable ?
1 : 0}, {}).get_result();
+ auto descriptor =
job_descriptor::builder().set_job_class_name(ENABLE_AUTHN_JOB).build();
+
+ client.get_compute().submit(nodes, descriptor, {enable ? 1 :
0}).get_result();
} catch (const ignite_error &) {
// Ignore.
// As a result of this call, the client may be disconnected from
the server due to authn config change.